diff --git a/mineru/backend/pipeline/pipeline_analyze.py b/mineru/backend/pipeline/pipeline_analyze.py
index 2384b945..8bc9064b 100644
--- a/mineru/backend/pipeline/pipeline_analyze.py
+++ b/mineru/backend/pipeline/pipeline_analyze.py
@@ -109,6 +109,18 @@ def _build_page_result(page_idx: int, pil_img: Image.Image, layout_dets):
     return {'layout_dets': layout_dets, 'page_info': page_info_dict}
 
 
+def _close_doc_context(context):
+    if context['closed']:
+        return
+    try:
+        context['pdf_doc'].close()
+    except Exception:
+        pass
+    _close_images(context['images_list'])
+    context['images_list'].clear()
+    context['closed'] = True
+
+
 def _finalize_low_memory_context(context, on_doc_ready):
     if context['closed']:
         return
@@ -117,7 +129,7 @@ def _finalize_low_memory_context(context, on_doc_ready):
         lang=context['lang'],
         ocr_enable=context['ocr_enable'],
     )
-    logger.info(
+    logger.debug(
         f"Pipeline doc ready: doc{context['doc_index']} pages={context['page_count']}"
     )
     on_doc_ready(
@@ -141,84 +153,143 @@ def doc_analyze(
     parse_method: str = 'auto',
     formula_enable=True,
     table_enable=True,
+):
+    infer_results = [None] * len(pdf_bytes_list)
+    all_image_lists = [None] * len(pdf_bytes_list)
+    all_pdf_docs = [None] * len(pdf_bytes_list)
+    ocr_enabled_list = [None] * len(pdf_bytes_list)
+
+    def on_doc_ready(context):
+        doc_index = context['doc_index']
+        infer_results[doc_index] = context['model_list']
+        all_image_lists[doc_index] = context['images_list']
+        all_pdf_docs[doc_index] = context['pdf_doc']
+        ocr_enabled_list[doc_index] = context['ocr_enable']
+        context['closed'] = True
+
+    try:
+        _doc_analyze_streaming_core(
+            pdf_bytes_list,
+            lang_list,
+            on_doc_ready,
+            parse_method=parse_method,
+            formula_enable=formula_enable,
+            table_enable=table_enable,
+        )
+    except Exception:
+        for pdf_doc in all_pdf_docs:
+            if pdf_doc is not None:
+                try:
+                    pdf_doc.close()
+                except Exception:
+                    pass
+        for images_list in all_image_lists:
+            if images_list is not None:
+                _close_images(images_list)
+                images_list.clear()
+        raise
+
+    return infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list
+
+
+def _doc_analyze_streaming_core(
+    pdf_bytes_list,
+    lang_list,
+    on_doc_ready,
+    parse_method: str = 'auto',
+    formula_enable=True,
+    table_enable=True,
 ):
     """
     Increasing MIN_BATCH_INFERENCE_SIZE can improve performance, but a larger MIN_BATCH_INFERENCE_SIZE consumes more memory.
     It can be set via the MINERU_MIN_BATCH_INFERENCE_SIZE environment variable; the default is 384.
     """
+    if len(pdf_bytes_list) != len(lang_list):
+        raise ValueError("pdf_bytes_list and lang_list must have the same length")
+
     min_batch_inference_size = int(os.environ.get('MINERU_MIN_BATCH_INFERENCE_SIZE', 384))
 
-    # Collect info for all pages
-    all_pages_info = []  # stores (dataset_index, page_index, img, ocr, lang, width, height)
-
-    all_image_lists = []
-    all_pdf_docs = []
-    ocr_enabled_list = []
+    all_pages_info = []
+    all_doc_contexts = []
+    total_pages = 0
 
     load_images_start = time.time()
-    for pdf_idx, pdf_bytes in enumerate(pdf_bytes_list):
-        # Determine the OCR setting
-        _ocr_enable = False
-        if parse_method == 'auto':
-            if classify(pdf_bytes) == 'ocr':
-                _ocr_enable = True
-        elif parse_method == 'ocr':
-            _ocr_enable = True
-
-        ocr_enabled_list.append(_ocr_enable)
-        _lang = lang_list[pdf_idx]
-
-        # Collect the pages of each dataset
+    for pdf_idx, (pdf_bytes, lang) in enumerate(zip(pdf_bytes_list, lang_list)):
+        _ocr_enable = _get_ocr_enable(pdf_bytes, parse_method)
         images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL)
-        all_image_lists.append(images_list)
-        all_pdf_docs.append(pdf_doc)
-
-        for page_idx in range(len(images_list)):
-            img_dict = images_list[page_idx]
+        page_count = len(images_list)
+        total_pages += page_count
+        all_doc_contexts.append(
+            {
+                'doc_index': pdf_idx,
+                'images_list': images_list,
+                'pdf_doc': pdf_doc,
+                'page_count': page_count,
+                'lang': lang,
+                'ocr_enable': _ocr_enable,
+                'model_list': [],
+                'closed': False,
+            }
+        )
+        for page_idx, img_dict in enumerate(images_list):
             all_pages_info.append((
-                pdf_idx, page_idx,
-                img_dict['img_pil'], _ocr_enable, _lang,
+                pdf_idx,
+                page_idx,
+                img_dict['img_pil'],
+                _ocr_enable,
+                lang,
             ))
     load_images_time = round(time.time() - load_images_start, 2)
-    logger.debug(f"load images cost: {load_images_time}, speed: {round(len(all_pages_info) / load_images_time, 3)} images/s")
+    if load_images_time > 0 and total_pages > 0:
+        logger.debug(f"load images cost: {load_images_time}, speed: {round(total_pages / load_images_time, 3)} images/s")
 
-    # Prepare batches
     images_with_extra_info = [(info[2], info[3], info[4]) for info in all_pages_info]
-    batch_size = min_batch_inference_size
     batch_images = [
-        images_with_extra_info[i:i + batch_size]
-        for i in range(0, len(images_with_extra_info), batch_size)
+        images_with_extra_info[i:i + min_batch_inference_size]
+        for i in range(0, len(images_with_extra_info), min_batch_inference_size)
     ]
 
-    # Run batch inference
-    results = []
+    doc_end_offsets = []
+    cumulative_pages = 0
+    for context in all_doc_contexts:
+        cumulative_pages += context['page_count']
+        doc_end_offsets.append(cumulative_pages)
+
+    next_doc_to_emit = 0
     processed_images_count = 0
     infer_start = time.time()
-    for index, batch_image in enumerate(batch_images):
-        processed_images_count += len(batch_image)
-        logger.info(
-            f'Batch {index + 1}/{len(batch_images)}: '
-            f'{processed_images_count} pages/{len(images_with_extra_info)} pages'
-        )
-        batch_results = batch_image_analyze(batch_image, formula_enable, table_enable)
-        results.extend(batch_results)
-    infer_time = round(time.time() - infer_start, 2)
-    logger.debug(f"infer finished, cost: {infer_time}, speed: {round(len(results) / infer_time, 3)} page/s")
+    try:
+        while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] == 0:
+            on_doc_ready(all_doc_contexts[next_doc_to_emit])
+            next_doc_to_emit += 1
 
-    # Build the return results
-    infer_results = []
+        for index, batch_image in enumerate(batch_images):
+            processed_images_count += len(batch_image)
+            logger.info(
+                f'Batch {index + 1}/{len(batch_images)}: '
+                f'{processed_images_count} pages/{len(images_with_extra_info)} pages'
+            )
+            batch_results = batch_image_analyze(batch_image, formula_enable, table_enable)
+            batch_start_index = processed_images_count - len(batch_image)
+            for page_meta, page_result in zip(
+                all_pages_info[batch_start_index: processed_images_count],
+                batch_results,
+            ):
+                pdf_idx, page_idx, pil_img, _, _ = page_meta
+                all_doc_contexts[pdf_idx]['model_list'].append(
+                    _build_page_result(page_idx, pil_img, page_result)
+                )
 
-    for _ in range(len(pdf_bytes_list)):
-        infer_results.append([])
+            while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] <= processed_images_count:
+                on_doc_ready(all_doc_contexts[next_doc_to_emit])
+                next_doc_to_emit += 1
 
-    for i, page_info in enumerate(all_pages_info):
-        pdf_idx, page_idx, pil_img, _, _ = page_info
-        result = results[i]
-
-        page_info_dict = {'page_no': page_idx, 'width': pil_img.width, 'height': pil_img.height}
-        page_dict = {'layout_dets': result, 'page_info': page_info_dict}
-
-        infer_results[pdf_idx].append(page_dict)
-
-    return infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list
+        infer_time = round(time.time() - infer_start, 2)
+        if infer_time > 0 and total_pages > 0:
+            logger.debug(f"infer finished, cost: {infer_time}, speed: {round(total_pages / infer_time, 3)} page/s")
+    finally:
+        for context in all_doc_contexts:
+            if not context['closed']:
+                _close_doc_context(context)
 
 
 def doc_analyze_low_memory(
@@ -252,64 +323,24 @@ def doc_analyze_streaming(
     if not (len(pdf_bytes_list) == len(image_writer_list) == len(lang_list)):
         raise ValueError("pdf_bytes_list, image_writer_list, and lang_list must have the same length")
 
-    min_batch_inference_size = int(os.environ.get('MINERU_MIN_BATCH_INFERENCE_SIZE', 384))
-
-    all_pages_info = []
-    all_doc_contexts = []
-    total_pages = 0
-    load_images_start = time.time()
-    for pdf_idx, (pdf_bytes, image_writer, lang) in enumerate(zip(pdf_bytes_list, image_writer_list, lang_list)):
-        _ocr_enable = _get_ocr_enable(pdf_bytes, parse_method)
-        images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL)
-        page_count = len(images_list)
-        total_pages += page_count
-        all_doc_contexts.append(
-            {
-                'doc_index': pdf_idx,
-                'images_list': images_list,
-                'pdf_doc': pdf_doc,
-                'page_count': page_count,
-                'lang': lang,
-                'ocr_enable': _ocr_enable,
-                'image_writer': image_writer,
-                'model_list': [],
-                'closed': False,
-            }
-        )
-        for page_idx, img_dict in enumerate(images_list):
-            all_pages_info.append((
-                pdf_idx,
-                page_idx,
-                img_dict['img_pil'],
-                _ocr_enable,
-                lang,
-            ))
-    load_images_time = round(time.time() - load_images_start, 2)
-    if load_images_time > 0 and total_pages > 0:
-        logger.debug(f"load images cost: {load_images_time}, speed: {round(total_pages / load_images_time, 3)} images/s")
-
-    images_with_extra_info = [(info[2], info[3], info[4]) for info in all_pages_info]
-    batch_images = [
-        images_with_extra_info[i:i + min_batch_inference_size]
-        for i in range(0, len(images_with_extra_info), min_batch_inference_size)
-    ]
-
-    doc_end_offsets = []
-    cumulative_pages = 0
-    for context in all_doc_contexts:
-        cumulative_pages += context['page_count']
-        doc_end_offsets.append(cumulative_pages)
-
-    next_doc_to_emit = 0
-    while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] == 0:
-        context = all_doc_contexts[next_doc_to_emit]
+    def handle_doc_ready(context):
         middle_json = init_middle_json()
-        finalize_middle_json(
-            middle_json['pdf_info'],
-            lang=context['lang'],
-            ocr_enable=context['ocr_enable'],
-        )
-        logger.info(
+        if context['page_count'] == 0:
+            finalize_middle_json(
+                middle_json['pdf_info'],
+                lang=context['lang'],
+                ocr_enable=context['ocr_enable'],
+            )
+        else:
+            middle_json = result_to_middle_json(
+                context['model_list'],
+                context['images_list'],
+                context['pdf_doc'],
+                image_writer_list[context['doc_index']],
+                context['lang'],
+                context['ocr_enable'],
+            )
+        logger.debug(
             f"Pipeline doc ready: doc{context['doc_index']} pages={context['page_count']}"
         )
         on_doc_ready(
@@ -318,66 +349,16 @@ def doc_analyze_streaming(
             middle_json,
             context['ocr_enable'],
         )
-        context['pdf_doc'].close()
-        context['closed'] = True
-        next_doc_to_emit += 1
+        _close_doc_context(context)
 
-    processed_images_count = 0
-    infer_start = time.time()
-    try:
-        for index, batch_image in enumerate(batch_images):
-            processed_images_count += len(batch_image)
-            logger.info(
-                f'Batch {index + 1}/{len(batch_images)}: '
-                f'{processed_images_count} pages/{len(images_with_extra_info)} pages'
-            )
-            batch_results = batch_image_analyze(batch_image, formula_enable, table_enable)
-            batch_start_index = processed_images_count - len(batch_image)
-            for page_meta, page_result in zip(
-                all_pages_info[batch_start_index: processed_images_count],
-                batch_results,
-            ):
-                pdf_idx, page_idx, pil_img, _, _ = page_meta
-                all_doc_contexts[pdf_idx]['model_list'].append(
-                    _build_page_result(page_idx, pil_img, page_result)
-                )
-
-            while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] <= processed_images_count:
-                context = all_doc_contexts[next_doc_to_emit]
-                middle_json = result_to_middle_json(
-                    context['model_list'],
-                    context['images_list'],
-                    context['pdf_doc'],
-                    context['image_writer'],
-                    context['lang'],
-                    context['ocr_enable'],
-                )
-                logger.info(
-                    f"Pipeline doc ready: doc{context['doc_index']} pages={context['page_count']}"
-                )
-                on_doc_ready(
-                    context['doc_index'],
-                    context['model_list'],
-                    middle_json,
-                    context['ocr_enable'],
-                )
-                _close_images(context['images_list'])
-                context['images_list'].clear()
-                context['closed'] = True
-                next_doc_to_emit += 1
-
-        infer_time = round(time.time() - infer_start, 2)
-        if infer_time > 0 and total_pages > 0:
-            logger.debug(f"infer finished, cost: {infer_time}, speed: {round(total_pages / infer_time, 3)} page/s")
-    finally:
-        for context in all_doc_contexts:
-            if not context['closed']:
-                try:
-                    context['pdf_doc'].close()
-                except Exception:
-                    pass
-                _close_images(context['images_list'])
-                context['images_list'].clear()
+    _doc_analyze_streaming_core(
+        pdf_bytes_list,
+        lang_list,
+        handle_doc_ready,
+        parse_method=parse_method,
+        formula_enable=formula_enable,
+        table_enable=table_enable,
+    )
 
 
 def doc_analyze_low_memory_multi_streaming(
diff --git a/mineru/cli/common.py b/mineru/cli/common.py
index 18ad8924..68a342f0 100644
--- a/mineru/cli/common.py
+++ b/mineru/cli/common.py
@@ -242,7 +242,7 @@ def _process_pipeline(
         pdf_file_name, local_image_dir, local_md_dir = local_output_info[doc_index]
         md_writer = md_writer_list[doc_index]
         pdf_bytes = pdf_bytes_list[doc_index]
-        logger.info(f"Pipeline output start: doc{doc_index}")
+        logger.debug(f"Pipeline output start: doc{doc_index}")
         try:
             _process_output(
                 middle_json["pdf_info"], pdf_bytes, pdf_file_name, local_md_dir, local_image_dir,
@@ -250,14 +250,14 @@ def _process_pipeline(
                 f_dump_md, f_dump_content_list, f_dump_middle_json, f_dump_model_output,
                 f_make_md_mode, middle_json, model_list, process_mode="pipeline"
             )
-            logger.info(f"Pipeline output complete: doc{doc_index}")
+            logger.debug(f"Pipeline output complete: doc{doc_index}")
         except Exception:
             logger.exception(f"Pipeline output failed: doc{doc_index}")
             raise
 
     with ThreadPoolExecutor(max_workers=1) as output_executor:
         def on_doc_ready(doc_index, model_list, middle_json, ocr_enable):
-            logger.info(
+            logger.debug(
                 f"Pipeline doc ready: doc{doc_index} pages={len(middle_json['pdf_info'])} output_submitted=1"
             )
             future = output_executor.submit(run_output_task, doc_index, middle_json, model_list)
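For reviewers, a minimal sketch of how the refactored callback-driven entry point is consumed after this change. The callback signature matches the `on_doc_ready(doc_index, model_list, middle_json, ocr_enable)` hook shown in `mineru/cli/common.py`; the import paths, the `FileBasedDataWriter` writer type, and passing the callback as a keyword argument are assumptions for illustration, not taken from this diff.

```python
# Hypothetical caller for the streaming pipeline (a sketch, not part of this patch).
# Assumed: import paths, FileBasedDataWriter as the image writer, and the
# keyword name `on_doc_ready` in doc_analyze_streaming's signature.
from pathlib import Path

from mineru.backend.pipeline.pipeline_analyze import doc_analyze_streaming
from mineru.data.data_reader_writer import FileBasedDataWriter

pdf_paths = [Path("a.pdf"), Path("b.pdf")]
pdf_bytes_list = [p.read_bytes() for p in pdf_paths]
image_writer_list = [FileBasedDataWriter(f"out/doc{i}/images") for i in range(len(pdf_paths))]
lang_list = ["en"] * len(pdf_paths)

def on_doc_ready(doc_index, model_list, middle_json, ocr_enable):
    # Invoked once per document, in index order, as soon as the last batch
    # containing that document's pages finishes inference; the document's
    # images and pdf_doc are closed right after this callback returns.
    print(f"doc{doc_index}: {len(middle_json['pdf_info'])} pages, ocr={ocr_enable}")

doc_analyze_streaming(
    pdf_bytes_list,
    image_writer_list,
    lang_list,
    on_doc_ready=on_doc_ready,
    parse_method="auto",
)
```

The design point the sketch illustrates: because `_doc_analyze_streaming_core` emits each document through `on_doc_ready` as soon as its last page is inferred, per-document resources are released incrementally instead of being held until the whole batch run completes.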