diff --git a/mineru/backend/pipeline/pipeline_analyze.py b/mineru/backend/pipeline/pipeline_analyze.py index b6d402d8..0e3089eb 100644 --- a/mineru/backend/pipeline/pipeline_analyze.py +++ b/mineru/backend/pipeline/pipeline_analyze.py @@ -299,12 +299,10 @@ def batch_image_analyze( gpu_memory = get_vram(device) if gpu_memory >= 16: - batch_ratio = 16 - elif gpu_memory >= 8: batch_ratio = 8 - elif gpu_memory >= 6: + elif gpu_memory >= 8: batch_ratio = 4 - elif gpu_memory >= 4: + elif gpu_memory >= 6: batch_ratio = 2 else: batch_ratio = 1 diff --git a/mineru/cli/client.py b/mineru/cli/client.py index d220d99a..10cec1a1 100644 --- a/mineru/cli/client.py +++ b/mineru/cli/client.py @@ -47,7 +47,7 @@ TASK_RESULT_TIMEOUT_SECONDS = 600 LOCAL_API_STARTUP_TIMEOUT_SECONDS = 30 LOCAL_API_CLEANUP_RETRIES = 8 LOCAL_API_CLEANUP_RETRY_INTERVAL_SECONDS = 0.25 -LOCAL_API_MAX_CONCURRENT_REQUESTS = 1 +LOCAL_API_MAX_CONCURRENT_REQUESTS = 3 @dataclass(frozen=True) @@ -66,6 +66,15 @@ class PlannedTask: total_pages: int +@dataclass +class TaskExecutionProgress: + total_tasks: int + total_pages: int + completed_tasks: int + completed_pages: int + lock: asyncio.Lock + + @dataclass(frozen=True) class ServerHealth: base_url: str @@ -190,6 +199,71 @@ def format_task_label(task: PlannedTask) -> str: return f"task#{task.index} [{doc_names}]" +def format_task_log_label(task: PlannedTask, max_documents: int = 3) -> str: + if len(task.documents) <= max_documents: + return format_task_label(task) + + visible_names = ", ".join(doc.stem for doc in task.documents[:max_documents]) + hidden_count = len(task.documents) - max_documents + return f"task#{task.index} [{visible_names}, +{hidden_count} more]" + + +def format_count(value: int, singular: str, plural: Optional[str] = None) -> str: + unit = singular if value == 1 else (plural or f"{singular}s") + return f"{value} {unit}" + + +def format_task_submission_message( + task: PlannedTask, + progress: TaskExecutionProgress, +) -> str: + return ( + f"Submitting batch {task.index}/{progress.total_tasks} | " + f"{format_count(len(task.documents), 'document')}, " + f"{format_count(task.total_pages, 'page')} in this batch | " + f"{format_count(progress.total_pages, 'page')} total | " + f"{format_task_log_label(task)}" + ) + + +def format_task_completion_message( + task: PlannedTask, + progress: TaskExecutionProgress, + completed_tasks: int, + completed_pages: int, +) -> str: + batch_word = "batch" if progress.total_tasks == 1 else "batches" + page_word = "page" if progress.total_pages == 1 else "pages" + return ( + f"Completed batch {task.index}/{progress.total_tasks} | " + f"Processed {completed_pages}/{progress.total_pages} {page_word} | " + f"{completed_tasks} of {progress.total_tasks} {batch_word} finished | " + f"{format_task_log_label(task)}" + ) + + +def build_task_execution_progress( + planned_tasks: list[PlannedTask], +) -> TaskExecutionProgress: + return TaskExecutionProgress( + total_tasks=len(planned_tasks), + total_pages=sum(task.total_pages for task in planned_tasks), + completed_tasks=0, + completed_pages=0, + lock=asyncio.Lock(), + ) + + +async def mark_task_completed( + progress: TaskExecutionProgress, + completed_pages: int, +) -> tuple[int, int]: + async with progress.lock: + progress.completed_tasks += 1 + progress.completed_pages += completed_pages + return progress.completed_tasks, progress.completed_pages + + def response_detail(response: httpx.Response) -> str: try: payload = response.json() @@ -651,13 +725,11 @@ async def run_planned_task( client: httpx.AsyncClient, server_health: ServerHealth, planned_task: PlannedTask, + progress: TaskExecutionProgress, form_data: dict[str, str], output_dir: Path, ) -> None: - logger.info( - f"Submitting {format_task_label(planned_task)} " - f"with {len(planned_task.documents)} document(s), {planned_task.total_pages} page(s)" - ) + logger.info(format_task_submission_message(planned_task, progress)) submit_response = await submit_task( client=client, base_url=server_health.base_url, @@ -678,6 +750,18 @@ async def run_planned_task( safe_extract_zip(zip_path, output_dir) finally: zip_path.unlink(missing_ok=True) + completed_tasks, completed_pages = await mark_task_completed( + progress, + planned_task.total_pages, + ) + logger.info( + format_task_completion_message( + planned_task, + progress, + completed_tasks, + completed_pages, + ) + ) async def run_orchestrated_cli( @@ -727,6 +811,7 @@ async def run_orchestrated_cli( if backend == "pipeline" else DEFAULT_PROCESSING_WINDOW_SIZE, ) + progress = build_task_execution_progress(planned_tasks) concurrency = resolve_submit_concurrency( server_health.max_concurrent_requests, len(planned_tasks), @@ -748,6 +833,7 @@ async def run_orchestrated_cli( client=http_client, server_health=server_health, planned_task=planned_task, + progress=progress, form_data=form_data, output_dir=output_dir, ), diff --git a/mineru/cli/fast_api.py b/mineru/cli/fast_api.py index 3baf8358..268d58bc 100644 --- a/mineru/cli/fast_api.py +++ b/mineru/cli/fast_api.py @@ -64,7 +64,7 @@ DEFAULT_TASK_RETENTION_SECONDS = 24 * 60 * 60 DEFAULT_TASK_CLEANUP_INTERVAL_SECONDS = 5 * 60 DEFAULT_OUTPUT_ROOT = "./output" ALLOWED_PARSE_METHODS = {"auto", "txt", "ocr"} -DEFAULT_MAX_CONCURRENT_REQUESTS = 1 +DEFAULT_MAX_CONCURRENT_REQUESTS = 3 FILE_PARSE_TASK_ID_HEADER = "X-MinerU-Task-Id" FILE_PARSE_TASK_STATUS_HEADER = "X-MinerU-Task-Status" FILE_PARSE_TASK_STATUS_URL_HEADER = "X-MinerU-Task-Status-Url" diff --git a/mineru/utils/pdf_page_id.py b/mineru/utils/pdf_page_id.py index c1e72336..5471297f 100644 --- a/mineru/utils/pdf_page_id.py +++ b/mineru/utils/pdf_page_id.py @@ -5,6 +5,6 @@ from loguru import logger def get_end_page_id(end_page_id, pdf_page_num): end_page_id = end_page_id if end_page_id is not None and end_page_id >= 0 else pdf_page_num - 1 if end_page_id > pdf_page_num - 1: - logger.warning("end_page_id is out of range, use images length") + logger.debug("end_page_id is out of range, use images length") end_page_id = pdf_page_num - 1 return end_page_id