Compare commits

...

14 Commits

Author SHA1 Message Date
Xiaomeng Zhao
c9f402adda Merge pull request #4747 from opendatalab/master
master->dev
2026-04-07 21:13:08 +08:00
myhloli
e16b8582b4 Update version.py with new version 2026-04-07 13:02:14 +00:00
Xiaomeng Zhao
3ad7e0b981 Merge pull request #4746 from opendatalab/dev
3.0.9: fix #4742 #4744
2026-04-07 20:59:52 +08:00
Xiaomeng Zhao
f93e260f24 Merge pull request #4745 from myhloli/dev
fix #4744
2026-04-07 20:53:46 +08:00
myhloli
fede292388 feat: adjust contrast threshold for OCR processing in span_pre_proc.py 2026-04-07 20:49:26 +08:00
myhloli
7f365ce96b feat: optimize character processing in span_pre_proc.py for improved spacing and content generation 2026-04-07 20:47:29 +08:00
myhloli
0f43f17fae feat: add aspect ratio checks and character count limits for PDF processing 2026-04-07 20:36:22 +08:00
myhloli
e83395a0b3 feat: enhance table merging logic with improved row metrics and state management 2026-04-07 19:36:07 +08:00
Xiaomeng Zhao
576581d826 Merge pull request #4743 from myhloli/dev
feat: add function to identify disallowed control Unicode characters
2026-04-07 17:03:31 +08:00
myhloli
23f3bd5975 feat: add function to identify disallowed control Unicode characters 2026-04-07 16:41:22 +08:00
Xiaomeng Zhao
56f474d32a Merge pull request #4734 from myhloli/dev
docs: update CLI tools documentation for mineru API usage
2026-04-03 19:25:23 +08:00
myhloli
c66848a425 docs: update CLI tools documentation for mineru API usage 2026-04-03 19:24:12 +08:00
Xiaomeng Zhao
82788a9b8b Merge pull request #4733 from opendatalab/master
master->dev
2026-04-03 18:54:22 +08:00
myhloli
4be86ee07a Update version.py with new version 2026-04-03 10:48:44 +00:00
6 changed files with 505 additions and 410 deletions

View File

@@ -87,9 +87,10 @@ Options:
> [!NOTE]
> Starting from this version, `mineru` is an orchestration client built on top of `mineru-api`:
> - Without `--api-url`, the CLI launches a temporary local `mineru-api`
> - With `--api-url`, the CLI connects to that FastAPI service directly
> - `--url` is no longer the MinerU API address; it is the OpenAI-compatible backend URL used by server-side `vlm/hybrid-http-client`
>
>- Without `--api-url`, the CLI launches a temporary local `mineru-api`
>- With `--api-url`, the CLI connects to that FastAPI service directly
>- `--url` is no longer the MinerU API address; it is the OpenAI-compatible backend URL used by server-side `vlm/hybrid-http-client`
Some parameters of MinerU command line tools have equivalent environment variable configurations. Generally, environment variable configurations have higher priority than command line parameters and take effect across all command line tools.
Here are the environment variables and their descriptions:

View File

@@ -81,9 +81,10 @@ Options:
> [!NOTE]
> 从当前版本开始,`mineru` 是基于 `mineru-api` 的编排客户端:
> - 未传 `--api-url` 时,CLI 会自动拉起本地临时 `mineru-api`
> - 传入 `--api-url` 时,CLI 会直连该 FastAPI 服务
> - `--url` 不再表示 MinerU API 地址,而是服务端 `vlm/hybrid-http-client` 所需的 OpenAI 兼容地址
>
>- 未传 `--api-url` 时,CLI 会自动拉起本地临时 `mineru-api`
>- 传入 `--api-url` 时,CLI 会直连该 FastAPI 服务
>- `--url` 不再表示 MinerU API 地址,而是服务端 `vlm/hybrid-http-client` 所需的 OpenAI 兼容地址
MinerU命令行工具的某些参数存在相同功能的环境变量配置,通常环境变量配置的优先级高于命令行参数,且在所有命令行工具中都生效。
以下是常用的环境变量及其说明:

View File

@@ -32,11 +32,23 @@ CID_RATIO_THRESHOLD = 0.05
TEXT_QUALITY_MIN_CHARS = 300
TEXT_QUALITY_BAD_THRESHOLD = 0.03
TEXT_QUALITY_GOOD_THRESHOLD = 0.005
# Pages whose longer-side / shorter-side ratio exceeds this are classified as OCR.
MAX_PAGE_ASPECT_RATIO = 10.0
# Control code points that are allowed in extracted text: TAB (9), LF (10), CR (13).
_ALLOWED_CONTROL_CODES = {9, 10, 13}
# Unicode Private Use Area bounds (U+E000..U+F8FF); characters in this range
# are counted by the text-quality signal as suspect glyphs.
_PRIVATE_USE_AREA_START = 0xE000
_PRIVATE_USE_AREA_END = 0xF8FF
def _is_disallowed_control_unicode(unicode_code: int) -> bool:
    """Return True for control code points that should not appear in text.

    Covers the C0 range (0x00-0x1F) plus DEL and the C1 range (0x7F-0x9F),
    except the whitelisted codes in _ALLOWED_CONTROL_CODES (TAB/LF/CR).
    """
    if unicode_code in _ALLOWED_CONTROL_CODES:
        return False
    in_c0_range = 0 <= unicode_code < 32
    in_del_or_c1_range = 127 <= unicode_code <= 159
    return in_c0_range or in_del_or_c1_range
def classify(pdf_bytes):
"""
Classify a PDF as text-based or OCR-based.
@@ -90,6 +102,17 @@ def classify_hybrid(pdf_bytes):
if not page_indices:
return "ocr"
extreme_page_index, extreme_ratio = get_extreme_aspect_ratio_page_pdfium(
pdf,
page_indices,
)
if extreme_page_index is not None:
logger.info(
"Classify PDF as OCR due to extreme sampled-page aspect ratio: "
f"page={extreme_page_index + 1}, ratio={extreme_ratio:.2f}"
)
return "ocr"
if (
get_avg_cleaned_chars_per_page_pdfium(pdf, page_indices)
< CHARS_THRESHOLD
@@ -203,6 +226,24 @@ def get_sample_page_indices(page_count: int, max_pages: int = MAX_SAMPLE_PAGES):
return sorted(indices)
def get_extreme_aspect_ratio_page_pdfium(
    pdf_doc,
    page_indices,
    max_page_aspect_ratio: float = MAX_PAGE_ASPECT_RATIO,
):
    """Find the first sampled page whose aspect ratio exceeds the limit.

    Returns (page_index, ratio) for the first page whose
    max(width/height, height/width) is greater than max_page_aspect_ratio,
    or (None, None) when no sampled page exceeds it. Pages reporting a
    non-positive width or height are skipped.
    """
    for idx in page_indices:
        width, height = pdf_doc[idx].get_size()
        if width <= 0 or height <= 0:
            # Degenerate page geometry — cannot compute a meaningful ratio.
            continue
        ratio = max(width, height) / min(width, height)
        if ratio > max_page_aspect_ratio:
            return idx, ratio
    return None, None
def get_avg_cleaned_chars_per_page(pdf_doc, pages_to_check):
total_chars = 0
cleaned_total_chars = 0
@@ -252,7 +293,7 @@ def get_text_quality_signal_pdfium(pdf_doc, page_indices):
null_char_count += 1
elif unicode_code == 0xFFFD:
replacement_char_count += 1
elif unicode_code < 32 and unicode_code not in _ALLOWED_CONTROL_CODES:
elif _is_disallowed_control_unicode(unicode_code):
control_char_count += 1
elif _PRIVATE_USE_AREA_START <= unicode_code <= _PRIVATE_USE_AREA_END:
private_use_char_count += 1

View File

@@ -13,6 +13,8 @@ from mineru.utils.enum_class import BlockType, ContentType
from mineru.utils.pdf_image_tools import get_crop_img
from mineru.utils.pdf_text_tool import get_page
# Pages whose pdfium-reported char count exceeds this skip native text
# extraction and fall back to post-OCR (see txt_spans_extract).
MAX_NATIVE_TEXT_CHARS_PER_PAGE = 65535
def __replace_ligatures(text: str):
ligatures = {
@@ -29,6 +31,21 @@ def __replace_unicode(text: str):
"""pdf_text dict方案 char级别"""
def txt_spans_extract(pdf_page, spans, pil_img, scale, all_bboxes, all_discarded_blocks):
page_char_count = None
try:
page_char_count = pdf_page.get_textpage().count_chars()
except Exception as exc:
logger.debug(f"Failed to get page char count before txt extraction: {exc}")
if page_char_count is not None and page_char_count > MAX_NATIVE_TEXT_CHARS_PER_PAGE:
logger.info(
"Fallback to post-OCR in txt_spans_extract due to high char count: "
f"count_chars={page_char_count}"
)
need_ocr_spans = [
span for span in spans if span.get('type') == ContentType.TEXT
]
return _prepare_post_ocr_spans(need_ocr_spans, spans, pil_img, scale)
page_dict = get_page(pdf_page)
@@ -99,21 +116,26 @@ def txt_spans_extract(pdf_page, spans, pil_img, scale, all_bboxes, all_discarded
need_ocr_spans = fill_char_in_spans(new_spans, page_all_chars, median_span_height)
"""对未填充的span进行ocr"""
if len(need_ocr_spans) > 0:
return _prepare_post_ocr_spans(need_ocr_spans, spans, pil_img, scale)
for span in need_ocr_spans:
# 对span的bbox截图再ocr
span_pil_img = get_crop_img(span['bbox'], pil_img, scale)
span_img = cv2.cvtColor(np.array(span_pil_img), cv2.COLOR_RGB2BGR)
# 计算span的对比度低于0.20的span不进行ocr
if calculate_contrast(span_img, img_mode='bgr') <= 0.17:
def _prepare_post_ocr_spans(need_ocr_spans, spans, pil_img, scale):
    """Prepare text spans for a later OCR pass.

    For each span awaiting OCR: crop its bbox from the page image; spans
    whose contrast is at or below 0.17 are removed from `spans` (too faint
    to OCR), the rest receive an empty content placeholder, a score of 1.0
    and the cropped BGR image in 'np_img'. Returns the (possibly mutated)
    `spans` list.

    Note: this reconstructs the added side of a garbled diff that left a
    duplicated `continue` and duplicated content/score/np_img assignments.
    """
    if len(need_ocr_spans) == 0:
        return spans
    for span in need_ocr_spans:
        # Crop the span region from the page image and convert to BGR for OpenCV.
        span_pil_img = get_crop_img(span['bbox'], pil_img, scale)
        span_img = cv2.cvtColor(np.array(span_pil_img), cv2.COLOR_RGB2BGR)
        # 计算span的对比度低于0.17的span不进行ocr
        if calculate_contrast(span_img, img_mode='bgr') <= 0.17:
            if span in spans:
                spans.remove(span)
            continue
        span['content'] = ''
        span['score'] = 1.0
        span['np_img'] = span_img
    return spans
@@ -194,28 +216,32 @@ def calculate_char_in_span(char_bbox, span_bbox, char, span_height_ratio=Span_He
def chars_to_content(span):
# 检查span中的char是否为空
if len(span['chars']) == 0:
pass
else:
if len(span['chars']) != 0:
# 给chars按char_idx排序
span['chars'] = sorted(span['chars'], key=lambda x: x['char_idx'])
chars = sorted(span['chars'], key=lambda x: x['char_idx'])
# Calculate the width of each character
char_widths = [char['bbox'][2] - char['bbox'][0] for char in span['chars']]
char_widths = [char['bbox'][2] - char['bbox'][0] for char in chars]
# Calculate the median width
median_width = statistics.median(char_widths)
content = ''
for char in span['chars']:
parts = []
for idx, char1 in enumerate(chars):
char2 = chars[idx + 1] if idx + 1 < len(chars) else None
# 如果下一个char的x0和上一个char的x1距离超过0.25个字符宽度,则需要在中间插入一个空格
char1 = char
char2 = span['chars'][span['chars'].index(char) + 1] if span['chars'].index(char) + 1 < len(span['chars']) else None
if char2 and char2['bbox'][0] - char1['bbox'][2] > median_width * 0.25 and char['char'] != ' ' and char2['char'] != ' ':
content += f"{char['char']} "
if (
char2
and char2['bbox'][0] - char1['bbox'][2] > median_width * 0.25
and char1['char'] != ' '
and char2['char'] != ' '
):
parts.append(char1['char'])
parts.append(' ')
else:
content += char['char']
parts.append(char1['char'])
content = ''.join(parts)
content = __replace_unicode(content)
content = __replace_ligatures(content)
content = __replace_ligatures(content)

View File

@@ -1,7 +1,8 @@
# Copyright (c) Opendatalab. All rights reserved.
from copy import deepcopy
from dataclasses import dataclass
from typing import Any
from loguru import logger
from bs4 import BeautifulSoup
from mineru.backend.vlm.vlm_middle_json_mkcontent import merge_para_with_text
@@ -24,125 +25,254 @@ CONTINUATION_INLINE_MARKERS = [
"(continued)",
]
# Maximum number of leading rows that may be treated as a table header
# when comparing two tables for merging.
MAX_HEADER_ROWS = 5
def calculate_table_total_columns(soup):
"""计算表格的总列数通过分析整个表格结构来处理rowspan和colspan
Args:
soup: BeautifulSoup解析的表格
@dataclass
class RowMetrics:
    """Per-row column statistics used when comparing table boundary rows."""

    # Absolute row index within the table (local index + start_row_idx).
    row_idx: int
    # Columns occupied on this row after accounting for rowspan carry-over.
    effective_cols: int
    # Sum of the colspans of the cells declared on this row.
    actual_cols: int
    # Raw number of td/th cells on the row (ignores colspan).
    visual_cols: int
