Files
MinerU/mineru/backend/vlm/vlm_middle_json_mkcontent.py
2025-12-12 17:11:18 +08:00

560 lines
24 KiB
Python

import os
from loguru import logger
from mineru.utils.config_reader import get_latex_delimiter_config, get_formula_enable, get_table_enable
from mineru.utils.enum_class import MakeMode, BlockType, ContentType, ContentTypeV2
# Fallback LaTeX delimiters, used when the configuration reader returns a
# falsy value (e.g. None or an empty dict).
default_delimiters = {
    'display': {'left': '$$', 'right': '$$'},
    'inline': {'left': '$', 'right': '$'},
}
latex_delimiters_config = get_latex_delimiter_config()
delimiters = latex_delimiters_config or default_delimiters

# Module-level shortcuts used when rendering display and inline equations.
display_left_delimiter, display_right_delimiter = (
    delimiters['display']['left'],
    delimiters['display']['right'],
)
inline_left_delimiter, inline_right_delimiter = (
    delimiters['inline']['left'],
    delimiters['inline']['right'],
)
def merge_para_with_text(para_block, formula_enable=True, img_buket_path=''):
    """Concatenate the spans of a block into one Markdown string.

    Span handling, by ``span['type']``:

    * ``ContentType.TEXT``: the span content is used as is.
    * ``ContentType.INLINE_EQUATION``: the content is wrapped in the inline
      delimiters.
    * ``ContentType.INTERLINE_EQUATION``: when ``formula_enable`` is true the
      content is wrapped in the display delimiters on separate lines;
      otherwise a Markdown image link is emitted if the span has an
      ``image_path``.
    * Any other type, or an empty result, contributes nothing.

    Text and inline-equation pieces are followed by one space unless they are
    the last span of their line. Nothing is inserted between lines.

    Args:
        para_block: Dict with a ``'lines'`` list; each line has a ``'spans'`` list.
        formula_enable: Whether interline equations are rendered as LaTeX.
        img_buket_path: Prefix placed before ``image_path`` in image links.

    Returns:
        The merged text.
    """
    pieces = []
    for line in para_block['lines']:
        spans = line['spans']
        for idx, span in enumerate(spans):
            kind = span['type']

            if kind == ContentType.TEXT:
                piece = span['content']
            elif kind == ContentType.INLINE_EQUATION:
                piece = f"{inline_left_delimiter}{span['content']}{inline_right_delimiter}"
            elif kind == ContentType.INTERLINE_EQUATION:
                if formula_enable:
                    piece = f"\n{display_left_delimiter}\n{span['content']}\n{display_right_delimiter}\n"
                elif span.get('image_path', ''):
                    piece = f"![]({img_buket_path}/{span['image_path']})"
                else:
                    piece = ''
            else:
                piece = ''

            if not piece:
                continue

            if kind in (ContentType.TEXT, ContentType.INLINE_EQUATION):
                # Separate neighbouring spans with a space, except after the
                # final span of the line.
                is_last_span = idx == len(spans) - 1
                pieces.append(piece if is_last_span else f'{piece} ')
            elif kind == ContentType.INTERLINE_EQUATION:
                pieces.append(piece)
    return ''.join(pieces)
def _image_body_links(blocks, img_buket_path):
    """Return Markdown image links for every IMAGE span with an image_path in IMAGE_BODY blocks."""
    links = []
    for block in blocks:
        if block['type'] != BlockType.IMAGE_BODY:
            continue
        for line in block['lines']:
            for span in line['spans']:
                if span['type'] == ContentType.IMAGE and span.get('image_path', ''):
                    links.append(f"![]({img_buket_path}/{span['image_path']})")
    return ''.join(links)


def _image_block_to_markdown(blocks, img_buket_path):
    """Render the sub-blocks of an IMAGE block (captions, image links, footnotes) as Markdown."""
    has_image_footnote = any(block['type'] == BlockType.IMAGE_FOOTNOTE for block in blocks)
    if has_image_footnote:
        # With a footnote present: captions first, then the image, then footnotes.
        captions = ''.join(
            merge_para_with_text(block) + ' \n'
            for block in blocks
            if block['type'] == BlockType.IMAGE_CAPTION
        )
        links = _image_body_links(blocks, img_buket_path)
        footnotes = ''.join(
            ' \n' + merge_para_with_text(block)
            for block in blocks
            if block['type'] == BlockType.IMAGE_FOOTNOTE
        )
        return captions + links + footnotes

    # Without a footnote: the image first, captions underneath.
    links = _image_body_links(blocks, img_buket_path)
    captions = ''.join(
        ' \n' + merge_para_with_text(block)
        for block in blocks
        if block['type'] == BlockType.IMAGE_CAPTION
    )
    return links + captions


