From 882a0ee72f9bf2fc347b101d7151fccfcaec1a93 Mon Sep 17 00:00:00 2001 From: myhloli Date: Fri, 20 Mar 2026 16:52:51 +0800 Subject: [PATCH] feat: refactor PDF image processing and enhance markdown rendering for visual blocks --- .../pipeline_middle_json_mkcontent.py | 362 ++++++++++++------ mineru/utils/pdf_image_tools.py | 24 +- 2 files changed, 262 insertions(+), 124 deletions(-) diff --git a/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py b/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py index 8a80b0da..dbd69de5 100644 --- a/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py +++ b/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py @@ -15,7 +15,13 @@ def make_blocks_to_markdown(paras_of_layout, for para_block in paras_of_layout: para_text = '' para_type = para_block['type'] - if para_type in [BlockType.TEXT, BlockType.LIST, BlockType.INDEX]: + if para_type in [ + BlockType.TEXT, + BlockType.LIST, + BlockType.INDEX, + BlockType.ABSTRACT, + BlockType.REF_TEXT + ]: para_text = merge_para_with_text(para_block) elif para_type == BlockType.TITLE: title_level = get_title_level(para_block) @@ -31,64 +37,127 @@ def make_blocks_to_markdown(paras_of_layout, if mode == MakeMode.NLP_MD: continue elif mode == MakeMode.MM_MD: - # 检测是否存在图片脚注 - has_image_footnote = any(block['type'] == BlockType.IMAGE_FOOTNOTE for block in para_block['blocks']) - # 如果存在图片脚注,则将图片脚注拼接到图片正文后面 - if has_image_footnote: - for block in para_block['blocks']: # 1st.拼image_caption - if block['type'] == BlockType.IMAGE_CAPTION: - para_text += merge_para_with_text(block) + ' \n' - for block in para_block['blocks']: # 2nd.拼image_body - if block['type'] == BlockType.IMAGE_BODY: - for line in block['lines']: - for span in line['spans']: - if span['type'] == ContentType.IMAGE: - if span.get('image_path', ''): - para_text += f"![]({img_buket_path}/{span['image_path']})" - for block in para_block['blocks']: # 3rd.拼image_footnote - if block['type'] == BlockType.IMAGE_FOOTNOTE: - para_text += ' \n' + merge_para_with_text(block) - else: - for block in para_block['blocks']: # 1st.拼image_body - if block['type'] == BlockType.IMAGE_BODY: - for line in block['lines']: - for span in line['spans']: - if span['type'] == ContentType.IMAGE: - if span.get('image_path', ''): - para_text += f"![]({img_buket_path}/{span['image_path']})" - for block in para_block['blocks']: # 2nd.拼image_caption - if block['type'] == BlockType.IMAGE_CAPTION: - para_text += ' \n' + merge_para_with_text(block) + para_text = merge_visual_blocks_to_markdown(para_block, img_buket_path) + elif para_type == BlockType.CHART: + if mode == MakeMode.NLP_MD: + continue + elif mode == MakeMode.MM_MD: + para_text = merge_visual_blocks_to_markdown(para_block, img_buket_path) elif para_type == BlockType.TABLE: if mode == MakeMode.NLP_MD: continue elif mode == MakeMode.MM_MD: - for block in para_block['blocks']: # 1st.拼table_caption - if block['type'] == BlockType.TABLE_CAPTION: - para_text += merge_para_with_text(block) + ' \n' - for block in para_block['blocks']: # 2nd.拼table_body - if block['type'] == BlockType.TABLE_BODY: - for line in block['lines']: - for span in line['spans']: - if span['type'] == ContentType.TABLE: - # if processed by table model - if span.get('html', ''): - para_text += f"\n{span['html']}\n" - elif span.get('image_path', ''): - para_text += f"![]({img_buket_path}/{span['image_path']})" - for block in para_block['blocks']: # 3rd.拼table_footnote - if block['type'] == BlockType.TABLE_FOOTNOTE: - para_text += '\n' + merge_para_with_text(block) + ' ' + para_text = merge_visual_blocks_to_markdown(para_block, img_buket_path) + elif para_type == BlockType.ALGORITHM: + para_text = merge_visual_blocks_to_markdown(para_block) if para_text.strip() == '': continue else: - # page_markdown.append(para_text.strip() + ' ') page_markdown.append(para_text.strip()) return page_markdown +def merge_visual_blocks_to_markdown(para_block, img_buket_path=''): + # 将 image/table/algorithm 这类视觉块的子 block 按阅读顺序拼接成 markdown。 + # 这里不再写死 caption/body/footnote 的优先级,而是先展开成 segment, + # 再根据 markdown_line / html_block 两类片段决定分隔方式。 + rendered_segments = [] + + for block in get_blocks_in_index_order(para_block.get('blocks', [])): + rendered_segments.extend(render_visual_block_segments(block, img_buket_path)) + + para_text = '' + prev_segment_kind = None + for segment_text, segment_kind in rendered_segments: + if para_text: + para_text += get_visual_block_separator(prev_segment_kind, segment_kind) + para_text += segment_text + prev_segment_kind = segment_kind + + return para_text + + +def get_blocks_in_index_order(blocks): + # 按 middle json 中的 index 排序子 block; + # 如果 index 缺失,则退化为保持原始顺序。 + return [ + block + for _, block in sorted( + enumerate(blocks), + key=lambda item: (item[1].get('index', float('inf')), item[0]), + ) + ] + + +def render_visual_block_segments(block, img_buket_path=''): + # 将单个视觉子 block 渲染成一个或多个 segment。 + # 文本类子块统一输出 markdown_line; + # table 的 html 输出为 html_block,供后续决定是否需要空行隔开。 + block_type = block['type'] + + if block_type in [ + BlockType.IMAGE_CAPTION, + BlockType.IMAGE_FOOTNOTE, + BlockType.TABLE_CAPTION, + BlockType.TABLE_FOOTNOTE, + BlockType.ALGORITHM_BODY, + BlockType.ALGORITHM_CAPTION, + BlockType.ALGORITHM_FOOTNOTE, + BlockType.CHART_CAPTION, + BlockType.CHART_FOOTNOTE, + ]: + block_text = merge_para_with_text(block) + if block_text.strip(): + return [(block_text, 'markdown_line')] + return [] + + if block_type == BlockType.IMAGE_BODY: + return [ + (f"![]({img_buket_path}/{span['image_path']})", 'markdown_line') + for line in block['lines'] + for span in line['spans'] + if span['type'] == ContentType.IMAGE and span.get('image_path', '') + ] + + if block_type == BlockType.CHART_BODY: + return [ + (f"![]({img_buket_path}/{span['image_path']})", 'markdown_line') + for line in block['lines'] + for span in line['spans'] + if span['type'] == ContentType.CHART and span.get('image_path', '') + ] + + if block_type == BlockType.TABLE_BODY: + rendered_segments = [] + for line in block['lines']: + for span in line['spans']: + if span['type'] != ContentType.TABLE: + continue + if span.get('html', ''): + rendered_segments.append((span['html'], 'html_block')) + elif span.get('image_path', ''): + rendered_segments.append((f"![]({img_buket_path}/{span['image_path']})", 'markdown_line')) + return rendered_segments + + return [] + + +def get_visual_block_separator(prev_segment_kind, current_segment_kind): + # 根据前后 segment 类型决定分隔符: + # 1. 普通 markdown 行之间用 hard break(" \\n") + # 2. 进入 html block 前只换一行 + # 3. html block 后必须留空行,否则后续文本仍会被当作 html 块内容 + if prev_segment_kind == 'html_block': + # Raw HTML blocks need a blank line after them, otherwise the following + # markdown text is still treated as part of the HTML block. + return '\n\n' + if current_segment_kind == 'html_block': + return '\n' + return ' \n' + + latex_delimiters_config = get_latex_delimiter_config() default_delimiters = { @@ -103,80 +172,141 @@ display_right_delimiter = delimiters['display']['right'] inline_left_delimiter = delimiters['inline']['left'] inline_right_delimiter = delimiters['inline']['right'] +CJK_LANGS = {'zh', 'ja', 'ko'} + + def merge_para_with_text(para_block): - block_text = '' - for line in para_block['lines']: - for span in line['spans']: - if span['type'] in [ContentType.TEXT]: - span['content'] = full_to_half_exclude_marks(span['content']) - block_text += span['content'] - block_lang = detect_lang(block_text) + # 将普通文本段落 block 渲染成 markdown 字符串。 + # 处理流程分为三层: + # 1. 先收集文本内容做语言检测 + # 2. 再把每个 span 渲染成 markdown 片段 + # 3. 最后按语言和上下文决定 span 之间如何补空格/换行 + block_lang = detect_lang(_collect_text_for_lang_detection(para_block)) + para_parts = [] - para_text = '' - for i, line in enumerate(para_block['lines']): + for line_idx, line in enumerate(para_block['lines']): + line_prefix = _line_prefix(line_idx, line) + if line_prefix: + para_parts.append(line_prefix) - if i >= 1 and line.get(ListLineTag.IS_LIST_START_LINE, False): - para_text += ' \n' - - for j, span in enumerate(line['spans']): - - span_type = span['type'] - content = '' - if span_type == ContentType.TEXT: - content = escape_special_markdown_char(span['content']) - elif span_type == ContentType.INLINE_EQUATION: - if span.get('content', ''): - content = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}" - elif span_type == ContentType.INTERLINE_EQUATION: - if span.get('content', ''): - content = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n" - - content = content.strip() - - if content: - - if span_type == ContentType.INTERLINE_EQUATION: - para_text += content - continue - - # 定义CJK语言集合(中日韩) - cjk_langs = {'zh', 'ja', 'ko'} - # logger.info(f'block_lang: {block_lang}, content: {content}') - - # 判断是否为行末span - is_last_span = j == len(line['spans']) - 1 - - if block_lang in cjk_langs: # 中文/日语/韩文语境下,换行不需要空格分隔,但是如果是行内公式结尾,还是要加空格 - if is_last_span and span_type not in [ContentType.INLINE_EQUATION]: - para_text += content - else: - para_text += f'{content} ' - else: - # 西方文本语境下 每行的最后一个span判断是否要去除连字符 - if span_type in [ContentType.TEXT, ContentType.INLINE_EQUATION]: - # 如果span是line的最后一个且末尾带有-连字符,那么末尾不应该加空格,同时应该把-删除 - if ( - is_last_span - and span_type == ContentType.TEXT - and is_hyphen_at_line_end(content) - ): - # 如果下一行的第一个span是小写字母开头,删除连字符 - if ( - i + 1 < len(para_block['lines']) - and para_block['lines'][i + 1].get('spans') - and para_block['lines'][i + 1]['spans'][0].get('type') == ContentType.TEXT - and para_block['lines'][i + 1]['spans'][0].get('content', '') - and para_block['lines'][i + 1]['spans'][0]['content'][0].islower() - ): - para_text += content[:-1] - else: # 如果没有下一行,或者下一行的第一个span不是小写字母开头,则保留连字符但不加空格 - para_text += content - else: # 西方文本语境下 content间需要空格分隔 - para_text += f'{content} ' - else: + for span_idx, span in enumerate(line['spans']): + rendered_span = _render_span(span) + if rendered_span is None: continue - return para_text + span_type, content = rendered_span + content, suffix = _join_rendered_span( + para_block, + block_lang, + line, + line_idx, + span_idx, + span_type, + content, + ) + para_parts.append(content) + if suffix: + para_parts.append(suffix) + + return ''.join(para_parts).rstrip() + + +def _collect_text_for_lang_detection(para_block): + # 只收集 TEXT span 的内容,用于语言检测。 + # 这里会先做全角转半角,但不会修改原始输入数据。 + block_text_parts = [] + for line in para_block['lines']: + for span in line['spans']: + if span['type'] == ContentType.TEXT: + block_text_parts.append(_normalize_text_content(span.get('content', ''))) + return ''.join(block_text_parts) + + +def _normalize_text_content(content): + # 对原始文本做统一归一化,当前只负责全角转半角。 + # 单独拆出来是为了让语言检测和渲染阶段复用同一套文本预处理。 + return full_to_half_exclude_marks(content or '') + + +def _render_span(span): + # 将单个 span 渲染成 markdown 片段。 + # 这里只负责“渲染成什么文本”,不决定后面是否补空格。 + span_type = span['type'] + content = '' + + if span_type == ContentType.TEXT: + content = escape_special_markdown_char(_normalize_text_content(span.get('content', ''))) + elif span_type == ContentType.INLINE_EQUATION: + if span.get('content', ''): + content = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}" + elif span_type == ContentType.INTERLINE_EQUATION: + if span.get('content', ''): + content = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n" + else: + return None + + content = content.strip() + if not content: + return None + + return span_type, content + + +def _join_rendered_span(para_block, block_lang, line, line_idx, span_idx, span_type, content): + # 根据语言和上下文决定当前 span 后面的分隔符。 + # 这里集中处理: + # 1. CJK 与西文的空格差异 + # 2. 西文行尾连字符是否需要跨行合并 + # 3. 独立公式是否作为块内容直接插入 + if span_type == ContentType.INTERLINE_EQUATION: + return content, '' + + is_last_span = span_idx == len(line['spans']) - 1 + + if block_lang in CJK_LANGS: + if is_last_span and span_type != ContentType.INLINE_EQUATION: + return content, '' + return content, ' ' + + if span_type not in [ContentType.TEXT, ContentType.INLINE_EQUATION]: + return content, '' + + if ( + is_last_span + and span_type == ContentType.TEXT + and is_hyphen_at_line_end(content) + ): + if _next_line_starts_with_lowercase_text(para_block, line_idx): + return content[:-1], '' + return content, '' + + return content, ' ' + + +def _line_prefix(line_idx, line): + # 处理进入新 list item 前的 block 级换行。 + # 这里保留历史语义:list 起始行前插入一个 hard break。 + if line_idx >= 1 and line.get(ListLineTag.IS_LIST_START_LINE, False): + return ' \n' + return '' + + +def _next_line_starts_with_lowercase_text(para_block, line_idx): + # 判断下一行是否以小写英文文本开头。 + # 这个条件用于决定西文行尾的连字符是否应与下一行合并。 + if line_idx + 1 >= len(para_block['lines']): + return False + + next_line_spans = para_block['lines'][line_idx + 1].get('spans') + if not next_line_spans: + return False + + next_span = next_line_spans[0] + if next_span.get('type') != ContentType.TEXT: + return False + + next_content = _normalize_text_content(next_span.get('content', '')) + return bool(next_content) and next_content[0].islower() def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size): @@ -311,4 +441,4 @@ def escape_special_markdown_char(content): for char in special_chars: content = content.replace(char, "\\" + char) - return content \ No newline at end of file + return content diff --git a/mineru/utils/pdf_image_tools.py b/mineru/utils/pdf_image_tools.py index bf71b3f9..87c170e4 100644 --- a/mineru/utils/pdf_image_tools.py +++ b/mineru/utils/pdf_image_tools.py @@ -21,12 +21,20 @@ from mineru.utils.pdf_page_id import get_end_page_id from concurrent.futures import ProcessPoolExecutor, wait, ALL_COMPLETED -def pdf_page_to_image(page: pdfium.PdfPage, dpi=200, image_type=ImageType.PIL) -> dict: +DEFAULT_PDF_IMAGE_DPI = 200 +# DEFAULT_PDF_IMAGE_DPI = 144 + + +def pdf_page_to_image( + page: pdfium.PdfPage, + dpi=DEFAULT_PDF_IMAGE_DPI, + image_type=ImageType.PIL, +) -> dict: """Convert pdfium.PdfDocument to image, Then convert the image to base64. Args: page (_type_): pdfium.PdfPage - dpi (int, optional): reset the dpi of dpi. Defaults to 200. + dpi (int, optional): reset the dpi of dpi. Defaults to DEFAULT_PDF_IMAGE_DPI. image_type (ImageType, optional): The type of image to return. Defaults to ImageType.PIL. Returns: @@ -55,7 +63,7 @@ def _load_images_from_pdf_worker( def load_images_from_pdf( pdf_bytes: bytes, - dpi=200, + dpi=DEFAULT_PDF_IMAGE_DPI, start_page_id=0, end_page_id=None, image_type=ImageType.PIL, @@ -66,7 +74,7 @@ def load_images_from_pdf( Args: pdf_bytes (bytes): PDF 文件的 bytes - dpi (int, optional): reset the dpi of dpi. Defaults to 200. + dpi (int, optional): reset the dpi of dpi. Defaults to DEFAULT_PDF_IMAGE_DPI. start_page_id (int, optional): 起始页码. Defaults to 0. end_page_id (int | None, optional): 结束页码. Defaults to None. image_type (ImageType, optional): 图片类型. Defaults to ImageType.PIL. @@ -195,7 +203,7 @@ def _terminate_executor_processes(executor): def load_images_from_pdf_core( pdf_bytes: bytes, - dpi=200, + dpi=DEFAULT_PDF_IMAGE_DPI, start_page_id=0, end_page_id=None, image_type=ImageType.PIL, # PIL or BASE64 @@ -218,7 +226,7 @@ def load_images_from_pdf_core( def load_images_from_pdf_doc( pdf_doc: pdfium.PdfDocument, - dpi=200, + dpi=DEFAULT_PDF_IMAGE_DPI, start_page_id=0, end_page_id=None, image_type=ImageType.PIL, @@ -305,12 +313,12 @@ def images_bytes_to_pdf_bytes(image_bytes): # 第一张图保存为 PDF,其余追加 # Keep image inputs at the same raster size when CLI later renders the - # wrapper PDF at 200 dpi; PIL defaults to 72 dpi for PDF output, which + # wrapper PDF at the default DPI; PIL defaults to 72 dpi for PDF output, which # would upscale the image and noticeably hurt seal OCR detection quality. image.save( pdf_buffer, format="PDF", - resolution=200.0, + resolution=float(DEFAULT_PDF_IMAGE_DPI), # save_all=True )