mirror of
https://github.com/opendatalab/MinerU.git
synced 2026-03-30 20:48:38 +07:00
Compare commits
9 Commits
mineru-3.0
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ff2caf3a84 | ||
|
|
be675a5a97 | ||
|
|
fa5f8d61f9 | ||
|
|
a6f250b89c | ||
|
|
2bda24815a | ||
|
|
980670a649 | ||
|
|
84d1384c2b | ||
|
|
de8ea317ba | ||
|
|
d0d2bf920b |
@@ -489,7 +489,17 @@ async def wait_for_task_result(
|
||||
) -> None:
|
||||
deadline = asyncio.get_running_loop().time() + timeout_seconds
|
||||
while asyncio.get_running_loop().time() < deadline:
|
||||
response = await client.get(submit_response.status_url)
|
||||
try:
|
||||
response = await client.get(submit_response.status_url)
|
||||
except httpx.ReadTimeout:
|
||||
logger.warning(
|
||||
"Timed out while polling task status for {} (task_id={}). "
|
||||
"This can happen during cold start; retrying until the task deadline.",
|
||||
task_label,
|
||||
submit_response.task_id,
|
||||
)
|
||||
await asyncio.sleep(TASK_STATUS_POLL_INTERVAL_SECONDS)
|
||||
continue
|
||||
if response.status_code != 200:
|
||||
raise click.ClickException(
|
||||
f"Failed to query task status for {task_label}: "
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
# Copyright (c) Opendatalab. All rights reserved.
|
||||
import atexit
|
||||
import multiprocessing
|
||||
import os
|
||||
import signal
|
||||
import threading
|
||||
import time
|
||||
from io import BytesIO
|
||||
|
||||
@@ -25,6 +28,7 @@ from mineru.utils.pdfium_guard import (
|
||||
)
|
||||
|
||||
from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED
|
||||
from concurrent.futures.process import BrokenProcessPool
|
||||
|
||||
|
||||
DEFAULT_PDF_IMAGE_DPI = 200
|
||||
@@ -32,6 +36,9 @@ DEFAULT_PDF_IMAGE_DPI = 200
|
||||
MAX_PDF_RENDER_PROCESSES = 4
|
||||
MIN_PAGES_PER_RENDER_PROCESS = 30
|
||||
|
||||
_pdf_render_executor: ProcessPoolExecutor | None = None
|
||||
_pdf_render_executor_lock = threading.Lock()
|
||||
|
||||
|
||||
def pdf_page_to_image(
|
||||
page: pdfium.PdfPage,
|
||||
@@ -113,6 +120,79 @@ def _get_render_process_plan(
|
||||
)
|
||||
|
||||
|
||||
def _get_pdf_render_pool_capacity(cpu_count=None) -> int:
|
||||
available_cpus = max(1, cpu_count if cpu_count is not None else (os.cpu_count() or 1))
|
||||
configured_threads = max(1, get_load_images_threads())
|
||||
return min(
|
||||
available_cpus,
|
||||
configured_threads,
|
||||
MAX_PDF_RENDER_PROCESSES,
|
||||
)
|
||||
|
||||
|
||||
def _create_pdf_render_executor(max_workers: int) -> ProcessPoolExecutor:
|
||||
if is_windows_environment():
|
||||
return ProcessPoolExecutor(max_workers=max_workers)
|
||||
|
||||
start_method = multiprocessing.get_start_method()
|
||||
if start_method == "fork":
|
||||
logger.debug(
|
||||
"PDF image rendering switches multiprocessing start method from fork to spawn"
|
||||
)
|
||||
return ProcessPoolExecutor(
|
||||
max_workers=max_workers,
|
||||
mp_context=multiprocessing.get_context("spawn"),
|
||||
)
|
||||
|
||||
return ProcessPoolExecutor(max_workers=max_workers)
|
||||
|
||||
|
||||
def _get_pdf_render_executor() -> ProcessPoolExecutor:
|
||||
global _pdf_render_executor
|
||||
|
||||
with _pdf_render_executor_lock:
|
||||
if _pdf_render_executor is None:
|
||||
max_workers = _get_pdf_render_pool_capacity()
|
||||
_pdf_render_executor = _create_pdf_render_executor(max_workers=max_workers)
|
||||
logger.debug(
|
||||
f"Created persistent PDF render executor with max_workers={max_workers}"
|
||||
)
|
||||
return _pdf_render_executor
|
||||
|
||||
|
||||
def _recycle_pdf_render_executor(
|
||||
executor: ProcessPoolExecutor | None,
|
||||
*,
|
||||
terminate_processes: bool,
|
||||
) -> None:
|
||||
global _pdf_render_executor
|
||||
|
||||
if executor is None:
|
||||
return
|
||||
|
||||
with _pdf_render_executor_lock:
|
||||
if _pdf_render_executor is executor:
|
||||
_pdf_render_executor = None
|
||||
|
||||
if terminate_processes:
|
||||
_terminate_executor_processes(executor)
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
|
||||
|
||||
def shutdown_pdf_render_executor() -> None:
|
||||
global _pdf_render_executor
|
||||
|
||||
with _pdf_render_executor_lock:
|
||||
executor = _pdf_render_executor
|
||||
_pdf_render_executor = None
|
||||
|
||||
if executor is not None:
|
||||
executor.shutdown(wait=False, cancel_futures=True)
|
||||
|
||||
|
||||
atexit.register(shutdown_pdf_render_executor)
|
||||
|
||||
|
||||
def _load_images_from_pdf_bytes_range(
|
||||
pdf_bytes: bytes,
|
||||
dpi=DEFAULT_PDF_IMAGE_DPI,
|
||||
@@ -141,8 +221,8 @@ def _load_images_from_pdf_bytes_range(
|
||||
f"{start_page_id + 1}-{end_page_id + 1}: {page_ranges}"
|
||||
)
|
||||
|
||||
executor = ProcessPoolExecutor(max_workers=actual_threads)
|
||||
cancel_futures = False
|
||||
executor = _get_pdf_render_executor()
|
||||
recycle_executor = False
|
||||
try:
|
||||
futures = []
|
||||
future_to_range = {}
|
||||
@@ -160,8 +240,7 @@ def _load_images_from_pdf_bytes_range(
|
||||
|
||||
_, not_done = wait(futures, timeout=timeout, return_when=ALL_COMPLETED)
|
||||
if not_done:
|
||||
cancel_futures = True
|
||||
_terminate_executor_processes(executor)
|
||||
recycle_executor = True
|
||||
raise TimeoutError(
|
||||
f"PDF image rendering timeout after {timeout}s "
|
||||
f"for pages {start_page_id + 1}-{end_page_id + 1}"
|
||||
@@ -179,15 +258,16 @@ def _load_images_from_pdf_bytes_range(
|
||||
images_list.extend(imgs)
|
||||
|
||||
return images_list
|
||||
except Exception as exc:
|
||||
cancel_futures = True
|
||||
if not isinstance(exc, TimeoutError):
|
||||
_terminate_executor_processes(executor)
|
||||
except BrokenProcessPool:
|
||||
recycle_executor = True
|
||||
raise
|
||||
finally:
|
||||
# Block until worker processes are fully reaped so multiprocessing
|
||||
# can unregister its semaphores before interpreter shutdown.
|
||||
executor.shutdown(wait=True, cancel_futures=cancel_futures)
|
||||
if recycle_executor:
|
||||
logger.warning("Recycling persistent PDF render executor after render failure")
|
||||
_recycle_pdf_render_executor(
|
||||
executor,
|
||||
terminate_processes=True,
|
||||
)
|
||||
|
||||
|
||||
def _terminate_executor_processes(executor):
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "3.0.1"
|
||||
__version__ = "3.0.3"
|
||||
|
||||
Reference in New Issue
Block a user