import requests import logging import os import json from typing import Dict, Any, Optional from urllib3.util.retry import Retry from requests.adapters import HTTPAdapter # 配置日志 logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class HTTPClient: """HTTP请求工具类,用于发送各种HTTP请求""" def __init__(self, base_url: str, api_key: str = None, auth_type: str = 'bearer'): """ 初始化HTTP客户端 Args: base_url: API基础URL api_key: API密钥 auth_type: 认证类型,支持'bearer'和'basic' """ self.base_url = base_url.rstrip('/') self.api_key = api_key self.session = requests.Session() # 设置请求超时(秒) self.timeout = 30 # 配置重试机制 retry_strategy = Retry( total=3, # 最大重试次数 backoff_factor=1, # 重试间隔:1秒、2秒、4秒 status_forcelist=[502, 503, 504], # 重试的状态码(移除500,避免过多无效重试) allowed_methods=["GET", "POST", "PUT", "DELETE"] # 允许重试的方法 ) # 创建HTTP适配器并设置重试策略 adapter = HTTPAdapter(max_retries=retry_strategy) # 将适配器应用到所有请求 self.session.mount("http://", adapter) self.session.mount("https://", adapter) # 设置默认请求头 if self.api_key: if auth_type == 'bearer': self.session.headers.update({ 'Authorization': f'Bearer {self.api_key}' }) elif auth_type == 'basic': # 处理Basic Auth,格式为"username:password" self.session.headers.update({ 'Authorization': f'Basic {self.api_key}' }) self.session.headers.update({ 'Content-Type': 'application/json' }) def post(self, endpoint: str, data: Optional[Dict] = None, json_data: Optional[Dict] = None, files: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]: """ 发送POST请求 Args: endpoint: API端点路径(以/开头) data: 表单数据 json_data: JSON数据 files: 文件数据 headers: 自定义请求头 Returns: Dict: 响应JSON数据 Raises: requests.exceptions.RequestException: 请求失败时抛出 """ url = f"{self.base_url}{endpoint}" # 记录请求日志 request_info = { "method": "POST", "url": url, "data": data, "json": json_data, "headers": headers, "files": files is not None # 不记录文件内容,只记录是否有文件 } # 将请求报文写入D:\project\work\ragflow_plugs\book\output\temp下的request.txt文件 with open(r"D:\project\work\ragflow_plugs\book\output\temp\request.txt", "w", encoding="utf-8") as f: f.write(str(request_info)) logger.info(f"Sending request: {request_info}") try: # 当上传文件时,不使用默认的Content-Type: application/json头 # 让requests库自动生成正确的multipart/form-data头 if files: # 创建一个临时会话,不包含默认的Content-Type头 temp_session = requests.Session() # 复制认证头 if self.api_key: auth_header = self.session.headers.get('Authorization') if auth_header: temp_session.headers.update({'Authorization': auth_header}) # 使用临时会话发送请求 response = temp_session.post( url=url, data=data, json=json_data, files=files, headers=headers, timeout=self.timeout # 添加超时参数 ) else: # 正常请求,使用默认会话 response = self.session.post( url=url, data=data, json=json_data, headers=headers, timeout=self.timeout # 添加超时参数 ) # 记录响应日志 response_info = { "status_code": response.status_code, "url": url, "headers": dict(response.headers), "content_length": len(response.content) } logger.info(f"Received response: {response_info}") response.raise_for_status() # 抛出HTTP错误 return response.json() except Exception as e: # 记录错误日志 logger.error(f"Request failed: {str(e)}") raise def get(self, endpoint: str, params: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]: """ 发送GET请求 Args: endpoint: API端点路径(以/开头) params: 查询参数 headers: 自定义请求头 Returns: Dict: 响应JSON数据 Raises: requests.exceptions.RequestException: 请求失败时抛出 """ url = f"{self.base_url}{endpoint}" # 记录请求日志 request_info = { "method": "GET", "url": url, "params": params, "headers": headers } logger.info(f"Sending request: {request_info}") try: response = self.session.get( url=url, params=params, headers=headers, timeout=self.timeout # 添加超时参数 ) # 记录响应日志 response_info = { "status_code": response.status_code, "url": url, "headers": dict(response.headers), "content_length": len(response.content) } logger.info(f"Received response: {response_info}") response.raise_for_status() # 抛出HTTP错误 return response.json() except Exception as e: # 记录错误日志 logger.error(f"Request failed: {str(e)}") raise def get_json(self, endpoint: str, json_data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]: """ 发送带有JSON数据的GET请求 Args: endpoint: API端点路径(以/开头) json_data: JSON数据 headers: 自定义请求头 Returns: Dict: 响应JSON数据 Raises: requests.exceptions.RequestException: 请求失败时抛出 """ url = f"{self.base_url}{endpoint}" # 记录请求日志 request_info = { "method": "GET", "url": url, "json": json_data, "headers": headers } logger.info(f"Sending request: {request_info}") try: response = self.session.get( url=url, json=json_data, headers=headers, timeout=self.timeout # 添加超时参数 ) # 记录响应日志 response_info = { "status_code": response.status_code, "url": url, "headers": dict(response.headers), "content_length": len(response.content) } logger.info(f"Received response: {response_info}") response.raise_for_status() # 抛出HTTP错误 # 将response.content转换为JSON return json.loads(response.content.decode('utf-8')) except Exception as e: # 记录错误日志 logger.error(f"Request failed: {str(e)}") raise def put(self, endpoint: str, data: Optional[Dict] = None, json_data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]: """ 发送PUT请求 Args: endpoint: API端点路径(以/开头) data: 表单数据 json_data: JSON数据 headers: 自定义请求头 Returns: Dict: 响应JSON数据 Raises: requests.exceptions.RequestException: 请求失败时抛出 """ url = f"{self.base_url}{endpoint}" # 记录请求日志 request_info = { "method": "PUT", "url": url, "data": data, "json": json_data, "headers": headers } logger.info(f"Sending request: {request_info}") try: response = self.session.put( url=url, data=data, json=json_data, headers=headers, timeout=self.timeout # 添加超时参数 ) # 记录响应日志 response_info = { "status_code": response.status_code, "url": url, "headers": dict(response.headers), "content_length": len(response.content) } logger.info(f"Received response: {response_info}") response.raise_for_status() # 抛出HTTP错误 return response.json() except Exception as e: # 记录错误日志 logger.error(f"Request failed: {str(e)}") raise def delete(self, endpoint: str, data: Optional[Dict] = None, json_data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]: """ 发送DELETE请求 Args: endpoint: API端点路径(以/开头) data: 表单数据 json_data: JSON数据 headers: 自定义请求头 Returns: Dict: 响应JSON数据 Raises: requests.exceptions.RequestException: 请求失败时抛出 """ url = f"{self.base_url}{endpoint}" # 记录请求日志 request_info = { "method": "DELETE", "url": url, "data": data, "json": json_data, "headers": headers } logger.info(f"Sending request: {request_info}") try: response = self.session.delete( url=url, data=data, json=json_data, headers=headers, timeout=self.timeout # 添加超时参数 ) # 记录响应日志 response_info = { "status_code": response.status_code, "url": url, "headers": dict(response.headers), "content_length": len(response.content) } logger.info(f"Received response: {response_info}") response.raise_for_status() # 抛出HTTP错误 return response.json() except Exception as e: # 记录错误日志 logger.error(f"Request failed: {str(e)}") raise def upload_file(self, endpoint: str, file_path: str, file_field_name: str = 'file', data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]: """ 上传文件 Args: endpoint: API端点路径(以/开头) file_path: 本地文件路径 file_field_name: 表单字段名称 data: 额外的表单数据 headers: 自定义请求头 Returns: Dict: 响应JSON数据 Raises: requests.exceptions.RequestException: 请求失败时抛出 """ # 打开文件并构建files字典 with open(file_path, 'rb') as f: files = { file_field_name: (os.path.basename(file_path), f) } # 发送POST请求 return self.post(endpoint, data=data, files=files, headers=headers)