Merge pull request #27 from myhloli/new_client

feat: increase maximum concurrent requests to 3 and enhance task logging
This commit is contained in:
Xiaomeng Zhao
2026-03-26 01:16:21 +08:00
committed by GitHub
4 changed files with 95 additions and 11 deletions

View File

@@ -299,12 +299,10 @@ def batch_image_analyze(
gpu_memory = get_vram(device)
if gpu_memory >= 16:
batch_ratio = 16
elif gpu_memory >= 8:
batch_ratio = 8
elif gpu_memory >= 6:
elif gpu_memory >= 8:
batch_ratio = 4
elif gpu_memory >= 4:
elif gpu_memory >= 6:
batch_ratio = 2
else:
batch_ratio = 1

View File

@@ -47,7 +47,7 @@ TASK_RESULT_TIMEOUT_SECONDS = 600
LOCAL_API_STARTUP_TIMEOUT_SECONDS = 30
LOCAL_API_CLEANUP_RETRIES = 8
LOCAL_API_CLEANUP_RETRY_INTERVAL_SECONDS = 0.25
LOCAL_API_MAX_CONCURRENT_REQUESTS = 1
LOCAL_API_MAX_CONCURRENT_REQUESTS = 3
@dataclass(frozen=True)
@@ -66,6 +66,15 @@ class PlannedTask:
total_pages: int
@dataclass
class TaskExecutionProgress:
    """Mutable counters shared by concurrently running batch tasks."""

    total_tasks: int       # number of planned batches overall
    total_pages: int       # total pages across all planned batches
    completed_tasks: int   # batches finished so far
    completed_pages: int   # pages finished so far
    lock: asyncio.Lock     # serializes updates to the completed_* counters
@dataclass(frozen=True)
class ServerHealth:
base_url: str
@@ -190,6 +199,71 @@ def format_task_label(task: PlannedTask) -> str:
return f"task#{task.index} [{doc_names}]"
def format_task_log_label(task: PlannedTask, max_documents: int = 3) -> str:
    """Short log label for *task*, truncating the document list.

    Shows at most *max_documents* document stems; any remainder is
    summarized as "+N more". Falls back to the full label when the
    task has few enough documents.
    """
    documents = task.documents
    if len(documents) <= max_documents:
        return format_task_label(task)
    shown = ", ".join(doc.stem for doc in documents[:max_documents])
    hidden = len(documents) - max_documents
    return f"task#{task.index} [{shown}, +{hidden} more]"
def format_count(value: int, singular: str, plural: Optional[str] = None) -> str:
    """Render *value* with a unit word, pluralizing unless value == 1.

    When *plural* is omitted, the plural form is *singular* plus a
    trailing "s".
    """
    if value == 1:
        unit = singular
    else:
        unit = plural or f"{singular}s"
    return f"{value} {unit}"
def format_task_submission_message(
    task: PlannedTask,
    progress: TaskExecutionProgress,
) -> str:
    """Build the log line announcing that a batch is being submitted."""
    batch_part = f"Submitting batch {task.index}/{progress.total_tasks}"
    docs_part = (
        f"{format_count(len(task.documents), 'document')}, "
        f"{format_count(task.total_pages, 'page')} in this batch"
    )
    totals_part = f"{format_count(progress.total_pages, 'page')} total"
    segments = [batch_part, docs_part, totals_part, format_task_log_label(task)]
    return " | ".join(segments)
def format_task_completion_message(
    task: PlannedTask,
    progress: TaskExecutionProgress,
    completed_tasks: int,
    completed_pages: int,
) -> str:
    """Build the log line announcing that a batch has finished.

    *completed_tasks* / *completed_pages* are the cumulative totals
    captured at the moment this task completed.
    """
    page_unit = "pages" if progress.total_pages != 1 else "page"
    batch_unit = "batches" if progress.total_tasks != 1 else "batch"
    segments = [
        f"Completed batch {task.index}/{progress.total_tasks}",
        f"Processed {completed_pages}/{progress.total_pages} {page_unit}",
        f"{completed_tasks} of {progress.total_tasks} {batch_unit} finished",
        format_task_log_label(task),
    ]
    return " | ".join(segments)
def build_task_execution_progress(
    planned_tasks: list[PlannedTask],
) -> TaskExecutionProgress:
    """Create a zeroed progress tracker covering every planned task."""
    page_total = 0
    for planned in planned_tasks:
        page_total += planned.total_pages
    return TaskExecutionProgress(
        total_tasks=len(planned_tasks),
        total_pages=page_total,
        completed_tasks=0,
        completed_pages=0,
        lock=asyncio.Lock(),
    )
async def mark_task_completed(
    progress: TaskExecutionProgress,
    completed_pages: int,
) -> tuple[int, int]:
    """Record one finished task and its page count under the progress lock.

    Returns the updated ``(completed_tasks, completed_pages)`` totals as a
    consistent snapshot taken while the lock was still held.
    """
    async with progress.lock:
        progress.completed_tasks = progress.completed_tasks + 1
        progress.completed_pages = progress.completed_pages + completed_pages
        snapshot = (progress.completed_tasks, progress.completed_pages)
    return snapshot
def response_detail(response: httpx.Response) -> str:
try:
payload = response.json()
@@ -651,13 +725,11 @@ async def run_planned_task(
client: httpx.AsyncClient,
server_health: ServerHealth,
planned_task: PlannedTask,
progress: TaskExecutionProgress,
form_data: dict[str, str],
output_dir: Path,
) -> None:
logger.info(
f"Submitting {format_task_label(planned_task)} "
f"with {len(planned_task.documents)} document(s), {planned_task.total_pages} page(s)"
)
logger.info(format_task_submission_message(planned_task, progress))
submit_response = await submit_task(
client=client,
base_url=server_health.base_url,
@@ -678,6 +750,18 @@ async def run_planned_task(
safe_extract_zip(zip_path, output_dir)
finally:
zip_path.unlink(missing_ok=True)
completed_tasks, completed_pages = await mark_task_completed(
progress,
planned_task.total_pages,
)
logger.info(
format_task_completion_message(
planned_task,
progress,
completed_tasks,
completed_pages,
)
)
async def run_orchestrated_cli(
@@ -727,6 +811,7 @@ async def run_orchestrated_cli(
if backend == "pipeline"
else DEFAULT_PROCESSING_WINDOW_SIZE,
)
progress = build_task_execution_progress(planned_tasks)
concurrency = resolve_submit_concurrency(
server_health.max_concurrent_requests,
len(planned_tasks),
@@ -748,6 +833,7 @@ async def run_orchestrated_cli(
client=http_client,
server_health=server_health,
planned_task=planned_task,
progress=progress,
form_data=form_data,
output_dir=output_dir,
),

View File

@@ -64,7 +64,7 @@ DEFAULT_TASK_RETENTION_SECONDS = 24 * 60 * 60
DEFAULT_TASK_CLEANUP_INTERVAL_SECONDS = 5 * 60
DEFAULT_OUTPUT_ROOT = "./output"
ALLOWED_PARSE_METHODS = {"auto", "txt", "ocr"}
DEFAULT_MAX_CONCURRENT_REQUESTS = 1
DEFAULT_MAX_CONCURRENT_REQUESTS = 3
FILE_PARSE_TASK_ID_HEADER = "X-MinerU-Task-Id"
FILE_PARSE_TASK_STATUS_HEADER = "X-MinerU-Task-Status"
FILE_PARSE_TASK_STATUS_URL_HEADER = "X-MinerU-Task-Status-Url"

View File

@@ -5,6 +5,6 @@ from loguru import logger
def get_end_page_id(end_page_id, pdf_page_num):
    """Clamp *end_page_id* into the valid range ``[0, pdf_page_num - 1]``.

    A ``None`` or negative *end_page_id* means "last page" and resolves to
    ``pdf_page_num - 1``. An *end_page_id* past the last page is clamped
    down to it (logged at debug level, since it is an expected situation).

    NOTE(review): as rendered, this block contained both a ``logger.warning``
    and a ``logger.debug`` call for the same event — a stale duplicate that
    would log twice. Only the single debug-level call is kept.
    """
    if end_page_id is None or end_page_id < 0:
        end_page_id = pdf_page_num - 1
    if end_page_id > pdf_page_num - 1:
        logger.debug("end_page_id is out of range, use images length")
        end_page_id = pdf_page_num - 1
    return end_page_id