def _table_block_to_markdown(blocks, table_enable, img_buket_path):
    """Render the sub-blocks of a TABLE block (captions, body, footnotes) as Markdown."""
    parts = []
    for block in blocks:  # 1st: table captions
        if block['type'] == BlockType.TABLE_CAPTION:
            parts.append(merge_para_with_text(block) + ' \n')
    for block in blocks:  # 2nd: table body
        if block['type'] != BlockType.TABLE_BODY:
            continue
        for line in block['lines']:
            for span in line['spans']:
                if span['type'] != ContentType.TABLE:
                    continue
                # HTML is used only when table processing is enabled and the
                # span carries HTML; otherwise fall back to the table image.
                if table_enable and span.get('html', ''):
                    parts.append(f"\n{span['html']}\n")
                elif span.get('image_path', ''):
                    parts.append(f"![]({img_buket_path}/{span['image_path']})")
    for block in blocks:  # 3rd: table footnotes
        if block['type'] == BlockType.TABLE_FOOTNOTE:
            parts.append('\n' + merge_para_with_text(block) + ' ')
    return ''.join(parts)


def _code_block_to_markdown(para_block):
    """Render a CODE block: captions followed by a fenced code body or a plain algorithm body."""
    sub_type = para_block["sub_type"]
    blocks = para_block['blocks']
    parts = []
    for block in blocks:  # 1st: code captions
        if block['type'] == BlockType.CODE_CAPTION:
            parts.append(merge_para_with_text(block) + ' \n')
    for block in blocks:  # 2nd: code body
        if block['type'] != BlockType.CODE_BODY:
            continue
        if sub_type == BlockType.CODE:
            # Fall back to 'txt' when no language guess is present, matching
            # make_blocks_to_content_list_v2, instead of raising KeyError.
            guess_lang = para_block.get("guess_lang", 'txt')
            parts.append(f"```{guess_lang}\n{merge_para_with_text(block)}\n```")
        elif sub_type == BlockType.ALGORITHM:
            parts.append(merge_para_with_text(block))
    return ''.join(parts)


def mk_blocks_to_markdown(para_blocks, make_mode, formula_enable, table_enable, img_buket_path=''):
    """Convert the blocks of one page into a list of Markdown fragments.

    Text, interline-equation, phonetic and ref-text blocks are merged directly;
    list blocks emit one line per item; title blocks are prefixed with ``#``
    repeated ``get_title_level`` times. Image and table blocks are skipped in
    ``MakeMode.NLP_MD`` and rendered in ``MakeMode.MM_MD``. Code blocks are
    rendered as fenced code (sub_type CODE) or plain text (sub_type ALGORITHM).
    Block types not listed here produce no output.

    Args:
        para_blocks: Iterable of block dicts, each with a ``'type'`` key.
        make_mode: One of the ``MakeMode`` values; only NLP_MD and MM_MD
            affect image/table handling here.
        formula_enable: Forwarded to ``merge_para_with_text`` for text-like
            and list blocks.
        table_enable: When true, table spans with HTML are emitted as HTML.
        img_buket_path: Prefix placed before image paths in image links.

    Returns:
        A list of stripped, non-empty Markdown strings, one per rendered block.
    """
    page_markdown = []
    for para_block in para_blocks:
        para_text = ''
        para_type = para_block['type']
        if para_type in [BlockType.TEXT, BlockType.INTERLINE_EQUATION, BlockType.PHONETIC, BlockType.REF_TEXT]:
            para_text = merge_para_with_text(para_block, formula_enable=formula_enable, img_buket_path=img_buket_path)
        elif para_type == BlockType.LIST:
            for block in para_block['blocks']:
                item_text = merge_para_with_text(block, formula_enable=formula_enable, img_buket_path=img_buket_path)
                para_text += f"{item_text} \n"
        elif para_type == BlockType.TITLE:
            title_level = get_title_level(para_block)
            para_text = f'{"#" * title_level} {merge_para_with_text(para_block)}'
        elif para_type == BlockType.IMAGE:
            if make_mode == MakeMode.NLP_MD:
                continue
            elif make_mode == MakeMode.MM_MD:
                para_text = _image_block_to_markdown(para_block['blocks'], img_buket_path)
        elif para_type == BlockType.TABLE:
            if make_mode == MakeMode.NLP_MD:
                continue
            elif make_mode == MakeMode.MM_MD:
                para_text = _table_block_to_markdown(para_block['blocks'], table_enable, img_buket_path)
        elif para_type == BlockType.CODE:
            para_text = _code_block_to_markdown(para_block)

        # Blocks that rendered to nothing but whitespace are dropped.
        stripped_text = para_text.strip()
        if stripped_text:
            page_markdown.append(stripped_text)
    return page_markdown
