diff --git a/mineru/backend/office/office_middle_json_mkcontent.py b/mineru/backend/office/office_middle_json_mkcontent.py
index 22321470..67559a0d 100644
--- a/mineru/backend/office/office_middle_json_mkcontent.py
+++ b/mineru/backend/office/office_middle_json_mkcontent.py
@@ -1,5 +1,6 @@
import os
import re
+import unicodedata
from html import escape
from loguru import logger
@@ -24,6 +25,7 @@ inline_right_delimiter = delimiters['inline']['right']
OFFICE_STYLE_RENDER_MODE_ENV = 'MINERU_OFFICE_STYLE_RENDER_MODE'
OFFICE_STYLE_RENDER_MODE_HTML = 'html'
OFFICE_STYLE_RENDER_MODE_MARKDOWN = 'markdown'
+OFFICE_MARKDOWN_WRAPPER_STYLES = {'bold', 'italic', 'strikethrough'}
def _apply_markdown_style(content: str, style: list) -> str:
@@ -162,14 +164,187 @@ def get_title_level(para_block):
return title_level
+def _make_rendered_part(
+ span_type,
+ rendered_content: str,
+ raw_content: str = '',
+ style: list | None = None,
+ has_markdown_wrapper: bool = False,
+):
+ return {
+ 'span_type': span_type,
+ 'rendered_content': rendered_content,
+ 'raw_content': raw_content,
+ 'style': style or [],
+ 'has_markdown_wrapper': has_markdown_wrapper,
+ }
+
+
+def _has_markdown_wrapper(style: list) -> bool:
+ if _get_office_style_render_mode() != OFFICE_STYLE_RENDER_MODE_MARKDOWN:
+ return False
+ if not style or 'underline' in style:
+ return False
+ return any(name in OFFICE_MARKDOWN_WRAPPER_STYLES for name in style)
+
+
+def _get_first_non_whitespace_char(text: str):
+ for ch in text:
+ if not ch.isspace():
+ return ch
+ return None
+
+
+def _get_last_non_whitespace_char(text: str):
+ for ch in reversed(text):
+ if not ch.isspace():
+ return ch
+ return None
+
+
+def _is_punctuation_or_symbol(ch: str) -> bool:
+ return unicodedata.category(ch).startswith(('P', 'S'))
+
+
+def _is_boundary_text_char(ch: str) -> bool:
+ if ch.isspace():
+ return False
+ return not _is_punctuation_or_symbol(ch)
+
+
+def _needs_markdown_it_boundary_space(prev_part: dict, next_part: dict) -> bool:
+ if _get_office_style_render_mode() != OFFICE_STYLE_RENDER_MODE_MARKDOWN:
+ return False
+ if not prev_part.get('has_markdown_wrapper', False):
+ return False
+ if next_part.get('span_type') in {
+ ContentType.HYPERLINK,
+ ContentType.INLINE_EQUATION,
+ ContentType.INTERLINE_EQUATION,
+ }:
+ return False
+
+ prev_raw = prev_part.get('raw_content', '')
+ next_raw = next_part.get('raw_content', '')
+ if not prev_raw.strip() or not next_raw.strip():
+ return False
+ if prev_raw[-1].isspace() or next_raw[0].isspace():
+ return False
+
+ prev_char = _get_last_non_whitespace_char(prev_raw)
+ next_char = _get_first_non_whitespace_char(next_raw)
+ if prev_char is None or next_char is None:
+ return False
+ if not _is_punctuation_or_symbol(prev_char):
+ return False
+ if not _is_boundary_text_char(next_char):
+ return False
+ return True
+
+
+def _join_rendered_parts(parts: list[dict]) -> str:
+ para_text = ''
+ prev_part = None
+
+ for i, part in enumerate(parts):
+ span_type = part['span_type']
+ content = part['rendered_content']
+ is_last = i == len(parts) - 1
+
+ if span_type == ContentType.INLINE_EQUATION:
+ if para_text and not para_text.endswith(' '):
+ para_text += ' '
+ para_text += content
+ if not is_last:
+ para_text += ' '
+ else:
+ if prev_part is not None and _needs_markdown_it_boundary_space(prev_part, part):
+ para_text += ' '
+ para_text += content
+
+ prev_part = part
+
+ return para_text
+
+
+def _append_text_part(parts: list[dict], original_content: str, span_style: list):
+ content_stripped = original_content.strip()
+ if content_stripped:
+ styled = _apply_configured_style(content_stripped, span_style)
+ leading = original_content[:len(original_content) - len(original_content.lstrip())]
+ trailing = original_content[len(original_content.rstrip()):]
+ parts.append(
+ _make_rendered_part(
+ ContentType.TEXT,
+ leading + styled + trailing,
+ raw_content=original_content,
+ style=span_style,
+ has_markdown_wrapper=_has_markdown_wrapper(span_style),
+ )
+ )
+ elif original_content:
+ visible_styles = {'underline', 'strikethrough'}
+ if span_style and any(s in visible_styles for s in span_style):
+            rendered_content = original_content.replace(" ", "&nbsp;")
+ rendered_content = _apply_configured_style(rendered_content, span_style)
+ else:
+ rendered_content = original_content
+ parts.append(
+ _make_rendered_part(
+ ContentType.TEXT,
+ rendered_content,
+ raw_content=original_content,
+ style=span_style,
+ )
+ )
+
+
+def _append_hyperlink_part(
+ parts: list[dict],
+ original_content: str,
+ span_style: list,
+ url: str = '',
+ plain_text_only: bool = False,
+):
+ link_text = original_content.strip()
+ if not link_text:
+ return
+
+ styled_text = _apply_configured_style(link_text, span_style)
+ if plain_text_only:
+ leading = original_content[:len(original_content) - len(original_content.lstrip())]
+ trailing = original_content[len(original_content.rstrip()):]
+ rendered_content = leading + styled_text + trailing
+ has_markdown_wrapper = _has_markdown_wrapper(span_style)
+ else:
+ rendered_content = _render_link(styled_text, url)
+ has_markdown_wrapper = False
+
+ parts.append(
+ _make_rendered_part(
+ ContentType.HYPERLINK,
+ rendered_content,
+ raw_content=original_content,
+ style=span_style,
+ has_markdown_wrapper=has_markdown_wrapper,
+ )
+ )
+
+
def merge_para_with_text(para_block):
- # First pass: collect all non-empty (span_type, content) parts
+ # First pass: collect rendered parts with raw boundary metadata.
parts = []
if para_block['type'] == BlockType.TITLE:
if para_block.get('is_numbered_style', False):
section_number = para_block.get('section_number', '')
if section_number:
- parts.append((ContentType.TEXT, f"{section_number} "))
+ parts.append(
+ _make_rendered_part(
+ ContentType.TEXT,
+ f"{section_number} ",
+ raw_content=f"{section_number} ",
+ )
+ )
for line in para_block['lines']:
for span in line['spans']:
@@ -177,57 +352,38 @@ def merge_para_with_text(para_block):
span_style = span.get('style', [])
if span_type == ContentType.TEXT:
- original_content = span['content']
- content_stripped = original_content.strip()
- if content_stripped:
- styled = _apply_configured_style(content_stripped, span_style)
- leading = original_content[:len(original_content) - len(original_content.lstrip())]
- trailing = original_content[len(original_content.rstrip()):]
- parts.append((span_type, leading + styled + trailing))
- elif original_content:
- # Whitespace-only span: apply visible styles if present,
- # otherwise preserve as spacing between styled parts
- _visible = {'underline', 'strikethrough'}
- if span_style and any(s in _visible for s in span_style):
-                        # 将original_content替换为&nbsp;
-                        original_content = original_content.replace(" ", "&nbsp;")
- styled = _apply_configured_style(original_content, span_style)
- parts.append((span_type, styled))
- else:
- parts.append((span_type, original_content))
+ _append_text_part(parts, span['content'], span_style)
elif span_type == ContentType.INLINE_EQUATION:
content = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}"
content = content.strip()
if content:
- parts.append((span_type, content))
+ parts.append(
+ _make_rendered_part(
+ span_type,
+ content,
+ raw_content=span['content'],
+ )
+ )
elif span_type == ContentType.INTERLINE_EQUATION:
content = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n"
content = content.strip()
if content:
- parts.append((span_type, content))
+ parts.append(
+ _make_rendered_part(
+ span_type,
+ content,
+ raw_content=span['content'],
+ )
+ )
elif span_type == ContentType.HYPERLINK:
- link_text = span['content'].strip()
- if link_text:
- link_text = _apply_configured_style(link_text, span_style)
- content = _render_link(link_text, span.get('url', ''))
- parts.append((span_type, content))
+ _append_hyperlink_part(
+ parts,
+ span['content'],
+ span_style,
+ url=span.get('url', ''),
+ )
- # Second pass: join parts, keeping one space on each side of inline equations
- para_text = ''
- for i, (span_type, content) in enumerate(parts):
- is_last = i == len(parts) - 1
- if span_type == ContentType.INLINE_EQUATION:
- # Ensure one space before the equation (if there is preceding text)
- if para_text and not para_text.endswith(' '):
- para_text += ' '
- para_text += content
- # Ensure one space after the equation, unless it is the last part
- if not is_last:
- para_text += ' '
- else:
- para_text += content
-
- return para_text
+ return _join_rendered_parts(parts)
def _flatten_list_items(list_block):
@@ -416,37 +572,29 @@ def _flatten_index_items(index_block):
if item_text:
item_text = _apply_configured_style(item_text, uniform_style)
else:
- raw_parts = []
+ rendered_parts = []
for content, span_type, span_style in stripped_span_items:
if not content:
continue
if span_type == ContentType.INLINE_EQUATION:
- # Wrap inline equations with configured delimiters
- raw_parts.append(
- f'{inline_left_delimiter}{content}{inline_right_delimiter}'
+ rendered_parts.append(
+ _make_rendered_part(
+ span_type,
+ f'{inline_left_delimiter}{content}{inline_right_delimiter}',
+ raw_content=content,
+ )
)
elif span_type == ContentType.HYPERLINK:
- # TOC hyperlinks use document-internal bookmark refs; output
- # only the styled display text without the URL.
- link_text = content.strip()
- if link_text:
- link_text = _apply_configured_style(link_text, span_style)
- leading = content[:len(content) - len(content.lstrip())]
- trailing = content[len(content.rstrip()):]
- raw_parts.append(leading + link_text + trailing)
+ _append_hyperlink_part(
+ rendered_parts,
+ content,
+ span_style,
+ plain_text_only=True,
+ )
else:
- # TEXT span: apply markdown style while preserving
- # surrounding whitespace (e.g. leading space after section #).
- stripped = content.strip()
- if stripped:
- styled = _apply_configured_style(stripped, span_style)
- leading = content[:len(content) - len(content.lstrip())]
- trailing = content[len(content.rstrip()):]
- raw_parts.append(leading + styled + trailing)
- elif content:
- raw_parts.append(content)
+ _append_text_part(rendered_parts, content, span_style)
- item_text = ''.join(raw_parts).strip()
+ item_text = _join_rendered_parts(rendered_parts).strip()
if not item_text:
continue
diff --git a/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py b/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py
index 0fe70ff4..ef7cede3 100644
--- a/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py
+++ b/mineru/backend/pipeline/pipeline_middle_json_mkcontent.py
@@ -6,7 +6,7 @@ from loguru import logger
from mineru.utils.char_utils import full_to_half_exclude_marks, is_hyphen_at_line_end
from mineru.utils.config_reader import get_latex_delimiter_config
from mineru.backend.pipeline.para_split import ListLineTag
-from mineru.utils.enum_class import BlockType, ContentType, MakeMode
+from mineru.utils.enum_class import BlockType, ContentType, ContentTypeV2, MakeMode
from mineru.utils.language import detect_lang
@@ -401,23 +401,222 @@ def _next_line_starts_with_lowercase_text(para_block, line_idx):
return bool(next_content) and next_content[0].islower()
+def merge_adjacent_ref_text_blocks_for_content(para_blocks):
+ merged_blocks = []
+ ref_group = []
+
+ def flush_ref_group():
+ nonlocal ref_group
+ if not ref_group:
+ return
+ if len(ref_group) == 1:
+ merged_blocks.append(ref_group[0])
+ else:
+ merged_blocks.append({
+ 'type': BlockType.REF_TEXT,
+ 'blocks': list(ref_group),
+ 'bbox': ref_group[0].get('bbox'),
+ })
+ ref_group = []
+
+ for para_block in para_blocks or []:
+ if para_block.get('type') == BlockType.REF_TEXT:
+ ref_group.append(para_block)
+ continue
+
+ flush_ref_group()
+ merged_blocks.append(para_block)
+
+ flush_ref_group()
+ return merged_blocks
+
+
+def _build_bbox(para_bbox, page_size):
+ if not para_bbox or not page_size:
+ return None
+
+ page_width, page_height = page_size
+ x0, y0, x1, y1 = para_bbox
+ return [
+ int(x0 * 1000 / page_width),
+ int(y0 * 1000 / page_height),
+ int(x1 * 1000 / page_width),
+ int(y1 * 1000 / page_height),
+ ]
+
+
+def _get_seal_span(para_block):
+ for line in para_block.get('lines', []):
+ for span in line.get('spans', []):
+ if span.get('type') == ContentType.SEAL:
+ return span
+ return None
+
+
+def _get_seal_text(para_block):
+ seal_span = _get_seal_span(para_block)
+ if not seal_span:
+ return ''
+
+ content = seal_span.get('content', '')
+ if isinstance(content, list):
+ return ' '.join(str(item) for item in content if str(item).strip())
+ if isinstance(content, str):
+ return content.strip()
+ return ''
+
+
+def _get_ref_text_item_blocks(para_block):
+ return para_block.get('blocks') or [para_block]
+
+
+def _split_list_item_blocks(para_block):
+ item_blocks = []
+ current_lines = []
+
+ for line_idx, line in enumerate(para_block.get('lines', [])):
+ if line_idx > 0 and line.get(ListLineTag.IS_LIST_START_LINE, False) and current_lines:
+ item_blocks.append({
+ 'type': BlockType.TEXT,
+ 'lines': current_lines,
+ })
+ current_lines = []
+ current_lines.append(line)
+
+ if current_lines:
+ item_blocks.append({
+ 'type': BlockType.TEXT,
+ 'lines': current_lines,
+ })
+
+ return item_blocks
+
+
+def _get_body_data(para_block):
+ def get_data_from_spans(lines):
+ for line in lines:
+ for span in line.get('spans', []):
+ span_type = span.get('type')
+ if span_type == ContentType.TABLE:
+ return span.get('image_path', ''), span.get('html', '')
+ if span_type == ContentType.CHART:
+ return span.get('image_path', ''), span.get('content', '')
+ if span_type == ContentType.IMAGE:
+ return span.get('image_path', ''), ''
+ if span_type == ContentType.INTERLINE_EQUATION:
+ return span.get('image_path', ''), span.get('content', '')
+ return '', ''
+
+ if 'blocks' in para_block:
+ for block in para_block['blocks']:
+ block_type = block.get('type')
+ if block_type in [
+ BlockType.IMAGE_BODY,
+ BlockType.TABLE_BODY,
+ BlockType.CHART_BODY,
+ BlockType.CODE_BODY,
+ ]:
+ result = get_data_from_spans(block.get('lines', []))
+ if result != ('', '') or block_type == BlockType.CHART_BODY:
+ return result
+ return '', ''
+
+ return get_data_from_spans(para_block.get('lines', []))
+
+
+def merge_para_with_text_v2(para_block):
+ block_lang = detect_lang(_collect_text_for_lang_detection(para_block))
+ para_content = []
+ para_type = para_block.get('type')
+
+ for line_idx, line in enumerate(para_block.get('lines', [])):
+ for span_idx, span in enumerate(line.get('spans', [])):
+ span_type = span.get('type')
+
+ if span_type == ContentType.TEXT:
+ content = _normalize_text_content(span.get('content', ''))
+ if not content.strip():
+ continue
+
+ output_type = (
+ ContentTypeV2.SPAN_PHONETIC
+ if para_type == BlockType.PHONETIC
+ else ContentTypeV2.SPAN_TEXT
+ )
+ is_last_span = span_idx == len(line['spans']) - 1
+
+ if block_lang in CJK_LANGS:
+ rendered_content = content if is_last_span else f"{content} "
+ else:
+ if (
+ is_last_span
+ and is_hyphen_at_line_end(content)
+ and _next_line_starts_with_lowercase_text(para_block, line_idx)
+ ):
+ rendered_content = content[:-1]
+ elif is_last_span and is_hyphen_at_line_end(content):
+ rendered_content = content
+ else:
+ rendered_content = f"{content} "
+
+ if para_content and para_content[-1]['type'] == output_type:
+ para_content[-1]['content'] += rendered_content
+ else:
+ para_content.append({
+ 'type': output_type,
+ 'content': rendered_content,
+ })
+ elif span_type == ContentType.INLINE_EQUATION:
+ content = span.get('content', '').strip()
+ if content:
+ para_content.append({
+ 'type': ContentTypeV2.SPAN_EQUATION_INLINE,
+ 'content': content,
+ })
+
+ if para_content and para_content[-1]['type'] in [
+ ContentTypeV2.SPAN_TEXT,
+ ContentTypeV2.SPAN_PHONETIC,
+ ]:
+ para_content[-1]['content'] = para_content[-1]['content'].rstrip()
+
+ return para_content
+
+
def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size):
para_type = para_block['type']
- para_content = {}
+ para_content = None
if para_type in [
BlockType.TEXT,
- BlockType.LIST,
BlockType.INDEX,
+ BlockType.LIST,
+ BlockType.ABSTRACT,
]:
para_content = {
'type': ContentType.TEXT,
'text': merge_para_with_text(para_block),
}
- elif para_type == BlockType.DISCARDED:
+ elif para_type in [
+ BlockType.HEADER,
+ BlockType.FOOTER,
+ BlockType.PAGE_NUMBER,
+ BlockType.ASIDE_TEXT,
+ BlockType.PAGE_FOOTNOTE,
+ ]:
para_content = {
'type': para_type,
'text': merge_para_with_text(para_block),
}
+ elif para_type == BlockType.REF_TEXT:
+ para_content = {
+ 'type': BlockType.LIST,
+ 'sub_type': BlockType.REF_TEXT,
+ 'list_items': [],
+ }
+ for block in _get_ref_text_item_blocks(para_block):
+ item_text = merge_para_with_text(block)
+ if item_text.strip():
+ para_content['list_items'].append(item_text)
elif para_type == BlockType.TITLE:
para_content = {
'type': ContentType.TEXT,
@@ -436,6 +635,15 @@ def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
if para_block['lines'][0]['spans'][0].get('content', ''):
para_content['text'] = merge_para_with_text(para_block)
para_content['text_format'] = 'latex'
+ elif para_type == BlockType.SEAL:
+ seal_span = _get_seal_span(para_block)
+ if not seal_span:
+ return None
+ para_content = {
+ 'type': ContentType.SEAL,
+ 'img_path': f"{img_buket_path}/{seal_span.get('image_path', '')}",
+ 'text': _get_seal_text(para_block),
+ }
elif para_type == BlockType.IMAGE:
para_content = {'type': ContentType.IMAGE, 'img_path': '', BlockType.IMAGE_CAPTION: [], BlockType.IMAGE_FOOTNOTE: []}
for block in para_block['blocks']:
@@ -469,23 +677,289 @@ def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
para_content[BlockType.TABLE_CAPTION].append(merge_para_with_text(block))
if block['type'] == BlockType.TABLE_FOOTNOTE:
para_content[BlockType.TABLE_FOOTNOTE].append(merge_para_with_text(block))
+ elif para_type == BlockType.CHART:
+ para_content = {
+ 'type': ContentType.CHART,
+ 'img_path': '',
+ 'content': '',
+ BlockType.CHART_CAPTION: [],
+ BlockType.CHART_FOOTNOTE: [],
+ }
+ for block in para_block.get('blocks', []):
+ if block['type'] == BlockType.CHART_BODY:
+ for line in block['lines']:
+ for span in line['spans']:
+ if span['type'] == ContentType.CHART and span.get('image_path', ''):
+ para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
+ if block['type'] == BlockType.CHART_CAPTION:
+ para_content[BlockType.CHART_CAPTION].append(merge_para_with_text(block))
+ if block['type'] == BlockType.CHART_FOOTNOTE:
+ para_content[BlockType.CHART_FOOTNOTE].append(merge_para_with_text(block))
+ elif para_type == BlockType.CODE:
+ para_content = {
+ 'type': BlockType.CODE,
+ 'sub_type': para_block['sub_type'],
+ BlockType.CODE_CAPTION: [],
+ BlockType.CODE_FOOTNOTE: [],
+ }
+ for block in para_block.get('blocks', []):
+ render_block = _inherit_parent_code_render_metadata(block, para_block)
+ if block['type'] == BlockType.CODE_BODY:
+ para_content[BlockType.CODE_BODY] = merge_para_with_text(render_block)
+ if block['type'] == BlockType.CODE_CAPTION:
+ para_content[BlockType.CODE_CAPTION].append(merge_para_with_text(block))
+ if block['type'] == BlockType.CODE_FOOTNOTE:
+ para_content[BlockType.CODE_FOOTNOTE].append(merge_para_with_text(block))
- page_width, page_height = page_size
- para_bbox = para_block.get('bbox')
- if para_bbox:
- x0, y0, x1, y1 = para_bbox
- para_content['bbox'] = [
- int(x0 * 1000 / page_width),
- int(y0 * 1000 / page_height),
- int(x1 * 1000 / page_width),
- int(y1 * 1000 / page_height),
- ]
+ if not para_content:
+ return None
+ bbox = _build_bbox(para_block.get('bbox'), page_size)
+ if bbox:
+ para_content['bbox'] = bbox
para_content['page_idx'] = page_idx
return para_content
+def make_blocks_to_content_list_v2(para_block, img_buket_path, page_size):
+ para_type = para_block['type']
+ para_content = None
+
+ if para_type in [
+ BlockType.HEADER,
+ BlockType.FOOTER,
+ BlockType.ASIDE_TEXT,
+ BlockType.PAGE_NUMBER,
+ BlockType.PAGE_FOOTNOTE,
+ ]:
+ if para_type == BlockType.HEADER:
+ content_type = ContentTypeV2.PAGE_HEADER
+ elif para_type == BlockType.FOOTER:
+ content_type = ContentTypeV2.PAGE_FOOTER
+ elif para_type == BlockType.ASIDE_TEXT:
+ content_type = ContentTypeV2.PAGE_ASIDE_TEXT
+ elif para_type == BlockType.PAGE_NUMBER:
+ content_type = ContentTypeV2.PAGE_NUMBER
+ elif para_type == BlockType.PAGE_FOOTNOTE:
+ content_type = ContentTypeV2.PAGE_FOOTNOTE
+ else:
+ raise ValueError(f"Unknown para_type: {para_type}")
+ para_content = {
+ 'type': content_type,
+ 'content': {
+ f"{content_type}_content": merge_para_with_text_v2(para_block),
+ },
+ }
+ elif para_type == BlockType.TITLE:
+ title_level = get_title_level(para_block)
+ if title_level != 0:
+ para_content = {
+ 'type': ContentTypeV2.TITLE,
+ 'content': {
+ 'title_content': merge_para_with_text_v2(para_block),
+ 'level': title_level,
+ },
+ }
+ else:
+ para_content = {
+ 'type': ContentTypeV2.PARAGRAPH,
+ 'content': {
+ 'paragraph_content': merge_para_with_text_v2(para_block),
+ },
+ }
+ elif para_type in [
+ BlockType.TEXT,
+ BlockType.ABSTRACT,
+ ]:
+ para_content = {
+ 'type': ContentTypeV2.PARAGRAPH,
+ 'content': {
+ 'paragraph_content': merge_para_with_text_v2(para_block),
+ },
+ }
+ elif para_type == BlockType.INTERLINE_EQUATION:
+ image_path, math_content = _get_body_data(para_block)
+ para_content = {
+ 'type': ContentTypeV2.EQUATION_INTERLINE,
+ 'content': {
+ 'math_content': math_content,
+ 'math_type': 'latex',
+ 'image_source': {'path': f"{img_buket_path}/{image_path}"},
+ },
+ }
+ elif para_type == BlockType.IMAGE:
+ image_caption = []
+ image_footnote = []
+ image_path, _ = _get_body_data(para_block)
+ for block in para_block.get('blocks', []):
+ if block['type'] == BlockType.IMAGE_CAPTION:
+ image_caption.extend(merge_para_with_text_v2(block))
+ if block['type'] == BlockType.IMAGE_FOOTNOTE:
+ image_footnote.extend(merge_para_with_text_v2(block))
+ para_content = {
+ 'type': ContentTypeV2.IMAGE,
+ 'content': {
+ 'image_source': {'path': f"{img_buket_path}/{image_path}"},
+ 'image_caption': image_caption,
+ 'image_footnote': image_footnote,
+ },
+ }
+ elif para_type == BlockType.TABLE:
+ table_caption = []
+ table_footnote = []
+ image_path, html = _get_body_data(para_block)
+ table_html = _format_embedded_html(html, img_buket_path)
+        table_nest_level = 2 if table_html.count('<table') > 1 else 1
+ if 'colspan' in table_html or 'rowspan' in table_html or table_nest_level > 1:
+ table_type = ContentTypeV2.TABLE_COMPLEX
+ else:
+ table_type = ContentTypeV2.TABLE_SIMPLE
+ for block in para_block.get('blocks', []):
+ if block['type'] == BlockType.TABLE_CAPTION:
+ table_caption.extend(merge_para_with_text_v2(block))
+ if block['type'] == BlockType.TABLE_FOOTNOTE:
+ table_footnote.extend(merge_para_with_text_v2(block))
+ para_content = {
+ 'type': ContentTypeV2.TABLE,
+ 'content': {
+ 'image_source': {'path': f"{img_buket_path}/{image_path}"},
+ 'table_caption': table_caption,
+ 'table_footnote': table_footnote,
+ 'html': table_html,
+ 'table_type': table_type,
+ 'table_nest_level': table_nest_level,
+ },
+ }
+ elif para_type == BlockType.CHART:
+ chart_caption = []
+ chart_footnote = []
+ image_path, _ = _get_body_data(para_block)
+ for block in para_block.get('blocks', []):
+ if block['type'] == BlockType.CHART_CAPTION:
+ chart_caption.extend(merge_para_with_text_v2(block))
+ if block['type'] == BlockType.CHART_FOOTNOTE:
+ chart_footnote.extend(merge_para_with_text_v2(block))
+ para_content = {
+ 'type': ContentTypeV2.CHART,
+ 'content': {
+ 'image_source': {'path': f"{img_buket_path}/{image_path}"},
+ 'content': '',
+ 'chart_caption': chart_caption,
+ 'chart_footnote': chart_footnote,
+ },
+ }
+ elif para_type == BlockType.CODE:
+ code_caption = []
+ code_footnote = []
+ code_content = []
+ for block in para_block.get('blocks', []):
+ if block['type'] == BlockType.CODE_CAPTION:
+ code_caption.extend(merge_para_with_text_v2(block))
+ if block['type'] == BlockType.CODE_FOOTNOTE:
+ code_footnote.extend(merge_para_with_text_v2(block))
+ if block['type'] == BlockType.CODE_BODY:
+ code_content = merge_para_with_text_v2(block)
+
+ sub_type = para_block['sub_type']
+ if sub_type == BlockType.CODE:
+ para_content = {
+ 'type': ContentTypeV2.CODE,
+ 'content': {
+ 'code_caption': code_caption,
+ 'code_content': code_content,
+ 'code_footnote': code_footnote,
+ 'code_language': para_block.get('guess_lang', 'txt'),
+ },
+ }
+ elif sub_type == BlockType.ALGORITHM:
+ para_content = {
+ 'type': ContentTypeV2.ALGORITHM,
+ 'content': {
+ 'algorithm_caption': code_caption,
+ 'algorithm_content': code_content,
+ 'algorithm_footnote': code_footnote,
+ },
+ }
+ else:
+ raise ValueError(f"Unknown code sub_type: {sub_type}")
+ elif para_type == BlockType.REF_TEXT:
+ list_items = []
+ for block in _get_ref_text_item_blocks(para_block):
+ item_content = merge_para_with_text_v2(block)
+ if item_content:
+ list_items.append({
+ 'item_type': 'text',
+ 'item_content': item_content,
+ })
+ para_content = {
+ 'type': ContentTypeV2.LIST,
+ 'content': {
+ 'list_type': ContentTypeV2.LIST_REF,
+ 'list_items': list_items,
+ },
+ }
+ elif para_type == BlockType.LIST:
+ list_items = []
+ for block in _split_list_item_blocks(para_block):
+ item_content = merge_para_with_text_v2(block)
+ if item_content:
+ list_items.append({
+ 'item_type': 'text',
+ 'item_content': item_content,
+ })
+ para_content = {
+ 'type': ContentTypeV2.LIST,
+ 'content': {
+ 'list_type': ContentTypeV2.LIST_TEXT,
+ 'attribute': para_block.get('attribute', 'unordered'),
+ 'list_items': list_items,
+ },
+ }
+ elif para_type == BlockType.INDEX:
+ list_items = []
+ for block in _split_list_item_blocks(para_block):
+ item_content = merge_para_with_text_v2(block)
+ if item_content:
+ list_items.append({
+ 'item_type': 'text',
+ 'item_content': item_content,
+ })
+ para_content = {
+ 'type': ContentTypeV2.INDEX,
+ 'content': {
+ 'list_type': ContentTypeV2.LIST_TEXT,
+ 'list_items': list_items,
+ },
+ }
+ elif para_type == BlockType.SEAL:
+ seal_span = _get_seal_span(para_block)
+ if not seal_span:
+ return None
+ seal_text = _get_seal_text(para_block)
+ para_content = {
+ 'type': ContentTypeV2.SEAL,
+ 'content': {
+ 'image_source': {
+ 'path': f"{img_buket_path}/{seal_span.get('image_path', '')}",
+ },
+ 'seal_content': (
+ [{'type': ContentTypeV2.SPAN_TEXT, 'content': seal_text}]
+ if seal_text else []
+ ),
+ },
+ }
+
+ if not para_content:
+ return None
+
+ bbox = _build_bbox(para_block.get('bbox'), page_size)
+ if bbox:
+ para_content['bbox'] = bbox
+
+ return para_content
+
+
def union_make(pdf_info_dict: list,
make_mode: str,
img_buket_path: str = '',
@@ -502,17 +976,30 @@ def union_make(pdf_info_dict: list,
page_markdown = make_blocks_to_markdown(paras_of_layout, make_mode, img_buket_path)
output_content.extend(page_markdown)
elif make_mode == MakeMode.CONTENT_LIST:
- para_blocks = (paras_of_layout or []) + (paras_of_discarded or [])
+ para_blocks = merge_adjacent_ref_text_blocks_for_content(
+ (paras_of_layout or []) + (paras_of_discarded or [])
+ )
if not para_blocks:
continue
for para_block in para_blocks:
para_content = make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
if para_content:
output_content.append(para_content)
+ elif make_mode == MakeMode.CONTENT_LIST_V2:
+ para_blocks = merge_adjacent_ref_text_blocks_for_content(
+ (paras_of_layout or []) + (paras_of_discarded or [])
+ )
+ page_contents = []
+ if para_blocks:
+ for para_block in para_blocks:
+ para_content = make_blocks_to_content_list_v2(para_block, img_buket_path, page_size)
+ if para_content:
+ page_contents.append(para_content)
+ output_content.append(page_contents)
if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
return '\n\n'.join(output_content)
- elif make_mode == MakeMode.CONTENT_LIST:
+ elif make_mode in [MakeMode.CONTENT_LIST, MakeMode.CONTENT_LIST_V2]:
return output_content
else:
logger.error(f"Unsupported make mode: {make_mode}")
@@ -521,9 +1008,7 @@ def union_make(pdf_info_dict: list,
def get_title_level(block):
title_level = block.get('level', 1)
- if title_level > 4:
- title_level = 4
- elif title_level < 1:
+ if title_level < 1:
title_level = 0
return title_level
diff --git a/mineru/backend/vlm/vlm_middle_json_mkcontent.py b/mineru/backend/vlm/vlm_middle_json_mkcontent.py
index c081cd6f..0b542058 100644
--- a/mineru/backend/vlm/vlm_middle_json_mkcontent.py
+++ b/mineru/backend/vlm/vlm_middle_json_mkcontent.py
@@ -260,9 +260,11 @@ def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
para_content = {'type': BlockType.CODE, 'sub_type': para_block["sub_type"], BlockType.CODE_CAPTION: []}
for block in para_block['blocks']:
if block['type'] == BlockType.CODE_BODY:
- para_content[BlockType.CODE_BODY] = merge_para_with_text(block)
- if para_block["sub_type"] == BlockType.CODE:
- para_content["guess_lang"] = para_block["guess_lang"]
+ code_text = merge_para_with_text(block)
+ if para_block['sub_type'] == BlockType.CODE:
+ guess_lang = para_block.get("guess_lang", "txt")
+ code_text = f"```{guess_lang}\n{code_text}\n```"
+ para_content[BlockType.CODE_BODY] = code_text
if block['type'] == BlockType.CODE_CAPTION:
para_content[BlockType.CODE_CAPTION].append(merge_para_with_text(block))
diff --git a/mineru/cli/common.py b/mineru/cli/common.py
index c0a51bfc..d20d943f 100644
--- a/mineru/cli/common.py
+++ b/mineru/cli/common.py
@@ -172,17 +172,18 @@ def _process_output(
)
if f_dump_content_list:
+
content_list = make_func(pdf_info, MakeMode.CONTENT_LIST, image_dir)
md_writer.write_string(
f"{pdf_file_name}_content_list.json",
json.dumps(content_list, ensure_ascii=False, indent=4),
)
- if process_mode != "pipeline":
- content_list_v2 = make_func(pdf_info, MakeMode.CONTENT_LIST_V2, image_dir)
- md_writer.write_string(
- f"{pdf_file_name}_content_list_v2.json",
- json.dumps(content_list_v2, ensure_ascii=False, indent=4),
- )
+
+ content_list_v2 = make_func(pdf_info, MakeMode.CONTENT_LIST_V2, image_dir)
+ md_writer.write_string(
+ f"{pdf_file_name}_content_list_v2.json",
+ json.dumps(content_list_v2, ensure_ascii=False, indent=4),
+ )
if f_dump_middle_json:
diff --git a/mineru/utils/enum_class.py b/mineru/utils/enum_class.py
index 2c84ed00..bf990a8b 100644
--- a/mineru/utils/enum_class.py
+++ b/mineru/utils/enum_class.py
@@ -65,6 +65,7 @@ class ContentTypeV2:
ALGORITHM = "algorithm"
EQUATION_INTERLINE = 'equation_interline'
IMAGE = 'image'
+ SEAL = 'seal'
TABLE = 'table'
CHART = 'chart'
TABLE_SIMPLE = 'simple_table'
diff --git a/mineru/utils/pdf_classify.py b/mineru/utils/pdf_classify.py
index f6103c4c..1a28197d 100644
--- a/mineru/utils/pdf_classify.py
+++ b/mineru/utils/pdf_classify.py
@@ -1,113 +1,328 @@
# Copyright (c) Opendatalab. All rights reserved.
+import os
import re
import threading
from io import BytesIO
+
import numpy as np
import pypdfium2 as pdfium
+import pypdfium2.raw as pdfium_c
from loguru import logger
-from pdfminer.high_level import extract_text
-from pdfminer.pdfparser import PDFParser
-from pdfminer.pdfdocument import PDFDocument
-from pdfminer.pdfpage import PDFPage
-from pdfminer.pdfinterp import PDFResourceManager
-from pdfminer.pdfinterp import PDFPageInterpreter
-from pdfminer.layout import LAParams, LTImage, LTFigure
+from pypdf import PdfReader
from pdfminer.converter import PDFPageAggregator
+from pdfminer.high_level import extract_text
+from pdfminer.layout import LAParams, LTFigure, LTImage
+from pdfminer.pdfdocument import PDFDocument
+from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
+from pdfminer.pdfpage import PDFPage
+from pdfminer.pdfparser import PDFParser
+
+PDF_CLASSIFY_STRATEGY_ENV = "MINERU_PDF_CLASSIFY_STRATEGY"
+PDF_CLASSIFY_STRATEGY_HYBRID = "hybrid"
+PDF_CLASSIFY_STRATEGY_LEGACY = "legacy"
+
+MAX_SAMPLE_PAGES = 10
+CHARS_THRESHOLD = 50
+HIGH_IMAGE_COVERAGE_THRESHOLD = 0.8
+CID_RATIO_THRESHOLD = 0.05
+TEXT_QUALITY_MIN_CHARS = 300
+TEXT_QUALITY_BAD_THRESHOLD = 0.03
+TEXT_QUALITY_GOOD_THRESHOLD = 0.005
+
+_ALLOWED_CONTROL_CODES = {9, 10, 13}
+_PRIVATE_USE_AREA_START = 0xE000
+_PRIVATE_USE_AREA_END = 0xF8FF
_pdf_sample_extract_lock = threading.Lock()
def classify(pdf_bytes):
"""
- 判断PDF文件是可以直接提取文本还是需要OCR
-
- Args:
- pdf_bytes: PDF文件的字节数据
+ Classify a PDF as text-based or OCR-based.
Returns:
- str: 'txt' 表示可以直接提取文本,'ocr' 表示需要OCR
+ "txt" if the PDF can be parsed as text, otherwise "ocr".
"""
- # 从字节数据加载PDF
- sample_pdf_bytes = extract_pages(pdf_bytes)
- if not sample_pdf_bytes:
- return 'ocr'
- pdf = pdfium.PdfDocument(sample_pdf_bytes)
+ strategy = get_pdf_classify_strategy()
+ if strategy == PDF_CLASSIFY_STRATEGY_LEGACY:
+ return classify_legacy(pdf_bytes)
+ return classify_hybrid(pdf_bytes)
+
+
+def get_pdf_classify_strategy() -> str:
+ strategy = os.getenv(
+ PDF_CLASSIFY_STRATEGY_ENV, PDF_CLASSIFY_STRATEGY_HYBRID
+ ).strip().lower()
+ if strategy not in {
+ PDF_CLASSIFY_STRATEGY_HYBRID,
+ PDF_CLASSIFY_STRATEGY_LEGACY,
+ }:
+ logger.warning(
+ f"Invalid {PDF_CLASSIFY_STRATEGY_ENV} value: {strategy}, "
+ f"fall back to {PDF_CLASSIFY_STRATEGY_HYBRID}"
+ )
+ return PDF_CLASSIFY_STRATEGY_HYBRID
+ return strategy
+
+
+def classify_hybrid(pdf_bytes):
+ """
+ Fast PDF classification path.
+
+ The hybrid path uses pdfium + pypdf as the main path and falls back to
+ pdfminer only for gray-zone samples.
+ """
+
+ pdf = None
+ page_indices = []
+ should_run_pdfminer_fallback = False
+
try:
- # 获取PDF页数
+ pdf = pdfium.PdfDocument(pdf_bytes)
page_count = len(pdf)
-
- # 如果PDF页数为0,直接返回OCR
if page_count == 0:
- return 'ocr'
+ return "ocr"
- # 检查的页面数(最多检查10页)
- pages_to_check = min(page_count, 10)
+ page_indices = get_sample_page_indices(page_count, MAX_SAMPLE_PAGES)
+ if not page_indices:
+ return "ocr"
- # 设置阈值:如果每页平均少于50个有效字符,认为需要OCR
- chars_threshold = 50
+ if (
+ get_avg_cleaned_chars_per_page_pdfium(pdf, page_indices)
+ < CHARS_THRESHOLD
+ ):
+ return "ocr"
- # 检查平均字符数和无效字符
- if (get_avg_cleaned_chars_per_page(pdf, pages_to_check) < chars_threshold) or detect_invalid_chars(sample_pdf_bytes):
- return 'ocr'
+ if detect_cid_font_signal_pypdf(pdf_bytes, page_indices):
+ return "ocr"
- # 检查图像覆盖率
- if get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check) >= 0.8:
- return 'ocr'
+ text_quality_signal = get_text_quality_signal_pdfium(pdf, page_indices)
+ total_chars = text_quality_signal["total_chars"]
+ abnormal_ratio = text_quality_signal["abnormal_ratio"]
- return 'txt'
+ if total_chars >= TEXT_QUALITY_MIN_CHARS:
+ if abnormal_ratio >= TEXT_QUALITY_BAD_THRESHOLD:
+ return "ocr"
+ should_run_pdfminer_fallback = abnormal_ratio > TEXT_QUALITY_GOOD_THRESHOLD
+ else:
+ should_run_pdfminer_fallback = True
+
+ if (
+ get_high_image_coverage_ratio_pdfium(pdf, page_indices)
+ >= HIGH_IMAGE_COVERAGE_THRESHOLD
+ ):
+ return "ocr"
except Exception as e:
- logger.error(f"判断PDF类型时出错: {e}")
- # 出错时默认使用OCR
- return 'ocr'
+ logger.error(f"Failed to classify PDF with hybrid strategy: {e}")
+ return "ocr"
+
+ finally:
+ if pdf is not None:
+ pdf.close()
+
+ if should_run_pdfminer_fallback:
+ sample_pdf_bytes = extract_selected_pages(pdf_bytes, page_indices)
+ if not sample_pdf_bytes:
+ return "ocr"
+ if detect_invalid_chars_pdfminer_fallback(sample_pdf_bytes):
+ return "ocr"
+
+ return "txt"
+
+
+def classify_legacy(pdf_bytes):
+ """
+ Legacy classification path kept for rollback and A/B comparison.
+ """
+
+ sample_pdf_bytes = extract_pages(pdf_bytes)
+ if not sample_pdf_bytes:
+ return "ocr"
+ pdf = pdfium.PdfDocument(sample_pdf_bytes)
+ try:
+ page_count = len(pdf)
+ if page_count == 0:
+ return "ocr"
+
+ pages_to_check = min(page_count, MAX_SAMPLE_PAGES)
+
+ if (
+ get_avg_cleaned_chars_per_page(pdf, pages_to_check) < CHARS_THRESHOLD
+ ) or detect_invalid_chars(sample_pdf_bytes):
+ return "ocr"
+
+ if (
+ get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check)
+ >= HIGH_IMAGE_COVERAGE_THRESHOLD
+ ):
+ return "ocr"
+
+ return "txt"
+
+ except Exception as e:
+ logger.error(f"Failed to classify PDF with legacy strategy: {e}")
+ return "ocr"
finally:
- # 无论执行哪个路径,都确保PDF被关闭
pdf.close()
+def get_sample_page_indices(page_count: int, max_pages: int = MAX_SAMPLE_PAGES):
+ if page_count <= 0 or max_pages <= 0:
+ return []
+
+ sample_count = min(page_count, max_pages)
+ if sample_count == page_count:
+ return list(range(page_count))
+ if sample_count == 1:
+ return [0]
+
+ indices = []
+ seen = set()
+ for i in range(sample_count):
+ page_index = round(i * (page_count - 1) / (sample_count - 1))
+ page_index = max(0, min(page_count - 1, page_index))
+ if page_index not in seen:
+ indices.append(page_index)
+ seen.add(page_index)
+
+ if len(indices) < sample_count:
+ for page_index in range(page_count):
+ if page_index in seen:
+ continue
+ indices.append(page_index)
+ seen.add(page_index)
+ if len(indices) == sample_count:
+ break
+
+ return sorted(indices)
+
+
def get_avg_cleaned_chars_per_page(pdf_doc, pages_to_check):
- # 总字符数
total_chars = 0
- # 清理后的总字符数
cleaned_total_chars = 0
- # 检查前几页的文本
for i in range(pages_to_check):
page = pdf_doc[i]
text_page = page.get_textpage()
text = text_page.get_text_bounded()
total_chars += len(text)
-
- # 清理提取的文本,移除空白字符
- cleaned_text = re.sub(r'\s+', '', text)
+ cleaned_text = re.sub(r"\s+", "", text)
cleaned_total_chars += len(cleaned_text)
- # 计算平均每页字符数
avg_cleaned_chars_per_page = cleaned_total_chars / pages_to_check
-
- # logger.debug(f"PDF分析: 平均每页清理后{avg_cleaned_chars_per_page:.1f}字符")
-
return avg_cleaned_chars_per_page
+def get_avg_cleaned_chars_per_page_pdfium(pdf_doc, page_indices):
+ cleaned_total_chars = 0
+
+ for page_index in page_indices:
+ page = pdf_doc[page_index]
+ text_page = page.get_textpage()
+ text = text_page.get_text_bounded()
+ cleaned_total_chars += len(re.sub(r"\s+", "", text))
+
+ if not page_indices:
+ return 0.0
+ return cleaned_total_chars / len(page_indices)
+
+
+def get_text_quality_signal_pdfium(pdf_doc, page_indices):
+ total_chars = 0
+ null_char_count = 0
+ replacement_char_count = 0
+ control_char_count = 0
+ private_use_char_count = 0
+
+ for page_index in page_indices:
+ page = pdf_doc[page_index]
+ text_page = page.get_textpage()
+ char_count = text_page.count_chars()
+ total_chars += char_count
+
+ for char_index in range(char_count):
+ unicode_code = pdfium_c.FPDFText_GetUnicode(text_page, char_index)
+ if unicode_code == 0:
+ null_char_count += 1
+ elif unicode_code == 0xFFFD:
+ replacement_char_count += 1
+ elif unicode_code < 32 and unicode_code not in _ALLOWED_CONTROL_CODES:
+ control_char_count += 1
+ elif _PRIVATE_USE_AREA_START <= unicode_code <= _PRIVATE_USE_AREA_END:
+ private_use_char_count += 1
+
+ abnormal_chars = (
+ null_char_count
+ + replacement_char_count
+ + control_char_count
+ + private_use_char_count
+ )
+
+ abnormal_ratio = 0.0
+ if total_chars > 0:
+ abnormal_ratio = abnormal_chars / total_chars
+
+ return {
+ "total_chars": total_chars,
+ "abnormal_ratio": abnormal_ratio,
+ "null_char_count": null_char_count,
+ "replacement_char_count": replacement_char_count,
+ "control_char_count": control_char_count,
+ "private_use_char_count": private_use_char_count,
+ }
+
+
+def detect_cid_font_signal_pypdf(pdf_bytes, page_indices):
+ reader = PdfReader(BytesIO(pdf_bytes))
+
+ for page_index in page_indices:
+ page = reader.pages[page_index]
+ resources = _resolve_pdf_object(page.get("/Resources"))
+ if not resources:
+ continue
+
+ fonts = _resolve_pdf_object(resources.get("/Font"))
+ if not fonts:
+ continue
+
+ for _, font_ref in fonts.items():
+ font = _resolve_pdf_object(font_ref)
+ if not font:
+ continue
+
+ subtype = str(font.get("/Subtype"))
+ encoding = str(font.get("/Encoding"))
+ has_descendant_fonts = "/DescendantFonts" in font
+ has_to_unicode = "/ToUnicode" in font
+
+ if (
+ subtype == "/Type0"
+ and encoding in ("/Identity-H", "/Identity-V")
+ and has_descendant_fonts
+ and not has_to_unicode
+ ):
+ return True
+
+ return False
+
+
+def _resolve_pdf_object(obj):
+ if hasattr(obj, "get_object"):
+ return obj.get_object()
+ return obj
+
+
def get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check):
- # 创建内存文件对象
pdf_stream = BytesIO(sample_pdf_bytes)
-
- # 创建PDF解析器
parser = PDFParser(pdf_stream)
-
- # 创建PDF文档对象
document = PDFDocument(parser)
- # 检查文档是否允许文本提取
if not document.is_extractable:
- # logger.warning("PDF不允许内容提取")
- return 1.0 # 默认为高覆盖率,因为无法提取内容
+ return 1.0
- # 创建资源管理器和参数对象
rsrcmgr = PDFResourceManager()
laparams = LAParams(
line_overlap=0.5,
@@ -118,115 +333,144 @@ def get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check):
detect_vertical=False,
all_texts=False,
)
-
- # 创建聚合器
device = PDFPageAggregator(rsrcmgr, laparams=laparams)
-
- # 创建解释器
interpreter = PDFPageInterpreter(rsrcmgr, device)
- # 记录高图像覆盖率的页面数量
high_image_coverage_pages = 0
page_count = 0
- # 遍历页面
for page in PDFPage.create_pages(document):
- # 控制检查的页数
if page_count >= pages_to_check:
break
- # 处理页面
interpreter.process_page(page)
layout = device.get_result()
- # 页面尺寸
page_width = layout.width
page_height = layout.height
page_area = page_width * page_height
- # 计算图像覆盖的总面积
image_area = 0
-
- # 遍历页面元素
for element in layout:
- # 检查是否为图像或图形元素
if isinstance(element, (LTImage, LTFigure)):
- # 计算图像边界框面积
img_width = element.width
img_height = element.height
- img_area = img_width * img_height
- image_area += img_area
+ image_area += img_width * img_height
- # 计算覆盖率
coverage_ratio = min(image_area / page_area, 1.0) if page_area > 0 else 0
- # logger.debug(f"PDF分析: 页面 {page_count + 1} 图像覆盖率: {coverage_ratio:.2f}")
-
- # 判断是否为高覆盖率
- if coverage_ratio >= 0.8: # 使用80%作为高覆盖率的阈值
+ if coverage_ratio >= HIGH_IMAGE_COVERAGE_THRESHOLD:
high_image_coverage_pages += 1
page_count += 1
- # 关闭资源
pdf_stream.close()
- # 如果没有处理任何页面,返回0
if page_count == 0:
return 0.0
- # 计算高图像覆盖率的页面比例
- high_coverage_ratio = high_image_coverage_pages / page_count
- # logger.debug(f"PDF分析: 高图像覆盖页面比例: {high_coverage_ratio:.2f}")
+ return high_image_coverage_pages / page_count
- return high_coverage_ratio
+
+def get_high_image_coverage_ratio_pdfium(pdf_doc, page_indices):
+ high_image_coverage_pages = 0
+
+ for page_index in page_indices:
+ page = pdf_doc[page_index]
+ page_bbox = page.get_bbox()
+ page_area = abs(
+ (page_bbox[2] - page_bbox[0]) * (page_bbox[3] - page_bbox[1])
+ )
+ image_area = 0.0
+
+ for page_object in page.get_objects(
+ filter=[pdfium_c.FPDF_PAGEOBJ_IMAGE], max_depth=3
+ ):
+ left, bottom, right, top = page_object.get_pos()
+ image_area += max(0.0, right - left) * max(0.0, top - bottom)
+
+ coverage_ratio = min(image_area / page_area, 1.0) if page_area > 0 else 0.0
+ if coverage_ratio >= HIGH_IMAGE_COVERAGE_THRESHOLD:
+ high_image_coverage_pages += 1
+
+ if not page_indices:
+ return 0.0
+ return high_image_coverage_pages / len(page_indices)
def extract_pages(src_pdf_bytes: bytes) -> bytes:
"""
- 从PDF字节数据中随机提取最多10页,返回新的PDF字节数据
-
- Args:
- src_pdf_bytes: PDF文件的字节数据
-
- Returns:
- bytes: 提取页面后的PDF字节数据
+ Extract up to 10 random pages and return them as a new PDF.
"""
with _pdf_sample_extract_lock:
pdf = None
sample_docs = None
try:
- # 从字节数据加载PDF
pdf = pdfium.PdfDocument(src_pdf_bytes)
-
- # 获取PDF页数
total_page = len(pdf)
if total_page == 0:
- # 如果PDF没有页面,直接返回空文档
logger.warning("PDF is empty, return empty document")
- return b''
+ return b""
- # 小文档直接复用原始字节,避免无意义的 PDF 重写。
- if total_page <= 10:
+ if total_page <= MAX_SAMPLE_PAGES:
return src_pdf_bytes
- # 选择最多10页
- select_page_cnt = min(10, total_page)
+ select_page_cnt = min(MAX_SAMPLE_PAGES, total_page)
+ page_indices = np.random.choice(
+ total_page, select_page_cnt, replace=False
+ ).tolist()
- # 从总页数中随机选择页面
- page_indices = np.random.choice(total_page, select_page_cnt, replace=False).tolist()
-
- # 创建一个新的PDF文档
sample_docs = pdfium.PdfDocument.new()
-
- # 将选择的页面导入新文档
sample_docs.import_pages(pdf, page_indices)
- # 将新PDF保存到内存缓冲区
output_buffer = BytesIO()
sample_docs.save(output_buffer)
+ return output_buffer.getvalue()
+ except Exception as e:
+ logger.exception(e)
+ return src_pdf_bytes
+ finally:
+ if pdf is not None:
+ pdf.close()
+ if sample_docs is not None:
+ sample_docs.close()
- # 获取字节数据
+
+def extract_selected_pages(src_pdf_bytes: bytes, page_indices) -> bytes:
+ """
+ Extract specific pages and return them as a new PDF.
+ """
+
+ selected_page_indices = sorted(set(page_indices))
+ if not selected_page_indices:
+ return b""
+
+ with _pdf_sample_extract_lock:
+ pdf = None
+ sample_docs = None
+ try:
+ pdf = pdfium.PdfDocument(src_pdf_bytes)
+ total_page = len(pdf)
+ if total_page == 0:
+ logger.warning("PDF is empty, return empty document")
+ return b""
+
+ selected_page_indices = [
+ page_index
+ for page_index in selected_page_indices
+ if 0 <= page_index < total_page
+ ]
+ if not selected_page_indices:
+ return b""
+
+ if selected_page_indices == list(range(total_page)):
+ return src_pdf_bytes
+
+ sample_docs = pdfium.PdfDocument.new()
+ sample_docs.import_pages(pdf, selected_page_indices)
+
+ output_buffer = BytesIO()
+ sample_docs.save(output_buffer)
return output_buffer.getvalue()
except Exception as e:
logger.exception(e)
@@ -239,11 +483,10 @@ def extract_pages(src_pdf_bytes: bytes) -> bytes:
def detect_invalid_chars(sample_pdf_bytes: bytes) -> bool:
- """"
- 检测PDF中是否包含非法字符
"""
- '''pdfminer比较慢,需要先随机抽取10页左右的sample'''
- # sample_pdf_bytes = extract_pages(src_pdf_bytes)
+ Detect whether a PDF contains invalid CID-style extracted text.
+ """
+
sample_pdf_file_like_object = BytesIO(sample_pdf_bytes)
laparams = LAParams(
line_overlap=0.5,
@@ -256,26 +499,25 @@ def detect_invalid_chars(sample_pdf_bytes: bytes) -> bool:
)
text = extract_text(pdf_file=sample_pdf_file_like_object, laparams=laparams)
text = text.replace("\n", "")
- # logger.info(text)
- '''乱码文本用pdfminer提取出来的文本特征是(cid:xxx)'''
- cid_pattern = re.compile(r'\(cid:\d+\)')
+
+ cid_pattern = re.compile(r"\(cid:\d+\)")
matches = cid_pattern.findall(text)
cid_count = len(matches)
cid_len = sum(len(match) for match in matches)
text_len = len(text)
if text_len == 0:
- cid_chars_radio = 0
+ cid_chars_ratio = 0
else:
- cid_chars_radio = cid_count/(cid_count + text_len - cid_len)
- # logger.debug(f"cid_count: {cid_count}, text_len: {text_len}, cid_chars_radio: {cid_chars_radio}")
- '''当一篇文章存在5%以上的文本是乱码时,认为该文档为乱码文档'''
- if cid_chars_radio > 0.05:
- return True # 乱码文档
- else:
- return False # 正常文档
+ cid_chars_ratio = cid_count / (cid_count + text_len - cid_len)
+
+ return cid_chars_ratio > CID_RATIO_THRESHOLD
-if __name__ == '__main__':
- with open('/Users/myhloli/pdf/luanma2x10.pdf', 'rb') as f:
+def detect_invalid_chars_pdfminer_fallback(sample_pdf_bytes: bytes) -> bool:
+ return detect_invalid_chars(sample_pdf_bytes)
+
+
+if __name__ == "__main__":
+ with open("/Users/myhloli/pdf/luanma2x10.pdf", "rb") as f:
p_bytes = f.read()
- logger.info(f"PDF分类结果: {classify(p_bytes)}")
+ logger.info(f"PDF classify result: {classify(p_bytes)}")