#!/usr/bin/env python3 """ 图片解析工作流 """ import sys import os import concurrent.futures from concurrent.futures import ThreadPoolExecutor from PIL import Image import requests # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from langgraph.graph import StateGraph, START, END from typing import List, Dict, Any, Annotated from pydantic import BaseModel, Field, ConfigDict from model.qwen_vl import QWenVLParser from utils.ragflow.ragflow_service import RAGFlowService from model.multimodal_embedding import Embedding from utils.minio.image_util import image_util from conf.settings import model_settings from utils.infinity import get_client # 定义工作流状态类 class ImageParsingState(BaseModel): """图片解析工作流状态""" model_config = ConfigDict(arbitrary_types_allowed=True) zip_file_path: str = Field(..., description="图片压缩包路径") book_name: str = Field(..., description="书名") dataset_id: str = Field(..., description="数据集ID") ragflow_service: RAGFlowService = Field(default_factory=RAGFlowService, description="RAGFLOW服务") embedding_model: Embedding = Field(default_factory=Embedding, description="多模态嵌入模型实例") document_id: str = Field(default="", description="文档ID") split_images: List[Dict[str, Any]] = Field(default_factory=list, description="拆分后的图片列表,包含图片URL和页码信息") parsed_results: List[Dict[str, Any]] = Field(default_factory=list, description="解析结果列表") vectorized_results: List[Dict[str, Any]] = Field(default_factory=list, description="向量化结果列表") processed_images: int = Field(default=0, description="已处理的图片数量") vectorized_images: int = Field(default=0, description="已向量化的图片数量") is_complete: bool = Field(default=False, description="是否处理完成") # 创建工作流构建器 class ImageParsingWorkflow: """图片解析工作流""" def __init__(self, model_name: str = "Qwen/Qwen3-VL-8B-Instruct"): """ 初始化图片解析工作流 Args: model_name: QWEN VL模型名称 """ self.model_name = model_name self.workflow = self._build_workflow() def _build_workflow(self): """构建langgraph工作流,实现基于条件路由的并行处理""" # 创建状态图 graph = StateGraph(ImageParsingState) # 添加节点 graph.add_node("upload_images", self._upload_images_node) graph.add_node("parse_image", self._parse_image_node) graph.add_node("vectorize_store", self._vectorize_store_node) graph.add_node("complete", self._complete_node) # 定义边 graph.add_edge(START, "upload_images") graph.add_edge("upload_images", "parse_image") # 添加条件边:判断是否继续解析 graph.add_conditional_edges( "parse_image", self._should_continue_parsing, { "continue": "parse_image", "complete": "vectorize_store" } ) graph.add_edge("vectorize_store", "complete") graph.add_edge("complete", END) # 编译工作流 return graph.compile() def _upload_images_node(self, state: ImageParsingState) -> Dict[str, Any]: """上传图片节点,调用image_util处理图片压缩包""" print(f"开始处理图片压缩包: {state.zip_file_path}") try: # 调用image_util处理图片压缩包,获取图片URL列表 image_urls = image_util.process_image_zip( state.zip_file_path, state.book_name ) print(f"图片压缩包处理完成,共 {len(image_urls)} 张图片") # 构建split_images列表,格式与PDF解析工作流保持一致 split_images = [] for i, url in enumerate(image_urls): split_images.append({ "page_number": i + 1, "image_url": url, "image": None # 稍后在解析时加载 }) return { "split_images": split_images, "processed_images": 0, "is_complete": False } except Exception as e: print(f"处理图片压缩包时出错: {str(e)}") raise def _parse_single_page(self, image_info: Dict[str, Any], model_name: str) -> Dict[str, Any]: """解析单个图片(用于并行处理)""" prompt = """ 你是一个画本类童书的创作者,创作的内容适合0-12岁的儿童 任务:你需要根据现有童书插画与内容,提取出插画中的各种要素、行为、情感,并针对每个要素进行独立描述 注意:描述内容要积极正向,符合社会主义核心价值观 输出要求: 1.以json的格式输出,结构为: { "page_number": 页码, "content": 页面原文本内容, "elements": [ { "element": "元素描述", "description": "详细描述" }, ... ] } 2.每个要素的描述要简洁明了,不超过50个中文字符 3.每个元素的描述要与插画中的元素相关,不能脱离插画而独立存在 4.每个元素的描述要符合社会价值观,不能包含任何负面或不道德的内容 """ page_number = image_info["page_number"] image_url = image_info["image_url"] print(f"开始解析第 {page_number} 页,图片URL: {image_url}") try: # 从URL加载图片 response = requests.get(image_url, timeout=30) response.raise_for_status() image = Image.open(requests.get(image_url, stream=True).raw) # 使用QWEN VL模型解析图像 parser = QWenVLParser(model_name) result = parser.parse_image(image, page_number, prompt) print(f"第 {page_number} 页解析完成") return result except Exception as e: print(f"解析第 {page_number} 页时出错: {str(e)}") raise def _parse_image_node(self, state: ImageParsingState) -> Dict[str, Any]: """解析图像节点,使用并行处理""" if not state.split_images: return state.dict() print(f"开始并行解析 {len(state.split_images)} 张图片") parsed_results = [] # 使用ThreadPoolExecutor实现并行处理 with ThreadPoolExecutor(max_workers=4) as executor: # 提交所有图片解析任务 future_to_image = { executor.submit(self._parse_single_page, image_info, self.model_name): image_info for image_info in state.split_images } # 收集解析结果 for future in concurrent.futures.as_completed(future_to_image): try: result = future.result() parsed_results.append(result) except Exception as e: image_info = future_to_image[future] print(f"解析第 {image_info['page_number']} 页时出错: {str(e)}") # 按页码排序结果 parsed_results.sort(key=lambda x: x["page_number"]) print(f"所有图片解析完成,共解析 {len(parsed_results)} 张图片") return { "split_images": state.split_images, # 保留split_images,以便后续访问图片 "parsed_results": parsed_results, "processed_images": len(parsed_results), "is_complete": True } def _should_continue_parsing(self, state: ImageParsingState) -> str: """判断是否继续解析""" # 由于我们使用了并行处理,parse_image_node会一次性处理所有图片 # 所以这里总是返回"complete" return "complete" def _vectorize_store_node(self, state: ImageParsingState) -> Dict[str, Any]: """向量化入库节点""" print(f"开始向量化入库,共 {len(state.parsed_results)} 张图片") # 创建索引(如果不存在) index_name = f"image_documents_{state.dataset_id}" state.vector_db.create_index(index_name) # 准备要入库的文档列表 documents_to_store = [] # 获取文件名和总页数 file_name = f"{state.book_name}.zip" file_page_count = len(state.split_images) # 遍历所有解析结果,生成向量化文档 for i, parsed_result in enumerate(state.parsed_results): try: page_number = parsed_result.get("page_number") text = parsed_result.get("content", "") image_url = state.split_images[i].get("image_url") # 从URL加载图片 image = None try: response = requests.get(image_url, timeout=30) response.raise_for_status() image = Image.open(requests.get(image_url, stream=True).raw) except Exception as e: print(f"加载图片 {image_url} 失败: {str(e)}") # 获取多模态嵌入向量 print(f"正在生成第 {page_number} 页的多模态嵌入...") embedding = state.embedding_model.get_multimodal_embedding(text, image) # 生成1024维稠密向量 dense_vector_1024 = embedding[:1024] # 取前1024维 # 创建文档 document = { "id": f"{state.document_id}_{page_number}" if state.document_id else f"image_{state.dataset_id}_{page_number}", "file_name": file_name, "file_page_count": file_page_count, "page_number": page_number, "content": text, "image_path": image_url, # 这里可以根据实际情况生成图片ID "dense_vector_1024": dense_vector_1024, "dataset_id": state.dataset_id, "document_id": state.document_id } documents_to_store.append(document) print(f"第 {page_number} 页向量化完成") except Exception as e: print(f"第 {i+1} 页向量化失败: {str(e)}") # 批量入库 if documents_to_store: print(f"开始入库,共 {len(documents_to_store)} 个文档") infinity_client = get_client() result = infinity_client.insert(index_name, documents_to_store) print(f"入库结果: {result}") return { "vectorized_results": documents_to_store, "vectorized_images": len(documents_to_store), "is_complete": True } def _complete_node(self, state: ImageParsingState) -> Dict[str, Any]: """完成节点""" print(f"图片解析工作流完成,共解析 {len(state.parsed_results)} 张图片,向量化 {state.vectorized_images} 张图片") return { "is_complete": True } def run(self, zip_file_path: str, book_name: str, dataset_id: str, ragflow_api_url: str, rag_flow_api_key: str) -> Dict[str, Any]: """ 运行图片解析工作流 Args: zip_file_path: 图片压缩包路径 book_name: 书名 dataset_id: 数据集ID ragflow_api_url: RAGFLOW API URL rag_flow_api_key: RAGFLOW API密钥 Returns: Dict: 包含最终状态的字典 """ initial_state = ImageParsingState( zip_file_path=zip_file_path, book_name=book_name, dataset_id=dataset_id, embedding_model=Embedding(model_name=model_settings.multimodal_embedding_model_name, api_key=model_settings.dashscope_api_key), ragflow_service=RAGFlowService(base_url=ragflow_api_url, api_key=rag_flow_api_key) ) result = self.workflow.invoke(initial_state) # 检查结果类型,如果是字典直接返回,否则调用dict()方法 if isinstance(result, dict): return result else: return result.dict()