import os
from datetime import timedelta
from typing import BinaryIO, Optional

from minio import Minio

from conf.settings import minio_settings
from utils.file.file_utils import generate_unique_filename

# Process-wide MinIO helper instance; managed by init_minio_client(),
# get_minio_client() and close_minio_client() below.
_global_minio_client: Optional["MinIOUtil"] = None

# Part size passed to put_object when the stream length cannot be determined.
# The SDK requires an explicit part_size whenever length is -1.
_UNKNOWN_LENGTH_PART_SIZE = 10 * 1024 * 1024

# Fallback MIME type for unknown or missing file extensions.
_DEFAULT_CONTENT_TYPE = 'application/octet-stream'


class MinIOUtil:
    """Thin convenience wrapper around a ``Minio`` client bound to one bucket.

    Endpoint, credentials and bucket name are read from ``minio_settings``.
    """

    def __init__(self, check_bucket: bool = False):
        """Create the underlying client.

        Args:
            check_bucket: When true, verify that the configured bucket exists
                (creating it if necessary) during construction.

        Raises:
            RuntimeError: If ``check_bucket`` is true and the bucket check or
                creation fails.
        """
        self.client = Minio(
            endpoint=minio_settings.minio_endpoint,
            access_key=minio_settings.minio_access_key,
            secret_key=minio_settings.minio_secret_key,
            secure=False
        )
        self.bucket_name = minio_settings.minio_bucket_name

        # Only validate the bucket when explicitly requested.
        if check_bucket:
            self._ensure_bucket_exists()

    def _ensure_bucket_exists(self):
        """Make sure the configured bucket exists, creating it if it does not.

        Raises:
            RuntimeError: If the existence check or the creation fails.
        """
        try:
            if not self.client.bucket_exists(self.bucket_name):
                self.client.make_bucket(self.bucket_name)
                print(f"Bucket '{self.bucket_name}' created successfully.")
            else:
                print(f"Bucket '{self.bucket_name}' already exists.")
        except Exception as e:
            raise RuntimeError(f"Failed to create bucket: {e}") from e

    def close(self):
        """No-op kept so callers have a uniform close() interface."""
        pass

    @staticmethod
    def _remaining_length(file) -> int:
        """Return the number of bytes left in *file* from its current position.

        Returns -1 when the size cannot be determined (the object has no
        tell/seek, or the stream is not seekable). The stream position is
        restored before returning.
        """
        try:
            current_pos = file.tell()
            file.seek(0, os.SEEK_END)
            # Use tell() rather than seek()'s return value: some file-like
            # objects return None from seek().
            end_pos = file.tell()
            file.seek(current_pos)
        except (AttributeError, OSError):
            # io.UnsupportedOperation is an OSError subclass, so
            # non-seekable streams land here as well.
            return -1
        return end_pos - current_pos

    def upload_file(self, file: BinaryIO, original_filename: str) -> str:
        """Upload a stream and return a presigned GET URL for it.

        The data is read from the stream's current position to its end. The
        object is stored under a unique name derived from
        ``original_filename`` to avoid collisions.

        Args:
            file: Binary stream to upload.
            original_filename: Name used to derive the stored object name and
                the content type.

        Returns:
            A presigned URL for downloading the object, valid for one hour.

        Raises:
            RuntimeError: If any step of the upload or URL generation fails.
        """
        try:
            # Generate a unique object name to prevent collisions.
            unique_filename = generate_unique_filename(original_filename)
            content_type = self._get_content_type(original_filename)

            # The announced length must match what can actually be read from
            # the current position, otherwise the SDK reports a short stream.
            length = self._remaining_length(file)

            put_kwargs = {}
            if length < 0:
                # Unknown size: stream the upload in fixed-size parts.
                length = -1
                put_kwargs['part_size'] = _UNKNOWN_LENGTH_PART_SIZE

            self.client.put_object(
                bucket_name=self.bucket_name,
                object_name=unique_filename,
                data=file,
                length=length,
                content_type=content_type,
                **put_kwargs
            )

            # Time-limited download URL (1 hour).
            url = self.client.get_presigned_url(
                method="GET",
                bucket_name=self.bucket_name,
                object_name=unique_filename,
                expires=timedelta(seconds=3600)
            )
            return url
        except Exception as e:
            raise RuntimeError(f"File upload failed: {e}") from e

    def download_file(self, object_name: str) -> BinaryIO:
        """Fetch an object and return the response stream.

        NOTE(review): the value returned is whatever ``Minio.get_object``
        returns; per the MinIO SDK documentation the caller should close it
        and release its connection after reading -- confirm at call sites.

        Args:
            object_name: Name of the object inside the configured bucket.

        Returns:
            The response object produced by ``Minio.get_object``.

        Raises:
            RuntimeError: If the object cannot be fetched.
        """
        try:
            response = self.client.get_object(
                bucket_name=self.bucket_name,
                object_name=object_name
            )
            return response
        except Exception as e:
            raise RuntimeError(f"File download failed: {e}") from e

    def delete_file(self, object_name: str) -> bool:
        """Delete an object from the configured bucket.

        Best-effort: failures are reported on stdout instead of raised.

        Args:
            object_name: Name of the object to remove.

        Returns:
            True if the removal call succeeded, False otherwise.
        """
        try:
            self.client.remove_object(
                bucket_name=self.bucket_name,
                object_name=object_name
            )
            return True
        except Exception as e:
            print(f"Delete failed: {e}")
            return False

    def _get_content_type(self, filename: str) -> str:
        """Infer a MIME type from the filename's extension.

        Args:
            filename: File name; only the text after the last '.' is used.

        Returns:
            The mapped MIME type, or 'application/octet-stream' when the
            name has no extension or the extension is not recognised.
        """
        _, dot, ext = filename.rpartition('.')
        if not dot:
            # No '.' at all: the whole name must not be read as an extension.
            return _DEFAULT_CONTENT_TYPE

        mime_map = {
            'jpg': 'image/jpeg',
            'jpeg': 'image/jpeg',
            'png': 'image/png',
            'gif': 'image/gif',
            'pdf': 'application/pdf',
            'txt': 'text/plain',
            'mp4': 'video/mp4',
            'mp3': 'audio/mpeg'
        }
        return mime_map.get(ext.lower(), _DEFAULT_CONTENT_TYPE)


def get_minio_client() -> MinIOUtil:
    """Return the global MinIO helper.

    Raises:
        RuntimeError: If init_minio_client() has not been called yet.
    """
    if _global_minio_client is None:
        raise RuntimeError("MinIO client has not been initialized. Call init_minio_client() first.")
    return _global_minio_client


def init_minio_client(check_bucket: bool = False) -> None:
    """Initialise the global MinIO helper if it does not exist yet.

    Calling this again while a helper exists is a no-op.

    Args:
        check_bucket: Whether to verify (and create if missing) the bucket
            during initialisation.
    """
    global _global_minio_client
    if _global_minio_client is None:
        _global_minio_client = MinIOUtil(check_bucket=check_bucket)


def close_minio_client() -> None:
    """Close and discard the global MinIO helper, if one exists."""
    global _global_minio_client
    if _global_minio_client is not None:
        _global_minio_client.close()
        _global_minio_client = None