Merge pull request #4644 from myhloli/dev

Dev
This commit is contained in:
Xiaomeng Zhao
2026-03-24 00:30:13 +08:00
committed by GitHub
6 changed files with 1097 additions and 218 deletions

View File

@@ -1,5 +1,6 @@
import os
import re
import unicodedata
from html import escape
from loguru import logger
@@ -24,6 +25,7 @@ inline_right_delimiter = delimiters['inline']['right']
OFFICE_STYLE_RENDER_MODE_ENV = 'MINERU_OFFICE_STYLE_RENDER_MODE'
OFFICE_STYLE_RENDER_MODE_HTML = 'html'
OFFICE_STYLE_RENDER_MODE_MARKDOWN = 'markdown'
OFFICE_MARKDOWN_WRAPPER_STYLES = {'bold', 'italic', 'strikethrough'}
def _apply_markdown_style(content: str, style: list) -> str:
@@ -162,14 +164,187 @@ def get_title_level(para_block):
return title_level
def _make_rendered_part(
span_type,
rendered_content: str,
raw_content: str = '',
style: list | None = None,
has_markdown_wrapper: bool = False,
):
return {
'span_type': span_type,
'rendered_content': rendered_content,
'raw_content': raw_content,
'style': style or [],
'has_markdown_wrapper': has_markdown_wrapper,
}
def _has_markdown_wrapper(style: list) -> bool:
    """Return True when *style* will be rendered via markdown wrapper marks.

    Only applies in markdown render mode; underline has no markdown
    equivalent, so its presence forces HTML-style rendering instead.
    """
    if _get_office_style_render_mode() != OFFICE_STYLE_RENDER_MODE_MARKDOWN:
        return False
    if not style:
        return False
    if 'underline' in style:
        return False
    # True if at least one style name is a markdown-wrappable style.
    return bool(OFFICE_MARKDOWN_WRAPPER_STYLES.intersection(style))
def _get_first_non_whitespace_char(text: str):
for ch in text:
if not ch.isspace():
return ch
return None
def _get_last_non_whitespace_char(text: str):
for ch in reversed(text):
if not ch.isspace():
return ch
return None
def _is_punctuation_or_symbol(ch: str) -> bool:
return unicodedata.category(ch).startswith(('P', 'S'))
def _is_boundary_text_char(ch: str) -> bool:
    """A boundary "text" character is neither whitespace nor punctuation/symbol."""
    return (not ch.isspace()) and (not _is_punctuation_or_symbol(ch))
def _needs_markdown_it_boundary_space(prev_part: dict, next_part: dict) -> bool:
    """Decide whether a space must be inserted between two adjacent parts.

    In markdown render mode, a markdown-wrapped span whose raw text ends
    in punctuation/symbol followed directly by a word character would keep
    markdown parsers from recognising the closing emphasis marker, so a
    separating space is required.
    """
    if _get_office_style_render_mode() != OFFICE_STYLE_RENDER_MODE_MARKDOWN:
        return False
    if not prev_part.get('has_markdown_wrapper', False):
        return False
    # Equations and hyperlinks manage their own boundaries.
    skip_types = {
        ContentType.HYPERLINK,
        ContentType.INLINE_EQUATION,
        ContentType.INTERLINE_EQUATION,
    }
    if next_part.get('span_type') in skip_types:
        return False
    prev_raw = prev_part.get('raw_content', '')
    next_raw = next_part.get('raw_content', '')
    if not (prev_raw.strip() and next_raw.strip()):
        return False
    # Existing whitespace at the join already separates the parts.
    if prev_raw[-1].isspace() or next_raw[0].isspace():
        return False
    tail = _get_last_non_whitespace_char(prev_raw)
    head = _get_first_non_whitespace_char(next_raw)
    if tail is None or head is None:
        return False
    return _is_punctuation_or_symbol(tail) and _is_boundary_text_char(head)
def _join_rendered_parts(parts: list[dict]) -> str:
    """Concatenate rendered parts into a paragraph string.

    Inline equations get exactly one space on each side (no trailing
    space when the equation is the final part); other parts may receive
    a markdown-it boundary space relative to the previous part.
    """
    result = ''
    previous = None
    last_index = len(parts) - 1
    for index, part in enumerate(parts):
        content = part['rendered_content']
        if part['span_type'] == ContentType.INLINE_EQUATION:
            # One space before the equation if there is preceding text.
            if result and not result.endswith(' '):
                result += ' '
            result += content
            # One space after, unless the equation ends the paragraph.
            if index != last_index:
                result += ' '
        else:
            if previous is not None and _needs_markdown_it_boundary_space(previous, part):
                result += ' '
            result += content
        # NOTE(review): source indentation was stripped; `previous` is
        # updated for every part here (loop level) — confirm against the
        # original file.
        previous = part
    return result
def _append_text_part(parts: list[dict], original_content: str, span_style: list):
    """Append a TEXT span to *parts*, preserving surrounding whitespace.

    Non-blank spans are styled on the stripped core only, with the original
    leading/trailing whitespace re-attached outside the style markers.
    Whitespace-only spans are kept verbatim unless they carry a visibly
    rendered style (underline/strikethrough).
    """
    content_stripped = original_content.strip()
    if content_stripped:
        styled = _apply_configured_style(content_stripped, span_style)
        # Recover the exact leading/trailing whitespace of the span so the
        # style markers hug the visible text only.
        leading = original_content[:len(original_content) - len(original_content.lstrip())]
        trailing = original_content[len(original_content.rstrip()):]
        parts.append(
            _make_rendered_part(
                ContentType.TEXT,
                leading + styled + trailing,
                raw_content=original_content,
                style=span_style,
                has_markdown_wrapper=_has_markdown_wrapper(span_style),
            )
        )
    elif original_content:
        # Whitespace-only span: apply visible styles if present, otherwise
        # preserve it untouched as spacing between styled parts.
        visible_styles = {'underline', 'strikethrough'}
        if span_style and any(s in visible_styles for s in span_style):
            # Swap spaces for non-breaking spaces so the styled run stays
            # visible after markdown/HTML collapsing.
            rendered_content = original_content.replace(" ", " ")
            rendered_content = _apply_configured_style(rendered_content, span_style)
        else:
            rendered_content = original_content
        parts.append(
            _make_rendered_part(
                ContentType.TEXT,
                rendered_content,
                raw_content=original_content,
                style=span_style,
            )
        )
def _append_hyperlink_part(
    parts: list[dict],
    original_content: str,
    span_style: list,
    url: str = '',
    plain_text_only: bool = False,
):
    """Append a HYPERLINK span to *parts*.

    With ``plain_text_only`` the styled display text is emitted without a
    link (used where URLs are not wanted, keeping surrounding whitespace);
    otherwise the styled text is wrapped as a rendered link to *url*.
    Blank spans are dropped entirely.
    """
    link_text = original_content.strip()
    if not link_text:
        return
    styled_text = _apply_configured_style(link_text, span_style)
    if plain_text_only:
        # Re-attach the span's exact leading/trailing whitespace around
        # the styled core.
        leading = original_content[:len(original_content) - len(original_content.lstrip())]
        trailing = original_content[len(original_content.rstrip()):]
        rendered_content = leading + styled_text + trailing
        has_markdown_wrapper = _has_markdown_wrapper(span_style)
    else:
        rendered_content = _render_link(styled_text, url)
        # Rendered links never count as markdown-wrapped for boundary checks.
        has_markdown_wrapper = False
    parts.append(
        _make_rendered_part(
            ContentType.HYPERLINK,
            rendered_content,
            raw_content=original_content,
            style=span_style,
            has_markdown_wrapper=has_markdown_wrapper,
        )
    )
