Compare commits

...

33 Commits

Author SHA1 Message Date
kernel.h@qq.com
df9d43b851 delete 无用字段 2024-03-27 18:59:02 +08:00
kernel.h@qq.com
5db910260e add __init__.py 2024-03-27 15:11:19 +08:00
kernel.h@qq.com
433684c646 实现多模态markdown拼装 2024-03-27 14:46:56 +08:00
liusilu
fffee0ae97 Merge branch 'master' of https://github.com/myhloli/Magic-PDF 2024-03-27 10:03:46 +08:00
liusilu
e73606250e add pdf tools 2024-03-27 10:03:20 +08:00
kernel.h@qq.com
7162debc38 实现文本拼PDF解析结果装标准格式 2024-03-26 21:19:19 +08:00
赵小蒙
a343175d66 恢复pipeline 2024-03-26 18:10:53 +08:00
赵小蒙
671ce1d97c Merge remote-tracking branch 'origin/master' 2024-03-26 16:52:57 +08:00
赵小蒙
6f80beaa31 原pipeline拆分 2024-03-26 16:51:58 +08:00
许瑞
cb1b02e716 feat: disable auto include table title 2024-03-26 16:46:05 +08:00
赵小蒙
8ebb79a43a standard_format dump逻辑更新 2024-03-26 16:37:38 +08:00
赵小蒙
154eed1ade footnote drop逻辑更新 2024-03-26 16:37:07 +08:00
赵小蒙
b7652171ea make_standard_format_with_para逻辑更新 2024-03-26 16:36:45 +08:00
许瑞
f0c463ed6d Merge branch 'master' of https://github.com/myhloli/Magic-PDF 2024-03-26 10:17:05 +08:00
赵小蒙
3d2fcc9dce 删除无用代码 2024-03-25 19:10:22 +08:00
赵小蒙
d3c9cb84f8 分段部分log限定在debug模式下才能输出 2024-03-25 17:07:51 +08:00
赵小蒙
8c089976ed 更新注释 2024-03-25 16:09:29 +08:00
赵小蒙
473a0a7de0 拼接markdown时,如果para_text为空则跳过拼接 2024-03-25 14:26:15 +08:00
kernel.h@qq.com
61c970f7da 修复list index错误 2024-03-25 13:24:48 +08:00
赵小蒙
d3ee9abbab 更新ocr_mk_mm_markdown_with_para_core逻辑 2024-03-24 20:39:48 +08:00
赵小蒙
07e4f115e6 ocr_pdf_intermediate_dict_to_markdown_with_para输出nlp格式的markdown 2024-03-24 19:40:41 +08:00
赵小蒙
bf8d8e217d 新增ocr_mk_nlp_markdown_with_para方法 2024-03-24 19:31:58 +08:00
kernel.h@qq.com
744b3f75eb 合并居中显示、想同行高的文字 2024-03-23 19:32:02 +08:00
kernel.h@qq.com
2e772467ee 连接跨页的list 2024-03-23 16:19:23 +08:00
许瑞
efed5faa53 feat: modify foot note bbox tmp 2024-03-23 14:34:25 +08:00
xu rui
05161c6e62 feat: backup footnote_bbox_tmp 2024-03-23 14:11:50 +08:00
xu rui
15c8830416 feat: comment parse_title 2024-03-23 13:15:32 +08:00
xu rui
432e1ae5e3 feat: process title and footnote 2024-03-22 18:11:44 +08:00
kernel.h@qq.com
e3e125baef 跨layout的列表合并 2024-03-22 16:35:04 +08:00
赵小蒙
2277e31ff4 ocr_demo main函数精简 2024-03-22 16:33:54 +08:00
赵小蒙
7d010e1969 ocr_mk_mm_markdown_with_para和ocr_mk_mm_markdown_with_para_and_pagination逻辑优化 2024-03-22 16:11:55 +08:00
赵小蒙
dbe79ba1b2 ocr_mk_mm_markdown_with_para_and_pagination逻辑更新 2024-03-22 15:41:59 +08:00
kernel.h@qq.com
f36c26565e 使用面积占比方式判断一行文本是不是在一个layoutbox里 2024-03-22 14:48:42 +08:00
13 changed files with 823 additions and 517 deletions

View File

