# 主应用入口,整合多个 FastAPI 应用 import uvicorn from fastapi import FastAPI from contextlib import asynccontextmanager from src.common.logging_config import get_logger # 获取日志器 logger = get_logger(__name__) from src.api.sdk.search_infinity import app as search_app from src.api.sdk.tag_manage import app as tag_app from src.api.sdk.dataset_manage import app as dataset_app from src.api.sdk.api_manage import app as api_manage_app from src.api.sdk.dify_dataset_manage import app as dify_dataset_manage_app from src.api.sdk.prompt_manage import app as prompt_manage_app # 导入认证中间件 from src.utils.auth import verify_api_key # 定义主应用的生命周期管理 @asynccontextmanager async def main_lifespan(app: FastAPI): """主应用生命周期管理""" from src.conf.settings import vector_db_settings from src.utils.vector_db import get_vector_db_client, close_vector_db_client # 1. 初始化向量数据库客户端(根据配置选择 Infinity 或 Elasticsearch) db_type = vector_db_settings.vector_db_type if db_type == "infinity": get_vector_db_client(database="book_image_db") logger.info("✅ Infinity向量数据库客户端已初始化") elif db_type == "es": get_vector_db_client() logger.info("✅ Elasticsearch向量数据库客户端已初始化") else: logger.warning(f"⚠️ 未知的向量数据库类型: {db_type}") # 2. 初始化MySQL全局客户端 from src.utils.mysql import init_global_mysql_client, close_global_mysql_client init_global_mysql_client() logger.info("✅ MySQL客户端已初始化") # 3. 初始化MinIO全局客户端并校验存储桶 from src.utils.file.minio.minio_util import init_minio_client, close_minio_client init_minio_client(check_bucket=True) logger.info("✅ MinIO客户端已初始化并校验存储桶") # 4. 初始化提示词维度对应的向量数据库表/索引 from src.api.db.services.prompt_service import get_prompt_service prompt_service = get_prompt_service() prompt_service.init_vector_db_tables() logger.info("✅ 提示词维度向量数据库表/索引已初始化") # 5. 启动Chunk更新定时任务 # from src.job.chunk_update_job import start_scheduler, shutdown_scheduler # start_scheduler() # logger.info("✅ Chunk update scheduler started") yield # 1. 关闭Chunk更新定时任务 # shutdown_scheduler() # logger.info("✅ Chunk update scheduler shutdown") # 2. 关闭全局线程池 from src.utils.async_utils import ThreadPoolManager ThreadPoolManager.shutdown_all() logger.info("✅ 全局线程池已关闭") # 3. 关闭MinIO全局客户端 close_minio_client() logger.info("✅ MinIO客户端已关闭") # 4. 关闭MySQL全局客户端 close_global_mysql_client() logger.info("✅ MySQL客户端已关闭") # 5. 关闭向量数据库客户端 close_vector_db_client() logger.info(f"✅ 向量数据库客户端已关闭 (类型: {db_type})") # 创建主应用 main_app = FastAPI( title="Infinity API Gateway", description="整合多个 FastAPI 应用的 API 网关", version="1.0.0", lifespan=main_lifespan ) # 添加认证中间件 main_app.middleware("http")(verify_api_key) # 挂载子应用 # 1. 搜索 API - 访问路径: /search/* main_app.mount("/search", search_app, name="search_api") # 2. 标签管理 API - 访问路径: /tag/* main_app.mount("/tag", tag_app, name="tag_api") # 3. 数据集管理 API - 访问路径: /dataset/* main_app.mount("/dataset", dataset_app, name="dataset_api") # 4. API 管理 - 访问路径: /api/* main_app.mount("/api", api_manage_app, name="api_manage") # 5. Dify 数据集管理 API - 访问路径: /dify_dataset/* main_app.mount("/dify_dataset", dify_dataset_manage_app, name="dify_dataset_manage") # 6. 提示词管理 API - 访问路径: /prompt/* main_app.mount("/prompt", prompt_manage_app, name="prompt_manage") from src.common.result import Result # 主应用根路径 @main_app.get("/") async def root(): """API 网关根路径""" data = { "message": "Welcome to GRAPH_RAG API Gateway", "available_apps": { "search_api": "访问路径: /search, 文档: /search/docs", "hybrid_http_api": "访问路径: /hybrid, 文档: /hybrid/docs", "tag_api": "访问路径: /tag, 文档: /tag/docs", "dataset_api": "访问路径: /dataset, 文档: /dataset/docs", "api_manage": "访问路径: /api, 文档: /api/docs" } } return Result.success(data=data, message="欢迎访问 GRAPH_RAG API Gateway") # 健康检查端点 @main_app.get("/health") async def health_check(): """主应用健康检查""" from src.utils.task_queue import get_task_queue queue_info = get_task_queue().get_queue_info() data = { "status": "healthy", "service": "Infinity API Gateway", "version": "2.0.0", "queue": queue_info } return Result.success(data=data, message="服务健康") if __name__ == "__main__": logger.info("=== 启动 GRAPH_RAG API Gateway ===") """启动主应用""" uvicorn.run( "main:main_app", # 应用路径: 模块名:应用实例名 host="0.0.0.0", # 允许所有IP访问 port=18001, # 服务端口 reload=False, # 开发模式下自动重载 workers=1, # 生产环境可根据需要增加 log_level="info", # 日志级别 limit_concurrency=100, # 并发连接限制 timeout_keep_alive=30, # 保持连接超时 timeout_graceful_shutdown=10 # 优雅关闭超时 )