Compare commits

...

9 Commits

Author SHA1 Message Date
myhloli
ff2caf3a84 Update version.py with new version 2026-03-30 12:23:38 +00:00
Xiaomeng Zhao
be675a5a97 Merge pull request #4689 from opendatalab/dev
3.0.3
2026-03-30 20:20:43 +08:00
Xiaomeng Zhao
fa5f8d61f9 Merge pull request #4688 from myhloli/dev
feat: enhance PDF rendering with persistent executor and recycling logic
2026-03-30 20:19:51 +08:00
myhloli
a6f250b89c feat: enhance PDF rendering with persistent executor and recycling logic 2026-03-30 20:08:56 +08:00
myhloli
2bda24815a feat: implement custom PDF render executor for improved multiprocessing handling 2026-03-30 19:15:22 +08:00
myhloli
980670a649 Merge remote-tracking branch 'origin/dev' into dev 2026-03-30 18:49:58 +08:00
myhloli
84d1384c2b feat: improve task status polling with timeout handling and logging 2026-03-30 18:49:51 +08:00
Xiaomeng Zhao
de8ea317ba Merge pull request #4687 from opendatalab/master
master->dev
2026-03-30 17:31:41 +08:00
myhloli
d0d2bf920b Update version.py with new version 2026-03-30 09:29:31 +00:00
3 changed files with 103 additions and 13 deletions

View File

