- import os
- import concurrent.futures
- import time
- from concurrent.futures import ThreadPoolExecutor
- from langgraph.graph import StateGraph, START, END
- from typing import List, Dict, Any
- from pydantic import BaseModel, Field, ConfigDict
- from parser.pdf_parser.pdf_splitter import PDFSplitter
- from model.qwen_vl import QWenVLParser
- from utils.ragflow.ragflow_service import RAGFlowService
- from model.multimodal_embedding import Embedding
- from conf.settings import model_settings, vector_db_settings
- from utils.infinity import get_client
- from langfuse.langchain import CallbackHandler
- from conf.rag_parser_config import RagParserDefaults
# Workflow state definition
class PDFParsingState(BaseModel):
    """State carried through the PDF-parsing LangGraph workflow.

    Each graph node returns a partial update (a dict of field names to new
    values); LangGraph merges those updates into this shared state.
    """
    # split_pages entries carry arbitrary objects (e.g. PIL images), which
    # pydantic cannot validate — allow them explicitly.
    model_config = ConfigDict(arbitrary_types_allowed=True)
    pdf_path: str = Field(..., description="PDF文件路径")
    dataset_name: str = Field(..., description="数据集名称")
    dataset_id: str = Field(default="", description="RAGFLOW数据集ID")
    document_id: str = Field(default="", description="上传后的文档ID")
    page_dataset_id: str = Field(..., description="页面数据集ID")
    page_document_id: str = Field(default="", description="上传后的页面文档ID")
    split_pages: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的页面列表")
    current_page: Dict[str, Any] = Field(default_factory=dict, description="当前处理的页面")
    parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表")
    vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="向量化结果列表")
    processed_pages: int = Field(default=0, description="已处理的页面数量")
    vectorized_pages: int = Field(default=0, description="已向量化的页面数量")
    is_complete: bool = Field(default=False, description="是否处理完成")
