Files
MinerU/projects/multi_gpu_v2/client.py

82 lines
2.6 KiB
Python

import base64
import os
from loguru import logger
import asyncio
import aiohttp
async def mineru_parse_async(session, file_path, url='http://127.0.0.1:8000/predict', **options):
"""
Asynchronous version of the parse function.
"""
try:
# Asynchronously read and encode the file
with open(file_path, 'rb') as f:
file_b64 = base64.b64encode(f.read()).decode('utf-8')
payload = {
'file': file_b64,
'options': options
}
# Use the aiohttp session to send the request
async with session.post(url, json=payload) as response:
if response.status == 200:
result = await response.json()
logger.info(f"✅ Processed: {file_path} -> {result.get('output_dir', 'N/A')}")
return result
else:
error_text = await response.text()
logger.error(f"❌ Server error for {file_path}: {error_text}")
return {'error': error_text}
except Exception as e:
logger.error(f"❌ Failed to process {file_path}: {e}")
return {'error': str(e)}
async def main():
"""
Main function to run all parsing tasks concurrently.
"""
test_files = [
'../../demo/pdfs/demo1.pdf',
'../../demo/pdfs/demo2.pdf',
'../../demo/pdfs/demo3.pdf',
'../../demo/pdfs/small_ocr.pdf',
]
test_files = [os.path.join(os.path.dirname(__file__), f) for f in test_files]
existing_files = [f for f in test_files if os.path.exists(f)]
if not existing_files:
logger.warning("No test files found.")
return
# Create an aiohttp session to be reused across requests
async with aiohttp.ClientSession() as session:
# === Basic Processing ===
basic_tasks = [mineru_parse_async(session, file_path) for file_path in existing_files[:2]]
# === Custom Options ===
custom_options = {
'backend': 'pipeline', 'lang': 'ch', 'method': 'auto',
'formula_enable': True, 'table_enable': True,
# Example for remote vlm server (vllm/sglang/lmdeploy...)
# 'backend': 'vlm-http-client', 'server_url': 'http://127.0.0.1:30000',
}
custom_tasks = [mineru_parse_async(session, file_path, **custom_options) for file_path in existing_files[2:]]
# Start all tasks
all_tasks = basic_tasks + custom_tasks
all_results = await asyncio.gather(*all_tasks)
logger.info(f"All Results: {all_results}")
logger.info("🎉 All processing completed!")
if __name__ == '__main__':
# Run the async main function
asyncio.run(main())