def make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size):
    """Convert one block into a flat content-list entry.

    The entry's shape depends on ``para_block['type']``:

    * text-like and page-level types: ``{'type', 'text'}``
    * LIST: ``{'type', 'sub_type', 'list_items'}`` (blank items are dropped)
    * TITLE: ``{'type': ContentType.TEXT, 'text'}`` plus ``'text_level'`` when
      the title level is non-zero
    * INTERLINE_EQUATION: ``{'type', 'text', 'text_format': 'latex'}``
    * IMAGE / TABLE: image path, caption and footnote lists, and for tables
      the body HTML when a span provides it
    * CODE: sub type, captions, body and (for sub_type CODE) ``'guess_lang'``

    Other block types yield an entry without a ``'type'`` key. When the block
    has a truthy ``'bbox'``, it is rescaled to a 0-1000 range relative to
    ``page_size`` and truncated with ``int``. ``'page_idx'`` is always set.

    Args:
        para_block: Block dict with at least a ``'type'`` key.
        img_buket_path: Prefix placed before image paths.
        page_idx: Page index stored on the entry.
        page_size: ``(width, height)`` of the page; only read when the block
            has a bbox.

    Returns:
        The content-list entry as a dict.
    """
    para_type = para_block['type']
    para_content = {}
    if para_type in [
        BlockType.TEXT,
        BlockType.REF_TEXT,
        BlockType.PHONETIC,
        BlockType.HEADER,
        BlockType.FOOTER,
        BlockType.PAGE_NUMBER,
        BlockType.ASIDE_TEXT,
        BlockType.PAGE_FOOTNOTE,
    ]:
        para_content = {
            'type': para_type,
            'text': merge_para_with_text(para_block),
        }
    elif para_type == BlockType.LIST:
        para_content = {
            'type': para_type,
            'sub_type': para_block.get('sub_type', ''),
            'list_items': [],
        }
        for block in para_block['blocks']:
            item_text = merge_para_with_text(block)
            if item_text.strip():
                para_content['list_items'].append(item_text)
    elif para_type == BlockType.TITLE:
        title_level = get_title_level(para_block)
        para_content = {
            'type': ContentType.TEXT,
            'text': merge_para_with_text(para_block),
        }
        # Level 0 means "not a heading": the entry stays plain text.
        if title_level != 0:
            para_content['text_level'] = title_level
    elif para_type == BlockType.INTERLINE_EQUATION:
        para_content = {
            'type': ContentType.EQUATION,
            'text': merge_para_with_text(para_block),
            'text_format': 'latex',
        }
    elif para_type == BlockType.IMAGE:
        para_content = {'type': ContentType.IMAGE, 'img_path': '', BlockType.IMAGE_CAPTION: [], BlockType.IMAGE_FOOTNOTE: []}
        for block in para_block['blocks']:
            if block['type'] == BlockType.IMAGE_BODY:
                for line in block['lines']:
                    for span in line['spans']:
                        if span['type'] == ContentType.IMAGE:
                            if span.get('image_path', ''):
                                para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
            if block['type'] == BlockType.IMAGE_CAPTION:
                para_content[BlockType.IMAGE_CAPTION].append(merge_para_with_text(block))
            if block['type'] == BlockType.IMAGE_FOOTNOTE:
                para_content[BlockType.IMAGE_FOOTNOTE].append(merge_para_with_text(block))
    elif para_type == BlockType.TABLE:
        para_content = {'type': ContentType.TABLE, 'img_path': '', BlockType.TABLE_CAPTION: [], BlockType.TABLE_FOOTNOTE: []}
        for block in para_block['blocks']:
            if block['type'] == BlockType.TABLE_BODY:
                for line in block['lines']:
                    for span in line['spans']:
                        if span['type'] == ContentType.TABLE:
                            if span.get('html', ''):
                                para_content[BlockType.TABLE_BODY] = f"{span['html']}"
                            if span.get('image_path', ''):
                                para_content['img_path'] = f"{img_buket_path}/{span['image_path']}"
            if block['type'] == BlockType.TABLE_CAPTION:
                para_content[BlockType.TABLE_CAPTION].append(merge_para_with_text(block))
            if block['type'] == BlockType.TABLE_FOOTNOTE:
                para_content[BlockType.TABLE_FOOTNOTE].append(merge_para_with_text(block))
    elif para_type == BlockType.CODE:
        para_content = {'type': BlockType.CODE, 'sub_type': para_block["sub_type"], BlockType.CODE_CAPTION: []}
        for block in para_block['blocks']:
            if block['type'] == BlockType.CODE_BODY:
                para_content[BlockType.CODE_BODY] = merge_para_with_text(block)
                if para_block["sub_type"] == BlockType.CODE:
                    # Default to 'txt' when no language guess exists, matching
                    # make_blocks_to_content_list_v2, instead of raising KeyError.
                    para_content["guess_lang"] = para_block.get("guess_lang", 'txt')
            if block['type'] == BlockType.CODE_CAPTION:
                para_content[BlockType.CODE_CAPTION].append(merge_para_with_text(block))

    para_bbox = para_block.get('bbox')
    if para_bbox:
        # page_size is only needed for bbox scaling, so it is unpacked here;
        # a block without a bbox no longer fails when page_size is missing.
        page_width, page_height = page_size
        x0, y0, x1, y1 = para_bbox
        para_content['bbox'] = [
            int(x0 * 1000 / page_width),
            int(y0 * 1000 / page_height),
            int(x1 * 1000 / page_width),
            int(y1 * 1000 / page_height),
        ]
    para_content['page_idx'] = page_idx
    return para_content
