| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150 |
- # 主应用入口,整合多个 FastAPI 应用
- import uvicorn
- from fastapi import FastAPI
- from contextlib import asynccontextmanager
- from src.common.logging_config import get_logger
- # 获取日志器
- logger = get_logger(__name__)
- from src.api.sdk.search_infinity import app as search_app
- from src.api.sdk.tag_manage import app as tag_app
- from src.api.sdk.dataset_manage import app as dataset_app
- from src.api.sdk.api_manage import app as api_manage_app
- from src.api.sdk.dify_dataset_manage import app as dify_dataset_manage_app
- from src.api.sdk.prompt_manage import app as prompt_manage_app
- # 导入认证中间件
- from src.utils.auth import verify_api_key
- # 定义主应用的生命周期管理
- @asynccontextmanager
- async def main_lifespan(app: FastAPI):
- """主应用生命周期管理"""
- from src.conf.settings import vector_db_settings
- from src.utils.vector_db import get_vector_db_client, close_vector_db_client
-
- # 1. 初始化向量数据库客户端(根据配置选择 Infinity 或 Elasticsearch)
- db_type = vector_db_settings.vector_db_type
- if db_type == "infinity":
- get_vector_db_client(database="book_image_db")
- logger.info("✅ Infinity向量数据库客户端已初始化")
- elif db_type == "es":
- get_vector_db_client()
- logger.info("✅ Elasticsearch向量数据库客户端已初始化")
- else:
- logger.warning(f"⚠️ 未知的向量数据库类型: {db_type}")
-
- # 2. 初始化MySQL全局客户端
- from src.utils.mysql import init_global_mysql_client, close_global_mysql_client
- init_global_mysql_client()
- logger.info("✅ MySQL客户端已初始化")
-
- # 3. 初始化MinIO全局客户端并校验存储桶
- from src.utils.file.minio.minio_util import init_minio_client, close_minio_client
- init_minio_client(check_bucket=True)
- logger.info("✅ MinIO客户端已初始化并校验存储桶")
- # 4. 初始化提示词维度对应的向量数据库表/索引
- from src.api.db.services.prompt_service import get_prompt_service
- prompt_service = get_prompt_service()
- prompt_service.init_vector_db_tables()
- logger.info("✅ 提示词维度向量数据库表/索引已初始化")
- # 5. 启动Chunk更新定时任务
- from src.job.chunk_update_job import start_scheduler, shutdown_scheduler
- start_scheduler()
- logger.info("✅ Chunk update scheduler started")
-
- yield
- # 1. 关闭Chunk更新定时任务
- shutdown_scheduler()
- logger.info("✅ Chunk update scheduler shutdown")
- # 2. 关闭MinIO全局客户端
- close_minio_client()
- logger.info("✅ MinIO客户端已关闭")
- # 3. 关闭MySQL全局客户端
- close_global_mysql_client()
- logger.info("✅ MySQL客户端已关闭")
- # 4. 关闭向量数据库客户端
- close_vector_db_client()
- logger.info(f"✅ 向量数据库客户端已关闭 (类型: {db_type})")
-
-
- # 创建主应用
- main_app = FastAPI(
- title="Infinity API Gateway",
- description="整合多个 FastAPI 应用的 API 网关",
- version="1.0.0",
- lifespan=main_lifespan
- )
- # 添加认证中间件
- main_app.middleware("http")(verify_api_key)
- # 挂载子应用
- # 1. 搜索 API - 访问路径: /search/*
- main_app.mount("/search", search_app, name="search_api")
- # 2. 标签管理 API - 访问路径: /tag/*
- main_app.mount("/tag", tag_app, name="tag_api")
- # 3. 数据集管理 API - 访问路径: /dataset/*
- main_app.mount("/dataset", dataset_app, name="dataset_api")
- # 4. API 管理 - 访问路径: /api/*
- main_app.mount("/api", api_manage_app, name="api_manage")
- # 5. Dify 数据集管理 API - 访问路径: /dify_dataset/*
- main_app.mount("/dify_dataset", dify_dataset_manage_app, name="dify_dataset_manage")
- # 6. 提示词管理 API - 访问路径: /prompt/*
- main_app.mount("/prompt", prompt_manage_app, name="prompt_manage")
- from src.common.result import Result
- # 主应用根路径
- @main_app.get("/")
- async def root():
- """API 网关根路径"""
- data = {
- "message": "Welcome to GRAPH_RAG API Gateway",
- "available_apps": {
- "search_api": "访问路径: /search, 文档: /search/docs",
- "hybrid_http_api": "访问路径: /hybrid, 文档: /hybrid/docs",
- "tag_api": "访问路径: /tag, 文档: /tag/docs",
- "dataset_api": "访问路径: /dataset, 文档: /dataset/docs",
- "api_manage": "访问路径: /api, 文档: /api/docs"
- }
- }
- return Result.success(data=data, message="欢迎访问 GRAPH_RAG API Gateway")
- # 健康检查端点
- @main_app.get("/health")
- async def health_check():
- """主应用健康检查"""
- from src.utils.task_queue import get_task_queue
- queue_info = get_task_queue().get_queue_info()
- data = {
- "status": "healthy",
- "service": "Infinity API Gateway",
- "version": "2.0.0",
- "queue": queue_info
- }
- return Result.success(data=data, message="服务健康")
- if __name__ == "__main__":
- logger.info("=== 启动 GRAPH_RAG API Gateway ===")
- """启动主应用"""
- uvicorn.run(
- "main:main_app", # 应用路径: 模块名:应用实例名
- host="0.0.0.0", # 允许所有IP访问
- port=18001, # 服务端口
- reload=False, # 开发模式下自动重载
- workers=1, # 生产环境可根据需要增加
- log_level="info", # 日志级别
- limit_concurrency=100, # 并发连接限制
- timeout_keep_alive=30, # 保持连接超时
- timeout_graceful_shutdown=10 # 优雅关闭超时
- )
|