| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267 |
- """
- JSON 解析工具
- 提供强大的 JSON 响应解析功能,支持多种格式的 JSON 内容提取。
- """
- import json
- import re
- from typing import Any, Dict, List, Optional
- from src.common.logging_config import get_logger
- logger = get_logger(__name__)
# Fenced Markdown code block, optionally tagged ``json``; group 1 is its content.
_CODE_FENCE_RE = re.compile(r'```(?:json)?\s*\n?(.*?)\n?```', re.DOTALL)

# Characters at which a JSON array or object can begin.
_JSON_OPENER_RE = re.compile(r'[\[{]')

# Typographic quotes mapped to their ASCII counterparts.  The keys are written
# as escapes on purpose: literal curly quotes are easily normalised to ASCII by
# editors and tooling, which silently turns the repair into a no-op.
_CURLY_QUOTE_REPAIRS = str.maketrans({
    '\u201c': '"',
    '\u201d': '"',
    '\u2018': "'",
    '\u2019': "'",
})

_JSON_DECODER = json.JSONDecoder()


def _matches_expected(value: Any, expected_type: Optional[type]) -> bool:
    """Return True when *value* satisfies the caller's type expectation."""
    return expected_type is None or isinstance(value, expected_type)


def _find_embedded_json(text: str, expected_type: Optional[type]) -> Any:
    """Return the first JSON array/object embedded in *text* that matches.

    Decoding is attempted at every ``[`` and ``{`` from left to right, so
    leading prose, stray brackets and quotes in the surrounding text cannot
    derail the search.  Values of the wrong type are not skipped over: their
    interior is searched as well (e.g. a list nested inside an object).

    Returns None when nothing matches; a decoded array or object is never None.
    """
    for opener in _JSON_OPENER_RE.finditer(text):
        try:
            value, _ = _JSON_DECODER.raw_decode(text, opener.start())
        except ValueError:  # json.JSONDecodeError is a ValueError subclass
            continue
        if _matches_expected(value, expected_type):
            return value
    return None


def _collect_qa_objects(text: str) -> List[Dict[str, Any]]:
    """Collect every decodable object in *text* that has question/answer keys.

    The real JSON decoder is used rather than a brace-matching regex, so
    braces inside string values and arbitrarily deep nesting are handled.
    The interior of a successfully decoded object is not searched again.
    """
    found = []
    pos = text.find('{')
    while pos != -1:
        try:
            obj, end = _JSON_DECODER.raw_decode(text, pos)
        except ValueError:
            # Not an object start (or truncated); look at the next brace.
            end = pos + 1
        else:
            if 'question' in obj and 'answer' in obj:
                found.append(obj)
        pos = text.find('{', end)
    return found


def _extract_embedded(text: str, expected_type: Optional[type]) -> Any:
    """Run the lenient extraction strategies on *text*; None if all fail."""
    value = _find_embedded_json(text, expected_type)
    if value is not None:
        return value

    # Combining stand-alone objects yields a list, so it is only a valid
    # answer when the caller explicitly accepts a list.  (With no expectation
    # the scan above has already returned the first decodable object.)
    if expected_type is not None and isinstance([], expected_type):
        qa_objects = _collect_qa_objects(text)
        if qa_objects:
            logger.info(f"通过对象提取方式解析到 {len(qa_objects)} 个对象")
            return qa_objects
    return None


