# minio_util.py
import os
from datetime import timedelta
from typing import BinaryIO

from minio import Minio

from conf.config import MinioConfig
from .file_utils import generate_unique_filename
  6. class MinIOUtil:
  7. def __init__(self):
  8. self.client = Minio(
  9. endpoint=MinioConfig.get_minio_endpoint(),
  10. access_key=MinioConfig.get_minio_access_key(),
  11. secret_key=MinioConfig.get_minio_secret_key(),
  12. secure=False
  13. )
  14. self.bucket_name = MinioConfig.get_minio_bucket_name()
  15. self._ensure_bucket_exists()
  16. def _ensure_bucket_exists(self):
  17. """确保存储桶存在,若不存在则创建"""
  18. try:
  19. if not self.client.bucket_exists(self.bucket_name):
  20. self.client.make_bucket(self.bucket_name)
  21. print(f"Bucket '{self.bucket_name}' created successfully.")
  22. else:
  23. print(f"Bucket '{self.bucket_name}' already exists.")
  24. except Exception as e:
  25. raise RuntimeError(f"Failed to create bucket: {e}")
  26. def upload_file(self, file: BinaryIO, original_filename: str) -> str:
  27. """上传文件并返回URL"""
  28. try:
  29. # 生成唯一文件名,防止冲突
  30. unique_filename = generate_unique_filename(original_filename)
  31. content_type = self._get_content_type(original_filename)
  32. # 获取文件长度
  33. if hasattr(file, 'getbuffer'):
  34. # 对于BytesIO对象,获取其缓冲区大小
  35. length = file.getbuffer().nbytes
  36. elif hasattr(file, 'tell') and hasattr(file, 'seek'):
  37. # 对于支持seek/tell的文件对象,获取其大小
  38. current_pos = file.tell()
  39. file.seek(0, 2) # 移动到文件末尾
  40. length = file.tell()
  41. file.seek(current_pos) # 恢复到原始位置
  42. else:
  43. # 对于其他类型,使用-1让MinIO自动处理
  44. length = -1
  45. # 上传文件(支持大文件分块上传)
  46. self.client.put_object(
  47. bucket_name=self.bucket_name,
  48. object_name=unique_filename,
  49. data=file,
  50. length=length,
  51. content_type=content_type
  52. )
  53. # 生成公开可访问的URL(可选:设置过期时间或私有访问)
  54. url = self.client.get_presigned_url(
  55. method="GET",
  56. bucket_name=self.bucket_name,
  57. object_name=unique_filename,
  58. expires=timedelta(seconds=3600) # 1小时有效期(可设为永久或更短)
  59. )
  60. return url
  61. except Exception as e:
  62. raise RuntimeError(f"File upload failed: {e}")
  63. def download_file(self, object_name: str) -> BinaryIO:
  64. """下载文件并返回文件流"""
  65. try:
  66. response = self.client.get_object(
  67. bucket_name=self.bucket_name,
  68. object_name=object_name
  69. )
  70. return response
  71. except Exception as e:
  72. raise RuntimeError(f"File download failed: {e}")
  73. def delete_file(self, object_name: str) -> bool:
  74. """删除文件"""
  75. try:
  76. self.client.remove_object(
  77. bucket_name=self.bucket_name,
  78. object_name=object_name
  79. )
  80. return True
  81. except Exception as e:
  82. print(f"Delete failed: {e}")
  83. return False
  84. def _get_content_type(self, filename: str) -> str:
  85. """根据文件后缀推断MIME类型"""
  86. ext = filename.split('.')[-1].lower()
  87. mime_map = {
  88. 'jpg': 'image/jpeg', 'jpeg': 'image/jpeg',
  89. 'png': 'image/png', 'gif': 'image/gif',
  90. 'pdf': 'application/pdf',
  91. 'txt': 'text/plain',
  92. 'mp4': 'video/mp4',
  93. 'mp3': 'audio/mpeg'
  94. }
  95. return mime_map.get(ext, 'application/octet-stream')