Returns:
int: 表格的总列数
@dataclass
class RowSignature:
    """Structure-plus-text fingerprint of a (potential) header row."""

    # Effective column count of the row (rowspan-aware).
    effective_cols: int
    # colspan of each cell, in document order.
    colspans: tuple[int, ...]
    # rowspan of each cell, in document order.
    rowspans: tuple[int, ...]
    # Cell texts with all whitespace removed (strict comparison form).
    normalized_texts: tuple[str, ...]
    # Cell texts merely stripped at the ends (readable form).
    display_texts: tuple[str, ...]

    @property
    def cell_count(self) -> int:
        # One colspan entry per cell, so this is the number of cells.
        return len(self.colspans)
@dataclass
class RowScanResult:
    """Aggregated output of a single _scan_rows pass over a row list."""

    # Effective (rowspan-aware) column count of each scanned row, in order.
    row_effective_cols: list[int]
    # Full RowMetrics for each scanned row, in order.
    row_metrics: list[RowMetrics]
    # Maximum effective column count observed across all rows.
    total_cols: int
    # Metrics of the last row that actually contained cells, if any.
    last_nonempty_row_metrics: RowMetrics | None
    # Occupancy that extends past the scanned rows: offset from the first
    # unscanned row -> set of occupied columns; preserves rowspans that
    # cross a merge boundary.
    tail_occupied: dict[int, set[int]]
@dataclass
class TableMergeState:
    """Cached parse/scan state for one table block during table merging."""

    # The table block (middle-json dict) this state belongs to.
    owner_block: dict[str, Any]
    # The TABLE_BODY span whose "html" field holds the table markup.
    body_span: dict[str, Any]
    # BeautifulSoup document parsed from body_span["html"].
    soup: Any
    # <tbody> element, falling back to <table> when no tbody exists.
    tbody: Any
    # All <tr> elements of the table.
    rows: list[Any]
    # Total effective column count of the whole table.
    total_cols: int
    # Signatures of the leading rows, used for header comparison.
    front_header_info: list[RowSignature]
    # Metrics of the leading rows keyed by row index, used to locate the
    # first data row after the detected header.
    front_first_data_row_metrics: dict[int, RowMetrics]
    # Metrics of the last non-empty row, if any.
    last_data_row_metrics: RowMetrics | None
    # Effective column count per row, in order.
    row_effective_cols: list[int]
    # Rowspan occupancy extending past the last row (see RowScanResult).
    tail_occupied: dict[int, set[int]]
    # Set True when the soup is mutated; cleared by _serialize_table_state_html.
    dirty: bool = False
