Merge pull request #3760 from magicyuan876/master

feat(tianshu): v2.0 架构升级 - Worker主动拉取模式
This commit is contained in:
Xiaomeng Zhao
2025-10-17 18:31:38 +08:00
committed by GitHub
6 changed files with 886 additions and 280 deletions

View File

@@ -5,29 +5,44 @@
## 🌟 核心特性
-**异步处理** - 客户端立即响应(~100ms无需等待处理完成
-**任务持久化** - SQLite 存储,服务重启任务不丢失
-**GPU 负载均衡** - LitServe 自动调度,资源利用最优
### 高性能架构
-**Worker 主动拉取** - 0.5秒响应速度,无需调度器触发
-**并发安全** - 原子操作防止任务重复,支持多Worker并发
-**GPU 负载均衡** - LitServe 自动调度,避免显存冲突
-**多GPU隔离** - 每个进程只使用分配的GPU,彻底解决多卡占用
### 企业级功能
-**异步处理** - 客户端立即响应(~100ms,无需等待处理完成
-**任务持久化** - SQLite 存储,服务重启任务不丢失
-**优先级队列** - 重要任务优先处理
-**实时查询** - 随时查看任务进度和状态
-**自动清理** - 定期清理旧结果文件,保留数据库记录
### 智能解析
-**双解析器** - PDF/图片用 MinerU(GPU加速), Office/HTML等用 MarkItDown(快速)
-**内容获取** - API自动返回 Markdown 内容,支持图片上传到 MinIO
-**RESTful API** - 支持任何编程语言接入
-**智能解析器** - PDF/图片用 MinerU其他所有格式用 MarkItDown
-**内容获取** - 获取解析后的 Markdown 内容,支持图片上传到 MinIO
-**实时查询** - 随时查看任务进度和状态
## 🏗️ 系统架构
```
客户端请求 → FastAPI Server (立即返回 task_id)
SQLite 任务队列
SQLite 任务队列 (并发安全)
Task Scheduler (调度器)
LitServe Worker Pool (主动拉取 + GPU自动负载均衡)
LitServe Worker Pool (GPU自动负载均衡)
MinerU / MarkItDown 解析
MinerU 核心处理
Task Scheduler (可选监控组件)
```
**架构特点**:
-**Worker 主动模式**: Workers 持续循环拉取任务,无需调度器触发
-**并发安全**: SQLite 使用原子操作防止任务重复处理
-**自动负载均衡**: LitServe 自动分配任务到空闲 GPU
-**智能解析**: PDF/图片用 MinerU,其他格式用 MarkItDown
## 🚀 快速开始
### 1. 安装依赖
@@ -86,18 +101,24 @@ curl http://localhost:8000/api/v1/tasks/{task_id}?upload_images=true
```
mineru_tianshu/
├── task_db.py # 数据库管理
├── api_server.py # API 服务器
├── litserve_worker.py # Worker Pool (MinerU + MarkItDown)
├── task_scheduler.py # 任务调度器
├── task_db.py # 数据库管理 (并发安全,支持清理)
├── api_server.py # API 服务器 (自动返回内容)
├── litserve_worker.py # Worker Pool (主动拉取 + 双解析器)
├── task_scheduler.py # 任务调度器 (可选监控)
├── start_all.py # 启动脚本
├── client_example.py # 客户端示例
└── requirements.txt # 依赖配置
```
**核心组件说明**:
- `task_db.py`: 使用原子操作保证并发安全,支持旧任务清理
- `api_server.py`: 查询接口自动返回Markdown内容,支持MinIO图片上传
- `litserve_worker.py`: Worker主动循环拉取任务,支持MinerU和MarkItDown双解析
- `task_scheduler.py`: 可选组件,仅用于监控和健康检查(默认5分钟监控,15分钟健康检查)
## 📚 使用示例
### 示例 1: 提交任务并等待结果
### 示例 1: 提交任务并等待结果 (新版本 - 自动返回内容)
```python
import requests
@@ -119,14 +140,18 @@ while True:
result = response.json()
if result['status'] == 'completed':
# 任务完成自动返回解析内容
# v2.0 新特性: 任务完成自动返回解析内容
if result.get('data'):
content = result['data']['content']
print(f"✅ 解析完成,内容长度: {len(content)} 字符")
print(f" 解析方法: {result['data'].get('parser', 'Unknown')}")
# 保存结果
with open('output.md', 'w', encoding='utf-8') as f:
f.write(content)
else:
# 结果文件已被清理
print(f"⚠️ 任务完成但结果文件已清理: {result.get('message', '')}")
break
elif result['status'] == 'failed':
print(f"❌ 失败: {result['error_message']}")
@@ -136,25 +161,29 @@ while True:
time.sleep(2)
```
### 示例 2: 图片上传到 MinIO
### 示例 2: 图片上传到 MinIO (可选功能)
```python
import requests
task_id = "your-task-id"
# 查询状态并上传图片到 MinIO
# v2.0: 查询时自动返回内容,同时可选上传图片到 MinIO
response = requests.get(
f'http://localhost:8000/api/v1/tasks/{task_id}',
params={'upload_images': True}
params={'upload_images': True} # 启用图片上传
)
result = response.json()
if result['status'] == 'completed' and result.get('data'):
# 图片已替换为 MinIO URL
# 图片已替换为 MinIO URL (HTML img 标签格式)
content = result['data']['content']
print(f"✅ 图片已上传: {result['data']['images_uploaded']}")
images_uploaded = result['data']['images_uploaded']
print(f"✅ 图片已上传到 MinIO: {images_uploaded}")
print(f" 内容长度: {len(content)} 字符")
# 保存包含 MinIO 图片链接的 Markdown
with open('output_with_cloud_images.md', 'w', encoding='utf-8') as f:
f.write(content)
```
@@ -202,17 +231,30 @@ python client_example.py priority # 优先级队列
python start_all.py [选项]
选项:
--output-dir PATH 输出目录 (默认: /tmp/mineru_tianshu_output)
--api-port PORT API端口 (默认: 8000)
--worker-port PORT Worker端口 (默认: 9000)
--accelerator TYPE 加速器类型: auto/cuda/cpu/mps (默认: auto)
--workers-per-device N 每个GPU的worker数 (默认: 1)
--devices DEVICES 使用的GPU设备 (默认: auto使用所有GPU)
--output-dir PATH 输出目录 (默认: /tmp/mineru_tianshu_output)
--api-port PORT API端口 (默认: 8000)
--worker-port PORT Worker端口 (默认: 9000)
--accelerator TYPE 加速器类型: auto/cuda/cpu/mps (默认: auto)
--workers-per-device N 每个GPU的worker数 (默认: 1)
--devices DEVICES 使用的GPU设备 (默认: auto使用所有GPU)
--poll-interval SECONDS Worker拉取任务间隔 (默认: 0.5秒)
--enable-scheduler 启用可选的任务调度器 (默认: 不启动)
--monitor-interval SECONDS 调度器监控间隔 (默认: 300秒=5分钟)
--cleanup-old-files-days N 清理N天前的结果文件 (默认: 7天, 0=禁用)
```
**新增功能说明**:
- `--poll-interval`: Worker空闲时拉取任务的频率,默认0.5秒响应极快
- `--enable-scheduler`: 是否启动调度器(可选),仅用于监控和健康检查
- `--monitor-interval`: 调度器日志输出频率,建议5-10分钟避免刷屏
- `--cleanup-old-files-days`: 自动清理旧结果文件但保留数据库记录
### 配置示例
```bash
# 基础启动(推荐)
python start_all.py
# CPU模式无GPU或测试
python start_all.py --accelerator cpu
@@ -222,8 +264,24 @@ python start_all.py --accelerator cuda --workers-per-device 2
# 指定GPU: 只使用GPU 0和1
python start_all.py --accelerator cuda --devices 0,1
# 自定义端口
python start_all.py --api-port 8080 --worker-port 9090
# 启用监控调度器(可选)
python start_all.py --enable-scheduler --monitor-interval 300
# 调整Worker拉取频率高负载场景
python start_all.py --poll-interval 1.0
# 禁用旧文件清理(保留所有结果)
python start_all.py --cleanup-old-files-days 0
# 完整配置示例
python start_all.py \
--accelerator cuda \
--devices 0,1 \
--workers-per-device 2 \
--poll-interval 0.5 \
--enable-scheduler \
--monitor-interval 300 \
--cleanup-old-files-days 7
# Mac M系列芯片
python start_all.py --accelerator mps
@@ -272,7 +330,16 @@ GET /api/v1/tasks/{task_id}?upload_images=false
:
- status: pending | processing | completed | failed
- data: Markdown
- data: **** Markdown
- markdown_file:
- content: Markdown
- images_uploaded:
- has_images:
- message:
:
- v2.0 : ,
- (),data null
```
### 3. 队列统计
@@ -289,6 +356,22 @@ DELETE /api/v1/tasks/{task_id}
pending
```
### 5. 管理接口
**重置超时任务**
```http
POST /api/v1/admin/reset-stale?timeout_minutes=60
processing pending
```
**清理旧任务**
```http
POST /api/v1/admin/cleanup?days=7
(24)
```
## 🔧 故障排查
### 问题1: Worker 无法启动
@@ -305,19 +388,30 @@ pip list | grep -E "(mineru|litserve|torch)"
### 问题2: 任务一直 pending
**检查调度器**
> ⚠️ **重要**: Worker 现在是主动拉取模式,不需要调度器触发!
**检查 Worker 是否运行**
```bash
ps aux | grep task_scheduler.py
# Windows
tasklist | findstr python
# Linux/Mac
ps aux | grep litserve_worker
```
**手动触发**
**检查 Worker 健康状态**
```bash
curl -X POST http://localhost:9000/predict \
-H "Content-Type: application/json" \
-d '{"action":"poll"}'
-d '{"action":"health"}'
```
### 问题3: 显存不足
**查看数据库状态**
```bash
python -c "from task_db import TaskDB; db = TaskDB(); print(db.get_queue_stats())"
```
### 问题3: 显存不足或多卡占用
**减少worker数量**
```bash
@@ -330,6 +424,14 @@ export MINERU_VIRTUAL_VRAM_SIZE=6
python start_all.py
```
**指定特定GPU**
```bash
# 只使用GPU 0
python start_all.py --devices 0
```
> 💡 **提示**: 新版本已修复多卡显存占用问题,通过设置 `CUDA_VISIBLE_DEVICES` 确保每个进程只使用分配的GPU
### 问题4: 端口被占用
**查看占用**
@@ -343,16 +445,99 @@ lsof -i :8000
**使用其他端口**
```bash
python start_all.py --api-port 8080
python start_all.py --api-port 8080 --worker-port 9090
```
### 问题5: 结果文件丢失
**查询任务状态**
```bash
curl http://localhost:8000/api/v1/tasks/{task_id}
```
**说明**: 如果返回 `result files have been cleaned up`,说明结果文件已被清理(默认7天后)
**解决方案**:
```bash
# 延长保留时间为30天
python start_all.py --cleanup-old-files-days 30
# 或禁用自动清理
python start_all.py --cleanup-old-files-days 0
```
### 问题6: 任务重复处理
**症状**: 同一个任务被多个 worker 处理
**原因**: 这不应该发生,数据库使用了原子操作防止重复
**排查**:
```bash
# 检查是否有多个 TaskDB 实例连接不同的数据库文件
# 确保所有组件使用同一个 mineru_tianshu.db
```
## 🛠️ 技术栈
- **Web**: FastAPI + Uvicorn
- **解析器**: MinerU (PDF/图片) + MarkItDown (Office/文本)
- **GPU 调度**: LitServe
- **存储**: SQLite + MinIO (可选)
- **解析器**: MinerU (PDF/图片) + MarkItDown (Office/文本/HTML等)
- **GPU 调度**: LitServe (自动负载均衡)
- **存储**: SQLite (并发安全) + MinIO (可选)
- **日志**: Loguru
- **并发模型**: Worker主动拉取 + 原子操作
## 🆕 版本更新说明
### v2.0 重大改进
**1. Worker 主动拉取模式**
- ✅ Workers 持续循环拉取任务,无需调度器触发
- ✅ 默认 0.5 秒拉取间隔,响应速度极快
- ✅ 空闲时自动休眠,不占用CPU资源
**2. 数据库并发安全增强**
- ✅ 使用 `BEGIN IMMEDIATE` 和原子操作
- ✅ 防止任务重复处理
- ✅ 支持多 Worker 并发拉取
**3. 调度器变为可选**
- ✅ 不再是必需组件,Workers 可独立运行
- ✅ 仅用于系统监控和健康检查
- ✅ 默认不启动,减少系统开销
**4. 结果文件清理功能**
- ✅ 自动清理旧结果文件(默认7天)
- ✅ 保留数据库记录供查询
- ✅ 可配置清理周期或禁用
**5. API 自动返回内容**
- ✅ 查询接口自动返回 Markdown 内容
- ✅ 无需额外请求获取结果
- ✅ 支持图片上传到 MinIO
**6. 多GPU显存优化**
- ✅ 修复多卡显存占用问题
- ✅ 每个进程只使用分配的GPU
- ✅ 通过 `CUDA_VISIBLE_DEVICES` 隔离
### 迁移指南 (v1.x → v2.0)
**无需修改代码**,只需注意:
1. 调度器现在是可选的,不启动也能正常工作
2. 结果文件默认7天后清理,如需保留请设置 `--cleanup-old-files-days 0`
3. API 查询接口现在会返回 `data` 字段包含完整内容
### 性能提升
| 指标 | v1.x | v2.0 | 提升 |
|-----|------|------|-----|
| 任务响应延迟<sup>※</sup> | 5-10秒 (调度器轮询) | 0.5秒 (Worker主动拉取) | **10-20倍** |
| 并发安全性 | 基础锁机制 | 原子操作 + 状态检查 | **可靠性提升** |
| 多GPU效率 | 有时会出现显存冲突 | 完全隔离,无冲突 | **稳定性提升** |
| 系统开销 | 调度器持续运行 | 可选监控(5分钟) | **资源节省** |
※ 任务响应延迟指任务添加到被 Worker 开始处理的时间间隔。v1.x 主要受调度器轮询间隔影响,非测量端到端处理时间。实际端到端响应时间还包括任务类型和系统负载所有因子。
## 📝 核心依赖

View File

@@ -226,8 +226,14 @@ async def get_task_status(
}
logger.info(f"✅ Task status: {task['status']} - (result_path: {task['result_path']})")
# 如果任务已完成,自动返回解析内容
if task['status'] == 'completed' and task['result_path']:
# 如果任务已完成,尝试返回解析内容
if task['status'] == 'completed':
if not task['result_path']:
# 结果文件已被清理
response['data'] = None
response['message'] = 'Task completed but result files have been cleaned up (older than retention period)'
return response
result_dir = Path(task['result_path'])
logger.info(f"📂 Checking result directory: {result_dir}")

View File

@@ -3,11 +3,15 @@ MinerU Tianshu - LitServe Worker
天枢 LitServe Worker
使用 LitServe 实现 GPU 资源的自动负载均衡
从 SQLite 队列拉取任务并处理
Worker 主动循环拉取任务并处理
"""
import os
import json
import sys
import time
import threading
import signal
import atexit
from pathlib import Path
import litserve as ls
from loguru import logger
@@ -18,7 +22,7 @@ sys.path.insert(0, str(Path(__file__).parent.parent.parent))
from task_db import TaskDB
from mineru.cli.common import do_parse, read_fn
from mineru.utils.config_reader import get_device
from mineru.utils.model_utils import get_vram
from mineru.utils.model_utils import get_vram, clean_memory
# 尝试导入 markitdown
try:
@@ -33,10 +37,12 @@ class MinerUWorkerAPI(ls.LitAPI):
"""
LitServe API Worker
从 SQLite 队列拉取任务,利用 LitServe 的自动 GPU 负载均衡
Worker 主动循环拉取任务,利用 LitServe 的自动 GPU 负载均衡
支持两种解析方式:
- PDF/图片 -> MinerU 解析GPU 加速)
- 其他所有格式 -> MarkItDown 解析(快速处理)
新模式:每个 worker 启动后持续循环拉取任务,处理完一个立即拉取下一个
"""
# 支持的文件格式定义
@@ -44,19 +50,26 @@ class MinerUWorkerAPI(ls.LitAPI):
PDF_IMAGE_FORMATS = {'.pdf', '.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif', '.webp'}
# 其他所有格式都使用 MarkItDown 解析
def __init__(self, output_dir='/tmp/mineru_tianshu_output', worker_id_prefix='tianshu'):
def __init__(self, output_dir='/tmp/mineru_tianshu_output', worker_id_prefix='tianshu',
poll_interval=0.5, enable_worker_loop=True):
super().__init__()
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.worker_id_prefix = worker_id_prefix
self.poll_interval = poll_interval # Worker 拉取任务的间隔(秒)
self.enable_worker_loop = enable_worker_loop # 是否启用 worker 循环拉取
self.db = TaskDB()
self.worker_id = None
self.markitdown = None
self.running = False # Worker 运行状态
self.worker_thread = None # Worker 线程
def setup(self, device):
"""
初始化环境(每个 worker 进程调用一次)
关键修复:使用 CUDA_VISIBLE_DEVICES 确保每个进程只使用分配的 GPU
Args:
device: LitServe 分配的设备 (cuda:0, cuda:1, etc.)
"""
@@ -68,11 +81,21 @@ class MinerUWorkerAPI(ls.LitAPI):
logger.info(f"⚙️ Worker {self.worker_id} setting up on device: {device}")
# 配置 MinerU 环境
if os.getenv('MINERU_DEVICE_MODE', None) is None:
os.environ['MINERU_DEVICE_MODE'] = device if device != 'auto' else get_device()
device_mode = os.environ['MINERU_DEVICE_MODE']
# 关键修复:设置 CUDA_VISIBLE_DEVICES 限制进程只能看到分配的 GPU
# 这样可以防止一个进程占用多张卡的显存
if device != 'auto' and device != 'cpu' and ':' in str(device):
# 从 'cuda:0' 提取设备ID '0'
device_id = str(device).split(':')[-1]
os.environ['CUDA_VISIBLE_DEVICES'] = device_id
# 设置为 cuda:0因为对进程来说只能看到一张卡逻辑ID变为0
os.environ['MINERU_DEVICE_MODE'] = 'cuda:0'
device_mode = os.environ['MINERU_DEVICE_MODE']
logger.info(f"🔒 CUDA_VISIBLE_DEVICES={device_id} (Physical GPU {device_id} → Logical GPU 0)")
else:
# 配置 MinerU 环境
if os.getenv('MINERU_DEVICE_MODE', None) is None:
os.environ['MINERU_DEVICE_MODE'] = device if device != 'auto' else get_device()
device_mode = os.environ['MINERU_DEVICE_MODE']
# 配置显存
if os.getenv('MINERU_VIRTUAL_VRAM_SIZE', None) is None:
@@ -93,12 +116,163 @@ class MinerUWorkerAPI(ls.LitAPI):
logger.info(f"✅ Worker {self.worker_id} ready")
logger.info(f" Device: {device_mode}")
logger.info(f" VRAM: {os.environ['MINERU_VIRTUAL_VRAM_SIZE']}GB")
# 启动 worker 循环拉取任务(在独立线程中)
if self.enable_worker_loop:
self.running = True
self.worker_thread = threading.Thread(
target=self._worker_loop,
daemon=True,
name=f"Worker-{self.worker_id}"
)
self.worker_thread.start()
logger.info(f"🔄 Worker loop started (poll_interval={self.poll_interval}s)")
def teardown(self):
"""
优雅关闭 Worker
设置 running 标志为 False等待 worker 线程完成当前任务后退出。
这避免了守护线程可能导致的任务处理不完整或数据库操作不一致问题。
"""
if self.enable_worker_loop and self.worker_thread and self.worker_thread.is_alive():
logger.info(f"🛑 Shutting down worker {self.worker_id}...")
self.running = False
# 等待线程完成当前任务(最多等待 poll_interval * 2 秒)
timeout = self.poll_interval * 2
self.worker_thread.join(timeout=timeout)
if self.worker_thread.is_alive():
logger.warning(f"⚠️ Worker thread did not stop within {timeout}s, forcing exit")
else:
logger.info(f"✅ Worker {self.worker_id} shut down gracefully")
def _worker_loop(self):
"""
Worker 主循环:持续拉取并处理任务
这个方法在独立线程中运行,让每个 worker 主动拉取任务
而不是被动等待调度器触发
"""
logger.info(f"🔁 {self.worker_id} started task polling loop")
idle_count = 0
while self.running:
try:
# 从数据库获取任务
task = self.db.get_next_task(self.worker_id)
if task:
idle_count = 0 # 重置空闲计数
# 处理任务
task_id = task['task_id']
logger.info(f"🔄 {self.worker_id} picked up task {task_id}")
try:
self._process_task(task)
except Exception as e:
logger.error(f"{self.worker_id} failed to process task {task_id}: {e}")
success = self.db.update_task_status(
task_id, 'failed',
error_message=str(e),
worker_id=self.worker_id
)
if not success:
logger.warning(f"⚠️ Task {task_id} was modified by another process during failure update")
else:
# 没有任务时,增加空闲计数
idle_count += 1
# 只在第一次空闲时记录日志,避免刷屏
if idle_count == 1:
logger.debug(f"💤 {self.worker_id} is idle, waiting for tasks...")
# 空闲时等待一段时间再拉取
time.sleep(self.poll_interval)
except Exception as e:
logger.error(f"{self.worker_id} loop error: {e}")
time.sleep(self.poll_interval)
logger.info(f"⏹️ {self.worker_id} stopped task polling loop")
def _process_task(self, task: dict):
"""
处理单个任务
Args:
task: 任务字典
"""
task_id = task['task_id']
file_path = task['file_path']
file_name = task['file_name']
backend = task['backend']
options = json.loads(task['options'])
logger.info(f"🔄 Processing task {task_id}: {file_name}")
try:
# 准备输出目录
output_path = self.output_dir / task_id
output_path.mkdir(parents=True, exist_ok=True)
# 判断文件类型并选择解析方式
file_type = self._get_file_type(file_path)
if file_type == 'pdf_image':
# 使用 MinerU 解析 PDF 和图片
self._parse_with_mineru(
file_path=Path(file_path),
file_name=file_name,
task_id=task_id,
backend=backend,
options=options,
output_path=output_path
)
parse_method = 'MinerU'
else: # file_type == 'markitdown'
# 使用 markitdown 解析所有其他格式
self._parse_with_markitdown(
file_path=Path(file_path),
file_name=file_name,
output_path=output_path
)
parse_method = 'MarkItDown'
# 更新状态为成功
success = self.db.update_task_status(
task_id, 'completed',
result_path=str(output_path),
worker_id=self.worker_id
)
if success:
logger.info(f"✅ Task {task_id} completed by {self.worker_id}")
logger.info(f" Parser: {parse_method}")
logger.info(f" Output: {output_path}")
else:
logger.warning(
f"⚠️ Task {task_id} was modified by another process. "
f"Worker {self.worker_id} completed the work but status update was rejected."
)
finally:
# 清理临时文件
try:
if Path(file_path).exists():
Path(file_path).unlink()
except Exception as e:
logger.warning(f"Failed to clean up temp file {file_path}: {e}")
def decode_request(self, request):
"""
解码请求
接收一个 'poll' 信号来触发从数据库拉取任务
现在主要用于健康检查和手动触发(兼容旧接口)
"""
return request.get('action', 'poll')
@@ -136,20 +310,28 @@ class MinerUWorkerAPI(ls.LitAPI):
"""
logger.info(f"📄 Using MinerU to parse: {file_name}")
# 读取文件
pdf_bytes = read_fn(file_path)
# 执行解析
do_parse(
output_dir=str(output_path),
pdf_file_names=[Path(file_name).stem],
pdf_bytes_list=[pdf_bytes],
p_lang_list=[options.get('lang', 'ch')],
backend=backend,
parse_method=options.get('method', 'auto'),
formula_enable=options.get('formula_enable', True),
table_enable=options.get('table_enable', True),
)
try:
# 读取文件
pdf_bytes = read_fn(file_path)
# 执行解析MinerU 的 ModelSingleton 会自动复用模型)
do_parse(
output_dir=str(output_path),
pdf_file_names=[Path(file_name).stem],
pdf_bytes_list=[pdf_bytes],
p_lang_list=[options.get('lang', 'ch')],
backend=backend,
parse_method=options.get('method', 'auto'),
formula_enable=options.get('formula_enable', True),
table_enable=options.get('table_enable', True),
)
finally:
# 使用 MinerU 自带的内存清理函数
# 这个函数只清理推理产生的中间结果,不会卸载模型
try:
clean_memory()
except Exception as e:
logger.debug(f"Memory cleanup failed for task {task_id}: {e}")
def _parse_with_markitdown(self, file_path: Path, file_name: str,
output_path: Path):
@@ -177,103 +359,65 @@ class MinerUWorkerAPI(ls.LitAPI):
def predict(self, action):
"""
从数据库拉取任务并处理
HTTP 接口(主要用于健康检查和监控)
这里是实际的任务处理逻辑LitServe 会自动管理 GPU 负载均衡
支持根据文件类型选择不同的解析器:
- PDF/图片 -> MinerUGPU 加速)
- 其他所有格式 -> MarkItDown快速处理
现在任务由 worker 循环自动拉取处理,这个接口主要用于:
1. 健康检查
2. 获取 worker 状态
3. 兼容旧的手动触发模式(当 enable_worker_loop=False 时
"""
if action != 'poll':
if action == 'health':
# 健康检查
stats = self.db.get_queue_stats()
return {
'status': 'error',
'message': 'Invalid action. Use {"action": "poll"} to trigger task processing.'
'status': 'healthy',
'worker_id': self.worker_id,
'worker_loop_enabled': self.enable_worker_loop,
'worker_running': self.running,
'queue_stats': stats
}
# 从数据库获取任务
task = self.db.get_next_task(self.worker_id)
if not task:
# 没有任务时返回空闲状态
return {
'status': 'idle',
'message': 'No pending tasks in queue',
'worker_id': self.worker_id
}
# 提取任务信息
task_id = task['task_id']
file_path = task['file_path']
file_name = task['file_name']
backend = task['backend']
options = json.loads(task['options'])
logger.info(f"🔄 Worker {self.worker_id} processing task {task_id}: {file_name}")
try:
# 准备输出目录
output_path = self.output_dir / task_id
output_path.mkdir(parents=True, exist_ok=True)
# 判断文件类型并选择解析方式
file_type = self._get_file_type(file_path)
if file_type == 'pdf_image':
# 使用 MinerU 解析 PDF 和图片
self._parse_with_mineru(
file_path=Path(file_path),
file_name=file_name,
task_id=task_id,
backend=backend,
options=options,
output_path=output_path
)
parse_method = 'MinerU'
elif action == 'poll':
if not self.enable_worker_loop:
# 兼容模式:手动触发任务拉取
task = self.db.get_next_task(self.worker_id)
else: # file_type == 'markitdown'
# 使用 markitdown 解析所有其他格式
self._parse_with_markitdown(
file_path=Path(file_path),
file_name=file_name,
output_path=output_path
)
parse_method = 'MarkItDown'
# 更新状态为成功
self.db.update_task_status(task_id, 'completed', str(output_path))
logger.info(f"✅ Task {task_id} completed by {self.worker_id}")
logger.info(f" Parser: {parse_method}")
logger.info(f" Output: {output_path}")
return {
'status': 'completed',
'task_id': task_id,
'file_name': file_name,
'parse_method': parse_method,
'file_type': file_type,
'output_path': str(output_path),
'worker_id': self.worker_id
}
except Exception as e:
logger.error(f"❌ Task {task_id} failed: {e}")
self.db.update_task_status(task_id, 'failed', error_message=str(e))
return {
'status': 'failed',
'task_id': task_id,
'error': str(e),
'worker_id': self.worker_id
}
if not task:
return {
'status': 'idle',
'message': 'No pending tasks in queue',
'worker_id': self.worker_id
}
try:
self._process_task(task)
return {
'status': 'completed',
'task_id': task['task_id'],
'worker_id': self.worker_id
}
except Exception as e:
return {
'status': 'failed',
'task_id': task['task_id'],
'error': str(e),
'worker_id': self.worker_id
}
else:
# Worker 循环模式:返回状态信息
return {
'status': 'auto_mode',
'message': 'Worker is running in auto-loop mode, tasks are processed automatically',
'worker_id': self.worker_id,
'worker_running': self.running
}
finally:
# 清理临时文件
try:
if Path(file_path).exists():
Path(file_path).unlink()
except Exception as e:
logger.warning(f"Failed to clean up temp file {file_path}: {e}")
else:
return {
'status': 'error',
'message': f'Invalid action: {action}. Use "health" or "poll".',
'worker_id': self.worker_id
}
def encode_response(self, response):
"""编码响应"""
@@ -285,7 +429,9 @@ def start_litserve_workers(
accelerator='auto',
devices='auto',
workers_per_device=1,
port=9000
port=9000,
poll_interval=0.5,
enable_worker_loop=True
):
"""
启动 LitServe Worker Pool
@@ -296,6 +442,8 @@ def start_litserve_workers(
devices: 使用的设备 (auto/[0,1,2])
workers_per_device: 每个 GPU 的 worker 数量
port: 服务端口
poll_interval: Worker 拉取任务的间隔(秒)
enable_worker_loop: 是否启用 worker 自动循环拉取任务
"""
logger.info("=" * 60)
logger.info("🚀 Starting MinerU Tianshu LitServe Worker Pool")
@@ -305,10 +453,17 @@ def start_litserve_workers(
logger.info(f"💾 Devices: {devices}")
logger.info(f"👷 Workers per Device: {workers_per_device}")
logger.info(f"🔌 Port: {port}")
logger.info(f"🔄 Worker Loop: {'Enabled' if enable_worker_loop else 'Disabled'}")
if enable_worker_loop:
logger.info(f"⏱️ Poll Interval: {poll_interval}s")
logger.info("=" * 60)
# 创建 LitServe 服务器
api = MinerUWorkerAPI(output_dir=output_dir)
api = MinerUWorkerAPI(
output_dir=output_dir,
poll_interval=poll_interval,
enable_worker_loop=enable_worker_loop
)
server = ls.LitServer(
api,
accelerator=accelerator,
@@ -317,9 +472,30 @@ def start_litserve_workers(
timeout=False, # 不设置超时
)
# 注册优雅关闭处理器
def graceful_shutdown(signum=None, frame=None):
"""处理关闭信号,优雅地停止 worker"""
logger.info("🛑 Received shutdown signal, gracefully stopping workers...")
# 注意LitServe 会为每个设备创建多个 worker 实例
# 这里的 api 只是模板,实际的 worker 实例由 LitServe 管理
# teardown 会在每个 worker 进程中被调用
if hasattr(api, 'teardown'):
api.teardown()
sys.exit(0)
# 注册信号处理器Ctrl+C 等)
signal.signal(signal.SIGINT, graceful_shutdown)
signal.signal(signal.SIGTERM, graceful_shutdown)
# 注册 atexit 处理器(正常退出时调用)
atexit.register(lambda: api.teardown() if hasattr(api, 'teardown') else None)
logger.info(f"✅ LitServe worker pool initialized")
logger.info(f"📡 Listening on: http://0.0.0.0:{port}/predict")
logger.info(f"🔄 Workers will poll SQLite queue for tasks")
if enable_worker_loop:
logger.info(f"🔁 Workers will continuously poll and process tasks")
else:
logger.info(f"🔄 Workers will wait for scheduler triggers")
logger.info("=" * 60)
# 启动服务器
@@ -341,6 +517,10 @@ if __name__ == '__main__':
help='Number of workers per device')
parser.add_argument('--port', type=int, default=9000,
help='Server port')
parser.add_argument('--poll-interval', type=float, default=0.5,
help='Worker poll interval in seconds (default: 0.5)')
parser.add_argument('--disable-worker-loop', action='store_true',
help='Disable worker auto-loop mode (use scheduler-driven mode)')
args = parser.parse_args()
@@ -358,6 +538,9 @@ if __name__ == '__main__':
accelerator=args.accelerator,
devices=devices,
workers_per_device=args.workers_per_device,
port=args.port
port=args.port,
poll_interval=args.poll_interval,
enable_worker_loop=not args.disable_worker_loop
)

View File

@@ -22,7 +22,7 @@ aiohttp>=3.11.0
loguru>=0.7.0
# Office Document Parsing
markitdown>=0.1.3
markitdown[all]>=0.1.3
# MinIO Object Storage
minio>=7.2.0

View File

@@ -20,7 +20,15 @@ class TaskDB:
self._init_db()
def _get_conn(self):
"""获取数据库连接(每次创建新连接,避免 pickle 问题)"""
"""获取数据库连接(每次创建新连接,避免 pickle 问题)
并发安全说明:
- 使用 check_same_thread=False 是安全的,因为:
1. 每次调用都创建新连接,不跨线程共享
2. 连接使用完立即关闭(在 get_cursor 上下文管理器中)
3. 不使用连接池,避免线程间共享同一连接
- timeout=30.0 防止死锁如果锁等待超过30秒会抛出异常
"""
conn = sqlite3.connect(
self.db_path,
check_same_thread=False,
@@ -95,45 +103,116 @@ class TaskDB:
''', (task_id, file_name, file_path, backend, json.dumps(options or {}), priority))
return task_id
def get_next_task(self, worker_id: str) -> Optional[Dict]:
def get_next_task(self, worker_id: str, max_retries: int = 3) -> Optional[Dict]:
"""
获取下一个待处理任务(原子操作,防止并发冲突)
Args:
worker_id: Worker ID
max_retries: 当任务被其他 worker 抢走时的最大重试次数默认3次
Returns:
task: 任务字典,如果没有任务返回 None
并发安全说明:
1. 使用 BEGIN IMMEDIATE 立即获取写锁
2. UPDATE 时检查 status = 'pending' 防止重复拉取
3. 检查 rowcount 确保更新成功
4. 如果任务被抢走,立即重试而不是返回 None避免不必要的等待
"""
with self.get_cursor() as cursor:
# 使用事务确保原子性
cursor.execute('BEGIN IMMEDIATE')
# 按优先级和创建时间获取任务
cursor.execute('''
SELECT * FROM tasks
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
''')
task = cursor.fetchone()
if task:
# 立即标记为 processing
cursor.execute('''
UPDATE tasks
SET status = 'processing',
started_at = CURRENT_TIMESTAMP,
worker_id = ?
WHERE task_id = ?
''', (worker_id, task['task_id']))
for attempt in range(max_retries):
with self.get_cursor() as cursor:
# 使用事务确保原子性
cursor.execute('BEGIN IMMEDIATE')
return dict(task)
# 按优先级和创建时间获取任务
cursor.execute('''
SELECT * FROM tasks
WHERE status = 'pending'
ORDER BY priority DESC, created_at ASC
LIMIT 1
''')
task = cursor.fetchone()
if task:
# 立即标记为 processing并确保状态仍是 pending
cursor.execute('''
UPDATE tasks
SET status = 'processing',
started_at = CURRENT_TIMESTAMP,
worker_id = ?
WHERE task_id = ? AND status = 'pending'
''', (worker_id, task['task_id']))
# 检查是否更新成功(防止被其他 worker 抢走)
if cursor.rowcount == 0:
# 任务被其他进程抢走了,立即重试
# 因为队列中可能还有其他待处理任务
continue
return dict(task)
else:
# 队列中没有待处理任务,返回 None
return None
return None
# 重试次数用尽,仍未获取到任务(高并发场景)
return None
def _build_update_clauses(self, status: str, result_path: str = None,
error_message: str = None, worker_id: str = None,
task_id: str = None):
"""
构建 UPDATE 和 WHERE 子句的辅助方法
Args:
status: 新状态
result_path: 结果路径(可选)
error_message: 错误信息(可选)
worker_id: Worker ID可选
task_id: 任务ID可选
Returns:
tuple: (update_clauses, update_params, where_clauses, where_params)
"""
update_clauses = ['status = ?']
update_params = [status]
where_clauses = []
where_params = []
# 添加 task_id 条件(如果提供)
if task_id:
where_clauses.append('task_id = ?')
where_params.append(task_id)
# 处理 completed 状态
if status == 'completed':
update_clauses.append('completed_at = CURRENT_TIMESTAMP')
if result_path:
update_clauses.append('result_path = ?')
update_params.append(result_path)
# 只更新正在处理的任务
where_clauses.append("status = 'processing'")
if worker_id:
where_clauses.append('worker_id = ?')
where_params.append(worker_id)
# 处理 failed 状态
elif status == 'failed':
update_clauses.append('completed_at = CURRENT_TIMESTAMP')
if error_message:
update_clauses.append('error_message = ?')
update_params.append(error_message)
# 只更新正在处理的任务
where_clauses.append("status = 'processing'")
if worker_id:
where_clauses.append('worker_id = ?')
where_params.append(worker_id)
return update_clauses, update_params, where_clauses, where_params
def update_task_status(self, task_id: str, status: str,
result_path: str = None, error_message: str = None):
result_path: str = None, error_message: str = None,
worker_id: str = None):
"""
更新任务状态
@@ -142,27 +221,44 @@ class TaskDB:
status: 新状态 (pending/processing/completed/failed/cancelled)
result_path: 结果路径(可选)
error_message: 错误信息(可选)
worker_id: Worker ID可选用于并发检查
Returns:
bool: 更新是否成功
并发安全说明:
1. 更新为 completed/failed 时会检查状态是 processing
2. 如果提供 worker_id会检查任务是否属于该 worker
3. 返回 False 表示任务被其他进程修改了
"""
with self.get_cursor() as cursor:
updates = ['status = ?']
params = [status]
# 使用辅助方法构建 UPDATE 和 WHERE 子句
update_clauses, update_params, where_clauses, where_params = \
self._build_update_clauses(status, result_path, error_message, worker_id, task_id)
if status == 'completed':
updates.append('completed_at = CURRENT_TIMESTAMP')
if result_path:
updates.append('result_path = ?')
params.append(result_path)
# 合并参数:先 UPDATE 部分,再 WHERE 部分
all_params = update_params + where_params
if status == 'failed' and error_message:
updates.append('error_message = ?')
params.append(error_message)
updates.append('completed_at = CURRENT_TIMESTAMP')
sql = f'''
UPDATE tasks
SET {', '.join(update_clauses)}
WHERE {' AND '.join(where_clauses)}
'''
params.append(task_id)
cursor.execute(f'''
UPDATE tasks SET {', '.join(updates)}
WHERE task_id = ?
''', params)
cursor.execute(sql, all_params)
# 检查更新是否成功
success = cursor.rowcount > 0
# 调试日志(仅在失败时)
if not success and status in ['completed', 'failed']:
from loguru import logger
logger.debug(
f"Status update failed: task_id={task_id}, status={status}, "
f"worker_id={worker_id}, SQL: {sql}, params: {all_params}"
)
return success
def get_task(self, task_id: str) -> Optional[Dict]:
"""
@@ -215,12 +311,72 @@ class TaskDB:
''', (status, limit))
return [dict(row) for row in cursor.fetchall()]
def cleanup_old_tasks(self, days: int = 7):
def cleanup_old_task_files(self, days: int = 7):
"""
清理旧任务记录
清理旧任务的结果文件(保留数据库记录
Args:
days: 保留最近N天的任务
days: 清理多少天前的任务文件
Returns:
int: 删除的文件目录数
注意:
- 只删除结果文件,保留数据库记录
- 数据库中的 result_path 字段会被清空
- 用户仍可查询任务状态和历史记录
"""
from pathlib import Path
import shutil
with self.get_cursor() as cursor:
# 查询要清理文件的任务
cursor.execute('''
SELECT task_id, result_path FROM tasks
WHERE completed_at < datetime('now', '-' || ? || ' days')
AND status IN ('completed', 'failed')
AND result_path IS NOT NULL
''', (days,))
old_tasks = cursor.fetchall()
file_count = 0
# 删除结果文件
for task in old_tasks:
if task['result_path']:
result_path = Path(task['result_path'])
if result_path.exists() and result_path.is_dir():
try:
shutil.rmtree(result_path)
file_count += 1
# 清空数据库中的 result_path表示文件已被清理
cursor.execute('''
UPDATE tasks
SET result_path = NULL
WHERE task_id = ?
''', (task['task_id'],))
except Exception as e:
from loguru import logger
logger.warning(f"Failed to delete result files for task {task['task_id']}: {e}")
return file_count
def cleanup_old_task_records(self, days: int = 30):
"""
清理极旧的任务记录(可选功能)
Args:
days: 删除多少天前的任务记录
Returns:
int: 删除的记录数
注意:
- 这个方法会永久删除数据库记录
- 建议设置较长的保留期如30-90天
- 一般情况下不需要调用此方法
"""
with self.get_cursor() as cursor:
cursor.execute('''
@@ -228,6 +384,7 @@ class TaskDB:
WHERE completed_at < datetime('now', '-' || ? || ' days')
AND status IN ('completed', 'failed')
''', (days,))
deleted_count = cursor.rowcount
return deleted_count

View File

@@ -1,8 +1,18 @@
"""
MinerU Tianshu - Task Scheduler
天枢任务调度器
MinerU Tianshu - Task Scheduler (Optional)
天枢任务调度器(可选)
定期检查任务队列,触发 LitServe Workers 拉取和处理任务
在 Worker 自动循环模式下,调度器主要用于:
1. 监控队列状态默认5分钟一次
2. 健康检查默认15分钟一次
3. 统计信息收集
4. 故障恢复(重置超时任务)
注意:
- 如果 workers 启用了自动循环模式(默认),则不需要调度器来触发任务处理
- Worker 已经主动工作,调度器只是偶尔检查系统状态
- 较长的间隔可以最小化系统开销,同时保持必要的监控能力
- 5分钟监控、15分钟健康检查对于自动运行的系统来说已经足够及时
"""
import asyncio
import aiohttp
@@ -13,108 +23,161 @@ import signal
class TaskScheduler:
"""
任务调度器
任务调度器(可选)
职责:
1. 监控 SQLite 任务队列
2. 当有待处理任务时,触发 LitServe Workers
3. 管理调度策略(轮询间隔、并发控制等
职责(在 Worker 自动循环模式下)
1. 监控 SQLite 任务队列状态
2. 健康检查 Workers
3. 故障恢复(重置超时任务
4. 收集和展示统计信息
职责(在传统模式下):
1. 触发 Workers 拉取任务
"""
def __init__(
self,
litserve_url='http://localhost:9000/predict',
poll_interval=2,
max_concurrent_polls=10
monitor_interval=300,
health_check_interval=900,
stale_task_timeout=60,
cleanup_old_files_days=7,
cleanup_old_records_days=0,
worker_auto_mode=True
):
"""
初始化调度器
Args:
litserve_url: LitServe Worker 的 URL
poll_interval: 轮询间隔(秒)
max_concurrent_polls: 最大并发轮询数
monitor_interval: 监控间隔(秒默认300秒=5分钟
health_check_interval: 健康检查间隔默认900秒=15分钟
stale_task_timeout: 超时任务重置时间(分钟)
cleanup_old_files_days: 清理多少天前的结果文件0=禁用默认7天
cleanup_old_records_days: 清理多少天前的数据库记录0=禁用,不推荐删除)
worker_auto_mode: Worker 是否启用自动循环模式
"""
self.litserve_url = litserve_url
self.poll_interval = poll_interval
self.max_concurrent_polls = max_concurrent_polls
self.monitor_interval = monitor_interval
self.health_check_interval = health_check_interval
self.stale_task_timeout = stale_task_timeout
self.cleanup_old_files_days = cleanup_old_files_days
self.cleanup_old_records_days = cleanup_old_records_days
self.worker_auto_mode = worker_auto_mode
self.db = TaskDB()
self.running = True
self.semaphore = asyncio.Semaphore(max_concurrent_polls)
async def trigger_worker_poll(self, session: aiohttp.ClientSession):
async def check_worker_health(self, session: aiohttp.ClientSession):
"""
触发一个 worker 拉取任务
检查 worker 健康状态
"""
async with self.semaphore:
try:
async with session.post(
self.litserve_url,
json={'action': 'poll'},
timeout=aiohttp.ClientTimeout(total=600) # 10分钟超时
) as resp:
if resp.status == 200:
result = await resp.json()
if result.get('status') == 'completed':
logger.info(f"✅ Task completed: {result.get('task_id')} by {result.get('worker_id')}")
elif result.get('status') == 'failed':
logger.error(f"❌ Task failed: {result.get('task_id')} - {result.get('error')}")
elif result.get('status') == 'idle':
# Worker 空闲,没有任务
pass
return result
else:
logger.error(f"Worker poll failed with status {resp.status}")
except asyncio.TimeoutError:
logger.warning("Worker poll timeout")
except Exception as e:
logger.error(f"Worker poll error: {e}")
try:
async with session.post(
self.litserve_url,
json={'action': 'health'},
timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status == 200:
result = await resp.json()
return result
else:
logger.error(f"Health check failed with status {resp.status}")
return None
except asyncio.TimeoutError:
logger.warning("Health check timeout")
return None
except Exception as e:
logger.error(f"Health check error: {e}")
return None
async def schedule_loop(self):
"""
调度循环
监控循环
"""
logger.info("🔄 Task scheduler started")
logger.info(f" LitServe URL: {self.litserve_url}")
logger.info(f" Poll Interval: {self.poll_interval}s")
logger.info(f" Max Concurrent Polls: {self.max_concurrent_polls}")
logger.info(f" Worker Mode: {'Auto-Loop' if self.worker_auto_mode else 'Scheduler-Driven'}")
logger.info(f" Monitor Interval: {self.monitor_interval}s")
logger.info(f" Health Check Interval: {self.health_check_interval}s")
logger.info(f" Stale Task Timeout: {self.stale_task_timeout}m")
if self.cleanup_old_files_days > 0:
logger.info(f" Cleanup Old Files: {self.cleanup_old_files_days} days")
else:
logger.info(f" Cleanup Old Files: Disabled")
if self.cleanup_old_records_days > 0:
logger.info(f" Cleanup Old Records: {self.cleanup_old_records_days} days (Not Recommended)")
else:
logger.info(f" Cleanup Old Records: Disabled (Keep Forever)")
health_check_counter = 0
stale_task_counter = 0
cleanup_counter = 0
async with aiohttp.ClientSession() as session:
while self.running:
try:
# 获取队列统计
# 1. 监控队列状态
stats = self.db.get_queue_stats()
pending_count = stats.get('pending', 0)
processing_count = stats.get('processing', 0)
completed_count = stats.get('completed', 0)
failed_count = stats.get('failed', 0)
if pending_count > 0:
logger.info(f"📋 Queue status: {pending_count} pending, {processing_count} processing")
# 计算需要触发的 worker 数量
# 考虑:待处理任务数
needed_workers = min(
pending_count, # 待处理任务数
self.max_concurrent_polls # 最大并发数
if pending_count > 0 or processing_count > 0:
logger.info(
f"📊 Queue: {pending_count} pending, {processing_count} processing, "
f"{completed_count} completed, {failed_count} failed"
)
if needed_workers > 0:
# 并发触发多个 worker
# semaphore 会自动控制实际并发数
tasks = [
self.trigger_worker_poll(session)
for _ in range(needed_workers)
]
await asyncio.gather(*tasks, return_exceptions=True)
# 等待下一次轮询
await asyncio.sleep(self.poll_interval)
# 2. 定期健康检查
health_check_counter += 1
if health_check_counter * self.monitor_interval >= self.health_check_interval:
health_check_counter = 0
logger.info("🏥 Performing health check...")
health_result = await self.check_worker_health(session)
if health_result:
logger.info(f"✅ Workers healthy: {health_result}")
else:
logger.warning("⚠️ Workers health check failed")
# 3. 定期重置超时任务
stale_task_counter += 1
if stale_task_counter * self.monitor_interval >= self.stale_task_timeout * 60:
stale_task_counter = 0
reset_count = self.db.reset_stale_tasks(self.stale_task_timeout)
if reset_count > 0:
logger.warning(f"⚠️ Reset {reset_count} stale tasks (timeout: {self.stale_task_timeout}m)")
# 4. 定期清理旧任务文件和记录
cleanup_counter += 1
# 每24小时清理一次基于当前监控间隔计算
cleanup_interval_cycles = (24 * 3600) / self.monitor_interval
if cleanup_counter >= cleanup_interval_cycles:
cleanup_counter = 0
# 清理旧结果文件(保留数据库记录)
if self.cleanup_old_files_days > 0:
logger.info(f"🧹 Cleaning up result files older than {self.cleanup_old_files_days} days...")
file_count = self.db.cleanup_old_task_files(days=self.cleanup_old_files_days)
if file_count > 0:
logger.info(f"✅ Cleaned up {file_count} result directories (DB records kept)")
# 清理极旧的数据库记录(可选,默认不启用)
if self.cleanup_old_records_days > 0:
logger.warning(
f"🗑️ Cleaning up database records older than {self.cleanup_old_records_days} days..."
)
record_count = self.db.cleanup_old_task_records(days=self.cleanup_old_records_days)
if record_count > 0:
logger.warning(f"⚠️ Deleted {record_count} task records permanently")
# 等待下一次监控
await asyncio.sleep(self.monitor_interval)
except Exception as e:
logger.error(f"Scheduler loop error: {e}")
await asyncio.sleep(self.poll_interval)
await asyncio.sleep(self.monitor_interval)
logger.info("⏹️ Task scheduler stopped")
@@ -156,15 +219,23 @@ async def health_check(litserve_url: str) -> bool:
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='MinerU Tianshu Task Scheduler')
parser = argparse.ArgumentParser(description='MinerU Tianshu Task Scheduler (Optional)')
parser.add_argument('--litserve-url', type=str, default='http://localhost:9000/predict',
help='LitServe worker URL')
parser.add_argument('--poll-interval', type=int, default=2,
help='Poll interval in seconds')
parser.add_argument('--max-concurrent', type=int, default=10,
help='Maximum concurrent worker polls')
parser.add_argument('--monitor-interval', type=int, default=300,
help='Monitor interval in seconds (default: 300s = 5 minutes)')
parser.add_argument('--health-check-interval', type=int, default=900,
help='Health check interval in seconds (default: 900s = 15 minutes)')
parser.add_argument('--stale-task-timeout', type=int, default=60,
help='Timeout for stale tasks in minutes (default: 60)')
parser.add_argument('--cleanup-old-files-days', type=int, default=7,
help='Delete result files older than N days (0=disable, default: 7)')
parser.add_argument('--cleanup-old-records-days', type=int, default=0,
help='Delete DB records older than N days (0=disable, NOT recommended)')
parser.add_argument('--wait-for-workers', action='store_true',
help='Wait for workers to be ready before starting')
parser.add_argument('--no-worker-auto-mode', action='store_true',
help='Disable worker auto-loop mode assumption')
args = parser.parse_args()
@@ -184,8 +255,12 @@ if __name__ == '__main__':
# 创建并启动调度器
scheduler = TaskScheduler(
litserve_url=args.litserve_url,
poll_interval=args.poll_interval,
max_concurrent_polls=args.max_concurrent
monitor_interval=args.monitor_interval,
health_check_interval=args.health_check_interval,
stale_task_timeout=args.stale_task_timeout,
cleanup_old_files_days=args.cleanup_old_files_days,
cleanup_old_records_days=args.cleanup_old_records_days,
worker_auto_mode=not args.no_worker_auto_mode
)
try: