#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ PDF解析工作流测试脚本 测试包含向量化入库的完整工作流 """ import os import sys # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) from services.pdf_parser.workflow import PDFParsingWorkflow def test_pdf_parsing_workflow(): """ 测试PDF解析工作流,包括向量化入库 """ print("=== 测试PDF解析工作流 ===") # 测试参数 pdf_path = "test/sample.pdf" # 替换为实际的测试PDF路径 dataset_id = "test_dataset" ragflow_api_url = "http://localhost:8000/" # 替换为实际的RAGFLOW API URL rag_flow_api_key = "test_api_key" # 替换为实际的API密钥 try: # 检查测试PDF文件是否存在 if not os.path.exists(pdf_path): print(f"测试PDF文件不存在: {pdf_path}") print("请将测试PDF文件放置在指定位置") return False # 初始化工作流 workflow = PDFParsingWorkflow() print(f"工作流初始化成功") # 运行工作流 print(f"开始运行工作流,解析PDF: {pdf_path}") result = workflow.run( pdf_path=pdf_path, dataset_id=dataset_id, ragflow_api_url=ragflow_api_url, rag_flow_api_key=rag_flow_api_key ) # 打印结果 print(f"\n工作流运行完成") print(f"解析页面数量: {len(result.get('parsed_results', []))}") print(f"向量化页面数量: {result.get('vectorized_pages', 0)}") print(f"向量化结果数量: {len(result.get('vectorized_results', []))}") # 检查结果 if result.get('is_complete', False): print("\n✅ 工作流运行成功!") return True else: print("\n❌ 工作流运行失败!") return False except Exception as e: print(f"\n❌ 测试失败: {e}") import traceback traceback.print_exc() return False if __name__ == "__main__": test_pdf_parsing_workflow()