@@ -6,7 +6,14 @@ from pathlib import Path
from app.common.s3 import get_s3_config
from demo.demo_test import get_json_from_local_or_s3
from magic_pdf.dict2md.ocr_mkcontent import ocr_mk_mm_markdown_with_para, ocr_mk_nlp_markdown, ocr_mk_mm_markdown, ocr_mk_mm_standard_format
from magic_pdf.dict2md.ocr_mkcontent import (
ocr_mk_mm_markdown_with_para,
ocr_mk_nlp_markdown,
ocr_mk_mm_markdown,
ocr_mk_mm_standard_format,
ocr_mk_mm_markdown_with_para_and_pagination,
make_standard_format_with_para
)
from magic_pdf.libs.commons import join_path
from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
@@ -47,7 +54,7 @@ def ocr_online_parse(book_name, start_page_id=0, debug_mode=True):
# logger.info(json_object)
s3_pdf_path = json_object["file_location"]
s3_config = get_s3_config(s3_pdf_path)
ocr_pdf_model_info = json_object["doc_layout_result"]
ocr_pdf_model_info = json_object.get("doc_layout_result")
ocr_parse_core(book_name, s3_pdf_path, ocr_pdf_model_info, s3_config=s3_config)
except Exception as e:
logger.exception(e)
@@ -72,25 +79,23 @@ def ocr_parse_core(book_name, ocr_pdf_path, ocr_pdf_model_info, start_page_id=0,
# markdown_content = mk_nlp_markdown(pdf_info_dict)
markdown_content = ocr_mk_mm_markdown_with_para(pdf_info_dict)
# markdown_pagination = ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_dict)
with open(text_content_save_path, "w", encoding="utf-8") as f:
f.write(markdown_content)
standard_format = ocr_mk_mm_standard_format(pdf_info_dict)
standard_format = make_standard_format_with_para(pdf_info_dict)
standard_format_save_path = f"{save_path_with_bookname}/standard_format.txt"
with open(standard_format_save_path, "w", encoding="utf-8") as f:
f.write(str(standard_format))
# 将standard_format dump成json文本并保存
f.write(json.dumps(standard_format, ensure_ascii=False))
if __name__ == '__main__':
#ocr_pdf_path = r"D:\project\20231108code-clean\ocr\new\双栏\s0043-1354(02)00581-x.pdf"
#ocr_json_file_path = r"D:\project\20231108code-clean\ocr\new\双栏\s0043-1354(02)00581-x.json"
# ocr_pdf_path = r"D:\project\20231108code-clean\ocr\new\双栏\j.1540-627x.2006.00176.x.pdf"
# ocr_json_file_path = r"D:\project\20231108code-clean\ocr\new\双栏\j.1540-627x.2006.00176.x.json"
pdf_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/j.1540-627x.2006.00176.x.pdf"
json_file_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/j.1540-627x.2006.00176.x.json"
# ocr_local_parse(pdf_path, json_file_path)
book_name = "科数网/edu_00011318"
ocr_online_parse(book_name)
ocr_pdf_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/j.1540-627x.2006.00176.x.pdf"
ocr_json_file_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/j.1540-627x.2006.00176.x.json"
# ocr_pdf_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/ocr_1.pdf"
# ocr_json_file_path = r"/home/cxu/workspace/Magic-PDF/ocr_demo/ocr_1.json"
#ocr_local_parse(ocr_pdf_path, ocr_json_file_path)
ocr_online_parse(book_name="美国加州中学教材/edu_00000060")
pass

View File

@@ -71,69 +71,96 @@ def ocr_mk_mm_markdown(pdf_info_dict: dict):
def ocr_mk_mm_markdown_with_para(pdf_info_dict: dict):
markdown = []
for _, page_info in pdf_info_dict.items():
paras = page_info.get("para_blocks")
if not paras:
continue
for para in paras:
para_text = ''
for line in para:
for span in line['spans']:
span_type = span.get('type')
if span_type == ContentType.Text:
content = split_long_words(span['content'])
# content = span['content']
elif span_type == ContentType.InlineEquation:
content = f"${span['content']}$"
elif span_type == ContentType.InterlineEquation:
content = f"\n$$\n{span['content']}\n$$\n"
elif span_type in [ContentType.Image, ContentType.Table]:
content = f"\n![]({join_path(s3_image_save_path, span['image_path'])})\n"
para_text += content + ' '
markdown.append(para_text.strip() + ' ')
paras_of_layout = page_info.get("para_blocks")
page_markdown = ocr_mk_mm_markdown_with_para_core(paras_of_layout, "mm")
markdown.extend(page_markdown)
return '\n\n'.join(markdown)
def ocr_mk_nlp_markdown_with_para(pdf_info_dict: dict):
markdown = []
for _, page_info in pdf_info_dict.items():
paras_of_layout = page_info.get("para_blocks")
page_markdown = ocr_mk_mm_markdown_with_para_core(paras_of_layout, "nlp")
markdown.extend(page_markdown)
return '\n\n'.join(markdown)
def ocr_mk_mm_markdown_with_para_and_pagination(pdf_info_dict: dict):
markdown_with_para_and_pagination = []
for page_no, page_info in pdf_info_dict.items():
page_markdown = []
paras = page_info.get("para_blocks")
if not paras:
paras_of_layout = page_info.get("para_blocks")
if not paras_of_layout:
continue
page_markdown = ocr_mk_mm_markdown_with_para_core(paras_of_layout, "mm")
markdown_with_para_and_pagination.append({
'page_no': page_no,
'md_content': '\n\n'.join(page_markdown)
})
return markdown_with_para_and_pagination
def ocr_mk_mm_markdown_with_para_core(paras_of_layout, mode):
page_markdown = []
for paras in paras_of_layout:
for para in paras:
para_text = ''
for line in para:
for span in line['spans']:
span_type = span.get('type')
content = ''
if span_type == ContentType.Text:
content = split_long_words(span['content'])
# content = span['content']
content = ocr_escape_special_markdown_char(split_long_words(span['content']))
elif span_type == ContentType.InlineEquation:
content = f"${span['content']}$"
content = f"${ocr_escape_special_markdown_char(span['content'])}$"
elif span_type == ContentType.InterlineEquation:
content = f"\n$$\n{span['content']}\n$$\n"
content = f"\n$$\n{ocr_escape_special_markdown_char(span['content'])}\n$$\n"
elif span_type in [ContentType.Image, ContentType.Table]:
content = f"\n![]({join_path(s3_image_save_path, span['image_path'])})\n"
para_text += content + ' '
page_markdown.append(para_text.strip() + ' ')
markdown_with_para_and_pagination.append({
'page_no': page_no,
'md': '\n\n'.join(page_markdown)
})
return markdown_with_para_and_pagination
if mode == 'mm':
content = f"\n![]({join_path(s3_image_save_path, span['image_path'])})\n"
elif mode == 'nlp':
pass
if content != '':
para_text += content + ' '
if para_text.strip() == '':
continue
else:
page_markdown.append(para_text.strip() + ' ')
return page_markdown
def para_to_standard_format(para):
para_content = {}
if len(para) == 1:
para_content = line_to_standard_format(para[0])
elif len(para) > 1:
para_text = ''
inline_equation_num = 0
for line in para:
for span in line['spans']:
span_type = span.get('type')
if span_type == ContentType.Text:
content = ocr_escape_special_markdown_char(split_long_words(span['content']))
elif span_type == ContentType.InlineEquation:
content = f"${ocr_escape_special_markdown_char(span['content'])}$"
inline_equation_num += 1
para_text += content + ' '
para_content = {
'type': 'text',
'text': para_text,
'inline_equation_num': inline_equation_num
}
return para_content
def make_standard_format_with_para(pdf_info_dict: dict):
content_list = []
for _, page_info in pdf_info_dict.items():
paras = page_info.get("para_blocks")
if not paras:
paras_of_layout = page_info.get("para_blocks")
if not paras_of_layout:
continue
for para in paras:
for line in para:
content = line_to_standard_format(line)
content_list.append(content)
for paras in paras_of_layout:
for para in paras:
para_content = para_to_standard_format(para)
content_list.append(para_content)
return content_list

View File

@@ -18,6 +18,33 @@ def _is_in_or_part_overlap(box1, box2) -> bool:
y1_1 < y0_2 or # box1在box2的上边
y0_1 > y1_2) # box1在box2的下边
def _is_in_or_part_overlap_with_area_ratio(box1, box2, area_ratio_threshold=0.6):
"""
判断box1是否在box2里面或者box1和box2有部分重叠且重叠面积占box1的比例超过area_ratio_threshold
"""
if box1 is None or box2 is None:
return False
x0_1, y0_1, x1_1, y1_1 = box1
x0_2, y0_2, x1_2, y1_2 = box2
if not _is_in_or_part_overlap(box1, box2):
return False
# 计算重叠面积
x_left = max(x0_1, x0_2)
y_top = max(y0_1, y0_2)
x_right = min(x1_1, x1_2)
y_bottom = min(y1_1, y1_2)
overlap_area = (x_right - x_left) * (y_bottom - y_top)
# 计算box1的面积
box1_area = (x1_1 - x0_1) * (y1_1 - y0_1)
return overlap_area / box1_area > area_ratio_threshold
def _is_in(box1, box2) -> bool:
"""
box1是否完全在box2里面

View File

@@ -2,14 +2,14 @@ from sklearn.cluster import DBSCAN
import numpy as np
from loguru import logger
from magic_pdf.libs.boxbase import _is_in_or_part_overlap
from magic_pdf.libs.boxbase import _is_in_or_part_overlap_with_area_ratio as is_in_layout
from magic_pdf.libs.ocr_content_type import ContentType
LINE_STOP_FLAG = ['.', '!', '?', '', '', '',"", ":", ")", "", ";"]
INLINE_EQUATION = ContentType.InlineEquation
INTERLINE_EQUATION = ContentType.InterlineEquation
TEXT = "text"
TEXT = ContentType.Text
def __get_span_text(span):
@@ -19,46 +19,8 @@ def __get_span_text(span):
return c
def __add_line_period(blocks, layout_bboxes):
"""
为每行添加句号
如果这个行
1. 以行内公式结尾,但没有任何标点符号,此时加个句号,认为他就是段落结尾。
"""
for block in blocks:
for line in block['lines']:
last_span = line['spans'][-1]
span_type = last_span['type']
if span_type in [INLINE_EQUATION]:
span_content = last_span['content'].strip()
if span_type==INLINE_EQUATION and span_content[-1] not in LINE_STOP_FLAG:
if span_type in [INLINE_EQUATION, INTERLINE_EQUATION]:
last_span['content'] = span_content + '.'
def __detect_line_align_direction(line, new_layout_bboxes):
"""
探测line是左对齐还是右对齐还是居中。
"""
lbox = line['bbox']
x0, x1 = lbox[0], lbox[2]
layout_x0, layout_x1 = new_layout_bboxes[0], new_layout_bboxes[2]
if x0 <= layout_x0 and x1 < layout_x1:
return "left"
elif x0 > layout_x0 and x1 >= layout_x1:
return "right"
else:
return "center"
def __detect_line_group_align_direction(lines, new_layout_bboxes):
"""
首先把lines按照行距离分成几部分。针对每一部分分别探测。
最后返回[(dir, lines), (dir, lines), ...]
"""
pass
def __detect_list_lines(lines, new_layout_bboxes, lang='en'):
def __detect_list_lines(lines, new_layout_bboxes, lang):
"""
探测是否包含了列表,并且把列表的行分开.
这样的段落特点是,顶格字母大写/数字,紧跟着几行缩进的。缩进的行首字母含小写的。
@@ -166,7 +128,7 @@ def __valign_lines(blocks, layout_bboxes):
new_layout_bboxes = []
for layout_box in layout_bboxes:
blocks_in_layoutbox = [b for b in blocks if _is_in_or_part_overlap(b['bbox'], layout_box['layout_bbox'])]
blocks_in_layoutbox = [b for b in blocks if is_in_layout(b['bbox'], layout_box['layout_bbox'])]
if len(blocks_in_layoutbox)==0:
continue
@@ -252,7 +214,7 @@ def __group_line_by_layout(blocks, layout_bboxes, lang="en"):
lines_group = []
for lyout in layout_bboxes:
lines = [line for block in blocks if _is_in_or_part_overlap(block['bbox'], lyout['layout_bbox']) for line in block['lines']]
lines = [line for block in blocks if is_in_layout(block['bbox'], lyout['layout_bbox']) for line in block['lines']]
lines_group.append(lines)
return lines_group
@@ -267,12 +229,19 @@ def __split_para_in_layoutbox(lines_group, new_layout_bbox, lang="en", char_avg_
且下一行开头不留空白。
"""
line_group_end_with_list = [] # 这个layout最后是不是列表用于跨layout列表合并
paras = []
list_info = [] # 这个layout最后是不是列表,记录每一个layout里是不是列表开头列表结尾
layout_paras = []
right_tail_distance = 1.5 * char_avg_len
for lines in lines_group:
paras = []
total_lines = len(lines)
if total_lines<=1: # 0行无需处理。1行无法分段。
if total_lines==0:
continue # 0行无需处理
if total_lines==1: # 1行无法分段。
layout_paras.append([lines])
list_info.append([False, False])
continue
"""在进入到真正的分段之前,要对文字块从统计维度进行对齐方式的探测,
@@ -292,7 +261,7 @@ def __split_para_in_layoutbox(lines_group, new_layout_bbox, lang="en", char_avg_
layout_right = __find_layout_bbox_by_line(lines[0]['bbox'], new_layout_bbox)[2]
layout_left = __find_layout_bbox_by_line(lines[0]['bbox'], new_layout_bbox)[0]
para = [] # 元素是line
is_lines_end_with_list = False
layout_list_info = [False, False] # 这个layout最后是不是列表,记录每一个layout里是不是列表开头列表结尾
for content_type, start, end in text_segments:
if content_type == 'list':
for i, line in enumerate(lines[start:end+1]):
@@ -307,7 +276,10 @@ def __split_para_in_layoutbox(lines_group, new_layout_bbox, lang="en", char_avg_
if len(para)>0:
paras.append(para)
para = []
is_lines_end_with_list = True
if start==0:
layout_list_info[0] = True
if end==total_lines-1:
layout_list_info[1] = True
else:
for i, line in enumerate(lines[start:end+1]):
# 如果i有下一行那么就要根据下一行位置综合判断是否要分段。如果i之后没有行那么只需要判断一下行结尾特征。
@@ -335,12 +307,78 @@ def __split_para_in_layoutbox(lines_group, new_layout_bbox, lang="en", char_avg_
if len(para)>0:
paras.append(para)
para = []
is_lines_end_with_list = False
line_group_end_with_list.append(is_lines_end_with_list)
list_info.append(layout_list_info)
layout_paras.append(paras)
paras = []
return paras, line_group_end_with_list
return layout_paras, list_info
def __connect_list_inter_layout(layout_paras, new_layout_bbox, layout_list_info, page_num, lang):
"""
如果上个layout的最后一个段落是列表下一个layout的第一个段落也是列表那么将他们连接起来。 TODO 因为没有区分列表和段落,所以这个方法暂时不实现。
根据layout_list_info判断是不是列表。下个layout的第一个段如果不是列表那么看他们是否有几行都有相同的缩进。
"""
if len(layout_paras)==0 or len(layout_list_info)==0: # 0的时候最后的return 会出错
return layout_paras, [False, False]
for i in range(1, len(layout_paras)):
pre_layout_list_info = layout_list_info[i-1]
next_layout_list_info = layout_list_info[i]
pre_last_para = layout_paras[i-1][-1]
next_paras = layout_paras[i]
next_first_para = next_paras[0]
if pre_layout_list_info[1] and not next_layout_list_info[0]: # 前一个是列表结尾,后一个是非列表开头,此时检测是否有相同的缩进
logger.info(f"连接page {page_num} 内的list")
# 向layout_paras[i] 寻找开头具有相同缩进的连续的行
may_list_lines = []
for j in range(len(next_paras)):
line = next_paras[j]
if len(line)==1: # 只可能是一行,多行情况再需要分析了
if line[0]['bbox'][0] > __find_layout_bbox_by_line(line[0]['bbox'], new_layout_bbox)[0]:
may_list_lines.append(line[0])
else:
break
else:
break
# 如果这些行的缩进是相等的那么连到上一个layout的最后一个段落上。
if len(may_list_lines)>0 and len(set([x['bbox'][0] for x in may_list_lines]))==1:
pre_last_para.extend(may_list_lines)
layout_paras[i] = layout_paras[i][len(may_list_lines):]
return layout_paras, [layout_list_info[0][0], layout_list_info[-1][1]] # 同时还返回了这个页面级别的开头、结尾是不是列表的信息
def __connect_list_inter_page(pre_page_paras, next_page_paras, pre_page_layout_bbox, next_page_layout_bbox, pre_page_list_info, next_page_list_info, page_num, lang):
"""
如果上个layout的最后一个段落是列表下一个layout的第一个段落也是列表那么将他们连接起来。 TODO 因为没有区分列表和段落,所以这个方法暂时不实现。
根据layout_list_info判断是不是列表。下个layout的第一个段如果不是列表那么看他们是否有几行都有相同的缩进。
"""
if len(pre_page_paras)==0 or len(next_page_paras)==0: # 0的时候最后的return 会出错
return False
if pre_page_list_info[1] and not next_page_list_info[0]: # 前一个是列表结尾,后一个是非列表开头,此时检测是否有相同的缩进
logger.info(f"连接page {page_num} 内的list")
# 向layout_paras[i] 寻找开头具有相同缩进的连续的行
may_list_lines = []
for j in range(len(next_page_paras[0])):
line = next_page_paras[0][j]
if len(line)==1: # 只可能是一行,多行情况再需要分析了
if line[0]['bbox'][0] > __find_layout_bbox_by_line(line[0]['bbox'], next_page_layout_bbox)[0]:
may_list_lines.append(line[0])
else:
break
else:
break
# 如果这些行的缩进是相等的那么连到上一个layout的最后一个段落上。
if len(may_list_lines)>0 and len(set([x['bbox'][0] for x in may_list_lines]))==1:
pre_page_paras[-1].append(may_list_lines)
next_page_paras[0] = next_page_paras[0][len(may_list_lines):]
return True
return False
def __find_layout_bbox_by_line(line_bbox, layout_bboxes):
@@ -348,12 +386,12 @@ def __find_layout_bbox_by_line(line_bbox, layout_bboxes):
根据line找到所在的layout
"""
for layout in layout_bboxes:
if _is_in_or_part_overlap(line_bbox, layout):
if is_in_layout(line_bbox, layout):
return layout
return None
def __connect_para_inter_layoutbox(layout_paras, new_layout_bbox, line_group_end_with_list, lang="en"):
def __connect_para_inter_layoutbox(layout_paras, new_layout_bbox, lang):
"""
layout之间进行分段。
主要是计算前一个layOut的最后一行和后一个layout的第一行是否可以连接。
@@ -363,21 +401,27 @@ def __connect_para_inter_layoutbox(layout_paras, new_layout_bbox, line_group_end
"""
connected_layout_paras = []
for i, para in enumerate(layout_paras):
if i==0:
connected_layout_paras.append(para)
if len(layout_paras)==0:
return connected_layout_paras
connected_layout_paras.append(layout_paras[0])
for i in range(1, len(layout_paras)):
try:
if len(layout_paras[i])==0 or len(layout_paras[i-1])==0: # TODO 考虑连接问题,
continue
pre_last_line = layout_paras[i-1][-1][-1]
next_first_line = layout_paras[i][0][0]
except Exception as e:
logger.error(f"page layout {i} has no line")
continue
pre_last_line = layout_paras[i-1][-1]
next_first_line = layout_paras[i][0]
pre_last_line_text = ''.join([__get_span_text(span) for span in pre_last_line['spans']])
pre_last_line_type = pre_last_line['spans'][-1]['type']
next_first_line_text = ''.join([__get_span_text(span) for span in next_first_line['spans']])
next_first_line_type = next_first_line['spans'][0]['type']
if pre_last_line_type not in [TEXT, INLINE_EQUATION] or next_first_line_type not in [TEXT, INLINE_EQUATION]: # TODO真的要做好要考虑跨table, image, 行间的情况
connected_layout_paras.append(para)
if pre_last_line_type not in [TEXT, INLINE_EQUATION] or next_first_line_type not in [TEXT, INLINE_EQUATION]:
connected_layout_paras.append(layout_paras[i])
continue
pre_x2_max = __find_layout_bbox_by_line(pre_last_line['bbox'], new_layout_bbox)[2]
next_x0_min = __find_layout_bbox_by_line(next_first_line['bbox'], new_layout_bbox)[0]
@@ -385,15 +429,20 @@ def __connect_para_inter_layoutbox(layout_paras, new_layout_bbox, line_group_end
next_first_line_text = next_first_line_text.strip()
if pre_last_line['bbox'][2] == pre_x2_max and pre_last_line_text[-1] not in LINE_STOP_FLAG and next_first_line['bbox'][0]==next_x0_min: # 前面一行沾满了整个行,并且没有结尾符号.下一行没有空白开头。
"""连接段落条件成立将前一个layout的段落和后一个layout的段落连接。"""
connected_layout_paras[-1].extend(para)
else:
connected_layout_paras[-1][-1].extend(layout_paras[i][0])
layout_paras[i].pop(0) # 删除后一个layout的第一个段落 因为他已经被合并到前一个layout的最后一个段落了。
if len(layout_paras[i])==0:
layout_paras.pop(i)
else:
connected_layout_paras.append(layout_paras[i])
else:
"""连接段落条件不成立将前一个layout的段落加入到结果中。"""
connected_layout_paras.append(para)
connected_layout_paras.append(layout_paras[i])
return connected_layout_paras
def __connect_para_inter_page(pre_page_paras, next_page_paras, pre_page_layout_bbox, next_page_layout_bbox, lang):
def __connect_para_inter_page(pre_page_paras, next_page_paras, pre_page_layout_bbox, next_page_layout_bbox, page_num, lang):
"""
连接起来相邻两个页面的段落——前一个页面最后一个段落和后一个页面的第一个段落。
是否可以连接的条件:
@@ -401,10 +450,10 @@ def __connect_para_inter_page(pre_page_paras, next_page_paras, pre_page_layout_b
2. 后一个页面的第一个段落第一行没有空白开头。
"""
# 有的页面可能压根没有文字
if len(pre_page_paras)==0 or len(next_page_paras)==0:
if len(pre_page_paras)==0 or len(next_page_paras)==0 or len(pre_page_paras[0])==0 or len(next_page_paras[0])==0: # TODO [[]]为什么出现在pre_page_paras里
return False
pre_last_para = pre_page_paras[-1]
next_first_para = next_page_paras[0]
pre_last_para = pre_page_paras[-1][-1]
next_first_para = next_page_paras[0][0]
pre_last_line = pre_last_para[-1]
next_first_line = next_first_para[0]
pre_last_line_text = ''.join([__get_span_text(span) for span in pre_last_line['spans']])
@@ -423,14 +472,91 @@ def __connect_para_inter_page(pre_page_paras, next_page_paras, pre_page_layout_b
next_first_line_text = next_first_line_text.strip()
if pre_last_line['bbox'][2] == pre_x2_max and pre_last_line_text[-1] not in LINE_STOP_FLAG and next_first_line['bbox'][0]==next_x0_min: # 前面一行沾满了整个行,并且没有结尾符号.下一行没有空白开头。
"""连接段落条件成立将前一个layout的段落和后一个layout的段落连接。"""
pre_page_paras[-1].extend(next_first_para)
next_page_paras.pop(0) # 删除后一个页面的第一个段落, 因为他已经被合并到前一个页面的最后一个段落了。
pre_last_para.extend(next_first_para)
next_page_paras[0].pop(0) # 删除后一个页面的第一个段落, 因为他已经被合并到前一个页面的最后一个段落了。
return True
else:
return False
def find_consecutive_true_regions(input_array):
start_index = None # 连续True区域的起始索引
regions = [] # 用于保存所有连续True区域的起始和结束索引
def __do_split(blocks, layout_bboxes, new_layout_bbox, lang="en"):
for i in range(len(input_array)):
# 如果我们找到了一个True值并且当前并没有在连续True区域中
if input_array[i] and start_index is None:
start_index = i # 记录连续True区域的起始索引
# 如果我们找到了一个False值并且当前在连续True区域中
elif not input_array[i] and start_index is not None:
# 如果连续True区域长度大于1那么将其添加到结果列表中
if i - start_index > 1:
regions.append((start_index, i-1))
start_index = None # 重置起始索引
# 如果最后一个元素是True那么需要将最后一个连续True区域加入到结果列表中
if start_index is not None and len(input_array) - start_index > 1:
regions.append((start_index, len(input_array)-1))
return regions
def __connect_middle_align_text(page_paras, new_layout_bbox, page_num, lang, debug_mode):
"""
找出来中间对齐的连续单行文本,如果连续行高度相同,那么合并为一个段落。
一个line居中的条件是
1. 水平中心点跨越layout的中心点。
2. 左右两侧都有空白
"""
for layout_i, layout_para in enumerate(page_paras):
layout_box = new_layout_bbox[layout_i]
single_line_paras_tag = []
for i in range(len(layout_para)):
single_line_paras_tag.append(len(layout_para[i])==1 and layout_para[i][0]['spans'][0]['type']==TEXT)
"""找出来连续的单行文本,如果连续行高度相同,那么合并为一个段落。"""
consecutive_single_line_indices = find_consecutive_true_regions(single_line_paras_tag)
if len(consecutive_single_line_indices)>0:
index_offset = 0
"""检查这些行是否是高度相同的,居中的"""
for start, end in consecutive_single_line_indices:
start += index_offset
end += index_offset
line_hi = np.array([line[0]['bbox'][3]-line[0]['bbox'][1] for line in layout_para[start:end+1]])
first_line_text = ''.join([__get_span_text(span) for span in layout_para[start][0]['spans']])
if "Table" in first_line_text or "Figure" in first_line_text:
pass
if debug_mode:
logger.info(line_hi.std())
if line_hi.std()<2:
"""行高度相同,那么判断是否居中"""
all_left_x0 = [line[0]['bbox'][0] for line in layout_para[start:end+1]]
all_right_x1 = [line[0]['bbox'][2] for line in layout_para[start:end+1]]
layout_center = (layout_box[0] + layout_box[2]) / 2
if all([x0 < layout_center < x1 for x0, x1 in zip(all_left_x0, all_right_x1)]) \
and not all([x0==layout_box[0] for x0 in all_left_x0]) \
and not all([x1==layout_box[2] for x1 in all_right_x1]):
merge_para = [l[0] for l in layout_para[start:end+1]]
para_text = ''.join([__get_span_text(span) for line in merge_para for span in line['spans']])
if debug_mode:
logger.info(para_text)
layout_para[start:end+1] = [merge_para]
index_offset -= end-start
return
def __merge_signle_list_text(page_paras, new_layout_bbox, page_num, lang):
"""
找出来连续的单行文本,如果首行顶格,接下来的几个单行段落缩进对齐,那么合并为一个段落。
"""
pass
def __do_split_page(blocks, layout_bboxes, new_layout_bbox, page_num, lang):
"""
根据line和layout情况进行分段
先实现一个根据行末尾特征分段的简单方法。
@@ -443,34 +569,55 @@ def __do_split(blocks, layout_bboxes, new_layout_bbox, lang="en"):
4. 图、表,目前独占一行,不考虑分段。
"""
lines_group = __group_line_by_layout(blocks, layout_bboxes, lang) # block内分段
layout_paras, line_group_end_with_list = __split_para_in_layoutbox(lines_group, new_layout_bbox, lang) # layout内分段
connected_layout_paras = __connect_para_inter_layoutbox(layout_paras, new_layout_bbox, line_group_end_with_list, lang) # layout间链接段落
return connected_layout_paras
layout_paras, layout_list_info = __split_para_in_layoutbox(lines_group, new_layout_bbox, lang) # layout内分段
layout_paras2, page_list_info = __connect_list_inter_layout(layout_paras, new_layout_bbox, layout_list_info, page_num, lang) # layout之间连接列表段落
connected_layout_paras = __connect_para_inter_layoutbox(layout_paras2, new_layout_bbox, lang) # layout间链接段落
def para_split(pdf_info_dict, lang="en"):
return connected_layout_paras, page_list_info
def para_split(pdf_info_dict, debug_mode, lang="en"):
"""
根据line和layout情况进行分段
"""
new_layout_of_pages = [] # 数组的数组每个元素是一个页面的layoutS
for _, page in pdf_info_dict.items():
all_page_list_info = [] # 保存每个页面开头和结尾是否是列表
for page_num, page in pdf_info_dict.items():
blocks = page['preproc_blocks']
layout_bboxes = page['layout_bboxes']
new_layout_bbox = __common_pre_proc(blocks, layout_bboxes)
new_layout_of_pages.append(new_layout_bbox)
splited_blocks = __do_split(blocks, layout_bboxes, new_layout_bbox, lang)
splited_blocks, page_list_info = __do_split_page(blocks, layout_bboxes, new_layout_bbox, page_num, lang)
all_page_list_info.append(page_list_info)
page['para_blocks'] = splited_blocks
"""连接页面与页面之间的可能合并的段落"""
pdf_infos = list(pdf_info_dict.values())
for i, page in enumerate(pdf_info_dict.values()):
if i==0:
for page_num, page in enumerate(pdf_info_dict.values()):
if page_num==0:
continue
pre_page_paras = pdf_infos[i-1]['para_blocks']
next_page_paras = pdf_infos[i]['para_blocks']
pre_page_layout_bbox = new_layout_of_pages[i-1]
next_page_layout_bbox = new_layout_of_pages[i]
pre_page_paras = pdf_infos[page_num-1]['para_blocks']
next_page_paras = pdf_infos[page_num]['para_blocks']
pre_page_layout_bbox = new_layout_of_pages[page_num-1]
next_page_layout_bbox = new_layout_of_pages[page_num]
is_conn= __connect_para_inter_page(pre_page_paras, next_page_paras, pre_page_layout_bbox, next_page_layout_bbox, lang)
if is_conn:
logger.info(f"连接了第{i-1}页和第{i}页的段落")
is_conn = __connect_para_inter_page(pre_page_paras, next_page_paras, pre_page_layout_bbox, next_page_layout_bbox, page_num, lang)
if debug_mode:
if is_conn:
logger.info(f"连接了第{page_num-1}页和第{page_num}页的段落")
is_list_conn = __connect_list_inter_page(pre_page_paras, next_page_paras, pre_page_layout_bbox, next_page_layout_bbox, all_page_list_info[page_num-1], all_page_list_info[page_num], page_num, lang)
if debug_mode:
if is_list_conn:
logger.info(f"连接了第{page_num-1}页和第{page_num}页的列表段落")
"""接下来可能会漏掉一些特别的一些可以合并的内容,对他们进行段落连接
1. 正文中有时出现一个行顶格,接下来几行缩进的情况。
2. 居中的一些连续单行,如果高度相同,那么可能是一个段落。
"""
for page_num, page in enumerate(pdf_info_dict.values()):
page_paras = page['para_blocks']
new_layout_bbox = new_layout_of_pages[page_num]
__connect_middle_align_text(page_paras, new_layout_bbox, page_num, lang, debug_mode=debug_mode)
__merge_signle_list_text(page_paras, new_layout_bbox, page_num, lang)

View File

@@ -57,16 +57,16 @@ def construct_page_component(blocks, layout_bboxes, page_id, page_w, page_h, lay
def parse_pdf_by_ocr(
pdf_path,
s3_pdf_profile,
pdf_model_output,
save_path,
book_name,
pdf_model_profile=None,
image_s3_config=None,
start_page_id=0,
end_page_id=None,
debug_mode=False,
pdf_path,
s3_pdf_profile,
pdf_model_output,
save_path,
book_name,
pdf_model_profile=None,
image_s3_config=None,
start_page_id=0,
end_page_id=None,
debug_mode=False,
):
pdf_bytes = read_file(pdf_path, s3_pdf_profile)
save_tmp_path = os.path.join(os.path.dirname(__file__), "../..", "tmp", "unittest")
@@ -95,7 +95,6 @@ def parse_pdf_by_ocr(
start_time = time.time()
end_page_id = end_page_id if end_page_id else len(pdf_docs) - 1
for page_id in range(start_page_id, end_page_id + 1):
@@ -125,13 +124,6 @@ def parse_pdf_by_ocr(
page_id, page, ocr_page_info, md_bookname_save_path, debug_mode=debug_mode
)
# 构建需要remove的bbox列表
# need_remove_spans_bboxes = []
# need_remove_spans_bboxes.extend(page_no_bboxes)
# need_remove_spans_bboxes.extend(header_bboxes)
# need_remove_spans_bboxes.extend(footer_bboxes)
# need_remove_spans_bboxes.extend(footnote_bboxes)
# 构建需要remove的bbox字典
need_remove_spans_bboxes_dict = {
DropTag.PAGE_NUMBER: page_no_bboxes,
@@ -199,50 +191,48 @@ def parse_pdf_by_ocr(
else:
continue
# 删除重叠spans中较小的那些
'''删除重叠spans中较小的那些'''
spans, dropped_spans_by_span_overlap = remove_overlaps_min_spans(spans)
# 删除remove_span_block_bboxes中的bbox
# spans = remove_spans_by_bboxes(spans, need_remove_spans_bboxes)
# 按qa要求增加drop相关数据
'''
删除remove_span_block_bboxes中的bbox
增加drop相关数据
'''
spans, dropped_spans_by_removed_bboxes = remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict)
# 对image和table截图
'''对image和table截图'''
spans = cut_image_and_table(spans, page, page_id, book_name, save_path, img_s3_client)
# 行内公式调整, 高度调整至与同行文字高度一致(优先左侧, 其次右侧)
'''行内公式调整, 高度调整至与同行文字高度一致(优先左侧, 其次右侧)'''
displayed_list = []
text_inline_lines = []
modify_y_axis(spans, displayed_list, text_inline_lines)
# 模型识别错误的行间公式, type类型转换成行内公式
'''模型识别错误的行间公式, type类型转换成行内公式'''
spans = modify_inline_equation(spans, displayed_list, text_inline_lines)
# bbox去除粘连
'''bbox去除粘连'''
spans = remove_overlap_between_bbox(spans)
# 对tpye=["interline_equation", "image", "table"]进行额外处理,如果左边有字的话,将该span的bbox中y0调整至不高于文字的y0
'''
对tpye=["interline_equation", "image", "table"]进行额外处理,
如果左边有字的话,将该span的bbox中y0调整至不高于文字的y0
'''
spans = adjust_bbox_for_standalone_block(spans)
# 从ocr_page_info中解析layout信息(按自然阅读方向排序,并修复重叠和交错的bad case)
'''从ocr_page_info中解析layout信息(按自然阅读方向排序,并修复重叠和交错的bad case)'''
layout_bboxes, layout_tree = layout_detect(ocr_page_info['subfield_dets'], page, ocr_page_info)
# 将spans合并成line(在layout内,从上到下,从左到右)
'''将spans合并成line(在layout内,从上到下,从左到右)'''
lines, dropped_spans_by_layout = merge_spans_to_line_by_layout(spans, layout_bboxes)
# 将lines合并成block
'''将lines合并成block'''
blocks = merge_lines_to_block(lines)
# 根据block合并段落
#para_blocks = para_split(blocks, layout_bboxes)
# 获取QA需要外置的list
'''获取QA需要外置的list'''
images, tables, interline_equations, inline_equations = get_qa_need_list(blocks)
# drop的span_list合并
'''drop的span_list合并'''
dropped_spans = []
dropped_spans.extend(dropped_spans_by_span_overlap)
dropped_spans.extend(dropped_spans_by_removed_bboxes)
@@ -263,19 +253,18 @@ def parse_pdf_by_ocr(
elif span['type'] in [ContentType.InlineEquation, ContentType.InterlineEquation]:
dropped_equation_block.append(span)
# 构造pdf_info_dict
'''构造pdf_info_dict'''
page_info = construct_page_component(blocks, layout_bboxes, page_id, page_w, page_h, layout_tree,
images, tables, interline_equations, inline_equations,
dropped_text_block, dropped_image_block, dropped_table_block, dropped_equation_block,
dropped_text_block, dropped_image_block, dropped_table_block,
dropped_equation_block,
need_remove_spans_bboxes_dict)
pdf_info_dict[f"page_{page_id}"] = page_info
"""分段"""
para_split(pdf_info_dict)
# 在测试时,保存调试信息
para_split(pdf_info_dict, debug_mode=debug_mode)
'''在测试时,保存调试信息'''
if debug_mode:
params_file_save_path = join_path(
save_tmp_path, "md", book_name, "preproc_out.json"

View File

@@ -220,7 +220,7 @@ def parse_pdf_for_train(
# 解析表格并对table_bboxes进行位置的微调,防止表格周围的文字被截断
table_bboxes = parse_tables(page_id, page, model_output_json)
table_bboxes = fix_tables(
page, table_bboxes, include_table_title=True, scan_line_num=2
page, table_bboxes, include_table_title=False, scan_line_num=2
) # 修正
table_bboxes = fix_table_text_block(
text_raw_blocks, table_bboxes
@@ -253,7 +253,8 @@ def parse_pdf_for_train(
# isSimpleLayout_flag, fullColumn_cnt, subColumn_cnt, curPage_loss = evaluate_pdf_layout(page_id, page, model_output_json)
接下来开始进行预处理过程
"""
# title_bboxs = parse_titles(page_id, page, model_output_json)
"""去掉每页的页码、页眉、页脚"""
page_no_bboxs = parse_pageNos(page_id, page, model_output_json)
header_bboxs = parse_headers(page_id, page, model_output_json)
@@ -530,6 +531,7 @@ def parse_pdf_for_train(
page_info["bak_page_no_bboxes"] = page_no_bboxs
page_info["bak_header_bboxes"] = header_bboxs
page_info["bak_footer_bboxes"] = footer_bboxs
page_info["bak_footer_note_bboxes"] = footnote_bboxes_tmp
pdf_info_dict[f"page_{page_id}"] = page_info

View File

@@ -3,12 +3,9 @@ import sys
import time
from urllib.parse import quote
from magic_pdf.dict2md.ocr_mkcontent import (
ocr_mk_nlp_markdown,
ocr_mk_mm_markdown,
ocr_mk_mm_standard_format,
ocr_mk_mm_markdown_with_para, ocr_mk_mm_markdown_with_para_and_pagination,
)
from magic_pdf.dict2md.ocr_mkcontent import ocr_mk_mm_markdown, ocr_mk_nlp_markdown_with_para, \
ocr_mk_mm_markdown_with_para_and_pagination, ocr_mk_mm_markdown_with_para, ocr_mk_mm_standard_format, \
make_standard_format_with_para
from magic_pdf.libs.commons import (
read_file,
join_path,
@@ -23,19 +20,14 @@ from magic_pdf.pdf_parse_by_model import parse_pdf_by_model
from magic_pdf.filter.pdf_classify_by_type import classify
from magic_pdf.filter.pdf_meta_scan import pdf_meta_scan
from loguru import logger
from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
from magic_pdf.pdf_parse_for_train import parse_pdf_for_train
from magic_pdf.spark.base import exception_handler, get_data_source
from magic_pdf.train_utils.convert_to_train_format import convert_to_train_format
from app.common.s3 import get_s3_config, get_s3_client
from magic_pdf.pdf_parse_by_ocr import parse_pdf_by_ocr
def exception_handler(jso: dict, e):
logger.exception(e)
jso["need_drop"] = True
jso["drop_reason"] = DropReason.Exception
jso["exception"] = f"ERROR: {e}"
return jso
def get_data_type(jso: dict):
data_type = jso.get("data_type")
@@ -51,13 +43,6 @@ def get_bookid(jso: dict):
return book_id
def get_data_source(jso: dict):
data_source = jso.get("data_source")
if data_source is None:
data_source = jso.get("file_source")
return data_source
def meta_scan(jso: dict, doc_layout_check=True) -> dict:
s3_pdf_path = jso.get("file_location")
s3_config = get_s3_config(s3_pdf_path)
@@ -400,221 +385,6 @@ def uni_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
jso = ocr_dropped_parse_pdf(jso, start_page_id=start_page_id, debug_mode=debug_mode)
return jso
# 专门用来跑被drop的pdf跑完之后需要把need_drop字段置为false
def ocr_dropped_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
if not jso.get("need_drop", False):
return jso
else:
jso = ocr_parse_pdf_core(
jso, start_page_id=start_page_id, debug_mode=debug_mode
)
jso["need_drop"] = False
return jso
def ocr_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
# 检测debug开关
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
return jso
jso = ocr_parse_pdf_core(jso, start_page_id=start_page_id, debug_mode=debug_mode)
return jso
def ocr_parse_pdf_core(jso: dict, start_page_id=0, debug_mode=False) -> dict:
s3_pdf_path = jso.get("file_location")
s3_config = get_s3_config(s3_pdf_path)
model_output_json_list = jso.get("doc_layout_result")
data_source = get_data_source(jso)
file_id = jso.get("file_id")
book_name = f"{data_source}/{file_id}"
try:
save_path = s3_image_save_path
image_s3_config = get_s3_config(save_path)
start_time = time.time() # 记录开始时间
# 先打印一下book_name和解析开始的时间
logger.info(
f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
file=sys.stderr,
)
pdf_info_dict = parse_pdf_by_ocr(
s3_pdf_path,
s3_config,
model_output_json_list,
save_path,
book_name,
pdf_model_profile=None,
image_s3_config=image_s3_config,
start_page_id=start_page_id,
debug_mode=debug_mode,
)
pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
jso["pdf_intermediate_dict"] = pdf_info_dict
end_time = time.time() # 记录完成时间
parse_time = int(end_time - start_time) # 计算执行时间
# 解析完成后打印一下book_name和耗时
logger.info(
f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
file=sys.stderr,
)
jso["parse_time"] = parse_time
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
markdown_content = ocr_mk_mm_markdown(pdf_intermediate_dict)
jso["content"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown_with_para(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
markdown_content = ocr_mk_mm_markdown_with_para(pdf_intermediate_dict)
jso["content"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown_with_para_and_pagination(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
markdown_content = ocr_mk_mm_markdown_with_para_and_pagination(pdf_intermediate_dict)
jso["content"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
# jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
# jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown_with_para_for_qa(
jso: dict, debug_mode=False
) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
markdown_content = ocr_mk_mm_markdown_with_para(pdf_intermediate_dict)
jso["content_ocr"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["mid_json_ocr"] = pdf_intermediate_dict
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_standard_format(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
standard_format = ocr_mk_mm_standard_format(pdf_intermediate_dict)
jso["content_list"] = standard_format
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def parse_pdf_for_model_train(jso: dict, start_page_id=0, debug_mode=False) -> dict:
# 检测debug开关
if debug_mode:
@@ -695,5 +465,243 @@ def parse_pdf_for_model_train(jso: dict, start_page_id=0, debug_mode=False) -> d
return jso
# 专门用来跑被drop的pdf跑完之后需要把need_drop字段置为false
def ocr_dropped_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
if not jso.get("need_drop", False):
return jso
else:
jso = ocr_parse_pdf_core(
jso, start_page_id=start_page_id, debug_mode=debug_mode
)
jso["need_drop"] = False
return jso
def ocr_parse_pdf(jso: dict, start_page_id=0, debug_mode=False) -> dict:
# 检测debug开关
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
return jso
jso = ocr_parse_pdf_core(jso, start_page_id=start_page_id, debug_mode=debug_mode)
return jso
def ocr_parse_pdf_core(jso: dict, start_page_id=0, debug_mode=False) -> dict:
s3_pdf_path = jso.get("file_location")
s3_config = get_s3_config(s3_pdf_path)
model_output_json_list = jso.get("doc_layout_result")
data_source = get_data_source(jso)
file_id = jso.get("file_id")
book_name = f"{data_source}/{file_id}"
try:
save_path = s3_image_save_path
image_s3_config = get_s3_config(save_path)
start_time = time.time() # 记录开始时间
# 先打印一下book_name和解析开始的时间
logger.info(
f"book_name is:{book_name},start_time is:{formatted_time(start_time)}",
file=sys.stderr,
)
pdf_info_dict = parse_pdf_by_ocr(
s3_pdf_path,
s3_config,
model_output_json_list,
save_path,
book_name,
pdf_model_profile=None,
image_s3_config=image_s3_config,
start_page_id=start_page_id,
debug_mode=debug_mode,
)
pdf_info_dict = JsonCompressor.compress_json(pdf_info_dict)
jso["pdf_intermediate_dict"] = pdf_info_dict
end_time = time.time() # 记录完成时间
parse_time = int(end_time - start_time) # 计算执行时间
# 解析完成后打印一下book_name和耗时
logger.info(
f"book_name is:{book_name},end_time is:{formatted_time(end_time)},cost_time is:{parse_time}",
file=sys.stderr,
)
jso["parse_time"] = parse_time
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
markdown_content = ocr_mk_mm_markdown(pdf_intermediate_dict)
jso["content"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown_with_para(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
# markdown_content = ocr_mk_mm_markdown_with_para(pdf_intermediate_dict)
markdown_content = ocr_mk_nlp_markdown_with_para(pdf_intermediate_dict)
jso["content"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown_with_para_and_pagination(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
markdown_content = ocr_mk_mm_markdown_with_para_and_pagination(pdf_intermediate_dict)
jso["content"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
# jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
# jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_markdown_with_para_for_qa(
jso: dict, debug_mode=False
) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
markdown_content = ocr_mk_mm_markdown_with_para(pdf_intermediate_dict)
jso["content_ocr"] = markdown_content
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},markdown content length is {len(markdown_content)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["mid_json_ocr"] = pdf_intermediate_dict
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_standard_format(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
standard_format = ocr_mk_mm_standard_format(pdf_intermediate_dict)
jso["content_list"] = standard_format
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def ocr_pdf_intermediate_dict_to_standard_format_with_para(jso: dict, debug_mode=False) -> dict:
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop", file=sys.stderr)
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
standard_format = make_standard_format_with_para(pdf_intermediate_dict)
jso["content_list"] = standard_format
logger.info(
f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",
file=sys.stderr,
)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
if __name__ == "__main__":
pass

69
magic_pdf/pipeline_txt.py Normal file
View File

@@ -0,0 +1,69 @@
"""
文本型pdf转化为统一清洗格式
"""
# TODO 移动到spark/目录下
from loguru import logger
from magic_pdf.dict2md.mkcontent import mk_mm_markdown, mk_universal_format
from magic_pdf.libs.commons import join_path
from magic_pdf.libs.json_compressor import JsonCompressor
from magic_pdf.spark.base import exception_handler, get_data_source
def txt_pdf_to_standard_format(jso: dict, debug_mode=False) -> dict:
"""
变成统一的标准格式
"""
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop")
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
standard_format = mk_universal_format(pdf_intermediate_dict)
jso["content_list"] = standard_format
logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",)
# 把无用的信息清空
jso["doc_layout_result"] = ""
jso["pdf_intermediate_dict"] = ""
jso["pdf_meta"] = ""
except Exception as e:
jso = exception_handler(jso, e)
return jso
def txt_pdf_to_mm_markdown_format(jso: dict, debug_mode=False) -> dict:
"""
变成多模态的markdown格式
"""
if debug_mode:
pass
else: # 如果debug没开则检测是否有needdrop字段
if jso.get("need_drop", False):
book_name = join_path(get_data_source(jso), jso["file_id"])
logger.info(f"book_name is:{book_name} need drop")
jso["dropped"] = True
return jso
try:
pdf_intermediate_dict = jso["pdf_intermediate_dict"]
# 将 pdf_intermediate_dict 解压
pdf_intermediate_dict = JsonCompressor.decompress_json(pdf_intermediate_dict)
standard_format = mk_universal_format(pdf_intermediate_dict)
mm_content = mk_mm_markdown(standard_format)
jso["content"] = mm_content
logger.info(f"book_name is:{get_data_source(jso)}/{jso['file_id']},content_list length is {len(standard_format)}",)
# 把无用的信息清空
to_del_keys = ["doc_layout_result", "pdf_intermediate_dict", "pdf_meta", "parsed_result"]
for key in to_del_keys:
if jso.get(key):
del jso[key]
except Exception as e:
jso = exception_handler(jso, e)
return jso

View File

@@ -44,10 +44,15 @@ def remove_spans_by_bboxes_dict(spans, need_remove_spans_bboxes_dict):
# logger.info(f"remove spans by bbox dict, drop_tag: {drop_tag}, removed_bboxes: {removed_bboxes}")
need_remove_spans = []
for span in spans:
# 通过判断span的bbox是否在removed_bboxes中, 判断是否需要删除该span
for removed_bbox in removed_bboxes:
if calculate_overlap_area_in_bbox1_area_ratio(span['bbox'], removed_bbox) > 0.5:
need_remove_spans.append(span)
break
# 当drop_tag为DropTag.FOOTNOTE时, 判断span是否在removed_bboxes中任意一个的下方如果是,则删除该span
elif drop_tag == DropTag.FOOTNOTE and (span['bbox'][1]+span['bbox'][3])/2 > removed_bbox[3] and removed_bbox[0] < (span['bbox'][0]+span['bbox'][2])/2 < removed_bbox[2]:
need_remove_spans.append(span)
break
for span in need_remove_spans:
spans.remove(span)

View File

21
magic_pdf/spark/base.py Normal file
View File

@@ -0,0 +1,21 @@
from loguru import logger
from magic_pdf.libs.drop_reason import DropReason
def get_data_source(jso: dict):
data_source = jso.get("data_source")
if data_source is None:
data_source = jso.get("file_source")
return data_source
def exception_handler(jso: dict, e):
logger.exception(e)
jso["need_drop"] = True
jso["drop_reason"] = DropReason.Exception
jso["exception"] = f"ERROR: {e}"
return jso

View File

@@ -35,8 +35,16 @@ def convert_to_train_format(jso: dict) -> []:
# 脚注, 目前没有看到例子
for para in v["para_blocks"]:
n_bbox = {"category_id": 2, "bbox": para["bbox"]}
bboxes.append(n_bbox)
if "paras" in para:
paras = para["paras"]
for para_key, para_content in paras.items():
para_bbox = para_content["para_bbox"]
is_para_title = para_content["is_para_title"]
if is_para_title:
n_bbox = {"category_id": 0, "bbox": para_bbox}
else:
n_bbox = {"category_id": 2, "bbox": para_bbox}
bboxes.append(n_bbox)
for inline_equation in v["inline_equations"]:
n_bbox = {"category_id": 13, "bbox": inline_equation["bbox"]}
@@ -46,6 +54,10 @@ def convert_to_train_format(jso: dict) -> []:
n_bbox = {"category_id": 10, "bbox": inter_equation["bbox"]}
bboxes.append(n_bbox)
for footnote_bbox in v["bak_footer_note_bboxes"]:
n_bbox = {"category_id": 5, "bbox": list(footnote_bbox)}
bboxes.append(n_bbox)
info["bboxes"] = bboxes
info["layout_tree"] = v["layout_bboxes"]
pages.append(info)

View File

@@ -46,6 +46,8 @@ def indicator_cal(json_standard,json_test):
'''批量读取中间生成的json文件'''
test_inline_equations=[]
test_interline_equations=[]
test_inline_euqations_bboxs=[]
test_interline_equations_bboxs=[]
test_dropped_text_bboxes=[]
test_dropped_text_tag=[]
test_dropped_image_bboxes=[]
@@ -58,15 +60,20 @@ def indicator_cal(json_standard,json_test):
mid_json=pd.DataFrame(i)
mid_json=mid_json.iloc[:,:-1]
for j1 in mid_json.loc['inline_equations',:]:
page_in=[]
page_in_text=[]
page_in_bbox=[]
for k1 in j1:
page_in.append(k1['latex_text'])
test_inline_equations.append(page_in)
page_in_text.append(k1['latex_text'])
page_in_bbox.append(k1['bbox'])
test_inline_equations.append(page_in_text)
test_inline_euqations_bboxs.append(page_in_bbox)
for j2 in mid_json.loc['interline_equations',:]:
page_in=[]
page_in_text=[]
page_in_bbox=[]
for k2 in j2:
page_in.append(k2['latex_text'])
test_interline_equations.append(page_in)
page_in_text.append(k2['latex_text'])
test_interline_equations.append(page_in_text)
test_interline_equations_bboxs.append(page_in_bbox)
for j3 in mid_json.loc['droped_text_block',:]:
page_in_bbox=[]
@@ -101,6 +108,8 @@ def indicator_cal(json_standard,json_test):
standard_inline_equations=[]
standard_interline_equations=[]
standard_inline_euqations_bboxs=[]
standard_interline_equations_bboxs=[]
standard_dropped_text_bboxes=[]
standard_dropped_text_tag=[]
standard_dropped_image_bboxes=[]
@@ -113,15 +122,21 @@ def indicator_cal(json_standard,json_test):
mid_json=pd.DataFrame(i)
mid_json=mid_json.iloc[:,:-1]
for j1 in mid_json.loc['inline_equations',:]:
page_in=[]
page_in_text=[]
page_in_bbox=[]
for k1 in j1:
page_in.append(k1['latex_text'])
standard_inline_equations.append(page_in)
page_in_text.append(k1['latex_text'])
page_in_bbox.append(k1['bbox'])
standard_inline_equations.append(page_in_text)
standard_inline_euqations_bboxs.append(page_in_bbox)
for j2 in mid_json.loc['interline_equations',:]:
page_in=[]
page_in_text=[]
page_in_bbox=[]
for k2 in j2:
page_in.append(k2['latex_text'])
standard_interline_equations.append(page_in)
page_in_text.append(k2['latex_text'])
page_in_bbox.append(k2['bbox'])
standard_interline_equations.append(page_in_text)
standard_interline_equations_bboxs.append(page_in_bbox)
for j3 in mid_json.loc['droped_text_block',:]:
page_in_bbox=[]
page_in_tag=[]
@@ -195,6 +210,9 @@ def indicator_cal(json_standard,json_test):
inline_equations_edit=np.mean(dis1)
inline_equations_bleu=np.mean(bleu1)
'''行内公式bbox匹配相关指标'''
inline_equations_bbox_report=bbox_match_indicator(test_inline_euqations_bboxs,standard_inline_euqations_bboxs)
'''行间公式编辑距离和bleu'''
dis2=[]
@@ -217,6 +235,10 @@ def indicator_cal(json_standard,json_test):
interline_equations_bleu=np.mean(bleu2)
'''行间公式bbox匹配相关指标'''
interline_equations_bbox_report=bbox_match_indicator(test_interline_equations_bboxs,standard_interline_equations_bboxs)
'''可以先检查page和bbox数量是否一致'''
@@ -289,87 +311,11 @@ def indicator_cal(json_standard,json_test):
'''dropped_image_block的bbox匹配相关指标'''
'''有数据格式不一致的问题'''
image_block_report=bbox_match_indicator(test_dropped_image_bboxes,standard_dropped_image_bboxes)
test_image_bbox=[]
standard_image_bbox=[]
for a,b in zip(test_dropped_image_bboxes,standard_dropped_image_bboxes):
test_page_bbox=[]
standard_page_bbox=[]
if len(a)==0 and len(b)==0:
pass
else:
for i in b:
if len(i)!=4:
continue
else:
judge=0
standard_page_bbox.append(1)
for j in a:
if bbox_offset(i,j):
judge=1
test_page_bbox.append(1)
break
if judge==0:
test_page_bbox.append(0)
diff_num=len(a)+test_page_bbox.count(0)-len(b)
if diff_num>0:#有多删的情况出现
test_page_bbox.extend([1]*diff_num)
standard_page_bbox.extend([0]*diff_num)
test_image_bbox.extend(test_page_bbox)
standard_image_bbox.extend(standard_page_bbox)
image_block_report = {}
image_block_report['accuracy']=metrics.accuracy_score(standard_image_bbox,test_image_bbox)
image_block_report['precision']=metrics.precision_score(standard_image_bbox,test_image_bbox)
image_block_report['recall']=metrics.recall_score(standard_image_bbox,test_image_bbox)
image_block_report['f1_score']=metrics.f1_score(standard_image_bbox,test_image_bbox)
'''dropped_table_block的bbox匹配相关指标'''
test_table_bbox=[]
standard_table_bbox=[]
for a,b in zip(test_dropped_table_bboxes,standard_dropped_table_bboxes):
test_page_bbox=[]
standard_page_bbox=[]
if len(a)==0 and len(b)==0:
pass
else:
for i in b:
if len(i)!=4:
continue
else:
judge=0
standard_page_bbox.append(1)
for j in a:
if bbox_offset(i,j):
judge=1
test_page_bbox.append(1)
break
if judge==0:
test_page_bbox.append(0)
diff_num=len(a)+test_page_bbox.count(0)-len(b)
if diff_num>0:#有多删的情况出现
test_page_bbox.extend([1]*diff_num)
standard_page_bbox.extend([0]*diff_num)
test_table_bbox.extend(test_page_bbox)
standard_table_bbox.extend(standard_page_bbox)
table_block_report = {}
table_block_report['accuracy']=metrics.accuracy_score(standard_table_bbox,test_table_bbox)
table_block_report['precision']=metrics.precision_score(standard_table_bbox,test_table_bbox)
table_block_report['recall']=metrics.recall_score(standard_table_bbox,test_table_bbox)
table_block_report['f1_score']=metrics.f1_score(standard_table_bbox,test_table_bbox)
table_block_report=bbox_match_indicator(test_dropped_table_bboxes,standard_dropped_table_bboxes)
'''阅读顺序编辑距离的均值'''
@@ -392,6 +338,8 @@ def indicator_cal(json_standard,json_test):
output['行间公式平均编辑距离']=[interline_equations_edit]
output['行内公式平均bleu']=[inline_equations_bleu]
output['行间公式平均bleu']=[interline_equations_bleu]
output['行内公式识别相关指标']=[inline_equations_bbox_report]
output['行间公式识别相关指标']=[interline_equations_bbox_report]
output['阅读顺序平均编辑距离']=[preproc_num_edit]
output['分段准确率']=[acc_para]
output['删除的text block的相关指标']=[text_block_report]
@@ -434,6 +382,52 @@ def bbox_offset(b_t,b_s):
return True
else:
return False
'''bbox匹配和对齐函数输出相关指标'''
'''输入的是以page为单位的bbox列表'''
def bbox_match_indicator(test_bbox_list,standard_bbox_list):
test_bbox=[]
standard_bbox=[]
for a,b in zip(test_bbox_list,standard_bbox_list):
test_page_bbox=[]
standard_page_bbox=[]
if len(a)==0 and len(b)==0:
pass
else:
for i in b:
if len(i)!=4:
continue
else:
judge=0
standard_page_bbox.append(1)
for j in a:
if bbox_offset(i,j):
judge=1
test_page_bbox.append(1)
break
if judge==0:
test_page_bbox.append(0)
diff_num=len(a)+test_page_bbox.count(0)-len(b)
if diff_num>0:#有多删的情况出现
test_page_bbox.extend([1]*diff_num)
standard_page_bbox.extend([0]*diff_num)
test_bbox.extend(test_page_bbox)
standard_bbox.extend(standard_page_bbox)
block_report = {}
block_report['accuracy']=metrics.accuracy_score(standard_bbox,test_bbox)
block_report['precision']=metrics.precision_score(standard_bbox,test_bbox)
block_report['recall']=metrics.recall_score(standard_bbox,test_bbox)
block_report['f1_score']=metrics.f1_score(standard_bbox,test_bbox)
return block_report