diff --git a/demo/demo.py b/demo/demo.py index 1c1fd108..f6f1c5ad 100644 --- a/demo/demo.py +++ b/demo/demo.py @@ -13,18 +13,11 @@ from mineru.data.data_reader_writer import FileBasedDataWriter from mineru.utils.draw_bbox import draw_layout_bbox, draw_span_bbox from mineru.utils.engine_utils import get_vlm_engine from mineru.utils.enum_class import MakeMode -from mineru.utils.config_reader import is_low_memory_enabled from mineru.backend.vlm.vlm_analyze import doc_analyze as vlm_doc_analyze -from mineru.backend.vlm.vlm_analyze import doc_analyze_low_memory as vlm_doc_analyze_low_memory -from mineru.backend.pipeline.pipeline_analyze import doc_analyze as pipeline_doc_analyze from mineru.backend.pipeline.pipeline_analyze import doc_analyze_streaming as pipeline_doc_analyze_streaming -from mineru.backend.pipeline.pipeline_analyze import doc_analyze_low_memory_multi as pipeline_doc_analyze_low_memory_multi -from mineru.backend.pipeline.pipeline_analyze import doc_analyze_low_memory_multi_streaming as pipeline_doc_analyze_low_memory_multi_streaming from mineru.backend.pipeline.pipeline_middle_json_mkcontent import union_make as pipeline_union_make -from mineru.backend.pipeline.model_json_to_middle_json import result_to_middle_json as pipeline_result_to_middle_json from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make from mineru.backend.hybrid.hybrid_analyze import doc_analyze as hybrid_doc_analyze -from mineru.backend.hybrid.hybrid_analyze import doc_analyze_low_memory as hybrid_doc_analyze_low_memory from mineru.backend.office.office_middle_json_mkcontent import union_make as office_union_make from mineru.backend.office.docx_analyze import office_docx_analyze from mineru.utils.guess_suffix_or_lang import guess_suffix_by_path, guess_suffix_by_bytes @@ -113,26 +106,15 @@ def do_parse( future = output_executor.submit(run_output_task, doc_index, middle_json, model_list) output_futures.append(future) - if is_low_memory_enabled(): - pipeline_doc_analyze_low_memory_multi_streaming( - pdf_bytes_list, - image_writer_list, - p_lang_list, - on_doc_ready, - parse_method=parse_method, - formula_enable=formula_enable, - table_enable=table_enable, - ) - else: - pipeline_doc_analyze_streaming( - pdf_bytes_list, - image_writer_list, - p_lang_list, - on_doc_ready, - parse_method=parse_method, - formula_enable=formula_enable, - table_enable=table_enable, - ) + pipeline_doc_analyze_streaming( + pdf_bytes_list, + image_writer_list, + p_lang_list, + on_doc_ready, + parse_method=parse_method, + formula_enable=formula_enable, + table_enable=table_enable, + ) for future in output_futures: future.result() @@ -152,10 +134,9 @@ def do_parse( pdf_bytes = convert_pdf_bytes_to_bytes(pdf_bytes, start_page_id, end_page_id) local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method) image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir) - if is_low_memory_enabled(): - middle_json, infer_result = vlm_doc_analyze_low_memory(pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url) - else: - middle_json, infer_result = vlm_doc_analyze(pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url) + middle_json, infer_result = vlm_doc_analyze( + pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url + ) pdf_info = middle_json["pdf_info"] @@ -177,26 +158,15 @@ def do_parse( pdf_bytes = convert_pdf_bytes_to_bytes(pdf_bytes, start_page_id, end_page_id) local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method) image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir) - if is_low_memory_enabled(): - middle_json, infer_result, _vlm_ocr_enable = hybrid_doc_analyze_low_memory( - pdf_bytes, - image_writer=image_writer, - backend=backend, - parse_method=parse_method, - language=p_lang_list[idx], - inline_formula_enable=formula_enable, - server_url=server_url, - ) - else: - middle_json, infer_result, _vlm_ocr_enable = hybrid_doc_analyze( - pdf_bytes, - image_writer=image_writer, - backend=backend, - parse_method=parse_method, - language=p_lang_list[idx], - inline_formula_enable=formula_enable, - server_url=server_url, - ) + middle_json, infer_result, _vlm_ocr_enable = hybrid_doc_analyze( + pdf_bytes, + image_writer=image_writer, + backend=backend, + parse_method=parse_method, + language=p_lang_list[idx], + inline_formula_enable=formula_enable, + server_url=server_url, + ) pdf_info = middle_json["pdf_info"] diff --git a/mineru/backend/hybrid/hybrid_analyze.py b/mineru/backend/hybrid/hybrid_analyze.py index aae4bfb8..f850ab56 100644 --- a/mineru/backend/hybrid/hybrid_analyze.py +++ b/mineru/backend/hybrid/hybrid_analyze.py @@ -15,7 +15,6 @@ from mineru.backend.hybrid.hybrid_model_output_to_middle_json import ( append_page_model_list_to_middle_json, finalize_middle_json, init_middle_json, - result_to_middle_json, ) from mineru.backend.pipeline.model_init import HybridModelSingleton from mineru.backend.vlm.vlm_analyze import ( @@ -25,13 +24,13 @@ from mineru.backend.vlm.vlm_analyze import ( _maybe_enable_serial_execution, ) from mineru.data.data_reader_writer import DataWriter -from mineru.utils.config_reader import get_device, get_low_memory_window_size +from mineru.utils.config_reader import get_device, get_processing_window_size from mineru.utils.enum_class import ImageType, NotExtractType from mineru.utils.model_utils import crop_img, get_vram, clean_memory from mineru.utils.ocr_utils import get_adjusted_mfdetrec_res, get_ocr_result_list, sorted_boxes, merge_det_boxes, \ update_det_boxes, OcrConfidence from mineru.utils.pdf_classify import classify -from mineru.utils.pdf_image_tools import load_images_from_pdf, load_images_from_pdf_doc +from mineru.utils.pdf_image_tools import load_images_from_pdf_doc from mineru.utils.pdfium_guard import ( close_pdfium_document, get_pdfium_document_page_count, @@ -544,76 +543,6 @@ def doc_analyze( model_path: str | None = None, server_url: str | None = None, **kwargs, -): - # 初始化预测器 - if predictor is None: - predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) - predictor = _maybe_enable_serial_execution(predictor, backend) - - # 加载图像 - load_images_start = time.time() - images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL) - images_pil_list = [image_dict["img_pil"] for image_dict in images_list] - load_images_time = round(time.time() - load_images_start, 2) - logger.debug(f"load images cost: {load_images_time}, speed: {round(len(images_pil_list)/load_images_time, 3)} images/s") - - # 获取设备信息 - device = get_device() - - # 确定OCR配置 - _ocr_enable = ocr_classify(pdf_bytes, parse_method=parse_method) - _vlm_ocr_enable = _should_enable_vlm_ocr(_ocr_enable, language, inline_formula_enable) - - infer_start = time.time() - # VLM提取 - if _vlm_ocr_enable: - with predictor_execution_guard(predictor): - model_list = predictor.batch_two_step_extract(images=images_pil_list) - hybrid_pipeline_model = None - else: - batch_ratio = get_batch_ratio(device) - with predictor_execution_guard(predictor): - model_list = predictor.batch_two_step_extract( - images=images_pil_list, - not_extract_list=not_extract_list - ) - model_list, hybrid_pipeline_model = _process_ocr_and_formulas( - images_pil_list, - model_list, - language, - inline_formula_enable, - _ocr_enable, - batch_radio=batch_ratio, - ) - infer_time = round(time.time() - infer_start, 2) - logger.debug(f"infer finished, cost: {infer_time}, speed: {round(len(model_list)/infer_time, 3)} page/s") - - # 生成中间JSON - middle_json = result_to_middle_json( - model_list, - images_list, - pdf_doc, - image_writer, - _ocr_enable, - _vlm_ocr_enable, - hybrid_pipeline_model, - ) - - clean_memory(device) - return middle_json, model_list, _vlm_ocr_enable - - -def doc_analyze_low_memory( - pdf_bytes, - image_writer: DataWriter | None, - predictor: MinerUClient | None = None, - backend="transformers", - parse_method: str = 'auto', - language: str = 'ch', - inline_formula_enable: bool = True, - model_path: str | None = None, - server_url: str | None = None, - **kwargs, ): if predictor is None: predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) @@ -630,24 +559,24 @@ def doc_analyze_low_memory( hybrid_pipeline_model = None try: page_count = get_pdfium_document_page_count(pdf_doc) - if page_count == 0: - close_pdfium_document(pdf_doc) - doc_closed = True - clean_memory(device) - return middle_json, model_list, _vlm_ocr_enable - window_size = min(page_count, get_low_memory_window_size(default=64)) - total_windows = (page_count + window_size - 1) // window_size + configured_window_size = get_processing_window_size(default=64) + effective_window_size = min(page_count, configured_window_size) if page_count else 0 + total_windows = ( + (page_count + effective_window_size - 1) // effective_window_size + if effective_window_size + else 0 + ) logger.info( - f'Hybrid low-memory mode enabled. page_count={page_count}, ' - f'window_size={window_size}, total_windows={total_windows}' + f'Hybrid processing-window run. page_count={page_count}, ' + f'window_size={configured_window_size}, total_windows={total_windows}' ) batch_ratio = get_batch_ratio(device) if not _vlm_ocr_enable else 1 infer_start = time.time() with tqdm(total=page_count, desc="Processing pages") as progress_bar: - for window_index, window_start in enumerate(range(0, page_count, window_size)): - window_end = min(page_count - 1, window_start + window_size - 1) + for window_index, window_start in enumerate(range(0, page_count, effective_window_size or 1)): + window_end = min(page_count - 1, window_start + effective_window_size - 1) images_list = load_images_from_pdf_doc( pdf_doc, start_page_id=window_start, @@ -657,7 +586,7 @@ def doc_analyze_low_memory( try: images_pil_list = [image_dict["img_pil"] for image_dict in images_list] logger.info( - f'Hybrid low-memory window {window_index + 1}/{total_windows}: ' + f'Hybrid processing window {window_index + 1}/{total_windows}: ' f'pages {window_start + 1}-{window_end + 1}/{page_count} ' f'({len(images_pil_list)} pages)' ) @@ -695,9 +624,9 @@ def doc_analyze_low_memory( _close_images(images_list) infer_time = round(time.time() - infer_start, 2) - if infer_time > 0: + if infer_time > 0 and page_count > 0: logger.debug( - f"low-memory infer finished, cost: {infer_time}, " + f"processing-window infer finished, cost: {infer_time}, " f"speed: {round(len(model_list) / infer_time, 3)} page/s" ) @@ -727,76 +656,6 @@ async def aio_doc_analyze( model_path: str | None = None, server_url: str | None = None, **kwargs, -): - # 初始化预测器 - if predictor is None: - predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) - predictor = _maybe_enable_serial_execution(predictor, backend) - - # 加载图像 - load_images_start = time.time() - images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL) - images_pil_list = [image_dict["img_pil"] for image_dict in images_list] - load_images_time = round(time.time() - load_images_start, 2) - logger.debug(f"load images cost: {load_images_time}, speed: {round(len(images_pil_list)/load_images_time, 3)} images/s") - - # 获取设备信息 - device = get_device() - - # 确定OCR配置 - _ocr_enable = ocr_classify(pdf_bytes, parse_method=parse_method) - _vlm_ocr_enable = _should_enable_vlm_ocr(_ocr_enable, language, inline_formula_enable) - - infer_start = time.time() - # VLM提取 - if _vlm_ocr_enable: - async with aio_predictor_execution_guard(predictor): - model_list = await predictor.aio_batch_two_step_extract(images=images_pil_list) - hybrid_pipeline_model = None - else: - batch_ratio = get_batch_ratio(device) - async with aio_predictor_execution_guard(predictor): - model_list = await predictor.aio_batch_two_step_extract( - images=images_pil_list, - not_extract_list=not_extract_list - ) - model_list, hybrid_pipeline_model = _process_ocr_and_formulas( - images_pil_list, - model_list, - language, - inline_formula_enable, - _ocr_enable, - batch_radio=batch_ratio, - ) - infer_time = round(time.time() - infer_start, 2) - logger.debug(f"infer finished, cost: {infer_time}, speed: {round(len(model_list)/infer_time, 3)} page/s") - - # 生成中间JSON - middle_json = result_to_middle_json( - model_list, - images_list, - pdf_doc, - image_writer, - _ocr_enable, - _vlm_ocr_enable, - hybrid_pipeline_model, - ) - - clean_memory(device) - return middle_json, model_list, _vlm_ocr_enable - - -async def aio_doc_analyze_low_memory( - pdf_bytes, - image_writer: DataWriter | None, - predictor: MinerUClient | None = None, - backend="transformers", - parse_method: str = 'auto', - language: str = 'ch', - inline_formula_enable: bool = True, - model_path: str | None = None, - server_url: str | None = None, - **kwargs, ): if predictor is None: predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) @@ -813,24 +672,24 @@ async def aio_doc_analyze_low_memory( hybrid_pipeline_model = None try: page_count = get_pdfium_document_page_count(pdf_doc) - if page_count == 0: - close_pdfium_document(pdf_doc) - doc_closed = True - clean_memory(device) - return middle_json, model_list, _vlm_ocr_enable - window_size = min(page_count, get_low_memory_window_size(default=64)) - total_windows = (page_count + window_size - 1) // window_size + configured_window_size = get_processing_window_size(default=64) + effective_window_size = min(page_count, configured_window_size) if page_count else 0 + total_windows = ( + (page_count + effective_window_size - 1) // effective_window_size + if effective_window_size + else 0 + ) logger.info( - f'Hybrid low-memory mode enabled. page_count={page_count}, ' - f'window_size={window_size}, total_windows={total_windows}' + f'Hybrid processing-window run. page_count={page_count}, ' + f'window_size={configured_window_size}, total_windows={total_windows}' ) batch_ratio = get_batch_ratio(device) if not _vlm_ocr_enable else 1 infer_start = time.time() with tqdm(total=page_count, desc="Processing pages") as progress_bar: - for window_index, window_start in enumerate(range(0, page_count, window_size)): - window_end = min(page_count - 1, window_start + window_size - 1) + for window_index, window_start in enumerate(range(0, page_count, effective_window_size or 1)): + window_end = min(page_count - 1, window_start + effective_window_size - 1) images_list = load_images_from_pdf_doc( pdf_doc, start_page_id=window_start, @@ -840,7 +699,7 @@ async def aio_doc_analyze_low_memory( try: images_pil_list = [image_dict["img_pil"] for image_dict in images_list] logger.info( - f'Hybrid low-memory window {window_index + 1}/{total_windows}: ' + f'Hybrid processing window {window_index + 1}/{total_windows}: ' f'pages {window_start + 1}-{window_end + 1}/{page_count} ' f'({len(images_pil_list)} pages)' ) @@ -878,9 +737,9 @@ async def aio_doc_analyze_low_memory( _close_images(images_list) infer_time = round(time.time() - infer_start, 2) - if infer_time > 0: + if infer_time > 0 and page_count > 0: logger.debug( - f"low-memory infer finished, cost: {infer_time}, " + f"processing-window infer finished, cost: {infer_time}, " f"speed: {round(len(model_list) / infer_time, 3)} page/s" ) diff --git a/mineru/backend/pipeline/pipeline_analyze.py b/mineru/backend/pipeline/pipeline_analyze.py index 4a988e09..b6d402d8 100644 --- a/mineru/backend/pipeline/pipeline_analyze.py +++ b/mineru/backend/pipeline/pipeline_analyze.py @@ -9,11 +9,15 @@ from loguru import logger from tqdm import tqdm from .model_init import MineruPipelineModel -from .model_json_to_middle_json import append_batch_results_to_middle_json, finalize_middle_json, init_middle_json, result_to_middle_json -from mineru.utils.config_reader import get_device, get_low_memory_window_size +from .model_json_to_middle_json import ( + append_batch_results_to_middle_json, + finalize_middle_json, + init_middle_json, +) +from mineru.utils.config_reader import get_device, get_processing_window_size from ...utils.enum_class import ImageType from ...utils.pdf_classify import classify -from ...utils.pdf_image_tools import load_images_from_pdf, load_images_from_pdf_doc +from ...utils.pdf_image_tools import load_images_from_pdf_doc from ...utils.model_utils import get_vram, clean_memory from ...utils.pdfium_guard import ( close_pdfium_document, @@ -88,10 +92,6 @@ def _get_ocr_enable(pdf_bytes, parse_method: str) -> bool: return False -def _get_low_memory_window_size(page_count: int) -> int: - return min(page_count, get_low_memory_window_size(default=64)) - - def _close_images(images_list): for image_dict in images_list or []: pil_img = image_dict.get('img_pil') @@ -109,24 +109,7 @@ def _format_doc_slices(batch_slices): ) -def _build_page_result(page_idx: int, pil_img: Image.Image, layout_dets): - page_info_dict = {'page_no': page_idx, 'width': pil_img.width, 'height': pil_img.height} - return {'layout_dets': layout_dets, 'page_info': page_info_dict} - - -def _close_doc_context(context): - if context['closed']: - return - try: - close_pdfium_document(context['pdf_doc']) - except Exception: - pass - _close_images(context['images_list']) - context['images_list'].clear() - context['closed'] = True - - -def _finalize_low_memory_context(context, on_doc_ready): +def _finalize_processing_window_context(context, on_doc_ready): if context['closed']: return finalize_middle_json( @@ -150,170 +133,7 @@ def _finalize_low_memory_context(context, on_doc_ready): def _emit_zero_page_contexts(doc_contexts, on_doc_ready): for context in doc_contexts: if context['page_count'] == 0 and not context['closed']: - _finalize_low_memory_context(context, on_doc_ready) - -def doc_analyze( - pdf_bytes_list, - lang_list, - parse_method: str = 'auto', - formula_enable=True, - table_enable=True, -): - infer_results = [None] * len(pdf_bytes_list) - all_image_lists = [None] * len(pdf_bytes_list) - all_pdf_docs = [None] * len(pdf_bytes_list) - ocr_enabled_list = [None] * len(pdf_bytes_list) - - def on_doc_ready(context): - doc_index = context['doc_index'] - infer_results[doc_index] = context['model_list'] - all_image_lists[doc_index] = context['images_list'] - all_pdf_docs[doc_index] = context['pdf_doc'] - ocr_enabled_list[doc_index] = context['ocr_enable'] - context['closed'] = True - - try: - _doc_analyze_streaming_core( - pdf_bytes_list, - lang_list, - on_doc_ready, - parse_method=parse_method, - formula_enable=formula_enable, - table_enable=table_enable, - ) - except Exception: - for pdf_doc in all_pdf_docs: - if pdf_doc is not None: - try: - close_pdfium_document(pdf_doc) - except Exception: - pass - for images_list in all_image_lists: - if images_list is not None: - _close_images(images_list) - images_list.clear() - raise - - return infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list - - -def _doc_analyze_streaming_core( - pdf_bytes_list, - lang_list, - on_doc_ready, - parse_method: str = 'auto', - formula_enable=True, - table_enable=True, -): - """ - 适当调大MIN_BATCH_INFERENCE_SIZE可以提高性能,更大的 MIN_BATCH_INFERENCE_SIZE会消耗更多内存, - 可通过环境变量MINERU_MIN_BATCH_INFERENCE_SIZE设置,默认值为384。 - """ - if len(pdf_bytes_list) != len(lang_list): - raise ValueError("pdf_bytes_list and lang_list must have the same length") - - min_batch_inference_size = int(os.environ.get('MINERU_MIN_BATCH_INFERENCE_SIZE', 384)) - - all_pages_info = [] - all_doc_contexts = [] - total_pages = 0 - load_images_start = time.time() - for pdf_idx, (pdf_bytes, lang) in enumerate(zip(pdf_bytes_list, lang_list)): - _ocr_enable = _get_ocr_enable(pdf_bytes, parse_method) - images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL) - page_count = len(images_list) - total_pages += page_count - all_doc_contexts.append( - { - 'doc_index': pdf_idx, - 'images_list': images_list, - 'pdf_doc': pdf_doc, - 'page_count': page_count, - 'lang': lang, - 'ocr_enable': _ocr_enable, - 'model_list': [], - 'closed': False, - } - ) - for page_idx, img_dict in enumerate(images_list): - all_pages_info.append(( - pdf_idx, - page_idx, - img_dict['img_pil'], - _ocr_enable, - lang, - )) - load_images_time = round(time.time() - load_images_start, 2) - if load_images_time > 0 and total_pages > 0: - logger.debug(f"load images cost: {load_images_time}, speed: {round(total_pages / load_images_time, 3)} images/s") - - images_with_extra_info = [(info[2], info[3], info[4]) for info in all_pages_info] - batch_images = [ - images_with_extra_info[i:i + min_batch_inference_size] - for i in range(0, len(images_with_extra_info), min_batch_inference_size) - ] - - doc_end_offsets = [] - cumulative_pages = 0 - for context in all_doc_contexts: - cumulative_pages += context['page_count'] - doc_end_offsets.append(cumulative_pages) - - next_doc_to_emit = 0 - processed_images_count = 0 - infer_start = time.time() - try: - while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] == 0: - on_doc_ready(all_doc_contexts[next_doc_to_emit]) - next_doc_to_emit += 1 - - for index, batch_image in enumerate(batch_images): - processed_images_count += len(batch_image) - logger.info( - f'Batch {index + 1}/{len(batch_images)}: ' - f'{processed_images_count} pages/{len(images_with_extra_info)} pages' - ) - batch_results = batch_image_analyze(batch_image, formula_enable, table_enable) - batch_start_index = processed_images_count - len(batch_image) - for page_meta, page_result in zip( - all_pages_info[batch_start_index: processed_images_count], - batch_results, - ): - pdf_idx, page_idx, pil_img, _, _ = page_meta - all_doc_contexts[pdf_idx]['model_list'].append( - _build_page_result(page_idx, pil_img, page_result) - ) - - while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] <= processed_images_count: - on_doc_ready(all_doc_contexts[next_doc_to_emit]) - next_doc_to_emit += 1 - - infer_time = round(time.time() - infer_start, 2) - if infer_time > 0 and total_pages > 0: - logger.debug(f"infer finished, cost: {infer_time}, speed: {round(total_pages / infer_time, 3)} page/s") - finally: - for context in all_doc_contexts: - if not context['closed']: - _close_doc_context(context) - - -def doc_analyze_low_memory( - pdf_bytes, - image_writer, - lang, - parse_method: str = 'auto', - formula_enable=True, - table_enable=True, -): - middle_json_list, model_list_list, _ = doc_analyze_low_memory_multi( - [pdf_bytes], - [image_writer], - [lang], - parse_method=parse_method, - formula_enable=formula_enable, - table_enable=table_enable, - ) - return middle_json_list[0], model_list_list[0] + _finalize_processing_window_context(context, on_doc_ready) def doc_analyze_streaming( @@ -328,59 +148,11 @@ def doc_analyze_streaming( if not (len(pdf_bytes_list) == len(image_writer_list) == len(lang_list)): raise ValueError("pdf_bytes_list, image_writer_list, and lang_list must have the same length") - def handle_doc_ready(context): - middle_json = init_middle_json() - if context['page_count'] == 0: - finalize_middle_json( - middle_json['pdf_info'], - lang=context['lang'], - ocr_enable=context['ocr_enable'], - ) - else: - middle_json = result_to_middle_json( - context['model_list'], - context['images_list'], - context['pdf_doc'], - image_writer_list[context['doc_index']], - context['lang'], - context['ocr_enable'], - ) - logger.debug( - f"Pipeline doc ready: doc{context['doc_index']} pages={context['page_count']}" - ) - on_doc_ready( - context['doc_index'], - context['model_list'], - middle_json, - context['ocr_enable'], - ) - _close_doc_context(context) - - _doc_analyze_streaming_core( - pdf_bytes_list, - lang_list, - handle_doc_ready, - parse_method=parse_method, - formula_enable=formula_enable, - table_enable=table_enable, - ) - - -def doc_analyze_low_memory_multi_streaming( - pdf_bytes_list, - image_writer_list, - lang_list, - on_doc_ready, - parse_method: str = 'auto', - formula_enable=True, - table_enable=True, -): - if not (len(pdf_bytes_list) == len(image_writer_list) == len(lang_list)): - raise ValueError("pdf_bytes_list, image_writer_list, and lang_list must have the same length") - doc_contexts = [] total_pages = 0 - for doc_index, (pdf_bytes, image_writer, lang) in enumerate(zip(pdf_bytes_list, image_writer_list, lang_list)): + for doc_index, (pdf_bytes, image_writer, lang) in enumerate( + zip(pdf_bytes_list, image_writer_list, lang_list) + ): _ocr_enable = _get_ocr_enable(pdf_bytes, parse_method) pdf_doc = open_pdfium_document(pdfium.PdfDocument, pdf_bytes) page_count = get_pdfium_document_page_count(pdf_doc) @@ -404,10 +176,10 @@ def doc_analyze_low_memory_multi_streaming( _emit_zero_page_contexts(doc_contexts, on_doc_ready) return - window_size = get_low_memory_window_size(default=64) + window_size = get_processing_window_size(default=64) total_batches = (total_pages + window_size - 1) // window_size logger.info( - f'Pipeline low-memory multi-file mode enabled. doc_count={len(doc_contexts)}, ' + f'Pipeline processing-window multi-file run. doc_count={len(doc_contexts)}, ' f'total_pages={total_pages}, window_size={window_size}, total_batches={total_batches}' ) @@ -456,7 +228,7 @@ def doc_analyze_low_memory_multi_streaming( batch_capacity -= take_count logger.info( - f'Pipeline low-memory batch {batch_index}/{total_batches}: ' + f'Pipeline processing window batch {batch_index}/{total_batches}: ' f'{processed_pages + len(batch_images)}/{total_pages} pages, ' f'batch_pages={len(batch_images)}, doc_slices={_format_doc_slices(batch_slices)}' ) @@ -486,14 +258,14 @@ def doc_analyze_low_memory_multi_streaming( images_list.clear() if context['next_page_idx'] >= context['page_count'] and not context['closed']: - _finalize_low_memory_context(context, on_doc_ready) + _finalize_processing_window_context(context, on_doc_ready) processed_pages += len(batch_images) infer_time = round(time.time() - infer_start, 2) if infer_time > 0: logger.debug( - f"low-memory multi-file infer finished, cost: {infer_time}, " + f"processing-window multi-file infer finished, cost: {infer_time}, " f"speed: {round(total_pages / infer_time, 3)} page/s" ) finally: @@ -503,36 +275,6 @@ def doc_analyze_low_memory_multi_streaming( context['closed'] = True -def doc_analyze_low_memory_multi( - pdf_bytes_list, - image_writer_list, - lang_list, - parse_method: str = 'auto', - formula_enable=True, - table_enable=True, -): - middle_json_list = [None] * len(pdf_bytes_list) - model_list_list = [None] * len(pdf_bytes_list) - ocr_enabled_list = [None] * len(pdf_bytes_list) - - def on_doc_ready(doc_index, model_list, middle_json, ocr_enable): - middle_json_list[doc_index] = middle_json - model_list_list[doc_index] = model_list - ocr_enabled_list[doc_index] = ocr_enable - - doc_analyze_low_memory_multi_streaming( - pdf_bytes_list, - image_writer_list, - lang_list, - on_doc_ready, - parse_method=parse_method, - formula_enable=formula_enable, - table_enable=table_enable, - ) - - return middle_json_list, model_list_list, ocr_enabled_list - - def batch_image_analyze( images_with_extra_info: List[Tuple[Image.Image, bool, str]], formula_enable=True, diff --git a/mineru/backend/vlm/vlm_analyze.py b/mineru/backend/vlm/vlm_analyze.py index 33790a3f..eb03cba3 100644 --- a/mineru/backend/vlm/vlm_analyze.py +++ b/mineru/backend/vlm/vlm_analyze.py @@ -12,11 +12,15 @@ from tqdm import tqdm from .utils import enable_custom_logits_processors, set_default_gpu_memory_utilization, set_default_batch_size, \ set_lmdeploy_backend, mod_kwargs_by_device_type -from .model_output_to_middle_json import append_page_blocks_to_middle_json, finalize_middle_json, init_middle_json, result_to_middle_json +from .model_output_to_middle_json import ( + append_page_blocks_to_middle_json, + finalize_middle_json, + init_middle_json, +) from ...data.data_reader_writer import DataWriter -from mineru.utils.pdf_image_tools import load_images_from_pdf, load_images_from_pdf_doc +from mineru.utils.pdf_image_tools import load_images_from_pdf_doc from ...utils.check_sys_env import is_mac_os_version_supported -from ...utils.config_reader import get_device, get_low_memory_window_size +from ...utils.config_reader import get_device, get_processing_window_size from ...utils.enum_class import ImageType from ...utils.pdfium_guard import ( @@ -298,56 +302,28 @@ def doc_analyze( predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) predictor = _maybe_enable_serial_execution(predictor, backend) - load_images_start = time.time() - images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL) - images_pil_list = [image_dict["img_pil"] for image_dict in images_list] - load_images_time = round(time.time() - load_images_start, 2) - logger.debug(f"load images cost: {load_images_time}, speed: {round(len(images_pil_list)/load_images_time, 3)} images/s") - - infer_start = time.time() - with predictor_execution_guard(predictor): - results = predictor.batch_two_step_extract(images=images_pil_list) - infer_time = round(time.time() - infer_start, 2) - logger.debug(f"infer finished, cost: {infer_time}, speed: {round(len(results)/infer_time, 3)} page/s") - - middle_json = result_to_middle_json(results, images_list, pdf_doc, image_writer) - return middle_json, results - - -def doc_analyze_low_memory( - pdf_bytes, - image_writer: DataWriter | None, - predictor: MinerUClient | None = None, - backend="transformers", - model_path: str | None = None, - server_url: str | None = None, - **kwargs, -): - if predictor is None: - predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) - predictor = _maybe_enable_serial_execution(predictor, backend) - pdf_doc = open_pdfium_document(pdfium.PdfDocument, pdf_bytes) middle_json = init_middle_json() results = [] doc_closed = False try: page_count = get_pdfium_document_page_count(pdf_doc) - if page_count == 0: - close_pdfium_document(pdf_doc) - doc_closed = True - return middle_json, results - window_size = min(page_count, get_low_memory_window_size(default=64)) - total_windows = (page_count + window_size - 1) // window_size + configured_window_size = get_processing_window_size(default=64) + effective_window_size = min(page_count, configured_window_size) if page_count else 0 + total_windows = ( + (page_count + effective_window_size - 1) // effective_window_size + if effective_window_size + else 0 + ) logger.info( - f'VLM low-memory mode enabled. page_count={page_count}, ' - f'window_size={window_size}, total_windows={total_windows}' + f'VLM processing-window run. page_count={page_count}, ' + f'window_size={configured_window_size}, total_windows={total_windows}' ) infer_start = time.time() with tqdm(total=page_count, desc="Processing pages") as progress_bar: - for window_index, window_start in enumerate(range(0, page_count, window_size)): - window_end = min(page_count - 1, window_start + window_size - 1) + for window_index, window_start in enumerate(range(0, page_count, effective_window_size or 1)): + window_end = min(page_count - 1, window_start + effective_window_size - 1) images_list = load_images_from_pdf_doc( pdf_doc, start_page_id=window_start, @@ -357,7 +333,7 @@ def doc_analyze_low_memory( try: images_pil_list = [image_dict["img_pil"] for image_dict in images_list] logger.info( - f'VLM low-memory window {window_index + 1}/{total_windows}: ' + f'VLM processing window {window_index + 1}/{total_windows}: ' f'pages {window_start + 1}-{window_end + 1}/{page_count} ' f'({len(images_pil_list)} pages)' ) @@ -376,9 +352,9 @@ def doc_analyze_low_memory( finally: _close_images(images_list) infer_time = round(time.time() - infer_start, 2) - if infer_time > 0: + if infer_time > 0 and page_count > 0: logger.debug( - f"low-memory infer finished, cost: {infer_time}, " + f"processing-window infer finished, cost: {infer_time}, " f"speed: {round(len(results) / infer_time, 3)} page/s" ) finalize_middle_json(middle_json["pdf_info"]) @@ -403,55 +379,28 @@ async def aio_doc_analyze( predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) predictor = _maybe_enable_serial_execution(predictor, backend) - load_images_start = time.time() - images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL) - images_pil_list = [image_dict["img_pil"] for image_dict in images_list] - load_images_time = round(time.time() - load_images_start, 2) - logger.debug(f"load images cost: {load_images_time}, speed: {round(len(images_pil_list)/load_images_time, 3)} images/s") - - infer_start = time.time() - async with aio_predictor_execution_guard(predictor): - results = await predictor.aio_batch_two_step_extract(images=images_pil_list) - infer_time = round(time.time() - infer_start, 2) - logger.debug(f"infer finished, cost: {infer_time}, speed: {round(len(results)/infer_time, 3)} page/s") - middle_json = result_to_middle_json(results, images_list, pdf_doc, image_writer) - return middle_json, results - - -async def aio_doc_analyze_low_memory( - pdf_bytes, - image_writer: DataWriter | None, - predictor: MinerUClient | None = None, - backend="transformers", - model_path: str | None = None, - server_url: str | None = None, - **kwargs, -): - if predictor is None: - predictor = ModelSingleton().get_model(backend, model_path, server_url, **kwargs) - predictor = _maybe_enable_serial_execution(predictor, backend) - pdf_doc = open_pdfium_document(pdfium.PdfDocument, pdf_bytes) middle_json = init_middle_json() results = [] doc_closed = False try: page_count = get_pdfium_document_page_count(pdf_doc) - if page_count == 0: - close_pdfium_document(pdf_doc) - doc_closed = True - return middle_json, results - window_size = min(page_count, get_low_memory_window_size(default=64)) - total_windows = (page_count + window_size - 1) // window_size + configured_window_size = get_processing_window_size(default=64) + effective_window_size = min(page_count, configured_window_size) if page_count else 0 + total_windows = ( + (page_count + effective_window_size - 1) // effective_window_size + if effective_window_size + else 0 + ) logger.info( - f'VLM low-memory mode enabled. page_count={page_count}, ' - f'window_size={window_size}, total_windows={total_windows}' + f'VLM processing-window run. page_count={page_count}, ' + f'window_size={configured_window_size}, total_windows={total_windows}' ) infer_start = time.time() with tqdm(total=page_count, desc="Processing pages") as progress_bar: - for window_index, window_start in enumerate(range(0, page_count, window_size)): - window_end = min(page_count - 1, window_start + window_size - 1) + for window_index, window_start in enumerate(range(0, page_count, effective_window_size or 1)): + window_end = min(page_count - 1, window_start + effective_window_size - 1) images_list = load_images_from_pdf_doc( pdf_doc, start_page_id=window_start, @@ -461,7 +410,7 @@ async def aio_doc_analyze_low_memory( try: images_pil_list = [image_dict["img_pil"] for image_dict in images_list] logger.info( - f'VLM low-memory window {window_index + 1}/{total_windows}: ' + f'VLM processing window {window_index + 1}/{total_windows}: ' f'pages {window_start + 1}-{window_end + 1}/{page_count} ' f'({len(images_pil_list)} pages)' ) @@ -480,9 +429,9 @@ async def aio_doc_analyze_low_memory( finally: _close_images(images_list) infer_time = round(time.time() - infer_start, 2) - if infer_time > 0: + if infer_time > 0 and page_count > 0: logger.debug( - f"low-memory infer finished, cost: {infer_time}, " + f"processing-window infer finished, cost: {infer_time}, " f"speed: {round(len(results) / infer_time, 3)} page/s" ) finalize_middle_json(middle_json["pdf_info"]) diff --git a/mineru/cli/common.py b/mineru/cli/common.py index a24c5b4d..abdbcfb9 100644 --- a/mineru/cli/common.py +++ b/mineru/cli/common.py @@ -13,7 +13,6 @@ from mineru.utils.draw_bbox import draw_layout_bbox, draw_span_bbox from mineru.utils.engine_utils import get_vlm_engine from mineru.utils.enum_class import MakeMode from mineru.utils.guess_suffix_or_lang import guess_suffix_by_bytes -from mineru.utils.config_reader import is_low_memory_enabled from mineru.utils.pdf_image_tools import images_bytes_to_pdf_bytes from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make from mineru.backend.office.office_middle_json_mkcontent import union_make as office_union_make @@ -201,7 +200,6 @@ def _process_pipeline( ): """处理pipeline后端逻辑""" from mineru.backend.pipeline.pipeline_analyze import doc_analyze_streaming as pipeline_doc_analyze_streaming - from mineru.backend.pipeline.pipeline_analyze import doc_analyze_low_memory_multi_streaming as pipeline_doc_analyze_low_memory_multi_streaming image_writer_list = [] md_writer_list = [] @@ -241,26 +239,15 @@ def _process_pipeline( future = output_executor.submit(run_output_task, doc_index, middle_json, model_list) output_futures.append(future) - if is_low_memory_enabled(): - pipeline_doc_analyze_low_memory_multi_streaming( - pdf_bytes_list, - image_writer_list, - p_lang_list, - on_doc_ready, - parse_method=parse_method, - formula_enable=p_formula_enable, - table_enable=p_table_enable, - ) - else: - pipeline_doc_analyze_streaming( - pdf_bytes_list, - image_writer_list, - p_lang_list, - on_doc_ready, - parse_method=parse_method, - formula_enable=p_formula_enable, - table_enable=p_table_enable, - ) + pipeline_doc_analyze_streaming( + pdf_bytes_list, + image_writer_list, + p_lang_list, + on_doc_ready, + parse_method=parse_method, + formula_enable=p_formula_enable, + table_enable=p_table_enable, + ) for future in output_futures: future.result() @@ -294,15 +281,9 @@ async def _async_process_vlm( local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method) image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir) - if is_low_memory_enabled(): - from mineru.backend.vlm.vlm_analyze import aio_doc_analyze_low_memory as aio_vlm_doc_analyze_low_memory - middle_json, infer_result = await aio_vlm_doc_analyze_low_memory( - pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url, **kwargs, - ) - else: - middle_json, infer_result = await aio_vlm_doc_analyze( - pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url, **kwargs, - ) + middle_json, infer_result = await aio_vlm_doc_analyze( + pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url, **kwargs, + ) pdf_info = middle_json["pdf_info"] @@ -341,15 +322,9 @@ def _process_vlm( local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method) image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir) - if is_low_memory_enabled(): - from mineru.backend.vlm.vlm_analyze import doc_analyze_low_memory as vlm_doc_analyze_low_memory - middle_json, infer_result = vlm_doc_analyze_low_memory( - pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url, **kwargs, - ) - else: - middle_json, infer_result = vlm_doc_analyze( - pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url, **kwargs, - ) + middle_json, infer_result = vlm_doc_analyze( + pdf_bytes, image_writer=image_writer, backend=backend, server_url=server_url, **kwargs, + ) pdf_info = middle_json["pdf_info"] @@ -390,29 +365,16 @@ def _process_hybrid( local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, f"hybrid_{parse_method}") image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir) - if is_low_memory_enabled(): - from mineru.backend.hybrid.hybrid_analyze import doc_analyze_low_memory as hybrid_doc_analyze_low_memory - middle_json, infer_result, _vlm_ocr_enable = hybrid_doc_analyze_low_memory( - pdf_bytes, - image_writer=image_writer, - backend=backend, - parse_method=parse_method, - language=lang, - inline_formula_enable=inline_formula_enable, - server_url=server_url, - **kwargs, - ) - else: - middle_json, infer_result, _vlm_ocr_enable = hybrid_doc_analyze( - pdf_bytes, - image_writer=image_writer, - backend=backend, - parse_method=parse_method, - language=lang, - inline_formula_enable=inline_formula_enable, - server_url=server_url, - **kwargs, - ) + middle_json, infer_result, _vlm_ocr_enable = hybrid_doc_analyze( + pdf_bytes, + image_writer=image_writer, + backend=backend, + parse_method=parse_method, + language=lang, + inline_formula_enable=inline_formula_enable, + server_url=server_url, + **kwargs, + ) pdf_info = middle_json["pdf_info"] @@ -456,29 +418,16 @@ async def _async_process_hybrid( local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, f"hybrid_{parse_method}") image_writer, md_writer = FileBasedDataWriter(local_image_dir), FileBasedDataWriter(local_md_dir) - if is_low_memory_enabled(): - from mineru.backend.hybrid.hybrid_analyze import aio_doc_analyze_low_memory as aio_hybrid_doc_analyze_low_memory - middle_json, infer_result, _vlm_ocr_enable = await aio_hybrid_doc_analyze_low_memory( - pdf_bytes, - image_writer=image_writer, - backend=backend, - parse_method=parse_method, - language=lang, - inline_formula_enable=inline_formula_enable, - server_url=server_url, - **kwargs, - ) - else: - middle_json, infer_result, _vlm_ocr_enable = await aio_hybrid_doc_analyze( - pdf_bytes, - image_writer=image_writer, - backend=backend, - parse_method=parse_method, - language=lang, - inline_formula_enable=inline_formula_enable, - server_url=server_url, - **kwargs, - ) + middle_json, infer_result, _vlm_ocr_enable = await aio_hybrid_doc_analyze( + pdf_bytes, + image_writer=image_writer, + backend=backend, + parse_method=parse_method, + language=lang, + inline_formula_enable=inline_formula_enable, + server_url=server_url, + **kwargs, + ) pdf_info = middle_json["pdf_info"] diff --git a/mineru/utils/config_reader.py b/mineru/utils/config_reader.py index d26fd7b8..983537d3 100644 --- a/mineru/utils/config_reader.py +++ b/mineru/utils/config_reader.py @@ -125,18 +125,16 @@ def get_ocr_det_mask_inline_formula_enable(enable): return enable -def is_low_memory_enabled() -> bool: - return os.getenv('MINERU_LOW_MEMORY', 'true').lower() in ('1', 'true', 'yes') - - -def get_low_memory_window_size(default: int = 64) -> int: - value = os.getenv('MINERU_LOW_MEMORY_WINDOW_SIZE') +def get_processing_window_size(default: int = 64) -> int: + value = os.getenv('MINERU_PROCESSING_WINDOW_SIZE') if value is None: return default try: window_size = int(value) except ValueError: - logger.warning(f"Invalid MINERU_LOW_MEMORY_WINDOW_SIZE value: {value}, use default {default}") + logger.warning( + f"Invalid MINERU_PROCESSING_WINDOW_SIZE value: {value}, use default {default}" + ) return default return max(1, window_size) diff --git a/tests/unittest/test_e2e.py b/tests/unittest/test_e2e.py index 74eaece7..cb7cb206 100644 --- a/tests/unittest/test_e2e.py +++ b/tests/unittest/test_e2e.py @@ -1,5 +1,4 @@ # Copyright (c) Opendatalab. All rights reserved. -import copy import json import os from pathlib import Path @@ -13,15 +12,12 @@ from mineru.cli.common import ( ) from mineru.data.data_reader_writer import FileBasedDataWriter from mineru.utils.enum_class import MakeMode -from mineru.backend.vlm.vlm_analyze import doc_analyze as vlm_doc_analyze -from mineru.backend.pipeline.pipeline_analyze import doc_analyze as pipeline_doc_analyze +from mineru.backend.pipeline.pipeline_analyze import ( + doc_analyze_streaming as pipeline_doc_analyze_streaming, +) from mineru.backend.pipeline.pipeline_middle_json_mkcontent import ( union_make as pipeline_union_make, ) -from mineru.backend.pipeline.model_json_to_middle_json import ( - result_to_middle_json as pipeline_result_to_middle_json, -) -from mineru.backend.vlm.vlm_middle_json_mkcontent import union_make as vlm_union_make def test_pipeline_with_two_config(): @@ -51,21 +47,10 @@ def test_pipeline_with_two_config(): new_pdf_bytes = convert_pdf_bytes_to_bytes(pdf_bytes) pdf_bytes_list[idx] = new_pdf_bytes - # 获取 pipline 分析结果, 分别测试 txt 和 ocr 两种解析方法的结果 - infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list = ( - pipeline_doc_analyze( - pdf_bytes_list, - p_lang_list, - parse_method="txt", - ) - ) - write_infer_result( - infer_results, - all_image_lists, - all_pdf_docs, - lang_list, - ocr_enabled_list, + run_pipeline_parse( pdf_file_names, + pdf_bytes_list, + p_lang_list, output_dir, parse_method="txt", ) @@ -73,20 +58,10 @@ def test_pipeline_with_two_config(): Path(__file__).parent / "output" / "test" / "txt" / "test_content_list.json" ).as_posix() assert_content(res_json_path, parse_method="txt") - infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list = ( - pipeline_doc_analyze( - pdf_bytes_list, - p_lang_list, - parse_method="ocr", - ) - ) - write_infer_result( - infer_results, - all_image_lists, - all_pdf_docs, - lang_list, - ocr_enabled_list, + run_pipeline_parse( pdf_file_names, + pdf_bytes_list, + p_lang_list, output_dir, parse_method="ocr", ) @@ -96,137 +71,74 @@ def test_pipeline_with_two_config(): assert_content(res_json_path, parse_method="ocr") -# def test_vlm_transformers_with_default_config(): -# __dir__ = os.path.dirname(os.path.abspath(__file__)) -# pdf_files_dir = os.path.join(__dir__, "pdfs") -# output_dir = os.path.join(__dir__, "output") -# pdf_suffixes = [".pdf"] -# image_suffixes = [".png", ".jpeg", ".jpg"] -# -# doc_path_list = [] -# for doc_path in Path(pdf_files_dir).glob("*"): -# if doc_path.suffix in pdf_suffixes + image_suffixes: -# doc_path_list.append(doc_path) -# -# # os.environ["MINERU_MODEL_SOURCE"] = "modelscope" -# -# pdf_file_names = [] -# pdf_bytes_list = [] -# p_lang_list = [] -# for path in doc_path_list: -# file_name = str(Path(path).stem) -# pdf_bytes = read_fn(path) -# pdf_file_names.append(file_name) -# pdf_bytes_list.append(pdf_bytes) -# p_lang_list.append("en") -# -# for idx, pdf_bytes in enumerate(pdf_bytes_list): -# pdf_file_name = pdf_file_names[idx] -# pdf_bytes = convert_pdf_bytes_to_bytes(pdf_bytes) -# local_image_dir, local_md_dir = prepare_env( -# output_dir, pdf_file_name, parse_method="vlm" -# ) -# image_writer, md_writer = FileBasedDataWriter( -# local_image_dir -# ), FileBasedDataWriter(local_md_dir) -# middle_json, infer_result = vlm_doc_analyze( -# pdf_bytes, image_writer=image_writer, backend="transformers" -# ) -# -# pdf_info = middle_json["pdf_info"] -# -# image_dir = str(os.path.basename(local_image_dir)) -# -# md_content_str = vlm_union_make(pdf_info, MakeMode.MM_MD, image_dir) -# md_writer.write_string( -# f"{pdf_file_name}.md", -# md_content_str, -# ) -# -# content_list = vlm_union_make(pdf_info, MakeMode.CONTENT_LIST, image_dir) -# md_writer.write_string( -# f"{pdf_file_name}_content_list.json", -# json.dumps(content_list, ensure_ascii=False, indent=4), -# ) -# -# md_writer.write_string( -# f"{pdf_file_name}_middle.json", -# json.dumps(middle_json, ensure_ascii=False, indent=4), -# ) -# -# md_writer.write_string( -# f"{pdf_file_name}_model.json", -# json.dumps(infer_result, ensure_ascii=False, indent=4), -# ) -# -# logger.info(f"local output dir is {local_md_dir}") -# res_json_path = ( -# Path(__file__).parent / "output" / "test" / "vlm" / "test_content_list.json" -# ).as_posix() -# assert_content(res_json_path, parse_method="vlm") - - -def write_infer_result( - infer_results, - all_image_lists, - all_pdf_docs, - lang_list, - ocr_enabled_list, +def run_pipeline_parse( pdf_file_names, + pdf_bytes_list, + p_lang_list, output_dir, parse_method, ): - for idx, model_list in enumerate(infer_results): - model_json = copy.deepcopy(model_list) - pdf_file_name = pdf_file_names[idx] - local_image_dir, local_md_dir = prepare_env( - output_dir, pdf_file_name, parse_method - ) - image_writer, md_writer = FileBasedDataWriter( - local_image_dir - ), FileBasedDataWriter(local_md_dir) + image_writer_list = [] + output_info = [] + for pdf_file_name in pdf_file_names: + local_image_dir, local_md_dir = prepare_env(output_dir, pdf_file_name, parse_method) + image_writer_list.append(FileBasedDataWriter(local_image_dir)) + output_info.append((pdf_file_name, local_image_dir, local_md_dir)) - images_list = all_image_lists[idx] - pdf_doc = all_pdf_docs[idx] - _lang = lang_list[idx] - _ocr_enable = ocr_enabled_list[idx] - middle_json = pipeline_result_to_middle_json( + def on_doc_ready(doc_index, model_list, middle_json, ocr_enable): + del ocr_enable + pdf_file_name, local_image_dir, local_md_dir = output_info[doc_index] + write_infer_result( + pdf_file_name, + local_image_dir, + local_md_dir, + middle_json, model_list, - images_list, - pdf_doc, - image_writer, - _lang, - _ocr_enable, - True, ) - pdf_info = middle_json["pdf_info"] + pipeline_doc_analyze_streaming( + pdf_bytes_list, + image_writer_list, + p_lang_list, + on_doc_ready, + parse_method=parse_method, + ) - image_dir = str(os.path.basename(local_image_dir)) - # 写入 md 文件 - md_content_str = pipeline_union_make(pdf_info, MakeMode.MM_MD, image_dir) - md_writer.write_string( - f"{pdf_file_name}.md", - md_content_str, - ) - content_list = pipeline_union_make(pdf_info, MakeMode.CONTENT_LIST, image_dir) - md_writer.write_string( - f"{pdf_file_name}_content_list.json", - json.dumps(content_list, ensure_ascii=False, indent=4), - ) +def write_infer_result( + pdf_file_name, + local_image_dir, + local_md_dir, + middle_json, + model_list, +): + md_writer = FileBasedDataWriter(local_md_dir) + pdf_info = middle_json["pdf_info"] + image_dir = str(os.path.basename(local_image_dir)) - md_writer.write_string( - f"{pdf_file_name}_middle.json", - json.dumps(middle_json, ensure_ascii=False, indent=4), - ) + md_content_str = pipeline_union_make(pdf_info, MakeMode.MM_MD, image_dir) + md_writer.write_string( + f"{pdf_file_name}.md", + md_content_str, + ) - md_writer.write_string( - f"{pdf_file_name}_model.json", - json.dumps(model_json, ensure_ascii=False, indent=4), - ) + content_list = pipeline_union_make(pdf_info, MakeMode.CONTENT_LIST, image_dir) + md_writer.write_string( + f"{pdf_file_name}_content_list.json", + json.dumps(content_list, ensure_ascii=False, indent=4), + ) - logger.info(f"local output dir is {local_md_dir}") + md_writer.write_string( + f"{pdf_file_name}_middle.json", + json.dumps(middle_json, ensure_ascii=False, indent=4), + ) + + md_writer.write_string( + f"{pdf_file_name}_model.json", + json.dumps(model_list, ensure_ascii=False, indent=4), + ) + + logger.info(f"local output dir is {local_md_dir}") def validate_html(html_content):