image_util.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. #!/usr/bin/env python3
  2. """
  3. 图片处理工具类
  4. """
  5. import os
  6. import zipfile
  7. import re
  8. from typing import List, Dict, Any
  9. from io import BytesIO
  10. from PIL import Image
  11. from utils.minio.minio_util import MinIOUtil
  12. from utils.minio.file_utils import generate_unique_filename
  13. class ImageUtil:
  14. """图片处理工具类"""
  15. def __init__(self):
  16. """初始化图片处理工具类"""
  17. self.minio_util = MinIOUtil()
  18. def process_image_zip(self, zip_file_path: str, book_name: str) -> List[str]:
  19. """
  20. 处理图片压缩包,解压并将图片存入minio,返回按页码顺序排序的url集合
  21. Args:
  22. zip_file_path: 图片压缩包路径
  23. book_name: 书名,用于生成图片文件名
  24. Returns:
  25. List[str]: 按页码顺序排序的minio url集合
  26. """
  27. print(f"开始处理图片压缩包: {zip_file_path}")
  28. # 用于存储图片信息的列表,格式: (页码, url)
  29. image_info_list = []
  30. try:
  31. # 打开并解压压缩包
  32. with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
  33. # 获取压缩包中的所有文件名
  34. all_files = zip_ref.namelist()
  35. # 过滤出图片文件
  36. image_files = [
  37. f for f in all_files
  38. if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif'))
  39. and not f.startswith('__MACOSX') # 排除macOS生成的隐藏文件
  40. ]
  41. print(f"找到 {len(image_files)} 张图片")
  42. # 遍历所有图片文件
  43. for image_file in image_files:
  44. try:
  45. # 从文件名中提取页码
  46. page_num = self._extract_page_number(image_file)
  47. # 生成符合要求的文件名:书名_页码,例如:莉莉兰的小虫虫_P1.png
  48. _, ext = os.path.splitext(image_file)
  49. new_filename = f"{book_name}_P{page_num}{ext}"
  50. print(f"处理图片: {image_file} -> {new_filename}, 页码: {page_num}")
  51. # 读取图片文件内容
  52. with zip_ref.open(image_file) as f:
  53. image_content = f.read()
  54. # 将图片内容转换为BytesIO流
  55. image_stream = BytesIO(image_content)
  56. # 压缩图片
  57. compressed_stream = self._compress_image(image_stream, new_filename)
  58. # 上传到minio
  59. image_url = self.minio_util.upload_file(compressed_stream, new_filename)
  60. # 添加到图片信息列表
  61. image_info_list.append((page_num, image_url))
  62. except Exception as e:
  63. print(f"处理图片 {image_file} 失败: {str(e)}")
  64. continue
  65. # 按页码顺序排序
  66. image_info_list.sort(key=lambda x: x[0])
  67. # 提取url列表
  68. image_urls = [url for _, url in image_info_list]
  69. print(f"图片压缩包处理完成,共处理 {len(image_urls)} 张图片")
  70. return image_urls
  71. except Exception as e:
  72. print(f"处理图片压缩包失败: {str(e)}")
  73. raise
  74. def _compress_image(self, image_stream: BytesIO, original_filename: str, max_size_kb: int = 5000) -> BytesIO:
  75. """
  76. 压缩图片,确保最终压缩大小不超过max_size_kb
  77. 使用尺寸调整和质量调整结合的方式,确保压缩效果
  78. Args:
  79. image_stream: 图片流
  80. original_filename: 原始文件名
  81. max_size_kb: 最大文件大小,单位KB
  82. Returns:
  83. BytesIO: 压缩后的图片流
  84. """
  85. # 检查图片大小
  86. image_stream.seek(0, 2) # 移动到文件末尾
  87. current_size = image_stream.tell() / 1024 # 当前大小,单位KB
  88. original_stream_data = image_stream.getvalue() # 保存原始流数据
  89. image_stream.seek(0) # 回到文件开头
  90. # 严格目标大小,使用max_size_kb作为目标
  91. target_size = max_size_kb
  92. # 如果当前大小小于等于目标大小,直接返回
  93. if current_size <= target_size:
  94. return image_stream
  95. # 打开图片
  96. img = Image.open(image_stream)
  97. original_width, original_height = img.size
  98. # 获取原始图片格式
  99. original_format = img.format or 'JPEG' # 默认使用JPEG格式
  100. # 使用LANCZOS高质量重采样算法
  101. resample_method = Image.Resampling.LANCZOS
  102. # 保存最佳结果
  103. best_result = None
  104. best_size = float('inf')
  105. # 辅助函数:获取指定尺寸和质量的压缩大小和字节流
  106. def get_compressed_data(width, height, quality_val):
  107. """
  108. 获取指定尺寸和质量的压缩大小和字节流
  109. """
  110. # 调整图片尺寸
  111. resized_img = img.resize((width, height), resample_method)
  112. # 保存调整后的图片
  113. compressed_stream = BytesIO()
  114. resized_img.save(compressed_stream, format=original_format, quality=quality_val)
  115. compressed_stream.seek(0, 2)
  116. compressed_size = compressed_stream.tell() / 1024
  117. compressed_stream.seek(0)
  118. return compressed_size, compressed_stream.getvalue()
  119. # 主要压缩逻辑:逐步缩小尺寸和降低质量,直到符合要求
  120. # 尺寸调整为主,质量调整为辅
  121. sizes_to_try = []
  122. # 生成要尝试的尺寸列表(从原始尺寸开始,逐步缩小)
  123. current_try_width, current_try_height = original_width, original_height
  124. for i in range(15): # 最多尝试15种尺寸
  125. sizes_to_try.append((current_try_width, current_try_height))
  126. # 每次缩小10%
  127. current_try_width = int(current_try_width * 0.9)
  128. current_try_height = int(current_try_height * 0.9)
  129. # 确保尺寸不小于原始尺寸的30%
  130. if current_try_width < original_width * 0.3 or current_try_height < original_height * 0.3:
  131. break
  132. # 质量级别列表(从高质量开始,逐步降低)
  133. quality_levels = [90, 85, 80, 75, 70, 65, 60]
  134. # 遍历所有尺寸和质量组合,寻找最佳结果
  135. for width, height in sizes_to_try:
  136. for quality in quality_levels:
  137. # 获取当前参数的压缩数据
  138. compressed_size, compressed_data = get_compressed_data(width, height, quality)
  139. # 更新最佳结果
  140. if compressed_size < best_size:
  141. best_result = compressed_data
  142. best_size = compressed_size
  143. # 如果已经达到目标大小,直接返回
  144. if compressed_size <= target_size:
  145. final_stream = BytesIO(compressed_data)
  146. final_stream.seek(0)
  147. return final_stream
  148. # 如果没有找到符合要求的结果,使用最佳结果
  149. if best_result is None:
  150. # 返回原始图片
  151. return BytesIO(original_stream_data)
  152. # 最终检查:如果最佳结果仍超过目标,使用最激进的压缩
  153. final_stream = BytesIO(best_result)
  154. final_stream.seek(0, 2)
  155. final_size = final_stream.tell() / 1024
  156. final_stream.seek(0)
  157. if final_size > target_size:
  158. # 使用最激进的压缩参数
  159. aggressive_width = int(original_width * 0.5)
  160. aggressive_height = int(original_height * 0.5)
  161. aggressive_quality = 50
  162. aggressive_size, aggressive_data = get_compressed_data(aggressive_width, aggressive_height, aggressive_quality)
  163. if aggressive_size < final_size:
  164. final_stream = BytesIO(aggressive_data)
  165. final_stream.seek(0)
  166. return final_stream
  167. def _compress_image_to_bytes(self, image_stream: BytesIO, max_size_kb: int = 5000) -> bytes:
  168. """
  169. 压缩图片,将大于max_size_kb的图片压缩到max_size_kb以内,返回图片的字节流
  170. Args:
  171. image_stream: 图片流
  172. max_size_kb: 最大文件大小,单位KB
  173. Returns:
  174. bytes: 压缩后的图片字节流
  175. """
  176. # 创建一个临时文件名用于日志
  177. temp_filename = "temp_image"
  178. # 调用现有的压缩方法获取压缩后的BytesIO对象
  179. compressed_stream = self._compress_image(image_stream, temp_filename, max_size_kb)
  180. # 读取字节流并返回
  181. compressed_bytes = compressed_stream.getvalue()
  182. print(f"图片转换为字节流完成,字节大小为 {len(compressed_bytes)} 字节")
  183. return compressed_bytes
  184. def compress_image_bytes(self, image_bytes: bytes, max_size_kb: int = 5000) -> bytes:
  185. """
  186. 压缩图片,将大于max_size_kb的图片压缩到max_size_kb以内,返回图片的字节流
  187. Args:
  188. image_bytes: 图片字节流
  189. max_size_kb: 最大文件大小,单位KB
  190. Returns:
  191. bytes: 压缩后的图片字节流
  192. """
  193. print(f"开始压缩图片,原大小为 {len(image_bytes) / 1024:.2f}KB")
  194. # 将字节流转换为BytesIO对象
  195. image_stream = BytesIO(image_bytes)
  196. # 调用现有的压缩方法
  197. compressed_bytes = self._compress_image_to_bytes(image_stream, max_size_kb)
  198. print(f"图片压缩完成,压缩后大小为 {len(compressed_bytes) / 1024:.2f}KB")
  199. return compressed_bytes
  200. def _extract_page_number(self, filename: str) -> int:
  201. """
  202. 从文件名中提取页码
  203. Args:
  204. filename: 文件名
  205. Returns:
  206. int: 页码
  207. """
  208. # 提取文件名(不含路径)
  209. basename = os.path.basename(filename)
  210. # 使用正则表达式匹配页码
  211. # 匹配类似 P1, Page1, 001, 1 等格式的页码
  212. patterns = [
  213. r'P(\d+)', # P1, P123
  214. r'Page(\d+)', # Page1, Page123
  215. r'(\d+)\.(?:png|jpg|jpeg|gif)$', # 1.png, 123.jpg
  216. r'(\d+)_', # 1_, 123_
  217. ]
  218. for pattern in patterns:
  219. match = re.search(pattern, basename, re.IGNORECASE)
  220. if match:
  221. return int(match.group(1))
  222. # 如果没有匹配到页码,返回0
  223. return 0
  224. # 单例模式
  225. image_util = ImageUtil()