# Page-level block types and the content-list v2 type each one is emitted as.
_PAGE_LEVEL_CONTENT_TYPES = {
    BlockType.HEADER: ContentTypeV2.PAGE_HEADER,
    BlockType.FOOTER: ContentTypeV2.PAGE_FOOTER,
    BlockType.ASIDE_TEXT: ContentTypeV2.PAGE_ASIDE_TEXT,
    BlockType.PAGE_NUMBER: ContentTypeV2.PAGE_NUMBER,
    BlockType.PAGE_FOOTNOTE: ContentTypeV2.PAGE_FOOTNOTE,
}


def make_blocks_to_content_list_v2(para_block, img_buket_path, page_size):
    """Convert one block into a content-list v2 entry.

    Every recognised block type yields ``{'type': ..., 'content': {...}}``:

    * page-level types (header, footer, aside text, page number, page
      footnote): a single ``"<type>_content"`` span list
    * TITLE: a title with its level, or a paragraph when the level is 0
    * TEXT / PHONETIC: a paragraph
    * INTERLINE_EQUATION: LaTeX math content plus an image source
    * IMAGE / TABLE: image source, captions, footnotes; tables also carry
      HTML, a simple/complex classification and a nesting level
    * CODE: code or algorithm content depending on ``sub_type``
    * REF_TEXT: a single-item reference list
    * LIST: a text or reference list built from the child blocks

    Unrecognised block types yield an empty dict. When the block has a truthy
    ``'bbox'``, it is rescaled to a 0-1000 range relative to ``page_size`` and
    truncated with ``int``.

    Args:
        para_block: Block dict with at least a ``'type'`` key.
        img_buket_path: Prefix placed before image paths.
        page_size: ``(width, height)`` of the page; only read when the block
            has a bbox.

    Returns:
        The content-list v2 entry as a dict.

    Raises:
        ValueError: If a CODE block has an unknown ``sub_type`` or a LIST
            block has a ``sub_type`` other than REF_TEXT or TEXT.
    """
    para_type = para_block['type']
    para_content = {}
    if para_type in _PAGE_LEVEL_CONTENT_TYPES:
        content_type = _PAGE_LEVEL_CONTENT_TYPES[para_type]
        para_content = {
            'type': content_type,
            'content': {
                f"{content_type}_content": merge_para_with_text_v2(para_block),
            }
        }
    elif para_type == BlockType.TITLE:
        title_level = get_title_level(para_block)
        if title_level != 0:
            para_content = {
                'type': ContentTypeV2.TITLE,
                'content': {
                    "title_content": merge_para_with_text_v2(para_block),
                    "level": title_level
                }
            }
        else:
            # Level 0 titles are emitted as ordinary paragraphs.
            para_content = {
                'type': ContentTypeV2.PARAGRAPH,
                'content': {
                    "paragraph_content": merge_para_with_text_v2(para_block),
                }
            }
    elif para_type in [
        BlockType.TEXT,
        BlockType.PHONETIC
    ]:
        para_content = {
            'type': ContentTypeV2.PARAGRAPH,
            'content': {
                'paragraph_content': merge_para_with_text_v2(para_block),
            }
        }
    elif para_type == BlockType.INTERLINE_EQUATION:
        image_path, math_content = get_body_data(para_block)
        para_content = {
            'type': ContentTypeV2.EQUATION_INTERLINE,
            'content': {
                'math_content': math_content,
                'math_type': 'latex',
                'image_source': {'path': f"{img_buket_path}/{image_path}"},
            }
        }
    elif para_type == BlockType.IMAGE:
        image_caption = []
        image_footnote = []
        image_path, _ = get_body_data(para_block)
        image_source = {
            'path': f"{img_buket_path}/{image_path}",
        }
        for block in para_block['blocks']:
            if block['type'] == BlockType.IMAGE_CAPTION:
                image_caption.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.IMAGE_FOOTNOTE:
                image_footnote.extend(merge_para_with_text_v2(block))
        para_content = {
            'type': ContentTypeV2.IMAGE,
            'content': {
                'image_source': image_source,
                'image_caption': image_caption,
                'image_footnote': image_footnote,
            }
        }
    elif para_type == BlockType.TABLE:
        table_caption = []
        table_footnote = []
        image_path, html = get_body_data(para_block)
        image_source = {
            'path': f"{img_buket_path}/{image_path}",
        }
        # More than one "<table" occurrence is treated as a nested table.
        table_nest_level = 2 if html.count("<table") > 1 else 1
        # Merged cells or nesting make the table "complex".
        if "colspan" in html or "rowspan" in html or table_nest_level > 1:
            table_type = ContentTypeV2.TABLE_COMPLEX
        else:
            table_type = ContentTypeV2.TABLE_SIMPLE
        for block in para_block['blocks']:
            if block['type'] == BlockType.TABLE_CAPTION:
                table_caption.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.TABLE_FOOTNOTE:
                table_footnote.extend(merge_para_with_text_v2(block))
        para_content = {
            'type': ContentTypeV2.TABLE,
            'content': {
                'image_source': image_source,
                'table_caption': table_caption,
                'table_footnote': table_footnote,
                'html': html,
                'table_type': table_type,
                'table_nest_level': table_nest_level,
            }
        }
    elif para_type == BlockType.CODE:
        code_caption = []
        code_content = []
        for block in para_block['blocks']:
            if block['type'] == BlockType.CODE_CAPTION:
                code_caption.extend(merge_para_with_text_v2(block))
            if block['type'] == BlockType.CODE_BODY:
                code_content = merge_para_with_text_v2(block)
        sub_type = para_block["sub_type"]
        if sub_type == BlockType.CODE:
            para_content = {
                'type': ContentTypeV2.CODE,
                'content': {
                    'code_caption': code_caption,
                    'code_content': code_content,
                    'code_language': para_block.get('guess_lang', 'txt'),
                }
            }
        elif sub_type == BlockType.ALGORITHM:
            para_content = {
                'type': ContentTypeV2.ALGORITHM,
                'content': {
                    'algorithm_caption': code_caption,
                    'algorithm_content': code_content,
                }
            }
        else:
            raise ValueError(f"Unknown code sub_type: {sub_type}")
    elif para_type == BlockType.REF_TEXT:
        para_content = {
            'type': ContentTypeV2.LIST,
            'content': {
                'list_type': ContentTypeV2.LIST_REF,
                'list_items': [
                    {
                        'item_type': 'text',
                        'item_content': merge_para_with_text_v2(para_block),
                    }
                ],
            }
        }
    elif para_type == BlockType.LIST:
        if 'sub_type' in para_block:
            if para_block['sub_type'] == BlockType.REF_TEXT:
                list_type = ContentTypeV2.LIST_REF
            elif para_block['sub_type'] == BlockType.TEXT:
                list_type = ContentTypeV2.LIST_TEXT
            else:
                raise ValueError(f"Unknown list sub_type: {para_block['sub_type']}")
        else:
            # Lists without a sub_type are treated as plain text lists.
            list_type = ContentTypeV2.LIST_TEXT
        list_items = []
        for block in para_block['blocks']:
            item_content = merge_para_with_text_v2(block)
            if item_content:
                list_items.append({
                    'item_type': 'text',
                    'item_content': item_content,
                })
        para_content = {
            'type': ContentTypeV2.LIST,
            'content': {
                'list_type': list_type,
                'list_items': list_items,
            }
        }

    para_bbox = para_block.get('bbox')
    if para_bbox:
        # page_size is only needed for bbox scaling, so it is unpacked here;
        # a block without a bbox no longer fails when page_size is missing.
        page_width, page_height = page_size
        x0, y0, x1, y1 = para_bbox
        para_content['bbox'] = [
            int(x0 * 1000 / page_width),
            int(y0 * 1000 / page_height),
            int(x1 * 1000 / page_width),
            int(y1 * 1000 / page_height),
        ]
    return para_content
