| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- """
- Infinity向量数据库连接基础类
- """
- import http.client
- import json
- from typing import Dict, Any
- from conf.config import VectorDBConfig
class InfinityConnection:
    """Connection manager for the Infinity vector database.

    Owns a single ``http.client.HTTPConnection`` to the server together with
    the JSON / Basic-Auth headers that are sent with every request.
    """

    def __init__(self, host: str = None, port: int = None, user: str = None, password: str = None):
        """Store the connection settings and build the HTTP client.

        Any argument that is falsy (``None``, ``""``, ``0``) is replaced by
        the corresponding value obtained from ``VectorDBConfig``.

        Args:
            host: Infinity host address.
            port: Infinity port.
            user: User name used for Basic authentication.
            password: Password used for Basic authentication.
        """
        # Explicit arguments win; otherwise fall back to the configuration.
        self.host = host or VectorDBConfig.get_infinity_host()
        self.port = port or VectorDBConfig.get_infinity_port()
        self.user = user or VectorDBConfig.get_infinity_user()
        self.password = password or VectorDBConfig.get_infinity_password()
        self.base_url = f"http://{self.host}:{self.port}"

        # Creating the HTTPConnection does not open a socket yet; http.client
        # connects lazily when the first request is sent.
        self.http_client = http.client.HTTPConnection(self.host, self.port)
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Basic {self._get_auth_token()}'
        }

    def _get_auth_token(self) -> str:
        """Return the Basic-Auth token: base64 of ``"<user>:<password>"``."""
        import base64
        auth_str = f"{self.user}:{self.password}"
        return base64.b64encode(auth_str.encode()).decode()

    def _make_request(self, method: str, path: str, data: dict = None) -> dict:
        """Send one HTTP request and return the decoded JSON response.

        Args:
            method: HTTP verb, e.g. ``"GET"`` or ``"POST"``.
            path: Request path on the server.
            data: Optional JSON-serialisable payload. ``None`` sends no body;
                any other value, including an empty dict, is serialised and
                sent.

        Returns:
            The parsed JSON response body. If anything fails (connection,
            send, read or JSON decoding) the error is printed and
            ``{"error": "<message>"}`` is returned instead of raising.
        """
        try:
            # Compare against None so that an explicit empty payload ({}) is
            # still transmitted rather than silently dropped.
            body = json.dumps(data) if data is not None else None
            self.http_client.request(method, path, body, self.headers)
            response = self.http_client.getresponse()
            return json.loads(response.read().decode())
        except Exception as e:
            print(f"HTTP请求失败: {e}")
            # A failed exchange can leave the HTTPConnection stuck in a
            # non-idle state, making every later request fail with
            # CannotSendRequest. Closing resets it; http.client reconnects
            # automatically on the next request.
            self.close()
            return {"error": str(e)}

    def close(self):
        """Close the underlying HTTP connection.

        Best effort: a failure while closing is printed, not raised.
        """
        try:
            self.http_client.close()
        except Exception as e:
            print(f"关闭Infinity连接失败: {e}")
|