def _normalize_cell_text(cell) -> str:
    """Cell text with full-width chars folded and ALL whitespace removed."""
    folded = full_to_half(cell.get_text())
    return "".join(folded.split())
def _display_cell_text(cell) -> str:
    """Readable cell text: stripped at the ends, full-width chars folded."""
    stripped = cell.get_text().strip()
    return full_to_half(stripped)
def _scan_rows(rows, initial_occupied: dict[int, set[int]] | None = None, start_row_idx: int = 0) -> RowScanResult:
    """Scan rows once and cache effective-column metrics.

    initial_occupied stores future-row occupancy relative to the first scanned
    row and preserves rowspans that cross a merge boundary.

    Note: this reconstructs the added side of a garbled diff in which the old
    calculate_table_total_columns body was interleaved with this function.
    """
    occupied: dict[int, dict[int, bool]] = {}
    max_cols = 0
    # Seed occupancy with rowspans carried over from a previous scan.
    for row_offset, cols in (initial_occupied or {}).items():
        if not cols:
            continue
        occupied[row_offset] = {col: True for col in cols}
        max_cols = max(max_cols, max(cols) + 1)
    row_effective_cols: list[int] = []
    row_metrics: list[RowMetrics] = []
    last_nonempty_row_metrics: RowMetrics | None = None
    for local_idx, row in enumerate(rows):
        occupied_row = occupied.setdefault(local_idx, {})
        col_idx = 0
        cells = row.find_all(["td", "th"])
        actual_cols = 0
        for cell in cells:
            # Advance to the next column not already claimed by a rowspan above.
            while col_idx in occupied_row:
                col_idx += 1
            colspan = int(cell.get("colspan", 1))
            rowspan = int(cell.get("rowspan", 1))
            actual_cols += colspan
            # Mark every position this cell covers (including future rows).
            for row_offset in range(rowspan):
                target_idx = local_idx + row_offset
                occupied_target = occupied.setdefault(target_idx, {})
                for col in range(col_idx, col_idx + colspan):
                    occupied_target[col] = True
            col_idx += colspan
        effective_cols = max(occupied_row.keys()) + 1 if occupied_row else 0
        row_effective_cols.append(effective_cols)
        max_cols = max(max_cols, effective_cols)
        metrics = RowMetrics(
            row_idx=start_row_idx + local_idx,
            effective_cols=effective_cols,
            actual_cols=actual_cols,
            visual_cols=len(cells),
        )
        row_metrics.append(metrics)
        if cells:
            last_nonempty_row_metrics = metrics
    # Occupancy that spills past the scanned rows, keyed by offset from the
    # first unscanned row — consumed as initial_occupied on the next scan.
    tail_occupied = {
        row_idx - len(rows): set(cols.keys())
        for row_idx, cols in occupied.items()
        if row_idx >= len(rows) and cols
    }
    return RowScanResult(
        row_effective_cols=row_effective_cols,
        row_metrics=row_metrics,
        total_cols=max_cols,
        last_nonempty_row_metrics=last_nonempty_row_metrics,
        tail_occupied=tail_occupied,
    )
def _build_row_signature(row, effective_cols: int) -> RowSignature:
    """Capture a row's structural and textual fingerprint for header matching."""
    colspans = []
    rowspans = []
    normalized = []
    display = []
    for cell in row.find_all(["td", "th"]):
        colspans.append(int(cell.get("colspan", 1)))
        rowspans.append(int(cell.get("rowspan", 1)))
        normalized.append(_normalize_cell_text(cell))
        display.append(_display_cell_text(cell))
    return RowSignature(
        effective_cols=effective_cols,
        colspans=tuple(colspans),
        rowspans=tuple(rowspans),
        normalized_texts=tuple(normalized),
        display_texts=tuple(display),
    )
def _build_front_cache(rows, max_header_rows: int = MAX_HEADER_ROWS) -> tuple[list[RowSignature], dict[int, RowMetrics]]:
    """Scan the leading rows once; return header signatures and row metrics.

    Scans up to max_header_rows + 1 rows so that the first data row after a
    maximal header is still covered by the metrics dict.
    """
    front_rows = rows[:min(len(rows), max_header_rows + 1)]
    scan = _scan_rows(front_rows)
    signature_count = min(len(front_rows), max_header_rows)
    header_signatures = [
        _build_row_signature(front_rows[i], scan.row_effective_cols[i])
        for i in range(signature_count)
    ]
    metrics_by_index = dict(enumerate(scan.row_metrics))
    return header_signatures, metrics_by_index
def _find_table_body_span(table_block):
    """Return the first span of the first non-empty TABLE_BODY block, or None."""
    for child in table_block["blocks"]:
        if child["type"] != BlockType.TABLE_BODY:
            continue
        lines = child["lines"]
        if lines and lines[0]["spans"]:
            return lines[0]["spans"][0]
    return None