def parse_json_response(response: str, expected_type: Optional[type] = None) -> Any:
    """
    Parse JSON out of a model response, tolerating surrounding noise.

    Strategies are tried in order and the first result that satisfies
    ``expected_type`` is returned:

    1. Parse the whole (stripped, BOM-free) text as JSON.
    2. Decode a JSON value at the start of the text, ignoring trailing text.
    3. Parse the content of each Markdown fenced code block.
    4. Decode the first JSON array/object found anywhere in the text
       (skips leading prose; also looks inside values of the wrong type).
    5. If a list is expected: collect all stand-alone objects that contain
       both ``question`` and ``answer`` keys into a list.
    6. Replace typographic (curly) quotes with ASCII quotes and retry
       strategies 4 and 5.

    Args:
        response: Raw model response text.
        expected_type: Required type of the result (typically ``list`` or
            ``dict``).  ``None`` accepts any JSON value; in that case the
            first embedded array or object wins.

    Returns:
        The parsed JSON value.  On failure: ``[]`` if ``expected_type`` is
        ``list``, ``{}`` if it is ``dict``, otherwise ``None``.  A value of
        the wrong type is never returned when ``expected_type`` is given.

    Examples:
        >>> # QA response (a list)
        >>> qa_pairs = parse_json_response(response_text)
        >>>
        >>> # Configuration response (a dict)
        >>> config = parse_json_response(response_text, expected_type=dict)
    """
    if expected_type is list:
        fallback: Any = []
    elif expected_type is dict:
        fallback = {}
    else:
        fallback = None

    if not response:
        return fallback

    # A BOM is not whitespace, so strip on both sides of its removal.
    text = response.strip().lstrip('\ufeff').strip()
    if not text:
        return fallback

    # 1. The whole text is JSON.
    try:
        value = json.loads(text)
    except json.JSONDecodeError as exc:
        logger.debug(f"直接解析失败: {str(exc)}")
    else:
        if _matches_expected(value, expected_type):
            return value

    # 2. JSON value at the very start, followed by trailing text.
    try:
        value, _ = _JSON_DECODER.raw_decode(text)
    except ValueError as exc:
        logger.debug(f"raw_decode 解析失败: {str(exc)}")
    else:
        if _matches_expected(value, expected_type):
            return value

    # 3. Markdown fenced code blocks; try every block, not just the first,
    #    because an earlier block may hold non-JSON content.
    for fence in _CODE_FENCE_RE.finditer(text):
        try:
            value = json.loads(fence.group(1).strip())
        except json.JSONDecodeError:
            continue
        if _matches_expected(value, expected_type):
            return value

    # 4-6. Lenient extraction, first on the text as-is and then, if the text
    #      contains typographic quotes, on a copy with those quotes repaired.
    candidates = [text]
    repaired = text.translate(_CURLY_QUOTE_REPAIRS)
    if repaired != text:
        candidates.append(repaired)
    for candidate in candidates:
        value = _extract_embedded(candidate, expected_type)
        if value is not None:
            return value

    # Every strategy failed: log enough context to debug the response.
    error_info = {
        "response_length": len(text),
        "first_100_chars": repr(text[:100]),
        "last_100_chars": repr(text[-100:]) if len(text) > 100 else "",
        "has_bracket": '[' in text,
        "has_brace": '{' in text,
    }
    logger.warning(f"无法解析JSON响应: {error_info}")
    return fallback
def parse_qa_response(response: str) -> List[Dict[str, str]]:
    """
    Parse a model response into a list of question/answer pairs.

    Convenience wrapper around ``parse_json_response`` that requests a list
    and keeps only the entries that are dicts carrying both a ``question``
    and an ``answer`` key.  Both values are converted with ``str``.

    Args:
        response: Model response text, expected to contain a JSON list of
            QA objects.

    Returns:
        A list of dicts, each with exactly the keys "question" and "answer".
        Entries that are not dicts or lack either key are dropped.

    Examples:
        >>> response = '[{"question": "Q1", "answer": "A1"}]'
        >>> qa_pairs = parse_qa_response(response)
        >>> # -> [{"question": "Q1", "answer": "A1"}]
    """
    parsed = parse_json_response(response, expected_type=list)
    if parsed is None:
        return []

    # Normalise every well-formed entry; silently skip everything else.
    return [
        {
            "question": str(entry.get("question", "")),
            "answer": str(entry.get("answer", "")),
        }
        for entry in parsed
        if isinstance(entry, dict) and 'question' in entry and 'answer' in entry
    ]
|