Files
MinerU/projects/mineru_tianshu/task_scheduler.py
2025-10-17 13:04:49 +08:00

271 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MinerU Tianshu - Task Scheduler (Optional)
天枢任务调度器(可选)
在 Worker 自动循环模式下,调度器主要用于:
1. 监控队列状态默认5分钟一次
2. 健康检查默认15分钟一次
3. 统计信息收集
4. 故障恢复(重置超时任务)
注意:
- 如果 workers 启用了自动循环模式(默认),则不需要调度器来触发任务处理
- Worker 已经主动工作,调度器只是偶尔检查系统状态
- 较长的间隔可以最小化系统开销,同时保持必要的监控能力
- 5分钟监控、15分钟健康检查对于自动运行的系统来说已经足够及时
"""
import asyncio
import aiohttp
from loguru import logger
from task_db import TaskDB
import signal
class TaskScheduler:
"""
任务调度器(可选)
职责(在 Worker 自动循环模式下):
1. 监控 SQLite 任务队列状态
2. 健康检查 Workers
3. 故障恢复(重置超时任务)
4. 收集和展示统计信息
职责(在传统模式下):
1. 触发 Workers 拉取任务
"""
def __init__(
self,
litserve_url='http://localhost:9000/predict',
monitor_interval=300,
health_check_interval=900,
stale_task_timeout=60,
cleanup_old_files_days=7,
cleanup_old_records_days=0,
worker_auto_mode=True
):
"""
初始化调度器
Args:
litserve_url: LitServe Worker 的 URL
monitor_interval: 监控间隔默认300秒=5分钟
health_check_interval: 健康检查间隔默认900秒=15分钟
stale_task_timeout: 超时任务重置时间(分钟)
cleanup_old_files_days: 清理多少天前的结果文件0=禁用默认7天
cleanup_old_records_days: 清理多少天前的数据库记录0=禁用,不推荐删除)
worker_auto_mode: Worker 是否启用自动循环模式
"""
self.litserve_url = litserve_url
self.monitor_interval = monitor_interval
self.health_check_interval = health_check_interval
self.stale_task_timeout = stale_task_timeout
self.cleanup_old_files_days = cleanup_old_files_days
self.cleanup_old_records_days = cleanup_old_records_days
self.worker_auto_mode = worker_auto_mode
self.db = TaskDB()
self.running = True
async def check_worker_health(self, session: aiohttp.ClientSession):
"""
检查 worker 健康状态
"""
try:
async with session.post(
self.litserve_url,
json={'action': 'health'},
timeout=aiohttp.ClientTimeout(total=10)
) as resp:
if resp.status == 200:
result = await resp.json()
return result
else:
logger.error(f"Health check failed with status {resp.status}")
return None
except asyncio.TimeoutError:
logger.warning("Health check timeout")
return None
except Exception as e:
logger.error(f"Health check error: {e}")
return None
async def schedule_loop(self):
"""
主监控循环
"""
logger.info("🔄 Task scheduler started")
logger.info(f" LitServe URL: {self.litserve_url}")
logger.info(f" Worker Mode: {'Auto-Loop' if self.worker_auto_mode else 'Scheduler-Driven'}")
logger.info(f" Monitor Interval: {self.monitor_interval}s")
logger.info(f" Health Check Interval: {self.health_check_interval}s")
logger.info(f" Stale Task Timeout: {self.stale_task_timeout}m")
if self.cleanup_old_files_days > 0:
logger.info(f" Cleanup Old Files: {self.cleanup_old_files_days} days")
else:
logger.info(f" Cleanup Old Files: Disabled")
if self.cleanup_old_records_days > 0:
logger.info(f" Cleanup Old Records: {self.cleanup_old_records_days} days (Not Recommended)")
else:
logger.info(f" Cleanup Old Records: Disabled (Keep Forever)")
health_check_counter = 0
stale_task_counter = 0
cleanup_counter = 0
async with aiohttp.ClientSession() as session:
while self.running:
try:
# 1. 监控队列状态
stats = self.db.get_queue_stats()
pending_count = stats.get('pending', 0)
processing_count = stats.get('processing', 0)
completed_count = stats.get('completed', 0)
failed_count = stats.get('failed', 0)
if pending_count > 0 or processing_count > 0:
logger.info(
f"📊 Queue: {pending_count} pending, {processing_count} processing, "
f"{completed_count} completed, {failed_count} failed"
)
# 2. 定期健康检查
health_check_counter += 1
if health_check_counter * self.monitor_interval >= self.health_check_interval:
health_check_counter = 0
logger.info("🏥 Performing health check...")
health_result = await self.check_worker_health(session)
if health_result:
logger.info(f"✅ Workers healthy: {health_result}")
else:
logger.warning("⚠️ Workers health check failed")
# 3. 定期重置超时任务
stale_task_counter += 1
if stale_task_counter * self.monitor_interval >= self.stale_task_timeout * 60:
stale_task_counter = 0
reset_count = self.db.reset_stale_tasks(self.stale_task_timeout)
if reset_count > 0:
logger.warning(f"⚠️ Reset {reset_count} stale tasks (timeout: {self.stale_task_timeout}m)")
# 4. 定期清理旧任务文件和记录
cleanup_counter += 1
# 每24小时清理一次基于当前监控间隔计算
cleanup_interval_cycles = (24 * 3600) / self.monitor_interval
if cleanup_counter >= cleanup_interval_cycles:
cleanup_counter = 0
# 清理旧结果文件(保留数据库记录)
if self.cleanup_old_files_days > 0:
logger.info(f"🧹 Cleaning up result files older than {self.cleanup_old_files_days} days...")
file_count = self.db.cleanup_old_task_files(days=self.cleanup_old_files_days)
if file_count > 0:
logger.info(f"✅ Cleaned up {file_count} result directories (DB records kept)")
# 清理极旧的数据库记录(可选,默认不启用)
if self.cleanup_old_records_days > 0:
logger.warning(
f"🗑️ Cleaning up database records older than {self.cleanup_old_records_days} days..."
)
record_count = self.db.cleanup_old_task_records(days=self.cleanup_old_records_days)
if record_count > 0:
logger.warning(f"⚠️ Deleted {record_count} task records permanently")
# 等待下一次监控
await asyncio.sleep(self.monitor_interval)
except Exception as e:
logger.error(f"Scheduler loop error: {e}")
await asyncio.sleep(self.monitor_interval)
logger.info("⏹️ Task scheduler stopped")
def start(self):
"""启动调度器"""
logger.info("🚀 Starting MinerU Tianshu Task Scheduler...")
# 设置信号处理
def signal_handler(sig, frame):
logger.info("\n🛑 Received stop signal, shutting down...")
self.running = False
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# 运行调度循环
asyncio.run(self.schedule_loop())
def stop(self):
"""停止调度器"""
self.running = False
async def health_check(litserve_url: str) -> bool:
"""
健康检查:验证 LitServe Worker 是否可用
"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(
litserve_url.replace('/predict', '/health'),
timeout=aiohttp.ClientTimeout(total=5)
) as resp:
return resp.status == 200
except:
return False
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description='MinerU Tianshu Task Scheduler (Optional)')
parser.add_argument('--litserve-url', type=str, default='http://localhost:9000/predict',
help='LitServe worker URL')
parser.add_argument('--monitor-interval', type=int, default=300,
help='Monitor interval in seconds (default: 300s = 5 minutes)')
parser.add_argument('--health-check-interval', type=int, default=900,
help='Health check interval in seconds (default: 900s = 15 minutes)')
parser.add_argument('--stale-task-timeout', type=int, default=60,
help='Timeout for stale tasks in minutes (default: 60)')
parser.add_argument('--cleanup-old-files-days', type=int, default=7,
help='Delete result files older than N days (0=disable, default: 7)')
parser.add_argument('--cleanup-old-records-days', type=int, default=0,
help='Delete DB records older than N days (0=disable, NOT recommended)')
parser.add_argument('--wait-for-workers', action='store_true',
help='Wait for workers to be ready before starting')
parser.add_argument('--no-worker-auto-mode', action='store_true',
help='Disable worker auto-loop mode assumption')
args = parser.parse_args()
# 等待 workers 就绪(可选)
if args.wait_for_workers:
logger.info("⏳ Waiting for LitServe workers to be ready...")
import time
max_retries = 30
for i in range(max_retries):
if asyncio.run(health_check(args.litserve_url)):
logger.info("✅ LitServe workers are ready!")
break
time.sleep(2)
if i == max_retries - 1:
logger.error("❌ LitServe workers not responding, starting anyway...")
# 创建并启动调度器
scheduler = TaskScheduler(
litserve_url=args.litserve_url,
monitor_interval=args.monitor_interval,
health_check_interval=args.health_check_interval,
stale_task_timeout=args.stale_task_timeout,
cleanup_old_files_days=args.cleanup_old_files_days,
cleanup_old_records_days=args.cleanup_old_records_days,
worker_auto_mode=not args.no_worker_auto_mode
)
try:
scheduler.start()
except KeyboardInterrupt:
logger.info("👋 Scheduler interrupted by user")