import sys
import os
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

# Add the project root directory to the Python path
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from langgraph.graph import StateGraph, START, END
from typing import List, Dict, Any
from pydantic import BaseModel, Field, ConfigDict

from parser.pdf_parser.pdf_splitter import PDFSplitter
from model.qwen_vl import QWenVLParser
from utils.ragflow.ragflow_service import RAGFlowService
from model.multimodal_embedding import Embedding
from conf.config import ModelConfig, VectorDBConfig
from utils.infinity import get_client


# Workflow state class
class PDFParsingState(BaseModel):
    """State for the PDF parsing workflow."""

    model_config = ConfigDict(arbitrary_types_allowed=True)

    pdf_path: str = Field(..., description="Path to the PDF file")
    dataset_id: str = Field(..., description="Dataset ID")
    ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFlow service")
    embedding_model: Embedding = Field(default_factory=Embedding, description="Multimodal embedding model instance")
    document_id: str = Field(default="", description="Document ID after upload")
    split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="List of split pages")
    current_page: Dict[str, Any] = Field(default_factory=dict, description="Page currently being processed")
    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="List of parsing results")
    vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="List of vectorization results")
    processed_pages: int = Field(default=0, description="Number of pages processed")
    vectorized_pages: int = Field(default=0, description="Number of pages vectorized")
    is_complete: bool = Field(default=False, description="Whether processing is complete")


