# Copyright (c) Opendatalab. All rights reserved. import os import signal import time from io import BytesIO import numpy as np import pypdfium2 as pdfium from loguru import logger from PIL import Image, ImageOps from mineru.data.data_reader_writer import FileBasedDataWriter from mineru.utils.check_sys_env import is_windows_environment from mineru.utils.os_env_config import get_load_images_timeout, get_load_images_threads from mineru.utils.pdf_reader import image_to_b64str, image_to_bytes, page_to_image from mineru.utils.enum_class import ImageType from mineru.utils.hash_utils import str_sha256 from mineru.utils.pdf_page_id import get_end_page_id from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED def pdf_page_to_image(page: pdfium.PdfPage, dpi=200, image_type=ImageType.PIL) -> dict: """Convert pdfium.PdfDocument to image, Then convert the image to base64. Args: page (_type_): pdfium.PdfPage dpi (int, optional): reset the dpi of dpi. Defaults to 200. image_type (ImageType, optional): The type of image to return. Defaults to ImageType.PIL. Returns: dict: {'img_base64': str, 'img_pil': pil_img, 'scale': float } """ pil_img, scale = page_to_image(page, dpi=dpi) image_dict = { "scale": scale, } if image_type == ImageType.BASE64: image_dict["img_base64"] = image_to_b64str(pil_img) else: image_dict["img_pil"] = pil_img return image_dict def _load_images_from_pdf_worker( pdf_bytes, dpi, start_page_id, end_page_id, image_type ): """用于进程池的包装函数""" return load_images_from_pdf_core( pdf_bytes, dpi, start_page_id, end_page_id, image_type ) def load_images_from_pdf( pdf_bytes: bytes, dpi=200, start_page_id=0, end_page_id=None, image_type=ImageType.PIL, timeout=None, threads=None, ): """带超时控制的 PDF 转图片函数,支持多进程加速 Args: pdf_bytes (bytes): PDF 文件的 bytes dpi (int, optional): reset the dpi of dpi. Defaults to 200. start_page_id (int, optional): 起始页码. Defaults to 0. end_page_id (int | None, optional): 结束页码. Defaults to None. image_type (ImageType, optional): 图片类型. Defaults to ImageType.PIL. timeout (int | None, optional): 超时时间(秒)。如果为 None,则从环境变量 MINERU_PDF_RENDER_TIMEOUT 读取,若未设置则默认为 300 秒。 threads (int): 进程数, 如果为 None,则从环境变量 MINERU_PDF_RENDER_THREADS 读取,若未设置则默认为 4. Raises: TimeoutError: 当转换超时时抛出 """ pdf_doc = pdfium.PdfDocument(pdf_bytes) if is_windows_environment(): # Windows 环境下不使用多进程 return load_images_from_pdf_core( pdf_bytes, dpi, start_page_id, get_end_page_id(end_page_id, len(pdf_doc)), image_type, ), pdf_doc else: if timeout is None: timeout = get_load_images_timeout() if threads is None: threads = get_load_images_threads() end_page_id = get_end_page_id(end_page_id, len(pdf_doc)) # 计算总页数 total_pages = end_page_id - start_page_id + 1 # 实际使用的进程数不超过总页数 actual_threads = min(os.cpu_count() or 1, threads, total_pages) # 根据实际进程数分组页面范围 pages_per_thread = max(1, total_pages // actual_threads) page_ranges = [] for i in range(actual_threads): range_start = start_page_id + i * pages_per_thread if i == actual_threads - 1: # 最后一个进程处理剩余所有页面 range_end = end_page_id else: range_end = start_page_id + (i + 1) * pages_per_thread - 1 page_ranges.append((range_start, range_end)) logger.debug(f"PDF to images using {actual_threads} processes, page ranges: {page_ranges}") executor = ProcessPoolExecutor(max_workers=actual_threads) try: # 提交所有任务 futures = [] future_to_range = {} for range_start, range_end in page_ranges: future = executor.submit( _load_images_from_pdf_worker, pdf_bytes, dpi, range_start, range_end, image_type, ) futures.append(future) future_to_range[future] = range_start # 使用 wait() 设置单一全局超时 done, not_done = wait(futures, timeout=timeout, return_when=ALL_COMPLETED) # 检查是否有未完成的任务(超时情况) if not_done: # 超时:强制终止所有子进程 _terminate_executor_processes(executor) pdf_doc.close() raise TimeoutError(f"PDF to images conversion timeout after {timeout}s") # 所有任务完成,收集结果 all_results = [] for future in futures: range_start = future_to_range[future] # 这里不需要 timeout,因为任务已完成 images_list = future.result() all_results.append((range_start, images_list)) # 按起始页码排序并合并结果 all_results.sort(key=lambda x: x[0]) images_list = [] for _, imgs in all_results: images_list.extend(imgs) return images_list, pdf_doc except Exception as e: # 发生任何异常时,确保清理子进程 _terminate_executor_processes(executor) pdf_doc.close() if isinstance(e, TimeoutError): raise raise finally: executor.shutdown(wait=False, cancel_futures=True) def _terminate_executor_processes(executor): """强制终止 ProcessPoolExecutor 中的所有子进程""" if hasattr(executor, '_processes'): for pid, process in executor._processes.items(): if process.is_alive(): try: # 先发送 SIGTERM 允许优雅退出 os.kill(pid, signal.SIGTERM) except (ProcessLookupError, OSError): pass # 给子进程一点时间响应 SIGTERM time.sleep(0.1) # 对仍然存活的进程发送 SIGKILL 强制终止 for pid, process in executor._processes.items(): if process.is_alive(): try: os.kill(pid, signal.SIGKILL) except (ProcessLookupError, OSError): pass def load_images_from_pdf_core( pdf_bytes: bytes, dpi=200, start_page_id=0, end_page_id=None, image_type=ImageType.PIL, # PIL or BASE64 ): images_list = [] pdf_doc = pdfium.PdfDocument(pdf_bytes) pdf_page_num = len(pdf_doc) end_page_id = get_end_page_id(end_page_id, pdf_page_num) for index in range(start_page_id, end_page_id + 1): # logger.debug(f"Converting page {index}/{pdf_page_num} to image") page = pdf_doc[index] image_dict = pdf_page_to_image(page, dpi=dpi, image_type=image_type) images_list.append(image_dict) pdf_doc.close() return images_list def cut_image( bbox: tuple, page_num: int, page_pil_img, return_path, image_writer: FileBasedDataWriter, scale=2, ): """从第page_num页的page中,根据bbox进行裁剪出一张jpg图片,返回图片路径 save_path:需要同时支持s3和本地, 图片存放在save_path下,文件名是: {page_num}_{bbox[0]}_{bbox[1]}_{bbox[2]}_{bbox[3]}.jpg , bbox内数字取整。""" # 拼接文件名 filename = f"{page_num}_{int(bbox[0])}_{int(bbox[1])}_{int(bbox[2])}_{int(bbox[3])}" # 老版本返回不带bucket的路径 img_path = f"{return_path}_{filename}" if return_path is not None else None # 新版本生成平铺路径 img_hash256_path = f"{str_sha256(img_path)}.jpg" # img_hash256_path = f'{img_path}.jpg' crop_img = get_crop_img(bbox, page_pil_img, scale=scale) img_bytes = image_to_bytes(crop_img, image_format="JPEG") image_writer.write(img_hash256_path, img_bytes) return img_hash256_path def get_crop_img(bbox: tuple, pil_img, scale=2): scale_bbox = ( int(bbox[0] * scale), int(bbox[1] * scale), int(bbox[2] * scale), int(bbox[3] * scale), ) return pil_img.crop(scale_bbox) def get_crop_np_img(bbox: tuple, input_img, scale=2): if isinstance(input_img, Image.Image): np_img = np.asarray(input_img) elif isinstance(input_img, np.ndarray): np_img = input_img else: raise ValueError("Input must be a pillow object or a numpy array.") scale_bbox = ( int(bbox[0] * scale), int(bbox[1] * scale), int(bbox[2] * scale), int(bbox[3] * scale), ) return np_img[scale_bbox[1] : scale_bbox[3], scale_bbox[0] : scale_bbox[2]] def images_bytes_to_pdf_bytes(image_bytes): # 内存缓冲区 pdf_buffer = BytesIO() # 载入并转换所有图像为 RGB 模式 image = Image.open(BytesIO(image_bytes)) # 根据 EXIF 信息自动转正(处理手机拍摄的带 Orientation 标记的图片) image = ImageOps.exif_transpose(image) or image # 只在必要时转换 if image.mode != "RGB": image = image.convert("RGB") # 第一张图保存为 PDF,其余追加 image.save( pdf_buffer, format="PDF", # save_all=True ) # 获取 PDF bytes 并重置指针(可选) pdf_bytes = pdf_buffer.getvalue() pdf_buffer.close() return pdf_bytes