""" Infinity向量数据库连接基础类 """ import http.client import json from typing import Dict, Any from conf.config import VectorDBConfig class InfinityConnection: """ Infinity向量数据库连接管理器 负责管理与Infinity向量数据库的连接 """ def __init__(self, host: str = None, port: int = None, user: str = None, password: str = None): """ 初始化Infinity连接 Args: host: Infinity主机地址 port: Infinity端口 user: 用户名 password: 密码 """ # 获取配置 self.host = host or VectorDBConfig.get_infinity_host() self.port = port or VectorDBConfig.get_infinity_port() self.user = user or VectorDBConfig.get_infinity_user() self.password = password or VectorDBConfig.get_infinity_password() self.base_url = f"http://{self.host}:{self.port}" # 初始化HTTP客户端 self.http_client = http.client.HTTPConnection(self.host, self.port) self.headers = { 'Content-Type': 'application/json', 'Authorization': f'Basic {self._get_auth_token()}' } def _get_auth_token(self) -> str: """生成Basic Auth令牌""" import base64 auth_str = f"{self.user}:{self.password}" return base64.b64encode(auth_str.encode()).decode() def _make_request(self, method: str, path: str, data: dict = None) -> dict: """发送HTTP请求""" try: body = json.dumps(data) if data else None self.http_client.request(method, path, body, self.headers) response = self.http_client.getresponse() response_data = json.loads(response.read().decode()) return response_data except Exception as e: print(f"HTTP请求失败: {e}") return {"error": str(e)} def close(self): """关闭连接""" try: self.http_client.close() except Exception as e: print(f"关闭Infinity连接失败: {e}")