summary_node.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. """
  2. 总结节点
  3. 对图像解析结果进行总结提要,并保存到文件。
  4. """
  5. import os
  6. from typing import Dict, Any, List, Optional
  7. from src.datasets.parser.core.base import BaseNode, BaseState
  8. from src.datasets.parser.core.registry import register_node
  9. from src.conf.settings import model_settings
  10. from src.model.qwen_vl import QWenVLParser
  11. from src.common.logging_config import get_logger
  12. logger = get_logger(__name__)
  13. @register_node()
  14. class SummaryNode(BaseNode):
  15. """
  16. 总结节点
  17. 对图像解析结果进行总结提要,并将结果保存到文件。
  18. """
  19. def __init__(self, dimension_id: int = 0, model_name: Optional[str] = None):
  20. """
  21. 初始化总结节点
  22. Args:
  23. dimension_id: 维度ID,用于生成文件名
  24. model_name: 模型名称
  25. """
  26. self.dimension_id = dimension_id
  27. self.model_name = model_name or model_settings.chat_model_name
  28. # 总结提示模板
  29. self.summary_prompt = """
  30. 你是一位专业的内容总结专家,擅长从长篇文本中提取核心内容并生成简洁的总结。
  31. 请对以下内容进行总结,要求:
  32. 1. 提炼核心观点和关键信息
  33. 2. 保持内容的完整性和准确性
  34. 3. 使用清晰、连贯的语言
  35. 4. 总结长度适中,不要过于冗长
  36. 内容:
  37. {content}
  38. 总结:
  39. """
  40. @property
  41. def name(self) -> str:
  42. return f"summary_node"
  43. def _summarize_content(self, content: str) -> str:
  44. """
  45. 使用模型对内容进行总结
  46. Args:
  47. content: 待总结的内容
  48. Returns:
  49. str: 总结结果
  50. """
  51. try:
  52. # 构建提示,使用安全的替换方式
  53. if "{content}" in self.summary_prompt:
  54. messages = self.summary_prompt.replace("{content}", content)
  55. else:
  56. messages = self.summary_prompt
  57. # 调用模型生成总结
  58. chat_model = QWenVLParser(self.model_name)
  59. response = chat_model.chat(prompt=messages)
  60. return response
  61. except Exception as e:
  62. logger.error(f"总结内容时出错: {str(e)}")
  63. return f"总结失败: {str(e)}"
  64. def execute(self, state: BaseState) -> Dict[str, Any]:
  65. """
  66. 执行总结操作
  67. Args:
  68. state: 包含解析结果的状态
  69. Returns:
  70. 包含总结结果的字典
  71. """
  72. # 获取解析结果
  73. parsed_results = getattr(state, 'parsed_results', [])
  74. original_filename = getattr(state, 'original_filename', '')
  75. if not parsed_results:
  76. logger.warning("没有解析结果可总结")
  77. return {
  78. "summary": "",
  79. "saved_path": "",
  80. "is_complete": True
  81. }
  82. # 提取并合并内容
  83. import json
  84. content_parts = []
  85. for result in parsed_results:
  86. if isinstance(result, dict):
  87. content = result.get('content', '')
  88. if content:
  89. # 处理content:如果是字典,转换为JSON字符串;如果是字符串,直接使用
  90. if isinstance(content, dict):
  91. content_parts.append(json.dumps(content, ensure_ascii=False, indent=2))
  92. else:
  93. content_parts.append(str(content))
  94. if not content_parts:
  95. logger.warning("解析结果中没有内容可总结")
  96. return {
  97. "summary": "",
  98. "saved_path": "",
  99. "is_complete": True
  100. }
  101. # 合并内容
  102. combined_content = "\n".join(content_parts)
  103. logger.info(f"开始总结内容,长度: {len(combined_content)} 字符")
  104. # 生成总结
  105. summary = self._summarize_content(combined_content)
  106. logger.info("内容总结完成")
  107. # 确保temp目录存在
  108. temp_dir = "temp"
  109. os.makedirs(temp_dir, exist_ok=True)
  110. # 保存总结到文件
  111. file_path = os.path.join(temp_dir, f"{original_filename}_{self.dimension_id}.md")
  112. try:
  113. with open(file_path, 'w', encoding='utf-8') as f:
  114. f.write(summary)
  115. logger.info(f"总结已保存到: {file_path}")
  116. except Exception as e:
  117. logger.error(f"保存总结文件时出错: {str(e)}")
  118. file_path = ""
  119. return {
  120. "summary": summary,
  121. "saved_path": file_path,
  122. "is_complete": True
  123. }