vector_db.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
"""
Vector database factory.

Supports switching dynamically between the Elasticsearch and Infinity
vector database backends.
"""
  5. from typing import Any, List, Dict, Optional
  6. from conf.config import VectorDBConfig
  7. from utils.es import ESConnection as ElasticsearchConnection
  8. class VectorDBFactory:
  9. """
  10. 向量数据库工厂类
  11. 根据配置创建不同类型的向量数据库连接
  12. """
  13. @staticmethod
  14. def get_vector_db():
  15. """
  16. 获取向量数据库实例
  17. Returns:
  18. VectorDBBase: 向量数据库实例
  19. """
  20. vector_db_type = VectorDBConfig.get_vector_db_type().lower()
  21. if vector_db_type == "es":
  22. return ElasticsearchVectorDB()
  23. elif vector_db_type == "infinity":
  24. return InfinityVectorDB()
  25. else:
  26. raise ValueError(f"不支持的向量数据库类型: {vector_db_type}")
  27. class VectorDBBase:
  28. """
  29. 向量数据库基类
  30. 定义了向量数据库应该实现的接口
  31. """
  32. def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool:
  33. """创建索引"""
  34. raise NotImplementedError()
  35. def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool:
  36. """插入单个文档"""
  37. raise NotImplementedError()
  38. def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
  39. """批量插入文档"""
  40. raise NotImplementedError()
  41. def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
  42. """搜索文档"""
  43. raise NotImplementedError()
  44. def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]:
  45. """向量检索"""
  46. raise NotImplementedError()
  47. def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
  48. size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
  49. """混合检索"""
  50. raise NotImplementedError()
  51. def close(self):
  52. """关闭连接"""
  53. raise NotImplementedError()
  54. class ElasticsearchVectorDB(VectorDBBase):
  55. """
  56. Elasticsearch向量数据库实现
  57. """
  58. def __init__(self):
  59. """初始化Elasticsearch向量数据库"""
  60. self.es_conn = ElasticsearchConnection()
  61. def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool:
  62. """创建索引"""
  63. from utils.es.index import IndexManager
  64. index_manager = IndexManager(self.es_conn)
  65. return index_manager.create_index(index_name, mappings)
  66. def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool:
  67. """插入单个文档"""
  68. from utils.es.document import DocumentManager
  69. doc_manager = DocumentManager(self.es_conn)
  70. return doc_manager.insert(index_name, document, id)
  71. def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
  72. """批量插入文档"""
  73. from services.utils.es.document import DocumentManager
  74. doc_manager = DocumentManager(self.es_conn)
  75. return doc_manager.bulk_insert(index_name, documents)
  76. def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
  77. """搜索文档"""
  78. from utils.es.search import SearchManager
  79. search_manager = SearchManager(self.es_conn)
  80. return search_manager.search(index_name, query, size=size)
  81. def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]:
  82. """向量检索"""
  83. from services.utils.es.search import SearchManager
  84. search_manager = SearchManager(self.es_conn)
  85. return search_manager.knn_search(index_name, vector_field, vector, size, filter)
  86. def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
  87. size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
  88. """混合检索"""
  89. from services.utils.es.search import SearchManager
  90. search_manager = SearchManager(self.es_conn)
  91. return search_manager.hybrid_search(index_name, text_query, vector_field, vector, size, text_weight=text_weight, vector_weight=vector_weight)
  92. def close(self):
  93. """关闭连接"""
  94. self.es_conn.close()
  95. class InfinityVectorDB(VectorDBBase):
  96. """
  97. Infinity向量数据库实现
  98. 支持infinity向量数据库的具体实现,包含PDF元数据入库
  99. """
  100. def __init__(self):
  101. """初始化Infinity向量数据库"""
  102. from utils.infinity_util import InfinityVectorDB as _InfinityVectorDB
  103. from conf.config import VectorDBConfig
  104. # 获取Infinity配置
  105. host = VectorDBConfig.get_infinity_host()
  106. port = VectorDBConfig.get_infinity_port()
  107. user = VectorDBConfig.get_infinity_user()
  108. password = VectorDBConfig.get_infinity_password()
  109. # 初始化新的InfinityVectorDB实例
  110. self._infinity_db = _InfinityVectorDB()
  111. def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool:
  112. """创建索引"""
  113. return self._infinity_db.create_index(index_name, mappings)
  114. def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool:
  115. """插入单个文档"""
  116. return self._infinity_db.insert_document(index_name, document, id)
  117. def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
  118. """批量插入文档"""
  119. return self._infinity_db.bulk_insert(index_name, documents)
  120. def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
  121. """搜索文档"""
  122. return self._infinity_db.search(index_name, query, size)
  123. def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]:
  124. """向量检索"""
  125. return self._infinity_db.vector_search(index_name, vector_field, vector, size, filter)
  126. def hybrid_search(self, index_name: str, text_query: str, vector_field: str, vector: List[float],
  127. size: int = 10, text_weight: float = 0.5, vector_weight: float = 0.5) -> Dict[str, Any]:
  128. """混合检索"""
  129. return self._infinity_db.hybrid_search(index_name, text_query, vector_field, vector, size, text_weight, vector_weight)
  130. def close(self):
  131. """关闭连接"""
  132. self._infinity_db.close()