From 181277fed6b73805168cc5ec632b62e9836079c8 Mon Sep 17 00:00:00 2001 From: myhloli Date: Thu, 26 Mar 2026 11:42:18 +0800 Subject: [PATCH] feat: enhance concurrent request handling with configurable limits --- mineru/cli/api_protocol.py | 8 ++++++-- mineru/cli/client.py | 31 ++++++++++++++++++++++++++++--- mineru/cli/fast_api.py | 12 +++++------- mineru/utils/config_reader.py | 14 ++++++++++++++ 4 files changed, 53 insertions(+), 12 deletions(-) diff --git a/mineru/cli/api_protocol.py b/mineru/cli/api_protocol.py index fe7a8846..c59b57f6 100644 --- a/mineru/cli/api_protocol.py +++ b/mineru/cli/api_protocol.py @@ -1,4 +1,8 @@ -from mineru.utils.config_reader import get_processing_window_size +from mineru.utils.config_reader import ( + get_max_concurrent_requests, + get_processing_window_size, +) API_PROTOCOL_VERSION = 1 -DEFAULT_PROCESSING_WINDOW_SIZE = get_processing_window_size() +DEFAULT_MAX_CONCURRENT_REQUESTS = get_max_concurrent_requests(default=3) +DEFAULT_PROCESSING_WINDOW_SIZE = get_processing_window_size(default=64) diff --git a/mineru/cli/client.py b/mineru/cli/client.py index ec70edde..b7fecd4d 100644 --- a/mineru/cli/client.py +++ b/mineru/cli/client.py @@ -24,7 +24,9 @@ from loguru import logger from mineru.cli.api_protocol import ( API_PROTOCOL_VERSION, + DEFAULT_MAX_CONCURRENT_REQUESTS, DEFAULT_PROCESSING_WINDOW_SIZE, + get_max_concurrent_requests as read_max_concurrent_requests, ) from mineru.utils.guess_suffix_or_lang import guess_suffix_by_path from mineru.utils.pdf_page_id import get_end_page_id @@ -57,7 +59,6 @@ TASK_RESULT_TIMEOUT_SECONDS = 600 LOCAL_API_STARTUP_TIMEOUT_SECONDS = 30 LOCAL_API_CLEANUP_RETRIES = 8 LOCAL_API_CLEANUP_RETRY_INTERVAL_SECONDS = 0.25 -LOCAL_API_MAX_CONCURRENT_REQUESTS = 3 @dataclass(frozen=True) @@ -278,7 +279,7 @@ class LocalAPIServer: env = os.environ.copy() env["MINERU_API_OUTPUT_ROOT"] = str(self.output_root) env["MINERU_API_MAX_CONCURRENT_REQUESTS"] = str( - LOCAL_API_MAX_CONCURRENT_REQUESTS + read_max_concurrent_requests(default=DEFAULT_MAX_CONCURRENT_REQUESTS) ) env["MINERU_API_DISABLE_ACCESS_LOG"] = "1" self.output_root.mkdir(parents=True, exist_ok=True) @@ -955,6 +956,19 @@ def resolve_submit_concurrency(max_concurrent_requests: int, task_count: int) -> return max(1, min(max_concurrent_requests, task_count)) +def resolve_effective_max_concurrent_requests( + local_max: int, + server_max: int, +) -> int: + if local_max > 0 and server_max > 0: + return min(local_max, server_max) + if local_max > 0: + return local_max + if server_max > 0: + return server_max + return 0 + + async def execute_planned_tasks( planned_tasks: list[PlannedTask], concurrency: int, @@ -1100,11 +1114,22 @@ async def run_orchestrated_cli( base_url = local_server.start() logger.info(f"Started local mineru-api at {base_url}") server_health = await wait_for_local_api_ready(http_client, local_server) + effective_max_concurrent_requests = ( + server_health.max_concurrent_requests + ) else: server_health = await fetch_server_health( http_client, normalize_base_url(api_url), ) + effective_max_concurrent_requests = ( + resolve_effective_max_concurrent_requests( + read_max_concurrent_requests( + default=DEFAULT_MAX_CONCURRENT_REQUESTS + ), + server_health.max_concurrent_requests, + ) + ) live_renderer = create_live_task_status_renderer(api_url) planned_tasks = plan_tasks( @@ -1116,7 +1141,7 @@ async def run_orchestrated_cli( ) progress = build_task_execution_progress(planned_tasks) concurrency = resolve_submit_concurrency( - server_health.max_concurrent_requests, + effective_max_concurrent_requests, len(planned_tasks), ) form_data = build_request_form_data( diff --git a/mineru/cli/fast_api.py b/mineru/cli/fast_api.py index 7571f36b..4a9376da 100644 --- a/mineru/cli/fast_api.py +++ b/mineru/cli/fast_api.py @@ -44,7 +44,9 @@ from mineru.cli.common import ( from mineru.cli.output_paths import resolve_parse_dir from mineru.cli.api_protocol import ( API_PROTOCOL_VERSION, + DEFAULT_MAX_CONCURRENT_REQUESTS, DEFAULT_PROCESSING_WINDOW_SIZE, + get_max_concurrent_requests as read_max_concurrent_requests, ) from mineru.utils.cli_parser import arg_parse from mineru.utils.config_reader import get_device, get_processing_window_size @@ -66,7 +68,6 @@ DEFAULT_TASK_RETENTION_SECONDS = 24 * 60 * 60 DEFAULT_TASK_CLEANUP_INTERVAL_SECONDS = 5 * 60 DEFAULT_OUTPUT_ROOT = "./output" ALLOWED_PARSE_METHODS = {"auto", "txt", "ocr"} -DEFAULT_MAX_CONCURRENT_REQUESTS = 3 FILE_PARSE_TASK_ID_HEADER = "X-MinerU-Task-Id" FILE_PARSE_TASK_STATUS_HEADER = "X-MinerU-Task-Status" FILE_PARSE_TASK_STATUS_URL_HEADER = "X-MinerU-Task-Status-Url" @@ -189,12 +190,9 @@ def create_app(): ) global _request_semaphore, _configured_max_concurrent_requests - try: - max_concurrent_requests = int( - os.getenv("MINERU_API_MAX_CONCURRENT_REQUESTS", f"{DEFAULT_MAX_CONCURRENT_REQUESTS}") - ) - except ValueError: - max_concurrent_requests = DEFAULT_MAX_CONCURRENT_REQUESTS + max_concurrent_requests = read_max_concurrent_requests( + default=DEFAULT_MAX_CONCURRENT_REQUESTS + ) _configured_max_concurrent_requests = max_concurrent_requests app.state.max_concurrent_requests = max_concurrent_requests diff --git a/mineru/utils/config_reader.py b/mineru/utils/config_reader.py index 983537d3..63e52d04 100644 --- a/mineru/utils/config_reader.py +++ b/mineru/utils/config_reader.py @@ -139,6 +139,20 @@ def get_processing_window_size(default: int = 64) -> int: return max(1, window_size) +def get_max_concurrent_requests(default: int = 3) -> int: + value = os.getenv('MINERU_API_MAX_CONCURRENT_REQUESTS') + if value is None: + return default + try: + max_concurrent_requests = int(value) + except ValueError: + logger.warning( + f"Invalid MINERU_API_MAX_CONCURRENT_REQUESTS value: {value}, use default {default}" + ) + return default + return max(0, max_concurrent_requests) + + def get_latex_delimiter_config(): config = read_config() if config is None: