Refactor FastAPI to add asynchronous task upload and download interfaces.

This commit is contained in:
myhloli
2026-03-19 19:05:54 +08:00
parent d268ca70f8
commit 1d96a77a19
5 changed files with 943 additions and 248 deletions

View File

@@ -48,6 +48,8 @@ services:
memlock: -1
stack: 67108864
ipc: host
healthcheck:
test: ["CMD-SHELL", "curl -f http://localhost:8000/health || exit 1"]
deploy:
resources:
reservations:

View File

@@ -33,6 +33,37 @@ If you need to adjust parsing options through custom parameters, you can also ch
```
>[!TIP]
>Access `http://127.0.0.1:8000/docs` in your browser to view the API documentation.
>
>- Health endpoint: `GET /health`
>- Task submission endpoint: `POST /tasks`
>- Task query endpoints: `GET /tasks/{task_id}`, `GET /tasks/{task_id}/result`
>- Compatibility route: `POST /file_parse`, which now behaves the same as `POST /tasks` and returns a `task_id` immediately
>- API outputs are controlled by the server and written to `./output` by default
>
>Async tasks are tracked only in-process within a single `mineru-api` instance. Task status is not preserved across service restarts, `--reload` hot reloads, or multi-process deployments.
>Completed or failed tasks are retained for 24 hours by default, then their task state and output directory are cleaned automatically. After cleanup, task status and result endpoints return `404`.
>Use `MINERU_API_TASK_RETENTION_SECONDS` and `MINERU_API_TASK_CLEANUP_INTERVAL_SECONDS` to adjust the retention duration and the cleanup polling interval, respectively.
>
>Task submission example:
>```bash
>curl -X POST http://127.0.0.1:8000/tasks \
> -F "files=@demo/pdfs/demo1.pdf" \
> -F "return_md=true"
>```
>
>Compatibility route example:
>```bash
>curl -X POST http://127.0.0.1:8000/file_parse \
> -F "files=@demo/pdfs/demo1.pdf" \
> -F "return_md=true"
>```
>
>Poll task status and fetch results:
>```bash
>curl http://127.0.0.1:8000/tasks/<task_id>
>curl http://127.0.0.1:8000/tasks/<task_id>/result
>curl http://127.0.0.1:8000/health
>```
- Start Gradio WebUI visual frontend:
```bash
mineru-gradio --server-name 0.0.0.0 --server-port 7860

View File

@@ -33,6 +33,37 @@ mineru -p <input_path> -o <output_path>
```
>[!TIP]
>在浏览器中访问 `http://127.0.0.1:8000/docs` 查看API文档。
>
>- 健康检查接口:`GET /health`
>- 任务提交接口:`POST /tasks`
>- 任务查询接口:`GET /tasks/{task_id}`、`GET /tasks/{task_id}/result`
>- 兼容路由:`POST /file_parse`,行为与 `POST /tasks` 相同,都会立即返回 `task_id`
>- API 输出目录由服务端固定控制,默认写入 `./output`
>
>异步任务为单进程、进程内状态实现,服务重启、`--reload` 热重载或多进程部署后不保证仍可查询历史任务状态。
>默认任务完成或失败后保留 24 小时,随后自动清理任务状态和输出目录;清理后访问任务状态或结果会返回 `404`。
>可通过环境变量 `MINERU_API_TASK_RETENTION_SECONDS` 和 `MINERU_API_TASK_CLEANUP_INTERVAL_SECONDS` 调整保留时长与清理轮询间隔。
>
>任务提交示例:
>```bash
>curl -X POST http://127.0.0.1:8000/tasks \
> -F "files=@demo/pdfs/demo1.pdf" \
> -F "return_md=true"
>```
>
>兼容路由示例:
>```bash
>curl -X POST http://127.0.0.1:8000/file_parse \
> -F "files=@demo/pdfs/demo1.pdf" \
> -F "return_md=true"
>```
>
>轮询任务状态与结果:
>```bash
>curl http://127.0.0.1:8000/tasks/<task_id>
>curl http://127.0.0.1:8000/tasks/<task_id>/result
>curl http://127.0.0.1:8000/health
>```
- 启动gradio webui 可视化前端:
```bash
mineru-gradio --server-name 0.0.0.0 --server-port 7860

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
"""MinerU File转Markdown转换的FastMCP服务器实现。"""
import asyncio
import json
import re
import traceback
@@ -1011,8 +1012,7 @@ async def _parse_file_local(
Returns:
Dict[str, Any]: 包含解析结果的字典
"""
# API URL路径
api_url = f"{config.LOCAL_MINERU_API_BASE}/file_parse"
submit_url = f"{config.LOCAL_MINERU_API_BASE}/tasks"
# 使用Path对象确保文件路径正确
file_path_obj = Path(file_path)
@@ -1027,33 +1027,58 @@ async def _parse_file_local(
file_type = file_path_obj.suffix.lower()
form_data = aiohttp.FormData()
form_data.add_field(
"file", file_data, filename=file_path_obj.name, content_type=file_type
"files", file_data, filename=file_path_obj.name, content_type=file_type
)
form_data.add_field("parse_method", parse_method)
config.logger.debug(f"发送本地API请求到: {api_url}")
config.logger.debug(f"发送本地API请求到: {submit_url}")
config.logger.debug(f"上传文件: {file_path_obj.name} (大小: {len(file_data)} 字节)")
# 发送请求
poll_timeout_seconds = 300
poll_interval_seconds = 1
try:
async with aiohttp.ClientSession() as session:
async with session.post(api_url, data=form_data) as response:
if response.status != 200:
async with session.post(submit_url, data=form_data) as response:
if response.status != 202:
error_text = await response.text()
config.logger.error(
f"API返回错误状态码: {response.status}, 错误信息: {error_text}"
)
raise RuntimeError(f"API返回错误: {response.status}, {error_text}")
result = await response.json()
submit_result = await response.json()
task_id = submit_result.get("task_id")
if not task_id:
raise RuntimeError(f"任务提交成功但未返回 task_id: {submit_result}")
config.logger.debug(f"本地API响应: {result}")
result_url = f"{config.LOCAL_MINERU_API_BASE}/tasks/{task_id}/result"
deadline = asyncio.get_running_loop().time() + poll_timeout_seconds
while True:
async with session.get(result_url) as result_response:
if result_response.status == 200:
result = await result_response.json()
config.logger.debug(f"本地API响应: {result}")
if "error" in result:
return {"status": "error", "error": result["error"]}
return {"status": "success", "result": result}
# 处理响应
if "error" in result:
return {"status": "error", "error": result["error"]}
if result_response.status == 202:
if asyncio.get_running_loop().time() >= deadline:
raise RuntimeError(
f"任务 {task_id} 超时未完成,超过 {poll_timeout_seconds}"
)
await asyncio.sleep(poll_interval_seconds)
continue
return {"status": "success", "result": result}
error_text = await result_response.text()
config.logger.error(
"任务结果查询失败: "
f"task_id={task_id}, status={result_response.status}, error={error_text}"
)
raise RuntimeError(
f"任务结果查询失败: {result_response.status}, {error_text}"
)
except aiohttp.ClientError as e:
error_msg = f"与本地API通信时出错: {str(e)}"
config.logger.error(error_msg)