diff --git a/mineru/backend/hybrid/__init__.py b/mineru/backend/hybrid/__init__.py
new file mode 100644
index 00000000..1e83c082
--- /dev/null
+++ b/mineru/backend/hybrid/__init__.py
@@ -0,0 +1 @@
+# Copyright (c) Opendatalab. All rights reserved.
diff --git a/mineru/backend/hybrid/hybrid_analyze.py b/mineru/backend/hybrid/hybrid_analyze.py
new file mode 100644
index 00000000..24b01f93
--- /dev/null
+++ b/mineru/backend/hybrid/hybrid_analyze.py
@@ -0,0 +1,228 @@
+# Copyright (c) Opendatalab. All rights reserved.
+import os
+import time
+
+from PIL import Image
+from lmdeploy import PytorchEngineConfig, TurbomindEngineConfig
+from lmdeploy.serve.vl_async_engine import VLAsyncEngine
+from loguru import logger
+from mineru_vl_utils import MinerUClient
+from packaging import version
+
+from mineru.backend.hybrid.utils import set_default_batch_size, set_default_gpu_memory_utilization, \
+    enable_custom_logits_processors, set_lmdeploy_backend
+from mineru.utils.check_sys_env import is_mac_os_version_supported
+from mineru.utils.config_reader import get_device
+from mineru.utils.models_download_utils import auto_download_and_get_model_root_path
+
+
+class ModelSingleton:
+    _instance = None
+    _models = {}
+
+    def __new__(cls, *args, **kwargs):
+        if cls._instance is None:
+            cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def get_model(
+        self,
+        backend: str,
+        model_path: str | None,
+        server_url: str | None,
+        **kwargs,
+    ) -> MinerUClient:
+        key = (backend, model_path, server_url)
+        if key not in self._models:
+            start_time = time.time()
+            model = None
+            processor = None
+            vllm_llm = None
+            lmdeploy_engine = None
+            vllm_async_llm = None
+            batch_size = kwargs.get("batch_size", 0)  # for transformers backend only
+            max_concurrency = kwargs.get("max_concurrency", 100)  # for http-client backend only
+            http_timeout = kwargs.get("http_timeout", 600)  # for http-client backend only
+            server_headers = kwargs.get("server_headers", None)  # for http-client backend only
+            max_retries = kwargs.get("max_retries", 3)  # for http-client backend only
+            retry_backoff_factor = kwargs.get("retry_backoff_factor", 0.5)  # for http-client backend only
+            # Remove these parameters from kwargs so they are not passed to unrelated initializers
+            for param in ["batch_size", "max_concurrency", "http_timeout", "server_headers", "max_retries", "retry_backoff_factor"]:
+                if param in kwargs:
+                    del kwargs[param]
+            if backend not in ["http-client"] and not model_path:
+                model_path = auto_download_and_get_model_root_path("/", "vlm")
+            if backend == "transformers":
+                try:
+                    from transformers import (
+                        AutoProcessor,
+                        Qwen2VLForConditionalGeneration,
+                    )
+                    from transformers import __version__ as transformers_version
+                except ImportError:
+                    raise ImportError("Please install transformers to use the transformers backend.")
+
+                if version.parse(transformers_version) >= version.parse("4.56.0"):
+                    dtype_key = "dtype"
+                else:
+                    dtype_key = "torch_dtype"
+                device = get_device()
+                model = Qwen2VLForConditionalGeneration.from_pretrained(
+                    model_path,
+                    device_map={"": device},
+                    **{dtype_key: "auto"},  # type: ignore
+                )
+                processor = AutoProcessor.from_pretrained(
+                    model_path,
+                    use_fast=True,
+                )
+                if batch_size == 0:
+                    batch_size = set_default_batch_size()
+            elif backend == "mlx-engine":
+                mlx_supported = is_mac_os_version_supported()
+                if not mlx_supported:
+                    raise EnvironmentError("mlx-engine backend is only supported on macOS 13.5+ with Apple Silicon.")
+                try:
+                    from mlx_vlm import load as mlx_load
+                except ImportError:
+                    raise ImportError("Please install mlx-vlm to use the mlx-engine backend.")
+                model, processor = mlx_load(model_path)
+            else:
+                if os.getenv('OMP_NUM_THREADS') is None:
+                    os.environ["OMP_NUM_THREADS"] = "1"
+
+                if backend == "vllm-engine":
+                    try:
+                        import vllm
+                    except ImportError:
+                        raise ImportError("Please install vllm to use the vllm-engine backend.")
+                    if "gpu_memory_utilization" not in kwargs:
+                        kwargs["gpu_memory_utilization"] = set_default_gpu_memory_utilization()
+                    if "model" not in kwargs:
+                        kwargs["model"] = model_path
+                    if enable_custom_logits_processors() and ("logits_processors" not in kwargs):
+                        from mineru_vl_utils import MinerULogitsProcessor
+                        kwargs["logits_processors"] = [MinerULogitsProcessor]
+                    # Use the remaining kwargs as vllm initialization parameters
+                    vllm_llm = vllm.LLM(**kwargs)
+                elif backend == "vllm-async-engine":
+                    try:
+                        from vllm.engine.arg_utils import AsyncEngineArgs
+                        from vllm.v1.engine.async_llm import AsyncLLM
+                    except ImportError:
+                        raise ImportError("Please install vllm to use the vllm-async-engine backend.")
+                    if "gpu_memory_utilization" not in kwargs:
+                        kwargs["gpu_memory_utilization"] = set_default_gpu_memory_utilization()
+                    if "model" not in kwargs:
+                        kwargs["model"] = model_path
+                    if enable_custom_logits_processors() and ("logits_processors" not in kwargs):
+                        from mineru_vl_utils import MinerULogitsProcessor
+                        kwargs["logits_processors"] = [MinerULogitsProcessor]
+                    # Use the remaining kwargs as vllm initialization parameters
+                    vllm_async_llm = AsyncLLM.from_engine_args(AsyncEngineArgs(**kwargs))
+                elif backend == "lmdeploy-engine":
+                    try:
+                        from lmdeploy import PytorchEngineConfig, TurbomindEngineConfig
+                        from lmdeploy.serve.vl_async_engine import VLAsyncEngine
+                    except ImportError:
+                        raise ImportError("Please install lmdeploy to use the lmdeploy-engine backend.")
+                    if "cache_max_entry_count" not in kwargs:
+                        kwargs["cache_max_entry_count"] = 0.5
+
+                    device_type = os.getenv("MINERU_LMDEPLOY_DEVICE", "")
+                    if device_type == "":
+                        if "lmdeploy_device" in kwargs:
+                            device_type = kwargs.pop("lmdeploy_device")
+                            if device_type not in ["cuda", "ascend", "maca", "camb"]:
+                                raise ValueError(f"Unsupported lmdeploy device type: {device_type}")
+                        else:
+                            device_type = "cuda"
+                    lm_backend = os.getenv("MINERU_LMDEPLOY_BACKEND", "")
+                    if lm_backend == "":
+                        if "lmdeploy_backend" in kwargs:
+                            lm_backend = kwargs.pop("lmdeploy_backend")
+                            if lm_backend not in ["pytorch", "turbomind"]:
+                                raise ValueError(f"Unsupported lmdeploy backend: {lm_backend}")
+                        else:
+                            lm_backend = set_lmdeploy_backend(device_type)
+                    logger.info(f"lmdeploy device is: {device_type}, lmdeploy backend is: {lm_backend}")
+
+                    if lm_backend == "pytorch":
+                        kwargs["device_type"] = device_type
+                        backend_config = PytorchEngineConfig(**kwargs)
+                    elif lm_backend == "turbomind":
+                        backend_config = TurbomindEngineConfig(**kwargs)
+                    else:
+                        raise ValueError(f"Unsupported lmdeploy backend: {lm_backend}")
+
+                    log_level = 'ERROR'
+                    from lmdeploy.utils import get_logger
+                    lm_logger = get_logger('lmdeploy')
+                    lm_logger.setLevel(log_level)
+                    if os.getenv('TM_LOG_LEVEL') is None:
+                        os.environ['TM_LOG_LEVEL'] = log_level
+
+                    lmdeploy_engine = VLAsyncEngine(
+                        model_path,
+                        backend=lm_backend,
+                        backend_config=backend_config,
+                    )
+            self._models[key] = MinerUClient(
+                backend=backend,
+                model=model,
+                processor=processor,
+                lmdeploy_engine=lmdeploy_engine,
+                vllm_llm=vllm_llm,
+                vllm_async_llm=vllm_async_llm,
+                server_url=server_url,
+                batch_size=batch_size,
+                max_concurrency=max_concurrency,
+                http_timeout=http_timeout,
+                server_headers=server_headers,
+                max_retries=max_retries,
+                retry_backoff_factor=retry_backoff_factor,
+            )
+            elapsed = round(time.time() - start_time, 2)
+            logger.info(f"get {backend} predictor cost: {elapsed}s")
+        return self._models[key]
+
+
+if __name__ == "__main__":
+    kwargs = {}
+    kwargs["cache_max_entry_count"] = 0.5
+    os.environ["MINERU_MODEL_SOURCE"] = "local"
+    model_path = auto_download_and_get_model_root_path("/", "vlm")
+
+    log_level = 'ERROR'
+    from lmdeploy.utils import get_logger
+
+    lm_logger = get_logger('lmdeploy')
+    lm_logger.setLevel(log_level)
+    if os.getenv('TM_LOG_LEVEL') is None:
+        os.environ['TM_LOG_LEVEL'] = log_level
+
+    lmdeploy_engine = VLAsyncEngine(
+        model_path,
+        backend="turbomind",
+        backend_config=TurbomindEngineConfig(**kwargs),
+    )
+
+    client = MinerUClient(
+        backend="lmdeploy-engine",
+        lmdeploy_engine=lmdeploy_engine,
+    )
+    not_extract_list = ["text", "title"]
+    image = Image.open(r"C:\Users\zhaoxiaomeng\Downloads\ch_hand_writer.jpg")
+    def run_sync():
+        extracted_blocks = client.two_step_extract(image=image, not_extract_list=not_extract_list)
+        return extracted_blocks
+    async def run_async():
+        extracted_blocks = await client.aio_two_step_extract(image=image, not_extract_list=not_extract_list)
+        return extracted_blocks
+
+    extracted_blocks = run_sync()
+    print(f"extracted_blocks: {extracted_blocks}")
+
+    import asyncio
+    async_extracted_blocks = asyncio.run(run_async())
+    print(f"async_extracted_blocks: {async_extracted_blocks}")
diff --git a/mineru/backend/hybrid/utils.py b/mineru/backend/hybrid/utils.py
new file mode 100644
index 00000000..5defc438
--- /dev/null
+++ b/mineru/backend/hybrid/utils.py
@@ -0,0 +1,97 @@
+import os
+
+from loguru import logger
+from packaging import version
+
+from mineru.utils.check_sys_env import is_windows_environment, is_linux_environment
+from mineru.utils.config_reader import get_device
+from mineru.utils.model_utils import get_vram
+
+
+def enable_custom_logits_processors() -> bool:
+    import torch
+    from vllm import __version__ as vllm_version
+
+    if torch.cuda.is_available():
+        major, minor = torch.cuda.get_device_capability()
+        # Build the compute capability string as "major.minor"
+        compute_capability = f"{major}.{minor}"
+    elif hasattr(torch, 'npu') and torch.npu.is_available():
+        compute_capability = "8.0"
+    else:
+        logger.info("CUDA not available, disabling custom_logits_processors")
+        return False
+
+    # Parse the environment variable defensively
+    vllm_use_v1_str = os.getenv('VLLM_USE_V1', "1")
+    if vllm_use_v1_str.isdigit():
+        vllm_use_v1 = int(vllm_use_v1_str)
+    else:
+        vllm_use_v1 = 1
+
+    if vllm_use_v1 == 0:
+        logger.info("VLLM_USE_V1 is set to 0, disabling custom_logits_processors")
+        return False
+    elif version.parse(vllm_version) < version.parse("0.10.1"):
+        logger.info(f"vllm version: {vllm_version} < 0.10.1, disable custom_logits_processors")
+        return False
+    elif version.parse(compute_capability) < version.parse("8.0"):
+        if version.parse(vllm_version) >= version.parse("0.10.2"):
+            logger.info(f"compute_capability: {compute_capability} < 8.0, but vllm version: {vllm_version} >= 0.10.2, enable custom_logits_processors")
+            return True
+        else:
+            logger.info(f"compute_capability: {compute_capability} < 8.0 and vllm version: {vllm_version} < 0.10.2, disable custom_logits_processors")
+            return False
+    else:
+        logger.info(f"compute_capability: {compute_capability} >= 8.0 and vllm version: {vllm_version} >= 0.10.1, enable custom_logits_processors")
+        return True
+
+
+def set_lmdeploy_backend(device_type: str) -> str:
+    if device_type.lower() in ["ascend", "maca", "camb"]:
+        lmdeploy_backend = "pytorch"
+    elif device_type.lower() in ["cuda"]:
+        import torch
+        if not torch.cuda.is_available():
+            raise ValueError("CUDA is not available.")
+        if is_windows_environment():
+            lmdeploy_backend = "turbomind"
+        elif is_linux_environment():
+            major, minor = torch.cuda.get_device_capability()
+            compute_capability = f"{major}.{minor}"
+            if version.parse(compute_capability) >= version.parse("8.0"):
+                lmdeploy_backend = "pytorch"
+            else:
+                lmdeploy_backend = "turbomind"
+        else:
+            raise ValueError("Unsupported operating system.")
+    else:
+        raise ValueError(f"Unsupported lmdeploy device type: {device_type}")
+    return lmdeploy_backend
+
+
+def set_default_gpu_memory_utilization() -> float:
+    from vllm import __version__ as vllm_version
+    if version.parse(vllm_version) >= version.parse("0.11.0"):
+        return 0.7
+    else:
+        return 0.5
+
+
+def set_default_batch_size() -> int:
+    try:
+        device = get_device()
+        gpu_memory = get_vram(device)
+
+        if gpu_memory >= 16:
+            batch_size = 8
+        elif gpu_memory >= 8:
+            batch_size = 4
+        else:
+            batch_size = 1
+        logger.info(f'gpu_memory: {gpu_memory} GB, batch_size: {batch_size}')
+
+    except Exception as e:
+        logger.warning(f'Error determining VRAM: {e}, using default batch_size: 1')
+        batch_size = 1
+    return batch_size
\ No newline at end of file
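For context, here is a minimal sketch of how the new `ModelSingleton` could be driven end to end. It only uses names introduced by this diff (`ModelSingleton.get_model`, `two_step_extract`); the choice of the `transformers` backend and the input image path are illustrative assumptions, not part of the change.

```python
# Sketch only: exercising ModelSingleton from hybrid_analyze.py.
# The backend choice and image path below are assumptions for illustration.
from PIL import Image

from mineru.backend.hybrid.hybrid_analyze import ModelSingleton

# get_model() caches one client per (backend, model_path, server_url); with
# model_path=None the model root is resolved via auto_download_and_get_model_root_path.
client = ModelSingleton().get_model(
    backend="transformers",
    model_path=None,
    server_url=None,
    batch_size=0,  # 0 -> picked by set_default_batch_size() from available VRAM
)

image = Image.open("example_page.png")  # hypothetical input image
blocks = client.two_step_extract(image=image, not_extract_list=["text", "title"])
print(blocks)
```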