pdf_parser_workflow.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. import os
  2. import concurrent.futures
  3. from concurrent.futures import ThreadPoolExecutor
  4. from langgraph.graph import StateGraph, START, END
  5. from typing import List, Dict, Any
  6. from pydantic import BaseModel, Field, ConfigDict
  7. from parser.pdf_parser.pdf_splitter import PDFSplitter
  8. from model.qwen_vl import QWenVLParser
  9. from utils.ragflow_sdk import DataSetUtil, DocumentUtil, ChunkUtil
  10. from utils.ragflow.ragflow_service import RAGFlowService
  11. from model.multimodal_embedding import Embedding
  12. from conf.settings import model_settings, vector_db_settings
  13. from utils.infinity import get_client
  14. # 定义工作流状态类
  15. class PDFParsingState(BaseModel):
  16. """PDF解析工作流状态"""
  17. model_config = ConfigDict(arbitrary_types_allowed=True)
  18. pdf_path: str = Field(..., description="PDF文件路径")
  19. dataset_id: str = Field(..., description="数据集ID")
  20. page_dataset_id: str = Field(..., description="页面数据集ID")
  21. ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFlow服务实例")
  22. dataset_util: DataSetUtil = Field(default_factory=DataSetUtil, description="数据集工具类实例")
  23. document_util: DocumentUtil = Field(default_factory=DocumentUtil, description="文档工具类实例")
  24. chunk_util: ChunkUtil = Field(default_factory=ChunkUtil, description="文档工具类实例")
  25. embedding_model: Embedding = Field(default_factory=Embedding, description="多模态嵌入模型实例")
  26. document_id: str = Field(default="", description="上传后的文档ID")
  27. split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
  28. current_page: Dict[str, Any] = Field(default_factory=dict, description="当前处理的页面")
  29. parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表")
  30. vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="向量化结果列表")
  31. processed_pages: int = Field(default=0, description="已处理的页面数量")
  32. vectorized_pages: int = Field(default=0, description="已向量化的页面数量")
  33. is_complete: bool = Field(default=False, description="是否处理完成")
  34. # 创建工作流构建器
  35. class PDFParsingWorkflow:
  36. """PDF扫描件拆分解析工作流"""
  37. def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
  38. """
  39. 初始化PDF解析工作流
  40. Args:
  41. model_name: QWEN VL模型名称
  42. """
  43. self.model_name = model_name
  44. self.workflow = self._build_workflow()
  45. def _build_workflow(self):
  46. """构建langgraph工作流,实现基于条件路由的并行处理"""
  47. # 创建状态图
  48. graph = StateGraph(PDFParsingState)
  49. # 添加上传文档节点
  50. graph.add_node("upload_document", self._upload_document_node)
  51. # 添加解析文档节点
  52. graph.add_node("parse_document", self._parse_document_node)
  53. # 添加拆分PDF节点
  54. graph.add_node("split_pdf", self._split_pdf_node)
  55. # 添加解析图像节点
  56. graph.add_node("parse_image", self._parse_image_node)
  57. # 添加向量化入库节点
  58. graph.add_node("vectorize_store", self._vectorize_store_node)
  59. # 添加完成节点
  60. graph.add_node("complete", self._complete_node)
  61. # 定义边
  62. # 定义RagFLow解析文档
  63. graph.add_edge(START, "upload_document")
  64. # 添加解析文档边
  65. graph.add_edge("upload_document", "parse_document")
  66. graph.add_edge("parse_document", "split_pdf")
  67. # 定义图片解析边
  68. graph.add_edge("split_pdf", "parse_image")
  69. # 添加条件边:判断是否继续解析
  70. graph.add_conditional_edges(
  71. "parse_image",
  72. self._should_continue_parsing,
  73. {
  74. "continue": "parse_image",
  75. "complete": "vectorize_store"
  76. }
  77. )
  78. # 添加向量化入库边
  79. graph.add_edge("vectorize_store", "complete")
  80. graph.add_edge("complete", END)
  81. # 编译工作流
  82. return graph.compile()
  83. def get_ragflow_dataset(self, dataset_name: str) -> str:
  84. """获取RAGFLOW数据集ID"""
  85. try:
  86. dataset_id = self.dataset_util.get_dataset(name=dataset_name)
  87. print(f"数据集 {dataset_name} 的ID为: {dataset_id}")
  88. return dataset_id
  89. except Exception as e:
  90. print(f"获取数据集ID时出错: {str(e)}")
  91. raise
  92. def create_ragflow_dataset(self, state: PDFParsingState, dataset_name: str) -> str:
  93. """创建RAGFLOW数据集"""
  94. if state.dataset_id:
  95. print(f"数据集 {dataset_name} 已存在,数据集ID: {state.dataset_id}")
  96. return state.dataset_id
  97. print(f"开始创建数据集: {dataset_name}")
  98. try:
  99. # 创建数据集
  100. dataset_id = self.dataset_util.create_dataset(
  101. chunk_method="naive",
  102. dataset_name=dataset_name,
  103. dataset_desc="",
  104. )
  105. print(f"数据集创建成功,数据集ID: {dataset_id}")
  106. return dataset_id
  107. except Exception as e:
  108. print(f"创建数据集时出错: {str(e)}")
  109. raise
  110. def _upload_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
  111. """RAGFLOW上传文档节点"""
  112. print(f"开始上传文档到数据集 {state.dataset_id}: {state.pdf_path}")
  113. try:
  114. # 上传文档
  115. document_info_list = state.ragflow_service.upload_document(
  116. dataset_id=state.dataset_id,
  117. file_path=state.pdf_path
  118. )
  119. # 上传文档
  120. document_info_list2 = state.ragflow_service.upload_document(
  121. dataset_id=state.page_dataset_id,
  122. file_path=state.pdf_path
  123. )
  124. # 检查响应
  125. if document_info_list and len(document_info_list) > 0:
  126. document_id = document_info_list[0]["id"]
  127. page_document_id = document_info_list2[0]["id"]
  128. print(f"文档上传成功,文档ID: {document_id}")
  129. return {
  130. "document_id": document_id,
  131. "page_document_id": page_document_id
  132. }
  133. else:
  134. print("文档上传失败: 未返回有效的文档信息")
  135. raise Exception("文档上传失败: 未返回有效的文档信息")
  136. except Exception as e:
  137. print(f"上传文档时出错: {str(e)}")
  138. raise
  139. def _parse_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
  140. """RAGFLOW文档解析节点"""
  141. print(f"开始解析文档 {state.dataset_id}: {state.document_id}")
  142. try:
  143. # 解析文档
  144. parse_success = state.ragflow_service.parse_document(
  145. dataset_id=state.dataset_id,
  146. document_ids=[state.document_id]
  147. )
  148. # 检查响应parse_success为bool
  149. if parse_success:
  150. print(f"文档解析成功,文档ID: {state.document_id}")
  151. # 返回空列表,因为parsed_results字段期望是列表类型
  152. return {
  153. "parsed_results": []
  154. }
  155. else:
  156. print("文档解析失败: 未返回有效的解析结果")
  157. raise Exception("文档解析失败: 未返回有效的解析结果")
  158. except Exception as e:
  159. print(f"解析文档时出错: {str(e)}")
  160. raise
  161. def _split_pdf_node(self, state: PDFParsingState) -> Dict[str, Any]:
  162. """拆分PDF节点"""
  163. print(f"开始拆分PDF: {state.pdf_path}")
  164. # 拆分PDF
  165. splitter = PDFSplitter()
  166. split_pages = splitter.split_pdf(state.pdf_path)
  167. print(f"PDF拆分完成,共 {len(split_pages)} 页")
  168. return {
  169. "split_pages": split_pages,
  170. "parsed_results": [],
  171. "processed_pages": 0,
  172. "is_complete": False
  173. }
    def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
        """Parse one page image with the QWen-VL model (thread-pool worker).

        Args:
            page: Page dict; this method reads its "page_number" and "image"
                keys (produced by the PDF splitter).
            model_name: QWen-VL model name used to construct the parser.

        Returns:
            The parser's result dict for this page; _parse_image_node sorts
            the collected results by their "page_number" key.
        """
        # This prompt is sent verbatim to the model. It instructs the VL model
        # to analyze a children's picture-book page and emit structured JSON
        # metadata (art style, characters, composition, content/ability tags).
        prompt = """
角色定位:你是一位顶尖的儿童绘本分析师与视觉工程专家,擅长将插画视觉信息转化为高精度的结构化元数据。
任务描述:请深度解析提供的绘本页面,不仅提取基本要素,还要进行“像素级”的特征拆解。重点关注角色的微表情、服饰纹理、环境光效、构图视角及整体艺术风格。
提取维度:
艺术风格 (Style):包括笔触(如水彩、蜡笔)、线条粗细、整体色调偏好。
角色特征 (Character):五官细节、肢体动作的动态感、衣物材质、标志性配饰。
空间构图 (Composition):透视关系(仰拍/俯拍)、视觉焦点、前景/中景/背景的层次。
物品与环境 (Object & Environment):物体的精确形状、材质光泽、环境中的自然元素(风吹草动的方向等)。
内容标签 (content_tags):请从以下三个维度进行打标:
主题维度(如:自然探索、家庭学校、科学科普、传统文化)
具体对象(如:昆虫、交通工具、五官、家具)
情感氛围(如:惊喜、友爱、好奇、安静)
能力标签 (ability_tags):请严格参照以下教育能力模型,根据图中元素体现的教育价值进行选择:
[语言表达、逻辑思维、数理逻辑、空间想象、艺术创造、身体协调、自我认知、社会交往、自然观察、情绪管理]。
输出约束:
保持描述积极向上,符合0-12岁儿童阅读的安全标准。
描述精度:单条描述需包含具体视觉属性(颜色、形状、质感),字数控制在50字以内。
格式要求:严谨按照指定的JSON结构输出。
json格式:
{
"page_meta": {
"page_number": 1,
"content_text": "页面原文本内容",
"overall_style": {
"art_medium": "艺术媒介(如:手绘水彩、矢量平涂、3D渲染)",
"color_palette": ["主色调1", "主色调2"],
"lighting": "光影描述(如:柔和侧光、清晨自然光)",
"composition": "构图(如:三分法、对角线构图、大远景)"
}
},
"elements": [
{
"element_name": "元素名称(如:小兔子)",
"character_name": "角色名称(如果有,没有的话,角色名称为空字符串)",
"category": "分类(角色/场景/道具)",
"spatial_layer": "所在层级(前景/中景/背景)",
"visual_attributes": {
"appearance": "外貌细节描述(发型、五官、材质感)",
"action_emotion": "行为动作与情感流露",
"color_detail": "像素级颜色描述(如:淡茱萸粉、薄荷绿)",
"ability_tag": "如果为角色,其表现出的正面能力/特质"
},
"content_tags": {
"theme": ["自然", "社交", "生活常识"],
"object": ["动物", "服装", "植物"],
"emotion": ["快乐", "勇敢"]
},
"ability_tags": ["语言表达", "逻辑思维", "自我认知"],
"description": "综合性简洁描述(50字内)"
}
]
}
"""
        page_number = page["page_number"]
        image = page["image"]
        print(f"开始解析第 {page_number} 页")
        # Parse the image with the QWen-VL model.
        # NOTE(review): a new QWenVLParser is constructed for every page on
        # every call — presumably cheap; confirm before hoisting it out.
        parser = QWenVLParser(model_name)
        result = parser.parse_image(image, page_number, prompt)
        print(f"第 {page_number} 页解析完成")
        return result
  237. def _parse_image_node(self, state: PDFParsingState) -> Dict[str, Any]:
  238. """解析图像节点,使用并行处理"""
  239. if not state.split_pages:
  240. return state.dict()
  241. print(f"开始并行解析 {len(state.split_pages)} 页")
  242. parsed_results = []
  243. # 使用ThreadPoolExecutor实现并行处理
  244. with ThreadPoolExecutor(max_workers=5, thread_name_prefix="parse_page_") as executor:
  245. # 提交所有页面解析任务
  246. future_to_page = {
  247. executor.submit(self._parse_single_page, page, self.model_name): page
  248. for page in state.split_pages
  249. }
  250. # 收集解析结果
  251. for future in concurrent.futures.as_completed(future_to_page):
  252. try:
  253. result = future.result()
  254. parsed_results.append(result)
  255. except Exception as e:
  256. page = future_to_page[future]
  257. print(f"解析第 {page['page_number']} 页时出错: {str(e)}")
  258. # 按页码排序结果
  259. parsed_results.sort(key=lambda x: x["page_number"])
  260. print(f"所有页面解析完成,共解析 {len(parsed_results)} 页")
  261. return {
  262. "split_pages": state.split_pages, # 保留split_pages,以便后续访问图片
  263. "parsed_results": parsed_results,
  264. "processed_pages": len(parsed_results),
  265. "is_complete": True
  266. }
  267. def _should_continue_parsing(self, state: PDFParsingState) -> str:
  268. """判断是否继续解析"""
  269. # 由于我们使用了并行处理,parse_image_node会一次性处理所有页面
  270. # 所以这里总是返回"complete"
  271. return "complete"
    def create_ragflow_chunk(self, state: PDFParsingState):
        """Upload each parsed page as a chunk to the RAGFlow page document.

        NOTE(review): this method appears unfinished and is not wired into
        the graph built by _build_workflow. As written it would fail at
        runtime:
        - `state.dataset_name` is not a field of PDFParsingState;
        - `chunk_id` and `tag_name` are used but never defined;
        - `document_info` is checked but never defined;
        - the `image` local is computed but never used.
        Confirm the intended behavior before enabling this node.
        """
        print(f"开始单页上传,共 {len(state.parsed_results)} 页")
        # Iterate over all parse results and upload each page individually.
        for parsed_result in state.parsed_results:
            page_number = parsed_result.get("page_number")
            text = parsed_result.get("content", "")
            # page_number is 1-based; split_pages is 0-based.
            image = state.split_pages[page_number - 1].get("image")
            # Create a chunk for this page in the RAGFlow page document.
            chunk = state.chunk_util.add_chunk(
                dataset_name=state.dataset_name,
                document_id=state.page_document_id,
                content=text,
            )
            infinity_client = get_client()
            # NOTE(review): `chunk_id` and `tag_name` are undefined here —
            # this line raises NameError as written.
            infinity_client.update(database_name=state.dataset_name, table_name="", cond=f"id = {chunk_id}", data={"tag_kwd": tag_name})
            # NOTE(review): `document_info` is undefined here — NameError.
            if document_info and document_info.get("id"):
                print(f"第 {page_number} 页上传成功,文档ID: {document_info['id']}")
            else:
                print(f"第 {page_number} 页上传失败")
  293. def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
  294. """向量化入库节点"""
  295. print(f"开始向量化入库,共 {len(state.parsed_results)} 页")
  296. # 创建索引(如果不存在)
  297. index_name = f"{vector_db_settings.infinity_table_name}"
  298. state.vector_db.create_index(index_name)
  299. # 准备要入库的文档列表
  300. documents_to_store = []
  301. # 获取文件名和总页数
  302. file_name = os.path.basename(state.pdf_path)
  303. file_page_count = len(state.split_pages)
  304. # 遍历所有解析结果,生成向量化文档
  305. for i, parsed_result in enumerate(state.parsed_results):
  306. try:
  307. page_number = parsed_result.get("page_number")
  308. text = parsed_result.get("content", "")
  309. image = state.split_pages[i].get("image")
  310. image_path = state.split_pages[i].get("image_path")
  311. # 获取多模态嵌入向量
  312. print(f"正在生成第 {page_number} 页的多模态嵌入...")
  313. embedding = state.embedding_model.get_multimodal_embedding(text, image)
  314. # 生成1024维稠密向量(如果嵌入向量维度不是1024,这里需要处理)
  315. dense_vector_1024 = embedding[:1024] # 取前1024维
  316. # 创建文档
  317. document = {
  318. "id": f"{state.document_id}_{page_number}",
  319. "file_name": file_name,
  320. "file_page_count": file_page_count,
  321. "page_number": page_number,
  322. "content": text,
  323. "image_path": image_path,
  324. "dense_vector_1024": dense_vector_1024,
  325. "dataset_id": state.dataset_id,
  326. "document_id": state.document_id
  327. }
  328. documents_to_store.append(document)
  329. print(f"第 {page_number} 页向量化完成")
  330. except Exception as e:
  331. print(f"第 {i+1} 页向量化失败: {str(e)}")
  332. # 批量入库
  333. if documents_to_store:
  334. print(f"开始入库,共 {len(documents_to_store)} 个文档")
  335. infinity_client = get_client()
  336. result = infinity_client.insert(index_name, documents_to_store)
  337. print(f"入库结果: {result}")
  338. return {
  339. "vectorized_results": documents_to_store,
  340. "vectorized_pages": len(documents_to_store),
  341. "is_complete": True
  342. }
  343. def _complete_node(self, state: PDFParsingState) -> Dict[str, Any]:
  344. """完成节点"""
  345. print(f"PDF解析工作流完成,共解析 {len(state.parsed_results)} 页,向量化 {state.vectorized_pages} 页")
  346. # 判断ragflow是否解析成功
  347. return {
  348. "is_complete": True
  349. }
  350. def run(self, pdf_path: str, page_dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]:
  351. """
  352. 运行PDF解析工作流
  353. Args:
  354. pdf_path: PDF文件路径
  355. page_dataset_id: 数据集ID
  356. ragflow_api_url: RAGFLOW API URL
  357. rag_flow_api_key: RAGFLOW API密钥
  358. Returns:
  359. Dict: 包含最终状态的字典
  360. """
  361. initial_state = PDFParsingState(
  362. pdf_path=pdf_path,
  363. page_dataset_id=page_dataset_id,
  364. embedding_model=Embedding(model_name=model_settings.multimodal_embedding_model_name, api_key=model_settings.dashscope_api_key),
  365. dataset_util=DataSetUtil(),
  366. document_util=DocumentUtil(),
  367. chunk_util=ChunkUtil(),
  368. ragflow_service=RAGFlowService(api_url=ragflow_api_url, api_key=rag_flow_api_key)
  369. )
  370. result = self.workflow.invoke(initial_state)
  371. # 检查结果类型,如果是字典直接返回,否则调用dict()方法
  372. if isinstance(result, dict):
  373. return result
  374. else:
  375. return result.dict()