Files

218 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
from starlette.responses import HTMLResponse
from typing import Any, AsyncGenerator
from fastapi import Depends, FastAPI, Request, Response
from fastapi.staticfiles import StaticFiles
from fastapi.concurrency import asynccontextmanager
from fastapi.openapi.docs import (
get_redoc_html,
get_swagger_ui_html,
get_swagger_ui_oauth2_redirect_html
)
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
from math import ceil
from app.config.setting import settings
from app.core.logger import log
from app.core.discover import router
from app.core.exceptions import CustomException, handle_exception
from app.utils.common_util import import_module, import_modules_async
from app.scripts.initialize import InitializeData
from app.core.database import redis_connect
from app.api.v1.module_application.job.tools.ap_scheduler import SchedulerUtil
from app.api.v1.module_system.params.service import ParamsService
from app.api.v1.module_system.dict.service import DictDataService
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[Any, Any]:
"""
自定义 FastAPI 应用生命周期。
参数:
- app (FastAPI): FastAPI 应用实例。
返回:
- AsyncGenerator[Any, Any]: 生命周期上下文生成器。
"""
try:
await InitializeData().init_db()
log.info(f"{settings.DATABASE_TYPE}数据库初始化完成")
# 初始化Redis连接如果启用
if settings.REDIS_ENABLE:
await redis_connect(app=app, status=True)
log.info("✅ Redis连接初始化完成")
await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=True)
log.info("✅ 全局事件模块加载完成")
# 只有在Redis启用时才初始化Redis相关的服务
if settings.REDIS_ENABLE:
if not hasattr(app.state, 'redis') or app.state.redis is None:
raise CustomException(msg="Redis未正确初始化", data="请检查Redis配置和连接")
await ParamsService().init_config_service(redis=app.state.redis)
log.info("✅ Redis系统配置初始化完成")
await DictDataService().init_dict_service(redis=app.state.redis)
log.info("✅ Redis数据字典初始化完成")
# 初始化请求限制器
await FastAPILimiter.init(
redis=app.state.redis,
prefix=settings.REQUEST_LIMITER_REDIS_PREFIX,
http_callback=http_limit_callback,
)
log.info("✅ 请求限制器初始化完成")
await SchedulerUtil.init_system_scheduler()
scheduler_jobs_count = len(SchedulerUtil.get_all_jobs())
scheduler_status = SchedulerUtil.get_job_status()
log.info(f"✅ 定时任务调度器初始化完成 ({scheduler_jobs_count} 个任务)")
# 导入并显示最终的启动信息面板
from app.utils.console import run as console_run
from app.common.enums import EnvironmentEnum
console_run(
host=settings.SERVER_HOST,
port=settings.SERVER_PORT,
reload=True if settings.ENVIRONMENT == EnvironmentEnum.DEV else False,
redis_ready=settings.REDIS_ENABLE and hasattr(app.state, 'redis') and app.state.redis is not None,
scheduler_jobs=scheduler_jobs_count,
scheduler_status=scheduler_status,
)
except Exception as e:
log.error(f"❌ 应用初始化失败: {str(e)}")
raise
yield
try:
await import_modules_async(modules=settings.EVENT_LIST, desc="全局事件", app=app, status=False)
log.info("✅ 全局事件模块卸载完成")
await SchedulerUtil.close_system_scheduler()
log.info("✅ 定时任务调度器已关闭")
# 只有在Redis启用时才关闭请求限制器
if settings.REDIS_ENABLE:
await FastAPILimiter.close()
log.info("✅ 请求限制器已关闭")
except Exception as e:
log.error(f"❌ 应用关闭过程中发生错误: {str(e)}")
def register_middlewares(app: FastAPI) -> None:
"""
注册全局中间件。
参数:
- app (FastAPI): FastAPI 应用实例。
返回:
- None
"""
for middleware in settings.MIDDLEWARE_LIST[::-1]:
if not middleware:
continue
middleware = import_module(middleware, desc="中间件")
app.add_middleware(middleware)
def register_exceptions(app: FastAPI) -> None:
"""
统一注册异常处理器。
参数:
- app (FastAPI): FastAPI 应用实例。
返回:
- None
"""
handle_exception(app)
def register_routers(app: FastAPI) -> None:
"""
注册根路由。
参数:
- app (FastAPI): FastAPI 应用实例。
返回:
- None
"""
# 只有在Redis启用时才使用限流器
if settings.REDIS_ENABLE:
app.include_router(router=router, dependencies=[Depends(RateLimiter(times=5, seconds=10))])
else:
app.include_router(router=router)
def register_files(app: FastAPI) -> None:
"""
注册静态资源挂载和文件相关配置。
参数:
- app (FastAPI): FastAPI 应用实例。
返回:
- None
"""
# 挂载静态文件目录
if settings.STATIC_ENABLE:
# 确保日志目录存在
settings.STATIC_ROOT.mkdir(parents=True, exist_ok=True)
app.mount(path=settings.STATIC_URL, app=StaticFiles(directory=settings.STATIC_ROOT), name=settings.STATIC_DIR)
def reset_api_docs(app: FastAPI) -> None:
"""
使用本地静态资源自定义 API 文档页面Swagger UI 与 ReDoc
参数:
- app (FastAPI): FastAPI 应用实例。
返回:
- None
"""
@app.get(settings.DOCS_URL, include_in_schema=False)
async def custom_swagger_ui_html() -> HTMLResponse:
return get_swagger_ui_html(
openapi_url=str(app.root_path) + str(app.openapi_url),
title=app.title + " - Swagger UI",
oauth2_redirect_url=app.swagger_ui_oauth2_redirect_url,
swagger_js_url=settings.SWAGGER_JS_URL,
swagger_css_url=settings.SWAGGER_CSS_URL,
swagger_favicon_url=settings.FAVICON_URL,
)
@app.get(str(app.swagger_ui_oauth2_redirect_url), include_in_schema=False)
async def swagger_ui_redirect():
return get_swagger_ui_oauth2_redirect_html()
@app.get(settings.REDOC_URL, include_in_schema=False)
async def custom_redoc_html():
return get_redoc_html(
openapi_url=str(app.root_path) + str(app.openapi_url),
title=app.title + " - ReDoc",
redoc_js_url=settings.REDOC_JS_URL,
redoc_favicon_url=settings.FAVICON_URL,
)
async def http_limit_callback(request: Request, response: Response, expire: int):
"""
请求限制时的默认回调函数
:param request: FastAPI 请求对象
:param response: FastAPI 响应对象
:param expire: 剩余毫秒数
:return:
"""
expires = ceil(expire / 30)
raise CustomException(
status_code=429,
msg='请求过于频繁,请稍后重试',
data={'Retry-After': str(expires)},
)