Files

312 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import io
from typing import Any
from fastapi import UploadFile
import pandas as pd
from app.core.base_schema import BatchSetAvailable
from app.core.exceptions import CustomException
from app.utils.excel_util import ExcelUtil
from app.core.logger import log
from app.api.v1.module_system.auth.schema import AuthSchema
from .schema import DemoCreateSchema, DemoUpdateSchema, DemoOutSchema, DemoQueryParam
from .crud import DemoCRUD
class DemoService:
"""
示例管理模块服务层
"""
@classmethod
async def detail_service(cls, auth: AuthSchema, id: int) -> dict:
"""
详情
参数:
- auth (AuthSchema): 认证信息模型
- id (int): 示例ID
返回:
- dict: 示例模型实例字典
"""
obj = await DemoCRUD(auth).get_by_id_crud(id=id)
if not obj:
raise CustomException(msg="该数据不存在")
return DemoOutSchema.model_validate(obj).model_dump()
@classmethod
async def list_service(cls, auth: AuthSchema, search: DemoQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> list[dict]:
"""
列表查询
参数:
- auth (AuthSchema): 认证信息模型
- search (DemoQueryParam | None): 查询参数
- order_by (list[dict[str, str]] | None): 排序参数
返回:
- list[dict]: 示例模型实例字典列表
"""
search_dict = search.__dict__ if search else None
obj_list = await DemoCRUD(auth).list_crud(search=search_dict, order_by=order_by)
return [DemoOutSchema.model_validate(obj).model_dump() for obj in obj_list]
@classmethod
async def page_service(cls, auth: AuthSchema, page_no: int, page_size: int, search: DemoQueryParam | None = None, order_by: list[dict[str, str]] | None = None) -> dict:
"""
分页查询
参数:
- auth (AuthSchema): 认证信息模型
- page_no (int): 页码
- page_size (int): 每页数量
- search (DemoQueryParam | None): 查询参数
- order_by (list[dict[str, str]] | None): 排序参数
返回:
- dict: 分页数据
"""
search_dict = search.__dict__ if search else {}
order_by_list = order_by or [{'id': 'asc'}]
offset = (page_no - 1) * page_size
result = await DemoCRUD(auth).page_crud(
offset=offset,
limit=page_size,
order_by=order_by_list,
search=search_dict
)
return result
@classmethod
async def create_service(cls, auth: AuthSchema, data: DemoCreateSchema) -> dict:
"""
创建
参数:
- auth (AuthSchema): 认证信息模型
- data (DemoCreateSchema): 示例创建模型
返回:
- dict: 示例模型实例字典
"""
obj = await DemoCRUD(auth).get(name=data.name)
if obj:
raise CustomException(msg='创建失败,名称已存在')
obj = await DemoCRUD(auth).create_crud(data=data)
return DemoOutSchema.model_validate(obj).model_dump()
@classmethod
async def update_service(cls, auth: AuthSchema, id: int, data: DemoUpdateSchema) -> dict:
"""
更新
参数:
- auth (AuthSchema): 认证信息模型
- id (int): 示例ID
- data (DemoUpdateSchema): 示例更新模型
返回:
- dict: 示例模型实例字典
"""
# 检查数据是否存在
obj = await DemoCRUD(auth).get_by_id_crud(id=id)
if not obj:
raise CustomException(msg='更新失败,该数据不存在')
# 检查名称是否重复
exist_obj = await DemoCRUD(auth).get(name=data.name)
if exist_obj and exist_obj.id != id:
raise CustomException(msg='更新失败,名称重复')
obj = await DemoCRUD(auth).update_crud(id=id, data=data)
return DemoOutSchema.model_validate(obj).model_dump()
@classmethod
async def delete_service(cls, auth: AuthSchema, ids: list[int]) -> None:
"""
删除
参数:
- auth (AuthSchema): 认证信息模型
- ids (list[int]): 示例ID列表
返回:
- None
"""
if len(ids) < 1:
raise CustomException(msg='删除失败,删除对象不能为空')
# 检查所有要删除的数据是否存在
for id in ids:
obj = await DemoCRUD(auth).get_by_id_crud(id=id)
if not obj:
raise CustomException(msg=f'删除失败ID为{id}的数据不存在')
await DemoCRUD(auth).delete_crud(ids=ids)
@classmethod
async def set_available_service(cls, auth: AuthSchema, data: BatchSetAvailable) -> None:
"""
批量设置状态
参数:
- auth (AuthSchema): 认证信息模型
- data (BatchSetAvailable): 批量设置状态模型
返回:
- None
"""
await DemoCRUD(auth).set_available_crud(ids=data.ids, status=data.status)
@classmethod
async def batch_export_service(cls, obj_list: list[dict[str, Any]]) -> bytes:
"""
批量导出
参数:
- obj_list (list[dict[str, Any]]): 示例模型实例字典列表
返回:
- bytes: Excel文件字节流
"""
mapping_dict = {
'id': '编号',
'name': '名称',
'status': '状态',
'description': '备注',
'created_time': '创建时间',
'updated_time': '更新时间',
'created_id': '创建者',
}
# 复制数据并转换状态
data = obj_list.copy()
for item in data:
# 处理状态
item['status'] = '启用' if item.get('status') == '0' else '停用'
# 处理创建者
creator_info = item.get('created_id')
if isinstance(creator_info, dict):
item['created_id'] = creator_info.get('name', '未知')
else:
item['created_id'] = '未知'
return ExcelUtil.export_list2excel(list_data=data, mapping_dict=mapping_dict)
@classmethod
async def batch_import_service(cls, auth: AuthSchema, file: UploadFile, update_support: bool = False) -> str:
"""
批量导入
参数:
- auth (AuthSchema): 认证信息模型
- file (UploadFile): 上传的Excel文件
- update_support (bool): 是否支持更新存在数据
返回:
- str: 导入结果信息
"""
header_dict = {
'名称': 'name',
'状态': 'status',
'描述': 'description'
}
try:
# 读取Excel文件
contents = await file.read()
df = pd.read_excel(io.BytesIO(contents))
await file.close()
if df.empty:
raise CustomException(msg="导入文件为空")
# 检查表头是否完整
missing_headers = [header for header in header_dict.keys() if header not in df.columns]
if missing_headers:
raise CustomException(msg=f"导入文件缺少必要的列: {', '.join(missing_headers)}")
# 重命名列名
df.rename(columns=header_dict, inplace=True)
# 验证必填字段
required_fields = ['name', 'status']
errors = []
for field in required_fields:
missing_rows = df[df[field].isnull()].index.tolist()
if missing_rows:
field_name = [k for k,v in header_dict.items() if v == field][0]
rows_str = "".join([str(i+1) for i in missing_rows])
errors.append(f"{field_name}不能为空,第{rows_str}")
if errors:
raise CustomException(msg=f"导入失败,以下行缺少必要字段:\n{'; '.join(errors)}")
error_msgs = []
success_count = 0
count = 0
# 处理每一行数据
for index, row in df.iterrows():
count += 1
try:
# 数据转换前的类型检查
try:
status = True if row['status'] == '正常' else False
except ValueError:
error_msgs.append(f"{count}行: 状态必须是'正常''停用'")
continue
# 构建用户数据
data = {
"name": str(row['name']),
"status": status,
"description": str(row['description']),
}
# 处理用户导入
exists_obj = await DemoCRUD(auth).get(name=data["name"])
if exists_obj:
if update_support:
await DemoCRUD(auth).update(id=exists_obj.id, data=data)
success_count += 1
else:
error_msgs.append(f"{count}行: 对象 {data['name']} 已存在")
else:
await DemoCRUD(auth).create(data=data)
success_count += 1
except Exception as e:
error_msgs.append(f"{count}行: {str(e)}")
continue
# 返回详细的导入结果
result = f"成功导入 {success_count} 条数据"
if error_msgs:
result += "\n错误信息:\n" + "\n".join(error_msgs)
return result
except Exception as e:
log.error(f"批量导入用户失败: {str(e)}")
raise CustomException(msg=f"导入失败: {str(e)}")
@classmethod
async def import_template_download_service(cls) -> bytes:
"""
下载导入模板
返回:
- bytes: Excel文件字节流
"""
header_list = ['名称', '状态', '描述']
selector_header_list = ['状态']
option_list = [{'状态': ['正常', '停用']}]
return ExcelUtil.get_excel_template(
header_list=header_list,
selector_header_list=selector_header_list,
option_list=option_list
)