#!/usr/bin/env python3 """ 图片处理工具类 """ import os import zipfile import re from typing import List from io import BytesIO from PIL import Image from utils.file.minio.minio_util import MinIOUtil from utils.file.file_utils import generate_unique_filename class ImageUtil: """图片处理工具类""" def __init__(self): """初始化图片处理工具类""" self.minio_util = MinIOUtil() # 将图片url转换为Image对象 def _url_to_image(self, image_url: str) -> Image.Image: """ 将图片url转换为Image对象 Args: image_url: 图片url Returns: Image.Image: 图片对象 """ import requests # 处理image_url为image: Image.Image if isinstance(image_url, str): # 下载图片 response = requests.get(image_url) response.raise_for_status() # 检查HTTP状态码 # 将响应内容转换为字节流 image_bytes = BytesIO(response.content) # 创建Image对象 image = Image.open(image_bytes) return image def process_image_zip(self, zip_file_path: str, book_name: str) -> List[str]: """ 处理图片压缩包,解压并将图片存入minio,返回按页码顺序排序的url集合 Args: zip_file_path: 图片压缩包路径 book_name: 书名,用于生成图片文件名 Returns: List[str]: 按页码顺序排序的minio url集合 """ print(f"开始处理图片压缩包: {zip_file_path}") # 用于存储图片信息的列表,格式: (页码, url) image_info_list = [] try: # 打开并解压压缩包 with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: # 获取压缩包中的所有文件名 all_files = zip_ref.namelist() # 过滤出图片文件 image_files = [ f for f in all_files if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif')) and not f.startswith('__MACOSX') # 排除macOS生成的隐藏文件 ] print(f"找到 {len(image_files)} 张图片") # 遍历所有图片文件 for image_file in image_files: try: # 从文件名中提取页码 page_num = self._extract_page_number(image_file) # 生成符合要求的文件名:书名_页码,例如:莉莉兰的小虫虫_P1.png _, ext = os.path.splitext(image_file) new_filename = f"{book_name}_P{page_num}{ext}" print(f"处理图片: {image_file} -> {new_filename}, 页码: {page_num}") # 读取图片文件内容 with zip_ref.open(image_file) as f: image_content = f.read() # 将图片内容转换为BytesIO流 image_stream = BytesIO(image_content) # 压缩图片 compressed_stream = self._compress_image(image_stream, new_filename) # 上传到minio image_url = self.minio_util.upload_file(compressed_stream, new_filename) # 添加到图片信息列表 image_info_list.append((page_num, image_url)) except Exception as e: print(f"处理图片 {image_file} 失败: {str(e)}") continue # 按页码顺序排序 image_info_list.sort(key=lambda x: x[0]) # 提取url列表 image_urls = [url for _, url in image_info_list] print(f"图片压缩包处理完成,共处理 {len(image_urls)} 张图片") return image_urls except Exception as e: print(f"处理图片压缩包失败: {str(e)}") raise def _compress_image(self, image_stream: BytesIO, original_filename: str, max_size_kb: int = 5000) -> BytesIO: """ 压缩图片,确保最终压缩大小不超过max_size_kb 使用尺寸调整和质量调整结合的方式,确保压缩效果 Args: image_stream: 图片流 original_filename: 原始文件名 max_size_kb: 最大文件大小,单位KB Returns: BytesIO: 压缩后的图片流 """ # 检查图片大小 image_stream.seek(0, 2) # 移动到文件末尾 current_size = image_stream.tell() / 1024 # 当前大小,单位KB original_stream_data = image_stream.getvalue() # 保存原始流数据 image_stream.seek(0) # 回到文件开头 # 严格目标大小,使用max_size_kb作为目标 target_size = max_size_kb # 如果当前大小小于等于目标大小,直接返回 if current_size <= target_size: return image_stream # 打开图片 img = Image.open(image_stream) original_width, original_height = img.size # 获取原始图片格式 original_format = img.format or 'JPEG' # 默认使用JPEG格式 # 使用LANCZOS高质量重采样算法 resample_method = Image.Resampling.LANCZOS # 保存最佳结果 best_result = None best_size = float('inf') # 辅助函数:获取指定尺寸和质量的压缩大小和字节流 def get_compressed_data(width, height, quality_val): """ 获取指定尺寸和质量的压缩大小和字节流 """ # 调整图片尺寸 resized_img = img.resize((width, height), resample_method) # 保存调整后的图片 compressed_stream = BytesIO() resized_img.save(compressed_stream, format=original_format, quality=quality_val) compressed_stream.seek(0, 2) compressed_size = compressed_stream.tell() / 1024 compressed_stream.seek(0) return compressed_size, compressed_stream.getvalue() # 主要压缩逻辑:逐步缩小尺寸和降低质量,直到符合要求 # 尺寸调整为主,质量调整为辅 sizes_to_try = [] # 生成要尝试的尺寸列表(从原始尺寸开始,逐步缩小) current_try_width, current_try_height = original_width, original_height for i in range(15): # 最多尝试15种尺寸 sizes_to_try.append((current_try_width, current_try_height)) # 每次缩小10% current_try_width = int(current_try_width * 0.9) current_try_height = int(current_try_height * 0.9) # 确保尺寸不小于原始尺寸的30% if current_try_width < original_width * 0.3 or current_try_height < original_height * 0.3: break # 质量级别列表(从高质量开始,逐步降低) quality_levels = [90, 85, 80, 75, 70, 65, 60] # 遍历所有尺寸和质量组合,寻找最佳结果 for width, height in sizes_to_try: for quality in quality_levels: # 获取当前参数的压缩数据 compressed_size, compressed_data = get_compressed_data(width, height, quality) # 更新最佳结果 if compressed_size < best_size: best_result = compressed_data best_size = compressed_size # 如果已经达到目标大小,直接返回 if compressed_size <= target_size: final_stream = BytesIO(compressed_data) final_stream.seek(0) return final_stream # 如果没有找到符合要求的结果,使用最佳结果 if best_result is None: # 返回原始图片 return BytesIO(original_stream_data) # 最终检查:如果最佳结果仍超过目标,使用最激进的压缩 final_stream = BytesIO(best_result) final_stream.seek(0, 2) final_size = final_stream.tell() / 1024 final_stream.seek(0) if final_size > target_size: # 使用最激进的压缩参数 aggressive_width = int(original_width * 0.5) aggressive_height = int(original_height * 0.5) aggressive_quality = 50 aggressive_size, aggressive_data = get_compressed_data(aggressive_width, aggressive_height, aggressive_quality) if aggressive_size < final_size: final_stream = BytesIO(aggressive_data) final_stream.seek(0) return final_stream def _compress_image_to_bytes(self, image_stream: BytesIO, max_size_kb: int = 5000) -> bytes: """ 压缩图片,将大于max_size_kb的图片压缩到max_size_kb以内,返回图片的字节流 Args: image_stream: 图片流 max_size_kb: 最大文件大小,单位KB Returns: bytes: 压缩后的图片字节流 """ # 创建一个临时文件名用于日志 temp_filename = "temp_image" # 调用现有的压缩方法获取压缩后的BytesIO对象 compressed_stream = self._compress_image(image_stream, temp_filename, max_size_kb) # 读取字节流并返回 compressed_bytes = compressed_stream.getvalue() print(f"图片转换为字节流完成,字节大小为 {len(compressed_bytes)} 字节") return compressed_bytes def compress_image_bytes(self, image_bytes: bytes, max_size_kb: int = 5000) -> bytes: """ 压缩图片,将大于max_size_kb的图片压缩到max_size_kb以内,返回图片的字节流 Args: image_bytes: 图片字节流 max_size_kb: 最大文件大小,单位KB Returns: bytes: 压缩后的图片字节流 """ print(f"开始压缩图片,原大小为 {len(image_bytes) / 1024:.2f}KB") # 将字节流转换为BytesIO对象 image_stream = BytesIO(image_bytes) # 调用现有的压缩方法 compressed_bytes = self._compress_image_to_bytes(image_stream, max_size_kb) print(f"图片压缩完成,压缩后大小为 {len(compressed_bytes) / 1024:.2f}KB") return compressed_bytes def _extract_page_number(self, filename: str) -> int: """ 从文件名中提取页码 Args: filename: 文件名 Returns: int: 页码 """ # 提取文件名(不含路径) basename = os.path.basename(filename) # 使用正则表达式匹配页码 # 匹配类似 P1, Page1, 001, 1 等格式的页码 patterns = [ r'P(\d+)', # P1, P123 r'Page(\d+)', # Page1, Page123 r'(\d+)\.(?:png|jpg|jpeg|gif)$', # 1.png, 123.jpg r'(\d+)_', # 1_, 123_ ] for pattern in patterns: match = re.search(pattern, basename, re.IGNORECASE) if match: return int(match.group(1)) # 如果没有匹配到页码,返回0 return 0 # 单例模式 image_util = ImageUtil()