feat: enhance logging granularity and improve document context management

myhloli
2026-03-20 10:48:12 +08:00
parent 07701638ed
commit 18d93c5cd3
2 changed files with 157 additions and 176 deletions


@@ -109,6 +109,18 @@ def _build_page_result(page_idx: int, pil_img: Image.Image, layout_dets):
return {'layout_dets': layout_dets, 'page_info': page_info_dict}
def _close_doc_context(context):
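# Idempotent cleanup: close the PDF handle, release and clear the page images, then mark the context closed.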
if context['closed']:
return
try:
context['pdf_doc'].close()
except Exception:
pass
_close_images(context['images_list'])
context['images_list'].clear()
context['closed'] = True
def _finalize_low_memory_context(context, on_doc_ready):
if context['closed']:
return
@@ -117,7 +129,7 @@ def _finalize_low_memory_context(context, on_doc_ready):
lang=context['lang'],
ocr_enable=context['ocr_enable'],
)
logger.info(
logger.debug(
f"Pipeline doc ready: doc{context['doc_index']} pages={context['page_count']}"
)
on_doc_ready(
@@ -141,84 +153,143 @@ def doc_analyze(
parse_method: str = 'auto',
formula_enable=True,
table_enable=True,
):
infer_results = [None] * len(pdf_bytes_list)
all_image_lists = [None] * len(pdf_bytes_list)
all_pdf_docs = [None] * len(pdf_bytes_list)
ocr_enabled_list = [None] * len(pdf_bytes_list)
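# Collector callback: records each finished document's results, images, PDF handle and OCR flag,
# and marks the context closed so the core's final cleanup leaves those resources open for the caller.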
def on_doc_ready(context):
doc_index = context['doc_index']
infer_results[doc_index] = context['model_list']
all_image_lists[doc_index] = context['images_list']
all_pdf_docs[doc_index] = context['pdf_doc']
ocr_enabled_list[doc_index] = context['ocr_enable']
context['closed'] = True
try:
_doc_analyze_streaming_core(
pdf_bytes_list,
lang_list,
on_doc_ready,
parse_method=parse_method,
formula_enable=formula_enable,
table_enable=table_enable,
)
except Exception:
for pdf_doc in all_pdf_docs:
if pdf_doc is not None:
try:
pdf_doc.close()
except Exception:
pass
for images_list in all_image_lists:
if images_list is not None:
_close_images(images_list)
images_list.clear()
raise
return infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list
def _doc_analyze_streaming_core(
pdf_bytes_list,
lang_list,
on_doc_ready,
parse_method: str = 'auto',
formula_enable=True,
table_enable=True,
):
"""
Increasing MIN_BATCH_INFERENCE_SIZE appropriately can improve performance, but a larger MIN_BATCH_INFERENCE_SIZE consumes more memory.
It can be set via the MINERU_MIN_BATCH_INFERENCE_SIZE environment variable; the default is 384.
"""
if len(pdf_bytes_list) != len(lang_list):
raise ValueError("pdf_bytes_list and lang_list must have the same length")
min_batch_inference_size = int(os.environ.get('MINERU_MIN_BATCH_INFERENCE_SIZE', 384))
# Collect information for every page
all_pages_info = []  # holds (dataset_index, page_index, img, ocr, lang, width, height)
all_image_lists = []
all_pdf_docs = []
ocr_enabled_list = []
all_pages_info = []
all_doc_contexts = []
total_pages = 0
load_images_start = time.time()
for pdf_idx, pdf_bytes in enumerate(pdf_bytes_list):
# Determine the OCR setting
_ocr_enable = False
if parse_method == 'auto':
if classify(pdf_bytes) == 'ocr':
_ocr_enable = True
elif parse_method == 'ocr':
_ocr_enable = True
ocr_enabled_list.append(_ocr_enable)
_lang = lang_list[pdf_idx]
# Collect the pages of each dataset
for pdf_idx, (pdf_bytes, lang) in enumerate(zip(pdf_bytes_list, lang_list)):
_ocr_enable = _get_ocr_enable(pdf_bytes, parse_method)
images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL)
all_image_lists.append(images_list)
all_pdf_docs.append(pdf_doc)
for page_idx in range(len(images_list)):
img_dict = images_list[page_idx]
page_count = len(images_list)
total_pages += page_count
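# Per-document context: accumulates that document's page results in 'model_list' and
# tracks whether its PDF handle and images have been released yet.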
all_doc_contexts.append(
{
'doc_index': pdf_idx,
'images_list': images_list,
'pdf_doc': pdf_doc,
'page_count': page_count,
'lang': lang,
'ocr_enable': _ocr_enable,
'model_list': [],
'closed': False,
}
)
for page_idx, img_dict in enumerate(images_list):
all_pages_info.append((
pdf_idx, page_idx,
img_dict['img_pil'], _ocr_enable, _lang,
pdf_idx,
page_idx,
img_dict['img_pil'],
_ocr_enable,
lang,
))
load_images_time = round(time.time() - load_images_start, 2)
logger.debug(f"load images cost: {load_images_time}, speed: {round(len(all_pages_info) / load_images_time, 3)} images/s")
if load_images_time > 0 and total_pages > 0:
logger.debug(f"load images cost: {load_images_time}, speed: {round(total_pages / load_images_time, 3)} images/s")
# Prepare the batches
images_with_extra_info = [(info[2], info[3], info[4]) for info in all_pages_info]
batch_size = min_batch_inference_size
batch_images = [
images_with_extra_info[i:i + batch_size]
for i in range(0, len(images_with_extra_info), batch_size)
images_with_extra_info[i:i + min_batch_inference_size]
for i in range(0, len(images_with_extra_info), min_batch_inference_size)
]
# Run batch inference
results = []
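# doc_end_offsets[i] is the cumulative page count up to and including document i;
# once processed_images_count reaches it, document i is fully inferred and can be emitted.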
doc_end_offsets = []
cumulative_pages = 0
for context in all_doc_contexts:
cumulative_pages += context['page_count']
doc_end_offsets.append(cumulative_pages)
next_doc_to_emit = 0
processed_images_count = 0
infer_start = time.time()
for index, batch_image in enumerate(batch_images):
processed_images_count += len(batch_image)
logger.info(
f'Batch {index + 1}/{len(batch_images)}: '
f'{processed_images_count} pages/{len(images_with_extra_info)} pages'
)
batch_results = batch_image_analyze(batch_image, formula_enable, table_enable)
results.extend(batch_results)
infer_time = round(time.time() - infer_start, 2)
logger.debug(f"infer finished, cost: {infer_time}, speed: {round(len(results) / infer_time, 3)} page/s")
try:
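# Documents with zero pages never enter a batch, so emit them up front.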
while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] == 0:
on_doc_ready(all_doc_contexts[next_doc_to_emit])
next_doc_to_emit += 1
# Build the return results
infer_results = []
for index, batch_image in enumerate(batch_images):
processed_images_count += len(batch_image)
logger.info(
f'Batch {index + 1}/{len(batch_images)}: '
f'{processed_images_count} pages/{len(images_with_extra_info)} pages'
)
batch_results = batch_image_analyze(batch_image, formula_enable, table_enable)
batch_start_index = processed_images_count - len(batch_image)
for page_meta, page_result in zip(
all_pages_info[batch_start_index: processed_images_count],
batch_results,
):
pdf_idx, page_idx, pil_img, _, _ = page_meta
all_doc_contexts[pdf_idx]['model_list'].append(
_build_page_result(page_idx, pil_img, page_result)
)
for _ in range(len(pdf_bytes_list)):
infer_results.append([])
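# After each batch, hand off every document whose pages have all been processed,
# so its results can be consumed before the remaining batches run.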
while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] <= processed_images_count:
on_doc_ready(all_doc_contexts[next_doc_to_emit])
next_doc_to_emit += 1
for i, page_info in enumerate(all_pages_info):
pdf_idx, page_idx, pil_img, _, _ = page_info
result = results[i]
page_info_dict = {'page_no': page_idx, 'width': pil_img.width, 'height': pil_img.height}
page_dict = {'layout_dets': result, 'page_info': page_info_dict}
infer_results[pdf_idx].append(page_dict)
return infer_results, all_image_lists, all_pdf_docs, lang_list, ocr_enabled_list
infer_time = round(time.time() - infer_start, 2)
if infer_time > 0 and total_pages > 0:
logger.debug(f"infer finished, cost: {infer_time}, speed: {round(total_pages / infer_time, 3)} page/s")
finally:
for context in all_doc_contexts:
if not context['closed']:
_close_doc_context(context)
def doc_analyze_low_memory(
@@ -252,64 +323,24 @@ def doc_analyze_streaming(
if not (len(pdf_bytes_list) == len(image_writer_list) == len(lang_list)):
raise ValueError("pdf_bytes_list, image_writer_list, and lang_list must have the same length")
min_batch_inference_size = int(os.environ.get('MINERU_MIN_BATCH_INFERENCE_SIZE', 384))
all_pages_info = []
all_doc_contexts = []
total_pages = 0
load_images_start = time.time()
for pdf_idx, (pdf_bytes, image_writer, lang) in enumerate(zip(pdf_bytes_list, image_writer_list, lang_list)):
_ocr_enable = _get_ocr_enable(pdf_bytes, parse_method)
images_list, pdf_doc = load_images_from_pdf(pdf_bytes, image_type=ImageType.PIL)
page_count = len(images_list)
total_pages += page_count
all_doc_contexts.append(
{
'doc_index': pdf_idx,
'images_list': images_list,
'pdf_doc': pdf_doc,
'page_count': page_count,
'lang': lang,
'ocr_enable': _ocr_enable,
'image_writer': image_writer,
'model_list': [],
'closed': False,
}
)
for page_idx, img_dict in enumerate(images_list):
all_pages_info.append((
pdf_idx,
page_idx,
img_dict['img_pil'],
_ocr_enable,
lang,
))
load_images_time = round(time.time() - load_images_start, 2)
if load_images_time > 0 and total_pages > 0:
logger.debug(f"load images cost: {load_images_time}, speed: {round(total_pages / load_images_time, 3)} images/s")
images_with_extra_info = [(info[2], info[3], info[4]) for info in all_pages_info]
batch_images = [
images_with_extra_info[i:i + min_batch_inference_size]
for i in range(0, len(images_with_extra_info), min_batch_inference_size)
]
doc_end_offsets = []
cumulative_pages = 0
for context in all_doc_contexts:
cumulative_pages += context['page_count']
doc_end_offsets.append(cumulative_pages)
next_doc_to_emit = 0
while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] == 0:
context = all_doc_contexts[next_doc_to_emit]
def handle_doc_ready(context):
middle_json = init_middle_json()
finalize_middle_json(
middle_json['pdf_info'],
lang=context['lang'],
ocr_enable=context['ocr_enable'],
)
logger.info(
if context['page_count'] == 0:
finalize_middle_json(
middle_json['pdf_info'],
lang=context['lang'],
ocr_enable=context['ocr_enable'],
)
else:
middle_json = result_to_middle_json(
context['model_list'],
context['images_list'],
context['pdf_doc'],
image_writer_list[context['doc_index']],
context['lang'],
context['ocr_enable'],
)
logger.debug(
f"Pipeline doc ready: doc{context['doc_index']} pages={context['page_count']}"
)
on_doc_ready(
@@ -318,66 +349,16 @@ def doc_analyze_streaming(
middle_json,
context['ocr_enable'],
)
context['pdf_doc'].close()
context['closed'] = True
next_doc_to_emit += 1
_close_doc_context(context)
processed_images_count = 0
infer_start = time.time()
try:
for index, batch_image in enumerate(batch_images):
processed_images_count += len(batch_image)
logger.info(
f'Batch {index + 1}/{len(batch_images)}: '
f'{processed_images_count} pages/{len(images_with_extra_info)} pages'
)
batch_results = batch_image_analyze(batch_image, formula_enable, table_enable)
batch_start_index = processed_images_count - len(batch_image)
for page_meta, page_result in zip(
all_pages_info[batch_start_index: processed_images_count],
batch_results,
):
pdf_idx, page_idx, pil_img, _, _ = page_meta
all_doc_contexts[pdf_idx]['model_list'].append(
_build_page_result(page_idx, pil_img, page_result)
)
while next_doc_to_emit < len(all_doc_contexts) and doc_end_offsets[next_doc_to_emit] <= processed_images_count:
context = all_doc_contexts[next_doc_to_emit]
middle_json = result_to_middle_json(
context['model_list'],
context['images_list'],
context['pdf_doc'],
context['image_writer'],
context['lang'],
context['ocr_enable'],
)
logger.info(
f"Pipeline doc ready: doc{context['doc_index']} pages={context['page_count']}"
)
on_doc_ready(
context['doc_index'],
context['model_list'],
middle_json,
context['ocr_enable'],
)
_close_images(context['images_list'])
context['images_list'].clear()
context['closed'] = True
next_doc_to_emit += 1
infer_time = round(time.time() - infer_start, 2)
if infer_time > 0 and total_pages > 0:
logger.debug(f"infer finished, cost: {infer_time}, speed: {round(total_pages / infer_time, 3)} page/s")
finally:
for context in all_doc_contexts:
if not context['closed']:
try:
context['pdf_doc'].close()
except Exception:
pass
_close_images(context['images_list'])
context['images_list'].clear()
_doc_analyze_streaming_core(
pdf_bytes_list,
lang_list,
handle_doc_ready,
parse_method=parse_method,
formula_enable=formula_enable,
table_enable=table_enable,
)
def doc_analyze_low_memory_multi_streaming(


@@ -242,7 +242,7 @@ def _process_pipeline(
pdf_file_name, local_image_dir, local_md_dir = local_output_info[doc_index]
md_writer = md_writer_list[doc_index]
pdf_bytes = pdf_bytes_list[doc_index]
logger.info(f"Pipeline output start: doc{doc_index}")
logger.debug(f"Pipeline output start: doc{doc_index}")
try:
_process_output(
middle_json["pdf_info"], pdf_bytes, pdf_file_name, local_md_dir, local_image_dir,
@@ -250,14 +250,14 @@ def _process_pipeline(
f_dump_md, f_dump_content_list, f_dump_middle_json, f_dump_model_output,
f_make_md_mode, middle_json, model_list, process_mode="pipeline"
)
logger.info(f"Pipeline output complete: doc{doc_index}")
logger.debug(f"Pipeline output complete: doc{doc_index}")
except Exception:
logger.exception(f"Pipeline output failed: doc{doc_index}")
raise
with ThreadPoolExecutor(max_workers=1) as output_executor:
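# One output worker: per-document writing runs in the background without blocking the inference loop.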
def on_doc_ready(doc_index, model_list, middle_json, ocr_enable):
logger.info(
logger.debug(
f"Pipeline doc ready: doc{doc_index} pages={len(middle_json['pdf_info'])} output_submitted=1"
)
future = output_executor.submit(run_output_task, doc_index, middle_json, model_list)
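
For reference, a minimal sketch of how a caller can drive this handoff, assuming doc_analyze_streaming accepts the same on_doc_ready callback shown above. The argument order of doc_analyze_streaming, the input lists, and write_document_output are assumptions for illustration; only the callback signature and the single-worker executor pattern come from the diff.

from concurrent.futures import ThreadPoolExecutor

def write_document_output(doc_index, middle_json, model_list):
    # Hypothetical output step: dump markdown/JSON for one finished document.
    ...

with ThreadPoolExecutor(max_workers=1) as output_executor:
    futures = []

    def on_doc_ready(doc_index, model_list, middle_json, ocr_enable):
        # Invoked as soon as one document's pages are fully inferred; the output
        # work is queued on the single worker so it never blocks the next batch.
        futures.append(
            output_executor.submit(write_document_output, doc_index, middle_json, model_list)
        )

    # Assumed call shape: doc_analyze_streaming is the function modified in the first
    # file of this commit; the positional order here is illustrative, not confirmed.
    doc_analyze_streaming(
        pdf_bytes_list, image_writer_list, lang_list,
        on_doc_ready,
        parse_method='auto', formula_enable=True, table_enable=True,
    )

    for future in futures:
        future.result()  # re-raise any exception from the output side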