# pdf_parser_workflow.py
  1. import os
  2. import concurrent.futures
  3. import time
  4. from concurrent.futures import ThreadPoolExecutor
  5. from langgraph.graph import StateGraph, START, END
  6. from typing import List, Dict, Any
  7. from pydantic import BaseModel, Field, ConfigDict
  8. from parser.pdf_parser.pdf_splitter import PDFSplitter
  9. from model.qwen_vl import QWenVLParser
  10. from utils.ragflow.ragflow_service import RAGFlowService
  11. from model.multimodal_embedding import Embedding
  12. from conf.settings import model_settings, vector_db_settings
  13. from utils.infinity import get_client
  14. from langfuse.langchain import CallbackHandler
  15. from conf.rag_parser_config import RagParserDefaults
  16. # 定义工作流状态类
  17. class PDFParsingState(BaseModel):
  18. """PDF解析工作流状态"""
  19. model_config = ConfigDict(arbitrary_types_allowed=True)
  20. pdf_path: str = Field(..., description="PDF文件路径")
  21. dataset_name: str = Field(..., description="数据集名称")
  22. dataset_id: str = Field(default="", description="RAGFLOW数据集ID")
  23. document_id: str = Field(default="", description="上传后的文档ID")
  24. page_dataset_id: str = Field(..., description="页面数据集ID")
  25. page_document_id: str = Field(default="", description="上传后的页面文档ID")
  26. split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
  27. current_page: Dict[str, Any] = Field(default_factory=dict, description="当前处理的页面")
  28. parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表")
  29. vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="向量化结果列表")
  30. processed_pages: int = Field(default=0, description="已处理的页面数量")
  31. vectorized_pages: int = Field(default=0, description="已向量化的页面数量")
  32. is_complete: bool = Field(default=False, description="是否处理完成")
  33. # 创建工作流构建器
  34. class PDFParsingWorkflow:
  35. """PDF扫描件拆分解析工作流"""
  36. def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
  37. """
  38. 初始化PDF解析工作流
  39. Args:
  40. model_name: QWEN VL模型名称
  41. """
  42. self.model_name = model_name
  43. self.workflow = self._build_workflow()
  44. self.ragflow_service = RAGFlowService()
  45. self.langfuse_handler = CallbackHandler()
  46. self.embedding_model = Embedding(model_name=model_settings.multimodal_embedding_model_name, api_key=model_settings.dashscope_api_key)
  47. def _build_workflow(self):
  48. """构建langgraph工作流,实现基于条件路由的并行处理"""
  49. # 创建状态图
  50. graph = StateGraph(PDFParsingState)
  51. # 添加查询知识库是否存在节点
  52. graph.add_node("get_ragflow_dataset", self.get_ragflow_dataset)
  53. # 添加创建知识库节点
  54. graph.add_node("create_ragflow_dataset", self.create_ragflow_dataset)
  55. # 添加上传文档节点
  56. graph.add_node("upload_document", self._upload_document_node)
  57. # 添加上传图书页面文档节点
  58. graph.add_node("upload_page_document", self._upload_page_document_node)
  59. # 添加解析文档节点
  60. graph.add_node("parse_document", self._parse_document_node)
  61. # 添加拆分PDF节点
  62. graph.add_node("split_pdf", self._split_pdf_node)
  63. # 添加解析图像节点
  64. graph.add_node("parse_image", self._parse_image_node)
  65. # 添加解析图书页面图像节点
  66. graph.add_node("create_ragflow_chunk", self.create_ragflow_chunk)
  67. # 添加向量化入库节点
  68. graph.add_node("vectorize_store", self._vectorize_store_node)
  69. # 添加完成节点
  70. graph.add_node("complete", self._complete_node)
  71. # 定义边
  72. # 查询知识库是否存在
  73. graph.add_edge(START, "get_ragflow_dataset")
  74. # 添加条件边,判断知识库是否存在
  75. graph.add_conditional_edges(
  76. "get_ragflow_dataset",
  77. self._check_dataset_exists,
  78. {
  79. "exists": "upload_document",
  80. "not_exists": "create_ragflow_dataset"
  81. }
  82. )
  83. # 添加解析文档边
  84. graph.add_edge("create_ragflow_dataset", "upload_document")
  85. graph.add_edge("upload_document", "parse_document")
  86. graph.add_edge("upload_document", "upload_page_document")
  87. graph.add_edge("parse_document", "split_pdf")
  88. # 定义图片解析边
  89. graph.add_edge("split_pdf", "parse_image")
  90. # 添加条件边:判断是否继续解析
  91. graph.add_conditional_edges(
  92. "parse_image",
  93. self._should_continue_parsing,
  94. {
  95. "continue": "parse_image",
  96. "complete": "vectorize_store",
  97. }
  98. )
  99. # 添加从vectorize_store到create_ragflow_chunk的边
  100. graph.add_edge("vectorize_store", "create_ragflow_chunk")
  101. graph.add_edge("create_ragflow_chunk", "complete")
  102. graph.add_edge("complete", END)
  103. # 编译工作流
  104. return graph.compile()
  105. def get_ragflow_dataset(self, state: PDFParsingState) -> str:
  106. """获取RAGFLOW数据集ID"""
  107. try:
  108. dataset = self.ragflow_service.get_dataset(name=state.dataset_name)
  109. dataset_id = dataset["id"] if dataset else ""
  110. print(f"数据集 {state.dataset_name} 的ID为: {dataset_id}")
  111. return {
  112. "dataset_id": dataset_id
  113. }
  114. except Exception as e:
  115. raise Exception(f"获取数据集ID时出错: {str(e)}")
  116. def _check_dataset_exists(self, state: PDFParsingState) -> str:
  117. """检查RAGFLOW数据集是否存在"""
  118. # 判断state.dataset_id是否为空,为空则返回"not_exists",否则返回"exists"
  119. if state.dataset_id == "":
  120. return "not_exists"
  121. else:
  122. return "exists"
  123. def create_ragflow_dataset(self, state: PDFParsingState) -> str:
  124. """创建RAGFLOW数据集"""
  125. print(f"开始创建数据集: {state.dataset_name}")
  126. try:
  127. # 创建数据集
  128. dataset = self.ragflow_service.create_dataset(name=state.dataset_name, description="",
  129. permission=RagParserDefaults.DATASET_PERMISSION,
  130. chunk_method=RagParserDefaults.DATASET_CHUNK_METHOD,
  131. parser_config=RagParserDefaults.DATASET_CONFIG_DICT)
  132. dataset_id = dataset["id"]
  133. print(f"数据集创建成功,数据集ID: {dataset_id}")
  134. return {
  135. "dataset_id": dataset_id
  136. }
  137. except Exception as e:
  138. print(f"创建数据集时出错: {str(e)}")
  139. raise
  140. def _upload_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
  141. """RAGFLOW上传文档节点"""
  142. print(f"开始上传文档到数据集 {state.dataset_id}: {state.pdf_path}")
  143. try:
  144. # 上传文档
  145. document_info_list = self.ragflow_service.upload_document(
  146. dataset_id=state.dataset_id,
  147. file_path=state.pdf_path
  148. )
  149. # 检查响应
  150. if document_info_list and len(document_info_list) > 0:
  151. document_id = document_info_list[0]["id"]
  152. print(f"文档上传成功,文档ID: {document_id}")
  153. return {
  154. "document_id": document_id,
  155. }
  156. else:
  157. print("文档上传失败: 未返回有效的文档信息")
  158. raise Exception("文档上传失败: 未返回有效的文档信息")
  159. except Exception as e:
  160. print(f"上传文档时出错: {str(e)}")
  161. raise
  162. def _upload_page_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
  163. """RAGFLOW上传页面文档节点"""
  164. print(f"开始上传页面文档到数据集 {state.dataset_id}: {state.pdf_path}")
  165. try:
  166. # 上传文档
  167. document_info_list = self.ragflow_service.upload_document(
  168. dataset_id=state.page_dataset_id,
  169. file_path=state.pdf_path
  170. )
  171. # 检查响应
  172. if document_info_list and len(document_info_list) > 0:
  173. page_document_id = document_info_list[0]["id"]
  174. print(f"文档上传成功,文档ID: {page_document_id}")
  175. return {
  176. "page_document_id": page_document_id,
  177. }
  178. else:
  179. print("文档上传失败: 未返回有效的文档信息")
  180. raise Exception("文档上传失败: 未返回有效的文档信息")
  181. except Exception as e:
  182. print(f"上传文档时出错: {str(e)}")
  183. raise
  184. def _parse_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
  185. """RAGFLOW文档解析节点"""
  186. print(f"开始解析文档 {state.dataset_id}: {state.document_id}")
  187. try:
  188. # 解析文档
  189. parse_success = self.ragflow_service.parse_document(
  190. dataset_id=state.dataset_id,
  191. document_ids=[state.document_id]
  192. )
  193. # 检查响应parse_success为bool
  194. if parse_success:
  195. print(f"文档解析成功,文档ID: {state.document_id}")
  196. # 返回空列表,因为parsed_results字段期望是列表类型
  197. return {
  198. "parsed_results": []
  199. }
  200. else:
  201. print("文档解析失败: 未返回有效的解析结果")
  202. raise Exception("文档解析失败: 未返回有效的解析结果")
  203. except Exception as e:
  204. print(f"解析文档时出错: {str(e)}")
  205. raise
  206. def _split_pdf_node(self, state: PDFParsingState) -> Dict[str, Any]:
  207. """拆分PDF节点"""
  208. print(f"开始拆分PDF: {state.pdf_path}")
  209. # 拆分PDF
  210. splitter = PDFSplitter()
  211. split_pages = splitter.split_pdf(state.pdf_path)
  212. print(f"PDF拆分完成,共 {len(split_pages)} 页")
  213. return {
  214. "split_pages": split_pages,
  215. "parsed_results": [],
  216. "processed_pages": 0,
  217. "is_complete": False
  218. }
  219. def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
  220. """解析单个页面(用于并行处理)"""
  221. prompt = """
  222. 角色定位:你是一位顶尖的儿童绘本分析师与视觉工程专家,擅长将插画视觉信息转化为高精度的结构化元数据。
  223. 任务描述:请深度解析提供的绘本页面,不仅提取基本要素,还要进行“像素级”的特征拆解。重点关注角色的微表情、服饰纹理、环境光效、构图视角及整体艺术风格。
  224. 提取维度:
  225. 艺术风格 (Style):包括笔触(如水彩、蜡笔)、线条粗细、整体色调偏好。
  226. 角色特征 (Character):五官细节、肢体动作的动态感、衣物材质、标志性配饰。
  227. 空间构图 (Composition):透视关系(仰拍/俯拍)、视觉焦点、前景/中景/背景的层次。
  228. 物品与环境 (Object & Environment):物体的精确形状、材质光泽、环境中的自然元素(风吹草动的方向等)。
  229. 内容标签 (content_tags):请从以下三个维度进行打标:
  230. 主题维度(如:自然探索、家庭学校、科学科普、传统文化)
  231. 具体对象(如:昆虫、交通工具、五官、家具)
  232. 情感氛围(如:惊喜、友爱、好奇、安静)
  233. 能力标签 (ability_tags):请严格参照以下教育能力模型,根据图中元素体现的教育价值进行选择:
  234. [语言表达、逻辑思维、数理逻辑、空间想象、艺术创造、身体协调、自我认知、社会交往、自然观察、情绪管理]。
  235. 输出约束:
  236. 保持描述积极向上,符合0-12岁儿童阅读的安全标准。
  237. 描述精度:单条描述需包含具体视觉属性(颜色、形状、质感),字数控制在50字以内。
  238. 格式要求:严谨按照指定的JSON结构输出。
  239. json格式:
  240. {
  241. "page_meta": {
  242. "page_number": 1,
  243. "content_text": "页面原文本内容",
  244. "overall_style": {
  245. "art_medium": "艺术媒介(如:手绘水彩、矢量平涂、3D渲染)",
  246. "color_palette": ["主色调1", "主色调2"],
  247. "lighting": "光影描述(如:柔和侧光、清晨自然光)",
  248. "composition": "构图(如:三分法、对角线构图、大远景)"
  249. }
  250. },
  251. "elements": [
  252. {
  253. "element_name": "元素名称(如:小兔子)",
  254. "character_name": "角色名称(如果有,没有的话,角色名称为空字符串)",
  255. "category": "分类(角色/场景/道具)",
  256. "spatial_layer": "所在层级(前景/中景/背景)",
  257. "visual_attributes": {
  258. "appearance": "外貌细节描述(发型、五官、材质感)",
  259. "action_emotion": "行为动作与情感流露",
  260. "color_detail": "像素级颜色描述(如:淡茱萸粉、薄荷绿)",
  261. "ability_tag": "如果为角色,其表现出的正面能力/特质"
  262. },
  263. "content_tags": {
  264. "theme": ["自然", "社交", "生活常识"],
  265. "object": ["动物", "服装", "植物"],
  266. "emotion": ["快乐", "勇敢"]
  267. },
  268. "ability_tags": ["语言表达", "逻辑思维", "自我认知"],
  269. "description": "综合性简洁描述(50字内)"
  270. }
  271. ]
  272. }
  273. """
  274. page_number = page["page_number"]
  275. image = page["image"]
  276. print(f"开始解析第 {page_number} 页")
  277. # 使用QWEN VL模型解析图像
  278. parser = QWenVLParser(model_name)
  279. result = parser.parse_image(image, page_number, prompt)
  280. print(f"第 {page_number} 页解析完成")
  281. return result
  282. def _parse_image_node(self, state: PDFParsingState) -> Dict[str, Any]:
  283. """解析图像节点,使用并行处理"""
  284. if not state.split_pages:
  285. return state.dict()
  286. print(f"开始并行解析 {len(state.split_pages)} 页")
  287. parsed_results = []
  288. # 使用ThreadPoolExecutor实现并行处理
  289. with ThreadPoolExecutor(max_workers=5, thread_name_prefix="parse_page_") as executor:
  290. # 提交所有页面解析任务
  291. future_to_page = {
  292. executor.submit(self._parse_single_page, page, self.model_name): page
  293. for page in state.split_pages
  294. }
  295. # 收集解析结果
  296. for future in concurrent.futures.as_completed(future_to_page):
  297. try:
  298. result = future.result()
  299. parsed_results.append(result)
  300. except Exception as e:
  301. page = future_to_page[future]
  302. print(f"解析第 {page['page_number']} 页时出错: {str(e)}")
  303. # 按页码排序结果
  304. parsed_results.sort(key=lambda x: x["page_number"])
  305. print(f"所有页面解析完成,共解析 {len(parsed_results)} 页")
  306. return {
  307. "split_pages": state.split_pages, # 保留split_pages,以便后续访问图片
  308. "parsed_results": parsed_results,
  309. "processed_pages": len(parsed_results),
  310. "is_complete": True
  311. }
  312. def _should_continue_parsing(self, state: PDFParsingState) -> str:
  313. """判断是否继续解析"""
  314. # 由于我们使用了并行处理,parse_image_node会一次性处理所有页面
  315. # 所以这里总是返回"complete"
  316. return "complete"
  317. def create_ragflow_chunk(self, state: PDFParsingState):
  318. """单页上传节点"""
  319. print(f"开始单页上传,共 {len(state.parsed_results)} 页")
  320. # 遍历所有解析结果,上传单页
  321. # 遍历所有解析结果,生成向量化文档
  322. for i, parsed_result in enumerate(state.parsed_results):
  323. page_number = parsed_result.get("page_number")
  324. text = parsed_result.get("content", "")
  325. image_path = state.split_pages[i].get("image_path")
  326. # 上传单页到RagFlow Chunk
  327. chunk = self.ragflow_service.create_chunk(dataset_id=state.page_dataset_id,
  328. document_id=state.page_document_id,
  329. content=text)
  330. chunk_id = chunk["chunk"]["id"]
  331. print(f"上传第 {page_number} 页,Chunk ID: {chunk_id}")
  332. # # 睡眠50ms,避免上传过快
  333. # time.sleep(0.05)
  334. # result = get_client().update(database_name=state.dataset_name, table_name="", cond=f"id = '{chunk_id}'", data={"img_id": img_id})
  335. # print(f"更新第 {page_number} 页,Chunk ID: {chunk_id},结果: {result}")
  336. def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
  337. """向量化入库节点"""
  338. print(f"开始向量化入库,共 {len(state.parsed_results)} 页")
  339. # 创建索引(如果不存在)
  340. index_name = f"{vector_db_settings.infinity_table_name}"
  341. # get_client().create_index()
  342. # 准备要入库的文档列表
  343. documents_to_store = []
  344. # 获取文件名和总页数
  345. file_name = os.path.basename(state.pdf_path)
  346. file_page_count = len(state.split_pages)
  347. # 遍历所有解析结果,生成向量化文档
  348. for i, parsed_result in enumerate(state.parsed_results):
  349. try:
  350. page_number = parsed_result.get("page_number")
  351. text = parsed_result.get("content", "")
  352. image = state.split_pages[i].get("image")
  353. image_path = state.split_pages[i].get("image_path")
  354. # 获取多模态嵌入向量
  355. print(f"正在生成第 {page_number} 页的多模态嵌入...")
  356. embedding = self.embedding_model.get_multimodal_embedding(text, image)
  357. # 生成1024维稠密向量(如果嵌入向量维度不是1024,这里需要处理)
  358. dense_vector_1024 = embedding[:1024] # 取前1024维
  359. # 创建文档
  360. document = {
  361. "id": f"{state.document_id}_{page_number}",
  362. "file_name": file_name,
  363. "file_page_count": file_page_count,
  364. "page_number": page_number,
  365. "content": text,
  366. "image_path": image_path,
  367. "dense_vector_1024": dense_vector_1024,
  368. "dataset_id": state.dataset_id,
  369. "document_id": state.document_id
  370. }
  371. documents_to_store.append(document)
  372. print(f"第 {page_number} 页向量化完成")
  373. except Exception as e:
  374. print(f"第 {i+1} 页向量化失败: {str(e)}")
  375. # 批量入库
  376. if documents_to_store:
  377. print(f"开始入库,共 {len(documents_to_store)} 个文档")
  378. result = get_client().insert(
  379. table_name=vector_db_settings.infinity_table_name,
  380. documents=documents_to_store,
  381. database_name=vector_db_settings.infinity_database
  382. )
  383. print(f"入库结果: {result}")
  384. return {
  385. "vectorized_results": documents_to_store,
  386. "vectorized_pages": len(documents_to_store),
  387. "is_complete": True
  388. }
  389. def _complete_node(self, state: PDFParsingState) -> Dict[str, Any]:
  390. """完成节点"""
  391. print(f"PDF解析工作流完成,共解析 {len(state.parsed_results)} 页,向量化 {state.vectorized_pages} 页")
  392. # 判断ragflow是否解析成功
  393. return {
  394. "is_complete": True
  395. }
  396. def run(self, pdf_path: str, page_dataset_id: str, dataset_name: str) -> Dict[str, Any]:
  397. """
  398. 运行PDF解析工作流
  399. Args:
  400. pdf_path: PDF文件路径
  401. page_dataset_id: 数据集ID
  402. ragflow_api_url: RAGFLOW API URL
  403. rag_flow_api_key: RAGFLOW API密钥
  404. Returns:
  405. Dict: 包含最终状态的字典
  406. """
  407. initial_state = PDFParsingState(
  408. pdf_path=pdf_path,
  409. page_dataset_id=page_dataset_id,
  410. dataset_name=dataset_name
  411. )
  412. result = self.workflow.invoke(initial_state, config={"callbacks": [self.langfuse_handler]})
  413. # 检查结果类型,如果是字典直接返回,否则调用dict()方法
  414. if isinstance(result, dict):
  415. return result
  416. else:
  417. return result.dict()