"""
Document manager for the Infinity vector database.
"""
from typing import List, Dict, Any, Optional
from urllib.parse import quote

from services.utils.infinity.base import InfinityConnection
class InfinityDocumentManager:
    """Manage documents stored in an Infinity vector database.

    Provides insert, update, delete and fetch operations on documents. Every
    operation is sent through the connection's ``_make_request`` method and
    is best-effort: any exception raised while performing the request is
    caught, reported with ``print`` and converted into a failure result
    instead of propagating to the caller.
    """

    def __init__(self, infinity_connection: Optional["InfinityConnection"] = None):
        """Initialise the document manager.

        Args:
            infinity_connection: Connection to send requests through. When
                omitted (or falsy), a new ``InfinityConnection()`` is created.
        """
        self.infinity_conn = infinity_connection or InfinityConnection()

    @staticmethod
    def _collection_path(index_name: str, *segments: str) -> str:
        """Build a collection-scoped request path.

        Each segment is percent-encoded so that names or IDs containing
        characters such as ``/``, ``?``, ``#`` or spaces cannot change the
        structure of the path.
        """
        encoded = [quote(str(part), safe="") for part in (index_name, *segments)]
        return "/api/collections/" + "/".join(encoded)

    @staticmethod
    def _succeeded(response: Any) -> bool:
        """Return True when the response carries no ``"error"`` entry."""
        return "error" not in response

    def insert_document(self, index_name: str, document: Dict[str, Any], id: Optional[str] = None) -> bool:
        """Insert a single document.

        Args:
            index_name: Name of the target collection.
            document: Document content.
            id: Document ID, optional.
                NOTE(review): this value is accepted but not sent to the
                server; the field the API expects for a caller-supplied ID
                is not visible from this class - confirm and wire it up.

        Returns:
            bool: True when the response contains no ``"error"`` entry,
            False otherwise or when the request raised.
        """
        try:
            path = self._collection_path(index_name, "documents")
            payload = {"documents": [document]}
            response = self.infinity_conn._make_request("POST", path, payload)
            return self._succeeded(response)
        except Exception as exc:
            print(f"插入Infinity文档失败: {exc}")
            return False

    def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Insert several documents with a single request.

        The batch is treated as a unit: either every document is counted as
        inserted, or every document is counted as failed with the same error.

        Args:
            index_name: Name of the target collection.
            documents: Documents to insert.

        Returns:
            Dict: ``{"success": int, "failed": int, "errors": list}``.
        """
        # Nothing to send: report zero counts without issuing a request.
        # This also keeps len() below from failing on a None argument.
        if not documents:
            return {"success": 0, "failed": 0, "errors": []}

        total = len(documents)
        try:
            path = self._collection_path(index_name, "documents")
            response = self.infinity_conn._make_request("POST", path, {"documents": documents})

            if self._succeeded(response):
                return {"success": total, "failed": 0, "errors": []}
            return {
                "success": 0,
                "failed": total,
                "errors": [response["error"]] * total,
            }
        except Exception as exc:
            print(f"批量插入Infinity文档失败: {exc}")
            return {
                "success": 0,
                "failed": total,
                "errors": [str(exc)] * total,
            }

    def update_document(self, index_name: str, document_id: str, document: Dict[str, Any]) -> bool:
        """Update a single document.

        Args:
            index_name: Name of the collection.
            document_id: ID of the document to update.
            document: Document content to send as the request body.

        Returns:
            bool: True when the response contains no ``"error"`` entry,
            False otherwise or when the request raised.
        """
        try:
            path = self._collection_path(index_name, "documents", document_id)
            response = self.infinity_conn._make_request("PUT", path, document)
            return self._succeeded(response)
        except Exception as exc:
            print(f"更新Infinity文档失败: {exc}")
            return False

    def delete_document(self, index_name: str, document_id: str) -> bool:
        """Delete a single document.

        Args:
            index_name: Name of the collection.
            document_id: ID of the document to delete.

        Returns:
            bool: True when the response contains no ``"error"`` entry,
            False otherwise or when the request raised.
        """
        try:
            path = self._collection_path(index_name, "documents", document_id)
            response = self.infinity_conn._make_request("DELETE", path)
            return self._succeeded(response)
        except Exception as exc:
            print(f"删除Infinity文档失败: {exc}")
            return False

    def get_document(self, index_name: str, document_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a single document.

        Args:
            index_name: Name of the collection.
            document_id: ID of the document to fetch.

        Returns:
            Optional[Dict[str, Any]]: The response as returned by the
            connection, or None when it contains an ``"error"`` entry or
            the request raised.
        """
        try:
            path = self._collection_path(index_name, "documents", document_id)
            response = self.infinity_conn._make_request("GET", path)
            return response if self._succeeded(response) else None
        except Exception as exc:
            print(f"获取Infinity文档失败: {exc}")
            return None

    def delete_by_query(self, index_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
        """Delete the documents matching a query.

        Args:
            index_name: Name of the collection.
            query: Query condition, sent under the ``"query"`` key.

        Returns:
            Dict: ``{"deleted": n, "failed": 0}`` on success, where ``n`` is
            the response's ``"total"`` value (0 when absent); otherwise
            ``{"deleted": 0, "failed": 1, "error": ...}``.
        """
        try:
            path = self._collection_path(index_name, "delete_by_query")
            response = self.infinity_conn._make_request("POST", path, {"query": query})

            if self._succeeded(response):
                return {"deleted": response.get("total", 0), "failed": 0}
            return {"deleted": 0, "failed": 1, "error": response["error"]}
        except Exception as exc:
            print(f"按条件删除Infinity文档失败: {exc}")
            return {"deleted": 0, "failed": 1, "error": str(exc)}
|