"""
PDF OCR parsing node.
"""
import concurrent.futures
from typing import Any, Dict, List, Optional

from src.datasets.parser.core.base import BaseNode
from src.datasets.parser.pdf_parser.pdf_splitter import PDFSplitter
from src.model.qwen_vl import QWenVLParser
from src.common.logging_config import get_logger
from src.conf.settings import model_settings
from src.utils.async_utils import ThreadPoolManager

logger = get_logger(__name__)


class PDFOCRNode(BaseNode):
    """
    PDF OCR parsing node.

    Splits a PDF into page images and extracts the text of every page with
    a vision-language (VL) model, then joins the page texts in page order.
    """

    def __init__(self, model_name: Optional[str] = None, max_workers: int = 4):
        """
        Args:
            model_name: Name of the VL model to use. Falls back to
                ``model_settings.vl_model_name`` when omitted or empty.
            max_workers: Intended maximum number of worker threads.
                NOTE(review): within this class the value is only written
                to the log in ``execute``; the pages are submitted to the
                shared "parser" pool from ``ThreadPoolManager``, whose
                sizing is not visible here -- confirm whether this value
                is expected to take effect.
        """
        self.model_name = model_name or model_settings.vl_model_name
        self.max_workers = max_workers

    @property
    def name(self) -> str:
        """Identifier of this node."""
        return "pdf_ocr"

    def _parse_single_page(self, page: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse a single page.

        Any exception raised while parsing is logged (with traceback) and
        turned into an empty result, so one bad page never aborts the
        whole document.

        Args:
            page: Page information; the ``page_number`` and ``image``
                entries are read.

        Returns:
            A dict with ``page_number`` and ``content`` entries.
            ``content`` is an empty string when parsing failed.
        """
        page_number = page.get("page_number", 0)
        image = page.get("image")
        prompt = "请提取这张图片中的所有文字内容,只输出文字,不要添加任何解释。"

        logger.debug(f"开始解析第 {page_number} 页")

        try:
            parser = QWenVLParser(self.model_name)
            result = parser.parse_image(image, page_number, prompt)
            # A "content" key that is present but None would make len()
            # below raise and be reported as a parse error; treat it as
            # empty text instead.
            text = result.get("content", "") or ""
            logger.info(f"页面 {page_number} 提取 {len(text)} 字符")
            return {
                "page_number": page_number,
                "content": text,
            }
        except Exception as e:
            # logger.exception keeps the traceback, which logger.error
            # with only str(e) discarded.
            logger.exception(f"解析第 {page_number} 页时出错: {str(e)}")
            return {
                "page_number": page_number,
                "content": "",
            }

    def execute(self, state) -> Dict[str, Any]:
        """
        Run OCR over the PDF referenced by ``state.pdf_path``.

        The PDF is split into pages, every page is parsed on the shared
        "parser" thread pool, and the page texts are joined with blank
        lines in ascending ``page_number`` order.

        Args:
            state: Object exposing a ``pdf_path`` attribute.

        Returns:
            A dict with a single ``full_text`` entry. It is an empty
            string when the splitter yields no pages.
        """
        pdf_path = state.pdf_path

        logger.info(f"开始OCR解析PDF: {pdf_path}")

        # Split the PDF into page images.
        splitter = PDFSplitter()
        pages = splitter.split_pdf(pdf_path=pdf_path, is_upload=False)

        if not pages:
            logger.warning("PDF拆分后没有页面")
            return {"full_text": ""}

        logger.info(f"开始并行OCR解析 {len(pages)} 页,最大线程数: {self.max_workers}")

        parsed_results: List[Dict[str, Any]] = []

        # Use the global thread pool.
        pool = ThreadPoolManager.get_pool("parser")
        future_to_page = {
            pool.submit(self._parse_single_page, page): page
            for page in pages
        }

        for future in concurrent.futures.as_completed(future_to_page):
            try:
                result = future.result()
                parsed_results.append(result)
            except Exception as e:
                # _parse_single_page already handles its own errors; this
                # is a fallback for failures surfaced by the future itself.
                page = future_to_page[future]
                page_number = page.get("page_number", 0)
                logger.exception(f"解析第 {page_number} 页时出错: {str(e)}")
                parsed_results.append({
                    "page_number": page_number,
                    "content": "",
                })

        # Futures complete in arbitrary order; sort by page number to
        # restore document order.
        parsed_results.sort(key=lambda x: x.get("page_number", 0))

        # Concatenate the page texts in order.
        full_text_parts = [result.get("content", "") for result in parsed_results]
        full_text = "\n\n".join(full_text_parts)

        logger.info(f"PDF OCR完成,提取 {len(full_text)} 字符")

        return {"full_text": full_text}