mirror of
https://github.com/opendatalab/MinerU.git
synced 2026-03-27 11:08:32 +07:00
feat: enhance concurrent request handling with configurable limits
This commit is contained in:
@@ -1,4 +1,8 @@
|
||||
from mineru.utils.config_reader import get_processing_window_size
|
||||
from mineru.utils.config_reader import (
|
||||
get_max_concurrent_requests,
|
||||
get_processing_window_size,
|
||||
)
|
||||
|
||||
API_PROTOCOL_VERSION = 1
|
||||
DEFAULT_PROCESSING_WINDOW_SIZE = get_processing_window_size()
|
||||
DEFAULT_MAX_CONCURRENT_REQUESTS = get_max_concurrent_requests(default=3)
|
||||
DEFAULT_PROCESSING_WINDOW_SIZE = get_processing_window_size(default=64)
|
||||
|
||||
@@ -24,7 +24,9 @@ from loguru import logger
|
||||
|
||||
from mineru.cli.api_protocol import (
|
||||
API_PROTOCOL_VERSION,
|
||||
DEFAULT_MAX_CONCURRENT_REQUESTS,
|
||||
DEFAULT_PROCESSING_WINDOW_SIZE,
|
||||
get_max_concurrent_requests as read_max_concurrent_requests,
|
||||
)
|
||||
from mineru.utils.guess_suffix_or_lang import guess_suffix_by_path
|
||||
from mineru.utils.pdf_page_id import get_end_page_id
|
||||
@@ -57,7 +59,6 @@ TASK_RESULT_TIMEOUT_SECONDS = 600
|
||||
LOCAL_API_STARTUP_TIMEOUT_SECONDS = 30
|
||||
LOCAL_API_CLEANUP_RETRIES = 8
|
||||
LOCAL_API_CLEANUP_RETRY_INTERVAL_SECONDS = 0.25
|
||||
LOCAL_API_MAX_CONCURRENT_REQUESTS = 3
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@@ -278,7 +279,7 @@ class LocalAPIServer:
|
||||
env = os.environ.copy()
|
||||
env["MINERU_API_OUTPUT_ROOT"] = str(self.output_root)
|
||||
env["MINERU_API_MAX_CONCURRENT_REQUESTS"] = str(
|
||||
LOCAL_API_MAX_CONCURRENT_REQUESTS
|
||||
read_max_concurrent_requests(default=DEFAULT_MAX_CONCURRENT_REQUESTS)
|
||||
)
|
||||
env["MINERU_API_DISABLE_ACCESS_LOG"] = "1"
|
||||
self.output_root.mkdir(parents=True, exist_ok=True)
|
||||
@@ -955,6 +956,19 @@ def resolve_submit_concurrency(max_concurrent_requests: int, task_count: int) ->
|
||||
return max(1, min(max_concurrent_requests, task_count))
|
||||
|
||||
|
||||
def resolve_effective_max_concurrent_requests(
    local_max: int,
    server_max: int,
) -> int:
    """Combine the local and server concurrency limits into a single value.

    A limit takes part in the decision only when it is greater than zero.

    Args:
        local_max: Concurrency limit configured on the client side.
        server_max: Concurrency limit reported by the server.

    Returns:
        The smaller of the two limits when both are positive, the single
        positive limit when only one is, and ``0`` when neither is positive.
    """
    # Both sides provide a usable limit: the stricter one wins.
    if local_max > 0 and server_max > 0:
        return min(local_max, server_max)
    # Only one side provides a usable limit: use it unchanged.
    if local_max > 0:
        return local_max
    if server_max > 0:
        return server_max
    # Neither side provides a positive limit.
    return 0
