base.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. """
  2. Infinity向量数据库连接基础类
  3. """
  4. import http.client
  5. import json
  6. from typing import Dict, Any
  7. from conf.config import VectorDBConfig
  8. class InfinityConnection:
  9. """
  10. Infinity向量数据库连接管理器
  11. 负责管理与Infinity向量数据库的连接
  12. """
  13. def __init__(self, host: str = None, port: int = None, user: str = None, password: str = None):
  14. """
  15. 初始化Infinity连接
  16. Args:
  17. host: Infinity主机地址
  18. port: Infinity端口
  19. user: 用户名
  20. password: 密码
  21. """
  22. # 获取配置
  23. self.host = host or VectorDBConfig.get_infinity_host()
  24. self.port = port or VectorDBConfig.get_infinity_port()
  25. self.user = user or VectorDBConfig.get_infinity_user()
  26. self.password = password or VectorDBConfig.get_infinity_password()
  27. self.base_url = f"http://{self.host}:{self.port}"
  28. # 初始化HTTP客户端
  29. self.http_client = http.client.HTTPConnection(self.host, self.port)
  30. self.headers = {
  31. 'Content-Type': 'application/json',
  32. 'Authorization': f'Basic {self._get_auth_token()}'
  33. }
  34. def _get_auth_token(self) -> str:
  35. """生成Basic Auth令牌"""
  36. import base64
  37. auth_str = f"{self.user}:{self.password}"
  38. return base64.b64encode(auth_str.encode()).decode()
  39. def _make_request(self, method: str, path: str, data: dict = None) -> dict:
  40. """发送HTTP请求"""
  41. try:
  42. body = json.dumps(data) if data else None
  43. self.http_client.request(method, path, body, self.headers)
  44. response = self.http_client.getresponse()
  45. response_data = json.loads(response.read().decode())
  46. return response_data
  47. except Exception as e:
  48. print(f"HTTP请求失败: {e}")
  49. return {"error": str(e)}
  50. def close(self):
  51. """关闭连接"""
  52. try:
  53. self.http_client.close()
  54. except Exception as e:
  55. print(f"关闭Infinity连接失败: {e}")