| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294 |
- #!/usr/bin/env python3
- """
- 图片处理工具类
- """
- import os
- import zipfile
- import re
- from typing import List, Dict, Any
- from io import BytesIO
- from PIL import Image
- from utils.minio.minio_util import MinIOUtil
- from utils.minio.file_utils import generate_unique_filename
class ImageUtil:
    """Image helpers: unpack page images from a zip, shrink them, upload to MinIO."""

    # Lower-case file extensions that are treated as page images in an archive.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg', '.gif')

    # Page-number patterns, tried in order; the first match wins.
    # The standalone "P<n>" form is tried before the loose one so that a "p"
    # that merely ends a word ("Step2_P15.png") cannot shadow the real marker.
    # The loose form is kept afterwards so names that matched before still do.
    _PAGE_PATTERNS = (
        re.compile(r'(?<![A-Za-z])P(\d+)', re.IGNORECASE),          # _P1, P123
        re.compile(r'Page(\d+)', re.IGNORECASE),                    # Page1
        re.compile(r'P(\d+)', re.IGNORECASE),                       # legacy: any p<n>
        re.compile(r'(\d+)\.(?:png|jpg|jpeg|gif)$', re.IGNORECASE),  # 1.png
        re.compile(r'(\d+)_', re.IGNORECASE),                       # 1_, 123_
    )

    def __init__(self):
        """Create the helper together with the MinIO client used for uploads."""
        self.minio_util = MinIOUtil()

    @staticmethod
    def _is_page_image(member_name: str) -> bool:
        """Return True when a zip member looks like a real page image.

        macOS archive metadata is rejected: anything under a ``__MACOSX``
        folder (at any depth) and AppleDouble ``._*`` companion files. Those
        carry an image extension but are not images, and they would otherwise
        be uploaded under the same object name as the real page.
        """
        if member_name.startswith('__MACOSX'):
            return False
        parts = member_name.replace('\\', '/').split('/')
        if '__MACOSX' in parts or parts[-1].startswith('._'):
            return False
        return member_name.lower().endswith(ImageUtil._IMAGE_EXTENSIONS)

    def process_image_zip(self, zip_file_path: str, book_name: str) -> List[str]:
        """
        Read every page image from a zip archive, upload it to MinIO and
        return the resulting URLs ordered by page number.

        Each image is renamed to ``{book_name}_P{page}{ext}``, passed through
        ``_compress_image`` and uploaded with ``self.minio_util.upload_file``.
        An image that fails is logged and skipped; the rest are still processed.

        Args:
            zip_file_path: Path of the zip archive (handed to ``zipfile.ZipFile``).
            book_name: Book title, used as the prefix of the uploaded file names.

        Returns:
            List[str]: Uploaded URLs sorted by ascending page number.

        Raises:
            Exception: Any error opening or reading the archive itself is
                logged and re-raised unchanged.
        """
        print(f"开始处理图片压缩包: {zip_file_path}")

        # (page number, url) pairs, sorted once everything is uploaded.
        image_info_list = []

        try:
            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
                image_files = [
                    name for name in zip_ref.namelist()
                    if self._is_page_image(name)
                ]

                print(f"找到 {len(image_files)} 张图片")

                for image_file in image_files:
                    try:
                        page_num = self._extract_page_number(image_file)

                        # Target name: <book>_P<page><ext>, e.g. 莉莉兰的小虫虫_P1.png
                        _, ext = os.path.splitext(image_file)
                        new_filename = f"{book_name}_P{page_num}{ext}"

                        print(f"处理图片: {image_file} -> {new_filename}, 页码: {page_num}")

                        image_stream = BytesIO(zip_ref.read(image_file))
                        compressed_stream = self._compress_image(image_stream, new_filename)
                        image_url = self.minio_util.upload_file(compressed_stream, new_filename)

                        image_info_list.append((page_num, image_url))

                    except Exception as e:
                        # Best effort: one broken image must not abort the batch.
                        print(f"处理图片 {image_file} 失败: {str(e)}")
                        continue

            # Stable sort: images sharing a page number keep archive order.
            image_info_list.sort(key=lambda item: item[0])
            image_urls = [url for _, url in image_info_list]

            print(f"图片压缩包处理完成,共处理 {len(image_urls)} 张图片")
            return image_urls

        except Exception as e:
            print(f"处理图片压缩包失败: {str(e)}")
            raise

    def _compress_image(self, image_stream: BytesIO, original_filename: str, max_size_kb: int = 5000) -> BytesIO:
        """
        Shrink an image until it is no larger than ``max_size_kb``.

        An image already within the limit is returned untouched (same stream,
        rewound to the start). Otherwise the image is re-encoded in its
        original format at progressively smaller dimensions (10% steps, down
        to roughly 30% of the original) and at qualities from 90 down to 60;
        the first candidate that fits is returned. If nothing fits, the
        smallest candidate found is returned, including one last attempt at
        50% size and quality 50, so the result may still exceed the limit.

        Args:
            image_stream: Stream holding the encoded image.
            original_filename: File name of the image; currently unused and
                kept only for interface compatibility.
            max_size_kb: Maximum allowed size in KB.

        Returns:
            BytesIO: Stream positioned at 0 containing the (possibly
            re-encoded) image.
        """
        # Measure the payload without copying it.
        image_stream.seek(0, 2)
        current_size = image_stream.tell() / 1024
        image_stream.seek(0)

        target_size = max_size_kb
        if current_size <= target_size:
            return image_stream

        img = Image.open(image_stream)
        original_width, original_height = img.size
        original_format = img.format or 'JPEG'  # fall back to JPEG when unknown
        resample_method = Image.Resampling.LANCZOS

        def encode(resized_img, quality_val):
            """Encode ``resized_img`` and return ``(size_in_kb, encoded_bytes)``."""
            # NOTE(review): `quality` matters for lossy encoders such as JPEG;
            # PNG/GIF encoders appear to ignore it, so for those formats only
            # the dimension steps reduce the size — confirm against Pillow docs.
            out = BytesIO()
            resized_img.save(out, format=original_format, quality=quality_val)
            data = out.getvalue()
            return len(data) / 1024, data

        # Candidate dimensions: start at full size, shrink 10% per step and
        # stop before either side drops below 30% of the original.
        sizes_to_try = []
        try_width, try_height = original_width, original_height
        for _ in range(15):
            sizes_to_try.append((try_width, try_height))
            try_width = int(try_width * 0.9)
            try_height = int(try_height * 0.9)
            if try_width < original_width * 0.3 or try_height < original_height * 0.3:
                break

        quality_levels = [90, 85, 80, 75, 70, 65, 60]

        best_result = None
        best_size = float('inf')

        for width, height in sizes_to_try:
            # Resampling is independent of the quality level, so do it once
            # per size instead of once per (size, quality) pair.
            resized = img.resize((width, height), resample_method)
            for quality in quality_levels:
                compressed_size, compressed_data = encode(resized, quality)

                if compressed_size < best_size:
                    best_result = compressed_data
                    best_size = compressed_size

                if compressed_size <= target_size:
                    return BytesIO(compressed_data)

        # Nothing fit the limit. Try one aggressive setting and keep it only if
        # it beats the best candidate so far. Clamp to 1px so a very narrow
        # image cannot produce a zero-sized resize request.
        aggressive_width = max(1, int(original_width * 0.5))
        aggressive_height = max(1, int(original_height * 0.5))
        aggressive_quality = 50

        aggressive_size, aggressive_data = encode(
            img.resize((aggressive_width, aggressive_height), resample_method),
            aggressive_quality,
        )
        if aggressive_size < best_size:
            best_result = aggressive_data

        return BytesIO(best_result)

    def _compress_image_to_bytes(self, image_stream: BytesIO, max_size_kb: int = 5000) -> bytes:
        """
        Compress an image stream via ``_compress_image`` and return raw bytes.

        Args:
            image_stream: Stream holding the encoded image.
            max_size_kb: Maximum allowed size in KB.

        Returns:
            bytes: Contents of the stream returned by ``_compress_image``.
        """
        # Placeholder name: `_compress_image` requires a filename argument.
        temp_filename = "temp_image"

        compressed_stream = self._compress_image(image_stream, temp_filename, max_size_kb)

        compressed_bytes = compressed_stream.getvalue()
        print(f"图片转换为字节流完成,字节大小为 {len(compressed_bytes)} 字节")

        return compressed_bytes

    def compress_image_bytes(self, image_bytes: bytes, max_size_kb: int = 5000) -> bytes:
        """
        Compress an image given as bytes and return the result as bytes.

        Args:
            image_bytes: Encoded image data.
            max_size_kb: Maximum allowed size in KB.

        Returns:
            bytes: Image data after ``_compress_image`` has been applied;
            identical to the input when it is already within the limit.
        """
        print(f"开始压缩图片,原大小为 {len(image_bytes) / 1024:.2f}KB")

        compressed_bytes = self._compress_image_to_bytes(BytesIO(image_bytes), max_size_kb)

        print(f"图片压缩完成,压缩后大小为 {len(compressed_bytes) / 1024:.2f}KB")

        return compressed_bytes

    def _extract_page_number(self, filename: str) -> int:
        """
        Extract the page number from an image file name.

        Only the base name is inspected (directories are ignored). Recognised
        forms, in priority order: a standalone ``P<n>`` marker, ``Page<n>``,
        any ``p<n>``, ``<n>.<image ext>`` at the end, and ``<n>_``. Matching
        is case-insensitive.

        Args:
            filename: File name, optionally with a directory prefix.

        Returns:
            int: The page number, or 0 when no pattern matches.
        """
        basename = os.path.basename(filename)

        for pattern in self._PAGE_PATTERNS:
            match = pattern.search(basename)
            if match:
                return int(match.group(1))

        return 0
# Singleton pattern: module-level shared ImageUtil instance.
image_util = ImageUtil()
|