# pdf_ocr_node.py (3.6 KB)
  1. """
  2. PDF OCR解析节点
  3. """
  4. import concurrent.futures
  5. from typing import Dict, Any, List
  6. from src.datasets.parser.core.base import BaseNode
  7. from src.datasets.parser.pdf_parser.pdf_splitter import PDFSplitter
  8. from src.model.qwen_vl import QWenVLParser
  9. from src.common.logging_config import get_logger
  10. from src.conf.settings import model_settings
  11. from src.utils.async_utils import ThreadPoolManager
# Module-level logger, obtained from the project's logging helper and keyed by this module's name.
logger = get_logger(__name__)
  13. class PDFOCRNode(BaseNode):
  14. """
  15. PDF OCR解析节点
  16. 使用VL模型提取PDF文本内容。
  17. """
  18. def __init__(self, model_name: str = None, max_workers: int = 4):
  19. self.model_name = model_name or model_settings.vl_model_name
  20. self.max_workers = max_workers
  21. @property
  22. def name(self) -> str:
  23. return "pdf_ocr"
  24. def _parse_single_page(self, page: Dict[str, Any]) -> Dict[str, Any]:
  25. """
  26. 解析单个页面
  27. Args:
  28. page: 页面信息,包含page_number和image字段
  29. Returns:
  30. 解析结果字典,包含page_number和content字段
  31. """
  32. page_number = page.get("page_number", 0)
  33. image = page.get("image")
  34. prompt = "请提取这张图片中的所有文字内容,只输出文字,不要添加任何解释。"
  35. logger.debug(f"开始解析第 {page_number} 页")
  36. try:
  37. parser = QWenVLParser(self.model_name)
  38. result = parser.parse_image(image, page_number, prompt)
  39. text = result.get("content", "")
  40. logger.info(f"页面 {page_number} 提取 {len(text)} 字符")
  41. return {
  42. "page_number": page_number,
  43. "content": text
  44. }
  45. except Exception as e:
  46. logger.error(f"解析第 {page_number} 页时出错: {str(e)}")
  47. return {
  48. "page_number": page_number,
  49. "content": ""
  50. }
  51. def execute(self, state) -> Dict[str, Any]:
  52. pdf_path = state.pdf_path
  53. logger.info(f"开始OCR解析PDF: {pdf_path}")
  54. # 拆分PDF为图片
  55. splitter = PDFSplitter()
  56. pages = splitter.split_pdf(pdf_path=pdf_path, is_upload=False)
  57. if not pages:
  58. logger.warning("PDF拆分后没有页面")
  59. return {"full_text": ""}
  60. logger.info(f"开始并行OCR解析 {len(pages)} 页,最大线程数: {self.max_workers}")
  61. parsed_results = []
  62. # 使用全局线程池
  63. pool = ThreadPoolManager.get_pool("parser")
  64. future_to_page = {
  65. pool.submit(self._parse_single_page, page): page
  66. for page in pages
  67. }
  68. for future in concurrent.futures.as_completed(future_to_page):
  69. try:
  70. result = future.result()
  71. parsed_results.append(result)
  72. except Exception as e:
  73. page = future_to_page[future]
  74. page_number = page.get("page_number", 0)
  75. logger.error(f"解析第 {page_number} 页时出错: {str(e)}")
  76. parsed_results.append({
  77. "page_number": page_number,
  78. "content": ""
  79. })
  80. # 按页码排序结果,确保顺序性
  81. parsed_results.sort(key=lambda x: x.get("page_number", 0))
  82. # 按顺序拼接文本
  83. full_text_parts = [result.get("content", "") for result in parsed_results]
  84. full_text = "\n\n".join(full_text_parts)
  85. logger.info(f"PDF OCR完成,提取 {len(full_text)} 字符")
  86. return {"full_text": full_text}