document.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. """
  2. Infinity向量数据库文档管理器
  3. """
  4. from typing import List, Dict, Any, Optional
  5. from services.utils.infinity.base import InfinityConnection
  6. class InfinityDocumentManager:
  7. """
  8. Infinity向量数据库文档管理器
  9. 负责文档的增删改查操作
  10. """
  11. def __init__(self, infinity_connection: Optional[InfinityConnection] = None):
  12. """
  13. 初始化文档管理器
  14. Args:
  15. infinity_connection: Infinity连接实例,可选
  16. """
  17. self.infinity_conn = infinity_connection or InfinityConnection()
  18. def insert_document(self, index_name: str, document: Dict[str, Any], id: str = None) -> bool:
  19. """
  20. 插入单个文档
  21. Args:
  22. index_name: 索引名称
  23. document: 文档内容
  24. id: 文档ID,可选
  25. Returns:
  26. bool: 插入是否成功
  27. """
  28. try:
  29. path = f"/api/collections/{index_name}/documents"
  30. response = self.infinity_conn._make_request("POST", path, {"documents": [document]})
  31. return "error" not in response
  32. except Exception as e:
  33. print(f"插入Infinity文档失败: {e}")
  34. return False
  35. def bulk_insert(self, index_name: str, documents: List[Dict[str, Any]]) -> Dict[str, Any]:
  36. """
  37. 批量插入文档
  38. Args:
  39. index_name: 索引名称
  40. documents: 文档列表
  41. Returns:
  42. Dict: 包含成功和失败信息的字典
  43. """
  44. try:
  45. path = f"/api/collections/{index_name}/documents"
  46. response = self.infinity_conn._make_request("POST", path, {"documents": documents})
  47. if "error" not in response:
  48. return {
  49. "success": len(documents),
  50. "failed": 0,
  51. "errors": []
  52. }
  53. else:
  54. return {
  55. "success": 0,
  56. "failed": len(documents),
  57. "errors": [response["error"]] * len(documents)
  58. }
  59. except Exception as e:
  60. print(f"批量插入Infinity文档失败: {e}")
  61. return {
  62. "success": 0,
  63. "failed": len(documents),
  64. "errors": [str(e)] * len(documents)
  65. }
  66. def update_document(self, index_name: str, document_id: str, document: Dict[str, Any]) -> bool:
  67. """
  68. 更新单个文档
  69. Args:
  70. index_name: 索引名称
  71. document_id: 文档ID
  72. document: 要更新的文档内容
  73. Returns:
  74. bool: 更新是否成功
  75. """
  76. try:
  77. path = f"/api/collections/{index_name}/documents/{document_id}"
  78. response = self.infinity_conn._make_request("PUT", path, document)
  79. return "error" not in response
  80. except Exception as e:
  81. print(f"更新Infinity文档失败: {e}")
  82. return False
  83. def delete_document(self, index_name: str, document_id: str) -> bool:
  84. """
  85. 删除单个文档
  86. Args:
  87. index_name: 索引名称
  88. document_id: 文档ID
  89. Returns:
  90. bool: 删除是否成功
  91. """
  92. try:
  93. path = f"/api/collections/{index_name}/documents/{document_id}"
  94. response = self.infinity_conn._make_request("DELETE", path)
  95. return "error" not in response
  96. except Exception as e:
  97. print(f"删除Infinity文档失败: {e}")
  98. return False
  99. def get_document(self, index_name: str, document_id: str) -> Optional[Dict[str, Any]]:
  100. """
  101. 获取单个文档
  102. Args:
  103. index_name: 索引名称
  104. document_id: 文档ID
  105. Returns:
  106. Optional[Dict[str, Any]]: 文档内容,不存在则返回None
  107. """
  108. try:
  109. path = f"/api/collections/{index_name}/documents/{document_id}"
  110. response = self.infinity_conn._make_request("GET", path)
  111. return response if "error" not in response else None
  112. except Exception as e:
  113. print(f"获取Infinity文档失败: {e}")
  114. return None
  115. def delete_by_query(self, index_name: str, query: Dict[str, Any]) -> Dict[str, Any]:
  116. """
  117. 按查询条件删除文档
  118. Args:
  119. index_name: 索引名称
  120. query: 查询条件
  121. Returns:
  122. Dict: 删除结果
  123. """
  124. try:
  125. path = f"/api/collections/{index_name}/delete_by_query"
  126. response = self.infinity_conn._make_request("POST", path, {"query": query})
  127. if "error" not in response:
  128. return {
  129. "deleted": response.get("total", 0),
  130. "failed": 0
  131. }
  132. else:
  133. return {
  134. "deleted": 0,
  135. "failed": 1,
  136. "error": response["error"]
  137. }
  138. except Exception as e:
  139. print(f"按条件删除Infinity文档失败: {e}")
  140. return {
  141. "deleted": 0,
  142. "failed": 1,
  143. "error": str(e)
  144. }