| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- """
- Infinity 向量数据库适配器
- 封装现有的 InfinityClient,实现 VectorDBClient 接口。
- """
- from typing import Dict, Any, List, Optional
- from .base import VectorDBClient
- from src.utils.infinity import InfinityClient, get_client as get_infinity_client, close_client as close_infinity_client
- from src.conf.settings import vector_db_settings
- class InfinityAdapter(VectorDBClient):
- """
- Infinity 向量数据库适配器
-
- 将现有的 InfinityClient 封装为统一的 VectorDBClient 接口。
- """
-
- def __init__(
- self,
- database: Optional[str] = None,
- host: Optional[str] = None,
- port: Optional[int] = None,
- min_connections: int = 5,
- max_connections: int = 10
- ):
- """
- 初始化 Infinity 适配器
-
- Args:
- database: 数据库名称
- host: Infinity 服务地址
- port: Infinity 服务端口
- min_connections: 最小连接数
- max_connections: 最大连接数
- """
- self._database = database or vector_db_settings.infinity_database
- self._host = host or vector_db_settings.infinity_host
- self._port = port or vector_db_settings.infinity_sdk_port
- self._min_connections = min_connections
- self._max_connections = max_connections
-
- # 获取底层 InfinityClient
- self._client: InfinityClient = get_infinity_client(
- host=self._host,
- port=self._port,
- database=self._database,
- min_connections=self._min_connections,
- max_connections=self._max_connections
- )
-
- @property
- def client(self) -> InfinityClient:
- """获取底层 InfinityClient"""
- return self._client
-
- def search(
- self,
- table_name: str,
- output_fields: List[str],
- query: Dict[str, Any],
- database_name: Optional[str] = None
- ) -> Any:
- """全文搜索"""
- return self._client.search(table_name, output_fields, query, database_name)
-
- def vector_search(
- self,
- table_name: str,
- output_fields: List[str],
- query: Dict[str, Any],
- database_name: Optional[str] = None
- ) -> Any:
- """向量搜索"""
- return self._client.vector_search(table_name, output_fields, query, database_name)
-
- def hybrid_search(
- self,
- table_name: str,
- output_fields: List[str],
- query: Dict[str, Any],
- database_name: Optional[str] = None
- ) -> Any:
- """混合搜索"""
- return self._client.hybrid_search(table_name, output_fields, query, database_name)
-
- def insert(
- self,
- table_name: str,
- documents: List[Dict[str, Any]],
- database_name: Optional[str] = None
- ) -> Any:
- """插入文档"""
- return self._client.insert(table_name, documents, database_name)
-
- def update(
- self,
- table_name: str,
- cond: str,
- data: Dict[str, Any],
- database_name: Optional[str] = None
- ) -> Any:
- """更新文档"""
- return self._client.update(table_name, cond, data, database_name)
-
- def get_status(self) -> Dict[str, Any]:
- """获取客户端状态"""
- return self._client.get_status()
-
- def close(self):
- """关闭客户端连接"""
- close_infinity_client(self._database)
-
- # ========== Infinity 特有方法(扩展) ==========
-
- def list_databases(self) -> List[str]:
- """获取所有数据库"""
- return self._client.list_databases()
-
- def list_tables(self, database_name: Optional[str] = None) -> List[str]:
- """获取所有表"""
- return self._client.list_tables(database_name)
-
- def create_table(
- self,
- table_name: str,
- columns_definition: List[Dict[str, Any]],
- database_name: Optional[str] = None
- ):
- """创建表"""
- return self._client.create_table(table_name, columns_definition, database_name=database_name)
-
- def drop_table(self, table_name: str, database_name: Optional[str] = None):
- """删除表"""
- return self._client.drop_table(table_name, database_name)
-
- def create_index(
- self,
- table_name: str,
- index_name: str,
- index_info: Dict[str, Any],
- database_name: Optional[str] = None
- ):
- """创建索引"""
- return self._client.create_index(table_name, index_name, index_info, database_name)
|