def _refresh_table_state_metrics(state: TableMergeState) -> None:
    """Re-scan the state's rows and overwrite every cached metric in place."""
    scan = _scan_rows(state.rows)
    state.total_cols = scan.total_cols
    state.row_effective_cols = scan.row_effective_cols
    state.last_data_row_metrics = scan.last_nonempty_row_metrics
    state.tail_occupied = scan.tail_occupied
    front = _build_front_cache(state.rows)
    state.front_header_info, state.front_first_data_row_metrics = front
def _build_table_state(table_block, max_header_rows: int = MAX_HEADER_ROWS) -> TableMergeState | None:
    """Parse a table block's HTML and build its cached merge state.

    Returns None when the block has no table-body span, or when that span
    carries no HTML.
    """
    span = _find_table_body_span(table_block)
    if span is None:
        return None
    markup = span.get("html", "")
    if not markup:
        return None
    parsed = BeautifulSoup(markup, "html.parser")
    body_element = parsed.find("tbody") or parsed.find("table")
    all_rows = parsed.find_all("tr")
    scan = _scan_rows(all_rows)
    headers, first_data_metrics = _build_front_cache(all_rows, max_header_rows=max_header_rows)
    return TableMergeState(
        owner_block=table_block,
        body_span=span,
        soup=parsed,
        tbody=body_element,
        rows=all_rows,
        total_cols=scan.total_cols,
        front_header_info=headers,
        front_first_data_row_metrics=first_data_metrics,
        last_data_row_metrics=scan.last_nonempty_row_metrics,
        row_effective_cols=scan.row_effective_cols,
        tail_occupied=scan.tail_occupied,
    )
def _get_or_create_table_state(
    table_block,
    state_cache: dict[int, TableMergeState],
    max_header_rows: int = MAX_HEADER_ROWS,
) -> TableMergeState | None:
    """Fetch the cached state for a table block, building it on first use.

    The cache is keyed by id(table_block). Blocks that cannot be parsed
    (no body HTML) are not cached and yield None on every call.
    """
    key = id(table_block)
    cached = state_cache.get(key)
    if cached is not None:
        return cached
    built = _build_table_state(table_block, max_header_rows=max_header_rows)
    if built is None:
        return None
    state_cache[key] = built
    return built
def _serialize_table_state_html(state: TableMergeState) -> None:
    """Write the (possibly mutated) soup back to the body span; clear dirty."""
    rendered = str(state.soup)
    state.body_span["html"] = rendered
    state.dirty = False
def calculate_table_total_columns(soup):
    """Total effective column count of a table, rowspan/colspan aware."""
    all_rows = soup.find_all("tr")
    if not all_rows:
        return 0
    return _scan_rows(all_rows).total_cols
def build_table_occupied_matrix(soup):
    """Build the table's occupancy matrix; return each row's effective column count.

    Returns {row_idx: effective_columns}, where effective_columns accounts for
    columns claimed by rowspans from earlier rows.

    Note: this reconstructs the added side of a garbled diff in which the old
    manual-occupancy implementation was interleaved with the new delegation
    to _scan_rows.
    """
    rows = soup.find_all("tr")
    if not rows:
        return {}
    scan = _scan_rows(rows)
    return {
        row_idx: effective_cols
        for row_idx, effective_cols in enumerate(scan.row_effective_cols)
    }
def calculate_row_effective_columns(soup, row_idx):
    """Effective column count of one row, accounting for rowspan carry-over.

    Returns 0 when row_idx does not exist in the table.

    Note: de-garbles a diff that interleaved the old multi-line docstring
    with the new one-line docstring.
    """
    row_effective_cols = build_table_occupied_matrix(soup)
    return row_effective_cols.get(row_idx, 0)
def calculate_row_columns(row):
"""
计算表格行的实际列数考虑colspan属性
Args:
row: BeautifulSoup的tr元素对象
Returns:
int: 行的实际列数
"""
"""计算表格行的实际列数考虑colspan属性."""
cells = row.find_all(["td", "th"])
column_count = 0
@@ -154,126 +284,67 @@ def calculate_row_columns(row):
def calculate_visual_columns(row):
    """Visual column count of a row: the raw number of td/th cells, ignoring colspan.

    Note: de-garbles a diff that interleaved the old multi-line docstring
    with the new one-line docstring.
    """
    cells = row.find_all(["td", "th"])
    return len(cells)
def detect_table_headers(state1: TableMergeState, state2: TableMergeState, max_header_rows: int = MAX_HEADER_ROWS):
    """Detect and compare the headers of two tables, scanning only the leading rows.

    Returns (header_row_count, headers_match, header_texts). A row counts as a
    header row when both tables agree on cell count, effective columns,
    colspans, rowspans and whitespace-normalized cell texts. When no row
    matches strictly, falls back to the looser visual comparison.

    Note: this reconstructs the added side of a garbled diff in which the old
    soup-based implementation was interleaved with the new state-based one.
    """
    front_rows1 = state1.front_header_info[:max_header_rows]
    front_rows2 = state2.front_header_info[:max_header_rows]
    min_rows = min(len(front_rows1), len(front_rows2), max_header_rows)
    header_rows = 0
    headers_match = True
    header_texts = []
    for row_idx in range(min_rows):
        row1 = front_rows1[row_idx]
        row2 = front_rows2[row_idx]
        structure_match = (
            row1.cell_count == row2.cell_count
            and row1.effective_cols == row2.effective_cols
            and row1.colspans == row2.colspans
            and row1.rowspans == row2.rowspans
            and row1.normalized_texts == row2.normalized_texts
        )
        if structure_match:
            header_rows += 1
            header_texts.append(list(row1.display_texts))
        else:
            # Headers "match" only if at least one leading row agreed.
            headers_match = header_rows > 0
            break
    if header_rows == 0:
        # Strict matching failed — retry comparing text content only.
        header_rows, headers_match, header_texts = _detect_table_headers_visual(
            state1, state2, max_header_rows=max_header_rows
        )
    return header_rows, headers_match, header_texts
