""" Infinity向量数据库索引管理器 """ from typing import Dict, Any, Optional from services.utils.infinity.base import InfinityConnection class InfinityIndexManager: """ Infinity向量数据库索引管理器 负责索引(collection)的创建、删除和检查 """ def __init__(self, infinity_connection: Optional[InfinityConnection] = None): """ 初始化索引管理器 Args: infinity_connection: Infinity连接实例,可选 """ self.infinity_conn = infinity_connection or InfinityConnection() def create_index(self, index_name: str, mappings: Dict[str, Any] = None) -> bool: """ 创建索引(Infinity中的collection) Args: index_name: 索引名称 mappings: 自定义映射,会与默认映射合并 Returns: bool: 创建是否成功 """ try: # 创建collection的API路径 path = f"/api/collections/{index_name}" # 默认映射配置 default_mappings = { "fields": [ {"name": "file_name", "type": "string"}, {"name": "file_page_count", "type": "integer"}, {"name": "page_number", "type": "integer"}, {"name": "text", "type": "string"}, {"name": "image_path", "type": "string"}, {"name": "sparse_vector", "type": "array>"}, {"name": "dense_vector_1024", "type": "array"}, {"name": "dataset_id", "type": "string"}, {"name": "document_id", "type": "string"} ], "indexes": [ { "name": f"{index_name}_vector_index", "field": "dense_vector_1024", "type": "vector", "params": { "dimension": 1024, "metric": "cosine" } } ] } # 合并用户映射和默认映射 final_mappings = {**default_mappings, **(mappings or {})} response = self.infinity_conn._make_request("PUT", path, final_mappings) return "error" not in response except Exception as e: print(f"创建Infinity索引失败: {e}") return False def delete_index(self, index_name: str) -> bool: """ 删除索引(collection) Args: index_name: 索引名称 Returns: bool: 删除是否成功 """ try: path = f"/api/collections/{index_name}" response = self.infinity_conn._make_request("DELETE", path) return "error" not in response except Exception as e: print(f"删除Infinity索引失败: {e}") return False def index_exists(self, index_name: str) -> bool: """ 检查索引是否存在 Args: index_name: 索引名称 Returns: bool: 索引是否存在 """ try: path = f"/api/collections/{index_name}" response = self.infinity_conn._make_request("GET", path) return "error" not in response except Exception as e: print(f"检查Infinity索引存在失败: {e}") return False def get_index_info(self, index_name: str) -> Dict[str, Any]: """ 获取索引信息 Args: index_name: 索引名称 Returns: Dict[str, Any]: 索引信息 """ try: path = f"/api/collections/{index_name}" response = self.infinity_conn._make_request("GET", path) return response except Exception as e: print(f"获取Infinity索引信息失败: {e}") return {} def list_indexes(self) -> list: """ 获取所有索引列表 Returns: list: 索引列表 """ try: path = "/api/collections" response = self.infinity_conn._make_request("GET", path) return response.get("collections", []) except Exception as e: print(f"获取Infinity索引列表失败: {e}") return []