# workflow.py
import concurrent.futures
import os
import sys
from concurrent.futures import ThreadPoolExecutor
from typing import Annotated, Any, Dict, List

# Add the project root directory to the Python path (must run before the
# local `services.*` imports below can resolve).
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from pydantic import BaseModel, Field

from services.pdf_parser.pdf_splitter import PDFSplitter
from services.model.qwen_vl import QWenVLParser
# Workflow state definition
class PDFParsingState(BaseModel):
    """State carried through the PDF parsing workflow."""
    # Path of the input PDF file
    pdf_path: str = Field(..., description="PDF文件路径")
    # Pages produced by the splitter; each entry is read via
    # page["page_number"] and page["image"] downstream
    split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
    # Page currently being processed (unused by the parallel path)
    current_page: Dict[str, Any] = Field(default_factory=dict, description="当前处理的页面")
    # Per-page parse results, sorted by page number
    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表")
    # Number of pages successfully parsed
    processed_pages: int = Field(default=0, description="已处理的页面数量")
    # Whether the workflow has finished
    is_complete: bool = Field(default=False, description="是否处理完成")
  22. # 创建工作流构建器
  23. class PDFParsingWorkflow:
  24. """PDF扫描件拆分解析工作流"""
  25. def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
  26. """
  27. 初始化PDF解析工作流
  28. Args:
  29. model_name: QWEN VL模型名称
  30. """
  31. self.model_name = model_name
  32. self.workflow = self._build_workflow()
  33. def _build_workflow(self):
  34. """构建langgraph工作流,实现基于条件路由的并行处理"""
  35. # 创建状态图
  36. graph = StateGraph(PDFParsingState)
  37. # 添加拆分PDF节点
  38. graph.add_node("split_pdf", self._split_pdf_node)
  39. # 添加解析图像节点
  40. graph.add_node("parse_image", self._parse_image_node)
  41. # 添加完成节点
  42. graph.add_node("complete", self._complete_node)
  43. # 定义边
  44. graph.add_edge(START, "split_pdf")
  45. graph.add_edge("split_pdf", "parse_image")
  46. # 添加条件边:判断是否继续解析
  47. graph.add_conditional_edges(
  48. "parse_image",
  49. self._should_continue_parsing,
  50. {
  51. "continue": "parse_image",
  52. "complete": "complete"
  53. }
  54. )
  55. graph.add_edge("complete", END)
  56. # 编译工作流
  57. return graph.compile()
  58. def _split_pdf_node(self, state: PDFParsingState) -> Dict[str, Any]:
  59. """拆分PDF节点"""
  60. print(f"开始拆分PDF: {state.pdf_path}")
  61. # 拆分PDF
  62. splitter = PDFSplitter()
  63. split_pages = splitter.split_pdf(state.pdf_path)
  64. print(f"PDF拆分完成,共 {len(split_pages)} 页")
  65. return {
  66. "split_pages": split_pages,
  67. "parsed_results": [],
  68. "processed_pages": 0,
  69. "is_complete": False
  70. }
  71. def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
  72. """解析单个页面(用于并行处理)"""
  73. prompt = """
  74. 你是一个画本类童书的创作者,创作的内容适合0-12岁的儿童
  75. 任务:你需要根据现有童书插画与内容,提取出插画中的各种要素、行为、情感,并针对每个要素进行独立描述
  76. 注意:描述内容要积极正向,符合社会主义核心价值观
  77. """
  78. page_number = page["page_number"]
  79. image = page["image"]
  80. print(f"开始解析第 {page_number} 页")
  81. # 使用QWEN VL模型解析图像
  82. parser = QWenVLParser(model_name)
  83. result = parser.parse_image(image, page_number, prompt)
  84. print(f"第 {page_number} 页解析完成")
  85. return result
  86. def _parse_image_node(self, state: PDFParsingState) -> Dict[str, Any]:
  87. """解析图像节点,使用并行处理"""
  88. if not state.split_pages:
  89. return state.dict()
  90. print(f"开始并行解析 {len(state.split_pages)} 页")
  91. parsed_results = []
  92. # 使用ThreadPoolExecutor实现并行处理
  93. with ThreadPoolExecutor(max_workers=6) as executor:
  94. # 提交所有页面解析任务
  95. future_to_page = {
  96. executor.submit(self._parse_single_page, page, self.model_name): page
  97. for page in state.split_pages
  98. }
  99. # 收集解析结果
  100. for future in concurrent.futures.as_completed(future_to_page):
  101. try:
  102. result = future.result()
  103. parsed_results.append(result)
  104. except Exception as e:
  105. page = future_to_page[future]
  106. print(f"解析第 {page['page_number']} 页时出错: {str(e)}")
  107. # 按页码排序结果
  108. parsed_results.sort(key=lambda x: x["page_number"])
  109. print(f"所有页面解析完成,共解析 {len(parsed_results)} 页")
  110. return {
  111. "split_pages": state.split_pages, # 保留split_pages,以便后续访问图片
  112. "parsed_results": parsed_results,
  113. "processed_pages": len(parsed_results),
  114. "is_complete": True
  115. }
  116. def _should_continue_parsing(self, state: PDFParsingState) -> str:
  117. """判断是否继续解析"""
  118. # 由于我们使用了并行处理,parse_image_node会一次性处理所有页面
  119. # 所以这里总是返回"complete"
  120. return "complete"
  121. def _complete_node(self, state: PDFParsingState) -> Dict[str, Any]:
  122. """完成节点"""
  123. print(f"PDF解析工作流完成,共解析 {len(state.parsed_results)} 页")
  124. return {
  125. "is_complete": True
  126. }
  127. def run(self, pdf_path: str) -> Dict[str, Any]:
  128. """
  129. 运行PDF解析工作流
  130. Args:
  131. pdf_path: PDF文件路径
  132. Returns:
  133. Dict: 包含最终状态的字典
  134. """
  135. initial_state = PDFParsingState(pdf_path=pdf_path)
  136. result = self.workflow.invoke(initial_state)
  137. # 检查结果类型,如果是字典直接返回,否则调用dict()方法
  138. if isinstance(result, dict):
  139. return result
  140. else:
  141. return result.dict()