259 lines
7.7 KiB
Python
259 lines
7.7 KiB
Python
# -*- coding: utf-8 -*-
|
||
|
||
import random
|
||
import mimetypes
|
||
from datetime import datetime
|
||
import aiofiles
|
||
from fastapi import UploadFile
|
||
from pathlib import Path
|
||
from urllib.parse import urljoin
|
||
|
||
from app.config.setting import settings
|
||
from app.core.exceptions import CustomException
|
||
from app.core.logger import log
|
||
|
||
class UploadUtil:
|
||
"""
|
||
上传工具类
|
||
"""
|
||
|
||
@staticmethod
|
||
def generate_random_number() -> str:
|
||
"""
|
||
生成3位随机数字字符串。
|
||
|
||
返回:
|
||
- str: 三位随机数字字符串。
|
||
"""
|
||
return f'{random.randint(1, 999):03}'
|
||
|
||
@staticmethod
|
||
def check_file_exists(filepath: str) -> bool:
|
||
"""
|
||
检查文件是否存在。
|
||
|
||
参数:
|
||
- filepath (str): 文件路径。
|
||
|
||
返回:
|
||
- bool: 文件是否存在。
|
||
"""
|
||
return Path(filepath).exists()
|
||
|
||
@staticmethod
|
||
def check_file_extension(file: UploadFile) -> bool:
|
||
"""
|
||
检查文件后缀是否合法。
|
||
|
||
参数:
|
||
- file (UploadFile): 上传的文件对象。
|
||
|
||
返回:
|
||
- bool: 文件后缀是否合法。
|
||
|
||
异常:
|
||
- CustomException: 文件类型不支持时抛出。
|
||
"""
|
||
if file.content_type and file.filename:
|
||
# 优先使用文件名的扩展名
|
||
file_extension = '.' + file.filename.rsplit('.', 1)[-1].lower() if '.' in file.filename else None
|
||
if file_extension and file_extension in settings.ALLOWED_EXTENSIONS:
|
||
return True
|
||
# 备用:使用content_type推断
|
||
guessed_ext = mimetypes.guess_extension(file.content_type)
|
||
if guessed_ext and guessed_ext in settings.ALLOWED_EXTENSIONS:
|
||
return True
|
||
raise CustomException(msg="文件类型不支持")
|
||
else:
|
||
raise CustomException(msg="文件类型不支持")
|
||
|
||
@staticmethod
|
||
def check_file_timestamp(filename: str) -> bool:
|
||
"""
|
||
校验文件时间戳是否合法。
|
||
|
||
参数:
|
||
- filename (str): 文件名(包含时间戳片段)。
|
||
|
||
返回:
|
||
- bool: 时间戳是否合法。
|
||
"""
|
||
try:
|
||
name_parts = filename.rsplit('.', 1)[0].split('_')
|
||
timestamp = name_parts[-1].split(settings.UPLOAD_MACHINE)[0]
|
||
datetime.strptime(timestamp, '%Y%m%d%H%M%S')
|
||
return True
|
||
except (ValueError, IndexError):
|
||
return False
|
||
|
||
@staticmethod
|
||
def check_file_machine(filename: str) -> bool:
|
||
"""
|
||
校验文件机器码是否合法。
|
||
|
||
参数:
|
||
- filename (str): 文件名。
|
||
|
||
返回:
|
||
- bool: 机器码是否合法。
|
||
"""
|
||
try:
|
||
name_without_ext = filename.rsplit('.', 1)[0]
|
||
return len(name_without_ext) >= 4 and name_without_ext[-4] == settings.UPLOAD_MACHINE
|
||
except IndexError:
|
||
return False
|
||
|
||
@staticmethod
|
||
def check_file_random_code(filename: str) -> bool:
|
||
"""
|
||
校验文件随机码是否合法。
|
||
|
||
参数:
|
||
- filename (str): 文件名。
|
||
|
||
返回:
|
||
- bool: 随机码是否合法(000–999)。
|
||
"""
|
||
try:
|
||
code = filename.rsplit('.', 1)[0][-3:]
|
||
return code.isdigit() and 1 <= int(code) <= 999
|
||
except IndexError:
|
||
return False
|
||
|
||
@staticmethod
|
||
def check_file_size(file: UploadFile) -> bool:
|
||
"""
|
||
校验文件大小是否合法。
|
||
|
||
参数:
|
||
- file (UploadFile): 上传的文件对象。
|
||
|
||
返回:
|
||
- bool: 文件大小是否合法(未提供 size 返回 False)。
|
||
"""
|
||
if file.size:
|
||
return file.size <= settings.MAX_FILE_SIZE
|
||
else:
|
||
return False
|
||
|
||
@classmethod
|
||
def generate_file_name(cls, filename: str) -> str:
|
||
"""
|
||
生成文件名称。
|
||
|
||
参数:
|
||
- filename (str): 原始文件名(包含拓展名)。
|
||
|
||
返回:
|
||
- str: 生成的文件名(包含时间戳、机器码、随机码)。
|
||
"""
|
||
name, ext = filename.rsplit(".", 1)
|
||
timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
|
||
return f'{name}_{timestamp}{settings.UPLOAD_MACHINE}{cls.generate_random_number()}.{ext}'
|
||
|
||
@staticmethod
|
||
def generate_file(filepath: Path, chunk_size: int = 8192):
|
||
"""
|
||
根据文件生成二进制数据迭代器。
|
||
|
||
参数:
|
||
- filepath (Path): 文件路径。
|
||
- chunk_size (int): 分块大小,默认 8192 字节。
|
||
|
||
返回:
|
||
- Iterator[bytes]: 文件二进制数据分块迭代器。
|
||
"""
|
||
with filepath.open('rb') as f:
|
||
while chunk := f.read(chunk_size):
|
||
yield chunk
|
||
|
||
@staticmethod
|
||
def delete_file(filepath: Path) -> bool:
|
||
"""
|
||
删除文件。
|
||
|
||
参数:
|
||
- filepath (Path): 文件路径。
|
||
|
||
返回:
|
||
- bool: 删除是否成功。
|
||
"""
|
||
try:
|
||
filepath.unlink(missing_ok=True)
|
||
return True
|
||
except OSError:
|
||
return False
|
||
|
||
@classmethod
|
||
async def upload_file(cls, file: UploadFile, base_url: str) -> tuple[str, Path, str]:
|
||
"""
|
||
文件上传。
|
||
|
||
参数:
|
||
- file (UploadFile): 上传的文件对象。
|
||
- base_url (str): 基础 URL。
|
||
|
||
返回:
|
||
- tuple[str, Path, str]: (文件名, 文件路径, 文件 URL)。
|
||
|
||
异常:
|
||
- CustomException: 当文件类型不支持或大小超限时抛出。
|
||
"""
|
||
# 文件校验
|
||
if not all([cls.check_file_extension(file), cls.check_file_size(file)]):
|
||
raise CustomException(msg='文件类型或大小不合法')
|
||
|
||
try:
|
||
# 构建完整的目录路径
|
||
dir_path = settings.UPLOAD_FILE_PATH.joinpath(datetime.now().strftime("%Y/%m/%d"))
|
||
log.info(f"上传目录路径 dir_path is {dir_path}")
|
||
dir_path.mkdir(parents=True, exist_ok=True)
|
||
|
||
filename = ""
|
||
# 生成文件名并保存
|
||
if file.filename:
|
||
filename = cls.generate_file_name(file.filename)
|
||
filepath = dir_path.joinpath(filename)
|
||
file_url = urljoin(base_url, filepath.as_posix())
|
||
# filepath.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 分块写入文件
|
||
chunk_size = 8 * 1024 * 1024 # 8MB chunks
|
||
async with aiofiles.open(filepath, 'wb') as f:
|
||
while chunk := await file.read(chunk_size):
|
||
await f.write(chunk)
|
||
|
||
# 返回相对路径
|
||
return filename, filepath, file_url
|
||
|
||
except Exception as e:
|
||
log.error(f"文件上传失败: {e}")
|
||
raise CustomException(msg='文件上传失败')
|
||
|
||
@staticmethod
|
||
def get_file_tree(file_path: str) -> list[dict]:
|
||
"""
|
||
获取文件树结构。
|
||
|
||
参数:
|
||
- file_path (str): 文件路径。
|
||
|
||
返回:
|
||
- list[dict]: 文件树列表。
|
||
"""
|
||
return [{"name": item.name, "is_dir": item.is_dir()} for item in Path(file_path).iterdir()]
|
||
|
||
@classmethod
|
||
async def download_file(cls, file_path: str) -> str:
|
||
"""
|
||
下载文件,生成新的文件名。
|
||
|
||
参数:
|
||
- file_path (str): 文件路径。
|
||
|
||
返回:
|
||
- str: 文件下载信息。
|
||
"""
|
||
filename = cls.generate_file(Path(file_path))
|
||
return str(filename)
|
||
|