"""
Infinity vector database client.

Thin wrapper over the Infinity HTTP API. It does not depend on the official
Infinity Python SDK. Infinity tables play the role of "indexes" and table rows
play the role of "documents".
"""
from typing import List, Dict, Any, Optional
import json


class InfinityVectorDB:
    """
    Main Infinity vector-database class.

    Provides one interface for index (table) management, document (row)
    writes/reads and search. Every public method catches exceptions, prints a
    message and returns a failure value instead of raising.
    """

    # Columns requested by hybrid_search, in request order. They are also used
    # to label positional values in the returned rows.
    _HYBRID_OUTPUT_FIELDS = [
        "file_name",
        "page_number",
        "content",
        "image_path",
        "dataset_id",
        "document_id",
        "_similarity",
    ]

    def __init__(self):
        """
        Initialise the client from VectorDBConfig.

        Uses the HTTP API only (no official SDK). Reads host/port/user/password/
        database from the project config and builds an HTTPClient that
        authenticates with a Basic-Auth token derived from user:password.
        """
        from conf.config import VectorDBConfig
        from utils.http_client import HTTPClient
        import base64

        # Connection settings.
        self.host = VectorDBConfig.get_infinity_host()
        self.port = VectorDBConfig.get_infinity_port()
        self.user = VectorDBConfig.get_infinity_user()
        self.password = VectorDBConfig.get_infinity_password()
        self.database = VectorDBConfig.get_infinity_database()
        self.headers = {
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

        # Basic-Auth token: base64("user:password").
        auth_str = f"{self.user}:{self.password}"
        auth_token = base64.b64encode(auth_str.encode()).decode()

        # HTTP client bound to the Infinity server.
        self.base_url = f"http://{self.host}:{self.port}"
        self.http_client = HTTPClient(
            base_url=self.base_url,
            api_key=auth_token,
            auth_type='basic'
        )

    def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool:
        """
        Create an index (an Infinity table).

        Args:
            index_name: Name of the table to create.
            mappings: Currently ignored. Column definitions are always loaded
                from ``conf/infinity_mapping.json``, which is opened relative to
                the current working directory.

        Returns:
            True if the server answered with ``error_code == 0``, else False.
        """
        try:
            path = f"/databases/{self.database}/tables/{index_name}"
            # Column definitions for the table.
            with open("conf/infinity_mapping.json", "r", encoding="utf-8") as f:
                fields = json.load(f)
            data = {
                # Ask the server not to fail when the table already exists.
                "create_option": "ignore_if_exists",
                "fields": fields
            }
            response = self.http_client.post(path, json_data=data, headers=self.headers)
            return response.get("error_code") == 0
        except Exception as e:
            print(f"Failed to create index: {str(e)}")
            return False

    def delete_index(self, index_name: str) -> bool:
        """
        Drop an index (an Infinity table).

        Returns:
            True if the server answered with ``error_code == 0``, else False.
        """
        try:
            path = f"/databases/{self.database}/tables/{index_name}"
            data = {
                # Ask the server not to fail when the table is missing.
                "drop_option": "ignore_if_not_exists"
            }
            response = self.http_client.delete(path, json_data=data, headers=self.headers)
            return response.get("error_code") == 0
        except Exception as e:
            print(f"Failed to delete index: {str(e)}")
            return False

    def index_exists(self, index_name: str) -> bool:
        """
        Check whether an index (table) exists in the configured database.

        Returns:
            True if ``index_name`` is in the server's ``tables`` list, else
            False. Also returns False on any error.
        """
        try:
            path = f"/databases/{self.database}/tables"
            response = self.http_client.get(path, headers=self.headers)
            if response.get("error_code") == 0:
                # NOTE(review): assumes "tables" is a list of table-name
                # strings. TODO confirm against the server's response format.
                tables = response.get("tables", [])
                return index_name in tables
            return False
        except Exception as e:
            print(f"Failed to check index existence: {str(e)}")
            return False

    def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool:
        """
        Insert a single document (row).

        Args:
            index_name: Target table.
            document: Column -> value mapping. It is not modified.
            id: Optional id. When truthy, it is stored in the row's ``id`` column.

        Returns:
            True if the server answered with ``error_code == 0``, else False.
        """
        try:
            path = f"/databases/{self.database}/tables/{index_name}/docs"
            # Work on a shallow copy so the caller's dict is not mutated.
            row = dict(document)
            if id:
                row["id"] = id
            data = [row]
            response = self.http_client.post(path, json_data=data, headers=self.headers)
            return response.get("error_code") == 0
        except Exception as e:
            print(f"Failed to insert document: {str(e)}")
            return False

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Insert several documents (rows) in one request.

        Returns:
            ``{"success": True, "inserted": len(documents)}`` on success.
            Otherwise ``{"success": False, "error": <message>, "inserted": 0}``.
        """
        try:
            path = f"/databases/{self.database}/tables/{index_name}/docs"
            data = documents
            response = self.http_client.post(path, json_data=data, headers=self.headers)
            if response.get("error_code") == 0:
                return {
                    "success": True,
                    "inserted": len(documents)
                }
            else:
                return {
                    "success": False,
                    "error": response.get("error_msg", "Unknown error"),
                    "inserted": 0
                }
        except Exception as e:
            print(f"Failed to bulk insert documents: {str(e)}")
            return {
                "success": False,
                "error": str(e),
                "inserted": 0
            }

    def update_document(self, index_name: str, document_id: str, document: Dict[str, Any]) -> bool:
        """
        Update the row whose ``id`` column equals ``document_id``.

        Returns:
            True if the server answered with ``error_code == 0``, else False.
        """
        try:
            # NOTE(review): this uses a "/rows" path with an "update_by" body,
            # unlike the "/docs" path used elsewhere in this class. Confirm
            # against the Infinity HTTP API reference.
            path = f"/databases/{self.database}/tables/{index_name}/rows"
            data = {
                "update_by": {
                    "column": "id",
                    "value": document_id
                },
                "update_data": document
            }
            response = self.http_client.put(path, json_data=data, headers=self.headers)
            return response.get("error_code") == 0
        except Exception as e:
            print(f"Failed to update document: {str(e)}")
            return False

    def delete_document(self, index_name: str, document_id: str) -> bool:
        """
        Delete the row whose ``id`` column equals ``document_id``.

        Returns:
            True if the server answered with ``error_code == 0``, else False.
        """
        try:
            # NOTE(review): "/rows" path and "delete_by" body shape. Confirm
            # against the Infinity HTTP API reference.
            path = f"/databases/{self.database}/tables/{index_name}/rows"
            data = {
                "delete_by": {
                    "column": "id",
                    "value": document_id
                }
            }
            response = self.http_client.delete(path, json_data=data, headers=self.headers)
            return response.get("error_code") == 0
        except Exception as e:
            print(f"Failed to delete document: {str(e)}")
            return False

    def get_document(self, index_name: str, document_id: str) -> Optional[Dict[str, Any]]:
        """
        Fetch the first row whose ``id`` column equals ``document_id``.

        Returns:
            The first entry of the response's ``rows`` list. Returns None when
            there are no rows, when the server reports an error, or on any
            exception.
        """
        try:
            # NOTE(review): "/query" path. Confirm against the Infinity HTTP API
            # reference.
            path = f"/databases/{self.database}/tables/{index_name}/query"
            data = {
                "filter": {
                    "column": "id",
                    "operator": "=",
                    "value": document_id
                },
                "limit": 1
            }
            response = self.http_client.post(path, json_data=data, headers=self.headers)
            if response.get("error_code") == 0:
                rows = response.get("rows", [])
                if rows:
                    return rows[0]
            return None
        except Exception as e:
            print(f"Failed to get document: {str(e)}")
            return None

    def delete_by_query(self, index_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
        """
        Delete rows that match every ``column == value`` pair in ``query``.

        Args:
            index_name: Target table.
            query: Flat mapping such as ``{"dataset_id": "xxx"}``. The pairs
                are AND-ed together. An empty or None query is rejected, so
                that an unconditional delete is never sent.

        Returns:
            ``{"success": True}`` on success. Otherwise
            ``{"success": False, "error": <message>}``.
        """
        if not query:
            # Guard: an empty filter list would be an unconditional delete.
            return {"success": False, "error": "Refusing to delete by an empty query"}
        try:
            # NOTE(review): "/rows" path and "delete_by" body shape. Confirm
            # against the Infinity HTTP API reference.
            path = f"/databases/{self.database}/tables/{index_name}/rows"
            filter_conditions = [
                {"column": key, "operator": "=", "value": value}
                for key, value in query.items()
            ]
            data = {
                "delete_by": {
                    "and": filter_conditions
                }
            }
            response = self.http_client.delete(path, json_data=data, headers=self.headers)
            if response.get("error_code") == 0:
                return {"success": True}
            else:
                return {"success": False, "error": response.get("error_msg", "Unknown error")}
        except Exception as e:
            print(f"Failed to delete by query: {str(e)}")
            return {"success": False, "error": str(e)}

    def search(self, index_name: str, query: Dict[str, Any], size: int = 10) -> Dict[str, Any]:
        """
        Filter-based search.

        Returns:
            ``{"output": rows}`` on success. Otherwise
            ``{"output": [], "error": <message>}``, both for server errors and
            for exceptions.
        """
        try:
            # NOTE(review): this POSTs to the same "/docs" path that
            # insert_document/bulk_insert use for inserts. hybrid_search queries
            # "/docs" via get_json instead. Confirm the intended endpoint.
            path = f"/databases/{self.database}/tables/{index_name}/docs"
            data = {
                "filter": query,
                "limit": size
            }
            response = self.http_client.post(path, json_data=data, headers=self.headers)
            if response.get("error_code") == 0:
                rows = response.get("output", [])
                return {
                    "output": rows,
                }
            else:
                # Same shape as the success/exception paths, so callers can always read ["output"].
                return {"output": [], "error": response.get("error_msg", "Unknown error")}
        except Exception as e:
            print(f"Failed to search: {str(e)}")
            return {"output": [], "error": str(e)}

    def vector_search(self, index_name: str, vector_field: str, vector: List[float], size: int = 10, filter: Dict[str, Any] = None) -> Dict[str, Any]:
        """
        Vector similarity search.

        Returns:
            ``{"hits": rows, "total": len(rows)}`` on success. Otherwise
            ``{"hits": [], "total": 0, "error": <message>}``.
        """
        try:
            # NOTE(review): POSTs to the "/docs" insert path, like search().
            # Confirm the intended endpoint.
            path = f"/databases/{self.database}/tables/{index_name}/docs"
            data = {
                "vector_field": vector_field,
                "vector": vector,
                "limit": size
            }
            if filter:
                data["filter"] = filter
            response = self.http_client.post(path, json_data=data, headers=self.headers)
            if response.get("error_code") == 0:
                rows = response.get("output", [])
                return {
                    "hits": rows,
                    "total": len(rows)
                }
            else:
                return {"hits": [], "total": 0, "error": response.get("error_msg", "Unknown error")}
        except Exception as e:
            print(f"Failed to vector search: {str(e)}")
            return {"hits": [], "total": 0, "error": str(e)}

    @classmethod
    def _format_hybrid_row(cls, row: Any) -> Dict[str, Any]:
        """Flatten one hybrid_search result row into a column -> value dict."""
        if isinstance(row, dict):
            # Row already keyed by column name.
            return dict(row)
        formatted_row: Dict[str, Any] = {}
        # zip stops at the shorter sequence, so short rows only fill leading fields.
        for field, value in zip(cls._HYBRID_OUTPUT_FIELDS, row):
            if isinstance(value, dict):
                # Cell delivered as a {column: value} dict: merge it as-is.
                formatted_row.update(value)
            else:
                formatted_row[field] = value
        return formatted_row

    def hybrid_search(self,
                      index_name: str,
                      match_method: str,
                      vector_field: str,
                      query_vector: List[float],
                      element_type: str,
                      metric_type: str = "cosine",
                      topn: int = 3,
                      rank_constant: int = 60,
                      text_query: str = "",
                      text_field: str = "file_name"
                      ) -> Dict[str, Any]:
        """
        Vector search that returns a fixed set of document columns.

        Only a single vector match clause is sent. ``rank_constant``,
        ``text_query`` and ``text_field`` are accepted for interface
        compatibility but are currently unused; no full-text or fusion clause
        is built.

        Returns:
            ``{"output": rows, "total": len(rows)}``, where each row is a dict
            keyed by column name. On failure returns
            ``{"output": [], "total": 0, "error": <message>}``.
        """
        try:
            path = f"/databases/{self.database}/tables/{index_name}/docs"
            search_config = [
                {
                    "match_method": match_method,
                    "fields": vector_field,
                    "query_vector": query_vector,
                    "element_type": element_type,
                    "metric_type": metric_type,
                    "topn": topn,
                    "params": {
                        "ef": "10"
                    }
                }
            ]
            data = {
                "output": list(self._HYBRID_OUTPUT_FIELDS),
                "search": search_config
            }
            response = self.http_client.get_json(path, json_data=data, headers=self.headers)
            if response.get("error_code") == 0:
                rows = response.get("output") or []
                formatted_rows = [self._format_hybrid_row(row) for row in rows]
                return {
                    "output": formatted_rows,
                    "total": len(formatted_rows)
                }
            else:
                return {"output": [], "total": 0, "error": response.get("error_msg", "Unknown error")}
        except Exception as e:
            print(f"Failed to hybrid search: {str(e)}")
            return {"output": [], "total": 0, "error": str(e)}