feat: add support for configurable thread count in PDF rendering

This commit is contained in:
myhloli
2026-02-02 19:21:09 +08:00
parent aa960b105a
commit 951ebd8c04
2 changed files with 75 additions and 23 deletions

View File

@@ -11,6 +11,10 @@ def get_load_images_timeout() -> int:
return get_value_from_string(env_value, 300)
def get_load_images_threads() -> int:
env_value = os.getenv('MINERU_PDF_RENDER_THREADS', None)
return get_value_from_string(env_value, 4)
def get_value_from_string(env_value: str, default_value: int) -> int:
if env_value is not None:
try:

View File

@@ -1,5 +1,7 @@
# Copyright (c) Opendatalab. All rights reserved.
import os
import signal
import time
from io import BytesIO
import numpy as np
@@ -9,13 +11,13 @@ from PIL import Image, ImageOps
from mineru.data.data_reader_writer import FileBasedDataWriter
from mineru.utils.check_sys_env import is_windows_environment
from mineru.utils.os_env_config import get_load_images_timeout
from mineru.utils.os_env_config import get_load_images_timeout, get_load_images_threads
from mineru.utils.pdf_reader import image_to_b64str, image_to_bytes, page_to_image
from mineru.utils.enum_class import ImageType
from mineru.utils.hash_utils import str_sha256
from mineru.utils.pdf_page_id import get_end_page_id
from concurrent.futures import ProcessPoolExecutor, TimeoutError as FuturesTimeoutError
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED
def pdf_page_to_image(page: pdfium.PdfPage, dpi=200, image_type=ImageType.PIL) -> dict:
@@ -57,7 +59,7 @@ def load_images_from_pdf(
end_page_id=None,
image_type=ImageType.PIL,
timeout=None,
threads=4,
threads=None,
):
"""带超时控制的 PDF 转图片函数,支持多进程加速
@@ -67,8 +69,8 @@ def load_images_from_pdf(
start_page_id (int, optional): 起始页码. Defaults to 0.
end_page_id (int | None, optional): 结束页码. Defaults to None.
image_type (ImageType, optional): 图片类型. Defaults to ImageType.PIL.
timeout (int | None, optional): 超时时间(秒)。如果为 None则从环境变量 MINERU_PDF_LOAD_IMAGES_TIMEOUT 读取,若未设置则默认为 300 秒。
threads (int): 进程数,默认 4
timeout (int | None, optional): 超时时间(秒)。如果为 None则从环境变量 MINERU_PDF_RENDER_TIMEOUT 读取,若未设置则默认为 300 秒。
threads (int): 进程数, 如果为 None则从环境变量 MINERU_PDF_RENDER_THREADS 读取,若未设置则默认 4.
Raises:
TimeoutError: 当转换超时时抛出
@@ -86,6 +88,9 @@ def load_images_from_pdf(
else:
if timeout is None:
timeout = get_load_images_timeout()
if threads is None:
threads = get_load_images_threads()
end_page_id = get_end_page_id(end_page_id, len(pdf_doc))
# 计算总页数
@@ -108,11 +113,13 @@ def load_images_from_pdf(
page_ranges.append((range_start, range_end))
# logger.debug(f"PDF to images using {actual_threads} processes, page ranges: {page_ranges}")
logger.debug(f"PDF to images using {actual_threads} processes, page ranges: {page_ranges}")
with ProcessPoolExecutor(max_workers=actual_threads) as executor:
executor = ProcessPoolExecutor(max_workers=actual_threads)
try:
# 提交所有任务
futures = []
future_to_range = {}
for range_start, range_end in page_ranges:
future = executor.submit(
_load_images_from_pdf_worker,
@@ -122,27 +129,68 @@ def load_images_from_pdf(
range_end,
image_type,
)
futures.append((range_start, future))
futures.append(future)
future_to_range[future] = range_start
try:
# 收集结果并按页码排序
all_results = []
for range_start, future in futures:
images_list = future.result(timeout=timeout)
all_results.append((range_start, images_list))
# 使用 wait() 设置单一全局超时
done, not_done = wait(futures, timeout=timeout, return_when=ALL_COMPLETED)
# 按起始页码排序并合并结果
all_results.sort(key=lambda x: x[0])
images_list = []
for _, imgs in all_results:
images_list.extend(imgs)
return images_list, pdf_doc
except FuturesTimeoutError:
# 检查是否有未完成的任务(超时情况)
if not_done:
# 超时:强制终止所有子进程
_terminate_executor_processes(executor)
pdf_doc.close()
executor.shutdown(wait=False, cancel_futures=True)
raise TimeoutError(f"PDF to images conversion timeout after {timeout}s")
# 所有任务完成,收集结果
all_results = []
for future in futures:
range_start = future_to_range[future]
# 这里不需要 timeout因为任务已完成
images_list = future.result()
all_results.append((range_start, images_list))
# 按起始页码排序并合并结果
all_results.sort(key=lambda x: x[0])
images_list = []
for _, imgs in all_results:
images_list.extend(imgs)
return images_list, pdf_doc
except Exception as e:
# 发生任何异常时,确保清理子进程
_terminate_executor_processes(executor)
pdf_doc.close()
if isinstance(e, TimeoutError):
raise
raise
finally:
executor.shutdown(wait=False, cancel_futures=True)
def _terminate_executor_processes(executor):
"""强制终止 ProcessPoolExecutor 中的所有子进程"""
if hasattr(executor, '_processes'):
for pid, process in executor._processes.items():
if process.is_alive():
try:
# 先发送 SIGTERM 允许优雅退出
os.kill(pid, signal.SIGTERM)
except (ProcessLookupError, OSError):
pass
# 给子进程一点时间响应 SIGTERM
time.sleep(0.1)
# 对仍然存活的进程发送 SIGKILL 强制终止
for pid, process in executor._processes.items():
if process.is_alive():
try:
os.kill(pid, signal.SIGKILL)
except (ProcessLookupError, OSError):
pass
def load_images_from_pdf_core(
pdf_bytes: bytes,