""" JSON 解析工具 提供强大的 JSON 响应解析功能,支持多种格式的 JSON 内容提取。 """ import json import re from typing import Any, Dict, List, Optional from src.common.logging_config import get_logger logger = get_logger(__name__) def parse_json_response(response: str, expected_type: Optional[type] = None) -> Any: """ 解析 JSON 响应文本 支持多种格式的 JSON 内容提取: 1. 直接 JSON 解析 2. raw_decode 解析(跳过前面的非JSON文本) 3. Markdown 代码块包裹的 JSON 4. 括号匹配提取 JSON 数组 5. 正则表达式提取 6. 逐行查找 JSON 7. 提取独立 JSON 对象并组合 Args: response: 模型响应文本 expected_type: 期望的返回类型(list 或 dict),如果指定则验证返回类型 Returns: 解析后的 JSON 对象(通常是列表或字典),解析失败返回空列表或空字典 Examples: >>> # 解析 QA 响应(返回列表) >>> qa_pairs = parse_json_response(response_text) >>> >>> # 解析配置响应(返回字典) >>> config = parse_json_response(response_text, expected_type=dict) """ if not response: return [] if expected_type == list else {} if expected_type == dict else None # 清理响应文本 cleaned_response = response.strip() # 移除 BOM 标记 if cleaned_response.startswith('\ufeff'): cleaned_response = cleaned_response[1:] # 1. 尝试直接解析JSON try: result = json.loads(cleaned_response) if expected_type is None or isinstance(result, expected_type): return result except json.JSONDecodeError as e: logger.debug(f"直接解析失败: {str(e)}") pass # 1.5. 尝试使用 raw_decode 解析(可以跳过前面的非JSON文本) try: decoder = json.JSONDecoder() result, idx = decoder.raw_decode(cleaned_response) if expected_type is None or isinstance(result, expected_type): return result except (json.JSONDecodeError, ValueError) as e: logger.debug(f"raw_decode 解析失败: {str(e)}") pass # 2. 尝试去除 markdown 代码块标记 # 匹配 ```json ... ``` 或 ``` ... ``` code_block_pattern = r'```(?:json)?\s*\n?(.*?)\n?```' code_block_match = re.search(code_block_pattern, cleaned_response, re.DOTALL) if code_block_match: try: json_content = code_block_match.group(1).strip() result = json.loads(json_content) if expected_type is None or isinstance(result, expected_type): return result except json.JSONDecodeError: pass # 3. 尝试提取第一个完整的 JSON 数组 # 使用括号匹配算法,正确处理嵌套的 [] 和 {} bracket_count = 0 brace_count = 0 start_idx = -1 in_string = False escape_next = False for i, char in enumerate(cleaned_response): if escape_next: escape_next = False continue if char == '\\': escape_next = True continue if char == '"' and not escape_next: in_string = not in_string continue if in_string: continue if char == '[': if start_idx == -1: start_idx = i bracket_count += 1 elif char == ']': bracket_count -= 1 if bracket_count == 0 and brace_count == 0 and start_idx != -1: try: json_content = cleaned_response[start_idx:i+1] result = json.loads(json_content) if expected_type is None or isinstance(result, expected_type): return result except json.JSONDecodeError: # 继续尝试下一个匹配 start_idx = -1 bracket_count = 0 brace_count = 0 elif char == '{': if start_idx != -1: brace_count += 1 elif char == '}': if start_idx != -1: brace_count -= 1 # 4. 尝试使用正则提取 JSON 数组(更宽松的方式) json_array_pattern = r'\[\s*(?:\{[^}]*\}(?:\s*,\s*\{[^}]*\})*)?\s*\]' json_match = re.search(json_array_pattern, cleaned_response, re.DOTALL) if json_match: try: result = json.loads(json_match.group()) if expected_type is None or isinstance(result, expected_type): return result except json.JSONDecodeError: pass # 5. 尝试逐行查找 JSON 数组 lines = cleaned_response.split('\n') json_lines = [] in_json = False bracket_count = 0 for line in lines: stripped_line = line.strip() if not stripped_line: continue # 检查是否包含 JSON 数组的开始 if '[' in stripped_line and not in_json: in_json = True json_lines = [stripped_line] bracket_count = stripped_line.count('[') - stripped_line.count(']') elif in_json: json_lines.append(stripped_line) bracket_count += stripped_line.count('[') - stripped_line.count(']') if bracket_count == 0: try: json_content = '\n'.join(json_lines) result = json.loads(json_content) if expected_type is None or isinstance(result, expected_type): return result except json.JSONDecodeError: in_json = False json_lines = [] bracket_count = 0 # 如果收集到了 JSON 行但还没闭合,尝试解析 if json_lines: try: json_content = '\n'.join(json_lines) result = json.loads(json_content) if expected_type is None or isinstance(result, expected_type): return result except json.JSONDecodeError: pass # 6. 最后尝试:查找所有可能的 JSON 对象并组合成数组 try: # 查找所有 { ... } 模式的对象 json_objects = re.findall(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', cleaned_response, re.DOTALL) if json_objects: parsed_objects = [] for obj_str in json_objects: try: parsed_obj = json.loads(obj_str) # 如果是 QA 格式(包含 question 和 answer),则添加 if isinstance(parsed_obj, dict) and 'question' in parsed_obj and 'answer' in parsed_obj: parsed_objects.append(parsed_obj) elif expected_type is None: # 如果没有指定期望类型,也添加 parsed_objects.append(parsed_obj) except json.JSONDecodeError: continue if parsed_objects: logger.info(f"通过对象提取方式解析到 {len(parsed_objects)} 个对象") return parsed_objects except Exception as e: logger.debug(f"对象提取方式失败: {str(e)}") # 所有方法都失败 # 记录更详细的错误信息用于调试 error_info = { "response_length": len(cleaned_response), "first_100_chars": repr(cleaned_response[:100]), "last_100_chars": repr(cleaned_response[-100:]) if len(cleaned_response) > 100 else "", "has_bracket": '[' in cleaned_response, "has_brace": '{' in cleaned_response, } logger.warning(f"无法解析JSON响应: {error_info}") # 尝试最后一次:如果响应看起来像 JSON 数组,尝试修复常见问题 if cleaned_response.startswith('[') and cleaned_response.endswith(']'): try: # 尝试修复常见的 JSON 问题:替换中文引号 fixed_response = cleaned_response.replace('"', '"').replace('"', '"').replace(''', "'").replace(''', "'") result = json.loads(fixed_response) if expected_type is None or isinstance(result, expected_type): return result except json.JSONDecodeError: pass # 根据期望类型返回默认值 if expected_type == list: return [] elif expected_type == dict: return {} return None def parse_qa_response(response: str) -> List[Dict[str, str]]: """ 解析 QA 响应文本(便捷函数) 专门用于解析 QA 问答对响应,返回格式化的 QA 列表。 Args: response: 模型响应文本,应包含 JSON 格式的 QA 对列表 Returns: QA 对列表,每个元素包含 "question" 和 "answer" 字段 Examples: >>> response = '[{"question": "问题1", "answer": "答案1"}]' >>> qa_pairs = parse_qa_response(response) >>> # 返回: [{"question": "问题1", "answer": "答案1"}] """ result = parse_json_response(response, expected_type=list) if result is None: return [] # 验证并过滤有效的 QA 对 valid_qa_pairs = [] for item in result: if isinstance(item, dict) and 'question' in item and 'answer' in item: valid_qa_pairs.append({ "question": str(item.get("question", "")), "answer": str(item.get("answer", "")) }) return valid_qa_pairs