# Workflow builder
class PDFParsingWorkflow:
    """Workflow that splits a scanned PDF, parses each page with a
    vision-language model, and stores the results in RAGFlow and a vector DB.

    Pipeline (LangGraph): look up / create the RAGFlow dataset, upload the
    document (to both the main and the per-page dataset), trigger RAGFlow
    parsing, split the PDF into page images, parse every page in parallel
    with a QWen-VL model, embed + store each page in the vector DB, and
    finally write per-page chunks back into RAGFlow.
    """

    def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"):
        """
        Initialize the PDF parsing workflow.

        Args:
            model_name: Name of the QWen VL model used for page parsing.
        """
        self.model_name = model_name
        self.workflow = self._build_workflow()
        self.ragflow_service = RAGFlowService()
        self.langfuse_handler = CallbackHandler()
        self.embedding_model = Embedding(
            model_name=model_settings.multimodal_embedding_model_name,
            api_key=model_settings.dashscope_api_key,
        )

    def _build_workflow(self):
        """Build and compile the LangGraph state graph with conditional routing."""
        graph = StateGraph(PDFParsingState)

        # Dataset lookup / creation.
        graph.add_node("get_ragflow_dataset", self.get_ragflow_dataset)
        graph.add_node("create_ragflow_dataset", self.create_ragflow_dataset)
        # Document upload (main dataset and per-page dataset).
        graph.add_node("upload_document", self._upload_document_node)
        graph.add_node("upload_page_document", self._upload_page_document_node)
        # RAGFlow-side document parsing.
        graph.add_node("parse_document", self._parse_document_node)
        # PDF splitting into page images.
        graph.add_node("split_pdf", self._split_pdf_node)
        # VL-model page parsing.
        graph.add_node("parse_image", self._parse_image_node)
        # Per-page chunk creation in RAGFlow.
        graph.add_node("create_ragflow_chunk", self.create_ragflow_chunk)
        # Vector-DB storage.
        graph.add_node("vectorize_store", self._vectorize_store_node)
        # Terminal node.
        graph.add_node("complete", self._complete_node)

        # Entry: check whether the dataset already exists.
        graph.add_edge(START, "get_ragflow_dataset")
        graph.add_conditional_edges(
            "get_ragflow_dataset",
            self._check_dataset_exists,
            {
                "exists": "upload_document",
                "not_exists": "create_ragflow_dataset",
            },
        )
        graph.add_edge("create_ragflow_dataset", "upload_document")
        # After upload: RAGFlow parsing and the page-dataset upload run as
        # parallel branches.
        graph.add_edge("upload_document", "parse_document")
        graph.add_edge("upload_document", "upload_page_document")
        graph.add_edge("parse_document", "split_pdf")
        graph.add_edge("split_pdf", "parse_image")

        # parse_image handles all pages in one shot (ThreadPoolExecutor), so
        # the router below always returns "complete"; the loop edge is kept
        # for future page-by-page processing.
        graph.add_conditional_edges(
            "parse_image",
            self._should_continue_parsing,
            {
                "continue": "parse_image",
                "complete": "vectorize_store",
            },
        )

        graph.add_edge("vectorize_store", "create_ragflow_chunk")
        graph.add_edge("create_ragflow_chunk", "complete")
        graph.add_edge("complete", END)

        return graph.compile()

    def get_ragflow_dataset(self, state: PDFParsingState) -> Dict[str, Any]:
        """Look up the RAGFlow dataset by name.

        Returns:
            Partial state update with ``dataset_id`` ("" when not found, which
            routes the graph to dataset creation).

        Raises:
            Exception: If the RAGFlow lookup itself fails.
        """
        try:
            dataset = self.ragflow_service.get_dataset(name=state.dataset_name)
            dataset_id = dataset["id"] if dataset else ""
            print(f"数据集 {state.dataset_name} 的ID为: {dataset_id}")
            return {"dataset_id": dataset_id}
        except Exception as e:
            # Chain the original exception for debuggability.
            raise Exception(f"获取数据集ID时出错: {str(e)}") from e

    def _check_dataset_exists(self, state: PDFParsingState) -> str:
        """Route on dataset existence: empty dataset_id means it must be created."""
        return "not_exists" if not state.dataset_id else "exists"

    def create_ragflow_dataset(self, state: PDFParsingState) -> Dict[str, Any]:
        """Create the RAGFlow dataset and return its id as a state update."""
        print(f"开始创建数据集: {state.dataset_name}")
        try:
            dataset = self.ragflow_service.create_dataset(
                name=state.dataset_name,
                description="",
                permission=RagParserDefaults.DATASET_PERMISSION,
                chunk_method=RagParserDefaults.DATASET_CHUNK_METHOD,
                parser_config=RagParserDefaults.DATASET_CONFIG_DICT,
            )
            dataset_id = dataset["id"]
            print(f"数据集创建成功,数据集ID: {dataset_id}")
            return {"dataset_id": dataset_id}
        except Exception as e:
            print(f"创建数据集时出错: {str(e)}")
            raise

    def _upload_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Upload the PDF to the main RAGFlow dataset.

        Returns:
            Partial state update with ``document_id``.

        Raises:
            Exception: If the upload fails or returns no document info.
        """
        print(f"开始上传文档到数据集 {state.dataset_id}: {state.pdf_path}")
        try:
            document_info_list = self.ragflow_service.upload_document(
                dataset_id=state.dataset_id,
                file_path=state.pdf_path,
            )
            if document_info_list and len(document_info_list) > 0:
                document_id = document_info_list[0]["id"]
                print(f"文档上传成功,文档ID: {document_id}")
                return {"document_id": document_id}
            print("文档上传失败: 未返回有效的文档信息")
            raise Exception("文档上传失败: 未返回有效的文档信息")
        except Exception as e:
            print(f"上传文档时出错: {str(e)}")
            raise

    def _upload_page_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Upload the PDF to the per-page RAGFlow dataset.

        Returns:
            Partial state update with ``page_document_id``.

        Raises:
            Exception: If the upload fails or returns no document info.
        """
        # Bug fix: log the dataset actually uploaded to (page_dataset_id),
        # not the main dataset_id.
        print(f"开始上传页面文档到数据集 {state.page_dataset_id}: {state.pdf_path}")
        try:
            document_info_list = self.ragflow_service.upload_document(
                dataset_id=state.page_dataset_id,
                file_path=state.pdf_path,
            )
            if document_info_list and len(document_info_list) > 0:
                page_document_id = document_info_list[0]["id"]
                print(f"文档上传成功,文档ID: {page_document_id}")
                return {"page_document_id": page_document_id}
            print("文档上传失败: 未返回有效的文档信息")
            raise Exception("文档上传失败: 未返回有效的文档信息")
        except Exception as e:
            print(f"上传文档时出错: {str(e)}")
            raise

    def _parse_document_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Trigger RAGFlow parsing for the uploaded document.

        Raises:
            Exception: If RAGFlow reports a parsing failure.
        """
        print(f"开始解析文档 {state.dataset_id}: {state.document_id}")
        try:
            # parse_document returns a bool success flag.
            parse_success = self.ragflow_service.parse_document(
                dataset_id=state.dataset_id,
                document_ids=[state.document_id],
            )
            if parse_success:
                print(f"文档解析成功,文档ID: {state.document_id}")
                # parsed_results expects a list; nothing to add yet.
                return {"parsed_results": []}
            print("文档解析失败: 未返回有效的解析结果")
            raise Exception("文档解析失败: 未返回有效的解析结果")
        except Exception as e:
            print(f"解析文档时出错: {str(e)}")
            raise

    def _split_pdf_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Split the PDF into per-page entries and reset parsing progress."""
        print(f"开始拆分PDF: {state.pdf_path}")

        splitter = PDFSplitter()
        split_pages = splitter.split_pdf(state.pdf_path)

        print(f"PDF拆分完成,共 {len(split_pages)} 页")

        return {
            "split_pages": split_pages,
            "parsed_results": [],
            "processed_pages": 0,
            "is_complete": False,
        }

    def _parse_single_page(self, page: Dict[str, Any], model_name: str) -> Dict[str, Any]:
        """Parse one page image with the QWen VL model (runs in a worker thread).

        Args:
            page: Split-page dict; must contain ``page_number`` and ``image``.
            model_name: QWen VL model name to instantiate the parser with.

        Returns:
            The parser's result dict (includes ``page_number``).
        """
        # Runtime prompt text — must stay exactly as authored (Chinese).
        prompt = """
        角色定位:你是一位顶尖的儿童绘本分析师与视觉工程专家,擅长将插画视觉信息转化为高精度的结构化元数据。
        任务描述:请深度解析提供的绘本页面,不仅提取基本要素,还要进行“像素级”的特征拆解。重点关注角色的微表情、服饰纹理、环境光效、构图视角及整体艺术风格。
        提取维度:
        艺术风格 (Style):包括笔触(如水彩、蜡笔)、线条粗细、整体色调偏好。
        角色特征 (Character):五官细节、肢体动作的动态感、衣物材质、标志性配饰。
        空间构图 (Composition):透视关系(仰拍/俯拍)、视觉焦点、前景/中景/背景的层次。
        物品与环境 (Object & Environment):物体的精确形状、材质光泽、环境中的自然元素(风吹草动的方向等)。
        内容标签 (content_tags):请从以下三个维度进行打标:
        主题维度(如:自然探索、家庭学校、科学科普、传统文化)
        具体对象(如:昆虫、交通工具、五官、家具)
        情感氛围(如:惊喜、友爱、好奇、安静)
        能力标签 (ability_tags):请严格参照以下教育能力模型,根据图中元素体现的教育价值进行选择:
        [语言表达、逻辑思维、数理逻辑、空间想象、艺术创造、身体协调、自我认知、社会交往、自然观察、情绪管理]。
        输出约束:
        保持描述积极向上,符合0-12岁儿童阅读的安全标准。
        描述精度:单条描述需包含具体视觉属性(颜色、形状、质感),字数控制在50字以内。
        格式要求:严谨按照指定的JSON结构输出。
        json格式:
        {
        "page_meta": {
        "page_number": 1,
        "content_text": "页面原文本内容",
        "overall_style": {
        "art_medium": "艺术媒介(如:手绘水彩、矢量平涂、3D渲染)",
        "color_palette": ["主色调1", "主色调2"],
        "lighting": "光影描述(如:柔和侧光、清晨自然光)",
        "composition": "构图(如:三分法、对角线构图、大远景)"
        }
        },
        "elements": [
        {
        "element_name": "元素名称(如:小兔子)",
        "character_name": "角色名称(如果有,没有的话,角色名称为空字符串)",
        "category": "分类(角色/场景/道具)",
        "spatial_layer": "所在层级(前景/中景/背景)",
        "visual_attributes": {
        "appearance": "外貌细节描述(发型、五官、材质感)",
        "action_emotion": "行为动作与情感流露",
        "color_detail": "像素级颜色描述(如:淡茱萸粉、薄荷绿)",
        "ability_tag": "如果为角色,其表现出的正面能力/特质"
        },
        "content_tags": {
        "theme": ["自然", "社交", "生活常识"],
        "object": ["动物", "服装", "植物"],
        "emotion": ["快乐", "勇敢"]
        },
        "ability_tags": ["语言表达", "逻辑思维", "自我认知"],
        "description": "综合性简洁描述(50字内)"
        }
        ]
        }
        """

        page_number = page["page_number"]
        image = page["image"]

        print(f"开始解析第 {page_number} 页")

        parser = QWenVLParser(model_name)
        result = parser.parse_image(image, page_number, prompt)

        print(f"第 {page_number} 页解析完成")
        return result

    def _parse_image_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Parse all split pages in parallel with a thread pool.

        Returns:
            Partial state update with ``parsed_results`` sorted by page number.
            Pages whose parsing raised are logged and skipped.
        """
        if not state.split_pages:
            # No pages — nothing to update.
            return {}

        print(f"开始并行解析 {len(state.split_pages)} 页")

        parsed_results = []

        # Page parsing is I/O-bound (model API calls), so threads suffice.
        with ThreadPoolExecutor(max_workers=5, thread_name_prefix="parse_page_") as executor:
            future_to_page = {
                executor.submit(self._parse_single_page, page, self.model_name): page
                for page in state.split_pages
            }

            for future in concurrent.futures.as_completed(future_to_page):
                try:
                    parsed_results.append(future.result())
                except Exception as e:
                    page = future_to_page[future]
                    print(f"解析第 {page['page_number']} 页时出错: {str(e)}")

        # as_completed yields in finish order — restore page order.
        parsed_results.sort(key=lambda x: x["page_number"])

        print(f"所有页面解析完成,共解析 {len(parsed_results)} 页")

        return {
            "split_pages": state.split_pages,  # keep pages so later nodes can read images
            "parsed_results": parsed_results,
            "processed_pages": len(parsed_results),
            "is_complete": True,
        }

    def _should_continue_parsing(self, state: PDFParsingState) -> str:
        """Router after parse_image; always "complete" since parsing is batched."""
        return "complete"

    def create_ragflow_chunk(self, state: PDFParsingState):
        """Create one RAGFlow chunk per parsed page in the page dataset."""
        print(f"开始单页上传,共 {len(state.parsed_results)} 页")

        for parsed_result in state.parsed_results:
            page_number = parsed_result.get("page_number")
            text = parsed_result.get("content", "")

            chunk = self.ragflow_service.create_chunk(
                dataset_id=state.page_dataset_id,
                document_id=state.page_document_id,
                content=text,
            )
            chunk_id = chunk["chunk"]["id"]
            print(f"上传第 {page_number} 页,Chunk ID: {chunk_id}")

    def _vectorize_store_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Embed each parsed page (text + image) and bulk-insert into the vector DB.

        Returns:
            Partial state update with the stored documents and their count.
        """
        print(f"开始向量化入库,共 {len(state.parsed_results)} 页")

        documents_to_store = []

        file_name = os.path.basename(state.pdf_path)
        file_page_count = len(state.split_pages)

        # Bug fix: parsed_results may be shorter than split_pages (failed
        # pages are skipped), so positional pairing would misalign. Key the
        # split pages by page number instead.
        pages_by_number = {p["page_number"]: p for p in state.split_pages}

        for parsed_result in state.parsed_results:
            page_number = parsed_result.get("page_number")
            try:
                text = parsed_result.get("content", "")
                page = pages_by_number.get(page_number, {})
                image = page.get("image")
                image_path = page.get("image_path")

                print(f"正在生成第 {page_number} 页的多模态嵌入...")
                embedding = self.embedding_model.get_multimodal_embedding(text, image)

                # The table stores 1024-d dense vectors; truncate if longer.
                dense_vector_1024 = embedding[:1024]

                documents_to_store.append({
                    "id": f"{state.document_id}_{page_number}",
                    "file_name": file_name,
                    "file_page_count": file_page_count,
                    "page_number": page_number,
                    "content": text,
                    "image_path": image_path,
                    "dense_vector_1024": dense_vector_1024,
                    "dataset_id": state.dataset_id,
                    "document_id": state.document_id,
                })
                print(f"第 {page_number} 页向量化完成")
            except Exception as e:
                print(f"第 {page_number} 页向量化失败: {str(e)}")

        if documents_to_store:
            print(f"开始入库,共 {len(documents_to_store)} 个文档")
            result = get_client().insert(
                table_name=vector_db_settings.infinity_table_name,
                documents=documents_to_store,
                database_name=vector_db_settings.infinity_database,
            )
            print(f"入库结果: {result}")

        return {
            "vectorized_results": documents_to_store,
            "vectorized_pages": len(documents_to_store),
            "is_complete": True,
        }

    def _complete_node(self, state: PDFParsingState) -> Dict[str, Any]:
        """Terminal node: report the totals and mark the workflow complete."""
        print(f"PDF解析工作流完成,共解析 {len(state.parsed_results)} 页,向量化 {state.vectorized_pages} 页")
        return {"is_complete": True}

    def run(self, pdf_path: str, page_dataset_id: str, dataset_name: str) -> Dict[str, Any]:
        """
        Run the PDF parsing workflow end to end.

        Args:
            pdf_path: Path to the PDF file.
            page_dataset_id: RAGFlow dataset id for per-page chunks.
            dataset_name: Name of the main RAGFlow dataset (looked up or created).

        Returns:
            Dict: The final workflow state.
        """
        initial_state = PDFParsingState(
            pdf_path=pdf_path,
            page_dataset_id=page_dataset_id,
            dataset_name=dataset_name,
        )
        result = self.workflow.invoke(initial_state, config={"callbacks": [self.langfuse_handler]})

        # LangGraph normally returns a dict; fall back to pydantic-v2
        # serialization otherwise (the model uses ConfigDict, i.e. v2).
        if isinstance(result, dict):
            return result
        return result.model_dump()
|