def get_body_data(para_block):
    """
    Extract a ``(image_path, payload)`` pair from the first recognised span.

    The payload depends on the type of the first span that matches:
        - TABLE span: (image_path, html)
        - IMAGE span: (image_path, '')
        - INTERLINE_EQUATION span: (image_path, content)
        - TEXT span: ('', content)
        - no recognised span: ('', '')

    When ``para_block`` has a ``'blocks'`` key, only its IMAGE_BODY,
    TABLE_BODY and CODE_BODY children are searched, and the first child that
    yields something other than ('', '') wins. Otherwise the block's own
    ``'lines'`` are searched.
    """
    body_block_types = [BlockType.IMAGE_BODY, BlockType.TABLE_BODY, BlockType.CODE_BODY]

    def first_span_data(lines):
        # Stops at the first span of a recognised type, even if its fields
        # turn out to be empty.
        for line in lines:
            for span in line.get('spans', []):
                kind = span.get('type')
                if kind == ContentType.TABLE:
                    return span.get('image_path', ''), span.get('html', '')
                if kind == ContentType.IMAGE:
                    return span.get('image_path', ''), ''
                if kind == ContentType.INTERLINE_EQUATION:
                    return span.get('image_path', ''), span.get('content', '')
                if kind == ContentType.TEXT:
                    return '', span.get('content', '')
        return '', ''

    # Flat structure: the block carries its lines directly.
    if 'blocks' not in para_block:
        return first_span_data(para_block.get('lines', []))

    # Nested structure: look through the body-type child blocks in order.
    for child in para_block['blocks']:
        if child.get('type') not in body_block_types:
            continue
        found = first_span_data(child.get('lines', []))
        if found != ('', ''):
            return found
    return '', ''
