excel_util.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. """
  2. Excel 工具类
  3. 该文件提供 Excel 解析功能,支持:
  4. - 解析 .xlsx 和 .xls 格式的 Excel 文件
  5. - 将 Excel 数据转换为 List[Dict[str, Any]] 格式
  6. - 支持指定工作表
  7. - 支持自定义表头行
  8. """
  9. from typing import List, Dict, Any, Optional
  10. import os
  11. try:
  12. from openpyxl import load_workbook
  13. from openpyxl.worksheet.worksheet import Worksheet
  14. except ImportError:
  15. raise ImportError("openpyxl 库未安装,请使用 pip install openpyxl 安装")
  16. try:
  17. import xlrd
  18. from xlrd.sheet import Sheet
  19. except ImportError:
  20. raise ImportError("xlrd 库未安装,请使用 pip install xlrd 安装")
  21. class ExcelUtil:
  22. """
  23. Excel 工具类,用于解析 Excel 文件
  24. """
  25. @staticmethod
  26. def _parse_xlsx(file_path: str, sheet_name: Optional[str] = None, header_row: int = 0) -> List[Dict[str, Any]]:
  27. """
  28. 解析 .xlsx 格式的 Excel 文件
  29. Args:
  30. file_path: Excel 文件路径
  31. sheet_name: 工作表名称,None 表示使用第一个工作表
  32. header_row: 表头所在行索引,默认第 0 行
  33. Returns:
  34. List[Dict[str, Any]]: 解析后的 Excel 数据
  35. """
  36. # 加载工作簿
  37. workbook = load_workbook(filename=file_path, read_only=True)
  38. # 获取工作表
  39. if sheet_name:
  40. sheet: Worksheet = workbook[sheet_name]
  41. else:
  42. sheet: Worksheet = workbook.active
  43. # 获取所有行数据
  44. rows = list(sheet.iter_rows(values_only=True))
  45. # 检查数据是否为空
  46. if not rows:
  47. return []
  48. # 检查表头行索引是否合法
  49. if header_row >= len(rows):
  50. raise ValueError(f"表头行索引 {header_row} 超出数据总行数 {len(rows)}")
  51. # 获取表头
  52. headers = rows[header_row]
  53. # 检查表头是否为空
  54. if not headers:
  55. return []
  56. # 解析数据行
  57. data: List[Dict[str, Any]] = []
  58. for row in rows[header_row + 1:]:
  59. if not row or all(cell is None for cell in row):
  60. continue
  61. # 创建数据字典
  62. row_data: Dict[str, Any] = {}
  63. for i, header in enumerate(headers):
  64. if i < len(row):
  65. row_data[header] = row[i]
  66. else:
  67. row_data[header] = None
  68. data.append(row_data)
  69. workbook.close()
  70. return data
  71. @staticmethod
  72. def _parse_xls(file_path: str, sheet_name: Optional[str] = None, header_row: int = 0) -> List[Dict[str, Any]]:
  73. """
  74. 解析 .xls 格式的 Excel 文件
  75. Args:
  76. file_path: Excel 文件路径
  77. sheet_name: 工作表名称,None 表示使用第一个工作表
  78. header_row: 表头所在行索引,默认第 0 行
  79. Returns:
  80. List[Dict[str, Any]]: 解析后的 Excel 数据
  81. """
  82. # 打开工作簿
  83. workbook = xlrd.open_workbook(file_path)
  84. # 获取工作表
  85. if sheet_name:
  86. sheet: Sheet = workbook.sheet_by_name(sheet_name)
  87. else:
  88. sheet: Sheet = workbook.sheet_by_index(0)
  89. # 获取所有行数据
  90. rows = []
  91. for i in range(sheet.nrows):
  92. rows.append(sheet.row_values(i))
  93. # 检查数据是否为空
  94. if not rows:
  95. return []
  96. # 检查表头行索引是否合法
  97. if header_row >= len(rows):
  98. raise ValueError(f"表头行索引 {header_row} 超出数据总行数 {len(rows)}")
  99. # 获取表头
  100. headers = rows[header_row]
  101. # 检查表头是否为空
  102. if not headers:
  103. return []
  104. # 解析数据行
  105. data: List[Dict[str, Any]] = []
  106. for row in rows[header_row + 1:]:
  107. if not row or all(cell == '' or cell is None for cell in row):
  108. continue
  109. # 创建数据字典
  110. row_data: Dict[str, Any] = {}
  111. for i, header in enumerate(headers):
  112. if i < len(row):
  113. row_data[header] = row[i]
  114. else:
  115. row_data[header] = None
  116. data.append(row_data)
  117. return data
  118. @staticmethod
  119. def parse_excel(file_path: str, sheet_name: Optional[str] = None, header_row: int = 0) -> List[Dict[str, Any]]:
  120. """
  121. 解析 Excel 文件,支持 .xlsx 和 .xls 格式
  122. Args:
  123. file_path: Excel 文件路径
  124. sheet_name: 工作表名称,None 表示使用第一个工作表
  125. header_row: 表头所在行索引,默认第 0 行
  126. Returns:
  127. List[Dict[str, Any]]: 解析后的 Excel 数据
  128. Raises:
  129. ValueError: 不支持的文件格式
  130. FileNotFoundError: 文件不存在
  131. """
  132. # 检查文件是否存在
  133. if not os.path.exists(file_path):
  134. raise FileNotFoundError(f"文件 {file_path} 不存在")
  135. # 获取文件扩展名
  136. file_ext = os.path.splitext(file_path)[1].lower()
  137. # 根据文件格式选择解析方法
  138. if file_ext == '.xlsx':
  139. return ExcelUtil._parse_xlsx(file_path, sheet_name, header_row)
  140. elif file_ext == '.xls':
  141. return ExcelUtil._parse_xls(file_path, sheet_name, header_row)
  142. else:
  143. raise ValueError(f"不支持的文件格式 {file_ext},只支持 .xlsx 和 .xls 格式")
  144. @staticmethod
  145. def get_sheet_names(file_path: str) -> List[str]:
  146. """
  147. 获取 Excel 文件中的所有工作表名称
  148. Args:
  149. file_path: Excel 文件路径
  150. Returns:
  151. List[str]: 工作表名称列表
  152. Raises:
  153. ValueError: 不支持的文件格式
  154. FileNotFoundError: 文件不存在
  155. """
  156. # 检查文件是否存在
  157. if not os.path.exists(file_path):
  158. raise FileNotFoundError(f"文件 {file_path} 不存在")
  159. # 获取文件扩展名
  160. file_ext = os.path.splitext(file_path)[1].lower()
  161. # 根据文件格式选择方法
  162. if file_ext == '.xlsx':
  163. workbook = load_workbook(filename=file_path, read_only=True)
  164. sheet_names = workbook.sheetnames
  165. workbook.close()
  166. return sheet_names
  167. elif file_ext == '.xls':
  168. workbook = xlrd.open_workbook(file_path)
  169. sheet_names = workbook.sheet_names()
  170. return sheet_names
  171. else:
  172. raise ValueError(f"不支持的文件格式 {file_ext},只支持 .xlsx 和 .xls 格式")
  173. # 单例模式实例
  174. excel_util = ExcelUtil()