| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383 |
- import requests
- import logging
- import os
- import json
- from typing import Dict, Any, Optional
- from urllib3.util.retry import Retry
- from requests.adapters import HTTPAdapter
- # 配置日志
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
- class HTTPClient:
- """HTTP请求工具类,用于发送各种HTTP请求"""
-
- def __init__(self, base_url: str, api_key: str = None, auth_type: str = 'bearer'):
- """
- 初始化HTTP客户端
-
- Args:
- base_url: API基础URL
- api_key: API密钥
- auth_type: 认证类型,支持'bearer'和'basic'
- """
- self.base_url = base_url.rstrip('/')
- self.api_key = api_key
- self.session = requests.Session()
-
- # 设置请求超时(秒)
- self.timeout = 30
-
- # 配置重试机制
- retry_strategy = Retry(
- total=3, # 最大重试次数
- backoff_factor=1, # 重试间隔:1秒、2秒、4秒
- status_forcelist=[502, 503, 504], # 重试的状态码(移除500,避免过多无效重试)
- allowed_methods=["GET", "POST", "PUT", "DELETE"] # 允许重试的方法
- )
-
- # 创建HTTP适配器并设置重试策略
- adapter = HTTPAdapter(max_retries=retry_strategy)
-
- # 将适配器应用到所有请求
- self.session.mount("http://", adapter)
- self.session.mount("https://", adapter)
-
- # 设置默认请求头
- if self.api_key:
- if auth_type == 'bearer':
- self.session.headers.update({
- 'Authorization': f'Bearer {self.api_key}'
- })
- elif auth_type == 'basic':
- # 处理Basic Auth,格式为"username:password"
- self.session.headers.update({
- 'Authorization': f'Basic {self.api_key}'
- })
-
- self.session.headers.update({
- 'Content-Type': 'application/json'
- })
-
- def post(self, endpoint: str, data: Optional[Dict] = None,
- json_data: Optional[Dict] = None, files: Optional[Dict] = None,
- headers: Optional[Dict] = None) -> Dict[str, Any]:
- """
- 发送POST请求
-
- Args:
- endpoint: API端点路径(以/开头)
- data: 表单数据
- json_data: JSON数据
- files: 文件数据
- headers: 自定义请求头
-
- Returns:
- Dict: 响应JSON数据
-
- Raises:
- requests.exceptions.RequestException: 请求失败时抛出
- """
- url = f"{self.base_url}{endpoint}"
-
- # 记录请求日志
- request_info = {
- "method": "POST",
- "url": url,
- "data": data,
- "json": json_data,
- "headers": headers,
- "files": files is not None # 不记录文件内容,只记录是否有文件
- }
- # 将请求报文写入D:\project\work\ragflow_plugs\book\output\temp下的request.txt文件
- with open(r"D:\project\work\ragflow_plugs\book\output\temp\request.txt", "w", encoding="utf-8") as f:
- f.write(str(request_info))
-
- logger.info(f"Sending request: {request_info}")
-
- try:
- # 当上传文件时,不使用默认的Content-Type: application/json头
- # 让requests库自动生成正确的multipart/form-data头
- if files:
- # 创建一个临时会话,不包含默认的Content-Type头
- temp_session = requests.Session()
-
- # 复制认证头
- if self.api_key:
- auth_header = self.session.headers.get('Authorization')
- if auth_header:
- temp_session.headers.update({'Authorization': auth_header})
-
- # 使用临时会话发送请求
- response = temp_session.post(
- url=url,
- data=data,
- json=json_data,
- files=files,
- headers=headers,
- timeout=self.timeout # 添加超时参数
- )
- else:
- # 正常请求,使用默认会话
- response = self.session.post(
- url=url,
- data=data,
- json=json_data,
- headers=headers,
- timeout=self.timeout # 添加超时参数
- )
-
- # 记录响应日志
- response_info = {
- "status_code": response.status_code,
- "url": url,
- "headers": dict(response.headers),
- "content_length": len(response.content)
- }
- logger.info(f"Received response: {response_info}")
-
- response.raise_for_status() # 抛出HTTP错误
- return response.json()
- except Exception as e:
- # 记录错误日志
- logger.error(f"Request failed: {str(e)}")
- raise
-
- def get(self, endpoint: str, params: Optional[Dict] = None,
- headers: Optional[Dict] = None) -> Dict[str, Any]:
- """
- 发送GET请求
-
- Args:
- endpoint: API端点路径(以/开头)
- params: 查询参数
- headers: 自定义请求头
-
- Returns:
- Dict: 响应JSON数据
-
- Raises:
- requests.exceptions.RequestException: 请求失败时抛出
- """
- url = f"{self.base_url}{endpoint}"
-
- # 记录请求日志
- request_info = {
- "method": "GET",
- "url": url,
- "params": params,
- "headers": headers
- }
- logger.info(f"Sending request: {request_info}")
-
- try:
- response = self.session.get(
- url=url,
- params=params,
- headers=headers,
- timeout=self.timeout # 添加超时参数
- )
-
- # 记录响应日志
- response_info = {
- "status_code": response.status_code,
- "url": url,
- "headers": dict(response.headers),
- "content_length": len(response.content)
- }
- logger.info(f"Received response: {response_info}")
-
- response.raise_for_status() # 抛出HTTP错误
- return response.json()
- except Exception as e:
- # 记录错误日志
- logger.error(f"Request failed: {str(e)}")
- raise
-
- def get_json(self, endpoint: str, json_data: Optional[Dict] = None,
- headers: Optional[Dict] = None) -> Dict[str, Any]:
- """
- 发送带有JSON数据的GET请求
-
- Args:
- endpoint: API端点路径(以/开头)
- json_data: JSON数据
- headers: 自定义请求头
-
- Returns:
- Dict: 响应JSON数据
-
- Raises:
- requests.exceptions.RequestException: 请求失败时抛出
- """
- url = f"{self.base_url}{endpoint}"
-
- # 记录请求日志
- request_info = {
- "method": "GET",
- "url": url,
- "json": json_data,
- "headers": headers
- }
- logger.info(f"Sending request: {request_info}")
-
- try:
- response = self.session.get(
- url=url,
- json=json_data,
- headers=headers,
- timeout=self.timeout # 添加超时参数
- )
-
- # 记录响应日志
- response_info = {
- "status_code": response.status_code,
- "url": url,
- "headers": dict(response.headers),
- "content_length": len(response.content)
- }
- logger.info(f"Received response: {response_info}")
-
- response.raise_for_status() # 抛出HTTP错误
- # 将response.content转换为JSON
- return json.loads(response.content.decode('utf-8'))
- except Exception as e:
- # 记录错误日志
- logger.error(f"Request failed: {str(e)}")
- raise
-
- def put(self, endpoint: str, data: Optional[Dict] = None,
- json_data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]:
- """
- 发送PUT请求
-
- Args:
- endpoint: API端点路径(以/开头)
- data: 表单数据
- json_data: JSON数据
- headers: 自定义请求头
-
- Returns:
- Dict: 响应JSON数据
-
- Raises:
- requests.exceptions.RequestException: 请求失败时抛出
- """
- url = f"{self.base_url}{endpoint}"
-
- # 记录请求日志
- request_info = {
- "method": "PUT",
- "url": url,
- "data": data,
- "json": json_data,
- "headers": headers
- }
- logger.info(f"Sending request: {request_info}")
-
- try:
- response = self.session.put(
- url=url,
- data=data,
- json=json_data,
- headers=headers,
- timeout=self.timeout # 添加超时参数
- )
-
- # 记录响应日志
- response_info = {
- "status_code": response.status_code,
- "url": url,
- "headers": dict(response.headers),
- "content_length": len(response.content)
- }
- logger.info(f"Received response: {response_info}")
-
- response.raise_for_status() # 抛出HTTP错误
- return response.json()
- except Exception as e:
- # 记录错误日志
- logger.error(f"Request failed: {str(e)}")
- raise
-
- def delete(self, endpoint: str, data: Optional[Dict] = None,
- json_data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]:
- """
- 发送DELETE请求
-
- Args:
- endpoint: API端点路径(以/开头)
- data: 表单数据
- json_data: JSON数据
- headers: 自定义请求头
-
- Returns:
- Dict: 响应JSON数据
-
- Raises:
- requests.exceptions.RequestException: 请求失败时抛出
- """
- url = f"{self.base_url}{endpoint}"
-
- # 记录请求日志
- request_info = {
- "method": "DELETE",
- "url": url,
- "data": data,
- "json": json_data,
- "headers": headers
- }
- logger.info(f"Sending request: {request_info}")
-
- try:
- response = self.session.delete(
- url=url,
- data=data,
- json=json_data,
- headers=headers,
- timeout=self.timeout # 添加超时参数
- )
-
- # 记录响应日志
- response_info = {
- "status_code": response.status_code,
- "url": url,
- "headers": dict(response.headers),
- "content_length": len(response.content)
- }
- logger.info(f"Received response: {response_info}")
-
- response.raise_for_status() # 抛出HTTP错误
- return response.json()
- except Exception as e:
- # 记录错误日志
- logger.error(f"Request failed: {str(e)}")
- raise
-
- def upload_file(self, endpoint: str, file_path: str, file_field_name: str = 'file',
- data: Optional[Dict] = None, headers: Optional[Dict] = None) -> Dict[str, Any]:
- """
- 上传文件
-
- Args:
- endpoint: API端点路径(以/开头)
- file_path: 本地文件路径
- file_field_name: 表单字段名称
- data: 额外的表单数据
- headers: 自定义请求头
-
- Returns:
- Dict: 响应JSON数据
-
- Raises:
- requests.exceptions.RequestException: 请求失败时抛出
- """
- # 打开文件并构建files字典
- with open(file_path, 'rb') as f:
- files = {
- file_field_name: (os.path.basename(file_path), f)
- }
-
- # 发送POST请求
- return self.post(endpoint, data=data, files=files, headers=headers)
|