def merge_para_with_text_v2(para_block):
    """Convert the spans of a block into a list of content-list v2 span dicts.

    Spans with missing or empty content are skipped. Text spans inside a
    PHONETIC block are retyped as phonetic spans, and inline-equation spans
    are retyped as v2 inline-equation spans. Spans whose resulting type is not
    one of the supported v2 span types are skipped with a warning.

    Args:
        para_block: Block dict with ``'type'`` and ``'lines'``; each line has
            a ``'spans'`` list of dicts with a ``'type'`` key.

    Returns:
        A list of ``{'type': ..., 'content': ...}`` dicts in span order.
    """
    # Hoisted out of the loops: the accepted span types never change.
    supported_span_types = [
        ContentTypeV2.SPAN_TEXT,
        ContentTypeV2.SPAN_PHONETIC,
        ContentTypeV2.SPAN_EQUATION_INLINE,
        ContentTypeV2.SPAN_MD,
        ContentTypeV2.SPAN_CODE_INLINE,
    ]
    para_content = []
    para_type = para_block['type']
    for line in para_block['lines']:
        for span in line['spans']:
            span_type = span['type']
            # Spans without a 'content' key (for example spans that only carry
            # an image path) are skipped rather than raising KeyError, which
            # matches how merge_para_with_text ignores them.
            content = span.get('content')
            if not content:
                continue
            if para_type == BlockType.PHONETIC and span_type == ContentTypeV2.SPAN_TEXT:
                span_type = ContentTypeV2.SPAN_PHONETIC
            if span_type == ContentType.INLINE_EQUATION:
                span_type = ContentTypeV2.SPAN_EQUATION_INLINE
            if span_type in supported_span_types:
                para_content.append({
                    'type': span_type,
                    'content': content,
                })
            else:
                logger.warning(f"Unknown span type in merge_para_with_text_v2: {span_type}")
    return para_content
