chunk_util.py 3.3 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. from typing import Optional
  2. from ragflow_sdk import Chunk
  3. from utils.ragflow_sdk.document_util import DocumentUtil
  4. from .base_util import RAGFlowBaseUtil
  class ChunkUtil(RAGFlowBaseUtil):
      """Chunk-level helpers bound to one document of a RAGFlow dataset.

      Every document-scoped method resolves its target document through
      ``DocumentUtil().get_document(dataset_name=..., document_id=...)``.
      The dataset name and document ID may be supplied per call; when they
      are omitted, the values stored on the instance are used instead.
      """

      def __init__(self, dataset_id: str, document_id: str, dataset_name: Optional[str] = None):
          """Store the identifiers later calls fall back to.

          Args:
              dataset_id: ID of the dataset. Stored only; no method of this
                  class reads it.
              document_id: Default document ID for document-scoped calls.
              dataset_name: Optional default dataset name for document-scoped
                  calls. NOTE(review): ``get_document`` is keyed by dataset
                  *name* while this class was constructed with a dataset
                  *ID*; confirm whether a name can be derived from the ID.
          """
          super().__init__()
          self.dataset_id = dataset_id
          self.document_id = document_id
          self.dataset_name = dataset_name

      def _get_document(self, dataset_name: Optional[str] = None, document_id: Optional[str] = None):
          """Return the target document, preferring explicit arguments over instance defaults.

          Raises:
              ValueError: If no dataset name or no document ID can be resolved.
          """
          resolved_name = dataset_name or self.dataset_name
          resolved_id = document_id or self.document_id
          if not resolved_name or not resolved_id:
              raise ValueError("数据集名称和文档ID不能为空")
          return DocumentUtil().get_document(dataset_name=resolved_name, document_id=resolved_id)

      def add_chunk(
          self,
          dataset_name: Optional[str] = None,
          document_id: Optional[str] = None,
          content: Optional[str] = None,
          important_keywords: Optional[list[str]] = None,
      ):
          """Add a chunk to the target document.

          Args:
              dataset_name: Dataset name; falls back to the instance value.
              document_id: Document ID; falls back to the instance value.
              content: Text of the new chunk. Must be non-empty.
              important_keywords: Keywords forwarded with the chunk. ``None``
                  (the default) is sent as a fresh empty list.

          Returns:
              Whatever the document's ``add_chunk`` call returns.

          Raises:
              ValueError: If the dataset name, document ID or content is empty.
              Exception: Any error from the underlying call is printed and
                  re-raised unchanged.
          """
          try:
              dataset_name = dataset_name or self.dataset_name
              document_id = document_id or self.document_id
              if not dataset_name or not document_id or not content:
                  raise ValueError("数据集名称、文档ID和内容不能为空")
              # A new list per call: a mutable default would be shared
              # between every invocation of this method.
              keywords = [] if important_keywords is None else important_keywords
              document = self._get_document(dataset_name, document_id)
              return document.add_chunk(content=content, important_keywords=keywords)
          except Exception as e:
              print(f"添加文档分块失败: {e}")
              raise

      def delete_chunks(
          self,
          chunk_ids: list[str],
          dataset_name: Optional[str] = None,
          document_id: Optional[str] = None,
      ):
          """Delete the given chunks from the target document.

          Args:
              chunk_ids: IDs of the chunks to delete.
              dataset_name: Dataset name; falls back to the instance value.
              document_id: Document ID; falls back to the instance value.

          Raises:
              ValueError: If no dataset name or document ID can be resolved.
              Exception: Any error from the underlying call is printed and
                  re-raised unchanged.
          """
          try:
              self._get_document(dataset_name, document_id).delete_chunks(chunk_ids)
          except Exception as e:
              print(f"删除文档分块失败: {e}")
              raise

      def list_chunks(
          self,
          keywords: Optional[str] = None,
          page: int = 1,
          page_size: int = 30,
          id: Optional[str] = None,
          dataset_name: Optional[str] = None,
          document_id: Optional[str] = None,
      ) -> list[Chunk]:
          """List chunks of the target document.

          Args:
              keywords: Forwarded as the ``keywords`` filter.
              page: Forwarded as the page number.
              page_size: Forwarded as the page size.
              id: Forwarded as the chunk ID filter. The name shadows the
                  builtin but is part of the public signature, so it is kept.
              dataset_name: Dataset name; falls back to the instance value.
              document_id: Document ID; falls back to the instance value.

          Returns:
              The result of the document's ``list_chunks`` call.

          Raises:
              ValueError: If no dataset name or document ID can be resolved.
              Exception: Any error from the underlying call is printed and
                  re-raised unchanged.
          """
          try:
              document = self._get_document(dataset_name, document_id)
              return document.list_chunks(keywords=keywords, page=page, page_size=page_size, id=id)
          except Exception as e:
              print(f"列出文档分块失败: {e}")
              raise

      def get_chunk(
          self,
          chunk_id: str,
          dataset_name: Optional[str] = None,
          document_id: Optional[str] = None,
      ) -> Chunk:
          """Return the chunk with the given ID.

          The lookup is done by calling ``list_chunks`` with ``id=chunk_id``
          and taking the first result.

          Raises:
              Exception: If the listing comes back empty, or if
                  ``list_chunks`` itself fails.
          """
          matches = self.list_chunks(id=chunk_id, dataset_name=dataset_name, document_id=document_id)
          if not matches:
              raise Exception(f"Chunk {chunk_id} not found")
          return matches[0]

      def update_chunk(
          self,
          chunk_id: str,
          update_message: dict,
          dataset_name: Optional[str] = None,
          document_id: Optional[str] = None,
      ):
          """Update one chunk.

          Args:
              chunk_id: ID of the chunk to update.
              update_message: Passed unchanged to the chunk's ``update`` method.
              dataset_name: Dataset name; falls back to the instance value.
              document_id: Document ID; falls back to the instance value.

          Raises:
              Exception: Lookup and update errors are printed and re-raised
                  unchanged.
          """
          try:
              chunk = self.get_chunk(chunk_id=chunk_id, dataset_name=dataset_name, document_id=document_id)
              chunk.update(update_message)
          except Exception as e:
              print(f"更新文档分块失败: {e}")
              raise

      def retrieve(
          self,
          question: str,
          dataset_ids: Optional[list[str]] = None,
          document_ids: Optional[list[str]] = None,
          page: int = 1,
          page_size: int = 30,
          similarity_threshold: float = 0.2,
          vector_similarity_weight: float = 0.3,
          top_k: int = 1024,
          rerank_id: Optional[str] = None,
          keyword: bool = False,
          cross_languages: Optional[list[str]] = None,
          metadata_condition: Optional[dict] = None,
      ) -> list[Chunk]:
          """Retrieve chunks for a question.

          All arguments are forwarded by keyword, unchanged, to
          ``self.ragflow_client.retrieve``. ``ragflow_client`` is not set in
          this class; presumably ``RAGFlowBaseUtil`` provides it (confirm).
          Unlike the other methods, this call does not use the instance's
          dataset or document identifiers.

          Returns:
              The result of the client's ``retrieve`` call.

          Raises:
              Exception: Any error from the client is printed and re-raised
                  unchanged.
          """
          try:
              return self.ragflow_client.retrieve(
                  question=question,
                  dataset_ids=dataset_ids,
                  document_ids=document_ids,
                  page=page,
                  page_size=page_size,
                  similarity_threshold=similarity_threshold,
                  vector_similarity_weight=vector_similarity_weight,
                  top_k=top_k,
                  rerank_id=rerank_id,
                  keyword=keyword,
                  cross_languages=cross_languages,
                  metadata_condition=metadata_condition,
              )
          except Exception as e:
              print(f"检索文档分块失败: {e}")
              raise