json_utils.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. """
  2. JSON 解析工具
  3. 提供强大的 JSON 响应解析功能,支持多种格式的 JSON 内容提取。
  4. """
  5. import json
  6. import re
  7. from typing import Any, Dict, List, Optional
  8. from src.common.logging_config import get_logger
  9. logger = get_logger(__name__)
  10. def parse_json_response(response: str, expected_type: Optional[type] = None) -> Any:
  11. """
  12. 解析 JSON 响应文本
  13. 支持多种格式的 JSON 内容提取:
  14. 1. 直接 JSON 解析
  15. 2. raw_decode 解析(跳过前面的非JSON文本)
  16. 3. Markdown 代码块包裹的 JSON
  17. 4. 括号匹配提取 JSON 数组
  18. 5. 正则表达式提取
  19. 6. 逐行查找 JSON
  20. 7. 提取独立 JSON 对象并组合
  21. Args:
  22. response: 模型响应文本
  23. expected_type: 期望的返回类型(list 或 dict),如果指定则验证返回类型
  24. Returns:
  25. 解析后的 JSON 对象(通常是列表或字典),解析失败返回空列表或空字典
  26. Examples:
  27. >>> # 解析 QA 响应(返回列表)
  28. >>> qa_pairs = parse_json_response(response_text)
  29. >>>
  30. >>> # 解析配置响应(返回字典)
  31. >>> config = parse_json_response(response_text, expected_type=dict)
  32. """
  33. if not response:
  34. return [] if expected_type == list else {} if expected_type == dict else None
  35. # 清理响应文本
  36. cleaned_response = response.strip()
  37. # 移除 BOM 标记
  38. if cleaned_response.startswith('\ufeff'):
  39. cleaned_response = cleaned_response[1:]
  40. # 1. 尝试直接解析JSON
  41. try:
  42. result = json.loads(cleaned_response)
  43. if expected_type is None or isinstance(result, expected_type):
  44. return result
  45. except json.JSONDecodeError as e:
  46. logger.debug(f"直接解析失败: {str(e)}")
  47. pass
  48. # 1.5. 尝试使用 raw_decode 解析(可以跳过前面的非JSON文本)
  49. try:
  50. decoder = json.JSONDecoder()
  51. result, idx = decoder.raw_decode(cleaned_response)
  52. if expected_type is None or isinstance(result, expected_type):
  53. return result
  54. except (json.JSONDecodeError, ValueError) as e:
  55. logger.debug(f"raw_decode 解析失败: {str(e)}")
  56. pass
  57. # 2. 尝试去除 markdown 代码块标记
  58. # 匹配 ```json ... ``` 或 ``` ... ```
  59. code_block_pattern = r'```(?:json)?\s*\n?(.*?)\n?```'
  60. code_block_match = re.search(code_block_pattern, cleaned_response, re.DOTALL)
  61. if code_block_match:
  62. try:
  63. json_content = code_block_match.group(1).strip()
  64. result = json.loads(json_content)
  65. if expected_type is None or isinstance(result, expected_type):
  66. return result
  67. except json.JSONDecodeError:
  68. pass
  69. # 3. 尝试提取第一个完整的 JSON 数组
  70. # 使用括号匹配算法,正确处理嵌套的 [] 和 {}
  71. bracket_count = 0
  72. brace_count = 0
  73. start_idx = -1
  74. in_string = False
  75. escape_next = False
  76. for i, char in enumerate(cleaned_response):
  77. if escape_next:
  78. escape_next = False
  79. continue
  80. if char == '\\':
  81. escape_next = True
  82. continue
  83. if char == '"' and not escape_next:
  84. in_string = not in_string
  85. continue
  86. if in_string:
  87. continue
  88. if char == '[':
  89. if start_idx == -1:
  90. start_idx = i
  91. bracket_count += 1
  92. elif char == ']':
  93. bracket_count -= 1
  94. if bracket_count == 0 and brace_count == 0 and start_idx != -1:
  95. try:
  96. json_content = cleaned_response[start_idx:i+1]
  97. result = json.loads(json_content)
  98. if expected_type is None or isinstance(result, expected_type):
  99. return result
  100. except json.JSONDecodeError:
  101. # 继续尝试下一个匹配
  102. start_idx = -1
  103. bracket_count = 0
  104. brace_count = 0
  105. elif char == '{':
  106. if start_idx != -1:
  107. brace_count += 1
  108. elif char == '}':
  109. if start_idx != -1:
  110. brace_count -= 1
  111. # 4. 尝试使用正则提取 JSON 数组(更宽松的方式)
  112. json_array_pattern = r'\[\s*(?:\{[^}]*\}(?:\s*,\s*\{[^}]*\})*)?\s*\]'
  113. json_match = re.search(json_array_pattern, cleaned_response, re.DOTALL)
  114. if json_match:
  115. try:
  116. result = json.loads(json_match.group())
  117. if expected_type is None or isinstance(result, expected_type):
  118. return result
  119. except json.JSONDecodeError:
  120. pass
  121. # 5. 尝试逐行查找 JSON 数组
  122. lines = cleaned_response.split('\n')
  123. json_lines = []
  124. in_json = False
  125. bracket_count = 0
  126. for line in lines:
  127. stripped_line = line.strip()
  128. if not stripped_line:
  129. continue
  130. # 检查是否包含 JSON 数组的开始
  131. if '[' in stripped_line and not in_json:
  132. in_json = True
  133. json_lines = [stripped_line]
  134. bracket_count = stripped_line.count('[') - stripped_line.count(']')
  135. elif in_json:
  136. json_lines.append(stripped_line)
  137. bracket_count += stripped_line.count('[') - stripped_line.count(']')
  138. if bracket_count == 0:
  139. try:
  140. json_content = '\n'.join(json_lines)
  141. result = json.loads(json_content)
  142. if expected_type is None or isinstance(result, expected_type):
  143. return result
  144. except json.JSONDecodeError:
  145. in_json = False
  146. json_lines = []
  147. bracket_count = 0
  148. # 如果收集到了 JSON 行但还没闭合,尝试解析
  149. if json_lines:
  150. try:
  151. json_content = '\n'.join(json_lines)
  152. result = json.loads(json_content)
  153. if expected_type is None or isinstance(result, expected_type):
  154. return result
  155. except json.JSONDecodeError:
  156. pass
  157. # 6. 最后尝试:查找所有可能的 JSON 对象并组合成数组
  158. try:
  159. # 查找所有 { ... } 模式的对象
  160. json_objects = re.findall(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', cleaned_response, re.DOTALL)
  161. if json_objects:
  162. parsed_objects = []
  163. for obj_str in json_objects:
  164. try:
  165. parsed_obj = json.loads(obj_str)
  166. # 如果是 QA 格式(包含 question 和 answer),则添加
  167. if isinstance(parsed_obj, dict) and 'question' in parsed_obj and 'answer' in parsed_obj:
  168. parsed_objects.append(parsed_obj)
  169. elif expected_type is None:
  170. # 如果没有指定期望类型,也添加
  171. parsed_objects.append(parsed_obj)
  172. except json.JSONDecodeError:
  173. continue
  174. if parsed_objects:
  175. logger.info(f"通过对象提取方式解析到 {len(parsed_objects)} 个对象")
  176. return parsed_objects
  177. except Exception as e:
  178. logger.debug(f"对象提取方式失败: {str(e)}")
  179. # 所有方法都失败
  180. # 记录更详细的错误信息用于调试
  181. error_info = {
  182. "response_length": len(cleaned_response),
  183. "first_100_chars": repr(cleaned_response[:100]),
  184. "last_100_chars": repr(cleaned_response[-100:]) if len(cleaned_response) > 100 else "",
  185. "has_bracket": '[' in cleaned_response,
  186. "has_brace": '{' in cleaned_response,
  187. }
  188. logger.warning(f"无法解析JSON响应: {error_info}")
  189. # 尝试最后一次:如果响应看起来像 JSON 数组,尝试修复常见问题
  190. if cleaned_response.startswith('[') and cleaned_response.endswith(']'):
  191. try:
  192. # 尝试修复常见的 JSON 问题:替换中文引号
  193. fixed_response = cleaned_response.replace('"', '"').replace('"', '"').replace(''', "'").replace(''', "'")
  194. result = json.loads(fixed_response)
  195. if expected_type is None or isinstance(result, expected_type):
  196. return result
  197. except json.JSONDecodeError:
  198. pass
  199. # 根据期望类型返回默认值
  200. if expected_type == list:
  201. return []
  202. elif expected_type == dict:
  203. return {}
  204. return None
  205. def parse_qa_response(response: str) -> List[Dict[str, str]]:
  206. """
  207. 解析 QA 响应文本(便捷函数)
  208. 专门用于解析 QA 问答对响应,返回格式化的 QA 列表。
  209. Args:
  210. response: 模型响应文本,应包含 JSON 格式的 QA 对列表
  211. Returns:
  212. QA 对列表,每个元素包含 "question" 和 "answer" 字段
  213. Examples:
  214. >>> response = '[{"question": "问题1", "answer": "答案1"}]'
  215. >>> qa_pairs = parse_qa_response(response)
  216. >>> # 返回: [{"question": "问题1", "answer": "答案1"}]
  217. """
  218. result = parse_json_response(response, expected_type=list)
  219. if result is None:
  220. return []
  221. # 验证并过滤有效的 QA 对
  222. valid_qa_pairs = []
  223. for item in result:
  224. if isinstance(item, dict) and 'question' in item and 'answer' in item:
  225. valid_qa_pairs.append({
  226. "question": str(item.get("question", "")),
  227. "answer": str(item.get("answer", ""))
  228. })
  229. return valid_qa_pairs