def union_make(pdf_info_dict: list,
               make_mode: str,
               img_buket_path: str = '',
               ):
    """Render every page of ``pdf_info_dict`` in the requested output mode.

    The formula and table switches are derived from the environment variables
    ``MINERU_VLM_FORMULA_ENABLE`` and ``MINERU_VLM_TABLE_ENABLE`` (compared
    case-insensitively with ``'true'``, defaulting to ``'True'``) and passed
    through ``get_formula_enable`` / ``get_table_enable``.

    Args:
        pdf_info_dict: List of page dicts; ``'para_blocks'``,
            ``'discarded_blocks'``, ``'page_idx'`` and ``'page_size'`` are read.
        make_mode: One of the ``MakeMode`` values.
        img_buket_path: Prefix placed before image paths.

    Returns:
        * MM_MD / NLP_MD: one string, the block fragments joined by blank lines.
        * CONTENT_LIST: a flat list of entries over layout and discarded blocks.
        * CONTENT_LIST_V2: a list with one list of entries per non-empty page.
        * any other mode: ``None``.
    """
    formula_enable = get_formula_enable(os.getenv('MINERU_VLM_FORMULA_ENABLE', 'True').lower() == 'true')
    table_enable = get_table_enable(os.getenv('MINERU_VLM_TABLE_ENABLE', 'True').lower() == 'true')

    markdown_modes = (MakeMode.MM_MD, MakeMode.NLP_MD)
    output_content = []
    for page_info in pdf_info_dict:
        layout_blocks = page_info.get('para_blocks')
        discarded_blocks = page_info.get('discarded_blocks')
        page_idx = page_info.get('page_idx')
        page_size = page_info.get('page_size')

        if make_mode in markdown_modes:
            # Markdown output only uses the layout blocks of the page.
            if layout_blocks:
                output_content.extend(
                    mk_blocks_to_markdown(layout_blocks, make_mode, formula_enable, table_enable, img_buket_path)
                )
        elif make_mode == MakeMode.CONTENT_LIST:
            for para_block in (layout_blocks or []) + (discarded_blocks or []):
                output_content.append(
                    make_blocks_to_content_list(para_block, img_buket_path, page_idx, page_size)
                )
        elif make_mode == MakeMode.CONTENT_LIST_V2:
            # https://github.com/drunkpig/llm-webkit-mirror/blob/dev6/docs/specification/output_format/content_list_spec.md
            all_blocks = (layout_blocks or []) + (discarded_blocks or [])
            # NOTE(review): pages without blocks add no entry, so positions in
            # the result need not line up with page indices - confirm intended.
            if all_blocks:
                output_content.append([
                    make_blocks_to_content_list_v2(para_block, img_buket_path, page_size)
                    for para_block in all_blocks
                ])

    if make_mode in markdown_modes:
        return '\n\n'.join(output_content)
    if make_mode in (MakeMode.CONTENT_LIST, MakeMode.CONTENT_LIST_V2):
        return output_content
    return None
def get_title_level(block):
    """Return the heading level of a title block, limited to the range 0-4.

    The level is read from ``block['level']`` and defaults to 1 when the key
    is absent. Values above 4 are capped at 4; values below 1 become 0.
    """
    level = block.get('level', 1)
    if level < 1:
        return 0
    return 4 if level > 4 else level