| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382 |
- """
- Infinity向量数据库主类
- 基于Infinity官方HTTP API实现(不依赖官方Python SDK)
- """
- from typing import List, Dict, Any, Optional
- import json
class InfinityVectorDB:
    """
    Infinity vector database client built on Infinity's HTTP API.

    Presents an index/document oriented interface: an "index" is an Infinity
    table inside the configured database and a "document" is a table row.
    Public methods do not raise; failures are printed and reported through
    the return value (False / None / a dict carrying an "error" key).
    """

    # Column definitions sent by create_index; resolved relative to the
    # process working directory.
    MAPPING_FILE = "conf/infinity_mapping.json"

    # Columns requested by hybrid_search, and by vector_search unless the
    # caller overrides them.
    DEFAULT_OUTPUT_FIELDS = (
        "file_name",
        "page_number",
        "content",
        "image_path",
        "dataset_id",
        "document_id",
        "_similarity",
    )

    def __init__(self):
        """
        Read connection settings from VectorDBConfig and build the HTTP client.

        Talks to Infinity over plain HTTP with Basic authentication; the
        official Python SDK is not required.
        """
        from conf.config import VectorDBConfig
        from utils.http_client import HTTPClient
        import base64

        # Connection settings
        self.host = VectorDBConfig.get_infinity_host()
        self.port = VectorDBConfig.get_infinity_port()
        self.user = VectorDBConfig.get_infinity_user()
        self.password = VectorDBConfig.get_infinity_password()
        self.database = VectorDBConfig.get_infinity_database()
        self.headers = {
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

        # Basic auth token: base64("user:password")
        auth_str = f"{self.user}:{self.password}"
        auth_token = base64.b64encode(auth_str.encode()).decode()

        self.base_url = f"http://{self.host}:{self.port}"
        self.http_client = HTTPClient(
            base_url=self.base_url,
            api_key=auth_token,
            auth_type='basic'
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _table_path(self, index_name: str) -> str:
        """Return the HTTP path of table `index_name` in the configured database."""
        return f"/databases/{self.database}/tables/{index_name}"

    def _docs_path(self, index_name: str) -> str:
        """Return the HTTP path of the row endpoint (`.../docs`) of table `index_name`."""
        return f"{self._table_path(index_name)}/docs"

    @staticmethod
    def _succeeded(response: Any) -> bool:
        """Return True when `response` is a dict whose `error_code` equals 0."""
        return isinstance(response, dict) and response.get("error_code") == 0

    @staticmethod
    def _error_message(response: Any) -> str:
        """Return the `error_msg` of a failed response, or "Unknown error"."""
        if isinstance(response, dict):
            return response.get("error_msg", "Unknown error")
        return "Unknown error"

    @staticmethod
    def _format_literal(value: Any) -> str:
        """Render a Python value as a literal inside an Infinity filter expression."""
        # bool must be tested before int, because bool is a subclass of int.
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, (int, float)):
            return str(value)
        # NOTE(review): embedded single quotes are doubled (SQL convention);
        # confirm Infinity's filter parser accepts this escape.
        text = str(value).replace("'", "''")
        return f"'{text}'"

    @classmethod
    def _build_filter(cls, conditions: Any) -> str:
        """
        Turn filter conditions into an Infinity filter expression string.

        Args:
            conditions: None (no filter), a ready-made filter string (passed
                through unchanged), or a flat dict such as
                {"dataset_id": "xxx"}, which becomes equality tests joined
                with "and".

        Returns:
            The filter expression; "" when there is nothing to filter on.
        """
        if conditions is None:
            return ""
        if isinstance(conditions, str):
            return conditions
        return " and ".join(
            f"{column} = {cls._format_literal(value)}"
            for column, value in conditions.items()
        )

    @staticmethod
    def _format_rows(rows: Any, fields: Optional[List[str]] = None) -> List[Dict[str, Any]]:
        """
        Flatten the `output` rows of a select response into plain dicts.

        A row may be an already-flat dict (copied as-is) or a list of cells.
        Each cell is either a single-column dict, which is merged into the
        record, or a bare value, which is named by its position in `fields`.
        Bare values without a matching field name are dropped.
        """
        formatted: List[Dict[str, Any]] = []
        for row in rows or []:
            if isinstance(row, dict):
                formatted.append(dict(row))
                continue
            record: Dict[str, Any] = {}
            for position, cell in enumerate(row):
                if isinstance(cell, dict):
                    record.update(cell)
                elif fields is not None and position < len(fields):
                    record[fields[position]] = cell
            formatted.append(record)
        return formatted

    # ------------------------------------------------------------------
    # Index (table) management
    # ------------------------------------------------------------------

    def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool:
        """
        Create table `index_name` unless it already exists.

        Args:
            index_name: Table name.
            mappings: Accepted for interface compatibility but ignored. The
                column definitions are always read from MAPPING_FILE.

        Returns:
            True when the server reports success, otherwise False.
        """
        try:
            with open(self.MAPPING_FILE, "r", encoding="utf-8") as f:
                fields = json.load(f)

            data = {
                "create_option": "ignore_if_exists",
                "fields": fields
            }
            response = self.http_client.post(self._table_path(index_name), json_data=data, headers=self.headers)
            return self._succeeded(response)
        except Exception as e:
            print(f"Failed to create index: {str(e)}")
            return False

    def delete_index(self, index_name: str) -> bool:
        """Drop table `index_name`; a missing table is not an error. Returns success."""
        try:
            data = {"drop_option": "ignore_if_not_exists"}
            response = self.http_client.delete(self._table_path(index_name), json_data=data, headers=self.headers)
            return self._succeeded(response)
        except Exception as e:
            print(f"Failed to delete index: {str(e)}")
            return False

    def index_exists(self, index_name: str) -> bool:
        """Return True if table `index_name` appears in the database's table list."""
        try:
            path = f"/databases/{self.database}/tables"
            response = self.http_client.get(path, headers=self.headers)
            if not self._succeeded(response):
                return False
            # Entries are treated as plain table names; objects carrying a
            # "name" key are tolerated too (TODO confirm against server version).
            names = {
                table.get("name") if isinstance(table, dict) else table
                for table in response.get("tables", [])
            }
            return index_name in names
        except Exception as e:
            print(f"Failed to check index existence: {str(e)}")
            return False

    # ------------------------------------------------------------------
    # Document (row) operations
    # ------------------------------------------------------------------

    def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool:
        """
        Insert one row into `index_name`.

        Args:
            index_name: Table name.
            document: Column -> value mapping. It is not modified.
            id: When truthy, stored in the row's "id" column, overriding any
                "id" already present in `document`.

        Returns:
            True when the server reports success, otherwise False.
        """
        try:
            # Copy so the caller's dict does not gain an "id" key as a side effect.
            row = dict(document)
            if id:
                row["id"] = id

            response = self.http_client.post(self._docs_path(index_name), json_data=[row], headers=self.headers)
            return self._succeeded(response)
        except Exception as e:
            print(f"Failed to insert document: {str(e)}")
            return False

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Insert many rows in a single request.

        Returns:
            {"success": True, "inserted": n} on success, otherwise
            {"success": False, "error": msg, "inserted": 0}. An empty
            `documents` list succeeds immediately without contacting the server.
        """
        if not documents:
            return {"success": True, "inserted": 0}
        try:
            response = self.http_client.post(self._docs_path(index_name), json_data=documents, headers=self.headers)

            if self._succeeded(response):
                return {
                    "success": True,
                    "inserted": len(documents)
                }
            return {
                "success": False,
                "error": self._error_message(response),
                "inserted": 0
            }
        except Exception as e:
            print(f"Failed to bulk insert documents: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "inserted": 0
            }

    def update_document(self, index_name: str, document_id: str, document: Dict[str, Any]) -> bool:
        """Set the columns in `document` on the row(s) whose "id" equals `document_id`."""
        try:
            data = {
                "update": document,
                "filter": self._build_filter({"id": document_id})
            }
            response = self.http_client.put(self._docs_path(index_name), json_data=data, headers=self.headers)
            return self._succeeded(response)
        except Exception as e:
            print(f"Failed to update document: {str(e)}")
            return False

    def delete_document(self, index_name: str, document_id: str) -> bool:
        """Delete the row(s) whose "id" equals `document_id`. Returns success."""
        try:
            data = {"filter": self._build_filter({"id": document_id})}
            response = self.http_client.delete(self._docs_path(index_name), json_data=data, headers=self.headers)
            return self._succeeded(response)
        except Exception as e:
            print(f"Failed to delete document: {str(e)}")
            return False

    def get_document(self, index_name: str, document_id: str) -> Optional[Dict[str, Any]]:
        """Return the first row whose "id" equals `document_id` as a flat dict, or None."""
        try:
            data = {
                # NOTE(review): assumes the server accepts "*" in the output list.
                "output": ["*"],
                "filter": self._build_filter({"id": document_id}),
                "limit": 1
            }
            response = self.http_client.get_json(self._docs_path(index_name), json_data=data, headers=self.headers)

            if self._succeeded(response):
                rows = self._format_rows(response.get("output", []))
                if rows:
                    return rows[0]
            return None
        except Exception as e:
            print(f"Failed to get document: {str(e)}")
            return None

    def delete_by_query(self, index_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
        """
        Delete every row matching `query`.

        Args:
            index_name: Table name.
            query: Flat equality dict such as {"dataset_id": "xxx"}, or a
                ready-made filter string.

        Returns:
            {"success": True} or {"success": False, "error": msg}. An empty
            query is rejected rather than sent, because it could match
            every row.
        """
        filter_expr = self._build_filter(query)
        if not filter_expr:
            return {"success": False, "error": "Empty query: refusing to delete all rows"}
        try:
            data = {"filter": filter_expr}
            response = self.http_client.delete(self._docs_path(index_name), json_data=data, headers=self.headers)

            if self._succeeded(response):
                return {"success": True}
            return {"success": False, "error": self._error_message(response)}
        except Exception as e:
            print(f"Failed to delete by query: {str(e)}")
            return {"success": False, "error": str(e)}

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------

    def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
        """
        Return up to `size` rows matching `query` (a flat equality dict or a filter string).

        Returns:
            {"output": rows, "total": n} where rows are flat dicts, or
            {"output": [], "total": 0, "error": msg} on failure.
        """
        try:
            data: Dict[str, Any] = {
                # NOTE(review): assumes the server accepts "*" in the output list.
                "output": ["*"],
                "limit": size
            }
            filter_expr = self._build_filter(query)
            if filter_expr:
                data["filter"] = filter_expr

            # Selects use GET with a JSON body (as in hybrid_search); a POST to
            # this path is the insert endpoint.
            response = self.http_client.get_json(self._docs_path(index_name), json_data=data, headers=self.headers)

            if not self._succeeded(response):
                return {"output": [], "total": 0, "error": self._error_message(response)}
            rows = self._format_rows(response.get("output", []))
            return {"output": rows, "total": len(rows)}
        except Exception as e:
            print(f"Failed to search: {str(e)}")
            return {"output": [], "total": 0, "error": str(e)}

    def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10,
                      filter: Dict[str, Any] = None, *, element_type: str = "float",
                      metric_type: str = "cosine",
                      output_fields: Optional[List[str]] = None) -> Dict[str, Any]:
        """
        Dense-vector search returning the `size` nearest rows.

        Args:
            index_name: Table name.
            vector_field: Vector column to search.
            vector: Query vector.
            size: Number of results (sent as "topn").
            filter: Optional flat equality dict or filter string.
            element_type: Element type of `vector` (keyword-only).
            metric_type: Distance metric (keyword-only).
            output_fields: Columns to return; defaults to DEFAULT_OUTPUT_FIELDS,
                which includes "_similarity". Override it if the chosen metric
                reports its score under a different column (TODO confirm).

        Returns:
            {"hits": rows, "total": n} with rows as flat dicts, or
            {"hits": [], "total": 0, "error": msg} on failure.
        """
        try:
            fields = list(output_fields) if output_fields else list(self.DEFAULT_OUTPUT_FIELDS)
            data: Dict[str, Any] = {
                "output": fields,
                "search": [
                    {
                        "match_method": "dense",
                        "fields": vector_field,
                        "query_vector": vector,
                        "element_type": element_type,
                        "metric_type": metric_type,
                        "topn": size
                    }
                ]
            }
            filter_expr = self._build_filter(filter)
            if filter_expr:
                data["filter"] = filter_expr

            response = self.http_client.get_json(self._docs_path(index_name), json_data=data, headers=self.headers)

            if not self._succeeded(response):
                return {"hits": [], "total": 0, "error": self._error_message(response)}
            rows = self._format_rows(response.get("output", []), fields)
            return {
                "hits": rows,
                "total": len(rows)
            }
        except Exception as e:
            print(f"Failed to vector search: {str(e)}")
            return {"hits": [], "total": 0, "error": str(e)}

    def hybrid_search(self, index_name: str, match_method: str, vector_field: str, query_vector: List[float], element_type: str,
                      metric_type: str = "cosine", topn: int = 3, rank_constant: int = 60,
                      text_query: str = "", text_field: str = "file_name"
                      ) -> Dict[str, Any]:
        """
        Vector search returning the columns in DEFAULT_OUTPUT_FIELDS.

        Only the vector leg is currently sent. `text_query`, `text_field` and
        `rank_constant` are accepted for interface compatibility but unused,
        because the full-text leg and the fusion step are disabled.

        Returns:
            {"output": rows, "total": n} with rows as flat dicts, or
            {"output": [], "total": 0, "error": msg} on failure.
        """
        try:
            output_fields = list(self.DEFAULT_OUTPUT_FIELDS)
            search_config = [
                {
                    "match_method": match_method,
                    "fields": vector_field,
                    "query_vector": query_vector,
                    "element_type": element_type,
                    "metric_type": metric_type,
                    "topn": topn,
                    "params": {
                        "ef": "10"
                    }
                }
            ]
            # TODO: a full-text leg (match_method "text" on text_field with
            # text_query) followed by an "rrf" fusion entry using rank_constant
            # was drafted here but is disabled. If it is re-enabled, note that
            # the draft's fusion guard referenced an undefined name `vector`.

            data = {
                "output": output_fields,
                "search": search_config
            }

            response = self.http_client.get_json(self._docs_path(index_name), json_data=data, headers=self.headers)

            if not self._succeeded(response):
                return {"output": [], "total": 0, "error": self._error_message(response)}
            formatted_rows = self._format_rows(response.get("output", []), output_fields)
            return {
                "output": formatted_rows,
                "total": len(formatted_rows)
            }
        except Exception as e:
            print(f"Failed to hybrid search: {str(e)}")
            return {"output": [], "total": 0, "error": str(e)}
|