def merge_para_with_text(para_block):
# First pass: collect all non-empty (span_type, content) parts
# First pass: collect rendered parts with raw boundary metadata.
parts = []
if para_block['type'] == BlockType.TITLE:
if para_block.get('is_numbered_style', False):
section_number = para_block.get('section_number', '')
if section_number:
parts.append((ContentType.TEXT, f"{section_number} "))
parts.append(
_make_rendered_part(
ContentType.TEXT,
f"{section_number} ",
raw_content=f"{section_number} ",
)
)
for line in para_block['lines']:
for span in line['spans']:
@@ -177,57 +352,38 @@ def merge_para_with_text(para_block):
span_style = span.get('style', [])
if span_type == ContentType.TEXT:
original_content = span['content']
content_stripped = original_content.strip()
if content_stripped:
styled = _apply_configured_style(content_stripped, span_style)
leading = original_content[:len(original_content) - len(original_content.lstrip())]
trailing = original_content[len(original_content.rstrip()):]
parts.append((span_type, leading + styled + trailing))
elif original_content:
# Whitespace-only span: apply visible styles if present,
# otherwise preserve as spacing between styled parts
_visible = {'underline', 'strikethrough'}
if span_style and any(s in _visible for s in span_style):
# 将original_content替换为 
original_content = original_content.replace(" ", " ")
styled = _apply_configured_style(original_content, span_style)
parts.append((span_type, styled))
else:
parts.append((span_type, original_content))
_append_text_part(parts, span['content'], span_style)
elif span_type == ContentType.INLINE_EQUATION:
content = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}"
content = content.strip()
if content:
parts.append((span_type, content))
parts.append(
_make_rendered_part(
span_type,
content,
raw_content=span['content'],
)
)
elif span_type == ContentType.INTERLINE_EQUATION:
content = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n"
content = content.strip()
if content:
parts.append((span_type, content))
parts.append(
_make_rendered_part(
span_type,
content,
raw_content=span['content'],
)
)
elif span_type == ContentType.HYPERLINK:
link_text = span['content'].strip()
if link_text:
link_text = _apply_configured_style(link_text, span_style)
content = _render_link(link_text, span.get('url', ''))
parts.append((span_type, content))
_append_hyperlink_part(
parts,
span['content'],
span_style,
url=span.get('url', ''),
)
# Second pass: join parts, keeping one space on each side of inline equations
para_text = ''
for i, (span_type, content) in enumerate(parts):
is_last = i == len(parts) - 1
if span_type == ContentType.INLINE_EQUATION:
# Ensure one space before the equation (if there is preceding text)
if para_text and not para_text.endswith(' '):
para_text += ' '
para_text += content
# Ensure one space after the equation, unless it is the last part
if not is_last:
para_text += ' '
else:
para_text += content
return para_text
return _join_rendered_parts(parts)
def _flatten_list_items(list_block):
@@ -416,37 +572,29 @@ def _flatten_index_items(index_block):
if item_text:
item_text = _apply_configured_style(item_text, uniform_style)
else:
raw_parts = []
rendered_parts = []
for content, span_type, span_style in stripped_span_items:
if not content:
continue
if span_type == ContentType.INLINE_EQUATION:
# Wrap inline equations with configured delimiters
raw_parts.append(
f'{inline_left_delimiter}{content}{inline_right_delimiter}'
rendered_parts.append(
_make_rendered_part(
span_type,
f'{inline_left_delimiter}{content}{inline_right_delimiter}',
raw_content=content,
)
)
elif span_type == ContentType.HYPERLINK:
# TOC hyperlinks use document-internal bookmark refs; output
# only the styled display text without the URL.
link_text = content.strip()
if link_text:
link_text = _apply_configured_style(link_text, span_style)
leading = content[:len(content) - len(content.lstrip())]
trailing = content[len(content.rstrip()):]
raw_parts.append(leading + link_text + trailing)
_append_hyperlink_part(
rendered_parts,
content,
span_style,
plain_text_only=True,
)
else:
# TEXT span: apply markdown style while preserving
# surrounding whitespace (e.g. leading space after section #).
stripped = content.strip()
if stripped:
styled = _apply_configured_style(stripped, span_style)
leading = content[:len(content) - len(content.lstrip())]
trailing = content[len(content.rstrip()):]
raw_parts.append(leading + styled + trailing)
elif content:
raw_parts.append(content)
_append_text_part(rendered_parts, content, span_style)
item_text = ''.join(raw_parts).strip()
item_text = _join_rendered_parts(rendered_parts).strip()
if not item_text:
continue

View File

@@ -6,7 +6,7 @@ from loguru import logger
from mineru.utils.char_utils import full_to_half_exclude_marks, is_hyphen_at_line_end
from mineru.utils.config_reader import get_latex_delimiter_config
from mineru.backend.pipeline.para_split import ListLineTag
from mineru.utils.enum_class import BlockType, ContentType, MakeMode
from mineru.utils.enum_class import BlockType, ContentType, ContentTypeV2, MakeMode
from mineru.utils.language import detect_lang
@@ -401,23 +401,222 @@ def _next_line_starts_with_lowercase_text(para_block, line_idx):
return bool(next_content) and next_content[0].islower()
def merge_adjacent_ref_text_blocks_for_content(para_blocks):
    """Collapse runs of adjacent REF_TEXT blocks into one grouped block.

    A lone REF_TEXT block passes through unchanged; two or more adjacent
    ones are wrapped in a container block of type REF_TEXT that carries
    the sub-blocks and the first block's bbox. All other blocks are kept
    in their original order.
    """
    merged = []
    pending = []

    def _flush():
        # Emit the buffered REF_TEXT run (if any) and reset the buffer.
        if not pending:
            return
        if len(pending) == 1:
            merged.append(pending[0])
        else:
            merged.append({
                'type': BlockType.REF_TEXT,
                'blocks': list(pending),
                'bbox': pending[0].get('bbox'),
            })
        pending.clear()

    for block in para_blocks or []:
        if block.get('type') == BlockType.REF_TEXT:
            pending.append(block)
        else:
            _flush()
            merged.append(block)
    _flush()
    return merged
def _build_bbox(para_bbox, page_size):
if not para_bbox or not page_size:
return None
page_width, page_height = page_size
x0, y0, x1, y1 = para_bbox
return [
int(x0 * 1000 / page_width),
int(y0 * 1000 / page_height),
int(x1 * 1000 / page_width),
int(y1 * 1000 / page_height),
]
def _get_seal_span(para_block):
    """Return the first span of type SEAL inside *para_block*, or None."""
    all_spans = (
        span
        for line in para_block.get('lines', [])
        for span in line.get('spans', [])
    )
    for span in all_spans:
        if span.get('type') == ContentType.SEAL:
            return span
    return None
def _get_seal_text(para_block):
    """Extract the seal's textual content as a single stripped string.

    List contents are stringified and space-joined (blank entries are
    dropped); string contents are stripped; anything else yields ''.
    """
    span = _get_seal_span(para_block)
    if span is None:
        return ''
    content = span.get('content', '')
    if isinstance(content, str):
        return content.strip()
    if isinstance(content, list):
        # Keep original item text, but skip items that are blank once stripped.
        pieces = (str(item) for item in content)
        return ' '.join(piece for piece in pieces if piece.strip())
    return ''
def _get_ref_text_item_blocks(para_block):
return para_block.get('blocks') or [para_block]
def _split_list_item_blocks(para_block):
    """Split a list block into one pseudo TEXT block per list item.

    A new item starts at every line tagged IS_LIST_START_LINE (except the
    very first line, which always begins the first item).
    """
    items = []
    buffered = []

    def _emit():
        # Close out the current item if any lines are buffered.
        if buffered:
            items.append({'type': BlockType.TEXT, 'lines': list(buffered)})
            buffered.clear()

    for idx, line in enumerate(para_block.get('lines', [])):
        if idx and line.get(ListLineTag.IS_LIST_START_LINE, False):
            _emit()
        buffered.append(line)
    _emit()
    return items
def _get_body_data(para_block):
    """Return ``(image_path, content)`` for a block's body.

    ``content`` is the table HTML, chart/equation content, or '' for plain
    images. Looks inside body sub-blocks when the block has 'blocks',
    otherwise scans the block's own lines. Falls back to ('', '').
    """
    def get_data_from_spans(lines):
        # First matching span wins; each span type maps to its payload field.
        for line in lines:
            for span in line.get('spans', []):
                span_type = span.get('type')
                if span_type == ContentType.TABLE:
                    return span.get('image_path', ''), span.get('html', '')
                if span_type == ContentType.CHART:
                    return span.get('image_path', ''), span.get('content', '')
                if span_type == ContentType.IMAGE:
                    return span.get('image_path', ''), ''
                if span_type == ContentType.INTERLINE_EQUATION:
                    return span.get('image_path', ''), span.get('content', '')
        return '', ''
    if 'blocks' in para_block:
        for block in para_block['blocks']:
            block_type = block.get('type')
            if block_type in [
                BlockType.IMAGE_BODY,
                BlockType.TABLE_BODY,
                BlockType.CHART_BODY,
                BlockType.CODE_BODY,
            ]:
                result = get_data_from_spans(block.get('lines', []))
                # CHART_BODY returns even when empty, ending the search;
                # other body types keep scanning until non-empty data is found.
                if result != ('', '') or block_type == BlockType.CHART_BODY:
                    return result
        return '', ''
    return get_data_from_spans(para_block.get('lines', []))
def merge_para_with_text_v2(para_block):
    """Merge a block's spans into a list of typed span dicts (v2 format).

    Adjacent TEXT/PHONETIC spans of the same type are coalesced into one
    entry; inline equations become separate SPAN_EQUATION_INLINE entries.
    Spacing between spans is language-aware (CJK vs. hyphenation-aware
    Latin handling), and any trailing whitespace on the final text entry
    is stripped.
    """
    # Language drives the span-joining rules below.
    block_lang = detect_lang(_collect_text_for_lang_detection(para_block))
    para_content = []
    para_type = para_block.get('type')
    for line_idx, line in enumerate(para_block.get('lines', [])):
        for span_idx, span in enumerate(line.get('spans', [])):
            span_type = span.get('type')
            if span_type == ContentType.TEXT:
                content = _normalize_text_content(span.get('content', ''))
                if not content.strip():
                    continue
                # PHONETIC blocks emit phonetic spans; everything else is text.
                output_type = (
                    ContentTypeV2.SPAN_PHONETIC
                    if para_type == BlockType.PHONETIC
                    else ContentTypeV2.SPAN_TEXT
                )
                is_last_span = span_idx == len(line['spans']) - 1
                if block_lang in CJK_LANGS:
                    # CJK: no separator after a line's last span.
                    rendered_content = content if is_last_span else f"{content} "
                else:
                    if (
                        is_last_span
                        and is_hyphen_at_line_end(content)
                        and _next_line_starts_with_lowercase_text(para_block, line_idx)
                    ):
                        # Word split across lines: drop the hyphen and join.
                        rendered_content = content[:-1]
                    elif is_last_span and is_hyphen_at_line_end(content):
                        # Keep a genuine hyphen (next line isn't a continuation).
                        rendered_content = content
                    else:
                        rendered_content = f"{content} "
                # Coalesce with the previous entry when types match.
                if para_content and para_content[-1]['type'] == output_type:
                    para_content[-1]['content'] += rendered_content
                else:
                    para_content.append({
                        'type': output_type,
                        'content': rendered_content,
                    })
            elif span_type == ContentType.INLINE_EQUATION:
                content = span.get('content', '').strip()
                if content:
                    para_content.append({
                        'type': ContentTypeV2.SPAN_EQUATION_INLINE,
                        'content': content,
                    })
    # Remove the trailing joiner space left on the final text-like entry.
    if para_content and para_content[-1]['type'] in [
        ContentTypeV2.SPAN_TEXT,
        ContentTypeV2.SPAN_PHONETIC,
    ]:
        para_content[-1]['content'] = para_content[-1]['content'].rstrip()
    return para_content
def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size):
para_type = para_block['type']
para_content = {}
para_content = None
if para_type in [
BlockType.TEXT,
BlockType.LIST,
BlockType.INDEX,
BlockType.LIST,
BlockType.ABSTRACT,
]:
para_content = {
'type': ContentType.TEXT,
'text': merge_para_with_text(para_block),
}
elif para_type == BlockType.DISCARDED:
elif para_type in [
BlockType.HEADER,
BlockType.FOOTER,
BlockType.PAGE_NUMBER,
BlockType.ASIDE_TEXT,
BlockType.PAGE_FOOTNOTE,
]:
para_content = {
'type': para_type,
'text': merge_para_with_text(para_block),
}
elif para_type == BlockType.REF_TEXT:
para_content = {
'type': BlockType.LIST,
'sub_type': BlockType.REF_TEXT,
'list_items': [],
}
for block in _get_ref_text_item_blocks(para_block):
item_text = merge_para_with_text(block)
if item_text.strip():
para_content['list_items'].append(item_text)
elif para_type == BlockType.TITLE:
para_content = {
'type': ContentType.TEXT,
@@ -436,6 +635,15 @@ def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
if para_block['lines'][0]['spans'][0].get('content', ''):
para_content['text'] = merge_para_with_text(para_block)
para_content['text_format'] = 'latex'
elif para_type == BlockType.SEAL:
seal_span = _get_seal_span(para_block)
if not seal_span:
return None
para_content = {
'type': ContentType.SEAL,
'img_path': f"{img_buket_path}/{seal_span.get('image_path', '')}",
'text': _get_seal_text(para_block),
}
elif para_type == BlockType.IMAGE:
para_content = {'type': ContentType.IMAGE, 'img_path': '', BlockType.IMAGE_CAPTION: [], BlockType.IMAGE_FOOTNOTE: []}
for block in para_block['blocks']:
@@ -469,23 +677,289 @@ def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
para_content[BlockType.TABLE_CAPTION].append(merge_para_with_text(block))
if block['type'] == BlockType.TABLE_FOOTNOTE:
para_content[BlockType.TABLE_FOOTNOTE].append(merge_para_with_text(block))
elif para_type == BlockType.CHART:
para_content = {
'type': ContentType.CHART,
'img_path': '',
'content': '',
BlockType.CHART_CAPTION: [],
BlockType.CHART_FOOTNOTE: [],
}
for block in para_block.get('blocks', []):
if block['type'] == BlockType.CHART_BODY:
for line in block['lines']:
for span in line['spans']:
if span['type'] == ContentType.CHART and span.get('image_path', ''):
para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
if block['type'] == BlockType.CHART_CAPTION:
para_content[BlockType.CHART_CAPTION].append(merge_para_with_text(block))
if block['type'] == BlockType.CHART_FOOTNOTE:
para_content[BlockType.CHART_FOOTNOTE].append(merge_para_with_text(block))
elif para_type == BlockType.CODE:
para_content = {
'type': BlockType.CODE,
'sub_type': para_block['sub_type'],
BlockType.CODE_CAPTION: [],
BlockType.CODE_FOOTNOTE: [],
}
for block in para_block.get('blocks', []):
render_block = _inherit_parent_code_render_metadata(block, para_block)
if block['type'] == BlockType.CODE_BODY:
para_content[BlockType.CODE_BODY] = merge_para_with_text(render_block)
if block['type'] == BlockType.CODE_CAPTION:
para_content[BlockType.CODE_CAPTION].append(merge_para_with_text(block))
if block['type'] == BlockType.CODE_FOOTNOTE:
para_content[BlockType.CODE_FOOTNOTE].append(merge_para_with_text(block))
page_width, page_height = page_size
para_bbox = para_block.get('bbox')
if para_bbox:
x0, y0, x1, y1 = para_bbox
para_content['bbox'] = [
int(x0 * 1000 / page_width),
int(y0 * 1000 / page_height),
int(x1 * 1000 / page_width),
int(y1 * 1000 / page_height),
]
if not para_content:
return None
bbox = _build_bbox(para_block.get('bbox'), page_size)
if bbox:
para_content['bbox'] = bbox
para_content['page_idx'] = page_idx
return para_content
def make_blocks_to_content_list_v2(para_block, img_buket_path, page_size):
    """Convert one parsed block into a v2 content-list entry.

    Returns a dict with 'type', 'content' and (when a bbox exists) a
    normalised 'bbox', or None for blocks that produce no content (e.g. a
    SEAL block without a seal span, or an unrecognised type).
    """
    para_type = para_block['type']
    para_content = None
    if para_type in [
        BlockType.HEADER,
        BlockType.FOOTER,
        BlockType.ASIDE_TEXT,
        BlockType.PAGE_NUMBER,
        BlockType.PAGE_FOOTNOTE,
    ]:
        # Page-furniture blocks: map each block type to its v2 content type.
        if para_type == BlockType.HEADER:
            content_type = ContentTypeV2.PAGE_HEADER
        elif para_type == BlockType.FOOTER:
            content_type = ContentTypeV2.PAGE_FOOTER
        elif para_type == BlockType.ASIDE_TEXT:
            content_type = ContentTypeV2.PAGE_ASIDE_TEXT
        elif para_type == BlockType.PAGE_NUMBER:
            content_type = ContentTypeV2.PAGE_NUMBER
        elif para_type == BlockType.PAGE_FOOTNOTE:
            content_type = ContentTypeV2.PAGE_FOOTNOTE
        else:
            raise ValueError(f"Unknown para_type: {para_type}")
        para_content = {
            'type': content_type,
            'content': {
                # Payload key is derived from the content type, e.g.
                # 'page_header_content'.
                f"{content_type}_content": merge_para_with_text_v2(para_block),
            },
        }
    elif para_type == BlockType.TITLE:
        title_level = get_title_level(para_block)
        # Level 0 means "not really a title": demote to a paragraph.
        if title_level != 0:
            para_content = {
                'type': ContentTypeV2.TITLE,
                'content': {
                    'title_content': merge_para_with_text_v2(para_block),
                    'level': title_level,
                },
            }
        else:
            para_content = {
                'type': ContentTypeV2.PARAGRAPH,
                'content': {
                    'paragraph_content': merge_para_with_text_v2(para_block),
                },
            }
    elif para_type in [
        BlockType.TEXT,
        BlockType.ABSTRACT,
    ]:
        para_content = {
            'type': ContentTypeV2.PARAGRAPH,
            'content': {
                'paragraph_content': merge_para_with_text_v2(para_block),
            },
        }
    elif para_type == BlockType.INTERLINE_EQUATION:
        image_path, math_content = _get_body_data(para_block)
        para_content = {
            'type': ContentTypeV2.EQUATION_INTERLINE,
            'content': {
                'math_content': math_content,
                'math_type': 'latex',
                'image_source': {'path': f"{img_buket_path}/{image_path}"},
            },
        }
    elif para_type == BlockType.IMAGE:
        image_caption = []
        image_footnote = []
        image_path, _ = _get_body_data(para_block)
        # Collect caption/footnote spans from the image's sub-blocks.
        for block in para_block.get('blocks', []):
            if block['type'] == BlockType.IMAGE_CAPTION:
                image_caption.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.IMAGE_FOOTNOTE:
                image_footnote.extend(merge_para_with_text_v2(block))
        para_content = {
            'type': ContentTypeV2.IMAGE,
            'content': {
                'image_source': {'path': f"{img_buket_path}/{image_path}"},
                'image_caption': image_caption,
                'image_footnote': image_footnote,
            },
        }
    elif para_type == BlockType.TABLE:
        table_caption = []
        table_footnote = []
        image_path, html = _get_body_data(para_block)
        table_html = _format_embedded_html(html, img_buket_path)
        # More than one <table> tag implies a nested table.
        table_nest_level = 2 if table_html.count('<table') > 1 else 1
        # Merged cells or nesting classify the table as complex.
        if 'colspan' in table_html or 'rowspan' in table_html or table_nest_level > 1:
            table_type = ContentTypeV2.TABLE_COMPLEX
        else:
            table_type = ContentTypeV2.TABLE_SIMPLE
        for block in para_block.get('blocks', []):
            if block['type'] == BlockType.TABLE_CAPTION:
                table_caption.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.TABLE_FOOTNOTE:
                table_footnote.extend(merge_para_with_text_v2(block))
        para_content = {
            'type': ContentTypeV2.TABLE,
            'content': {
                'image_source': {'path': f"{img_buket_path}/{image_path}"},
                'table_caption': table_caption,
                'table_footnote': table_footnote,
                'html': table_html,
                'table_type': table_type,
                'table_nest_level': table_nest_level,
            },
        }
    elif para_type == BlockType.CHART:
        chart_caption = []
        chart_footnote = []
        image_path, _ = _get_body_data(para_block)
        for block in para_block.get('blocks', []):
            if block['type'] == BlockType.CHART_CAPTION:
                chart_caption.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.CHART_FOOTNOTE:
                chart_footnote.extend(merge_para_with_text_v2(block))
        para_content = {
            'type': ContentTypeV2.CHART,
            'content': {
                'image_source': {'path': f"{img_buket_path}/{image_path}"},
                # Chart text content is intentionally left empty here.
                'content': '',
                'chart_caption': chart_caption,
                'chart_footnote': chart_footnote,
            },
        }
    elif para_type == BlockType.CODE:
        code_caption = []
        code_footnote = []
        code_content = []
        for block in para_block.get('blocks', []):
            if block['type'] == BlockType.CODE_CAPTION:
                code_caption.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.CODE_FOOTNOTE:
                code_footnote.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.CODE_BODY:
                code_content = merge_para_with_text_v2(block)
        # Code blocks are either real code or algorithm pseudo-code.
        sub_type = para_block['sub_type']
        if sub_type == BlockType.CODE:
            para_content = {
                'type': ContentTypeV2.CODE,
                'content': {
                    'code_caption': code_caption,
                    'code_content': code_content,
                    'code_footnote': code_footnote,
                    'code_language': para_block.get('guess_lang', 'txt'),
                },
            }
        elif sub_type == BlockType.ALGORITHM:
            para_content = {
                'type': ContentTypeV2.ALGORITHM,
                'content': {
                    'algorithm_caption': code_caption,
                    'algorithm_content': code_content,
                    'algorithm_footnote': code_footnote,
                },
            }
        else:
            raise ValueError(f"Unknown code sub_type: {sub_type}")
    elif para_type == BlockType.REF_TEXT:
        # Reference blocks become a LIST_REF list; grouped refs contribute
        # one item per sub-block.
        list_items = []
        for block in _get_ref_text_item_blocks(para_block):
            item_content = merge_para_with_text_v2(block)
            if item_content:
                list_items.append({
                    'item_type': 'text',
                    'item_content': item_content,
                })
        para_content = {
            'type': ContentTypeV2.LIST,
            'content': {
                'list_type': ContentTypeV2.LIST_REF,
                'list_items': list_items,
            },
        }
    elif para_type == BlockType.LIST:
        list_items = []
        for block in _split_list_item_blocks(para_block):
            item_content = merge_para_with_text_v2(block)
            if item_content:
                list_items.append({
                    'item_type': 'text',
                    'item_content': item_content,
                })
        para_content = {
            'type': ContentTypeV2.LIST,
            'content': {
                'list_type': ContentTypeV2.LIST_TEXT,
                'attribute': para_block.get('attribute', 'unordered'),
                'list_items': list_items,
            },
        }
    elif para_type == BlockType.INDEX:
        list_items = []
        for block in _split_list_item_blocks(para_block):
            item_content = merge_para_with_text_v2(block)
            if item_content:
                list_items.append({
                    'item_type': 'text',
                    'item_content': item_content,
                })
        para_content = {
            'type': ContentTypeV2.INDEX,
            'content': {
                'list_type': ContentTypeV2.LIST_TEXT,
                'list_items': list_items,
            },
        }
    elif para_type == BlockType.SEAL:
        seal_span = _get_seal_span(para_block)
        # A SEAL block without a seal span produces no entry at all.
        if not seal_span:
            return None
        seal_text = _get_seal_text(para_block)
        para_content = {
            'type': ContentTypeV2.SEAL,
            'content': {
                'image_source': {
                    'path': f"{img_buket_path}/{seal_span.get('image_path', '')}",
                },
                'seal_content': (
                    [{'type': ContentTypeV2.SPAN_TEXT, 'content': seal_text}]
                    if seal_text else []
                ),
            },
        }
    if not para_content:
        return None
    # Attach the normalised bbox only when one can be computed.
    bbox = _build_bbox(para_block.get('bbox'), page_size)
    if bbox:
        para_content['bbox'] = bbox
    return para_content
def union_make(pdf_info_dict: list,
make_mode: str,
img_buket_path: str = '',
@@ -502,17 +976,30 @@ def union_make(pdf_info_dict: list,
page_markdown = make_blocks_to_markdown(paras_of_layout, make_mode, img_buket_path)
output_content.extend(page_markdown)
elif make_mode == MakeMode.CONTENT_LIST:
para_blocks = (paras_of_layout or []) + (paras_of_discarded or [])
para_blocks = merge_adjacent_ref_text_blocks_for_content(
(paras_of_layout or []) + (paras_of_discarded or [])
)
if not para_blocks:
continue
for para_block in para_blocks:
para_content = make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
if para_content:
output_content.append(para_content)
elif make_mode == MakeMode.CONTENT_LIST_V2:
para_blocks = merge_adjacent_ref_text_blocks_for_content(
(paras_of_layout or []) + (paras_of_discarded or [])
)
page_contents = []
if para_blocks:
for para_block in para_blocks:
para_content = make_blocks_to_content_list_v2(para_block, img_buket_path, page_size)
if para_content:
page_contents.append(para_content)
output_content.append(page_contents)
if make_mode in [MakeMode.MM_MD, MakeMode.NLP_MD]:
return '\n\n'.join(output_content)
elif make_mode == MakeMode.CONTENT_LIST:
elif make_mode in [MakeMode.CONTENT_LIST, MakeMode.CONTENT_LIST_V2]:
return output_content
else:
logger.error(f"Unsupported make mode: {make_mode}")
@@ -521,9 +1008,7 @@ def union_make(pdf_info_dict: list,
def get_title_level(block):
title_level = block.get('level', 1)
if title_level > 4:
title_level = 4
elif title_level < 1:
if title_level < 1:
title_level = 0
return title_level

View File

@@ -260,9 +260,11 @@ def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
para_content = {'type': BlockType.CODE, 'sub_type': para_block["sub_type"], BlockType.CODE_CAPTION: []}
for block in para_block['blocks']:
if block['type'] == BlockType.CODE_BODY:
para_content[BlockType.CODE_BODY] = merge_para_with_text(block)
if para_block["sub_type"] == BlockType.CODE:
para_content["guess_lang"] = para_block["guess_lang"]
code_text = merge_para_with_text(block)
if para_block['sub_type'] == BlockType.CODE:
guess_lang = para_block.get("guess_lang", "txt")
code_text = f"```{guess_lang}\n{code_text}\n```"
para_content[BlockType.CODE_BODY] = code_text
if block['type'] == BlockType.CODE_CAPTION:
para_content[BlockType.CODE_CAPTION].append(merge_para_with_text(block))

View File

@@ -172,17 +172,18 @@ def _process_output(
)
if f_dump_content_list:
content_list = make_func(pdf_info, MakeMode.CONTENT_LIST, image_dir)
md_writer.write_string(
f"{pdf_file_name}_content_list.json",
json.dumps(content_list, ensure_ascii=False, indent=4),
)
if process_mode != "pipeline":
content_list_v2 = make_func(pdf_info, MakeMode.CONTENT_LIST_V2, image_dir)
md_writer.write_string(
f"{pdf_file_name}_content_list_v2.json",
json.dumps(content_list_v2, ensure_ascii=False, indent=4),
)
content_list_v2 = make_func(pdf_info, MakeMode.CONTENT_LIST_V2, image_dir)
md_writer.write_string(
f"{pdf_file_name}_content_list_v2.json",
json.dumps(content_list_v2, ensure_ascii=False, indent=4),
)
if f_dump_middle_json:

View File

@@ -65,6 +65,7 @@ class ContentTypeV2:
ALGORITHM = "algorithm"
EQUATION_INTERLINE = 'equation_interline'
IMAGE = 'image'
SEAL = 'seal'
TABLE = 'table'
CHART = 'chart'
TABLE_SIMPLE = 'simple_table'

View File

@@ -1,113 +1,328 @@
# Copyright (c) Opendatalab. All rights reserved.
import os
import re
import threading
from io import BytesIO
import numpy as np
import pypdfium2 as pdfium
import pypdfium2.raw as pdfium_c
from loguru import logger
from pdfminer.high_level import extract_text
from pdfminer.pdfparser import PDFParser
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfinterp import PDFResourceManager
from pdfminer.pdfinterp import PDFPageInterpreter
from pdfminer.layout import LAParams, LTImage, LTFigure
from pypdf import PdfReader
from pdfminer.converter import PDFPageAggregator
from pdfminer.high_level import extract_text
from pdfminer.layout import LAParams, LTFigure, LTImage
from pdfminer.pdfdocument import PDFDocument
from pdfminer.pdfinterp import PDFPageInterpreter, PDFResourceManager
from pdfminer.pdfpage import PDFPage
from pdfminer.pdfparser import PDFParser
PDF_CLASSIFY_STRATEGY_ENV = "MINERU_PDF_CLASSIFY_STRATEGY"
PDF_CLASSIFY_STRATEGY_HYBRID = "hybrid"
PDF_CLASSIFY_STRATEGY_LEGACY = "legacy"
MAX_SAMPLE_PAGES = 10
CHARS_THRESHOLD = 50
HIGH_IMAGE_COVERAGE_THRESHOLD = 0.8
CID_RATIO_THRESHOLD = 0.05
TEXT_QUALITY_MIN_CHARS = 300
TEXT_QUALITY_BAD_THRESHOLD = 0.03
TEXT_QUALITY_GOOD_THRESHOLD = 0.005
_ALLOWED_CONTROL_CODES = {9, 10, 13}
_PRIVATE_USE_AREA_START = 0xE000
_PRIVATE_USE_AREA_END = 0xF8FF
_pdf_sample_extract_lock = threading.Lock()
def classify(pdf_bytes):
"""
判断PDF文件是可以直接提取文本还是需要OCR
Args:
pdf_bytes: PDF文件的字节数据
Classify a PDF as text-based or OCR-based.
Returns:
str: 'txt' 表示可以直接提取文本,'ocr' 表示需要OCR
"txt" if the PDF can be parsed as text, otherwise "ocr".
"""
# 从字节数据加载PDF
sample_pdf_bytes = extract_pages(pdf_bytes)
if not sample_pdf_bytes:
return 'ocr'
pdf = pdfium.PdfDocument(sample_pdf_bytes)
strategy = get_pdf_classify_strategy()
if strategy == PDF_CLASSIFY_STRATEGY_LEGACY:
return classify_legacy(pdf_bytes)
return classify_hybrid(pdf_bytes)
def get_pdf_classify_strategy() -> str:
    """Read the PDF classification strategy from the environment.

    Unknown values are logged and replaced by the hybrid default.
    """
    raw = os.getenv(PDF_CLASSIFY_STRATEGY_ENV, PDF_CLASSIFY_STRATEGY_HYBRID)
    strategy = raw.strip().lower()
    valid_strategies = {
        PDF_CLASSIFY_STRATEGY_HYBRID,
        PDF_CLASSIFY_STRATEGY_LEGACY,
    }
    if strategy in valid_strategies:
        return strategy
    logger.warning(
        f"Invalid {PDF_CLASSIFY_STRATEGY_ENV} value: {strategy}, "
        f"fall back to {PDF_CLASSIFY_STRATEGY_HYBRID}"
    )
    return PDF_CLASSIFY_STRATEGY_HYBRID
def classify_hybrid(pdf_bytes):
"""
Fast PDF classification path.
The hybrid path uses pdfium + pypdf as the main path and falls back to
pdfminer only for gray-zone samples.
"""
pdf = None
page_indices = []
should_run_pdfminer_fallback = False
try:
# 获取PDF页数
pdf = pdfium.PdfDocument(pdf_bytes)
page_count = len(pdf)
# 如果PDF页数为0直接返回OCR
if page_count == 0:
return 'ocr'
return "ocr"
# 检查的页面数最多检查10页
pages_to_check = min(page_count, 10)
page_indices = get_sample_page_indices(page_count, MAX_SAMPLE_PAGES)
if not page_indices:
return "ocr"
# 设置阈值如果每页平均少于50个有效字符认为需要OCR
chars_threshold = 50
if (
get_avg_cleaned_chars_per_page_pdfium(pdf, page_indices)
< CHARS_THRESHOLD
):
return "ocr"
# 检查平均字符数和无效字符
if (get_avg_cleaned_chars_per_page(pdf, pages_to_check) < chars_threshold) or detect_invalid_chars(sample_pdf_bytes):
return 'ocr'
if detect_cid_font_signal_pypdf(pdf_bytes, page_indices):
return "ocr"
# 检查图像覆盖率
if get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check) >= 0.8:
return 'ocr'
text_quality_signal = get_text_quality_signal_pdfium(pdf, page_indices)
total_chars = text_quality_signal["total_chars"]
abnormal_ratio = text_quality_signal["abnormal_ratio"]
return 'txt'
if total_chars >= TEXT_QUALITY_MIN_CHARS:
if abnormal_ratio >= TEXT_QUALITY_BAD_THRESHOLD:
return "ocr"
should_run_pdfminer_fallback = abnormal_ratio > TEXT_QUALITY_GOOD_THRESHOLD
else:
should_run_pdfminer_fallback = True
if (
get_high_image_coverage_ratio_pdfium(pdf, page_indices)
>= HIGH_IMAGE_COVERAGE_THRESHOLD
):
return "ocr"
except Exception as e:
logger.error(f"判断PDF类型时出错: {e}")
# 出错时默认使用OCR
return 'ocr'
logger.error(f"Failed to classify PDF with hybrid strategy: {e}")
return "ocr"
finally:
if pdf is not None:
pdf.close()
if should_run_pdfminer_fallback:
sample_pdf_bytes = extract_selected_pages(pdf_bytes, page_indices)
if not sample_pdf_bytes:
return "ocr"
if detect_invalid_chars_pdfminer_fallback(sample_pdf_bytes):
return "ocr"
return "txt"
def classify_legacy(pdf_bytes):
    """
    Legacy classification path kept for rollback and A/B comparison.
    """
    sampled_bytes = extract_pages(pdf_bytes)
    if not sampled_bytes:
        return "ocr"
    doc = pdfium.PdfDocument(sampled_bytes)
    try:
        num_pages = len(doc)
        if num_pages == 0:
            return "ocr"
        check_count = min(num_pages, MAX_SAMPLE_PAGES)
        # Short-circuit: only run the slow pdfminer check when the fast
        # character-count check did not already decide.
        if (
            get_avg_cleaned_chars_per_page(doc, check_count) < CHARS_THRESHOLD
            or detect_invalid_chars(sampled_bytes)
        ):
            return "ocr"
        image_heavy = (
            get_high_image_coverage_ratio(sampled_bytes, check_count)
            >= HIGH_IMAGE_COVERAGE_THRESHOLD
        )
        return "ocr" if image_heavy else "txt"
    except Exception as e:
        logger.error(f"Failed to classify PDF with legacy strategy: {e}")
        return "ocr"
    finally:
        # Ensure the pdfium document is closed on every path.
        doc.close()
def get_sample_page_indices(page_count: int, max_pages: int = MAX_SAMPLE_PAGES):
    """
    Return up to ``max_pages`` page indices spread evenly across the document.

    Indices are unique, in ascending order, and always include the first and
    last page when more than one page is sampled.
    """
    if page_count <= 0 or max_pages <= 0:
        return []
    target = min(page_count, max_pages)
    if target == page_count:
        return list(range(page_count))
    if target == 1:
        return [0]
    chosen = []
    picked = set()
    for slot in range(target):
        # Evenly spaced positions over [0, page_count - 1], clamped to range.
        idx = round(slot * (page_count - 1) / (target - 1))
        idx = max(0, min(page_count - 1, idx))
        if idx not in picked:
            picked.add(idx)
            chosen.append(idx)
    # Backfill with the lowest unused indices if rounding caused collisions.
    if len(chosen) < target:
        for idx in range(page_count):
            if idx in picked:
                continue
            picked.add(idx)
            chosen.append(idx)
            if len(chosen) == target:
                break
    return sorted(chosen)
def get_avg_cleaned_chars_per_page(pdf_doc, pages_to_check):
    """
    Return the average number of non-whitespace characters per page over the
    first ``pages_to_check`` pages of ``pdf_doc``.

    Callers must pass ``pages_to_check >= 1``; every call site guards the
    zero-page case before invoking this helper.
    """
    cleaned_total_chars = 0
    for i in range(pages_to_check):
        page = pdf_doc[i]
        text_page = page.get_textpage()
        text = text_page.get_text_bounded()
        # Strip all whitespace so layout-only spacing does not inflate the count.
        cleaned_text = re.sub(r"\s+", "", text)
        cleaned_total_chars += len(cleaned_text)
    return cleaned_total_chars / pages_to_check
def get_avg_cleaned_chars_per_page_pdfium(pdf_doc, page_indices):
    """Average non-whitespace character count across the sampled page indices."""
    if not page_indices:
        return 0.0
    stripped_char_total = 0
    for idx in page_indices:
        raw_text = pdf_doc[idx].get_textpage().get_text_bounded()
        stripped_char_total += len(re.sub(r"\s+", "", raw_text))
    return stripped_char_total / len(page_indices)
def get_text_quality_signal_pdfium(pdf_doc, page_indices):
    """
    Scan the sampled pages and count characters that indicate broken text
    extraction: NULs, U+FFFD replacement characters, unexpected control
    codes, and private-use-area code points.

    Returns a dict with the raw counts plus ``abnormal_ratio`` (abnormal
    characters over total characters; 0.0 when no characters were seen).
    """
    total_chars = 0
    null_chars = 0
    replacement_chars = 0
    control_chars = 0
    private_use_chars = 0
    for idx in page_indices:
        text_page = pdf_doc[idx].get_textpage()
        n_chars = text_page.count_chars()
        total_chars += n_chars
        for pos in range(n_chars):
            code_point = pdfium_c.FPDFText_GetUnicode(text_page, pos)
            if code_point == 0:
                null_chars += 1
            elif code_point == 0xFFFD:
                replacement_chars += 1
            elif code_point < 32 and code_point not in _ALLOWED_CONTROL_CODES:
                control_chars += 1
            elif _PRIVATE_USE_AREA_START <= code_point <= _PRIVATE_USE_AREA_END:
                private_use_chars += 1
    bad_total = null_chars + replacement_chars + control_chars + private_use_chars
    abnormal_ratio = bad_total / total_chars if total_chars > 0 else 0.0
    return {
        "total_chars": total_chars,
        "abnormal_ratio": abnormal_ratio,
        "null_char_count": null_chars,
        "replacement_char_count": replacement_chars,
        "control_char_count": control_chars,
        "private_use_char_count": private_use_chars,
    }
def detect_cid_font_signal_pypdf(pdf_bytes, page_indices):
    """
    Return True when any sampled page declares a Type0 Identity-encoded CID
    font with descendant fonts but no ToUnicode map — a configuration whose
    extracted text is typically unusable, so the document should go to OCR.
    """
    reader = PdfReader(BytesIO(pdf_bytes))
    for idx in page_indices:
        resources = _resolve_pdf_object(reader.pages[idx].get("/Resources"))
        if not resources:
            continue
        font_table = _resolve_pdf_object(resources.get("/Font"))
        if not font_table:
            continue
        for font_ref in font_table.values():
            font = _resolve_pdf_object(font_ref)
            if not font:
                continue
            is_identity_type0 = (
                str(font.get("/Subtype")) == "/Type0"
                and str(font.get("/Encoding")) in ("/Identity-H", "/Identity-V")
            )
            if (
                is_identity_type0
                and "/DescendantFonts" in font
                and "/ToUnicode" not in font
            ):
                return True
    return False
def _resolve_pdf_object(obj):
if hasattr(obj, "get_object"):
return obj.get_object()
return obj
def get_high_image_coverage_ratio(sample_pdf_bytes, pages_to_check):
    """
    Return the fraction of sampled pages whose image/figure area covers at
    least HIGH_IMAGE_COVERAGE_THRESHOLD of the page (pdfminer implementation).

    Returns 1.0 when the document forbids content extraction (we cannot
    inspect the layout, so assume image-heavy), and 0.0 when no pages were
    processed.
    """
    pdf_stream = BytesIO(sample_pdf_bytes)
    parser = PDFParser(pdf_stream)
    document = PDFDocument(parser)
    if not document.is_extractable:
        # Cannot inspect the layout at all; treat as high coverage.
        return 1.0
    rsrcmgr = PDFResourceManager()
    # NOTE(review): the mid-list LAParams kwargs were elided by a diff hunk
    # in the reviewed source; restored from the pdfminer defaults used by the
    # sibling detect_invalid_chars — confirm against file history.
    laparams = LAParams(
        line_overlap=0.5,
        char_margin=2.0,
        line_margin=0.5,
        word_margin=0.1,
        boxes_flow=None,
        detect_vertical=False,
        all_texts=False,
    )
    device = PDFPageAggregator(rsrcmgr, laparams=laparams)
    interpreter = PDFPageInterpreter(rsrcmgr, device)
    high_image_coverage_pages = 0
    page_count = 0
    for page in PDFPage.create_pages(document):
        if page_count >= pages_to_check:
            break
        interpreter.process_page(page)
        layout = device.get_result()
        page_area = layout.width * layout.height
        # Sum the bounding-box area of every image/figure element exactly once.
        image_area = 0
        for element in layout:
            if isinstance(element, (LTImage, LTFigure)):
                image_area += element.width * element.height
        coverage_ratio = min(image_area / page_area, 1.0) if page_area > 0 else 0
        if coverage_ratio >= HIGH_IMAGE_COVERAGE_THRESHOLD:
            high_image_coverage_pages += 1
        page_count += 1
    pdf_stream.close()
    if page_count == 0:
        return 0.0
    return high_image_coverage_pages / page_count
def get_high_image_coverage_ratio_pdfium(pdf_doc, page_indices):
    """
    pdfium counterpart of get_high_image_coverage_ratio: fraction of sampled
    pages whose image objects cover at least HIGH_IMAGE_COVERAGE_THRESHOLD of
    the page area. Returns 0.0 for an empty sample.
    """
    if not page_indices:
        return 0.0
    image_heavy_pages = 0
    for idx in page_indices:
        page = pdf_doc[idx]
        x0, y0, x1, y1 = page.get_bbox()
        page_area = abs((x1 - x0) * (y1 - y0))
        covered = 0.0
        for image_obj in page.get_objects(
            filter=[pdfium_c.FPDF_PAGEOBJ_IMAGE], max_depth=3
        ):
            left, bottom, right, top = image_obj.get_pos()
            # Clamp negative extents so malformed boxes cannot subtract area.
            covered += max(0.0, right - left) * max(0.0, top - bottom)
        ratio = min(covered / page_area, 1.0) if page_area > 0 else 0.0
        if ratio >= HIGH_IMAGE_COVERAGE_THRESHOLD:
            image_heavy_pages += 1
    return image_heavy_pages / len(page_indices)
def extract_pages(src_pdf_bytes: bytes) -> bytes:
    """
    Extract up to MAX_SAMPLE_PAGES random pages and return them as a new PDF.

    Small documents are returned unchanged to avoid a pointless PDF rewrite;
    on any error the original bytes are returned as a best-effort fallback.
    """
    with _pdf_sample_extract_lock:
        pdf = None
        sample_docs = None
        try:
            pdf = pdfium.PdfDocument(src_pdf_bytes)
            total_page = len(pdf)
            if total_page == 0:
                logger.warning("PDF is empty, return empty document")
                return b""
            # Reuse the original bytes when no sampling is needed.
            if total_page <= MAX_SAMPLE_PAGES:
                return src_pdf_bytes
            select_page_cnt = min(MAX_SAMPLE_PAGES, total_page)
            page_indices = np.random.choice(
                total_page, select_page_cnt, replace=False
            ).tolist()
            sample_docs = pdfium.PdfDocument.new()
            sample_docs.import_pages(pdf, page_indices)
            output_buffer = BytesIO()
            sample_docs.save(output_buffer)
            return output_buffer.getvalue()
        except Exception as e:
            logger.exception(e)
            # Best-effort: fall back to the full document on failure.
            return src_pdf_bytes
        finally:
            if pdf is not None:
                pdf.close()
            if sample_docs is not None:
                sample_docs.close()
def extract_selected_pages(src_pdf_bytes: bytes, page_indices) -> bytes:
    """
    Extract the given pages (deduplicated, sorted, bounds-checked) into a new
    PDF. Returns the original bytes when the selection covers every page, and
    b"" when nothing valid can be extracted.
    """
    selected_page_indices = sorted(set(page_indices))
    if not selected_page_indices:
        return b""
    with _pdf_sample_extract_lock:
        pdf = None
        sample_docs = None
        try:
            pdf = pdfium.PdfDocument(src_pdf_bytes)
            total_page = len(pdf)
            if total_page == 0:
                logger.warning("PDF is empty, return empty document")
                return b""
            # Drop out-of-range indices before importing pages.
            selected_page_indices = [
                page_index
                for page_index in selected_page_indices
                if 0 <= page_index < total_page
            ]
            if not selected_page_indices:
                return b""
            if selected_page_indices == list(range(total_page)):
                return src_pdf_bytes
            sample_docs = pdfium.PdfDocument.new()
            sample_docs.import_pages(pdf, selected_page_indices)
            output_buffer = BytesIO()
            sample_docs.save(output_buffer)
            return output_buffer.getvalue()
        except Exception as e:
            logger.exception(e)
            # NOTE(review): function tail was elided by a diff hunk in the
            # reviewed source; reconstructed to mirror extract_pages, with an
            # empty sample on failure so callers fall back to OCR — confirm.
            return b""
        finally:
            if pdf is not None:
                pdf.close()
            if sample_docs is not None:
                sample_docs.close()
def detect_invalid_chars(sample_pdf_bytes: bytes) -> bool:
    """
    Detect whether a PDF contains invalid CID-style extracted text.

    pdfminer renders unmapped glyphs as ``(cid:NNN)`` markers; when the share
    of such markers among the extracted characters exceeds
    CID_RATIO_THRESHOLD the document is considered garbled.
    """
    sample_pdf_file_like_object = BytesIO(sample_pdf_bytes)
    # NOTE(review): the mid-list LAParams kwargs were elided by a diff hunk
    # in the reviewed source; restored from pdfminer defaults — confirm
    # against file history.
    laparams = LAParams(
        line_overlap=0.5,
        char_margin=2.0,
        line_margin=0.5,
        word_margin=0.1,
        boxes_flow=None,
        detect_vertical=False,
        all_texts=False,
    )
    text = extract_text(pdf_file=sample_pdf_file_like_object, laparams=laparams)
    text = text.replace("\n", "")
    cid_pattern = re.compile(r"\(cid:\d+\)")
    matches = cid_pattern.findall(text)
    cid_count = len(matches)
    cid_len = sum(len(match) for match in matches)
    text_len = len(text)
    if text_len == 0:
        cid_chars_ratio = 0
    else:
        # Each (cid:NNN) marker stands for one original character, so count
        # it once in the denominator instead of its full marker length.
        cid_chars_ratio = cid_count / (cid_count + text_len - cid_len)
    return cid_chars_ratio > CID_RATIO_THRESHOLD
def detect_invalid_chars_pdfminer_fallback(sample_pdf_bytes: bytes) -> bool:
    """Gray-zone fallback: run the slower pdfminer CID check on a small sample."""
    return detect_invalid_chars(sample_pdf_bytes)


if __name__ == "__main__":
    with open("/Users/myhloli/pdf/luanma2x10.pdf", "rb") as f:
        p_bytes = f.read()
    logger.info(f"PDF classify result: {classify(p_bytes)}")