""" 向量数据库工厂类 支持动态切换Elasticsearch和Infinity向量数据库 """ from typing import Any, List, Dict, Optional from conf.config import VectorDBConfig from services.utils.es import ESConnection as ElasticsearchConnection class VectorDBFactory: """ 向量数据库工厂类 根据配置创建不同类型的向量数据库连接 """ @staticmethod def get_vector_db(): """ 获取向量数据库实例 Returns: VectorDBBase: 向量数据库实例 """ vector_db_type = VectorDBConfig.get_vector_db_type().lower() if vector_db_type == "es": return ElasticsearchVectorDB() elif vector_db_type == "infinity": return InfinityVectorDB() else: raise ValueError(f"不支持的向量数据库类型: {vector_db_type}") class VectorDBBase: """ 向量数据库基类 定义了向量数据库应该实现的接口 """ def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool: """创建索引""" raise NotImplementedError() def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool: """插入单个文档""" raise NotImplementedError() def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]: """批量插入文档""" raise NotImplementedError() def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]: """搜索文档""" raise NotImplementedError() def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]: """向量检索""" raise NotImplementedError() def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float], size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]: """混合检索""" raise NotImplementedError() def close(self): """关闭连接""" raise NotImplementedError() class ElasticsearchVectorDB(VectorDBBase): """ Elasticsearch向量数据库实现 """ def __init__(self): """初始化Elasticsearch向量数据库""" self.es_conn = ElasticsearchConnection() def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool: """创建索引""" from services.utils.es.index import IndexManager index_manager = IndexManager(self.es_conn) return index_manager.create_index(index_name, mappings) def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool: """插入单个文档""" from services.utils.es.document import DocumentManager doc_manager = DocumentManager(self.es_conn) return doc_manager.insert(index_name, document, id) def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]: """批量插入文档""" from services.utils.es.document import DocumentManager doc_manager = DocumentManager(self.es_conn) return doc_manager.bulk_insert(index_name, documents) def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]: """搜索文档""" from services.utils.es.search import SearchManager search_manager = SearchManager(self.es_conn) return search_manager.search(index_name, query, size=size) def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]: """向量检索""" from services.utils.es.search import SearchManager search_manager = SearchManager(self.es_conn) return search_manager.knn_search(index_name, vector_field, vector, size, filter) def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float], size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]: """混合检索""" from services.utils.es.search import SearchManager search_manager = SearchManager(self.es_conn) return search_manager.hybrid_search(index_name, text_query, vector_field, vector, size, text_weight=text_weight, vector_weight=vector_weight) def close(self): """关闭连接""" self.es_conn.close() class InfinityVectorDB(VectorDBBase): """ Infinity向量数据库实现 支持infinity向量数据库的具体实现,包含PDF元数据入库 """ def __init__(self): """初始化Infinity向量数据库""" from services.utils.infinity import InfinityVectorDB as _InfinityVectorDB from conf.config import VectorDBConfig # 获取Infinity配置 host = VectorDBConfig.get_infinity_host() port = VectorDBConfig.get_infinity_port() user = VectorDBConfig.get_infinity_user() password = VectorDBConfig.get_infinity_password() # 初始化新的InfinityVectorDB实例 self._infinity_db = _InfinityVectorDB(host=host, port=port, user=user, password=password) def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool: """创建索引""" return self._infinity_db.create_index(index_name, mappings) def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool: """插入单个文档""" return self._infinity_db.insert_document(index_name, document, id) def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]: """批量插入文档""" return self._infinity_db.bulk_insert(index_name, documents) def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]: """搜索文档""" return self._infinity_db.search(index_name, query, size) def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]: """向量检索""" return self._infinity_db.vector_search(index_name, vector_field, vector, size, filter) def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float], size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]: """混合检索""" return self._infinity_db.hybrid_search(index_name, text_query, vector_field, vector, size, text_weight, vector_weight) def close(self): """关闭连接""" self._infinity_db.close()