@@ -489,7 +489,17 @@ async def wait_for_task_result(
) -> None:
deadline = asyncio.get_running_loop().time() + timeout_seconds
while asyncio.get_running_loop().time() < deadline:
response = await client.get(submit_response.status_url)
try:
response = await client.get(submit_response.status_url)
except httpx.ReadTimeout:
logger.warning(
"Timed out while polling task status for {} (task_id={}). "
"This can happen during cold start; retrying until the task deadline.",
task_label,
submit_response.task_id,
)
await asyncio.sleep(TASK_STATUS_POLL_INTERVAL_SECONDS)
continue
if response.status_code != 200:
raise click.ClickException(
f"Failed to query task status for {task_label}: "

View File

@@ -1,6 +1,9 @@
# Copyright (c) Opendatalab. All rights reserved.
import atexit
import multiprocessing
import os
import signal
import threading
import time
from io import BytesIO
@@ -25,6 +28,7 @@ from mineru.utils.pdfium_guard import (
)
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED
from concurrent.futures.process import BrokenProcessPool
DEFAULT_PDF_IMAGE_DPI = 200
@@ -32,6 +36,9 @@ DEFAULT_PDF_IMAGE_DPI = 200
MAX_PDF_RENDER_PROCESSES = 4
MIN_PAGES_PER_RENDER_PROCESS = 30
_pdf_render_executor: ProcessPoolExecutor | None = None
_pdf_render_executor_lock = threading.Lock()
def pdf_page_to_image(
page: pdfium.PdfPage,
@@ -113,6 +120,79 @@ def _get_render_process_plan(
)
def _get_pdf_render_pool_capacity(cpu_count=None) -> int:
    """Decide how many worker processes the PDF render pool may use.

    Args:
        cpu_count: Optional override for the machine's CPU count; when
            ``None``, ``os.cpu_count()`` is consulted (falling back to 1).

    Returns:
        The smallest of: available CPUs, the configured image-loading
        thread count, and ``MAX_PDF_RENDER_PROCESSES``. Always >= 1.
    """
    if cpu_count is None:
        cpu_count = os.cpu_count() or 1
    available_cpus = max(1, cpu_count)
    configured_threads = max(1, get_load_images_threads())
    return min(available_cpus, configured_threads, MAX_PDF_RENDER_PROCESSES)
def _create_pdf_render_executor(max_workers: int) -> ProcessPoolExecutor:
    """Create a process pool for PDF page rendering.

    On Windows the default multiprocessing context is used as-is. On other
    platforms, if the current start method is ``fork``, the pool is built
    with an explicit ``spawn`` context instead (presumably to avoid
    fork-related issues with native rendering libraries — see the debug
    log message); any other start method is left untouched.

    Args:
        max_workers: Maximum number of worker processes for the pool.

    Returns:
        A newly constructed ``ProcessPoolExecutor``.
    """
    if is_windows_environment():
        return ProcessPoolExecutor(max_workers=max_workers)
    if multiprocessing.get_start_method() == "fork":
        logger.debug(
            "PDF image rendering switches multiprocessing start method from fork to spawn"
        )
        return ProcessPoolExecutor(
            max_workers=max_workers,
            mp_context=multiprocessing.get_context("spawn"),
        )
    return ProcessPoolExecutor(max_workers=max_workers)
def _get_pdf_render_executor() -> ProcessPoolExecutor:
    """Return the shared (persistent) PDF render executor, lazily created.

    Creation is serialized with ``_pdf_render_executor_lock`` so concurrent
    callers never build two pools.

    Returns:
        The module-level ``ProcessPoolExecutor`` used for page rendering.
    """
    global _pdf_render_executor
    with _pdf_render_executor_lock:
        if _pdf_render_executor is not None:
            return _pdf_render_executor
        workers = _get_pdf_render_pool_capacity()
        _pdf_render_executor = _create_pdf_render_executor(max_workers=workers)
        logger.debug(
            f"Created persistent PDF render executor with max_workers={workers}"
        )
        return _pdf_render_executor
def _recycle_pdf_render_executor(
    executor: ProcessPoolExecutor | None,
    *,
    terminate_processes: bool,
) -> None:
    """Retire *executor* so the next render request builds a fresh pool.

    If the module-level cache still points at this executor, the cache slot
    is cleared under the lock; the teardown itself happens after the lock
    is released. A ``None`` executor is a no-op.

    Args:
        executor: The pool to retire, or ``None``.
        terminate_processes: When True, forcibly terminate the pool's worker
            processes (via ``_terminate_executor_processes``) before the
            non-blocking shutdown.
    """
    global _pdf_render_executor
    if executor is None:
        return
    with _pdf_render_executor_lock:
        if executor is _pdf_render_executor:
            _pdf_render_executor = None
    if terminate_processes:
        _terminate_executor_processes(executor)
    executor.shutdown(wait=False, cancel_futures=True)
def shutdown_pdf_render_executor() -> None:
    """Tear down the shared PDF render executor, if one exists.

    Detaches the module-level pool under the lock, then shuts it down
    without waiting. Safe to call repeatedly; also registered with
    ``atexit`` so the pool is released at interpreter exit.
    """
    global _pdf_render_executor
    with _pdf_render_executor_lock:
        executor, _pdf_render_executor = _pdf_render_executor, None
    if executor is not None:
        executor.shutdown(wait=False, cancel_futures=True)


atexit.register(shutdown_pdf_render_executor)
def _load_images_from_pdf_bytes_range(
pdf_bytes: bytes,
dpi=DEFAULT_PDF_IMAGE_DPI,
@@ -141,8 +221,8 @@ def _load_images_from_pdf_bytes_range(
f"{start_page_id + 1}-{end_page_id + 1}: {page_ranges}"
)
executor = ProcessPoolExecutor(max_workers=actual_threads)
cancel_futures = False
executor = _get_pdf_render_executor()
recycle_executor = False
try:
futures = []
future_to_range = {}
@@ -160,8 +240,7 @@ def _load_images_from_pdf_bytes_range(
_, not_done = wait(futures, timeout=timeout, return_when=ALL_COMPLETED)
if not_done:
cancel_futures = True
_terminate_executor_processes(executor)
recycle_executor = True
raise TimeoutError(
f"PDF image rendering timeout after {timeout}s "
f"for pages {start_page_id + 1}-{end_page_id + 1}"
@@ -179,15 +258,16 @@ def _load_images_from_pdf_bytes_range(
images_list.extend(imgs)
return images_list
except Exception as exc:
cancel_futures = True
if not isinstance(exc, TimeoutError):
_terminate_executor_processes(executor)
except BrokenProcessPool:
recycle_executor = True
raise
finally:
# Block until worker processes are fully reaped so multiprocessing
# can unregister its semaphores before interpreter shutdown.
executor.shutdown(wait=True, cancel_futures=cancel_futures)
if recycle_executor:
logger.warning("Recycling persistent PDF render executor after render failure")
_recycle_pdf_render_executor(
executor,
terminate_processes=True,
)
def _terminate_executor_processes(executor):

View File

@@ -1 +1 @@
__version__ = "3.0.1"
__version__ = "3.0.3"