Files

133 lines
4.4 KiB
Python

# -*- coding: utf-8 -*-
from app.core.base_schema import BatchSetAvailable
from app.core.exceptions import CustomException
from app.api.v1.module_system.auth.schema import AuthSchema
from .schema import (
ApplicationCreateSchema,
ApplicationUpdateSchema,
ApplicationOutSchema,
ApplicationQueryParam
)
from .crud import ApplicationCRUD
class ApplicationService:
"""
应用系统管理服务层
"""
@classmethod
async def detail_service(cls, auth: AuthSchema, id: int) -> dict:
"""
获取应用详情
参数:
- auth (AuthSchema): 认证信息模型
- id (int): 应用ID
返回:
- dict: 应用详情字典
"""
obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
if not obj:
raise CustomException(msg='应用不存在')
return ApplicationOutSchema.model_validate(obj).model_dump()
@classmethod
async def list_service(cls, auth: AuthSchema, search: ApplicationQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> list[dict]:
"""
获取应用列表
参数:
- auth (AuthSchema): 认证信息模型
- search (ApplicationQueryParam | None): 查询参数模型
- order_by (list[dict[str, str]] | None): 排序参数,支持字符串或字典列表
返回:
- list[dict]: 应用详情字典列表
"""
# 过滤空值
search_dict = search.__dict__ if search else None
obj_list = await ApplicationCRUD(auth).list_crud(search=search_dict, order_by=order_by)
return [ApplicationOutSchema.model_validate(obj).model_dump() for obj in obj_list]
@classmethod
async def create_service(cls, auth: AuthSchema, data: ApplicationCreateSchema) -> dict:
"""
创建应用
参数:
- auth (AuthSchema): 认证信息模型
- data (ApplicationCreateSchema): 应用创建模型
返回:
- Dict: 应用详情字典
"""
# 检查名称是否重复
obj = await ApplicationCRUD(auth).get(name=data.name)
if obj:
raise CustomException(msg='创建失败,应用名称已存在')
obj = await ApplicationCRUD(auth).create_crud(data=data)
return ApplicationOutSchema.model_validate(obj).model_dump()
@classmethod
async def update_service(cls, auth: AuthSchema, id: int, data: ApplicationUpdateSchema) -> dict:
"""
更新应用
参数:
- auth (AuthSchema): 认证信息模型
- id (int): 应用ID
- data (ApplicationUpdateSchema): 应用更新模型
返回:
- Dict: 应用详情字典
"""
obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
if not obj:
raise CustomException(msg='更新失败,该应用不存在')
# 检查名称重复
exist_obj = await ApplicationCRUD(auth).get(name=data.name)
if exist_obj and exist_obj.id != id:
raise CustomException(msg='更新失败,应用名称重复')
obj = await ApplicationCRUD(auth).update_crud(id=id, data=data)
return ApplicationOutSchema.model_validate(obj).model_dump()
@classmethod
async def delete_service(cls, auth: AuthSchema, ids: list[int]) -> None:
"""
删除应用
参数:
- auth (AuthSchema): 认证信息模型
- ids (list[int]): 应用ID列表
返回:
- None
"""
if len(ids) < 1:
raise CustomException(msg='删除失败,删除对象不能为空')
for id in ids:
obj = await ApplicationCRUD(auth).get_by_id_crud(id=id)
if not obj:
raise CustomException(msg=f'删除失败,应用 {id} 不存在')
await ApplicationCRUD(auth).delete_crud(ids=ids)
@classmethod
async def set_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
"""
批量设置应用状态
参数:
- auth (AuthSchema): 认证信息模型
- data (BatchSetAvailable): 批量设置应用状态模型
返回:
- None
"""
await ApplicationCRUD(auth).set_available_crud(ids=data.ids, status=data.status)