# Workflow builder class
class PDFParsingWorkflow:
    """Workflow for splitting and parsing scanned PDFs."""

    def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
        """
        Initialize the PDF parsing workflow.

        Args:
            model_name: Name of the Qwen VL model.
        """
        self.model_name = model_name
        self.workflow = self._build_workflow()

    def _build_workflow(self):
        """Build the langgraph workflow with conditional routing for parallel processing."""
        # Create the state graph
        graph = StateGraph(PDFParsingState)

        # Register the nodes
        graph.add_node("upload_document", self._upload_document_node)   # upload the document
        graph.add_node("parse_document", self._parse_document_node)     # parse it in RAGFlow
        graph.add_node("split_pdf", self._split_pdf_node)               # split the PDF into pages
        graph.add_node("parse_image", self._parse_image_node)           # parse the page images
        graph.add_node("vectorize_store", self._vectorize_store_node)   # vectorize and store
        graph.add_node("complete", self._complete_node)                 # finish

        # Define the edges: upload to RAGFlow, parse, then split the PDF
        graph.add_edge(START, "upload_document")
        graph.add_edge("upload_document", "parse_document")
        graph.add_edge("parse_document", "split_pdf")
        # Route the split pages into image parsing
        graph.add_edge("split_pdf", "parse_image")

        # Conditional edge: decide whether to keep parsing
        graph.add_conditional_edges(
            "parse_image",
            self._should_continue_parsing,
            {
                "continue": "parse_image",
                "complete": "vectorize_store",
            },
        )

        # Vectorize, store, and finish
        graph.add_edge("vectorize_store", "complete")
        graph.add_edge("complete", END)

        # Compile the workflow
        return graph.compile()

    def _upload_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """RAGFlow document upload node."""
        print(f"Uploading document to dataset {state.dataset_id}: {state.pdf_path}")
        try:
            # Upload the document
            document_info_list = state.ragflow_service.upload_document(
                dataset_id=state.dataset_id,
                file_path=state.pdf_path,
            )

            # Check the response
            if document_info_list and len(document_info_list) > 0:
                document_id = document_info_list[0]["id"]
                print(f"Document uploaded successfully, document ID: {document_id}")
                return {"document_id": document_id}
            else:
                print("Document upload failed: no valid document info returned")
                raise Exception("Document upload failed: no valid document info returned")
        except Exception as e:
            print(f"Error while uploading document: {str(e)}")
            raise
    def _parse_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """RAGFlow document parsing node."""
        print(f"Parsing document {state.dataset_id}: {state.document_id}")
        try:
            # Parse the document
            parse_success = state.ragflow_service.parse_document(
                dataset_id=state.dataset_id,
                document_ids=[state.document_id],
            )

            # parse_success is a bool
            if parse_success:
                print(f"Document parsed successfully, document ID: {state.document_id}")
                # Return an empty list because the parsed_results field expects a list
                return {"parsed_results": []}
            else:
                print("Document parsing failed: no valid parsing result returned")
                raise Exception("Document parsing failed: no valid parsing result returned")
        except Exception as e:
            print(f"Error while parsing document: {str(e)}")
            raise

    def _split_pdf_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """PDF splitting node."""
        print(f"Splitting PDF: {state.pdf_path}")

        # Split the PDF into pages
        splitter = PDFSplitter()
        split_pages = splitter.split_pdf(state.pdf_path)

        print(f"PDF split complete, {len(split_pages)} pages in total")

        return {
            "split_pages": split_pages,
            "parsed_results": [],
            "processed_pages": 0,
            "is_complete": False,
        }

    def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
        """Parse a single page (used for parallel processing)."""
        prompt = """
Role: You are a top-tier children's picture-book analyst and visual engineering expert, skilled at turning illustration visuals into high-precision structured metadata.

Task: Deeply analyze the provided picture-book page. Beyond extracting the basic elements, perform a "pixel-level" breakdown of features. Focus on the characters' micro-expressions, clothing textures, environmental lighting, composition and viewpoint, and the overall art style.

Dimensions to extract:
- Art style (Style): brushwork (e.g. watercolor, crayon), line weight, overall color preference.
- Character features (Character): facial details, dynamism of body movement, clothing material, signature accessories.
- Spatial composition (Composition): perspective (low/high angle), visual focus, foreground/midground/background layering.
- Objects & environment (Object & Environment): precise shapes of objects, material sheen, natural elements in the scene (e.g. the direction the wind bends the grass).
- Content tags (content_tags): tag along these three dimensions:
  - Theme (e.g. nature exploration, family and school, popular science, traditional culture)
  - Concrete objects (e.g. insects, vehicles, facial features, furniture)
  - Emotional atmosphere (e.g. surprise, friendship, curiosity, calm)
- Ability tags (ability_tags): strictly follow this educational ability model and choose based on the educational value shown by the elements in the image:
  [language expression, logical thinking, mathematical logic, spatial imagination, artistic creativity, physical coordination, self-awareness, social interaction, nature observation, emotion management].

Output constraints:
- Keep descriptions positive and appropriate for readers aged 0-12.
- Description precision: each description must include concrete visual attributes (color, shape, texture) and stay within 50 characters.
- Format: strictly follow the JSON structure below.

JSON format:
{
    "page_meta": {
        "page_number": 1,
        "content_text": "Original text content of the page",
        "overall_style": {
            "art_medium": "Art medium (e.g. hand-painted watercolor, flat vector, 3D render)",
            "color_palette": ["primary color 1", "primary color 2"],
            "lighting": "Lighting description (e.g. soft side light, natural morning light)",
            "composition": "Composition (e.g. rule of thirds, diagonal composition, wide shot)"
        }
    },
    "elements": [
        {
            "element_name": "Element name (e.g. little rabbit)",
            "character_name": "Character name (if any; otherwise an empty string)",
            "category": "Category (character/scene/prop)",
            "spatial_layer": "Layer (foreground/midground/background)",
            "visual_attributes": {
                "appearance": "Appearance details (hairstyle, facial features, texture)",
                "action_emotion": "Action and emotional expression",
                "color_detail": "Pixel-level color description (e.g. pale dogwood pink, mint green)",
                "ability_tag": "If a character, the positive ability/trait it displays"
            },
            "content_tags": {
                "theme": ["nature", "social", "everyday knowledge"],
                "object": ["animal", "clothing", "plant"],
                "emotion": ["happy", "brave"]
            },
            "ability_tags": ["language expression", "logical thinking", "self-awareness"],
            "description": "Concise overall description (within 50 characters)"
        }
    ]
}
"""
        page_number = page["page_number"]
        image = page["image"]

        print(f"Parsing page {page_number}")

        # Parse the image with the Qwen VL model
        parser = QWenVLParser(model_name)
        result = parser.parse_image(image, page_number, prompt)

        print(f"Page {page_number} parsed")
        return result

    def _parse_image_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Image parsing node, processing pages in parallel."""
        if not state.split_pages:
            return state.model_dump()

        print(f"Parsing {len(state.split_pages)} pages in parallel")

        parsed_results = []

        # Run page parsing in parallel with a ThreadPoolExecutor
        with ThreadPoolExecutor(max_workers=5, thread_name_prefix="parse_page_") as executor:
            # Submit one parsing task per page
            future_to_page = {
                executor.submit(self._parse_single_page, page, self.model_name): page
                for page in state.split_pages
            }

            # Collect the results as they complete
            for future in concurrent.futures.as_completed(future_to_page):
                try:
                    result = future.result()
                    parsed_results.append(result)
                except Exception as e:
                    page = future_to_page[future]
                    print(f"Error parsing page {page['page_number']}: {str(e)}")

        # Sort the results by page number
        parsed_results.sort(key=lambda x: x["page_number"])

        print(f"All pages parsed, {len(parsed_results)} pages in total")

        return {
            "split_pages": state.split_pages,  # keep split_pages so the images stay accessible downstream
            "parsed_results": parsed_results,
"processed_pages": len(parsed_results), "is_complete": True } def _should_continue_parsing(self, state: PDFParsingState) -> str: """判断是否继续解析""" # 由于我们使用了并行处理,parse_image_node会一次性处理所有页面 # 所以这里总是返回"complete" return "complete" def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]: """向量化入库节点""" print(f"开始向量化入库,共 {len(state.parsed_results)} 页") # 创建索引(如果不存在) index_name = f"{VectorDBConfig.get_infinity_table_name()}" state.vector_db.create_index(index_name) # 准备要入库的文档列表 documents_to_store = [] # 获取文件名和总页数 file_name = os.path.basename(state.pdf_path) file_page_count = len(state.split_pages) # 遍历所有解析结果,生成向量化文档 for i, parsed_result in enumerate(state.parsed_results): try: page_number = parsed_result.get("page_number") text = parsed_result.get("content", "") image = state.split_pages[i].get("image") image_path = state.split_pages[i].get("image_path") # 获取多模态嵌入向量 print(f"正在生成第 {page_number} 页的多模态嵌入...") embedding = state.embedding_model.get_multimodal_embedding(text, image) # 生成1024维稠密向量(如果嵌入向量维度不是1024,这里需要处理) dense_vector_1024 = embedding[:1024] # 取前1024维 # 创建文档 document = { "id": f"{state.document_id}_{page_number}", "file_name": file_name, "file_page_count": file_page_count, "page_number": page_number, "content": text, "image_path": image_path, "dense_vector_1024": dense_vector_1024, "dataset_id": state.dataset_id, "document_id": state.document_id } documents_to_store.append(document) print(f"第 {page_number} 页向量化完成") except Exception as e: print(f"第 {i+1} 页向量化失败: {str(e)}") # 批量入库 if documents_to_store: print(f"开始入库,共 {len(documents_to_store)} 个文档") infinity_client = get_client() result = infinity_client.insert(index_name, documents_to_store) print(f"入库结果: {result}") return { "vectorized_results": documents_to_store, "vectorized_pages": len(documents_to_store), "is_complete": True } def _complete_node(self, state: PDFParsingState) -> Dict[str, Any]: """完成节点""" print(f"PDF解析工作流完成,共解析 {len(state.parsed_results)} 页,向量化 {state.vectorized_pages} 页") # 判断ragflow是否解析成功 return { "is_complete": True } def run(self, pdf_path: str, dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]: """ 运行PDF解析工作流 Args: pdf_path: PDF文件路径 dataset_id: 数据集ID ragflow_api_url: RAGFLOW API URL rag_flow_api_key: RAGFLOW API密钥 Returns: Dict: 包含最终状态的字典 """ initial_state = PDFParsingState( pdf_path=pdf_path, dataset_id=dataset_id, embedding_model=Embedding(model_name=ModelConfig.get_multimodal_embedding_model_name(), api_key=ModelConfig.get_dashscope_api_key()), ragflow_service=RAGFlowService(base_url=ragflow_api_url, api_key=rag_flow_api_key) ) result = self.workflow.invoke(initial_state) # 检查结果类型,如果是字典直接返回,否则调用dict()方法 if isinstance(result, dict): return result else: return result.dict()