| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- from minio import Minio
- from typing import BinaryIO
- from datetime import timedelta
- from conf.settings import minio_settings
- from utils.file.file_utils import generate_unique_filename
- # Global MinIO client instance (stays None until init_minio_client() is called)
- _global_minio_client = None
class MinIOUtil:
    """Thin wrapper around a ``minio.Minio`` client bound to a single bucket.

    The endpoint, credentials and bucket name are read from
    ``minio_settings`` when the instance is created.
    """

    # Part size passed to put_object when the stream length is unknown.
    # The MinIO SDK requires a valid part_size whenever length is -1.
    _UNKNOWN_LENGTH_PART_SIZE = 10 * 1024 * 1024

    # Lower-case file extension (without the dot) -> MIME type.
    _MIME_TYPES = {
        'jpg': 'image/jpeg', 'jpeg': 'image/jpeg',
        'png': 'image/png', 'gif': 'image/gif',
        'pdf': 'application/pdf',
        'txt': 'text/plain',
        'mp4': 'video/mp4',
        'mp3': 'audio/mpeg',
    }
    _DEFAULT_MIME_TYPE = 'application/octet-stream'

    def __init__(self, check_bucket=False):
        """Create the underlying client.

        Args:
            check_bucket: When true, verify that the configured bucket exists
                (creating it if necessary) as part of construction.

        Raises:
            RuntimeError: If ``check_bucket`` is true and the bucket check or
                creation fails.
        """
        # NOTE(review): secure=False is hard-coded here; confirm that plain
        # (non-TLS) transport is intended for every deployment.
        self.client = Minio(
            endpoint=minio_settings.minio_endpoint,
            access_key=minio_settings.minio_access_key,
            secret_key=minio_settings.minio_secret_key,
            secure=False
        )
        self.bucket_name = minio_settings.minio_bucket_name

        # Only validate the bucket when explicitly requested.
        if check_bucket:
            self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Make sure the configured bucket exists, creating it if missing.

        Raises:
            RuntimeError: If the existence check or the creation fails.
        """
        try:
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
                print(f"Bucket '{self.bucket_name}' created successfully.")
            else:
                print(f"Bucket '{self.bucket_name}' already exists.")
        except Exception as e:
            raise RuntimeError(f"Failed to create bucket: {e}") from e

    def close(self):
        """No-op kept so callers have a uniform close() interface."""
        # Nothing is released here; the method exists only for API symmetry.
        pass

    @staticmethod
    def _remaining_length(file) -> int:
        """Return the bytes left to read from *file*, or -1 if unknown.

        The length is measured from the stream's current position (not from
        offset 0), and the position is restored before returning. Objects
        that lack ``seek``/``tell``, or whose ``seek``/``tell`` fail (for
        example non-seekable streams), yield -1.
        """
        if not (hasattr(file, 'tell') and hasattr(file, 'seek')):
            return -1
        try:
            start = file.tell()
            file.seek(0, 2)  # jump to the end of the stream
            end = file.tell()
            file.seek(start)  # restore the caller's position
        except (OSError, ValueError):
            # io.UnsupportedOperation derives from both of these.
            return -1
        return max(end - start, 0)

    def upload_file(self, file: BinaryIO, original_filename: str) -> str:
        """Upload *file* under a generated unique name and return a URL.

        Args:
            file: Readable binary stream; uploading starts at its current
                position.
            original_filename: Name used to derive the unique object name and
                the content type.

        Returns:
            A presigned GET URL for the uploaded object, valid for one hour.

        Raises:
            RuntimeError: If any step of the upload or URL generation fails.
        """
        try:
            # Generate a unique object name to avoid collisions.
            unique_filename = generate_unique_filename(original_filename)
            content_type = self._get_content_type(original_filename)

            length = self._remaining_length(file)

            # With an unknown length (-1) a part size must be supplied so the
            # SDK can stream the data as a multipart upload; 0 is the SDK
            # default and lets it choose when the length is known.
            self.client.put_object(
                bucket_name=self.bucket_name,
                object_name=unique_filename,
                data=file,
                length=length,
                content_type=content_type,
                part_size=self._UNKNOWN_LENGTH_PART_SIZE if length < 0 else 0,
            )

            # Presigned (time-limited) download URL, valid for one hour.
            url = self.client.get_presigned_url(
                method="GET",
                bucket_name=self.bucket_name,
                object_name=unique_filename,
                expires=timedelta(seconds=3600)
            )

            return url
        except Exception as e:
            raise RuntimeError(f"File upload failed: {e}") from e

    def download_file(self, object_name: str) -> BinaryIO:
        """Fetch *object_name* from the bucket and return the response stream.

        NOTE(review): the object returned by ``get_object`` is handed back
        unchanged; per the MinIO SDK docs the caller is expected to close it
        and release its connection when finished.

        Raises:
            RuntimeError: If the object cannot be retrieved.
        """
        try:
            response = self.client.get_object(
                bucket_name=self.bucket_name,
                object_name=object_name
            )
            return response
        except Exception as e:
            raise RuntimeError(f"File download failed: {e}") from e

    def delete_file(self, object_name: str) -> bool:
        """Delete *object_name* from the bucket.

        Returns:
            True on success; False if removal raised (the error is printed
            rather than propagated, i.e. deletion is best-effort).
        """
        try:
            self.client.remove_object(
                bucket_name=self.bucket_name,
                object_name=object_name
            )
            return True
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    def _get_content_type(self, filename: str) -> str:
        """Infer the MIME type from the extension of *filename*.

        Names without a dot have no extension and therefore map to the
        generic ``application/octet-stream`` type, as do unknown extensions.
        """
        _, dot, ext = filename.rpartition('.')
        if not dot:
            return self._DEFAULT_MIME_TYPE
        return self._MIME_TYPES.get(ext.lower(), self._DEFAULT_MIME_TYPE)
def get_minio_client() -> MinIOUtil:
    """Return the shared module-level MinIOUtil instance.

    Raises:
        RuntimeError: If the shared instance has not been created yet.
    """
    shared = _global_minio_client
    if shared is not None:
        return shared
    raise RuntimeError("MinIO client has not been initialized. Call init_minio_client() first.")
def init_minio_client(check_bucket=False) -> None:
    """Create the shared MinIOUtil instance if it does not exist yet.

    Calling this when the shared instance is already set does nothing.

    Args:
        check_bucket: Forwarded to MinIOUtil; whether to validate the bucket
            during initialization.
    """
    global _global_minio_client
    if _global_minio_client is not None:
        # Already initialized; keep the existing instance.
        return
    _global_minio_client = MinIOUtil(check_bucket=check_bucket)
def close_minio_client() -> None:
    """Close the shared MinIOUtil instance, if any, and clear the global."""
    global _global_minio_client
    shared = _global_minio_client
    if shared is None:
        return
    shared.close()
    # Reset only after close() returned, so a failing close leaves it in place.
    _global_minio_client = None
|