import sys
import os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

# Add the project root (three directory levels above this file) to the Python path
# so the `services.*` imports below resolve when this module is run directly.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from typing import List, Dict, Any, Annotated
from pydantic import BaseModel, Field
from services.pdf_parser.pdf_splitter import PDFSplitter
from services.model.qwen_vl import QWenVLParser


def _model_to_dict(model: BaseModel) -> Dict[str, Any]:
    """Convert a pydantic model to a plain dict on both pydantic v1 and v2.

    pydantic v2 deprecates ``BaseModel.dict()`` in favour of ``model_dump()``
    (same output); prefer the latter when available to avoid deprecation
    warnings, and fall back to ``dict()`` on pydantic v1.
    """
    dump = getattr(model, "model_dump", None)
    if callable(dump):
        return dump()
    return model.dict()


# Workflow state definition
class PDFParsingState(BaseModel):
    """State carried between the nodes of the PDF parsing workflow."""

    # Path of the PDF file to split and parse (required).
    pdf_path: str = Field(..., description="PDF文件路径")
    # Pages produced by PDFSplitter; each dict is read for "page_number" and "image".
    split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
    # Page currently being processed (not written by any node in this file).
    current_page: Dict[str, Any] = Field(default_factory=dict, description="当前处理的页面")
    # Per-page parser results, ordered by page number.
    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表")
    # Number of pages that were parsed successfully.
    processed_pages: int = Field(default=0, description="已处理的页面数量")
    # Set to True once parsing has finished.
    is_complete: bool = Field(default=False, description="是否处理完成")


# Workflow builder
class PDFParsingWorkflow:
    """Workflow that splits a scanned PDF into pages and parses them with a QWen VL model."""

    def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct", max_workers: int = 6):
        """
        Initialize the PDF parsing workflow.

        Args:
            model_name: QWen VL model name passed to ``QWenVLParser``.
            max_workers: Maximum number of pages parsed concurrently (default 6).

        Raises:
            ValueError: If ``max_workers`` is less than 1.
        """
        if max_workers < 1:
            # ThreadPoolExecutor would reject this too, but only deep inside a graph node.
            raise ValueError(f"max_workers must be >= 1, got {max_workers}")
        self.model_name = model_name
        self.max_workers = max_workers
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build and compile the langgraph graph: split_pdf -> parse_image -> complete."""
        # Create the state graph
        graph = StateGraph(PDFParsingState)

        # Node that splits the PDF into page images
        graph.add_node("split_pdf", self._split_pdf_node)
        # Node that parses all page images (in parallel)
        graph.add_node("parse_image", self._parse_image_node)
        # Terminal node
        graph.add_node("complete", self._complete_node)

        # Define edges
        graph.add_edge(START, "split_pdf")
        graph.add_edge("split_pdf", "parse_image")

        # Conditional edge deciding whether to keep parsing. The "continue" route is
        # kept for future incremental parsing; _should_continue_parsing never takes it.
        graph.add_conditional_edges(
            "parse_image",
            self._should_continue_parsing,
            {
                "continue": "parse_image",
                "complete": "complete"
            }
        )

        graph.add_edge("complete", END)

        # Compile the workflow
        return graph.compile()

    def _split_pdf_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Split the PDF at ``state.pdf_path`` into pages and reset the parsing fields."""
        print(f"开始拆分PDF: {state.pdf_path}")

        # Split the PDF
        splitter = PDFSplitter()
        split_pages = splitter.split_pdf(state.pdf_path)

        print(f"PDF拆分完成,共 {len(split_pages)} 页")

        return {
            "split_pages": split_pages,
            "parsed_results": [],
            "processed_pages": 0,
            "is_complete": False
        }

    def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
        """Parse one page with the QWen VL model (run inside a worker thread).

        Args:
            page: Page dict; its "page_number" and "image" entries are read.
            model_name: Model name passed to ``QWenVLParser``.

        Returns:
            Whatever ``QWenVLParser.parse_image`` returns for this page.
        """
        prompt = """
        你是一个画本类童书的创作者,创作的内容适合0-12岁的儿童
        任务:你需要根据现有童书插画与内容,提取出插画中的各种要素、行为、情感,并针对每个要素进行独立描述
        注意:描述内容要积极正向,符合社会主义核心价值观
        """
        page_number = page["page_number"]
        image = page["image"]

        print(f"开始解析第 {page_number} 页")

        # Parse the image with the QWen VL model.
        # NOTE(review): a new parser is created for every page. If QWenVLParser loads
        # model weights (rather than wrapping a remote API), this is costly and runs
        # up to max_workers loads at once. Consider sharing one instance if it is
        # known to be thread-safe; verify against services.model.qwen_vl.
        parser = QWenVLParser(model_name)
        result = parser.parse_image(image, page_number, prompt)

        print(f"第 {page_number} 页解析完成")

        return result

    def _parse_image_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Parse every split page concurrently and return the results in page order.

        Pages whose parsing raises are reported and skipped (best-effort), so
        ``processed_pages`` counts only the successfully parsed pages.
        """
        if not state.split_pages:
            return _model_to_dict(state)

        print(f"开始并行解析 {len(state.split_pages)} 页")

        # (page_number, result) pairs. The page number is taken from the submitted
        # input page, so ordering does not depend on the parser's result schema.
        indexed_results = []

        # Use a thread pool to parse pages in parallel
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit one parsing task per page
            future_to_page = {
                executor.submit(self._parse_single_page, page, self.model_name): page
                for page in state.split_pages
            }

            # Collect results as tasks finish (completion order, not page order)
            for future in concurrent.futures.as_completed(future_to_page):
                page = future_to_page[future]
                try:
                    result = future.result()
                except Exception as e:
                    # Best-effort: one failing page must not abort the others.
                    print(f"解析第 {page['page_number']} 页时出错: {str(e)}")
                    continue
                indexed_results.append((page["page_number"], result))

        # Sort by page number only. A key is needed so that ties never compare the
        # result dicts themselves; sort stability keeps tie order deterministic.
        indexed_results.sort(key=lambda item: item[0])
        parsed_results = [result for _, result in indexed_results]

        print(f"所有页面解析完成,共解析 {len(parsed_results)} 页")

        return {
            "split_pages": state.split_pages,  # keep split_pages so page images stay accessible
            "parsed_results": parsed_results,
            "processed_pages": len(parsed_results),
            "is_complete": True
        }

    def _should_continue_parsing(self, state: PDFParsingState) -> str:
        """Routing function after parse_image; always routes to "complete"."""
        # parse_image processes every page in one pass (in parallel), so there is
        # never anything left to continue with.
        return "complete"

    def _complete_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Terminal node: log the page count and mark the workflow complete."""
        print(f"PDF解析工作流完成,共解析 {len(state.parsed_results)} 页")
        return {
            "is_complete": True
        }

    def run(self, pdf_path: str) -> Dict[str, Any]:
        """
        Run the PDF parsing workflow.

        Args:
            pdf_path: Path to the PDF file.

        Returns:
            Dict: The final workflow state as a dict.
        """
        initial_state = PDFParsingState(pdf_path=pdf_path)
        result = self.workflow.invoke(initial_state)

        # invoke() may return a dict or a state model; normalize to a dict.
        if isinstance(result, dict):
            return result
        return _model_to_dict(result)