""" Infinity向量数据库文档管理器 """ from typing import List, Dict, Any, Optional from services.utils.infinity.base import InfinityConnection class InfinityDocumentManager: """ Infinity向量数据库文档管理器 负责文档的增删改查操作 """ def __init__(self, infinity_connection: Optional[InfinityConnection] = None): """ 初始化文档管理器 Args: infinity_connection: Infinity连接实例,可选 """ self.infinity_conn = infinity_connection or InfinityConnection() def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool: """ 插入单个文档 Args: index_name: 索引名称 document: 文档内容 id: 文档ID,可选 Returns: bool: 插入是否成功 """ try: path = f"/api/collections/{index_name}/documents" response = self.infinity_conn._make_request("POST", path, {"documents": [document]}) return "error" not in response except Exception as e: print(f"插入Infinity文档失败: {e}") return False def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]: """ 批量插入文档 Args: index_name: 索引名称 documents: 文档列表 Returns: Dict: 包含成功和失败信息的字典 """ try: path = f"/api/collections/{index_name}/documents" response = self.infinity_conn._make_request("POST", path, {"documents": documents}) if "error" not in response: return { "success": len(documents), "failed": 0, "errors": [] } else: return { "success": 0, "failed": len(documents), "errors": [response["error"]] * len(documents) } except Exception as e: print(f"批量插入Infinity文档失败: {e}") return { "success": 0, "failed": len(documents), "errors": [str(e)] * len(documents) } def update_document(self, index_name: str, document_id: str, document: Dict[str, Any]) -> bool: """ 更新单个文档 Args: index_name: 索引名称 document_id: 文档ID document: 要更新的文档内容 Returns: bool: 更新是否成功 """ try: path = f"/api/collections/{index_name}/documents/{document_id}" response = self.infinity_conn._make_request("PUT", path, document) return "error" not in response except Exception as e: print(f"更新Infinity文档失败: {e}") return False def delete_document(self, index_name: str, document_id: str) -> bool: """ 删除单个文档 Args: index_name: 索引名称 document_id: 文档ID Returns: bool: 删除是否成功 """ try: path = f"/api/collections/{index_name}/documents/{document_id}" response = self.infinity_conn._make_request("DELETE", path) return "error" not in response except Exception as e: print(f"删除Infinity文档失败: {e}") return False def get_document(self, index_name: str, document_id: str) -> Optional[Dict[str, Any]]: """ 获取单个文档 Args: index_name: 索引名称 document_id: 文档ID Returns: Optional[Dict[str, Any]]: 文档内容,不存在则返回None """ try: path = f"/api/collections/{index_name}/documents/{document_id}" response = self.infinity_conn._make_request("GET", path) return response if "error" not in response else None except Exception as e: print(f"获取Infinity文档失败: {e}") return None def delete_by_query(self, index_name: str, query: Dict[str, Any]) -> Dict[str, Any]: """ 按查询条件删除文档 Args: index_name: 索引名称 query: 查询条件 Returns: Dict: 删除结果 """ try: path = f"/api/collections/{index_name}/delete_by_query" response = self.infinity_conn._make_request("POST", path, {"query": query}) if "error" not in response: return { "deleted": response.get("total", 0), "failed": 0 } else: return { "deleted": 0, "failed": 1, "error": response["error"] } except Exception as e: print(f"按条件删除Infinity文档失败: {e}") return { "deleted": 0, "failed": 1, "error": str(e) }