upload project source code
This commit is contained in:
@@ -0,0 +1 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
@@ -0,0 +1,280 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from fastapi import APIRouter, Body, Depends, Query, Request, UploadFile, Form
|
||||
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
|
||||
|
||||
from app.common.response import StreamResponse, SuccessResponse
|
||||
from app.common.request import PaginationService
|
||||
from app.core.router_class import OperationLogRoute
|
||||
from app.utils.common_util import bytes2file_response
|
||||
from app.core.base_params import PaginationQueryParam
|
||||
from app.core.dependencies import AuthPermission
|
||||
from app.core.logger import log
|
||||
|
||||
from .service import ResourceService
|
||||
from .schema import (
|
||||
ResourceMoveSchema,
|
||||
ResourceCopySchema,
|
||||
ResourceRenameSchema,
|
||||
ResourceCreateDirSchema,
|
||||
ResourceSearchQueryParam
|
||||
)
|
||||
|
||||
|
||||
ResourceRouter = APIRouter(route_class=OperationLogRoute, prefix="/resource", tags=["资源管理"])
|
||||
|
||||
@ResourceRouter.get(
|
||||
"/list",
|
||||
summary="获取目录列表",
|
||||
description="获取指定目录下的文件和子目录列表",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:query"]))]
|
||||
)
|
||||
async def get_directory_list_controller(
|
||||
request: Request,
|
||||
page: PaginationQueryParam = Depends(),
|
||||
search: ResourceSearchQueryParam = Depends(),
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
获取目录列表
|
||||
|
||||
参数:
|
||||
- request (Request): FastAPI请求对象,用于获取基础URL。
|
||||
- page (PaginationQueryParam): 分页查询参数模型。
|
||||
- search (ResourceSearchQueryParam): 资源查询参数模型。
|
||||
|
||||
返回:
|
||||
- JSONResponse: 包含目录列表的JSON响应。
|
||||
"""
|
||||
# 获取资源列表(与案例模块保持一致的分页实现)
|
||||
result_dict_list = await ResourceService.get_resources_list_service(
|
||||
search=search,
|
||||
base_url=str(request.base_url)
|
||||
)
|
||||
# 使用分页服务进行分页处理(与案例模块保持一致)
|
||||
result_dict = await PaginationService.paginate(
|
||||
data_list=result_dict_list,
|
||||
page_no=page.page_no,
|
||||
page_size=page.page_size
|
||||
)
|
||||
|
||||
log.info(f"获取目录列表成功: {getattr(search, 'name', None) or ''}")
|
||||
return SuccessResponse(data=result_dict, msg="获取目录列表成功")
|
||||
|
||||
|
||||
@ResourceRouter.post(
|
||||
"/upload",
|
||||
summary="上传文件",
|
||||
description="上传文件到指定目录",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:upload"]))])
|
||||
async def upload_file_controller(
|
||||
file: UploadFile,
|
||||
request: Request,
|
||||
target_path: str | None = Form(None, description="目标目录路径")
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
上传文件
|
||||
|
||||
参数:
|
||||
- file (UploadFile): 要上传的文件对象。
|
||||
- request (Request): FastAPI请求对象,用于获取基础URL。
|
||||
- target_path (str | None): 目标目录路径,默认为None。
|
||||
|
||||
返回:
|
||||
- JSONResponse: 包含上传文件信息的JSON响应。
|
||||
"""
|
||||
result_dict = await ResourceService.upload_file_service(
|
||||
file=file,
|
||||
target_path=target_path,
|
||||
base_url=str(request.base_url)
|
||||
)
|
||||
log.info(f"上传文件成功: {result_dict['filename']}")
|
||||
return SuccessResponse(data=result_dict, msg="上传文件成功")
|
||||
|
||||
|
||||
@ResourceRouter.get(
|
||||
"/download",
|
||||
summary="下载文件",
|
||||
description="下载指定文件",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:download"]))]
|
||||
)
|
||||
async def download_file_controller(
|
||||
request: Request,
|
||||
path: str = Query(..., description="文件路径")
|
||||
) -> FileResponse:
|
||||
"""
|
||||
下载文件
|
||||
|
||||
参数:
|
||||
- request (Request): FastAPI请求对象,用于获取基础URL。
|
||||
- path (str): 文件路径。
|
||||
|
||||
返回:
|
||||
- FileResponse: 包含文件内容的文件响应。
|
||||
"""
|
||||
file_path = await ResourceService.download_file_service(
|
||||
file_path=path,
|
||||
base_url=str(request.base_url)
|
||||
)
|
||||
|
||||
# 获取文件名
|
||||
import os
|
||||
filename = os.path.basename(file_path)
|
||||
|
||||
log.info(f"下载文件成功: {filename}")
|
||||
return FileResponse(
|
||||
path=file_path,
|
||||
filename=filename,
|
||||
media_type='application/octet-stream'
|
||||
)
|
||||
|
||||
|
||||
@ResourceRouter.delete(
|
||||
"/delete",
|
||||
summary="删除文件",
|
||||
description="删除指定文件或目录",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:delete"]))]
|
||||
)
|
||||
async def delete_files_controller(
|
||||
paths: list[str] = Body(..., description="文件路径列表")
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
删除文件
|
||||
|
||||
参数:
|
||||
- paths (list[str]): 文件路径列表。
|
||||
|
||||
返回:
|
||||
- JSONResponse: 包含删除结果的JSON响应。
|
||||
"""
|
||||
await ResourceService.delete_file_service(paths=paths)
|
||||
log.info(f"删除文件成功: {paths}")
|
||||
return SuccessResponse(msg="删除文件成功")
|
||||
|
||||
|
||||
@ResourceRouter.post(
|
||||
"/move",
|
||||
summary="移动文件",
|
||||
description="移动文件或目录",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:move"]))]
|
||||
)
|
||||
async def move_file_controller(
|
||||
data: ResourceMoveSchema
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
移动文件
|
||||
|
||||
参数:
|
||||
- data (ResourceMoveSchema): 移动文件参数模型。
|
||||
|
||||
返回:
|
||||
- JSONResponse: 包含移动结果的JSON响应。
|
||||
"""
|
||||
await ResourceService.move_file_service(data=data)
|
||||
log.info(f"移动文件成功: {data.source_path} -> {data.target_path}")
|
||||
return SuccessResponse(msg="移动文件成功")
|
||||
|
||||
|
||||
@ResourceRouter.post(
|
||||
"/copy",
|
||||
summary="复制文件",
|
||||
description="复制文件或目录",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:copy"]))]
|
||||
)
|
||||
async def copy_file_controller(
|
||||
data: ResourceCopySchema
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
复制文件
|
||||
|
||||
参数:
|
||||
- data (ResourceCopySchema): 复制文件参数模型。
|
||||
|
||||
返回:
|
||||
- JSONResponse: 包含复制结果的JSON响应。
|
||||
"""
|
||||
await ResourceService.copy_file_service(data=data)
|
||||
log.info(f"复制文件成功: {data.source_path} -> {data.target_path}")
|
||||
return SuccessResponse(msg="复制文件成功")
|
||||
|
||||
|
||||
@ResourceRouter.post(
|
||||
"/rename",
|
||||
summary="重命名文件",
|
||||
description="重命名文件或目录",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:rename"]))]
|
||||
)
|
||||
async def rename_file_controller(
|
||||
data: ResourceRenameSchema
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
重命名文件
|
||||
|
||||
参数:
|
||||
- data (ResourceRenameSchema): 重命名文件参数模型。
|
||||
|
||||
返回:
|
||||
- JSONResponse: 包含重命名结果的JSON响应。
|
||||
"""
|
||||
await ResourceService.rename_file_service(data=data)
|
||||
log.info(f"重命名文件成功: {data.old_path} -> {data.new_name}")
|
||||
return SuccessResponse(msg="重命名文件成功")
|
||||
|
||||
|
||||
@ResourceRouter.post(
|
||||
"/create-dir",
|
||||
summary="创建目录",
|
||||
description="在指定路径创建新目录",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:create_dir"]))]
|
||||
)
|
||||
async def create_directory_controller(
|
||||
data: ResourceCreateDirSchema
|
||||
) -> JSONResponse:
|
||||
"""
|
||||
创建目录
|
||||
|
||||
参数:
|
||||
- data (ResourceCreateDirSchema): 创建目录参数模型。
|
||||
|
||||
返回:
|
||||
- JSONResponse: 包含创建目录结果的JSON响应。
|
||||
"""
|
||||
await ResourceService.create_directory_service(data=data)
|
||||
log.info(f"创建目录成功: {data.parent_path}/{data.dir_name}")
|
||||
return SuccessResponse(msg="创建目录成功")
|
||||
|
||||
|
||||
@ResourceRouter.post(
|
||||
"/export",
|
||||
summary="导出资源列表",
|
||||
description="导出资源列表",
|
||||
dependencies=[Depends(AuthPermission(["module_monitor:resource:export"]))]
|
||||
)
|
||||
async def export_resource_list_controller(
|
||||
request: Request,
|
||||
search: ResourceSearchQueryParam = Depends()
|
||||
) -> StreamingResponse:
|
||||
"""
|
||||
导出资源列表
|
||||
|
||||
参数:
|
||||
- request (Request): FastAPI请求对象,用于获取基础URL。
|
||||
- search (ResourceSearchQueryParam): 资源查询参数模型。
|
||||
|
||||
返回:
|
||||
- StreamingResponse: 包含导出资源列表的流式响应。
|
||||
"""
|
||||
# 获取搜索结果
|
||||
result_dict_list = await ResourceService.get_resources_list_service(
|
||||
search=search,
|
||||
base_url=str(request.base_url)
|
||||
)
|
||||
export_result = await ResourceService.export_resource_service(data_list=result_dict_list)
|
||||
|
||||
log.info("导出资源列表成功")
|
||||
return StreamResponse(
|
||||
data=bytes2file_response(export_result),
|
||||
media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
|
||||
headers={
|
||||
'Content-Disposition': 'attachment; filename=resource_list.xlsx'
|
||||
}
|
||||
)
|
||||
@@ -0,0 +1,156 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from datetime import datetime
|
||||
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator
|
||||
from urllib.parse import urlparse
|
||||
from fastapi import Query
|
||||
|
||||
|
||||
class ResourceItemSchema(BaseModel):
|
||||
"""资源项目模型"""
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
name: str = Field(..., description="文件名")
|
||||
file_url: str = Field(..., description="文件URL路径")
|
||||
relative_path: str = Field(..., description="相对路径")
|
||||
is_file: bool = Field(..., description="是否为文件")
|
||||
is_dir: bool = Field(..., description="是否为目录")
|
||||
size: int | None = Field(None, description="文件大小(字节)")
|
||||
created_time: datetime | None = Field(None, description="创建时间")
|
||||
modified_time: datetime | None = Field(None, description="修改时间")
|
||||
is_hidden: bool = Field(False, description="是否为隐藏文件")
|
||||
|
||||
@field_validator('file_url')
|
||||
@classmethod
|
||||
def _validate_file_url(cls, v: str) -> str:
|
||||
v = v.strip()
|
||||
parsed = urlparse(v)
|
||||
if parsed.scheme not in ('http', 'https'):
|
||||
raise ValueError('文件URL必须为 http/https')
|
||||
return v
|
||||
|
||||
@field_validator('relative_path')
|
||||
@classmethod
|
||||
def _validate_relative_path(cls, v: str) -> str:
|
||||
v = v.strip()
|
||||
if '..' in v or v.startswith('\\'):
|
||||
raise ValueError('相对路径包含不安全字符')
|
||||
return v
|
||||
|
||||
@model_validator(mode='after')
|
||||
def _validate_flags(self):
|
||||
if self.is_file and self.is_dir:
|
||||
raise ValueError('不能同时为文件和目录')
|
||||
if not self.is_file and not self.is_dir:
|
||||
raise ValueError('必须是文件或目录之一')
|
||||
# 根据名称自动修正隐藏标记
|
||||
self.is_hidden = self.name.startswith('.')
|
||||
return self
|
||||
|
||||
|
||||
class ResourceDirectorySchema(BaseModel):
|
||||
"""资源目录模型"""
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
path: str = Field(..., description="目录路径")
|
||||
name: str = Field(..., description="目录名称")
|
||||
items: list[ResourceItemSchema] = Field(default_factory=list, description="目录项")
|
||||
total_files: int = Field(0, description="文件总数")
|
||||
total_dirs: int = Field(0, description="目录总数")
|
||||
total_size: int = Field(0, description="总大小")
|
||||
|
||||
|
||||
class ResourceUploadSchema(BaseModel):
|
||||
"""资源上传响应模型"""
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
filename: str = Field(..., description="文件名")
|
||||
file_url: str = Field(..., description="访问URL")
|
||||
file_size: int = Field(..., description="文件大小")
|
||||
upload_time: datetime = Field(..., description="上传时间")
|
||||
|
||||
|
||||
class ResourceMoveSchema(BaseModel):
|
||||
"""资源移动模型"""
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
source_path: str = Field(..., description="源路径")
|
||||
target_path: str = Field(..., description="目标路径")
|
||||
overwrite: bool = Field(False, description="是否覆盖")
|
||||
|
||||
@field_validator('source_path', 'target_path')
|
||||
@classmethod
|
||||
def validate_paths(cls, value: str):
|
||||
if not value or len(value.strip()) == 0:
|
||||
raise ValueError("路径不能为空")
|
||||
return value.strip()
|
||||
|
||||
|
||||
class ResourceCopySchema(ResourceMoveSchema):
|
||||
"""资源复制模型"""
|
||||
pass
|
||||
|
||||
|
||||
class ResourceRenameSchema(BaseModel):
|
||||
"""资源重命名模型"""
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
old_path: str = Field(..., description="原路径")
|
||||
new_name: str = Field(..., description="新名称")
|
||||
|
||||
@field_validator('old_path', 'new_name')
|
||||
@classmethod
|
||||
def validate_inputs(cls, value: str):
|
||||
if not value or len(value.strip()) == 0:
|
||||
raise ValueError("参数不能为空")
|
||||
return value.strip()
|
||||
|
||||
@field_validator('new_name')
|
||||
@classmethod
|
||||
def _validate_new_name(cls, v: str) -> str:
|
||||
v = v.strip()
|
||||
if '..' in v or '/' in v or '\\' in v:
|
||||
raise ValueError('新名称包含不安全字符')
|
||||
return v
|
||||
|
||||
|
||||
class ResourceCreateDirSchema(BaseModel):
|
||||
"""创建目录模型"""
|
||||
model_config = ConfigDict(from_attributes=True)
|
||||
|
||||
parent_path: str = Field(..., description="父目录路径")
|
||||
dir_name: str = Field(..., description="目录名称", max_length=255)
|
||||
|
||||
@field_validator('parent_path', 'dir_name')
|
||||
@classmethod
|
||||
def validate_inputs(cls, value: str, info):
|
||||
# 对于parent_path允许为空字符串(表示根目录)或 '/',其他情况必须非空
|
||||
if info.field_name == 'parent_path':
|
||||
# 允许空字符串或 '/' 表示根目录
|
||||
if value is None:
|
||||
raise ValueError("参数不能为空")
|
||||
# 对于parent_path仍然严格检查路径遍历
|
||||
if '..' in value or value.startswith('\\'):
|
||||
raise ValueError("参数包含不安全字符")
|
||||
else: # 对于dir_name仍然严格检查
|
||||
if not value or len(value.strip()) == 0:
|
||||
raise ValueError("参数不能为空")
|
||||
if '..' in value or value.startswith('/') or value.startswith('\\'):
|
||||
raise ValueError("参数包含不安全字符")
|
||||
return value.strip()
|
||||
|
||||
|
||||
class ResourceSearchQueryParam:
|
||||
"""资源搜索查询参数"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str | None = Query(None, description="搜索关键词"),
|
||||
path: str | None = Query(None, description="目录路径"),
|
||||
) -> None:
|
||||
|
||||
# 模糊查询字段
|
||||
self.name = ("like", name) if name else None
|
||||
|
||||
# 精确查询字段
|
||||
self.path = path
|
||||
@@ -0,0 +1,826 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
from fastapi import UploadFile
|
||||
|
||||
from app.core.exceptions import CustomException
|
||||
from app.core.logger import log
|
||||
from app.utils.excel_util import ExcelUtil
|
||||
from app.config.setting import settings
|
||||
|
||||
from .schema import (
|
||||
ResourceItemSchema,
|
||||
ResourceDirectorySchema,
|
||||
ResourceUploadSchema,
|
||||
ResourceMoveSchema,
|
||||
ResourceCopySchema,
|
||||
ResourceRenameSchema,
|
||||
ResourceCreateDirSchema,
|
||||
ResourceSearchQueryParam
|
||||
)
|
||||
|
||||
|
||||
class ResourceService:
|
||||
"""
|
||||
资源管理模块服务层 - 管理系统静态文件目录
|
||||
"""
|
||||
|
||||
# 配置常量
|
||||
MAX_UPLOAD_SIZE = 100 * 1024 * 1024 # 100MB
|
||||
MAX_SEARCH_RESULTS = 1000 # 最大搜索结果数
|
||||
MAX_PATH_DEPTH = 20 # 最大路径深度
|
||||
|
||||
@classmethod
|
||||
def _get_resource_root(cls) -> str:
|
||||
"""
|
||||
获取资源管理根目录
|
||||
|
||||
返回:
|
||||
- str: 资源管理根目录路径。
|
||||
"""
|
||||
if not settings.STATIC_ENABLE:
|
||||
raise CustomException(msg='静态文件服务未启用')
|
||||
return str(settings.STATIC_ROOT)
|
||||
|
||||
@classmethod
|
||||
def _get_safe_path(cls, path: str | None = None) -> str:
|
||||
"""
|
||||
获取安全的文件路径
|
||||
|
||||
参数:
|
||||
- path (str | None): 原始文件路径。
|
||||
|
||||
返回:
|
||||
- str: 安全的文件路径。
|
||||
"""
|
||||
resource_root = cls._get_resource_root()
|
||||
|
||||
if not path:
|
||||
return resource_root
|
||||
|
||||
# 支持前端传递的完整URL或以STATIC_URL/ROOT_PATH+STATIC_URL开头的URL路径,转换为相对资源路径
|
||||
if isinstance(path, str):
|
||||
static_prefix = settings.STATIC_URL.rstrip('/')
|
||||
root_prefix = settings.ROOT_PATH.rstrip('/') if getattr(settings, 'ROOT_PATH', '') else ''
|
||||
root_static_prefix = f"{root_prefix}{static_prefix}" if root_prefix else static_prefix
|
||||
|
||||
def strip_prefix(p: str) -> str:
|
||||
if p.startswith(root_static_prefix):
|
||||
return p[len(root_static_prefix):].lstrip('/')
|
||||
if p.startswith(static_prefix):
|
||||
return p[len(static_prefix):].lstrip('/')
|
||||
return p
|
||||
|
||||
if path.startswith('http://') or path.startswith('https://'):
|
||||
parsed = urlparse(path)
|
||||
url_path = parsed.path or ''
|
||||
path = strip_prefix(url_path)
|
||||
else:
|
||||
path = strip_prefix(path)
|
||||
|
||||
# 清理路径,移除危险字符
|
||||
path = path.strip().replace('..', '').replace('//', '/')
|
||||
|
||||
# 规范化路径
|
||||
if os.path.isabs(path):
|
||||
safe_path = os.path.normpath(path)
|
||||
else:
|
||||
safe_path = os.path.normpath(os.path.join(resource_root, path))
|
||||
|
||||
# 检查路径是否在允许的范围内
|
||||
resource_root_abs = os.path.normpath(os.path.abspath(resource_root))
|
||||
safe_path_abs = os.path.normpath(os.path.abspath(safe_path))
|
||||
|
||||
if not safe_path_abs.startswith(resource_root_abs):
|
||||
raise CustomException(msg=f'访问路径不在允许范围内: {path}')
|
||||
|
||||
# 防止路径遍历攻击
|
||||
if '..' in safe_path or safe_path.count('/') > cls.MAX_PATH_DEPTH:
|
||||
raise CustomException(msg=f'不安全的路径格式: {path}')
|
||||
|
||||
return safe_path
|
||||
|
||||
@classmethod
|
||||
def _path_exists(cls, path: str) -> bool:
|
||||
"""
|
||||
检查路径是否存在
|
||||
|
||||
参数:
|
||||
- path (str): 要检查的路径。
|
||||
|
||||
返回:
|
||||
- bool: 如果路径存在则返回True,否则返回False。
|
||||
"""
|
||||
try:
|
||||
safe_path = cls._get_safe_path(path)
|
||||
return os.path.exists(safe_path)
|
||||
except:
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def _generate_http_url(cls, file_path: str, base_url: str | None = None) -> str:
|
||||
"""
|
||||
生成文件的HTTP URL
|
||||
|
||||
参数:
|
||||
- file_path (str): 文件的绝对路径。
|
||||
- base_url (str | None): 基础URL,用于生成完整URL。
|
||||
|
||||
返回:
|
||||
- str: 文件的HTTP URL。
|
||||
"""
|
||||
resource_root = cls._get_resource_root()
|
||||
try:
|
||||
relative_path = os.path.relpath(file_path, resource_root)
|
||||
# 确保路径使用正斜杠(URL格式)
|
||||
url_path = relative_path.replace(os.sep, '/')
|
||||
except ValueError:
|
||||
# 如果无法计算相对路径,使用文件名
|
||||
url_path = os.path.basename(file_path)
|
||||
|
||||
# 如果提供了base_url,使用它生成完整URL,否则使用settings.STATIC_URL
|
||||
if base_url:
|
||||
# 修复URL生成逻辑
|
||||
base_part = base_url.rstrip('/')
|
||||
static_part = settings.STATIC_URL.lstrip('/')
|
||||
file_part = url_path.lstrip('/')
|
||||
if base_part.endswith(':') or (len(base_part) > 0 and base_part[-1] not in ['/', ':']):
|
||||
base_part += '/'
|
||||
http_url = f"{base_part}{static_part}/{file_part}".replace('//', '/').replace(':/', '://')
|
||||
else:
|
||||
http_url = f"{settings.STATIC_URL}/{url_path}".replace('//', '/')
|
||||
|
||||
return http_url
|
||||
|
||||
@classmethod
|
||||
def _get_file_info(cls, file_path: str, base_url: str | None = None) -> dict:
|
||||
"""
|
||||
获取文件或目录的详细信息,如名称、大小、创建时间、修改时间、路径、深度、HTTP URL、是否隐藏、是否为目录等。
|
||||
|
||||
参数:
|
||||
- file_path (str): 文件或目录的路径。
|
||||
- base_url (str | None): 基础URL,用于生成完整URL。
|
||||
|
||||
返回:
|
||||
- dict: 文件或目录的详细信息字典。
|
||||
"""
|
||||
try:
|
||||
safe_path = cls._get_safe_path(file_path)
|
||||
if not os.path.exists(safe_path):
|
||||
return {}
|
||||
|
||||
stat = os.stat(safe_path)
|
||||
path_obj = Path(safe_path)
|
||||
resource_root = cls._get_resource_root()
|
||||
|
||||
# 计算相对路径
|
||||
try:
|
||||
relative_path = os.path.relpath(safe_path, resource_root)
|
||||
except ValueError:
|
||||
relative_path = os.path.basename(safe_path)
|
||||
|
||||
# 计算深度
|
||||
try:
|
||||
depth = len(Path(safe_path).relative_to(resource_root).parts)
|
||||
except ValueError:
|
||||
depth = 0
|
||||
|
||||
# 生成HTTP URL路径而不是文件系统路径
|
||||
http_url = cls._generate_http_url(safe_path, base_url)
|
||||
|
||||
# 检查是否为隐藏文件(文件名以点开头)
|
||||
is_hidden = path_obj.name.startswith('.')
|
||||
|
||||
# 对于目录,设置is_directory字段(兼容前端)
|
||||
is_directory = os.path.isdir(safe_path)
|
||||
|
||||
# 将datetime对象转换为ISO格式的字符串,确保JSON序列化成功
|
||||
created_time = datetime.fromtimestamp(stat.st_ctime).isoformat()
|
||||
modified_time = datetime.fromtimestamp(stat.st_mtime).isoformat()
|
||||
|
||||
return {
|
||||
'name': path_obj.name,
|
||||
'file_url': http_url, # 统一使用file_url字段
|
||||
'relative_path': relative_path,
|
||||
'is_file': os.path.isfile(safe_path),
|
||||
'is_dir': is_directory,
|
||||
'size': stat.st_size if os.path.isfile(safe_path) else None,
|
||||
'created_time': created_time,
|
||||
'modified_time': modified_time,
|
||||
'is_hidden': is_hidden
|
||||
}
|
||||
except Exception as e:
|
||||
log.error(f'获取文件信息失败: {str(e)}')
|
||||
return {}
|
||||
|
||||
@classmethod
|
||||
async def get_directory_list_service(cls, path: str | None = None, include_hidden: bool = False, base_url: str | None = None) -> dict:
|
||||
"""
|
||||
获取目录列表
|
||||
|
||||
参数:
|
||||
- path (str | None): 目录路径。如果未指定,将使用静态文件根目录。
|
||||
- include_hidden (bool): 是否包含隐藏文件。
|
||||
- base_url (str | None): 基础URL,用于生成完整URL。
|
||||
|
||||
返回:
|
||||
- dict: 包含目录列表和统计信息的字典。
|
||||
"""
|
||||
try:
|
||||
# 如果没有指定路径,使用静态文件根目录
|
||||
if path is None:
|
||||
safe_path = cls._get_resource_root()
|
||||
display_path = cls._generate_http_url(safe_path, base_url)
|
||||
else:
|
||||
safe_path = cls._get_safe_path(path)
|
||||
display_path = cls._generate_http_url(safe_path, base_url)
|
||||
|
||||
if not os.path.exists(safe_path):
|
||||
raise CustomException(msg='目录不存在')
|
||||
|
||||
if not os.path.isdir(safe_path):
|
||||
raise CustomException(msg='路径不是目录')
|
||||
|
||||
items = []
|
||||
total_files = 0
|
||||
total_dirs = 0
|
||||
total_size = 0
|
||||
|
||||
try:
|
||||
for item_name in os.listdir(safe_path):
|
||||
# 跳过隐藏文件
|
||||
if not include_hidden and item_name.startswith('.'):
|
||||
continue
|
||||
|
||||
item_path = os.path.join(safe_path, item_name)
|
||||
file_info = cls._get_file_info(item_path, base_url)
|
||||
|
||||
if file_info:
|
||||
items.append(ResourceItemSchema(**file_info))
|
||||
|
||||
if file_info['is_file']:
|
||||
total_files += 1
|
||||
total_size += file_info.get('size', 0) or 0
|
||||
elif file_info['is_dir']:
|
||||
total_dirs += 1
|
||||
|
||||
except PermissionError:
|
||||
raise CustomException(msg='没有权限访问此目录')
|
||||
|
||||
return ResourceDirectorySchema(
|
||||
path=display_path, # 返回HTTP URL路径而不是文件系统路径
|
||||
name=os.path.basename(safe_path),
|
||||
items=items,
|
||||
total_files=total_files,
|
||||
total_dirs=total_dirs,
|
||||
total_size=total_size
|
||||
).model_dump()
|
||||
|
||||
except CustomException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error(f'获取目录列表失败: {str(e)}')
|
||||
raise CustomException(msg=f'获取目录列表失败: {str(e)}')
|
||||
|
||||
@classmethod
|
||||
async def get_resources_list_service(cls, search: ResourceSearchQueryParam | None = None, order_by: str | None = None, base_url: str | None = None) -> list[dict]:
|
||||
"""
|
||||
搜索资源列表(用于分页和导出)
|
||||
|
||||
参数:
|
||||
- search (ResourceSearchQueryParam | None): 查询参数模型。
|
||||
- order_by (str | None): 排序参数。
|
||||
- base_url (str | None): 基础URL,用于生成完整URL。
|
||||
|
||||
返回:
|
||||
- list[dict]: 资源详情字典列表。
|
||||
"""
|
||||
try:
|
||||
# 确定搜索路径
|
||||
if search and hasattr(search, 'path') and search.path:
|
||||
resource_root = cls._get_safe_path(search.path)
|
||||
else:
|
||||
resource_root = cls._get_resource_root()
|
||||
|
||||
# 检查路径是否存在
|
||||
if not os.path.exists(resource_root):
|
||||
raise CustomException(msg='目录不存在')
|
||||
|
||||
if not os.path.isdir(resource_root):
|
||||
raise CustomException(msg='路径不是目录')
|
||||
|
||||
# 收集资源
|
||||
all_resources = []
|
||||
|
||||
try:
|
||||
for item_name in os.listdir(resource_root):
|
||||
# 跳过隐藏文件
|
||||
if item_name.startswith('.'):
|
||||
continue
|
||||
|
||||
item_path = os.path.join(resource_root, item_name)
|
||||
file_info = cls._get_file_info(item_path, base_url)
|
||||
|
||||
if file_info:
|
||||
# 应用名称过滤
|
||||
if search and hasattr(search, 'name') and search.name and search.name[1]:
|
||||
search_keyword = search.name[1].lower()
|
||||
if search_keyword not in file_info.get('name', '').lower():
|
||||
continue
|
||||
|
||||
all_resources.append(file_info)
|
||||
|
||||
except PermissionError:
|
||||
raise CustomException(msg='没有权限访问此目录')
|
||||
|
||||
# 应用排序
|
||||
sorted_resources = cls._sort_results(all_resources, order_by)
|
||||
|
||||
# 限制最大结果数
|
||||
if len(sorted_resources) > cls.MAX_SEARCH_RESULTS:
|
||||
sorted_resources = sorted_resources[:cls.MAX_SEARCH_RESULTS]
|
||||
|
||||
return sorted_resources
|
||||
|
||||
except Exception as e:
|
||||
log.error(f'搜索资源失败: {str(e)}')
|
||||
raise CustomException(msg=f'搜索资源失败: {str(e)}')
|
||||
|
||||
@classmethod
|
||||
async def export_resource_service(cls, data_list: list[dict]) -> bytes:
|
||||
"""
|
||||
导出资源列表
|
||||
|
||||
参数:
|
||||
- data_list (list[dict]): 资源详情字典列表。
|
||||
|
||||
返回:
|
||||
- bytes: Excel文件的二进制数据。
|
||||
"""
|
||||
mapping_dict = {
|
||||
'name': '文件名',
|
||||
'path': '文件路径',
|
||||
'size': '文件大小',
|
||||
'created_time': '创建时间',
|
||||
'modified_time': '修改时间',
|
||||
'parent_path': '父目录'
|
||||
}
|
||||
|
||||
# 复制数据并转换状态
|
||||
export_data = data_list.copy()
|
||||
|
||||
# 格式化文件大小
|
||||
for item in export_data:
|
||||
if item.get('size'):
|
||||
item['size'] = cls._format_file_size(item['size'])
|
||||
|
||||
return ExcelUtil.export_list2excel(list_data=export_data, mapping_dict=mapping_dict)
|
||||
|
||||
@classmethod
|
||||
async def _get_directory_stats(cls, path: str, include_hidden: bool = False) -> dict[str, int]:
|
||||
"""
|
||||
递归获取目录统计信息
|
||||
|
||||
参数:
|
||||
- path (str): 目录路径。
|
||||
- include_hidden (bool): 是否包含隐藏文件。
|
||||
|
||||
返回:
|
||||
- dict[str, int]: 包含文件数、目录数和总大小的字典。
|
||||
"""
|
||||
stats = {'files': 0, 'dirs': 0, 'size': 0}
|
||||
|
||||
try:
|
||||
for root, dirs, files in os.walk(path):
|
||||
# 过滤隐藏目录
|
||||
if not include_hidden:
|
||||
dirs[:] = [d for d in dirs if not d.startswith('.')]
|
||||
files = [f for f in files if not f.startswith('.')]
|
||||
|
||||
stats['dirs'] += len(dirs)
|
||||
stats['files'] += len(files)
|
||||
|
||||
for file in files:
|
||||
file_path = os.path.join(root, file)
|
||||
try:
|
||||
stats['size'] += os.path.getsize(file_path)
|
||||
except (OSError, IOError):
|
||||
continue
|
||||
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return stats
|
||||
|
||||
@classmethod
|
||||
def _sort_results(cls, results: list[dict], order_by: str | None = None) -> list[dict]:
|
||||
"""
|
||||
排序搜索结果
|
||||
|
||||
参数:
|
||||
- results (list[dict]): 资源详情字典列表。
|
||||
- order_by (str | None): 排序参数。
|
||||
|
||||
返回:
|
||||
- list[dict]: 排序后的资源详情字典列表。
|
||||
"""
|
||||
try:
|
||||
# 默认按名称升序排序
|
||||
if not order_by:
|
||||
return sorted(results, key=lambda x: x.get('name', ''), reverse=False)
|
||||
|
||||
# 解析order_by参数,格式: [{'field':'asc/desc'}]
|
||||
try:
|
||||
sort_conditions = eval(order_by)
|
||||
if isinstance(sort_conditions, list):
|
||||
# 构建排序键函数
|
||||
def sort_key(item):
|
||||
keys = []
|
||||
for cond in sort_conditions:
|
||||
field = cond.get('field', 'name')
|
||||
direction = cond.get('direction', 'asc')
|
||||
# 获取字段值,默认为空字符串
|
||||
value = item.get(field, '')
|
||||
# 如果是日期字段,转换为可比较的格式
|
||||
if field in ['created_time', 'modified_time', 'accessed_time'] and value:
|
||||
value = datetime.fromisoformat(value)
|
||||
keys.append(value)
|
||||
return keys
|
||||
|
||||
# 确定排序方向(这里只支持单一方向,多个条件时使用第一个条件的方向)
|
||||
reverse = False
|
||||
if sort_conditions and isinstance(sort_conditions[0], dict):
|
||||
direction = sort_conditions[0].get('direction', '').lower()
|
||||
reverse = direction == 'desc'
|
||||
|
||||
return sorted(results, key=sort_key, reverse=reverse)
|
||||
except:
|
||||
# 如果解析失败,使用默认排序
|
||||
pass
|
||||
|
||||
return sorted(results, key=lambda x: x.get('name', ''), reverse=False)
|
||||
except:
|
||||
return results
|
||||
|
||||
@classmethod
|
||||
async def upload_file_service(cls, file: UploadFile, target_path: str | None = None, base_url: str | None = None) -> dict:
|
||||
"""
|
||||
上传文件到指定目录
|
||||
|
||||
参数:
|
||||
- file (UploadFile): 上传的文件对象。
|
||||
- target_path (str | None): 目标目录路径。
|
||||
- base_url (str | None): 基础URL,用于生成完整URL。
|
||||
|
||||
返回:
|
||||
- dict: 包含文件信息的字典。
|
||||
"""
|
||||
if not file or not file.filename:
|
||||
raise CustomException(msg="请选择要上传的文件")
|
||||
|
||||
# 文件名安全检查
|
||||
if '..' in file.filename or '/' in file.filename or '\\' in file.filename:
|
||||
raise CustomException(msg="文件名包含不安全字符")
|
||||
|
||||
try:
|
||||
# 检查文件大小
|
||||
content = await file.read()
|
||||
if len(content) > cls.MAX_UPLOAD_SIZE:
|
||||
raise CustomException(msg=f"文件太大,最大支持{cls.MAX_UPLOAD_SIZE // (1024*1024)}MB")
|
||||
|
||||
# 确定上传目录,如果没有指定目标路径,使用静态文件根目录
|
||||
if target_path is None:
|
||||
safe_dir = cls._get_resource_root()
|
||||
else:
|
||||
safe_dir = cls._get_safe_path(target_path)
|
||||
|
||||
# 创建目录(如果不存在)
|
||||
os.makedirs(safe_dir, exist_ok=True)
|
||||
|
||||
# 生成文件路径
|
||||
filename = file.filename
|
||||
file_path = os.path.join(safe_dir, filename)
|
||||
|
||||
# 检查文件是否已存在
|
||||
if os.path.exists(file_path):
|
||||
# 生成唯一文件名
|
||||
base_name, ext = os.path.splitext(filename)
|
||||
counter = 1
|
||||
while os.path.exists(file_path):
|
||||
new_filename = f"{base_name}_{counter}{ext}"
|
||||
file_path = os.path.join(safe_dir, new_filename)
|
||||
counter += 1
|
||||
filename = os.path.basename(file_path)
|
||||
|
||||
# 保存文件(使用已读取的内容)
|
||||
with open(file_path, 'wb') as f:
|
||||
f.write(content)
|
||||
|
||||
# 获取文件信息
|
||||
file_info = cls._get_file_info(file_path, base_url)
|
||||
|
||||
# 生成文件URL
|
||||
file_url = cls._generate_http_url(file_path, base_url)
|
||||
|
||||
log.info(f"文件上传成功: {filename}")
|
||||
|
||||
return ResourceUploadSchema(
|
||||
filename=filename,
|
||||
file_url=file_url,
|
||||
file_size=file_info.get('size', 0),
|
||||
upload_time=datetime.now()
|
||||
).model_dump(mode='json')
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"文件上传失败: {str(e)}")
|
||||
raise CustomException(msg=f"文件上传失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
async def download_file_service(cls, file_path: str, base_url: str | None = None) -> str:
|
||||
"""
|
||||
下载文件(返回本地文件系统路径)
|
||||
|
||||
参数:
|
||||
- file_path (str): 文件路径(可为相对路径、绝对路径或完整URL)。
|
||||
- base_url (str | None): 基础URL,用于生成完整URL(不再直接返回URL)。
|
||||
|
||||
返回:
|
||||
- str: 本地文件系统路径。
|
||||
"""
|
||||
try:
|
||||
safe_path = cls._get_safe_path(file_path)
|
||||
|
||||
if not os.path.exists(safe_path):
|
||||
raise CustomException(msg='文件不存在')
|
||||
|
||||
if not os.path.isfile(safe_path):
|
||||
raise CustomException(msg='路径不是文件')
|
||||
|
||||
# 返回本地文件路径给 FileResponse 使用
|
||||
log.info(f"定位文件路径: {safe_path}")
|
||||
return safe_path
|
||||
|
||||
except CustomException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error(f"下载文件失败: {str(e)}")
|
||||
raise CustomException(msg=f"下载文件失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
async def delete_file_service(cls, paths: list[str]) -> None:
|
||||
"""
|
||||
删除文件或目录
|
||||
|
||||
参数:
|
||||
- paths (list[str]): 文件或目录路径列表。
|
||||
|
||||
返回:
|
||||
- None
|
||||
"""
|
||||
if not paths:
|
||||
raise CustomException(msg='删除失败,删除路径不能为空')
|
||||
|
||||
for path in paths:
|
||||
try:
|
||||
safe_path = cls._get_safe_path(path)
|
||||
|
||||
if not os.path.exists(safe_path):
|
||||
log.error(f"路径不存在,跳过: {path}")
|
||||
continue
|
||||
|
||||
if os.path.isfile(safe_path):
|
||||
os.remove(safe_path)
|
||||
log.info(f"删除文件成功: {safe_path}")
|
||||
elif os.path.isdir(safe_path):
|
||||
shutil.rmtree(safe_path)
|
||||
log.info(f"删除目录成功: {safe_path}")
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"删除失败 {path}: {str(e)}")
|
||||
raise CustomException(msg=f"删除失败 {path}: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
async def batch_delete_service(cls, paths: list[str]) -> dict[str, list[str]]:
|
||||
"""
|
||||
批量删除文件或目录
|
||||
|
||||
参数:
|
||||
- paths (List[str]): 文件或目录路径列表。
|
||||
|
||||
返回:
|
||||
- Dict[str, List[str]]: 包含成功删除路径和失败删除路径的字典。
|
||||
"""
|
||||
if not paths:
|
||||
raise CustomException(msg='删除失败,删除路径不能为空')
|
||||
|
||||
success_paths = []
|
||||
failed_paths = []
|
||||
|
||||
for path in paths:
|
||||
try:
|
||||
safe_path = cls._get_safe_path(path)
|
||||
|
||||
if not os.path.exists(safe_path):
|
||||
failed_paths.append(path)
|
||||
continue
|
||||
|
||||
if os.path.isfile(safe_path):
|
||||
os.remove(safe_path)
|
||||
success_paths.append(path)
|
||||
log.info(f"删除文件成功: {safe_path}")
|
||||
elif os.path.isdir(safe_path):
|
||||
shutil.rmtree(safe_path)
|
||||
success_paths.append(path)
|
||||
log.info(f"删除目录成功: {safe_path}")
|
||||
|
||||
except Exception as e:
|
||||
log.error(f"删除失败 {path}: {str(e)}")
|
||||
failed_paths.append(path)
|
||||
|
||||
return {
|
||||
"success": success_paths,
|
||||
"failed": failed_paths
|
||||
}
|
||||
|
||||
@classmethod
|
||||
async def move_file_service(cls, data: ResourceMoveSchema) -> None:
|
||||
"""
|
||||
移动文件或目录
|
||||
|
||||
参数:
|
||||
- data (ResourceMoveSchema): 包含源路径和目标路径的模型。
|
||||
|
||||
返回:
|
||||
- None
|
||||
"""
|
||||
try:
|
||||
source_path = cls._get_safe_path(data.source_path)
|
||||
target_path = cls._get_safe_path(data.target_path)
|
||||
|
||||
if not os.path.exists(source_path):
|
||||
raise CustomException(msg='源路径不存在')
|
||||
|
||||
# 检查目标路径是否已存在
|
||||
if os.path.exists(target_path):
|
||||
if not data.overwrite:
|
||||
raise CustomException(msg='目标路径已存在')
|
||||
else:
|
||||
# 删除目标路径
|
||||
if os.path.isfile(target_path):
|
||||
os.remove(target_path)
|
||||
else:
|
||||
shutil.rmtree(target_path)
|
||||
|
||||
# 确保目标目录存在
|
||||
target_dir = os.path.dirname(target_path)
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
|
||||
# 移动文件
|
||||
shutil.move(source_path, target_path)
|
||||
log.info(f"移动成功: {source_path} -> {target_path}")
|
||||
|
||||
except CustomException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error(f"移动失败: {str(e)}")
|
||||
raise CustomException(msg=f"移动失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
async def copy_file_service(cls, data: ResourceCopySchema) -> None:
|
||||
"""
|
||||
复制文件或目录
|
||||
|
||||
参数:
|
||||
- data (ResourceCopySchema): 包含源路径和目标路径的模型。
|
||||
|
||||
返回:
|
||||
- None
|
||||
"""
|
||||
try:
|
||||
source_path = cls._get_safe_path(data.source_path)
|
||||
target_path = cls._get_safe_path(data.target_path)
|
||||
|
||||
if not os.path.exists(source_path):
|
||||
raise CustomException(msg='源路径不存在')
|
||||
|
||||
# 检查目标路径是否已存在
|
||||
if os.path.exists(target_path) and not data.overwrite:
|
||||
raise CustomException(msg='目标路径已存在')
|
||||
|
||||
# 确保目标目录存在
|
||||
target_dir = os.path.dirname(target_path)
|
||||
os.makedirs(target_dir, exist_ok=True)
|
||||
|
||||
# 复制文件或目录
|
||||
if os.path.isfile(source_path):
|
||||
shutil.copy2(source_path, target_path)
|
||||
else:
|
||||
shutil.copytree(source_path, target_path, dirs_exist_ok=data.overwrite)
|
||||
|
||||
log.info(f"复制成功: {source_path} -> {target_path}")
|
||||
|
||||
except CustomException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error(f"复制失败: {str(e)}")
|
||||
raise CustomException(msg=f"复制失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
async def rename_file_service(cls, data: ResourceRenameSchema) -> None:
|
||||
"""
|
||||
重命名文件或目录
|
||||
|
||||
参数:
|
||||
- data (ResourceRenameSchema): 包含旧路径和新名称的模型。
|
||||
|
||||
返回:
|
||||
- None
|
||||
"""
|
||||
try:
|
||||
old_path = cls._get_safe_path(data.old_path)
|
||||
|
||||
if not os.path.exists(old_path):
|
||||
raise CustomException(msg='文件或目录不存在')
|
||||
|
||||
# 生成新路径
|
||||
parent_dir = os.path.dirname(old_path)
|
||||
new_path = os.path.join(parent_dir, data.new_name)
|
||||
|
||||
if os.path.exists(new_path):
|
||||
raise CustomException(msg='目标名称已存在')
|
||||
|
||||
# 重命名
|
||||
os.rename(old_path, new_path)
|
||||
log.info(f"重命名成功: {old_path} -> {new_path}")
|
||||
|
||||
except CustomException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error(f"重命名失败: {str(e)}")
|
||||
raise CustomException(msg=f"重命名失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
async def create_directory_service(cls, data: ResourceCreateDirSchema) -> None:
|
||||
"""
|
||||
创建目录
|
||||
|
||||
参数:
|
||||
- data (ResourceCreateDirSchema): 包含父目录路径和目录名称的模型。
|
||||
|
||||
返回:
|
||||
- None
|
||||
"""
|
||||
try:
|
||||
parent_path = cls._get_safe_path(data.parent_path)
|
||||
|
||||
if not os.path.exists(parent_path):
|
||||
raise CustomException(msg='父目录不存在')
|
||||
|
||||
if not os.path.isdir(parent_path):
|
||||
raise CustomException(msg='父路径不是目录')
|
||||
|
||||
# 生成新目录路径
|
||||
new_dir_path = os.path.join(parent_path, data.dir_name)
|
||||
|
||||
# 安全检查:确保新目录名称不包含路径遍历字符
|
||||
if '..' in data.dir_name or '/' in data.dir_name or '\\' in data.dir_name:
|
||||
raise CustomException(msg='目录名称包含不安全字符')
|
||||
|
||||
if os.path.exists(new_dir_path):
|
||||
raise CustomException(msg='目录已存在')
|
||||
|
||||
# 创建目录
|
||||
os.makedirs(new_dir_path)
|
||||
log.info(f"创建目录成功: {new_dir_path}")
|
||||
|
||||
except CustomException:
|
||||
raise
|
||||
except Exception as e:
|
||||
log.error(f"创建目录失败: {str(e)}")
|
||||
raise CustomException(msg=f"创建目录失败: {str(e)}")
|
||||
|
||||
@classmethod
|
||||
def _format_file_size(cls, size_bytes: int) -> str:
|
||||
"""
|
||||
格式化文件大小
|
||||
|
||||
参数:
|
||||
- size_bytes (int): 文件大小(字节)
|
||||
|
||||
返回:
|
||||
- str: 格式化后的文件大小字符串(例如:"123.45MB")
|
||||
"""
|
||||
if size_bytes == 0:
|
||||
return "0B"
|
||||
|
||||
size_names = ["B", "KB", "MB", "GB", "TB"]
|
||||
i = 0
|
||||
while size_bytes >= 1024 and i < len(size_names) - 1:
|
||||
size_bytes = int(size_bytes / 1024)
|
||||
i += 1
|
||||
|
||||
return f"{size_bytes:.2f}{size_names[i]}"
|
||||
Reference in New Issue
Block a user