| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168 |
- """
- 向量数据库工厂类
- 支持动态切换Elasticsearch和Infinity向量数据库
- """
- from typing import Any, List, Dict, Optional
- from conf.config import VectorDBConfig
- from services.utils.es import ESConnection as ElasticsearchConnection
class VectorDBFactory:
    """Factory that builds a vector-database client from configuration.

    The backend is selected by ``VectorDBConfig.get_vector_db_type()``; see
    :meth:`get_vector_db` for the accepted values.
    """

    @staticmethod
    def get_vector_db():
        """Create a client for the configured vector-database backend.

        The configured type is matched case-insensitively, ignoring
        surrounding whitespace. Accepted values:

        * ``"es"`` or ``"elasticsearch"`` -> :class:`ElasticsearchVectorDB`
        * ``"infinity"`` -> :class:`InfinityVectorDB`

        Each call constructs a new client object; release it with ``close()``
        when done.

        Returns:
            VectorDBBase: a freshly constructed backend client.

        Raises:
            ValueError: if the configured type is missing or not supported.
        """
        raw_type = VectorDBConfig.get_vector_db_type()
        # Tolerate a missing (None) setting and stray whitespace so that an
        # unset/misformatted config surfaces as the documented ValueError
        # instead of an AttributeError from None.lower().
        vector_db_type = (raw_type or "").strip().lower()

        if vector_db_type in ("es", "elasticsearch"):
            return ElasticsearchVectorDB()
        if vector_db_type == "infinity":
            return InfinityVectorDB()
        raise ValueError(f"不支持的向量数据库类型: {vector_db_type}")
class VectorDBBase:
    """Common interface for vector-database backends.

    Concrete backends override every operation below; the base versions raise
    ``NotImplementedError`` naming the class and method that is missing.

    Instances are also context managers: leaving a ``with`` block calls
    ``close()``, whether the block finished normally or raised.
    """

    def __enter__(self) -> "VectorDBBase":
        """Return ``self`` so the client can be bound with ``with ... as db``."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client on exit; any in-flight exception still propagates."""
        # Returning None (falsy) means exceptions raised inside the block are
        # not suppressed.
        self.close()

    def _not_implemented(self, method_name: str) -> NotImplementedError:
        """Build the error raised by base-class operations a backend lacks."""
        return NotImplementedError(
            f"{type(self).__name__}.{method_name} is not implemented"
        )

    def create_index(self, index_name: str, mappings: Optional[Dict[str, Any]] = None) -> bool:
        """Create the index ``index_name``, optionally using ``mappings``."""
        raise self._not_implemented("create_index")

    def insert_document(self, index_name: str, document: Dict[str, Any], id: Optional[str] = None) -> bool:
        """Insert one ``document`` into ``index_name``, optionally under ``id``."""
        raise self._not_implemented("insert_document")

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Insert several ``documents`` into ``index_name`` in one call."""
        raise self._not_implemented("bulk_insert")

    def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
        """Run ``query`` against ``index_name``, returning at most ``size`` hits."""
        raise self._not_implemented("search")

    def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10,
                      filter: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Vector similarity search on ``vector_field``, optionally narrowed by ``filter``."""
        raise self._not_implemented("vector_search")

    def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
                      size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
        """Combined text + vector search; how the weights are applied is backend-specific."""
        raise self._not_implemented("hybrid_search")

    def close(self) -> None:
        """Release the backend connection."""
        raise self._not_implemented("close")
