TL;DR
- Document Intelligenceを使ってPDFをテキストと画像に分離した。
- テキストと画像を別々のデータとして扱わず、画像のリンクをテキストに組み込んでPDFの文脈を維持するようにした。
- テキストと画像を用いた簡単なRAGを作成した。
自己紹介
NTTドコモ データプラットフォーム部(以下DP部)藤平です。 NTTドコモでは様々なサービスで機械学習やLLMを取り入れることでサービス価値の向上を目指しています。 データプラットフォーム部(以下DP部)ではこうした技術の適用を含め、全社におけるデータ活用をミッションとしています。
今回執筆いただいた協働者の鶴薗さんとは、DP部が展開する社内の分析サービスに関するドキュメントやソースコードをリソースとしたRAG(Retrieval Augmented Generation)の詳細検討・実装・運用を一緒に取り組んでおり、SlackBot化して日々改良を続けながら運用しています。
昨年はRAGの評価についての記事を執筆いただきましたが、今回はこれまで扱いづらかったPDFをAzureのDocument Intelligenceサービスを活用してRAGに利用する方法を紹介していただきます。
はじめに
writer : DP部 鶴薗 智博
昨年はこちらの記事を書かせていただきました。様々な良質な記事がある中、ご覧になっていただいた方はありがとうございました。
今回もRAGについての記事ということで、PDFの画像を含めたRAGを紹介させていただきます。
LLMの登場以降、テキストベースのRAGシステムは着実に発展してきました。私たちも、社内のデータ活用サービスに関するGitHubのコードやドキュメントを活用してSlackBotアプリを構築し、現在まで運用しています。
しかし、生成AI関連の技術進歩は目覚ましく、LLMのプロンプトに画像を入力できるようになり、AzureのDocument Intelligenceサービスの登場により、これまで扱いづらかったPDFを容易かつ高精度にテキストと画像に分離できるようになり、より幅広いデータを使用したRAGの実現が可能となりました。
そこで今回は、Document Intelligenceを活用して、社内で広く使用されているPDFファイルや、PDF化されたMicrosoft Officeファイルを用いて、画像も含めて扱えるシンプルなRAGを構築したいと思います。
使用データとアーキテクチャ
- 今回は、昨年のAdvent Calenderの記事をPDF化して、RAGを構築します。
- まずはDocument Intelligenceを用いてPDFをテキストと画像に分離し、それらをFAISS(ベクトルDB)に投入し、質問に関連したドキュメントを検索で抽出した後、それらをLLMに渡して質問に回答させる簡単なRAGを作成します。
実行はAzure Machine Learning StudioのNotebook環境にて行います。
実装はLangchainを使用しています。
- Python : 3.10.11
- 主要ライブラリ
- azure-ai-ml : 1.8.0
- azure-ai-documentintelligence : 1.0.0b4
- langchain : 0.3.2
- langchain_core : 0.3.9
- langchain_community : 0.3.1
- langchain-openai : 0.2.2
- langchain-text-splitters : 0.3.0
Document Intelligenceを用いたデータ準備
やること
Document Intelligenceを使用して、PDFをテキストと画像に分離します。
最終的にPromptに組み込む際には、PDFと同じ文脈のままテキストと画像を入れ込みたいため、
画像ファイルへのリンクをテキストに持たせることで参照できるようにすることを目指します。
実行コード
まずは以下のライブラリをインストールします。
pip install azure-ai-ml pip install azure-ai-documentintelligence pip install pillow PyMuPDF
以下のコードでPDFをDocument Intelligenceで解析し、その結果を得ることができます。
BASE_DIR
は今回のRAGの生成物を配置するベースのディレクトリを作成し指定してください。
PDFファイルを {BASE_DIR}/docs
ディレクトリ配下に配置しておきます。
また、BASE_DIR
に2つのディレクトリ di_results
と images
を作成しておきます。
di_results
: Document Intelligenceの処理で作成した、Markdownファイルimages
: 切り出した画像ファイル
BASE_DIR = './pdf_rag' # RAG用データを配置するベースのディレクトリ pdf_filename = 'イケてるLLMOps.pdf' # 実行対象のPDF endpoint = '<your-endpoint>' # Document Intelligenceのendpoint client_id = os.environ.get('DEFAULT_IDENTITY_CLIENT_ID') credential = ManagedIdentityCredential(client_id=client_id) document_intelligence_client = DocumentIntelligenceClient(endpoint=endpoint, credential=credential) with open(f'{BASE_DIR}/docs/{pdf_filename}', 'rb') as f: poller = document_intelligence_client.begin_analyze_document( model_id = 'prebuilt-layout', analyze_request=f, content_type='application/octet-stream', output_content_format='markdown' ) result = poller.result()
解析結果には以下のようなデータが出力されます。
sections
- 投入したファイルの内容と流れを
paragraphs
とtables
とfigures
を構造的な組み合わせで表現したものです。
- 投入したファイルの内容と流れを
paragraphs
- テキストブロックの段落情報になります。
- 段落ごとに
ID
が振られています。 title
やsectionHeading
のようなrole
も振られています。- 簡単に「PDFの文字列部分」と考えて大丈夫です。
tables
- 表情報になります。
- 表ごとに
ID
が振られています。 - 表領域の
BoundingBox
の情報も持っているため、画像に切り出すことが可能です。
figures
- 画像情報になります。
- 画像ごとに
ID
が振られており、付近のCaption
情報も含まれています。 - 画像領域の
BoundingBox
の情報も持っているため、画像に切り出すことが可能です。
content
- PDFの全内容をMarkdown形式に変換した文字列が格納されています。
- 今回は画像ファイルへのリンクをテキストに持たせたいため、
content
は使用しません。
Document Intelligenceの結果は以下の記事が素晴らしいので、是非ご確認ください。 qiita.com
次に、sections
を用いてドキュメントの要素を順番に処理していきます。
paragraph
であればその中の文字列を順次結合(適宜Header
を付与)figure
やtable
であれば画像に切り出し、その画像ファイルへのリンクを結合
という処理によって1つの大きな文字列を作成していきます。
以下処理コードですが、4つの関数(get_paragraphs()
, crop_image_from_image()
, crop_image_from_pdf_page()
, crop_image_from_file()
)は先ほどの記事からそのまま抜粋しており、get_sections()
は一部修正して使用しています。(全コードは記事の巻末に載せています。)
def get_sections(result): sections = [] for section in result.sections: # 今回は簡単のためsectionによる階層構造を無視するためextendにしています。 sections.extend(section.elements) return sections def make_markdown_from_document_intelligence_result_with_image(result, title): all_markdown = '' # 最終的に出力したいMarkdown形式の文字列 result_paragraphs = get_paragraphs(result) result_sections = get_sections(result) for sec in result_sections: if sec.startswith('/paragraphs/'): if result_paragraphs[sec]['role'] == 'title': # titleならHeader1を付与 all_markdown = all_markdown + '# ' + result_paragraphs[sec]['content'] + '\n' elif result_paragraphs[sec]['role'] == 'sectionHeading': # sectionHeadingならHeader2を付与 all_markdown = all_markdown + '## ' + result_paragraphs[sec]['content'] + '\n' else: all_markdown = all_markdown + result_paragraphs[sec]['content'] if sec.startswith('/figures/'): idx = int(sec.split('/')[-1]) # figureのIDを取得 figure = result.figures[idx] if 'boundingRegions' in figure: for i, br in enumerate(figure['boundingRegions']): page = br['pageNumber'] bbox = br['polygon'] bbox = (bbox[0], bbox[1], bbox[4], bbox[5]) image = crop_image_from_file(pdf_path, page-1, bbox) image_path = f'{BASE_DIR}/images/{title}_figure_{idx}_{i}.png' image.save(image_path) # 切り出した画像へのリンクをMarkdownに追加 all_markdown = all_markdown + f'![figure_{idx}_{i}]({image_path})' if sec.startswith('/tables/'): idx = int(sec.split('/')[-1]) # tableのIDを取得 table = result.tables[idx] if 'boundingRegions' in table: for i, br in enumerate(table['boundingRegions']): page = br['pageNumber'] bbox = br['polygon'] bbox = (bbox[0], bbox[1], bbox[4], bbox[5]) image = crop_image_from_file(pdf_path, page-1, bbox) image_path = f'{BASE_DIR}/images/{title}_table_{idx}_{i}.png' image.save(image_path) # 切り出した画像へのリンクをMarkdownに追加 all_markdown = all_markdown + f'![table_{idx}_{i}]({image_path})' all_markdown = all_markdown + '\n' return all_markdown all_pdf_paths = glob.glob(f'{BASE_DIR}/docs/*.pdf') for pdf_path in tqdm(all_pdf_paths): title = pdf_path.split('/')[-1].split('.')[0] # PDFのタイトルを取得 with open(pdf_path, 'rb') as f: poller = document_intelligence_client.begin_analyze_document( model_id = 'prebuilt-layout', analyze_request=f, content_type='application/octet-stream', output_content_format='markdown' ) result = poller.result() markdown_content = make_markdown_from_document_intelligence_result_with_image(result, title) with open(f'{BASE_DIR}/di_results/{title}.md', "w") as file: file.write(markdown_content)
これにより、{BASE_DIR}/docs
ディレクトリ内のすべてのPDFに対して処理が行われます。
結果
実行すると、画像へのリンク付きでMarkdownが出力されます。(一部抜粋で掲載します。)
## RAGの概要 ![figure_0_0](./pdf_rag/images/イケてるLLMOps_figure_0_0.png) RAGはAzure上でLangchainを用いて実装しており、データソースにはgitの該当リポジトリをCognitiveSearch に取り込んでIndex化し、質問に対して適したContextを取得しながら回答を作成しています。モデルのファ インチューニングは現時点でおこなっておりません。RAGの詳細な説明をするとそれだけで1つの記事とな ってしまうため、今回はアーキテクチャー図を載せてRAGの説明としたいと思います。 ![figure_1_0](./pdf_rag/images/イケてるLLMOps_figure_1_0.png) こちらのRAGをエンジンとしてSlackBot化し、現在運用しています。 実際にRAGを動かすことができるようになると、当然次のフェーズとして性能を担保しながら改良を加えて いくというLLMOps的な運用が重要になってきました。 そんな中、Azureに登場したまさにLMOpsを実現させる機能であるPrompt Flowに触れてみました。
出力に含まれている./pdf_rag/images/イケてるLLMOps_figure_1_0.png
は以下の画像です。
Retriever(検索エンジン)の作成
やること
データセットができたため、次は作成したMarkdownをFAISSに投入します。
OPENAI_API_VERSION
, AZURE_OPENAI_ENDPOINT
, OPENAI_API_KEY
, azure_deployment
にはそれぞれの環境に合った値を設定してください。
Document IntelligenceをAzure AI Searchに組み込んでIndexを作成する場合は、以下の記事が秀逸でので参考にして下さい。
実行コード
以下のコードにより、作成したMarkdownを見出し(##)単位でチャンク分割し、FAISSに取り込んでRetrieverを作成します。
Document
のmetadata
には簡単のためファイルパスしか入れていませんが、用途によって適宜追加してください。
retriever
のパラメータも性能を検証してチューニングしてみてください。
from langchain_core.documents import Document from langchain_community.vectorstores import FAISS from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_openai.embeddings.azure import AzureOpenAIEmbeddings import glob def init_azure_embeddings(): return AzureOpenAIEmbeddings( azure_deployment='text-embedding-3-small', api_version=OPENAI_API_VERSION, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_key = OPENAI_API_KEY, timeout=30, max_retries=4 ) mdfiles = glob.glob(f'{BASE_DIR}/di_results/*.md') # 作成したMarkdownを取得 docs = [] for f in mdfiles: with open(f, 'r', encoding='utf-8') as file: data = file.read() data = data.replace('}', ')').replace('{', '(') # {}はPromptの変数と認識されるため()に変換 docs.append(Document(page_content=data, metadata={'path':f})) md_splitter = RecursiveCharacterTextSplitter( separators = [ "\n#{2} ", ], is_separator_regex=True, chunk_size=1500, chunk_overlap=200 ) splitted_docs = md_splitter.split_documents(docs) vector_store = FAISS.from_documents(splitted_docs, embedding=init_azure_embeddings()) retriever = vector_store.as_retriever() retriever.search_kwargs['distance_metric'] = 'cos' retriever.search_kwargs['fetch_k'] = 100 retriever.search_kwargs['k'] = 5
結果
作成したRetriever
に質問文を入れると関連するDocument
が返されます。
retriever.invoke('アーキテクチャ図から読み取れる、このRAGのエンジン部分(Langchainでの実装部分)の処理の流れを解説してください')
回答
[Document('# Prompt Flowで評価Flowを自作してRAGのイケてる LLMOpsを実現してみた\n\n\n\n\n\n...'), Document(page_content='## 今後の方向性\n\nRAGアプリのよりよい運用のため、gitでのversion...'), Document(page_content='今年はLLMの登場により世界に激震が走る年となりましたが、弊社でもその...'), Document(page_content="## 正確性の評価\n\n背景\nRAGに最も問われる性能は「回答内容の正確性..."), Document(page_content='## 評価Flow作成手順\n\n● Prompt flow → Create → Create by type ...')]
簡単なRAGの作成
やること
それではこのRetriever
を用いてシンプルなRAGを構築してみましょう。
RAGの処理の流れは以下です。
- 質問文で
Retriever
に検索をかけて、質問文に関連するDocument
を抽出する Prompt
に質問文と、文脈情報(Context
)として抽出したDocument
を追加Prompt
をLLMに渡して質問に回答
Document
をContext
としてPrompt
に組み込む際に、先ほどテキストに埋め込んだ画像へのリンクがある場合、その画像をPrompt
に追加するため別関数としています。
また、最後にどの文書を参照して回答を作成したのかがわかるよう、引用情報(Citation
)も追加します。
Prompt
内で各Context
にラベリングを行い、LLMに回答時に参照したContext
のラベルを返させることで引用情報を付与できるようにします。
実行コード
Promptの前処理部分は以下です。
多少煩雑になっていますが、やっていることは上記で説明したとおりです。
def encode_image(image_path): with open(image_path, 'rb') as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def create_human_prompt_template_with_image(label, content): # テキストと画像でPrompt Templateを分けないといけないため、 # 画像リンクを検出したらそこで分割して、それぞれPrompt Templateを作成し、結合していく pattern = r'!\[(figure_|table_)([^\]]+)\]\((' + f'{BASE_DIR}/images/)' last_end = 0 contents = [] pos = 0 content_length = len(content) while pos < content_length: match = re.search(pattern, content[pos:]) if not match: contents.append(content[pos:]) break start = pos + match.start() end = pos + match.end() if start > pos: contents.append(content[pos:start]) url_start = end png_end = content.find('.png)', url_start) if png_end == -1: contents.append(content[end:]) break url = f'{BASE_DIR}/images/' + content[url_start:png_end + 4] contents.append(url) last_end = png_end + 5 pos = last_end templates = [] # Citationで使用するため、Contextごとにラベリングを行います。 templates.append(f'Context# {label} Start---------------') for c in contents: if c.startswith(f'{BASE_DIR}/images/'): base64_image = encode_image(c) template = {'image_url': {'url': f'data:image/png;base64,{base64_image}'}} templates.append(template) else: templates.append(c) templates.append(f'Context# {label} End-----------------') return templates def preprocess_context(contexts, base_human_prompt): human_prompt_templates = [base_human_prompt,] for i, d in enumerate(contexts): human_prompt_templates.extend(create_human_prompt_template_with_image(i, d.page_content)) new_templates = [] image_num = 0 for c in human_prompt_templates: if type(c) == dict: if image_num < 50: # 画像は50枚が上限のため制限する new_templates.append(c) image_num += 1 else: continue else: new_templates.append(c) labelled_context = HumanMessagePromptTemplate.from_template(new_templates) return labelled_context
次に回答生成部分のコードです。
class AnswerWithCitation(BaseModel): answer: str = Field(description='''質問の回答。 各文章の末尾に回答に使用した文脈情報の文脈番号を[Context# n]の形式で追記すること。 例: ~~である。[Context# 0]''') source_label: List[int] = Field(description='''文脈(context)のうち、 回答に使用した文脈情報の文脈番号(Context#)のリスト''') def answer_question(question, retriever): system_prompt_template = ( '次の質問(question)について、文脈(context)の内容だけに基づいて論理的に段階的に必要十分な分量で回答してください。\n' '文脈(context)に論拠がない場合はその旨を伝えて回答しないでください。\n' '{format_instructions}' ) human_prompt_template = '------------\n質問\n{question}\n------------\n文脈\n' # Contextの前処理が必要なため、Chainに組み込まず先にRetrieve contexts = retriever.invoke(question) if len(contexts) == 0: return {'answer': '文脈が見つかりません', 'source': []} human_prompt_templates = preprocess_context(contexts, human_prompt_template) answer_llm = init_azure_llm('gpt-4o') output_parser = PydanticOutputParser(pydantic_object=AnswerWithCitation) answer_with_context_prompt = ChatPromptTemplate( [ ('system', system_prompt_template), human_prompt_templates ], partial_variables={'format_instructions':output_parser.get_format_instructions()} ) answer_chain = RunnablePassthrough() | answer_with_context_prompt | answer_llm | output_parser result = answer_chain.invoke({'question':question}) return {'answer': result.answer, 'source': {l:contexts[l] for l in result.source_label}}
最後にCitation
の処理コードです。
def citation(answer, source): # LLMの返した参照ドキュメントのラベル番号を0始まりに修正。 # 参照ドキュメントが1つの場合は、文章ごとのラベルを削除し末尾に参照ドキュメントを追記 doc_label_map = {} if len(source) == 0: return answer answer = answer + '\n\n引用' source = dict(sorted(source.items())) for k in source: if source[k].metadata['path'] in doc_label_map: doc_label_map[source[k].metadata['path']].append(k) else: doc_label_map[source[k].metadata['path']] = [k] if len(doc_label_map) == 1: p = list(doc_label_map.keys())[0] for l in doc_label_map[p]: answer = answer.replace(f'[Context# {l}]', '') title = p.split('/')[-1].split('.')[0] answer = answer + f'\n{title}\n' else: for i, p in enumerate(doc_label_map): for l in doc_label_map[p]: answer = answer.replace(f'[Context# {l}]', f'[{i}]') title = p.split('/')[-1].split('.')[0] answer = answer + f'\n[{i}] : {title}\n' return answer
結果
それではRAGに質問を投げてみましょう。
answer = answer_question('RAGに最も問われる性能はなんですか?', retriever) answer['final_answer'] = citation(answer['answer'], answer['source']) print(answer['final_answer'])
回答
RAGに最も問われる性能は「回答内容の正確性」である。 引用 Prompt Flowで評価Flowを自作してRAGのイケてるLLMOpsを実現してみた
いい感じです!
次に、画像についての質問も投げてみます。
answer = answer_question('アーキテクチャ図から読み取れる、RAGのエンジン部分(Langchainでの実装部分)の処理の流れを解説してください', retriever) answer['final_answer'] = citation(answer['answer'], answer['source']) print(answer['final_answer'])
回答
RAGのエンジン部分(Langchainでの実装部分)の処理の流れは以下の通りです。 まず、ユーザーからの質問が入力されると、Question Splitter (gpt-35-turbo-16k)が質問を分割します。 次に、Keywords Extractor (gpt-35-turbo-16k)が質問からキーワードを抽出し、RelatedWords Generator (gpt-35-turbo-16k)が関連する単語を生成します。 これらのキーワードを用いてRetrieverがFAISSを使って関連するドキュメントをAzure Cognitive Searchから取得します。 取得したドキュメントはEmbedding (text-embedding-ada-002)を用いてベクトル化され、Conversational Retrieval Chain (gpt-35-turbo-16k)が回答を生成します。 最後に、生成された回答はSummary (gpt-4)によって要約され、最終的な回答がユーザーに提供されます。 引用 Prompt Flowで評価Flowを自作してRAGのイケてるLLMOpsを実現してみた
前述のイケてるLLMOps_figure_1_0.png
を元に回答ができています。素晴らしいですね!
非同期化
現在のLangchainは非同期処理が充実しているため、RAGスターターセットとして並列処理の対応も行います。
上記で作成したanswer_question
関数のコメントを加えた部分を修正しています。
async def answer_question(question, retriever):# 非同期関数化 system_prompt_template = ( '次の質問(question)について、文脈(context)の内容だけに基づいて論理的に段階的に必要十分な分量で回答してください。\n' '文脈(context)に論拠がない場合はその旨を伝えて回答しないでください。\n' '{format_instructions}' ) human_prompt_template = '------------\n質問\n{question}\n------------\n文脈\n' contexts = await retriever.ainvoke(question) # 非同期化 if len(contexts) == 0: return {'answer': '文脈が見つかりません', 'source': []} human_prompt_templates = preprocess_context(contexts, human_prompt_template) answer_llm = init_azure_llm('gpt-4o') output_parser = PydanticOutputParser(pydantic_object=AnswerWithSource) answer_with_context_prompt = ChatPromptTemplate( [ ('system', system_prompt_template), human_prompt_templates ], partial_variables={'format_instructions':output_parser.get_format_instructions()} ) answer_chain = RunnablePassthrough() | answer_with_context_prompt | answer_llm | output_parser result = await answer_chain.ainvoke({'question':question}) # 非同期化 return {'answer': result.answer, 'source': {l:contexts[l] for l in result.source_label}}
最後に
今回はDocument Intelligenceを活用したPDFの簡単なRAGを構築してみました。
回答生成だけを目指していたため実装としては本当に最低限で、エラーハンドリングや、ロギング、評価のための中間生成物出力等やることは大量にありますが、スターターセットとしては最低限作れたかなと思っています。
まだテキストを用いたRAGしか作成したことがない方や、RAGをやりたいけどなかなかやれていない方の参考になったら幸いです。
本当はここから精度向上のために改良を加えてRAGを評価する流れや、我々のRAGの評価の考え方、これからのRAGの方向性等を書いていたのですが、それだけで5記事分くらいのボリュームになったため割愛しています。
最後に今回作成したコード全体を載せておきます。
コード全体
import os from azure.core.credentials import AzureKeyCredential from azure.ai.documentintelligence import DocumentIntelligenceClient from azure.ai.documentintelligence.models import AnalyzeDocumentRequest, AnalyzeResult from azure.identity import ManagedIdentityCredential from PIL import Image import fitz import mimetypes from mimetypes import guess_type from tqdm.auto import tqdm import base64 import re from langchain_openai import AzureChatOpenAI from langchain_core.output_parsers.string import StrOutputParser from langchain_core.output_parsers.pydantic import PydanticOutputParser from langchain_core.output_parsers.list import CommaSeparatedListOutputParser from langchain_core.prompts.chat import ChatPromptTemplate, HumanMessagePromptTemplate, SystemMessagePromptTemplate from langchain_core.runnables import RunnablePassthrough from pydantic import BaseModel, Field from typing import List, Dict from langchain_core.documents import Document from langchain_community.vectorstores import FAISS from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_openai.embeddings.azure import AzureOpenAIEmbeddings import glob def get_sections(result): sections = [] for section in result.sections: # 今回は簡単のためsectionによる階層構造を無視するためextendにしています。 sections.extend(section.elements) return sections def get_paragraphs(result): paragraphs = {} for idx, paragraph in enumerate(result.paragraphs): item = { "id": "/paragraphs/" + str(idx), "content": paragraph.content if paragraph.content else "", "role": paragraph.role if paragraph.role else "", "polygon": paragraph.get("boundingRegions")[0]["polygon"], "pageNumber": paragraph.get("boundingRegions")[0]["pageNumber"] } paragraphs["/paragraphs/" + str(idx)] = item return paragraphs def crop_image_from_image(image_path, page_number, bounding_box): """ Crops an image based on a bounding box. :param image_path: Path to the image file. :param page_number: The page number of the image to crop (for TIFF format). :param bounding_box: A tuple of (left, upper, right, lower) coordinates for the bounding box. :return: A cropped image. :rtype: PIL.Image.Image """ with Image.open(image_path) as img: if img.format == "TIFF": # Open the TIFF image img.seek(page_number) img = img.copy() # The bounding box is expected to be in the format (left, upper, right, lower). cropped_image = img.crop(bounding_box) return cropped_image def crop_image_from_pdf_page(pdf_path, page_number, bounding_box): """ Crops a region from a given page in a PDF and returns it as an image. :param pdf_path: Path to the PDF file. :param page_number: The page number to crop from (0-indexed). :param bounding_box: A tuple of (x0, y0, x1, y1) coordinates for the bounding box. :return: A PIL Image of the cropped area. """ doc = fitz.open(pdf_path) page = doc.load_page(page_number) # Cropping the page. The rect requires the coordinates in the format (x0, y0, x1, y1). # The coordinates are in points (1/72 inch). bbx = [x * 72 for x in bounding_box] rect = fitz.Rect(bbx) pix = page.get_pixmap(matrix=fitz.Matrix(300/72, 300/72), clip=rect) img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) doc.close() return img def crop_image_from_file(file_path, page_number, bounding_box): """ Crop an image from a file. Args: file_path (str): The path to the file. page_number (int): The page number (for PDF and TIFF files, 0-indexed). bounding_box (tuple): The bounding box coordinates in the format (x0, y0, x1, y1). Returns: A PIL Image of the cropped area. """ mime_type = mimetypes.guess_type(file_path)[0] if mime_type == "application/pdf": return crop_image_from_pdf_page(file_path, page_number, bounding_box) else: return crop_image_from_image(file_path, page_number, bounding_box) def make_markdown_from_document_intelligence_result_with_image(result, title): all_markdown = '' # 最終的に出力したいMarkdown形式の文字列 result_paragraphs = get_paragraphs(result) result_sections = get_sections(result) for sec in result_sections: if sec.startswith('/paragraphs/'): if result_paragraphs[sec]['role'] == 'title': # titleならHeader1を付与 all_markdown = all_markdown + '# ' + result_paragraphs[sec]['content'] + '\n' elif result_paragraphs[sec]['role'] == 'sectionHeading': # sectionHeadingならHeader2を付与 all_markdown = all_markdown + '## ' + result_paragraphs[sec]['content'] + '\n' else: all_markdown = all_markdown + result_paragraphs[sec]['content'] if sec.startswith('/figures/'): idx = int(sec.split('/')[-1]) # figureのIDを取得 figure = result.figures[idx] if 'boundingRegions' in figure: for i, br in enumerate(figure['boundingRegions']): page = br['pageNumber'] bbox = br['polygon'] bbox = (bbox[0], bbox[1], bbox[4], bbox[5]) image = crop_image_from_file(pdf_path, page-1, bbox) image_path = f'{BASE_DIR}/images/{title}_figure_{idx}_{i}.png' image.save(image_path) # 切り出した画像へのリンクをMarkdownに追加 all_markdown = all_markdown + f'![figure_{idx}_{i}]({image_path})' if sec.startswith('/tables/'): idx = int(sec.split('/')[-1]) # tableのIDを取得 table = result.tables[idx] if 'boundingRegions' in table: for i, br in enumerate(table['boundingRegions']): page = br['pageNumber'] bbox = br['polygon'] bbox = (bbox[0], bbox[1], bbox[4], bbox[5]) image = crop_image_from_file(pdf_path, page-1, bbox) image_path = f'{BASE_DIR}/images/{title}_table_{idx}_{i}.png' image.save(image_path) # 切り出した画像へのリンクをMarkdownに追加 all_markdown = all_markdown + f'![table_{idx}_{i}]({image_path})' all_markdown = all_markdown + '\n' return all_markdown def init_azure_embeddings(): return AzureOpenAIEmbeddings( azure_deployment='text-embedding-3-small', api_version=OPENAI_API_VERSION, azure_endpoint=AZURE_OPENAI_ENDPOINT, api_key = OPENAI_API_KEY, timeout=30, max_retries=4 ) def init_azure_llm(model_name='gpt-4o-mini'): return AzureChatOpenAI( azure_endpoint=AZURE_OPENAI_ENDPOINT, openai_api_version=OPENAI_API_VERSION, deployment_name=model_name, model_name=model_name, openai_api_key=OPENAI_API_KEY, openai_api_type = OPENAI_API_TYPE, temperature=1.0 if model_name in ['o1-mini', 'o1-preview'] else 0.0, cache=False, timeout=30, max_retries=4 ) def encode_image(image_path): with open(image_path, 'rb') as image_file: return base64.b64encode(image_file.read()).decode('utf-8') def create_human_prompt_template_with_image(label, content): # テキストと画像でPrompt Templateを分けないといけないため、 # 画像リンクを検出したらそこで分割して、それぞれPrompt Templateを作成し、結合していく pattern = r'!\[(figure_|table_)([^\]]+)\]\((' + f'{BASE_DIR}/images/)' last_end = 0 contents = [] pos = 0 content_length = len(content) while pos < content_length: match = re.search(pattern, content[pos:]) if not match: contents.append(content[pos:]) break start = pos + match.start() end = pos + match.end() if start > pos: contents.append(content[pos:start]) url_start = end png_end = content.find('.png)', url_start) if png_end == -1: contents.append(content[end:]) break url = f'{BASE_DIR}/images/' + content[url_start:png_end + 4] contents.append(url) last_end = png_end + 5 pos = last_end templates = [] # Citationで使用するため、Contextごとにラベリングを行います。 templates.append(f'Context# {label} Start---------------') for c in contents: if c.startswith(f'{BASE_DIR}/images/'): base64_image = encode_image(c) template = {'image_url': {'url': f'data:image/png;base64,{base64_image}'}} templates.append(template) else: templates.append(c) templates.append(f'Context# {label} End-----------------') return templates def preprocess_context(contexts, base_human_prompt): human_prompt_templates = [base_human_prompt,] for i, d in enumerate(contexts): human_prompt_templates.extend(create_human_prompt_template_with_image(i, d.page_content)) new_templates = [] image_num = 0 for c in human_prompt_templates: if type(c) == dict: if image_num < 50: # 画像は50枚が上限のため制限する new_templates.append(c) image_num += 1 else: continue else: new_templates.append(c) labelled_context = HumanMessagePromptTemplate.from_template(new_templates) return labelled_context def citation(answer, source): # LLMの返した参照ドキュメントのラベル番号を0始まりに修正。 # 参照ドキュメントが1つの場合は、文章ごとのラベルを削除し末尾に参照ドキュメントを追記 doc_label_map = {} if len(source) == 0: return answer answer = answer + '\n\n引用' source = dict(sorted(source.items())) for k in source: if source[k].metadata['path'] in doc_label_map: doc_label_map[source[k].metadata['path']].append(k) else: doc_label_map[source[k].metadata['path']] = [k] if len(doc_label_map) == 1: p = list(doc_label_map.keys())[0] for l in doc_label_map[p]: answer = answer.replace(f'[Context# {l}]', '') title = p.split('/')[-1].split('.')[0] answer = answer + f'\n{title}\n' else: for i, p in enumerate(doc_label_map): for l in doc_label_map[p]: answer = answer.replace(f'[Context# {l}]', f'[{i}]') title = p.split('/')[-1].split('.')[0] answer = answer + f'\n[{i}] : {title}\n' return answer class AnswerWithCitation(BaseModel): answer: str = Field(description='''質問の回答。 各文章の末尾に回答に使用した文脈情報の文脈番号を[Context# n]の形式で追記すること。 例: ~~である。[Context# 0]''') source_label: List[int] = Field(description='文脈(context)のうち、回答に使用した文脈情報の文脈番号(Context#)のリスト') async def answer_question(question, retriever):# 非同期関数化 system_prompt_template = ( '次の質問(question)について、文脈(context)の内容だけに基づいて論理的に段階的に必要十分な分量で回答してください。\n' '文脈(context)に論拠がない場合はその旨を伝えて回答しないでください。\n' '{format_instructions}' ) human_prompt_template = '------------\n質問\n{question}\n------------\n文脈\n' contexts = await retriever.ainvoke(question) # 非同期化 if len(contexts) == 0: return {'answer': '文脈が見つかりません', 'source': []} human_prompt_templates = preprocess_context(contexts, human_prompt_template) answer_llm = init_azure_llm('gpt-4o') output_parser = PydanticOutputParser(pydantic_object=AnswerWithSource) answer_with_context_prompt = ChatPromptTemplate( [ ('system', system_prompt_template), human_prompt_templates ], partial_variables={'format_instructions':output_parser.get_format_instructions()} ) answer_chain = RunnablePassthrough() | answer_with_context_prompt | answer_llm | output_parser result = await answer_chain.ainvoke({'question':question}) # 非同期化 return {'answer': result.answer, 'source': {l:contexts[l] for l in result.source_label}} BASE_DIR = './pdf_rag' # RAG用データを配置するベースのディレクトリ pdf_filename = 'イケてるLLMOps.pdf' # 実行対象のPDF endpoint = '<your-endpoint>' # Document Intelligenceのendpoint client_id = os.environ.get('DEFAULT_IDENTITY_CLIENT_ID') credential = ManagedIdentityCredential(client_id=client_id) document_intelligence_client = DocumentIntelligenceClient(endpoint=endpoint, credential=credential) with open(f'{BASE_DIR}/docs/{pdf_filename}', 'rb') as f: poller = document_intelligence_client.begin_analyze_document( model_id = 'prebuilt-layout', analyze_request=f, content_type='application/octet-stream', output_content_format='markdown' ) result = poller.result() all_pdf_paths = glob.glob(f'{BASE_DIR}/docs/*.pdf') for pdf_path in tqdm(all_pdf_paths): title = pdf_path.split('/')[-1].split('.')[0] # PDFのタイトルを取得 with open(pdf_path, 'rb') as f: poller = document_intelligence_client.begin_analyze_document( model_id = 'prebuilt-layout', analyze_request=f, content_type='application/octet-stream', output_content_format='markdown' ) result = poller.result() markdown_content = make_markdown_from_document_intelligence_result_with_image(result, title) with open(f'{BASE_DIR}/di_results/{title}.md', "w") as file: file.write(markdown_content) mdfiles = glob.glob(f'{BASE_DIR}/di_results/*.md') # 作成したMarkdownファイルを取得 docs = [] for f in mdfiles: with open(f, 'r', encoding='utf-8') as file: data = file.read() data = data.replace('}', ')').replace('{', '(') # {}はPromptの変数と認識されるため()に変換 docs.append(Document(page_content=data, metadata={'path':f})) md_splitter = RecursiveCharacterTextSplitter( separators = [ "\n#{2} ", ], is_separator_regex=True, chunk_size=1500, chunk_overlap=200 ) splitted_docs = md_splitter.split_documents(docs) vector_store = FAISS.from_documents(splitted_docs, embedding=init_azure_embeddings()) retriever = vector_store.as_retriever() retriever.search_kwargs['distance_metric'] = 'cos' retriever.search_kwargs['fetch_k'] = 100 retriever.search_kwargs['k'] = 5 answer = await answer_question('<質問内容>', retriever) answer['final_answer'] = citation(answer['answer'], answer['source']) print(answer['final_answer'])