def _detect_table_headers_visual(soup1, soup2, rows1, rows2, max_header_rows=5):
"""
基于视觉一致性检测表头只比较文本内容忽略colspan/rowspan差异
def _detect_table_headers_visual(
state1: TableMergeState,
state2: TableMergeState,
max_header_rows: int = MAX_HEADER_ROWS,
):
"""基于视觉一致性检测表头只比较文本内容忽略colspan/rowspan差异."""
front_rows1 = state1.front_header_info[:max_header_rows]
front_rows2 = state2.front_header_info[:max_header_rows]
Args:
soup1: 第一个表格的BeautifulSoup对象
soup2: 第二个表格的BeautifulSoup对象
rows1: 第一个表格的行列表
rows2: 第二个表格的行列表
max_header_rows: 最大可能的表头行数
Returns:
tuple: (表头行数, 表头是否一致, 表头文本列表)
"""
# 构建两个表格的有效列数矩阵
effective_cols1 = build_table_occupied_matrix(soup1)
effective_cols2 = build_table_occupied_matrix(soup2)
min_rows = min(len(rows1), len(rows2), max_header_rows)
min_rows = min(len(front_rows1), len(front_rows2), max_header_rows)
header_rows = 0
headers_match = True
header_texts = []
for i in range(min_rows):
cells1 = rows1[i].find_all(["td", "th"])
cells2 = rows2[i].find_all(["td", "th"])
# 提取每行的文本内容列表(去除空白字符)
texts1 = [''.join(full_to_half(cell.get_text()).split()) for cell in cells1]
texts2 = [''.join(full_to_half(cell.get_text()).split()) for cell in cells2]
# 检查视觉一致性:文本内容完全相同,且有效列数一致
effective_cols_match = effective_cols1.get(i, 0) == effective_cols2.get(i, 0)
if texts1 == texts2 and effective_cols_match:
for row_idx in range(min_rows):
row1 = front_rows1[row_idx]
row2 = front_rows2[row_idx]
if row1.normalized_texts == row2.normalized_texts and row1.effective_cols == row2.effective_cols:
header_rows += 1
row_texts = [full_to_half(cell.get_text().strip()) for cell in cells1]
header_texts.append(row_texts)
header_texts.append(list(row1.display_texts))
else:
headers_match = header_rows > 0
break
@@ -284,126 +355,69 @@ def _detect_table_headers_visual(soup1, soup2, rows1, rows2, max_header_rows=5):
return header_rows, headers_match, header_texts
def can_merge_tables(current_state: TableMergeState, previous_state: TableMergeState):
    """Decide whether the current table may be merged into the previous one.

    Rules visible in the code: a caption on the current table must carry a
    "continued" marker; with such a marker the previous table may have at most
    one footnote (none otherwise); the two tables must differ in width by less
    than 10%; finally either the total column counts match or the boundary
    rows match (check_rows_match). Returns a bool.

    Note: this reconstructs the added side of a garbled diff in which the old
    tuple-returning implementation (and the old check_rows_match body) were
    interleaved with the new state-based one.
    """
    current_table_block = current_state.owner_block
    previous_table_block = previous_state.owner_block
    footnote_count = sum(
        1 for block in previous_table_block["blocks"] if block["type"] == BlockType.TABLE_FOOTNOTE
    )
    caption_blocks = [
        block for block in current_table_block["blocks"] if block["type"] == BlockType.TABLE_CAPTION
    ]
    if caption_blocks:
        # At least one caption must carry a continuation ("续"/"continued") marker.
        has_continuation_marker = False
        for block in caption_blocks:
            caption_text = full_to_half(merge_para_with_text(block).strip()).lower()
            if (
                any(caption_text.endswith(marker.lower()) for marker in CONTINUATION_END_MARKERS)
                or any(marker.lower() in caption_text for marker in CONTINUATION_INLINE_MARKERS)
            ):
                has_continuation_marker = True
                break
        if not has_continuation_marker:
            return False
        # With a continuation caption, relax the footnote limit to one.
        if footnote_count > 1:
            return False
    elif footnote_count > 0:
        return False
    # Reject when the two tables differ in width by 10% or more.
    x0_t1, _, x1_t1, _ = current_table_block["bbox"]
    x0_t2, _, x1_t2, _ = previous_table_block["bbox"]
    table1_width = x1_t1 - x0_t1
    table2_width = x1_t2 - x0_t2
    if abs(table1_width - table2_width) / min(table1_width, table2_width) >= 0.1:
        return False
    if previous_state.total_cols == current_state.total_cols:
        return True
    return check_rows_match(previous_state, current_state)
