|
|
@@ -3,25 +3,67 @@
|
|
|
|
|
|
该文件提供数据集管理的 API 接口,支持:
|
|
|
- PDF 文件上传和解析
|
|
|
+- QA问答对生成
|
|
|
+- 图片解析
|
|
|
- 数据集创建
|
|
|
"""
|
|
|
|
|
|
+import tempfile
|
|
|
+import os
|
|
|
from fastapi import FastAPI, UploadFile, File, Form
|
|
|
+from pydantic import BaseModel, Field
|
|
|
+from typing import Optional
|
|
|
+
|
|
|
from src.api.dataset.services.dataset_manage_service import DatasetManageService
|
|
|
from src.common.result import Result
|
|
|
+from src.utils.task_queue import get_task_queue
|
|
|
+from src.common.logging_config import get_logger
|
|
|
|
|
|
+logger = get_logger(__name__)
|
|
|
|
|
|
# 创建 FastAPI 应用
|
|
|
app = FastAPI(
|
|
|
title="数据集管理 API",
|
|
|
- description="数据集管理服务,提供 PDF 解析和数据集创建功能",
|
|
|
- version="1.0.0"
|
|
|
+ description="数据集管理服务,提供 PDF 解析、QA生成和图片解析功能",
|
|
|
+ version="2.0.0"
|
|
|
)
|
|
|
|
|
|
# 创建数据集管理服务实例
|
|
|
dataset_service = DatasetManageService()
|
|
|
|
|
|
|
|
|
+# ==================== 请求模型 ====================
|
|
|
+
|
|
|
class PDFParseRequest(BaseModel):
    """Request model for PDF parsing.

    NOTE(review): the /v2/pdf-parse endpoint takes these values as
    multipart Form fields rather than this model — confirm this model
    is still referenced anywhere.
    """
    # Name of the dataset the parsed PDF belongs to.
    dataset_name: str = Field(..., description="数据集名称")
    # Identifier of the per-page dataset.
    page_dataset_id: str = Field(..., description="页面数据集ID")
|
|
|
+
|
|
|
+
|
|
|
class QAParseRequest(BaseModel):
    """Request model for QA-pair generation.

    NOTE(review): the /v2/qa-parse endpoint takes these values as
    multipart Form fields rather than this model — confirm this model
    is still referenced anywhere.
    """
    # RAGFlow dataset that receives the generated QA pairs.
    dataset_id: str = Field(..., description="RAGFlow数据集ID")
    # QA pairs generated per text chunk (validated to 1..200).
    qa_count_per_chunk: int = Field(default=50, ge=1, le=200, description="每块生成的QA数量")
    # Characters per text chunk (validated to 100..5000).
    chunk_size: int = Field(default=1000, ge=100, le=5000, description="文本分块大小")
    # Overlap between adjacent chunks (validated to 0..1000).
    chunk_overlap: int = Field(default=200, ge=0, le=1000, description="分块重叠大小")
|
|
|
+
|
|
|
+
|
|
|
class ImageParseRequest(BaseModel):
    """Request model for image parsing.

    NOTE(review): the /v2/image-parse endpoint takes these values as
    multipart Form fields rather than this model — confirm this model
    is still referenced anywhere.
    """
    # Book title the images belong to.
    book_name: str = Field(..., description="书名")
    # Target dataset identifier.
    dataset_id: str = Field(..., description="数据集ID")
|
|
|
+
|
|
|
+
|
|
|
class DynamicParseRequest(BaseModel):
    """Request model for dynamic multi-dimension parsing.

    NOTE(review): the /v2/dynamic-parse endpoint takes a comma-separated
    string via Form instead of this model — confirm this model is still
    referenced anywhere.
    """
    # Dimension ids to parse with; untyped `list` — presumably ints,
    # matching the endpoint's int() parsing. TODO confirm and tighten.
    dimension_ids: list = Field(..., description="维度ID列表")
    # Book title the images belong to.
    book_name: str = Field(..., description="书名")
    # Target dataset identifier.
    dataset_id: str = Field(..., description="数据集ID")
    # Optional document identifier; empty string means "derive one".
    document_id: str = Field(default="", description="文档ID")
|
|
|
+
|
|
|
+
|
|
|
+# ==================== 原有接口 ====================
|
|
|
+
|
|
|
@app.post("/parse-pdf")
|
|
|
async def parse_pdf(
|
|
|
file: UploadFile = File(...),
|
|
|
@@ -29,7 +71,7 @@ async def parse_pdf(
|
|
|
|
|
|
):
|
|
|
"""
|
|
|
- 解析 PDF 文件接口
|
|
|
+ 解析 PDF 文件接口(原版)
|
|
|
|
|
|
- **file**: PDF 文件附件
|
|
|
- **series_name**: 系列名
|
|
|
@@ -51,4 +93,337 @@ async def parse_pdf(
|
|
|
|
|
|
return Result.success(data=result, message="PDF 解析成功")
|
|
|
except Exception as e:
|
|
|
- return Result.error(code=500, message=f"解析 PDF 文件失败: {str(e)}")
|
|
|
+ return Result.error(code=500, message=f"解析 PDF 文件失败: {str(e)}")
|
|
|
+
|
|
|
+
|
|
|
+# ==================== V2 工作流接口 ====================
|
|
|
+
|
|
|
@app.post("/v2/pdf-parse")
async def pdf_parse_v2(
    file: UploadFile = File(..., description="PDF文件"),
    dataset_name: str = Form(..., description="数据集名称"),
    page_dataset_id: str = Form(..., description="页面数据集ID")
):
    """PDF parsing endpoint (V2 component workflow).

    Splits the PDF into page images, runs VL-model OCR, vectorizes the
    result into Infinity and syncs it to RAGFlow. The work is executed
    asynchronously on the shared task queue; the response only carries
    the task id and current queue info.

    - **file**: PDF attachment
    - **dataset_name**: target dataset name
    - **page_dataset_id**: id of the per-page dataset

    Returns a Result with ``task_id`` / ``queue_info`` on success, a
    400 Result for non-PDF uploads, and a 500 Result on submit failure.
    """
    try:
        # Accept any capitalization of the extension (".pdf", ".PDF",
        # ".Pdf", ...) and guard against a missing filename.
        if not file.filename or not file.filename.lower().endswith(".pdf"):
            return Result.error(code=400, message="只支持 PDF 格式的文件")

        # Persist the upload to a temp file the background worker can read;
        # delete=False because the worker runs after this handler returns.
        file_content = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_file.write(file_content)
            pdf_path = tmp_file.name

        logger.info(f"开始PDF解析V2: {file.filename}")

        def run_pdf_workflow():
            # Executed later by the task-queue worker, outside this request.
            try:
                from src.datasets.parser.workflows import PDFParsingWorkflowV2
                workflow = PDFParsingWorkflowV2()
                return workflow.run(
                    pdf_path=pdf_path,
                    page_dataset_id=page_dataset_id,
                    dataset_name=dataset_name
                )
            finally:
                # Always remove the temp file once the workflow finishes.
                if os.path.exists(pdf_path):
                    os.unlink(pdf_path)

        # Submit to the task queue. If submission itself fails, the worker
        # never runs and its `finally` cannot clean up — do it here to
        # avoid leaking the temp file.
        try:
            task_queue = get_task_queue()
            task_id = await task_queue.submit(
                name=f"PDF解析-{file.filename}",
                func=run_pdf_workflow
            )
        except Exception:
            if os.path.exists(pdf_path):
                os.unlink(pdf_path)
            raise

        queue_info = task_queue.get_queue_info()
        return Result.success(data={
            "task_id": task_id,
            "message": "任务已提交到队列",
            "queue_info": queue_info
        }, message="任务已提交")
    except Exception as e:
        logger.error(f"PDF解析V2提交失败: {str(e)}")
        return Result.error(code=500, message=f"任务提交失败: {str(e)}")
|
|
|
+
|
|
|
+
|
|
|
@app.post("/v2/qa-parse")
async def qa_parse_v2(
    file: UploadFile = File(..., description="PDF文件"),
    dataset_id: str = Form(..., description="RAGFlow数据集ID"),
    qa_count_per_chunk: int = Form(default=50, description="每块生成的QA数量"),
    chunk_size: int = Form(default=1000, description="文本分块大小"),
    chunk_overlap: int = Form(default=200, description="分块重叠大小")
):
    """QA-pair generation endpoint (V2 workflow).

    Runs OCR over the PDF, chunks the text, generates QA pairs per
    chunk in parallel, exports them as CSV and uploads to RAGFlow.
    The work runs asynchronously on the shared task queue.

    - **file**: PDF attachment
    - **dataset_id**: RAGFlow dataset id
    - **qa_count_per_chunk**: QA pairs per chunk (default 50)
    - **chunk_size**: characters per text chunk (default 1000)
    - **chunk_overlap**: overlap between chunks (default 200)

    Returns a Result with ``task_id`` / ``queue_info`` on success, a
    400 Result for non-PDF uploads, and a 500 Result on submit failure.
    """
    try:
        # Accept any capitalization of the extension and guard against
        # a missing filename.
        if not file.filename or not file.filename.lower().endswith(".pdf"):
            return Result.error(code=400, message="只支持 PDF 格式的文件")

        # Persist the upload to a temp file the background worker can read;
        # delete=False because the worker runs after this handler returns.
        file_content = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp_file:
            tmp_file.write(file_content)
            pdf_path = tmp_file.name

        logger.info(f"开始QA解析V2: {file.filename}")

        def run_qa_workflow():
            # Executed later by the task-queue worker, outside this request.
            try:
                from src.datasets.parser.workflows import QAParsingWorkflowV2
                workflow = QAParsingWorkflowV2()
                return workflow.run(
                    pdf_path=pdf_path,
                    dataset_id=dataset_id,
                    qa_count_per_chunk=qa_count_per_chunk,
                    chunk_size=chunk_size,
                    chunk_overlap=chunk_overlap
                )
            finally:
                # Always remove the temp file once the workflow finishes.
                if os.path.exists(pdf_path):
                    os.unlink(pdf_path)

        # If submission fails the worker never runs, so clean the temp
        # file here instead of leaking it.
        try:
            task_queue = get_task_queue()
            task_id = await task_queue.submit(
                name=f"QA解析-{file.filename}",
                func=run_qa_workflow
            )
        except Exception:
            if os.path.exists(pdf_path):
                os.unlink(pdf_path)
            raise

        queue_info = task_queue.get_queue_info()
        return Result.success(data={
            "task_id": task_id,
            "message": "任务已提交到队列",
            "queue_info": queue_info
        }, message="任务已提交")
    except Exception as e:
        logger.error(f"QA解析V2提交失败: {str(e)}")
        return Result.error(code=500, message=f"任务提交失败: {str(e)}")
|
|
|
+
|
|
|
+
|
|
|
@app.post("/v2/image-parse")
async def image_parse_v2(
    file: UploadFile = File(..., description="图片压缩包(ZIP)"),
    book_name: str = Form(..., description="书名"),
    dataset_id: str = Form(..., description="数据集ID")
):
    """Image parsing endpoint (V2 workflow).

    Unpacks the image ZIP, uploads to MinIO, parses each image with the
    VL model and vectorizes the results into Infinity. The work runs
    asynchronously on the shared task queue.

    - **file**: ZIP archive of images
    - **book_name**: book title
    - **dataset_id**: target dataset id

    Returns a Result with ``task_id`` / ``queue_info`` on success, a
    400 Result for non-ZIP uploads, and a 500 Result on submit failure.
    """
    try:
        # Accept any capitalization of the extension and guard against
        # a missing filename.
        if not file.filename or not file.filename.lower().endswith(".zip"):
            return Result.error(code=400, message="只支持 ZIP 格式的压缩包")

        # Persist the upload to a temp file the background worker can read;
        # delete=False because the worker runs after this handler returns.
        file_content = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
            tmp_file.write(file_content)
            zip_path = tmp_file.name

        logger.info(f"开始图片解析V2: {file.filename}")

        def run_image_workflow():
            # Executed later by the task-queue worker, outside this request.
            try:
                from src.datasets.parser.workflows import ImageParsingWorkflowV2
                workflow = ImageParsingWorkflowV2()
                return workflow.run(
                    zip_file_path=zip_path,
                    book_name=book_name,
                    dataset_id=dataset_id
                )
            finally:
                # Always remove the temp file once the workflow finishes.
                if os.path.exists(zip_path):
                    os.unlink(zip_path)

        # If submission fails the worker never runs, so clean the temp
        # file here instead of leaking it.
        try:
            task_queue = get_task_queue()
            task_id = await task_queue.submit(
                name=f"图片解析-{file.filename}",
                func=run_image_workflow
            )
        except Exception:
            if os.path.exists(zip_path):
                os.unlink(zip_path)
            raise

        queue_info = task_queue.get_queue_info()
        return Result.success(data={
            "task_id": task_id,
            "message": "任务已提交到队列",
            "queue_info": queue_info
        }, message="任务已提交")
    except Exception as e:
        logger.error(f"图片解析V2提交失败: {str(e)}")
        return Result.error(code=500, message=f"任务提交失败: {str(e)}")
|
|
|
+
|
|
|
+
|
|
|
+# ==================== 任务队列接口 ====================
|
|
|
+
|
|
|
@app.get("/v2/task/{task_id}")
async def get_task_status(task_id: str):
    """Look up the state of a previously submitted task.

    - **task_id**: identifier returned when the task was submitted

    Returns a 404 Result when the id is unknown, otherwise the status.
    """
    task_status = get_task_queue().get_task_status(task_id)
    if task_status:
        return Result.success(data=task_status, message="获取任务状态成功")
    return Result.error(code=404, message=f"任务不存在: {task_id}")
|
|
|
+
|
|
|
+
|
|
|
@app.get("/v2/queue")
async def get_queue_status():
    """Return a snapshot of the task queue's current state."""
    info = get_task_queue().get_queue_info()
    return Result.success(data=info, message="获取队列状态成功")
|
|
|
+
|
|
|
+
|
|
|
@app.post("/v2/queue/clear")
async def clear_completed_tasks():
    """Drop the records of tasks that have already finished."""
    queue = get_task_queue()
    queue.clear_completed()
    return Result.success(data={}, message="已清理完成的任务")
|
|
|
+
|
|
|
+
|
|
|
+# ==================== 动态多维度解析接口 ====================
|
|
|
+
|
|
|
@app.post("/v2/dynamic-parse")
async def dynamic_parse_v2(
    file: UploadFile = File(..., description="图片压缩包(ZIP)"),
    dimension_ids: str = Form(..., description="维度ID列表,逗号分隔,如: 1,2,3"),
    book_name: str = Form(..., description="书名"),
    dataset_id: str = Form(..., description="数据集ID"),
    document_id: str = Form(default="", description="文档ID")
):
    """Dynamic multi-dimension parsing endpoint (V2 workflow).

    For each dimension id, parses the uploaded images with that
    dimension's prompt and stores the results (one table/index named
    ``book_{dimension_id}`` per dimension, per the workflow's contract).
    The work runs asynchronously on the shared task queue.

    - **file**: ZIP archive of images
    - **dimension_ids**: comma-separated dimension ids
    - **book_name**: book title
    - **dataset_id**: target dataset id
    - **document_id**: optional document id (derived from book/dataset
      when empty)

    Returns a Result with ``task_id`` / ``dimension_ids`` /
    ``queue_info`` on success, 400 for bad input, 500 on submit failure.
    """
    try:
        # BUG FIX: this endpoint receives a ZIP archive of images (see the
        # File description, the ".zip" temp suffix, and process_image_zip
        # below), but previously rejected anything that was not a *PDF*,
        # making the endpoint unusable. Validate for ZIP instead.
        if not file.filename or not file.filename.lower().endswith(".zip"):
            return Result.error(code=400, message="只支持 ZIP 格式的压缩包")

        # Parse the comma-separated dimension id list into ints.
        try:
            dim_ids = [int(x.strip()) for x in dimension_ids.split(",") if x.strip()]
        except ValueError:
            return Result.error(code=400, message="维度ID格式错误,应为逗号分隔的整数")

        if not dim_ids:
            return Result.error(code=400, message="维度ID列表不能为空")

        # Persist the upload to a temp file the background worker can read;
        # delete=False because the worker runs after this handler returns.
        file_content = await file.read()
        with tempfile.NamedTemporaryFile(delete=False, suffix=".zip") as tmp_file:
            tmp_file.write(file_content)
            zip_path = tmp_file.name

        logger.info(f"开始动态多维度解析: {file.filename}, 维度: {dim_ids}")

        def run_dynamic_workflow():
            # Executed later by the task-queue worker, outside this request.
            try:
                from src.datasets.parser.workflows.dynamic_dimension_workflow import DynamicDimensionWorkflow
                from src.utils.file.image_util import image_util

                # Unpack the archive (image_util handles the upload) and
                # build the 1-based page list the workflow expects.
                image_urls = image_util.process_image_zip(zip_path, book_name)
                image_pages = [
                    {"page_number": i + 1, "image_url": url}
                    for i, url in enumerate(image_urls)
                ]

                workflow = DynamicDimensionWorkflow()
                return workflow.run(
                    dimension_ids=dim_ids,
                    image_pages=image_pages,
                    split_pages=image_pages,
                    document_id=document_id or f"{book_name}_{dataset_id}",
                    dataset_id=dataset_id,
                    pdf_path=""
                )
            finally:
                # Always remove the temp file once the workflow finishes.
                if os.path.exists(zip_path):
                    os.unlink(zip_path)

        # If submission fails the worker never runs, so clean the temp
        # file here instead of leaking it.
        try:
            task_queue = get_task_queue()
            task_id = await task_queue.submit(
                name=f"动态解析-{file.filename}-维度{len(dim_ids)}个",
                func=run_dynamic_workflow
            )
        except Exception:
            if os.path.exists(zip_path):
                os.unlink(zip_path)
            raise

        queue_info = task_queue.get_queue_info()
        return Result.success(data={
            "task_id": task_id,
            "dimension_ids": dim_ids,
            "message": "任务已提交到队列",
            "queue_info": queue_info
        }, message="任务已提交")
    except Exception as e:
        logger.error(f"动态解析提交失败: {str(e)}")
        return Result.error(code=500, message=f"任务提交失败: {str(e)}")
|