158 lines
5.0 KiB
Python
158 lines
5.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
import jwt
|
|
from fastapi import Form, Request
|
|
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
|
|
from fastapi.security.utils import get_authorization_scheme_param
|
|
|
|
from app.core.exceptions import CustomException
|
|
from app.config.setting import settings
|
|
from app.api.v1.module_system.auth.schema import JWTPayloadSchema
|
|
|
|
|
|
class CustomOAuth2PasswordBearer(OAuth2PasswordBearer):
|
|
"""自定义OAuth2认证类,继承自OAuth2PasswordBearer"""
|
|
|
|
def __init__(
|
|
self,
|
|
token_url: str,
|
|
scheme_name: str | None = None,
|
|
scopes: dict[str, str] | None = None,
|
|
description: str | None = None,
|
|
auto_error: bool = True
|
|
) -> None:
|
|
super().__init__(
|
|
tokenUrl=token_url,
|
|
scheme_name=scheme_name,
|
|
scopes=scopes,
|
|
description=description,
|
|
auto_error=auto_error
|
|
)
|
|
|
|
async def __call__(self, request: Request) -> str | None:
|
|
"""
|
|
重写认证方法,校验token
|
|
|
|
参数:
|
|
- request (Request): FastAPI请求对象。
|
|
|
|
返回:
|
|
- str | None: 校验通过的token,如果校验失败则返回None。
|
|
|
|
异常:
|
|
- CustomException: 认证失败时抛出,状态码为401。
|
|
"""
|
|
authorization = request.headers.get("Authorization")
|
|
scheme, token = get_authorization_scheme_param(authorization)
|
|
|
|
if not authorization or scheme.lower() != settings.TOKEN_TYPE:
|
|
if self.auto_error:
|
|
raise CustomException(msg="认证失败,请登录后再试", code=10401, status_code=401)
|
|
return None
|
|
return token
|
|
|
|
|
|
class CustomOAuth2PasswordRequestForm(OAuth2PasswordRequestForm):
|
|
"""
|
|
自定义登录表单,扩展验证码等字段
|
|
|
|
参数:
|
|
- grant_type (str | None): 授权类型,默认值为None,正则表达式为'password'。
|
|
- scope (str): 作用域,默认值为空字符串。
|
|
- client_id (str | None): 客户端ID,默认值为None。
|
|
- client_secret (str | None): 客户端密钥,默认值为None。
|
|
- username (str): 用户名。
|
|
- password (str): 密码。
|
|
- captcha_key (str | None): 验证码键,默认值为空字符串。
|
|
- captcha (str | None): 验证码值,默认值为空字符串。
|
|
- login_type (str | None): 登录类型,默认值为"PC端",描述为"PC端 | 移动端"。
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
grant_type: str | None = Form(default=None, pattern='password'),
|
|
scope: str = Form(default=''),
|
|
client_id: str | None = Form(default=None),
|
|
client_secret: str | None = Form(default=None),
|
|
username: str = Form(),
|
|
password: str = Form(),
|
|
captcha_key: str | None = Form(default=""),
|
|
captcha: str | None = Form(default=""),
|
|
login_type: str | None = Form(default="PC端", description="PC端 | 移动端")
|
|
):
|
|
super().__init__(
|
|
grant_type=grant_type,
|
|
scope=scope,
|
|
client_id=client_id,
|
|
client_secret=client_secret,
|
|
username=username,
|
|
password=password,
|
|
)
|
|
self.captcha_key = captcha_key
|
|
self.captcha = captcha
|
|
self.login_type = login_type
|
|
|
|
|
|
# OAuth2认证配置
|
|
OAuth2Schema = CustomOAuth2PasswordBearer(
|
|
token_url="system/auth/login",
|
|
description="认证"
|
|
)
|
|
|
|
|
|
def create_access_token(payload: JWTPayloadSchema) -> str:
|
|
"""
|
|
生成JWT访问令牌
|
|
|
|
参数:
|
|
- payload (JWTPayloadSchema): JWT有效载荷,包含用户信息等。
|
|
|
|
返回:
|
|
- str: 生成的JWT访问令牌。
|
|
"""
|
|
payload_dict = payload.model_dump()
|
|
return jwt.encode(
|
|
payload=payload_dict,
|
|
key=settings.SECRET_KEY,
|
|
algorithm=settings.ALGORITHM
|
|
)
|
|
|
|
|
|
def decode_access_token(token: str) -> JWTPayloadSchema:
|
|
"""
|
|
解析JWT访问令牌
|
|
|
|
参数:
|
|
- token (str): JWT访问令牌字符串。
|
|
|
|
返回:
|
|
- JWTPayloadSchema: 解析后的JWT有效载荷,包含用户信息等。
|
|
|
|
异常:
|
|
- CustomException: 解析失败时抛出,状态码为401。
|
|
"""
|
|
if not token:
|
|
raise CustomException(msg="认证不存在,请重新登录", code=10401, status_code=401)
|
|
|
|
try:
|
|
payload = jwt.decode(
|
|
jwt=token,
|
|
key=settings.SECRET_KEY,
|
|
algorithms=[settings.ALGORITHM]
|
|
)
|
|
|
|
online_user_info = payload.get("sub")
|
|
if not online_user_info:
|
|
raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
|
|
|
|
return JWTPayloadSchema(**payload)
|
|
|
|
except (jwt.InvalidSignatureError, jwt.DecodeError):
|
|
raise CustomException(msg="无效认证,请重新登录", code=10401, status_code=401)
|
|
|
|
except jwt.ExpiredSignatureError:
|
|
raise CustomException(msg="认证已过期,请重新登录", code=10401, status_code=401)
|
|
|
|
except jwt.InvalidTokenError:
|
|
raise CustomException(msg="token已失效,请重新登录", code=10401, status_code=401)
|