""" Infinity 向量数据库适配器 封装现有的 InfinityClient,实现 VectorDBClient 接口。 """ from typing import Dict, Any, List, Optional from .base import VectorDBClient from src.utils.infinity import InfinityClient, get_client as get_infinity_client, close_client as close_infinity_client from src.conf.settings import vector_db_settings class InfinityAdapter(VectorDBClient): """ Infinity 向量数据库适配器 将现有的 InfinityClient 封装为统一的 VectorDBClient 接口。 """ def __init__( self, database: Optional[str] = None, host: Optional[str] = None, port: Optional[int] = None, min_connections: int = 5, max_connections: int = 10 ): """ 初始化 Infinity 适配器 Args: database: 数据库名称 host: Infinity 服务地址 port: Infinity 服务端口 min_connections: 最小连接数 max_connections: 最大连接数 """ self._database = database or vector_db_settings.infinity_database self._host = host or vector_db_settings.infinity_host self._port = port or vector_db_settings.infinity_sdk_port self._min_connections = min_connections self._max_connections = max_connections # 获取底层 InfinityClient self._client: InfinityClient = get_infinity_client( host=self._host, port=self._port, database=self._database, min_connections=self._min_connections, max_connections=self._max_connections ) @property def client(self) -> InfinityClient: """获取底层 InfinityClient""" return self._client def search( self, table_name: str, output_fields: List[str], query: Dict[str, Any], database_name: Optional[str] = None ) -> Any: """全文搜索""" return self._client.search(table_name, output_fields, query, database_name) def vector_search( self, table_name: str, output_fields: List[str], query: Dict[str, Any], database_name: Optional[str] = None ) -> Any: """向量搜索""" return self._client.vector_search(table_name, output_fields, query, database_name) def hybrid_search( self, table_name: str, output_fields: List[str], query: Dict[str, Any], database_name: Optional[str] = None ) -> Any: """混合搜索""" return self._client.hybrid_search(table_name, output_fields, query, database_name) def insert( self, table_name: str, documents: List[Dict[str, Any]], database_name: Optional[str] = None ) -> Any: """插入文档""" return self._client.insert(table_name, documents, database_name) def update( self, table_name: str, cond: str, data: Dict[str, Any], database_name: Optional[str] = None ) -> Any: """更新文档""" return self._client.update(table_name, cond, data, database_name) def get_status(self) -> Dict[str, Any]: """获取客户端状态""" return self._client.get_status() def close(self): """关闭客户端连接""" close_infinity_client(self._database) # ========== Infinity 特有方法(扩展) ========== def list_databases(self) -> List[str]: """获取所有数据库""" return self._client.list_databases() def list_tables(self, database_name: Optional[str] = None) -> List[str]: """获取所有表""" return self._client.list_tables(database_name) def create_table( self, table_name: str, columns_definition: List[Dict[str, Any]], database_name: Optional[str] = None ): """创建表""" return self._client.create_table(table_name, columns_definition, database_name=database_name) def drop_table(self, table_name: str, database_name: Optional[str] = None): """删除表""" return self._client.drop_table(table_name, database_name) def create_index( self, table_name: str, index_name: str, index_info: Dict[str, Any], database_name: Optional[str] = None ): """创建索引""" return self._client.create_index(table_name, index_name, index_info, database_name)