| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- from typing import Optional
- from ragflow_sdk import Chunk
- from utils.ragflow_sdk.document_util import DocumentUtil
- from .base_util import RAGFlowBaseUtil
class ChunkUtil(RAGFlowBaseUtil):
    """Convenience wrapper around chunk operations of a RAGFlow document.

    An instance is created for one dataset/document pair. ``delete_chunks``,
    ``list_chunks``, ``get_chunk`` and ``update_chunk`` operate on that stored
    pair, while ``add_chunk`` takes its target explicitly and ``retrieve``
    forwards a search request to ``self.ragflow_client``.
    """

    def __init__(self, dataset_id: str, document_id: str):
        """Remember the dataset and document the chunk operations act on.

        Args:
            dataset_id: Identifier of the dataset that owns the document.
            document_id: Identifier of the document whose chunks are managed.
        """
        super().__init__()
        self.dataset_id = dataset_id
        self.document_id = document_id

    def _bound_document(self):
        """Look up the document described by this instance's stored identifiers."""
        # NOTE(review): elsewhere in this class ``DocumentUtil.get_document`` is
        # called with a ``dataset_name`` keyword, whereas the instance stores a
        # ``dataset_id``. Confirm that get_document accepts the stored value
        # under this keyword (or switch to the keyword it expects for ids).
        return DocumentUtil().get_document(
            dataset_name=self.dataset_id,
            document_id=self.document_id,
        )

    def add_chunk(
        self,
        dataset_name: Optional[str] = None,
        document_id: Optional[str] = None,
        content: Optional[str] = None,
        important_keywords: Optional[list[str]] = None,
    ):
        """Add a chunk to the given document.

        Args:
            dataset_name: Name of the dataset holding the document. Required.
            document_id: Identifier of the target document. Required.
            content: Text of the new chunk. Required.
            important_keywords: Keywords attached to the chunk; ``None`` is
                treated as an empty list.

        Returns:
            Whatever the document's ``add_chunk`` call returns.

        Raises:
            ValueError: If ``dataset_name``, ``document_id`` or ``content`` is
                missing or empty.
            Exception: Any error from the underlying lookup or SDK call is
                printed and re-raised unchanged.
        """
        try:
            if not dataset_name or not document_id or not content:
                raise ValueError("数据集名称、文档ID和内容不能为空")
            document = DocumentUtil().get_document(
                dataset_name=dataset_name,
                document_id=document_id,
            )
            # Build a fresh list per call instead of sharing one mutable default.
            keywords = [] if important_keywords is None else important_keywords
            return document.add_chunk(content=content, important_keywords=keywords)
        except Exception as e:
            print(f"添加文档分块失败: {e}")
            raise

    def delete_chunks(self, chunk_ids: list[str]):
        """Delete the given chunks from this instance's document.

        Args:
            chunk_ids: Identifiers of the chunks to delete.

        Raises:
            Exception: Any error from the lookup or SDK call is printed and
                re-raised unchanged.
        """
        try:
            # Use the instance's stored identifiers; the previous code referenced
            # undefined local names here and always failed with NameError.
            self._bound_document().delete_chunks(chunk_ids)
        except Exception as e:
            print(f"删除文档分块失败: {e}")
            raise

    def list_chunks(
        self,
        keywords: Optional[str] = None,
        page: int = 1,
        page_size: int = 30,
        id: Optional[str] = None,
    ) -> list[Chunk]:
        """List chunks of this instance's document.

        Args:
            keywords: Optional keyword filter forwarded to the SDK.
            page: Page number forwarded to the SDK.
            page_size: Page size forwarded to the SDK.
            id: Optional chunk identifier forwarded to the SDK.

        Returns:
            The result of the document's ``list_chunks`` call.

        Raises:
            Exception: Any error from the lookup or SDK call is printed and
                re-raised unchanged.
        """
        try:
            # Same fix as delete_chunks: resolve the document from instance state.
            return self._bound_document().list_chunks(
                keywords=keywords,
                page=page,
                page_size=page_size,
                id=id,
            )
        except Exception as e:
            print(f"列出文档分块失败: {e}")
            raise

    def get_chunk(self, chunk_id: str) -> Chunk:
        """Return the chunk with the given identifier.

        Args:
            chunk_id: Identifier of the chunk to fetch.

        Returns:
            The first chunk returned by ``list_chunks(id=chunk_id)``.

        Raises:
            Exception: If no chunk is returned for ``chunk_id``, or if listing
                fails.
        """
        matches = self.list_chunks(id=chunk_id)
        if matches:
            return matches[0]
        raise Exception(f"Chunk {chunk_id} not found")

    def update_chunk(self, chunk_id: str, update_message: dict):
        """Update a chunk of this instance's document.

        Args:
            chunk_id: Identifier of the chunk to update.
            update_message: Payload passed to the chunk's ``update`` method.

        Raises:
            Exception: Any error while fetching or updating the chunk is
                printed and re-raised unchanged.
        """
        try:
            self.get_chunk(chunk_id=chunk_id).update(update_message)
        except Exception as e:
            print(f"更新文档分块失败: {e}")
            raise

    def retrieve(
        self,
        question: str,
        dataset_ids: Optional[list[str]] = None,
        document_ids: Optional[list[str]] = None,
        page: int = 1,
        page_size: int = 30,
        similarity_threshold: float = 0.2,
        vector_similarity_weight: float = 0.3,
        top_k: int = 1024,
        rerank_id: Optional[str] = None,
        keyword: bool = False,
        cross_languages: Optional[list[str]] = None,
        metadata_condition: Optional[dict] = None,
    ) -> list[Chunk]:
        """Retrieve chunks matching a question.

        Every argument is forwarded unchanged, by keyword, to
        ``self.ragflow_client.retrieve`` (the client is presumably set up by
        ``RAGFlowBaseUtil`` — confirm against the base class).

        Returns:
            The result of the client's ``retrieve`` call.

        Raises:
            Exception: Any error from the client call is printed and re-raised
                unchanged.
        """
        try:
            return self.ragflow_client.retrieve(
                question=question,
                dataset_ids=dataset_ids,
                document_ids=document_ids,
                page=page,
                page_size=page_size,
                similarity_threshold=similarity_threshold,
                vector_similarity_weight=vector_similarity_weight,
                top_k=top_k,
                rerank_id=rerank_id,
                keyword=keyword,
                cross_languages=cross_languages,
                metadata_condition=metadata_condition,
            )
        except Exception as e:
            print(f"检索文档分块失败: {e}")
            raise
|