def check_rows_match(previous_state: TableMergeState, current_state: TableMergeState):
    """Check whether the boundary rows of two tables are compatible for merging.

    Compares the previous table's last non-empty row against the current
    table's first data row (the row just after the detected header). They
    match when any of effective, actual (colspan-summed) or visual (cell
    count) column counts agree.

    Note: this reconstructs the added side of a garbled diff in which the old
    soup-based implementation was interleaved with the new metrics-based one.
    """
    last_row_metrics = previous_state.last_data_row_metrics
    if last_row_metrics is None:
        return False
    header_count, _, _ = detect_table_headers(previous_state, current_state)
    first_data_row_metrics = current_state.front_first_data_row_metrics.get(header_count)
    if first_data_row_metrics is None:
        return False
    return (
        last_row_metrics.effective_cols == first_data_row_metrics.effective_cols
        or last_row_metrics.actual_cols == first_data_row_metrics.actual_cols
        or last_row_metrics.visual_cols == first_data_row_metrics.visual_cols
    )
def check_row_columns_match(row1, row2):
# 逐个cell检测colspan属性是否一致
cells1 = row1.find_all(["td", "th"])
cells2 = row2.find_all(["td", "th"])
if len(cells1) != len(cells2):
@@ -416,51 +430,40 @@ def check_row_columns_match(row1, row2):
return True
def adjust_table_rows_colspan(soup, rows, start_idx, end_idx,
reference_structure, reference_visual_cols,
target_cols, current_cols, reference_row):
"""调整表格行的colspan属性以匹配目标列数
def adjust_table_rows_colspan(
rows,
start_idx,
end_idx,
row_effective_cols,
reference_structure,
reference_visual_cols,
target_cols,
match_reference_row,
):
"""调整表格行的colspan属性以匹配目标列数."""
reference_row_copy = deepcopy(match_reference_row)
Args:
soup: BeautifulSoup解析的表格对象用于计算有效列数
rows: 表格行列表
start_idx: 起始行索引
end_idx: 结束行索引(不包含)
reference_structure: 参考行的colspan结构列表
reference_visual_cols: 参考行的视觉列数
target_cols: 目标总列数
current_cols: 当前总列数
reference_row: 参考行对象
"""
reference_row_copy = deepcopy(reference_row)
# 构建有效列数矩阵
effective_cols_matrix = build_table_occupied_matrix(soup)
for i in range(start_idx, end_idx):
row = rows[i]
for row_idx in range(start_idx, end_idx):
row = rows[row_idx]
cells = row.find_all(["td", "th"])
if not cells:
continue
# 使用有效列数考虑rowspan判断是否需要调整
current_row_effective_cols = effective_cols_matrix.get(i, 0)
current_row_effective_cols = row_effective_cols[row_idx]
current_row_cols = calculate_row_columns(row)
# 如果有效列数或实际列数已经达到目标,则跳过
if current_row_effective_cols >= target_cols or current_row_cols >= target_cols:
continue
# 检查是否与参考行结构匹配
if calculate_visual_columns(row) == reference_visual_cols and check_row_columns_match(row, reference_row_copy):
# 尝试应用参考结构
if (
calculate_visual_columns(row) == reference_visual_cols
and check_row_columns_match(row, reference_row_copy)
):
if len(cells) <= len(reference_structure):
for j, cell in enumerate(cells):
if j < len(reference_structure) and reference_structure[j] > 1:
cell["colspan"] = str(reference_structure[j])
for cell_idx, cell in enumerate(cells):
if cell_idx < len(reference_structure) and reference_structure[cell_idx] > 1:
cell["colspan"] = str(reference_structure[cell_idx])
else:
# 扩展最后一个单元格以填补列数差异
# 使用有效列数来计算差异
cols_diff = target_cols - current_row_effective_cols
if cols_diff > 0:
last_cell = cells[-1]
@@ -468,121 +471,144 @@ def adjust_table_rows_colspan(soup, rows, start_idx, end_idx,
last_cell["colspan"] = str(current_last_span + cols_diff)
def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_footnotes):
"""执行表格合并操作"""
# 检测表头有几行,并确认表头内容是否一致
header_count, headers_match, header_texts = detect_table_headers(soup1, soup2)
# logger.debug(f"检测到表头行数: {header_count}, 表头匹配: {headers_match}")
# logger.debug(f"表头内容: {header_texts}")
def perform_table_merge(
previous_state: TableMergeState,
current_state: TableMergeState,
previous_table_block,
wait_merge_table_footnotes,
):
"""执行表格合并操作."""
header_count, _, _ = detect_table_headers(previous_state, current_state)
# 找到第一个表格的tbody如果没有则查找table元素
tbody1 = soup1.find("tbody") or soup1.find("table")
# 获取表1和表2的所有行
rows1 = soup1.find_all("tr")
rows2 = soup2.find_all("tr")
rows1 = previous_state.rows
rows2 = current_state.rows
previous_adjusted = False
if rows1 and rows2 and header_count < len(rows2):
# 获取表1最后一行和表2第一个非表头行
last_row1 = rows1[-1]
first_data_row2 = rows2[header_count]
table_cols1 = previous_state.total_cols
table_cols2 = current_state.total_cols
# 计算表格总列数
table_cols1 = calculate_table_total_columns(soup1)
table_cols2 = calculate_table_total_columns(soup2)
if table_cols1 >= table_cols2:
reference_structure = [int(cell.get("colspan", 1)) for cell in last_row1.find_all(["td", "th"])]
if table_cols1 > table_cols2:
reference_structure = [
int(cell.get("colspan", 1)) for cell in last_row1.find_all(["td", "th"])
]
reference_visual_cols = calculate_visual_columns(last_row1)
# 以表1的最后一行为参考调整表2的行
adjust_table_rows_colspan(
soup2, rows2, header_count, len(rows2),
reference_structure, reference_visual_cols,
table_cols1, table_cols2, first_data_row2
rows2,
header_count,
len(rows2),
current_state.row_effective_cols,
reference_structure,
reference_visual_cols,
table_cols1,
first_data_row2,
)
else: # table_cols2 > table_cols1
reference_structure = [int(cell.get("colspan", 1)) for cell in first_data_row2.find_all(["td", "th"])]
elif table_cols2 > table_cols1:
reference_structure = [
int(cell.get("colspan", 1)) for cell in first_data_row2.find_all(["td", "th"])
]
reference_visual_cols = calculate_visual_columns(first_data_row2)
# 以表2的第一个数据行为参考调整表1的行
adjust_table_rows_colspan(
soup1, rows1, 0, len(rows1),
reference_structure, reference_visual_cols,
table_cols2, table_cols1, last_row1
rows1,
0,
len(rows1),
previous_state.row_effective_cols,
reference_structure,
reference_visual_cols,
table_cols2,
last_row1,
)
previous_adjusted = True
# 将第二个表格的行添加到第一个表格中
if tbody1:
tbody2 = soup2.find("tbody") or soup2.find("table")
if tbody2:
# 将第二个表格的行添加到第一个表格中(跳过表头行)
for row in rows2[header_count:]:
row.extract()
tbody1.append(row)
if previous_adjusted:
_refresh_table_state_metrics(previous_state)
appended_rows = rows2[header_count:]
append_start_idx = len(previous_state.rows)
merged_rows = []
if previous_state.tbody and current_state.tbody:
for row in appended_rows:
row.extract()
previous_state.tbody.append(row)
merged_rows.append(row)
previous_state.rows.extend(merged_rows)
if merged_rows:
appended_scan = _scan_rows(
merged_rows,
initial_occupied=previous_state.tail_occupied,
start_row_idx=append_start_idx,
)
previous_state.row_effective_cols.extend(appended_scan.row_effective_cols)
previous_state.total_cols = max(previous_state.total_cols, appended_scan.total_cols)
if appended_scan.last_nonempty_row_metrics is not None:
previous_state.last_data_row_metrics = appended_scan.last_nonempty_row_metrics
previous_state.tail_occupied = appended_scan.tail_occupied
# 清空previous_table_block的footnote
previous_table_block["blocks"] = [
block for block in previous_table_block["blocks"]
if block["type"] != BlockType.TABLE_FOOTNOTE
block for block in previous_table_block["blocks"] if block["type"] != BlockType.TABLE_FOOTNOTE
]
# 添加待合并表格的footnote到前一个表格中
for table_footnote in wait_merge_table_footnotes:
temp_table_footnote = table_footnote.copy()
temp_table_footnote[SplitFlag.CROSS_PAGE] = True
previous_table_block["blocks"].append(temp_table_footnote)
return str(soup1)
previous_state.dirty = True
def merge_table(page_info_list):
"""合并跨页表格"""
# 倒序遍历每一页
"""合并跨页表格."""
state_cache: dict[int, TableMergeState] = {}
merged_away_blocks: set[int] = set()
for page_idx in range(len(page_info_list) - 1, -1, -1):
# 跳过第一页,因为它没有前一页
if page_idx == 0:
continue
page_info = page_info_list[page_idx]
previous_page_info = page_info_list[page_idx - 1]
# 检查当前页是否有表格块
if not (page_info["para_blocks"] and page_info["para_blocks"][0]["type"] == BlockType.TABLE):
continue
current_table_block = page_info["para_blocks"][0]
# 检查上一页是否有表格块
if not (previous_page_info["para_blocks"] and previous_page_info["para_blocks"][-1]["type"] == BlockType.TABLE):
if not (
previous_page_info["para_blocks"]
and previous_page_info["para_blocks"][-1]["type"] == BlockType.TABLE
):
continue
current_table_block = page_info["para_blocks"][0]
previous_table_block = previous_page_info["para_blocks"][-1]
# 收集待合并表格的footnote
wait_merge_table_footnotes = [
block for block in current_table_block["blocks"]
if block["type"] == BlockType.TABLE_FOOTNOTE
]
# 检查两个表格是否可以合并
can_merge, soup1, soup2, current_html, previous_html = can_merge_tables(
current_table_block, previous_table_block
)
if not can_merge:
current_state = _get_or_create_table_state(current_table_block, state_cache)
previous_state = _get_or_create_table_state(previous_table_block, state_cache)
if current_state is None or previous_state is None:
continue
# 执行表格合并
merged_html = perform_table_merge(
soup1, soup2, previous_table_block, wait_merge_table_footnotes
wait_merge_table_footnotes = [
block for block in current_table_block["blocks"] if block["type"] == BlockType.TABLE_FOOTNOTE
]
if not can_merge_tables(current_state, previous_state):
continue
perform_table_merge(
previous_state,
current_state,
previous_table_block,
wait_merge_table_footnotes,
)
# 更新previous_table_block的html
for block in previous_table_block["blocks"]:
if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
block["lines"][0]["spans"][0]["html"] = merged_html
break
# 删除当前页的table
merged_away_blocks.add(id(current_table_block))
for block in current_table_block["blocks"]:
block['lines'] = []
block["lines"] = []
block[SplitFlag.LINES_DELETED] = True
for state in state_cache.values():
if state.dirty and id(state.owner_block) not in merged_away_blocks:
_serialize_table_state_html(state)

View File

@@ -1 +1 @@
__version__ = "3.0.7"
__version__ = "3.0.9"