class ElasticsearchVectorDB(VectorDBBase):
    """Elasticsearch-backed implementation of the vector-database interface.

    One ``ElasticsearchConnection`` is opened per instance. Every operation
    builds a new manager from ``services.utils.es`` on top of that connection
    and forwards the call to it. The manager modules are imported inside the
    helper methods rather than at module load.
    """

    def __init__(self):
        """Open the Elasticsearch connection shared by all operations."""
        self.es_conn = ElasticsearchConnection()

    # ---- per-call manager factories ---------------------------------------

    def _index_manager(self):
        """Return a new IndexManager bound to this instance's connection."""
        from services.utils.es.index import IndexManager
        return IndexManager(self.es_conn)

    def _document_manager(self):
        """Return a new DocumentManager bound to this instance's connection."""
        from services.utils.es.document import DocumentManager
        return DocumentManager(self.es_conn)

    def _search_manager(self):
        """Return a new SearchManager bound to this instance's connection."""
        from services.utils.es.search import SearchManager
        return SearchManager(self.es_conn)

    # ---- interface operations ---------------------------------------------

    def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool:
        """Create ``index_name`` via IndexManager, passing ``mappings`` through."""
        manager = self._index_manager()
        return manager.create_index(index_name, mappings)

    def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool:
        """Insert a single document via DocumentManager.insert."""
        manager = self._document_manager()
        return manager.insert(index_name, document, id)

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Insert many documents via DocumentManager.bulk_insert."""
        manager = self._document_manager()
        return manager.bulk_insert(index_name, documents)

    def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
        """Run ``query`` via SearchManager.search (``size`` passed by keyword)."""
        manager = self._search_manager()
        return manager.search(index_name, query, size=size)

    def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]:
        """Vector search delegated to SearchManager.knn_search."""
        manager = self._search_manager()
        return manager.knn_search(index_name, vector_field, vector, size, filter)

    def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
                      size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
        """Text + vector search delegated to SearchManager.hybrid_search."""
        manager = self._search_manager()
        return manager.hybrid_search(
            index_name,
            text_query,
            vector_field,
            vector,
            size,
            text_weight=text_weight,
            vector_weight=vector_weight,
        )

    def close(self):
        """Close the underlying Elasticsearch connection."""
        self.es_conn.close()
class InfinityVectorDB(VectorDBBase):
    """Infinity-backed implementation of the vector-database interface.

    A thin adapter: every operation is forwarded unchanged to the
    ``InfinityVectorDB`` client from ``services.utils.infinity``.

    NOTE(review): an earlier docstring mentioned PDF-metadata ingestion.
    Nothing in this wrapper is PDF-specific; presumably that lives in the
    underlying client. Confirm.
    """

    def __init__(self, host=None, port=None, user=None, password=None):
        """Create the underlying Infinity client.

        Args:
            host, port, user, password: connection settings passed straight to
                the client. Any setting left as ``None`` falls back to the
                matching ``VectorDBConfig.get_infinity_*()`` value, so a bare
                ``InfinityVectorDB()`` reads everything from configuration.
        """
        # Local import kept from the original. The client class is aliased so
        # it does not clash with this wrapper's own name. VectorDBConfig comes
        # from the module-level import.
        from services.utils.infinity import InfinityVectorDB as _InfinityVectorDB

        # Keyword arguments are evaluated left to right, so the config getters
        # are consulted in the same order as before (host, port, user,
        # password).
        self._infinity_db = _InfinityVectorDB(
            host=VectorDBConfig.get_infinity_host() if host is None else host,
            port=VectorDBConfig.get_infinity_port() if port is None else port,
            user=VectorDBConfig.get_infinity_user() if user is None else user,
            password=VectorDBConfig.get_infinity_password() if password is None else password,
        )

    def create_index(self, index_name: str, mappings: Optional[Dict[str, Any]] = None) -> bool:
        """Create ``index_name`` on the Infinity client."""
        return self._infinity_db.create_index(index_name, mappings)

    def insert_document(self, index_name: str, document: Dict[str, Any], id: Optional[str] = None) -> bool:
        """Insert a single document into ``index_name``."""
        return self._infinity_db.insert_document(index_name, document, id)

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Insert several documents into ``index_name``."""
        return self._infinity_db.bulk_insert(index_name, documents)

    def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
        """Run ``query`` against ``index_name``."""
        return self._infinity_db.search(index_name, query, size)

    def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10,
                      filter: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """Vector search on ``vector_field``, optionally narrowed by ``filter``."""
        return self._infinity_db.vector_search(index_name, vector_field, vector, size, filter)

    def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
                      size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
        """Combined text + vector search on the Infinity client."""
        # Weights are passed positionally, matching the original call. The
        # client's parameter names are not visible here, so keywords were not
        # introduced.
        return self._infinity_db.hybrid_search(
            index_name, text_query, vector_field, vector, size, text_weight, vector_weight
        )

    def close(self):
        """Close the underlying Infinity client."""
        self._infinity_db.close()
|