diff --git a/docker/compose.yaml b/docker/compose.yaml index 2ff25673..543804a0 100644 --- a/docker/compose.yaml +++ b/docker/compose.yaml @@ -48,6 +48,8 @@ services: memlock: -1 stack: 67108864 ipc: host + healthcheck: + test: ["CMD-SHELL", "curl -f http://localhost:8000/health || exit 1"] deploy: resources: reservations: diff --git a/docs/en/usage/quick_usage.md b/docs/en/usage/quick_usage.md index fe350204..dfe79879 100644 --- a/docs/en/usage/quick_usage.md +++ b/docs/en/usage/quick_usage.md @@ -33,6 +33,37 @@ If you need to adjust parsing options through custom parameters, you can also ch ``` >[!TIP] >Access `http://127.0.0.1:8000/docs` in your browser to view the API documentation. + > + >- Health endpoint: `GET /health` + >- Task submission endpoint: `POST /tasks` + >- Task query endpoints: `GET /tasks/{task_id}`, `GET /tasks/{task_id}/result` + >- Compatibility route: `POST /file_parse`, which now behaves the same as `POST /tasks` and returns a `task_id` immediately + >- API outputs are controlled by the server and written to `./output` by default + > + >Async tasks are tracked only in-process for a single `mineru-api` instance. Task status is not preserved across service restarts, `--reload`, or multi-process deployments. + >Completed or failed tasks are retained for 24 hours by default, then their task state and output directory are cleaned automatically. After cleanup, task status and result endpoints return `404`. + >Use `MINERU_API_TASK_RETENTION_SECONDS` and `MINERU_API_TASK_CLEANUP_INTERVAL_SECONDS` to adjust retention and cleanup polling intervals. + > + >Task submission example: + >```bash + >curl -X POST http://127.0.0.1:8000/tasks \ + > -F "files=@demo/pdfs/demo1.pdf" \ + > -F "return_md=true" + >``` + > + >Compatibility route example: + >```bash + >curl -X POST http://127.0.0.1:8000/file_parse \ + > -F "files=@demo/pdfs/demo1.pdf" \ + > -F "return_md=true" + >``` + > + >Poll task status and fetch results: + >```bash + >curl http://127.0.0.1:8000/tasks/ + >curl http://127.0.0.1:8000/tasks//result + >curl http://127.0.0.1:8000/health + >``` - Start Gradio WebUI visual frontend: ```bash mineru-gradio --server-name 0.0.0.0 --server-port 7860 diff --git a/docs/zh/usage/quick_usage.md b/docs/zh/usage/quick_usage.md index 835a9c4f..88243ec2 100644 --- a/docs/zh/usage/quick_usage.md +++ b/docs/zh/usage/quick_usage.md @@ -33,6 +33,37 @@ mineru -p -o ``` >[!TIP] >在浏览器中访问 `http://127.0.0.1:8000/docs` 查看API文档。 + > + >- 健康检查接口:`GET /health` + >- 任务提交接口:`POST /tasks` + >- 任务查询接口:`GET /tasks/{task_id}`、`GET /tasks/{task_id}/result` + >- 兼容路由:`POST /file_parse`,行为与 `POST /tasks` 相同,都会立即返回 `task_id` + >- API 输出目录由服务端固定控制,默认写入 `./output` + > + >异步任务为单进程、进程内状态实现,服务重启、`--reload` 热重载或多进程部署后不保证仍可查询历史任务状态。 + >默认任务完成或失败后保留 24 小时,随后自动清理任务状态和输出目录;清理后访问任务状态或结果会返回 `404`。 + >可通过环境变量 `MINERU_API_TASK_RETENTION_SECONDS` 和 `MINERU_API_TASK_CLEANUP_INTERVAL_SECONDS` 调整保留时长与清理轮询间隔。 + > + >任务提交示例: + >```bash + >curl -X POST http://127.0.0.1:8000/tasks \ + > -F "files=@demo/pdfs/demo1.pdf" \ + > -F "return_md=true" + >``` + > + >兼容路由示例: + >```bash + >curl -X POST http://127.0.0.1:8000/file_parse \ + > -F "files=@demo/pdfs/demo1.pdf" \ + > -F "return_md=true" + >``` + > + >轮询任务状态与结果: + >```bash + >curl http://127.0.0.1:8000/tasks/ + >curl http://127.0.0.1:8000/tasks//result + >curl http://127.0.0.1:8000/health + >``` - 启动gradio webui 可视化前端: ```bash mineru-gradio --server-name 0.0.0.0 --server-port 7860 diff --git a/mineru/cli/fast_api.py b/mineru/cli/fast_api.py index cae6bc39..98fe9f43 100644 --- a/mineru/cli/fast_api.py +++ b/mineru/cli/fast_api.py @@ -1,49 +1,149 @@ -import sys -import uuid +import asyncio +import glob import os import re -import tempfile -import asyncio -import uvicorn -import click -import zipfile import shutil +import sys +import tempfile +import uuid +import zipfile +from contextlib import asynccontextmanager, suppress +from dataclasses import dataclass +from datetime import datetime, timezone from pathlib import Path -import glob -from fastapi import Depends, FastAPI, HTTPException, UploadFile, File, Form, BackgroundTasks -from fastapi.middleware.gzip import GZipMiddleware -from fastapi.responses import JSONResponse, FileResponse -from typing import List, Optional -from loguru import logger +from typing import Any, Optional -log_level = os.getenv("MINERU_LOG_LEVEL", "INFO").upper() -logger.remove() # 移除默认handler -logger.add(sys.stderr, level=log_level) # 添加新handler +import click +import uvicorn +from fastapi import ( + BackgroundTasks, + Depends, + FastAPI, + File, + Form, + HTTPException, + Request, + UploadFile, +) +from fastapi.middleware.gzip import GZipMiddleware +from fastapi.responses import FileResponse, JSONResponse, Response +from loguru import logger from base64 import b64encode -from mineru.cli.common import aio_do_parse, read_fn, pdf_suffixes, image_suffixes, office_suffixes +from mineru.cli.common import ( + aio_do_parse, + do_parse, + image_suffixes, + office_suffixes, + pdf_suffixes, + read_fn, +) from mineru.utils.cli_parser import arg_parse from mineru.utils.guess_suffix_or_lang import guess_suffix_by_path from mineru.version import __version__ +log_level = os.getenv("MINERU_LOG_LEVEL", "INFO").upper() +logger.remove() +logger.add(sys.stderr, level=log_level) + +TASK_PENDING = "pending" +TASK_PROCESSING = "processing" +TASK_COMPLETED = "completed" +TASK_FAILED = "failed" +SUPPORTED_UPLOAD_SUFFIXES = pdf_suffixes + image_suffixes + office_suffixes +DEFAULT_TASK_RETENTION_SECONDS = 24 * 60 * 60 +DEFAULT_TASK_CLEANUP_INTERVAL_SECONDS = 5 * 60 +DEFAULT_OUTPUT_ROOT = "./output" +ALLOWED_PARSE_METHODS = {"auto", "txt", "ocr"} + # 并发控制器 _request_semaphore: Optional[asyncio.Semaphore] = None -# 并发控制依赖函数 -async def limit_concurrency(): - if _request_semaphore is not None: - # 检查信号量是否已用尽,如果是则拒绝请求 - if _request_semaphore._value == 0: - raise HTTPException( - status_code=503, - detail=f"Server is at maximum capacity: {os.getenv('MINERU_API_MAX_CONCURRENT_REQUESTS', 'unset')}. Please try again later.", - ) - async with _request_semaphore: - yield - else: +@dataclass +class ParseRequestOptions: + files: list[UploadFile] + lang_list: list[str] + backend: str + parse_method: str + formula_enable: bool + table_enable: bool + server_url: Optional[str] + return_md: bool + return_middle_json: bool + return_model_output: bool + return_content_list: bool + return_images: bool + response_format_zip: bool + start_page_id: int + end_page_id: int + + +@dataclass +class StoredUpload: + original_name: str + stem: str + path: str + + +@dataclass +class AsyncParseTask: + task_id: str + status: str + backend: str + file_names: list[str] + created_at: str + output_dir: str + parse_method: str + lang_list: list[str] + formula_enable: bool + table_enable: bool + server_url: Optional[str] + return_md: bool + return_middle_json: bool + return_model_output: bool + return_content_list: bool + return_images: bool + response_format_zip: bool + start_page_id: int + end_page_id: int + uploads: list[str] + started_at: Optional[str] = None + completed_at: Optional[str] = None + error: Optional[str] = None + + def to_status_payload(self, request: Request) -> dict[str, Any]: + return { + "task_id": self.task_id, + "status": self.status, + "backend": self.backend, + "file_names": self.file_names, + "created_at": self.created_at, + "started_at": self.started_at, + "completed_at": self.completed_at, + "error": self.error, + "status_url": str( + request.url_for("get_async_task_status", task_id=self.task_id) + ), + "result_url": str( + request.url_for("get_async_task_result", task_id=self.task_id) + ), + } + + +@asynccontextmanager +async def lifespan(app: FastAPI): + task_manager = AsyncTaskManager(app) + await task_manager.start() + app.state.task_manager = task_manager + try: yield + finally: + current_task_manager = getattr(app.state, "task_manager", None) + if current_task_manager is not None: + await current_task_manager.shutdown() + app.state.task_manager = None def create_app(): @@ -58,9 +158,9 @@ def create_app(): openapi_url="/openapi.json" if enable_docs else None, docs_url="/docs" if enable_docs else None, redoc_url="/redoc" if enable_docs else None, + lifespan=lifespan, ) - # 初始化并发控制器:从环境变量MINERU_API_MAX_CONCURRENT_REQUESTS读取 global _request_semaphore try: max_concurrent_requests = int( @@ -80,6 +180,60 @@ def create_app(): app = create_app() +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def get_int_env(name: str, default: int, minimum: int = 0) -> int: + try: + value = int(os.getenv(name, str(default))) + except ValueError: + return default + if value < minimum: + return default + return value + + +def get_max_concurrent_requests() -> int: + if _request_semaphore is None: + return 0 + return get_int_env("MINERU_API_MAX_CONCURRENT_REQUESTS", 0) + + +def get_task_retention_seconds() -> int: + return get_int_env( + "MINERU_API_TASK_RETENTION_SECONDS", + DEFAULT_TASK_RETENTION_SECONDS, + minimum=0, + ) + + +def get_task_cleanup_interval_seconds() -> int: + return get_int_env( + "MINERU_API_TASK_CLEANUP_INTERVAL_SECONDS", + DEFAULT_TASK_CLEANUP_INTERVAL_SECONDS, + minimum=1, + ) + + +def get_output_root() -> Path: + root = Path(os.getenv("MINERU_API_OUTPUT_ROOT", DEFAULT_OUTPUT_ROOT)).expanduser() + root.mkdir(parents=True, exist_ok=True) + return root.resolve() + + +def validate_parse_method(parse_method: str) -> str: + if parse_method not in ALLOWED_PARSE_METHODS: + raise HTTPException( + status_code=400, + detail=( + "Invalid parse_method. Allowed values: " + + ", ".join(sorted(ALLOWED_PARSE_METHODS)) + ), + ) + return parse_method + + def sanitize_filename(filename: str) -> str: """ 格式化压缩文件的文件名 @@ -122,14 +276,218 @@ def get_infer_result( return None -@app.post(path="/file_parse", dependencies=[Depends(limit_concurrency)]) -async def parse_pdf( +def normalize_lang_list(lang_list: list[str], file_count: int) -> list[str]: + if len(lang_list) == file_count: + return lang_list + base_lang = lang_list[0] if lang_list else "ch" + return [base_lang] * file_count + + +def get_parse_dir(output_dir: str, pdf_name: str, backend: str, parse_method: str) -> str: + candidates = [] + if backend.startswith("pipeline"): + candidates.append(os.path.join(output_dir, pdf_name, parse_method)) + elif backend.startswith("vlm"): + candidates.append(os.path.join(output_dir, pdf_name, "vlm")) + elif backend.startswith("hybrid"): + candidates.append(os.path.join(output_dir, pdf_name, f"hybrid_{parse_method}")) + + candidates.append(os.path.join(output_dir, pdf_name, "office")) + for candidate in candidates: + if os.path.exists(candidate): + return candidate + + if candidates: + return candidates[0] + raise ValueError(f"Unknown backend type: {backend}") + + +def build_result_dict( + output_dir: str, + pdf_file_names: list[str], + backend: str, + parse_method: str, + return_md: bool, + return_middle_json: bool, + return_model_output: bool, + return_content_list: bool, + return_images: bool, +) -> dict[str, dict[str, Any]]: + result_dict: dict[str, dict[str, Any]] = {} + for pdf_name in pdf_file_names: + result_dict[pdf_name] = {} + data = result_dict[pdf_name] + + try: + parse_dir = get_parse_dir(output_dir, pdf_name, backend, parse_method) + except ValueError: + logger.warning(f"Unknown backend type: {backend}, skipping {pdf_name}") + continue + + if not os.path.exists(parse_dir): + continue + + if return_md: + data["md_content"] = get_infer_result(".md", pdf_name, parse_dir) + if return_middle_json: + data["middle_json"] = get_infer_result("_middle.json", pdf_name, parse_dir) + if return_model_output: + data["model_output"] = get_infer_result("_model.json", pdf_name, parse_dir) + if return_content_list: + data["content_list"] = get_infer_result( + "_content_list.json", pdf_name, parse_dir + ) + if return_images: + images_dir = os.path.join(parse_dir, "images") + safe_pattern = os.path.join(glob.escape(images_dir), "*.jpg") + image_paths = glob.glob(safe_pattern) + data["images"] = { + os.path.basename( + image_path + ): f"data:image/jpeg;base64,{encode_image(image_path)}" + for image_path in image_paths + } + return result_dict + + +def create_result_zip( + output_dir: str, + pdf_file_names: list[str], + backend: str, + parse_method: str, + return_md: bool, + return_middle_json: bool, + return_model_output: bool, + return_content_list: bool, + return_images: bool, +) -> str: + zip_fd, zip_path = tempfile.mkstemp(suffix=".zip", prefix="mineru_results_") + os.close(zip_fd) + + with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: + for pdf_name in pdf_file_names: + safe_pdf_name = sanitize_filename(pdf_name) + + try: + parse_dir = get_parse_dir(output_dir, pdf_name, backend, parse_method) + except ValueError: + logger.warning(f"Unknown backend type: {backend}, skipping {pdf_name}") + continue + + if not os.path.exists(parse_dir): + continue + + if return_md: + path = os.path.join(parse_dir, f"{pdf_name}.md") + if os.path.exists(path): + zf.write( + path, + arcname=os.path.join(safe_pdf_name, f"{safe_pdf_name}.md"), + ) + + if return_middle_json: + path = os.path.join(parse_dir, f"{pdf_name}_middle.json") + if os.path.exists(path): + zf.write( + path, + arcname=os.path.join( + safe_pdf_name, f"{safe_pdf_name}_middle.json" + ), + ) + + if return_model_output: + path = os.path.join(parse_dir, f"{pdf_name}_model.json") + if os.path.exists(path): + zf.write( + path, + arcname=os.path.join( + safe_pdf_name, f"{safe_pdf_name}_model.json" + ), + ) + + if return_content_list: + path = os.path.join(parse_dir, f"{pdf_name}_content_list.json") + if os.path.exists(path): + zf.write( + path, + arcname=os.path.join( + safe_pdf_name, f"{safe_pdf_name}_content_list.json" + ), + ) + + if return_images: + images_dir = os.path.join(parse_dir, "images") + image_paths = glob.glob(os.path.join(glob.escape(images_dir), "*.jpg")) + for image_path in image_paths: + zf.write( + image_path, + arcname=os.path.join( + safe_pdf_name, "images", os.path.basename(image_path) + ), + ) + return zip_path + + +def build_result_response( background_tasks: BackgroundTasks, - files: List[UploadFile] = File( + status_code: int, + output_dir: str, + pdf_file_names: list[str], + backend: str, + parse_method: str, + return_md: bool, + return_middle_json: bool, + return_model_output: bool, + return_content_list: bool, + return_images: bool, + response_format_zip: bool, +) -> Response: + if response_format_zip: + zip_path = create_result_zip( + output_dir=output_dir, + pdf_file_names=pdf_file_names, + backend=backend, + parse_method=parse_method, + return_md=return_md, + return_middle_json=return_middle_json, + return_model_output=return_model_output, + return_content_list=return_content_list, + return_images=return_images, + ) + background_tasks.add_task(cleanup_file, zip_path) + return FileResponse( + path=zip_path, + media_type="application/zip", + filename="results.zip", + status_code=status_code, + ) + + result_dict = build_result_dict( + output_dir=output_dir, + pdf_file_names=pdf_file_names, + backend=backend, + parse_method=parse_method, + return_md=return_md, + return_middle_json=return_middle_json, + return_model_output=return_model_output, + return_content_list=return_content_list, + return_images=return_images, + ) + return JSONResponse( + status_code=status_code, + content={ + "backend": backend, + "version": __version__, + "results": result_dict, + }, + ) + + +async def parse_request_form( + files: list[UploadFile] = File( ..., description="Upload pdf or image files for parsing" ), - output_dir: str = Form("./output", description="Output local directory"), - lang_list: List[str] = Form( + lang_list: list[str] = Form( ["ch"], description="""(Adapted only for pipeline and hybrid backend)Input the languages in the pdf to improve OCR accuracy.Options: - ch: Chinese, English, Chinese Traditional. @@ -196,229 +554,480 @@ async def parse_pdf( end_page_id: int = Form( 99999, description="The ending page for PDF parsing, beginning from 0" ), -): - # 获取命令行配置参数 - config = getattr(app.state, "config", {}) +) -> ParseRequestOptions: + return ParseRequestOptions( + files=files, + lang_list=lang_list, + backend=backend, + parse_method=validate_parse_method(parse_method), + formula_enable=formula_enable, + table_enable=table_enable, + server_url=server_url, + return_md=return_md, + return_middle_json=return_middle_json, + return_model_output=return_model_output, + return_content_list=return_content_list, + return_images=return_images, + response_format_zip=response_format_zip, + start_page_id=start_page_id, + end_page_id=end_page_id, + ) + + +async def save_upload_files(upload_dir: str, files: list[UploadFile]) -> list[StoredUpload]: + os.makedirs(upload_dir, exist_ok=True) + uploads: list[StoredUpload] = [] + + for upload in files: + original_name = upload.filename or f"upload-{uuid.uuid4()}" + filename = Path(original_name).name + destination = Path(upload_dir) / filename + with open(destination, "wb") as handle: + while True: + chunk = await upload.read(1 << 20) + if not chunk: + break + handle.write(chunk) + + file_suffix = guess_suffix_by_path(destination) + if file_suffix not in SUPPORTED_UPLOAD_SUFFIXES: + cleanup_file(str(destination)) + raise HTTPException( + status_code=400, + detail=f"Unsupported file type: {file_suffix}", + ) + + uploads.append( + StoredUpload( + original_name=original_name, + stem=Path(filename).stem, + path=str(destination), + ) + ) + return uploads + + +def load_parse_inputs(uploads: list[StoredUpload]) -> tuple[list[str], list[bytes]]: + pdf_file_names = [] + pdf_bytes_list = [] + + for upload in uploads: + try: + pdf_bytes = read_fn(Path(upload.path)) + except Exception as exc: + raise RuntimeError(f"Failed to load file {upload.original_name}: {exc}") from exc + pdf_file_names.append(upload.stem) + pdf_bytes_list.append(pdf_bytes) + return pdf_file_names, pdf_bytes_list + + +async def run_parse_job( + output_dir: str, + uploads: list[StoredUpload], + request_options: ParseRequestOptions | AsyncParseTask, + config: dict[str, Any], +) -> list[str]: + pdf_file_names, pdf_bytes_list = await asyncio.to_thread(load_parse_inputs, uploads) + actual_lang_list = normalize_lang_list(request_options.lang_list, len(pdf_file_names)) + response_file_names = list(pdf_file_names) + + parse_kwargs = dict( + output_dir=output_dir, + pdf_file_names=list(pdf_file_names), + pdf_bytes_list=list(pdf_bytes_list), + p_lang_list=list(actual_lang_list), + backend=request_options.backend, + parse_method=request_options.parse_method, + formula_enable=request_options.formula_enable, + table_enable=request_options.table_enable, + server_url=request_options.server_url, + f_draw_layout_bbox=False, + f_draw_span_bbox=False, + f_dump_md=request_options.return_md, + f_dump_middle_json=request_options.return_middle_json, + f_dump_model_output=request_options.return_model_output, + f_dump_orig_pdf=False, + f_dump_content_list=request_options.return_content_list, + start_page_id=request_options.start_page_id, + end_page_id=request_options.end_page_id, + **config, + ) + + if request_options.backend == "pipeline": + await asyncio.to_thread(do_parse, **parse_kwargs) + else: + await aio_do_parse(**parse_kwargs) + return response_file_names + + +def create_task_output_dir(task_id: str) -> str: + output_root = get_output_root() + task_output_dir = output_root / task_id + task_output_dir.mkdir(parents=True, exist_ok=True) + return str(task_output_dir) + + +async def submit_async_parse_task_internal( + http_request: Request, + request_options: ParseRequestOptions, +) -> JSONResponse: + task_id = str(uuid.uuid4()) + task_output_dir = create_task_output_dir(task_id) + uploads_dir = os.path.join(task_output_dir, "uploads") + task_manager = get_task_manager() try: - # 创建唯一的输出目录 - unique_dir = os.path.join(output_dir, str(uuid.uuid4())) - os.makedirs(unique_dir, exist_ok=True) - background_tasks.add_task(cleanup_file, unique_dir) + uploads = await save_upload_files(uploads_dir, request_options.files) + file_names = [upload.stem for upload in uploads] + task = AsyncParseTask( + task_id=task_id, + status=TASK_PENDING, + backend=request_options.backend, + file_names=file_names, + created_at=utc_now_iso(), + output_dir=task_output_dir, + parse_method=request_options.parse_method, + lang_list=request_options.lang_list, + formula_enable=request_options.formula_enable, + table_enable=request_options.table_enable, + server_url=request_options.server_url, + return_md=request_options.return_md, + return_middle_json=request_options.return_middle_json, + return_model_output=request_options.return_model_output, + return_content_list=request_options.return_content_list, + return_images=request_options.return_images, + response_format_zip=request_options.response_format_zip, + start_page_id=request_options.start_page_id, + end_page_id=request_options.end_page_id, + uploads=[upload.path for upload in uploads], + ) + await task_manager.submit(task) - # 处理上传的PDF文件 - pdf_file_names = [] - pdf_bytes_list = [] + payload = task.to_status_payload(http_request) + payload["message"] = "Task submitted successfully" + return JSONResponse(status_code=202, content=payload) + except HTTPException: + cleanup_file(task_output_dir) + raise + except Exception: + cleanup_file(task_output_dir) + raise - for file in files: - content = await file.read() - file_path = Path(file.filename) - # 创建临时文件 - temp_path = Path(unique_dir) / file_path.name - with open(temp_path, "wb") as f: - f.write(content) +class AsyncTaskManager: + def __init__(self, fastapi_app: FastAPI): + self.app = fastapi_app + self.tasks: dict[str, AsyncParseTask] = {} + self.queue: asyncio.Queue[str] = asyncio.Queue() + self.dispatcher_task: Optional[asyncio.Task[Any]] = None + self.cleanup_task: Optional[asyncio.Task[Any]] = None + self.active_tasks: set[asyncio.Task[Any]] = set() + self.last_worker_error: Optional[str] = None + self.is_shutting_down = False + self.task_retention_seconds = get_task_retention_seconds() + self.task_cleanup_interval_seconds = get_task_cleanup_interval_seconds() - # 如果是图像文件或PDF,使用read_fn处理 - file_suffix = guess_suffix_by_path(temp_path) - if file_suffix in pdf_suffixes + image_suffixes + office_suffixes: - try: - pdf_bytes = read_fn(temp_path) - pdf_bytes_list.append(pdf_bytes) - pdf_file_names.append(file_path.stem) - os.remove(temp_path) # 删除临时文件 - except Exception as e: - return JSONResponse( - status_code=400, - content={"error": f"Failed to load file: {str(e)}"}, - ) - else: - return JSONResponse( - status_code=400, - content={"error": f"Unsupported file type: {file_suffix}"}, + async def start(self) -> None: + self.is_shutting_down = False + self.last_worker_error = None + if self.dispatcher_task is None or self.dispatcher_task.done(): + self.dispatcher_task = asyncio.create_task( + self._dispatcher_loop(), name="mineru-fastapi-task-dispatcher" + ) + if ( + self.task_retention_seconds > 0 + and (self.cleanup_task is None or self.cleanup_task.done()) + ): + self.cleanup_task = asyncio.create_task( + self._cleanup_loop(), name="mineru-fastapi-task-cleanup" + ) + + async def shutdown(self) -> None: + self.is_shutting_down = True + if self.dispatcher_task is not None: + self.dispatcher_task.cancel() + with suppress(asyncio.CancelledError): + await self.dispatcher_task + self.dispatcher_task = None + if self.cleanup_task is not None: + self.cleanup_task.cancel() + with suppress(asyncio.CancelledError): + await self.cleanup_task + self.cleanup_task = None + + pending = list(self.active_tasks) + for processor in pending: + processor.cancel() + if pending: + await asyncio.gather(*pending, return_exceptions=True) + self.active_tasks.clear() + + async def submit(self, task: AsyncParseTask) -> None: + self.tasks[task.task_id] = task + await self.queue.put(task.task_id) + + def get(self, task_id: str) -> Optional[AsyncParseTask]: + return self.tasks.get(task_id) + + def get_stats(self) -> dict[str, int]: + stats = { + TASK_PENDING: 0, + TASK_PROCESSING: 0, + TASK_COMPLETED: 0, + TASK_FAILED: 0, + } + for task in self.tasks.values(): + if task.status in stats: + stats[task.status] += 1 + return stats + + def is_healthy(self) -> bool: + if self.dispatcher_task is None: + return False + if self.dispatcher_task.done() and not self.is_shutting_down: + return False + if self.task_retention_seconds > 0 and self.cleanup_task is None: + return False + if ( + self.task_retention_seconds > 0 + and self.cleanup_task is not None + and self.cleanup_task.done() + and not self.is_shutting_down + ): + return False + return self.last_worker_error is None + + async def _dispatcher_loop(self) -> None: + try: + while True: + task_id = await self.queue.get() + processor = asyncio.create_task( + self._process_task(task_id), + name=f"mineru-fastapi-task-{task_id}", ) + self.active_tasks.add(processor) + processor.add_done_callback(self._on_processor_done) + self.queue.task_done() + except asyncio.CancelledError: + raise + except Exception as exc: + self.last_worker_error = str(exc) + logger.exception("Async task dispatcher crashed") + raise - # 设置语言列表,确保与文件数量一致 - actual_lang_list = lang_list - if len(actual_lang_list) != len(pdf_file_names): - # 如果语言列表长度不匹配,使用第一个语言或默认"ch" - actual_lang_list = [ - actual_lang_list[0] if actual_lang_list else "ch" - ] * len(pdf_file_names) + async def _cleanup_loop(self) -> None: + try: + while True: + await asyncio.sleep(self.task_cleanup_interval_seconds) + self.cleanup_expired_tasks() + except asyncio.CancelledError: + raise + except Exception as exc: + self.last_worker_error = str(exc) + logger.exception("Async task cleanup loop crashed") + raise - # 调用异步处理函数 - await aio_do_parse( - output_dir=unique_dir, - pdf_file_names=pdf_file_names, - pdf_bytes_list=pdf_bytes_list, - p_lang_list=actual_lang_list, - backend=backend, - parse_method=parse_method, - formula_enable=formula_enable, - table_enable=table_enable, - server_url=server_url, - f_draw_layout_bbox=False, - f_draw_span_bbox=False, - f_dump_md=return_md, - f_dump_middle_json=return_middle_json, - f_dump_model_output=return_model_output, - f_dump_orig_pdf=False, - f_dump_content_list=return_content_list, - start_page_id=start_page_id, - end_page_id=end_page_id, - **config, + def _on_processor_done(self, processor: asyncio.Task[Any]) -> None: + self.active_tasks.discard(processor) + if processor.cancelled(): + return + exception = processor.exception() + if exception is not None: + logger.error(f"Async task processor crashed: {exception}") + self.last_worker_error = str(exception) + + async def _process_task(self, task_id: str) -> None: + task = self.tasks.get(task_id) + if task is None: + return + + try: + if _request_semaphore is not None: + async with _request_semaphore: + await self._run_task(task) + else: + await self._run_task(task) + except asyncio.CancelledError: + raise + except Exception as exc: + task.status = TASK_FAILED + task.error = str(exc) + task.completed_at = utc_now_iso() + logger.exception(f"Async task failed: {task_id}") + + async def _run_task(self, task: AsyncParseTask) -> None: + task.status = TASK_PROCESSING + task.started_at = utc_now_iso() + task.error = None + + uploads = [ + StoredUpload( + original_name=Path(upload_path).name, + stem=Path(upload_path).stem, + path=upload_path, + ) + for upload_path in task.uploads + ] + config = getattr(self.app.state, "config", {}) + await run_parse_job( + output_dir=task.output_dir, + uploads=uploads, + request_options=task, + config=config, ) + task.status = TASK_COMPLETED + task.completed_at = utc_now_iso() - # 根据 response_format_zip 决定返回类型 - if response_format_zip: - zip_fd, zip_path = tempfile.mkstemp(suffix=".zip", prefix="mineru_results_") - os.close(zip_fd) - background_tasks.add_task(cleanup_file, zip_path) + def cleanup_expired_tasks(self) -> int: + if self.task_retention_seconds <= 0: + return 0 - with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: - for pdf_name in pdf_file_names: - safe_pdf_name = sanitize_filename(pdf_name) + now = datetime.now(timezone.utc) + expired_task_ids = [ + task_id + for task_id, task in self.tasks.items() + if self._is_task_expired(task, now) + ] - if backend.startswith("pipeline"): - parse_dir = os.path.join(unique_dir, pdf_name, parse_method) - elif backend.startswith("vlm"): - parse_dir = os.path.join(unique_dir, pdf_name, "vlm") - elif backend.startswith("hybrid"): - parse_dir = os.path.join( - unique_dir, pdf_name, f"hybrid_{parse_method}" - ) - else: - # 未知 backend,跳过此文件 - logger.warning(f"Unknown backend type: {backend}, skipping {pdf_name}") - continue + for task_id in expired_task_ids: + task = self.tasks.pop(task_id, None) + if task is None: + continue + cleanup_file(task.output_dir) + logger.info(f"Cleaned expired async task: {task_id}") + return len(expired_task_ids) - if not os.path.exists(parse_dir): - continue + def _is_task_expired(self, task: AsyncParseTask, now: datetime) -> bool: + if task.status not in (TASK_COMPLETED, TASK_FAILED): + return False + if not task.completed_at: + return False + try: + completed_at = datetime.fromisoformat(task.completed_at) + except ValueError: + logger.warning(f"Invalid completed_at for task {task.task_id}: {task.completed_at}") + return False + if completed_at.tzinfo is None: + completed_at = completed_at.replace(tzinfo=timezone.utc) + return (now - completed_at).total_seconds() >= self.task_retention_seconds - # 写入文本类结果 - if return_md: - path = os.path.join(parse_dir, f"{pdf_name}.md") - if os.path.exists(path): - zf.write( - path, - arcname=os.path.join( - safe_pdf_name, f"{safe_pdf_name}.md" - ), - ) - if return_middle_json: - path = os.path.join(parse_dir, f"{pdf_name}_middle.json") - if os.path.exists(path): - zf.write( - path, - arcname=os.path.join( - safe_pdf_name, f"{safe_pdf_name}_middle.json" - ), - ) +def get_task_manager() -> AsyncTaskManager: + task_manager = getattr(app.state, "task_manager", None) + if task_manager is None: + raise HTTPException(status_code=503, detail="Task manager is not initialized") + return task_manager - if return_model_output: - path = os.path.join(parse_dir, f"{pdf_name}_model.json") - if os.path.exists(path): - zf.write( - path, - arcname=os.path.join( - safe_pdf_name, f"{safe_pdf_name}_model.json" - ), - ) - if return_content_list: - path = os.path.join(parse_dir, f"{pdf_name}_content_list.json") - if os.path.exists(path): - zf.write( - path, - arcname=os.path.join( - safe_pdf_name, f"{safe_pdf_name}_content_list.json" - ), - ) +@app.post(path="/file_parse", status_code=202) +async def parse_pdf( + http_request: Request, + request_options: ParseRequestOptions = Depends(parse_request_form), +): + # Compatibility alias for clients still posting to /file_parse. + return await submit_async_parse_task_internal(http_request, request_options) - # 写入图片 - if return_images: - images_dir = os.path.join(parse_dir, "images") - image_paths = glob.glob( - os.path.join(glob.escape(images_dir), "*.jpg") - ) - for image_path in image_paths: - zf.write( - image_path, - arcname=os.path.join( - safe_pdf_name, - "images", - os.path.basename(image_path), - ), - ) - return FileResponse( - path=zip_path, - media_type="application/zip", - filename="results.zip", - ) - else: - # 构建 JSON 结果 - result_dict = {} - for pdf_name in pdf_file_names: - result_dict[pdf_name] = {} - data = result_dict[pdf_name] +@app.post(path="/tasks", status_code=202) +async def submit_parse_task( + http_request: Request, + request_options: ParseRequestOptions = Depends(parse_request_form), +): + return await submit_async_parse_task_internal(http_request, request_options) - if backend.startswith("pipeline"): - parse_dir = os.path.join(unique_dir, pdf_name, parse_method) - elif backend.startswith("vlm"): - parse_dir = os.path.join(unique_dir, pdf_name, "vlm") - elif backend.startswith("hybrid"): - parse_dir = os.path.join( - unique_dir, pdf_name, f"hybrid_{parse_method}" - ) - else: - # 未知 backend,跳过此文件 - logger.warning(f"Unknown backend type: {backend}, skipping {pdf_name}") - continue - if os.path.exists(parse_dir): - if return_md: - data["md_content"] = get_infer_result( - ".md", pdf_name, parse_dir - ) - if return_middle_json: - data["middle_json"] = get_infer_result( - "_middle.json", pdf_name, parse_dir - ) - if return_model_output: - data["model_output"] = get_infer_result( - "_model.json", pdf_name, parse_dir - ) - if return_content_list: - data["content_list"] = get_infer_result( - "_content_list.json", pdf_name, parse_dir - ) - if return_images: - images_dir = os.path.join(parse_dir, "images") - safe_pattern = os.path.join(glob.escape(images_dir), "*.jpg") - image_paths = glob.glob(safe_pattern) - data["images"] = { - os.path.basename( - image_path - ): f"data:image/jpeg;base64,{encode_image(image_path)}" - for image_path in image_paths - } +@app.get(path="/tasks/{task_id}", name="get_async_task_status") +async def get_async_task_status(task_id: str, request: Request): + task_manager = get_task_manager() + task = task_manager.get(task_id) + if task is None: + raise HTTPException(status_code=404, detail="Task not found") + return task.to_status_payload(request) - return JSONResponse( - status_code=200, - content={ - "backend": backend, - "version": __version__, - "results": result_dict, - }, - ) - except Exception as e: - logger.exception(e) + +@app.get(path="/tasks/{task_id}/result", name="get_async_task_result") +async def get_async_task_result( + task_id: str, + request: Request, + background_tasks: BackgroundTasks, +): + task_manager = get_task_manager() + task = task_manager.get(task_id) + if task is None: + raise HTTPException(status_code=404, detail="Task not found") + + if task.status in (TASK_PENDING, TASK_PROCESSING): return JSONResponse( - status_code=500, content={"error": f"Failed to process file: {str(e)}"} + status_code=202, + content={ + **task.to_status_payload(request), + "message": "Task result is not ready yet", + }, ) + if task.status == TASK_FAILED: + return JSONResponse( + status_code=409, + content={ + **task.to_status_payload(request), + "message": "Task execution failed", + }, + ) + + return build_result_response( + background_tasks=background_tasks, + status_code=200, + output_dir=task.output_dir, + pdf_file_names=task.file_names, + backend=task.backend, + parse_method=task.parse_method, + return_md=task.return_md, + return_middle_json=task.return_middle_json, + return_model_output=task.return_model_output, + return_content_list=task.return_content_list, + return_images=task.return_images, + response_format_zip=task.response_format_zip, + ) + + +@app.get(path="/health") +async def health_check(): + task_manager = getattr(app.state, "task_manager", None) + if task_manager is None or not task_manager.is_healthy(): + return JSONResponse( + status_code=503, + content={ + "status": "unhealthy", + "version": __version__, + "error": ( + "Task manager is not initialized" + if task_manager is None + else task_manager.last_worker_error + or ( + "Task cleanup loop is not running" + if ( + task_manager.task_retention_seconds > 0 + and task_manager.cleanup_task is None + ) + else "Task dispatcher is not running" + ) + ), + }, + ) + + stats = task_manager.get_stats() + return { + "status": "healthy", + "version": __version__, + "queued_tasks": stats[TASK_PENDING], + "processing_tasks": stats[TASK_PROCESSING], + "completed_tasks": stats[TASK_COMPLETED], + "failed_tasks": stats[TASK_FAILED], + "max_concurrent_requests": get_max_concurrent_requests(), + "task_retention_seconds": task_manager.task_retention_seconds, + "task_cleanup_interval_seconds": task_manager.task_cleanup_interval_seconds, + } + @click.command( context_settings=dict(ignore_unknown_options=True, allow_extra_args=True) @@ -430,17 +1039,14 @@ async def parse_pdf( def main(ctx, host, port, reload, **kwargs): kwargs.update(arg_parse(ctx)) - # 将配置参数存储到应用状态中 app.state.config = kwargs - # 将 CLI 的并发参数同步到环境变量,确保 uvicorn 重载子进程可见 try: mcr = int(kwargs.get("mineru_api_max_concurrent_requests", 0) or 0) except ValueError: mcr = 0 os.environ["MINERU_API_MAX_CONCURRENT_REQUESTS"] = str(mcr) - """启动MinerU FastAPI服务器的命令行入口""" print(f"Start MinerU FastAPI Service: http://{host}:{port}") print(f"API documentation: http://{host}:{port}/docs") diff --git a/projects/mcp/src/mineru/server.py b/projects/mcp/src/mineru/server.py index 61f18119..2e64d0a6 100644 --- a/projects/mcp/src/mineru/server.py +++ b/projects/mcp/src/mineru/server.py @@ -1,5 +1,6 @@ """MinerU File转Markdown转换的FastMCP服务器实现。""" +import asyncio import json import re import traceback @@ -1011,8 +1012,7 @@ async def _parse_file_local( Returns: Dict[str, Any]: 包含解析结果的字典 """ - # API URL路径 - api_url = f"{config.LOCAL_MINERU_API_BASE}/file_parse" + submit_url = f"{config.LOCAL_MINERU_API_BASE}/tasks" # 使用Path对象确保文件路径正确 file_path_obj = Path(file_path) @@ -1027,33 +1027,58 @@ async def _parse_file_local( file_type = file_path_obj.suffix.lower() form_data = aiohttp.FormData() form_data.add_field( - "file", file_data, filename=file_path_obj.name, content_type=file_type + "files", file_data, filename=file_path_obj.name, content_type=file_type ) form_data.add_field("parse_method", parse_method) - config.logger.debug(f"发送本地API请求到: {api_url}") + config.logger.debug(f"发送本地API请求到: {submit_url}") config.logger.debug(f"上传文件: {file_path_obj.name} (大小: {len(file_data)} 字节)") - # 发送请求 + poll_timeout_seconds = 300 + poll_interval_seconds = 1 + try: async with aiohttp.ClientSession() as session: - async with session.post(api_url, data=form_data) as response: - if response.status != 200: + async with session.post(submit_url, data=form_data) as response: + if response.status != 202: error_text = await response.text() config.logger.error( f"API返回错误状态码: {response.status}, 错误信息: {error_text}" ) raise RuntimeError(f"API返回错误: {response.status}, {error_text}") - result = await response.json() + submit_result = await response.json() + task_id = submit_result.get("task_id") + if not task_id: + raise RuntimeError(f"任务提交成功但未返回 task_id: {submit_result}") - config.logger.debug(f"本地API响应: {result}") + result_url = f"{config.LOCAL_MINERU_API_BASE}/tasks/{task_id}/result" + deadline = asyncio.get_running_loop().time() + poll_timeout_seconds + while True: + async with session.get(result_url) as result_response: + if result_response.status == 200: + result = await result_response.json() + config.logger.debug(f"本地API响应: {result}") + if "error" in result: + return {"status": "error", "error": result["error"]} + return {"status": "success", "result": result} - # 处理响应 - if "error" in result: - return {"status": "error", "error": result["error"]} + if result_response.status == 202: + if asyncio.get_running_loop().time() >= deadline: + raise RuntimeError( + f"任务 {task_id} 超时未完成,超过 {poll_timeout_seconds} 秒" + ) + await asyncio.sleep(poll_interval_seconds) + continue - return {"status": "success", "result": result} + error_text = await result_response.text() + config.logger.error( + "任务结果查询失败: " + f"task_id={task_id}, status={result_response.status}, error={error_text}" + ) + raise RuntimeError( + f"任务结果查询失败: {result_response.status}, {error_text}" + ) except aiohttp.ClientError as e: error_msg = f"与本地API通信时出错: {str(e)}" config.logger.error(error_msg)