|
||||
|
||||
|
||||
async def execute_planned_tasks(
|
||||
planned_tasks: list[PlannedTask],
|
||||
concurrency: int,
|
||||
@@ -1100,11 +1114,22 @@ async def run_orchestrated_cli(
|
||||
base_url = local_server.start()
|
||||
logger.info(f"Started local mineru-api at {base_url}")
|
||||
server_health = await wait_for_local_api_ready(http_client, local_server)
|
||||
effective_max_concurrent_requests = (
|
||||
server_health.max_concurrent_requests
|
||||
)
|
||||
else:
|
||||
server_health = await fetch_server_health(
|
||||
http_client,
|
||||
normalize_base_url(api_url),
|
||||
)
|
||||
effective_max_concurrent_requests = (
|
||||
resolve_effective_max_concurrent_requests(
|
||||
read_max_concurrent_requests(
|
||||
default=DEFAULT_MAX_CONCURRENT_REQUESTS
|
||||
),
|
||||
server_health.max_concurrent_requests,
|
||||
)
|
||||
)
|
||||
live_renderer = create_live_task_status_renderer(api_url)
|
||||
|
||||
planned_tasks = plan_tasks(
|
||||
@@ -1116,7 +1141,7 @@ async def run_orchestrated_cli(
|
||||
)
|
||||
progress = build_task_execution_progress(planned_tasks)
|
||||
concurrency = resolve_submit_concurrency(
|
||||
server_health.max_concurrent_requests,
|
||||
effective_max_concurrent_requests,
|
||||
len(planned_tasks),
|
||||
)
|
||||
form_data = build_request_form_data(
|
||||
|
||||
@@ -44,7 +44,9 @@ from mineru.cli.common import (
|
||||
from mineru.cli.output_paths import resolve_parse_dir
|
||||
from mineru.cli.api_protocol import (
|
||||
API_PROTOCOL_VERSION,
|
||||
DEFAULT_MAX_CONCURRENT_REQUESTS,
|
||||
DEFAULT_PROCESSING_WINDOW_SIZE,
|
||||
get_max_concurrent_requests as read_max_concurrent_requests,
|
||||
)
|
||||
from mineru.utils.cli_parser import arg_parse
|
||||
from mineru.utils.config_reader import get_device, get_processing_window_size
|
||||
@@ -66,7 +68,6 @@ DEFAULT_TASK_RETENTION_SECONDS = 24 * 60 * 60
|
||||
DEFAULT_TASK_CLEANUP_INTERVAL_SECONDS = 5 * 60
|
||||
DEFAULT_OUTPUT_ROOT = "./output"
|
||||
ALLOWED_PARSE_METHODS = {"auto", "txt", "ocr"}
|
||||
DEFAULT_MAX_CONCURRENT_REQUESTS = 3
|
||||
FILE_PARSE_TASK_ID_HEADER = "X-MinerU-Task-Id"
|
||||
FILE_PARSE_TASK_STATUS_HEADER = "X-MinerU-Task-Status"
|
||||
FILE_PARSE_TASK_STATUS_URL_HEADER = "X-MinerU-Task-Status-Url"
|
||||
@@ -189,12 +190,9 @@ def create_app():
|
||||
)
|
||||
|
||||
global _request_semaphore, _configured_max_concurrent_requests
|
||||
try:
|
||||
max_concurrent_requests = int(
|
||||
os.getenv("MINERU_API_MAX_CONCURRENT_REQUESTS", f"{DEFAULT_MAX_CONCURRENT_REQUESTS}")
|
||||
)
|
||||
except ValueError:
|
||||
max_concurrent_requests = DEFAULT_MAX_CONCURRENT_REQUESTS
|
||||
max_concurrent_requests = read_max_concurrent_requests(
|
||||
default=DEFAULT_MAX_CONCURRENT_REQUESTS
|
||||
)
|
||||
|
||||
_configured_max_concurrent_requests = max_concurrent_requests
|
||||
app.state.max_concurrent_requests = max_concurrent_requests
|
||||
|
||||
@@ -139,6 +139,20 @@ def get_processing_window_size(default: int = 64) -> int:
|
||||
return max(1, window_size)
|
||||
|
||||
|
||||
def get_max_concurrent_requests(default: int = 3) -> int:
    """Read the concurrent-request limit from the environment.

    The value is taken from the ``MINERU_API_MAX_CONCURRENT_REQUESTS``
    environment variable.

    Args:
        default: Value returned when the variable is unset or cannot be
            parsed as an integer.

    Returns:
        ``default`` when the variable is missing or invalid; otherwise the
        parsed integer, with negative values clamped to ``0``.
    """
    value = os.getenv('MINERU_API_MAX_CONCURRENT_REQUESTS')
    if value is None:
        return default
    try:
        max_concurrent_requests = int(value)
    except ValueError:
        # A malformed setting falls back to the default instead of raising.
        logger.warning(
            f"Invalid MINERU_API_MAX_CONCURRENT_REQUESTS value: {value}, use default {default}"
        )
        return default
    # Negative values are clamped to zero rather than passed through.
    return max(0, max_concurrent_requests)
|
||||
|
||||
|
||||
def get_latex_delimiter_config():
|
||||
config = read_config()
|
||||
if config is None:
|
||||
|
||||
Reference in New Issue
Block a user