mirror of
https://github.com/opendatalab/MinerU.git
synced 2026-04-12 07:06:44 +07:00
Compare commits
13 Commits
mineru-3.0
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e16b8582b4 | ||
|
|
3ad7e0b981 | ||
|
|
f93e260f24 | ||
|
|
fede292388 | ||
|
|
7f365ce96b | ||
|
|
0f43f17fae | ||
|
|
e83395a0b3 | ||
|
|
576581d826 | ||
|
|
23f3bd5975 | ||
|
|
56f474d32a | ||
|
|
c66848a425 | ||
|
|
82788a9b8b | ||
|
|
4be86ee07a |
@@ -87,9 +87,10 @@ Options:
|
||||
|
||||
> [!NOTE]
|
||||
> Starting from this version, `mineru` is an orchestration client built on top of `mineru-api`:
|
||||
> - Without `--api-url`, the CLI launches a temporary local `mineru-api`
|
||||
> - With `--api-url`, the CLI connects to that FastAPI service directly
|
||||
> - `--url` is no longer the MinerU API address; it is the OpenAI-compatible backend URL used by server-side `vlm/hybrid-http-client`
|
||||
>
|
||||
>- Without `--api-url`, the CLI launches a temporary local `mineru-api`
|
||||
>- With `--api-url`, the CLI connects to that FastAPI service directly
|
||||
>- `--url` is no longer the MinerU API address; it is the OpenAI-compatible backend URL used by server-side `vlm/hybrid-http-client`
|
||||
|
||||
Some parameters of MinerU command line tools have equivalent environment variable configurations. Generally, environment variable configurations have higher priority than command line parameters and take effect across all command line tools.
|
||||
Here are the environment variables and their descriptions:
|
||||
|
||||
@@ -81,9 +81,10 @@ Options:
|
||||
|
||||
> [!NOTE]
|
||||
> 从当前版本开始,`mineru` 是基于 `mineru-api` 的编排客户端:
|
||||
> - 未传 `--api-url` 时,CLI 会自动拉起本地临时 `mineru-api`
|
||||
> - 传入 `--api-url` 时,CLI 会直连该 FastAPI 服务
|
||||
> - `--url` 不再表示 MinerU API 地址,而是服务端 `vlm/hybrid-http-client` 所需的 OpenAI 兼容地址
|
||||
>
|
||||
>- 未传 `--api-url` 时,CLI 会自动拉起本地临时 `mineru-api`
|
||||
>- 传入 `--api-url` 时,CLI 会直连该 FastAPI 服务
|
||||
>- `--url` 不再表示 MinerU API 地址,而是服务端 `vlm/hybrid-http-client` 所需的 OpenAI 兼容地址
|
||||
|
||||
MinerU命令行工具的某些参数存在相同功能的环境变量配置,通常环境变量配置的优先级高于命令行参数,且在所有命令行工具中都生效。
|
||||
以下是常用的环境变量及其说明:
|
||||
|
||||
@@ -32,11 +32,23 @@ CID_RATIO_THRESHOLD = 0.05
|
||||
TEXT_QUALITY_MIN_CHARS = 300
|
||||
TEXT_QUALITY_BAD_THRESHOLD = 0.03
|
||||
TEXT_QUALITY_GOOD_THRESHOLD = 0.005
|
||||
MAX_PAGE_ASPECT_RATIO = 10.0
|
||||
|
||||
_ALLOWED_CONTROL_CODES = {9, 10, 13}
|
||||
_PRIVATE_USE_AREA_START = 0xE000
|
||||
_PRIVATE_USE_AREA_END = 0xF8FF
|
||||
|
||||
|
||||
def _is_disallowed_control_unicode(unicode_code: int) -> bool:
|
||||
return (
|
||||
(
|
||||
0 <= unicode_code < 32
|
||||
or 127 <= unicode_code <= 159
|
||||
)
|
||||
and unicode_code not in _ALLOWED_CONTROL_CODES
|
||||
)
|
||||
|
||||
|
||||
def classify(pdf_bytes):
|
||||
"""
|
||||
Classify a PDF as text-based or OCR-based.
|
||||
@@ -90,6 +102,17 @@ def classify_hybrid(pdf_bytes):
|
||||
if not page_indices:
|
||||
return "ocr"
|
||||
|
||||
extreme_page_index, extreme_ratio = get_extreme_aspect_ratio_page_pdfium(
|
||||
pdf,
|
||||
page_indices,
|
||||
)
|
||||
if extreme_page_index is not None:
|
||||
logger.info(
|
||||
"Classify PDF as OCR due to extreme sampled-page aspect ratio: "
|
||||
f"page={extreme_page_index + 1}, ratio={extreme_ratio:.2f}"
|
||||
)
|
||||
return "ocr"
|
||||
|
||||
if (
|
||||
get_avg_cleaned_chars_per_page_pdfium(pdf, page_indices)
|
||||
< CHARS_THRESHOLD
|
||||
@@ -203,6 +226,24 @@ def get_sample_page_indices(page_count: int, max_pages: int = MAX_SAMPLE_PAGES):
|
||||
return sorted(indices)
|
||||
|
||||
|
||||
def get_extreme_aspect_ratio_page_pdfium(
|
||||
pdf_doc,
|
||||
page_indices,
|
||||
max_page_aspect_ratio: float = MAX_PAGE_ASPECT_RATIO,
|
||||
):
|
||||
for page_index in page_indices:
|
||||
page = pdf_doc[page_index]
|
||||
page_width, page_height = page.get_size()
|
||||
if page_width <= 0 or page_height <= 0:
|
||||
continue
|
||||
|
||||
aspect_ratio = max(page_width / page_height, page_height / page_width)
|
||||
if aspect_ratio > max_page_aspect_ratio:
|
||||
return page_index, aspect_ratio
|
||||
|
||||
return None, None
|
||||
|
||||
|
||||
def get_avg_cleaned_chars_per_page(pdf_doc, pages_to_check):
|
||||
total_chars = 0
|
||||
cleaned_total_chars = 0
|
||||
@@ -252,7 +293,7 @@ def get_text_quality_signal_pdfium(pdf_doc, page_indices):
|
||||
null_char_count += 1
|
||||
elif unicode_code == 0xFFFD:
|
||||
replacement_char_count += 1
|
||||
elif unicode_code < 32 and unicode_code not in _ALLOWED_CONTROL_CODES:
|
||||
elif _is_disallowed_control_unicode(unicode_code):
|
||||
control_char_count += 1
|
||||
elif _PRIVATE_USE_AREA_START <= unicode_code <= _PRIVATE_USE_AREA_END:
|
||||
private_use_char_count += 1
|
||||
|
||||
@@ -13,6 +13,8 @@ from mineru.utils.enum_class import BlockType, ContentType
|
||||
from mineru.utils.pdf_image_tools import get_crop_img
|
||||
from mineru.utils.pdf_text_tool import get_page
|
||||
|
||||
MAX_NATIVE_TEXT_CHARS_PER_PAGE = 65535
|
||||
|
||||
|
||||
def __replace_ligatures(text: str):
|
||||
ligatures = {
|
||||
@@ -29,6 +31,21 @@ def __replace_unicode(text: str):
|
||||
|
||||
"""pdf_text dict方案 char级别"""
|
||||
def txt_spans_extract(pdf_page, spans, pil_img, scale, all_bboxes, all_discarded_blocks):
|
||||
page_char_count = None
|
||||
try:
|
||||
page_char_count = pdf_page.get_textpage().count_chars()
|
||||
except Exception as exc:
|
||||
logger.debug(f"Failed to get page char count before txt extraction: {exc}")
|
||||
|
||||
if page_char_count is not None and page_char_count > MAX_NATIVE_TEXT_CHARS_PER_PAGE:
|
||||
logger.info(
|
||||
"Fallback to post-OCR in txt_spans_extract due to high char count: "
|
||||
f"count_chars={page_char_count}"
|
||||
)
|
||||
need_ocr_spans = [
|
||||
span for span in spans if span.get('type') == ContentType.TEXT
|
||||
]
|
||||
return _prepare_post_ocr_spans(need_ocr_spans, spans, pil_img, scale)
|
||||
|
||||
page_dict = get_page(pdf_page)
|
||||
|
||||
@@ -99,21 +116,26 @@ def txt_spans_extract(pdf_page, spans, pil_img, scale, all_bboxes, all_discarded
|
||||
|
||||
need_ocr_spans = fill_char_in_spans(new_spans, page_all_chars, median_span_height)
|
||||
|
||||
"""对未填充的span进行ocr"""
|
||||
if len(need_ocr_spans) > 0:
|
||||
return _prepare_post_ocr_spans(need_ocr_spans, spans, pil_img, scale)
|
||||
|
||||
for span in need_ocr_spans:
|
||||
# 对span的bbox截图再ocr
|
||||
span_pil_img = get_crop_img(span['bbox'], pil_img, scale)
|
||||
span_img = cv2.cvtColor(np.array(span_pil_img), cv2.COLOR_RGB2BGR)
|
||||
# 计算span的对比度,低于0.20的span不进行ocr
|
||||
if calculate_contrast(span_img, img_mode='bgr') <= 0.17:
|
||||
|
||||
def _prepare_post_ocr_spans(need_ocr_spans, spans, pil_img, scale):
|
||||
if len(need_ocr_spans) == 0:
|
||||
return spans
|
||||
|
||||
for span in need_ocr_spans:
|
||||
# 对span的bbox截图再ocr
|
||||
span_pil_img = get_crop_img(span['bbox'], pil_img, scale)
|
||||
span_img = cv2.cvtColor(np.array(span_pil_img), cv2.COLOR_RGB2BGR)
|
||||
# 计算span的对比度,低于0.17的span不进行ocr
|
||||
if calculate_contrast(span_img, img_mode='bgr') <= 0.17:
|
||||
if span in spans:
|
||||
spans.remove(span)
|
||||
continue
|
||||
continue
|
||||
|
||||
span['content'] = ''
|
||||
span['score'] = 1.0
|
||||
span['np_img'] = span_img
|
||||
span['content'] = ''
|
||||
span['score'] = 1.0
|
||||
span['np_img'] = span_img
|
||||
|
||||
return spans
|
||||
|
||||
@@ -194,28 +216,32 @@ def calculate_char_in_span(char_bbox, span_bbox, char, span_height_ratio=Span_He
|
||||
|
||||
def chars_to_content(span):
|
||||
# 检查span中的char是否为空
|
||||
if len(span['chars']) == 0:
|
||||
pass
|
||||
else:
|
||||
if len(span['chars']) != 0:
|
||||
# 给chars按char_idx排序
|
||||
span['chars'] = sorted(span['chars'], key=lambda x: x['char_idx'])
|
||||
chars = sorted(span['chars'], key=lambda x: x['char_idx'])
|
||||
|
||||
# Calculate the width of each character
|
||||
char_widths = [char['bbox'][2] - char['bbox'][0] for char in span['chars']]
|
||||
char_widths = [char['bbox'][2] - char['bbox'][0] for char in chars]
|
||||
# Calculate the median width
|
||||
median_width = statistics.median(char_widths)
|
||||
|
||||
content = ''
|
||||
for char in span['chars']:
|
||||
parts = []
|
||||
for idx, char1 in enumerate(chars):
|
||||
char2 = chars[idx + 1] if idx + 1 < len(chars) else None
|
||||
|
||||
# 如果下一个char的x0和上一个char的x1距离超过0.25个字符宽度,则需要在中间插入一个空格
|
||||
char1 = char
|
||||
char2 = span['chars'][span['chars'].index(char) + 1] if span['chars'].index(char) + 1 < len(span['chars']) else None
|
||||
if char2 and char2['bbox'][0] - char1['bbox'][2] > median_width * 0.25 and char['char'] != ' ' and char2['char'] != ' ':
|
||||
content += f"{char['char']} "
|
||||
if (
|
||||
char2
|
||||
and char2['bbox'][0] - char1['bbox'][2] > median_width * 0.25
|
||||
and char1['char'] != ' '
|
||||
and char2['char'] != ' '
|
||||
):
|
||||
parts.append(char1['char'])
|
||||
parts.append(' ')
|
||||
else:
|
||||
content += char['char']
|
||||
parts.append(char1['char'])
|
||||
|
||||
content = ''.join(parts)
|
||||
content = __replace_unicode(content)
|
||||
content = __replace_ligatures(content)
|
||||
content = __replace_ligatures(content)
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
# Copyright (c) Opendatalab. All rights reserved.
|
||||
from copy import deepcopy
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from loguru import logger
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
from mineru.backend.vlm.vlm_middle_json_mkcontent import merge_para_with_text
|
||||
@@ -24,125 +25,254 @@ CONTINUATION_INLINE_MARKERS = [
|
||||
"(continued)",
|
||||
]
|
||||
|
||||
MAX_HEADER_ROWS = 5
|
||||
|
||||
def calculate_table_total_columns(soup):
|
||||
"""计算表格的总列数,通过分析整个表格结构来处理rowspan和colspan
|
||||
|
||||
Args:
|
||||
soup: BeautifulSoup解析的表格
|
||||
@dataclass
|
||||
class RowMetrics:
|
||||
row_idx: int
|
||||
effective_cols: int
|
||||
actual_cols: int
|
||||
visual_cols: int
|
||||
|
||||
Returns:
|
||||
int: 表格的总列数
|
||||
|
||||
@dataclass
|
||||
class RowSignature:
|
||||
effective_cols: int
|
||||
colspans: tuple[int, ...]
|
||||
rowspans: tuple[int, ...]
|
||||
normalized_texts: tuple[str, ...]
|
||||
display_texts: tuple[str, ...]
|
||||
|
||||
@property
|
||||
def cell_count(self) -> int:
|
||||
return len(self.colspans)
|
||||
|
||||
|
||||
@dataclass
|
||||
class RowScanResult:
|
||||
row_effective_cols: list[int]
|
||||
row_metrics: list[RowMetrics]
|
||||
total_cols: int
|
||||
last_nonempty_row_metrics: RowMetrics | None
|
||||
tail_occupied: dict[int, set[int]]
|
||||
|
||||
|
||||
@dataclass
|
||||
class TableMergeState:
|
||||
owner_block: dict[str, Any]
|
||||
body_span: dict[str, Any]
|
||||
soup: Any
|
||||
tbody: Any
|
||||
rows: list[Any]
|
||||
total_cols: int
|
||||
front_header_info: list[RowSignature]
|
||||
front_first_data_row_metrics: dict[int, RowMetrics]
|
||||
last_data_row_metrics: RowMetrics | None
|
||||
row_effective_cols: list[int]
|
||||
tail_occupied: dict[int, set[int]]
|
||||
dirty: bool = False
|
||||
|
||||
|
||||
def _normalize_cell_text(cell) -> str:
|
||||
return "".join(full_to_half(cell.get_text()).split())
|
||||
|
||||
|
||||
def _display_cell_text(cell) -> str:
|
||||
return full_to_half(cell.get_text().strip())
|
||||
|
||||
|
||||
def _scan_rows(rows, initial_occupied: dict[int, set[int]] | None = None, start_row_idx: int = 0) -> RowScanResult:
|
||||
"""Scan rows once and cache effective-column metrics.
|
||||
|
||||
initial_occupied stores future-row occupancy relative to the first scanned row
|
||||
and preserves rowspans that cross a merge boundary.
|
||||
"""
|
||||
rows = soup.find_all("tr")
|
||||
if not rows:
|
||||
return 0
|
||||
|
||||
# 创建一个矩阵来跟踪每个位置的占用情况
|
||||
occupied: dict[int, dict[int, bool]] = {}
|
||||
max_cols = 0
|
||||
occupied = {} # {row_idx: {col_idx: True}}
|
||||
|
||||
for row_idx, row in enumerate(rows):
|
||||
for row_offset, cols in (initial_occupied or {}).items():
|
||||
if not cols:
|
||||
continue
|
||||
occupied[row_offset] = {col: True for col in cols}
|
||||
max_cols = max(max_cols, max(cols) + 1)
|
||||
|
||||
row_effective_cols: list[int] = []
|
||||
row_metrics: list[RowMetrics] = []
|
||||
last_nonempty_row_metrics: RowMetrics | None = None
|
||||
|
||||
for local_idx, row in enumerate(rows):
|
||||
occupied_row = occupied.setdefault(local_idx, {})
|
||||
col_idx = 0
|
||||
cells = row.find_all(["td", "th"])
|
||||
|
||||
if row_idx not in occupied:
|
||||
occupied[row_idx] = {}
|
||||
actual_cols = 0
|
||||
|
||||
for cell in cells:
|
||||
# 找到下一个未被占用的列位置
|
||||
while col_idx in occupied[row_idx]:
|
||||
while col_idx in occupied_row:
|
||||
col_idx += 1
|
||||
|
||||
colspan = int(cell.get("colspan", 1))
|
||||
rowspan = int(cell.get("rowspan", 1))
|
||||
actual_cols += colspan
|
||||
|
||||
# 标记被这个单元格占用的所有位置
|
||||
for r in range(row_idx, row_idx + rowspan):
|
||||
if r not in occupied:
|
||||
occupied[r] = {}
|
||||
for c in range(col_idx, col_idx + colspan):
|
||||
occupied[r][c] = True
|
||||
for row_offset in range(rowspan):
|
||||
target_idx = local_idx + row_offset
|
||||
occupied_target = occupied.setdefault(target_idx, {})
|
||||
for col in range(col_idx, col_idx + colspan):
|
||||
occupied_target[col] = True
|
||||
|
||||
col_idx += colspan
|
||||
max_cols = max(max_cols, col_idx)
|
||||
|
||||
return max_cols
|
||||
effective_cols = max(occupied_row.keys()) + 1 if occupied_row else 0
|
||||
row_effective_cols.append(effective_cols)
|
||||
max_cols = max(max_cols, effective_cols)
|
||||
|
||||
metrics = RowMetrics(
|
||||
row_idx=start_row_idx + local_idx,
|
||||
effective_cols=effective_cols,
|
||||
actual_cols=actual_cols,
|
||||
visual_cols=len(cells),
|
||||
)
|
||||
row_metrics.append(metrics)
|
||||
if cells:
|
||||
last_nonempty_row_metrics = metrics
|
||||
|
||||
tail_occupied = {
|
||||
row_idx - len(rows): set(cols.keys())
|
||||
for row_idx, cols in occupied.items()
|
||||
if row_idx >= len(rows) and cols
|
||||
}
|
||||
|
||||
return RowScanResult(
|
||||
row_effective_cols=row_effective_cols,
|
||||
row_metrics=row_metrics,
|
||||
total_cols=max_cols,
|
||||
last_nonempty_row_metrics=last_nonempty_row_metrics,
|
||||
tail_occupied=tail_occupied,
|
||||
)
|
||||
|
||||
|
||||
def _build_row_signature(row, effective_cols: int) -> RowSignature:
|
||||
cells = row.find_all(["td", "th"])
|
||||
return RowSignature(
|
||||
effective_cols=effective_cols,
|
||||
colspans=tuple(int(cell.get("colspan", 1)) for cell in cells),
|
||||
rowspans=tuple(int(cell.get("rowspan", 1)) for cell in cells),
|
||||
normalized_texts=tuple(_normalize_cell_text(cell) for cell in cells),
|
||||
display_texts=tuple(_display_cell_text(cell) for cell in cells),
|
||||
)
|
||||
|
||||
|
||||
def _build_front_cache(rows, max_header_rows: int = MAX_HEADER_ROWS) -> tuple[list[RowSignature], dict[int, RowMetrics]]:
|
||||
front_limit = min(len(rows), max_header_rows + 1)
|
||||
front_rows = rows[:front_limit]
|
||||
front_scan = _scan_rows(front_rows)
|
||||
|
||||
front_header_info = [
|
||||
_build_row_signature(front_rows[idx], front_scan.row_effective_cols[idx])
|
||||
for idx in range(min(len(front_rows), max_header_rows))
|
||||
]
|
||||
front_first_data_row_metrics = {
|
||||
idx: metrics for idx, metrics in enumerate(front_scan.row_metrics)
|
||||
}
|
||||
return front_header_info, front_first_data_row_metrics
|
||||
|
||||
|
||||
def _find_table_body_span(table_block):
|
||||
for block in table_block["blocks"]:
|
||||
if block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]:
|
||||
return block["lines"][0]["spans"][0]
|
||||
return None
|
||||
|
||||
|
||||
def _refresh_table_state_metrics(state: TableMergeState) -> None:
|
||||
scan = _scan_rows(state.rows)
|
||||
state.row_effective_cols = scan.row_effective_cols
|
||||
state.total_cols = scan.total_cols
|
||||
state.last_data_row_metrics = scan.last_nonempty_row_metrics
|
||||
state.tail_occupied = scan.tail_occupied
|
||||
state.front_header_info, state.front_first_data_row_metrics = _build_front_cache(state.rows)
|
||||
|
||||
|
||||
def _build_table_state(table_block, max_header_rows: int = MAX_HEADER_ROWS) -> TableMergeState | None:
|
||||
body_span = _find_table_body_span(table_block)
|
||||
if body_span is None:
|
||||
return None
|
||||
|
||||
html = body_span.get("html", "")
|
||||
if not html:
|
||||
return None
|
||||
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
tbody = soup.find("tbody") or soup.find("table")
|
||||
rows = soup.find_all("tr")
|
||||
scan = _scan_rows(rows)
|
||||
front_header_info, front_first_data_row_metrics = _build_front_cache(rows, max_header_rows=max_header_rows)
|
||||
|
||||
return TableMergeState(
|
||||
owner_block=table_block,
|
||||
body_span=body_span,
|
||||
soup=soup,
|
||||
tbody=tbody,
|
||||
rows=rows,
|
||||
total_cols=scan.total_cols,
|
||||
front_header_info=front_header_info,
|
||||
front_first_data_row_metrics=front_first_data_row_metrics,
|
||||
last_data_row_metrics=scan.last_nonempty_row_metrics,
|
||||
row_effective_cols=scan.row_effective_cols,
|
||||
tail_occupied=scan.tail_occupied,
|
||||
)
|
||||
|
||||
|
||||
def _get_or_create_table_state(
|
||||
table_block,
|
||||
state_cache: dict[int, TableMergeState],
|
||||
max_header_rows: int = MAX_HEADER_ROWS,
|
||||
) -> TableMergeState | None:
|
||||
cache_key = id(table_block)
|
||||
state = state_cache.get(cache_key)
|
||||
if state is not None:
|
||||
return state
|
||||
|
||||
state = _build_table_state(table_block, max_header_rows=max_header_rows)
|
||||
if state is not None:
|
||||
state_cache[cache_key] = state
|
||||
return state
|
||||
|
||||
|
||||
def _serialize_table_state_html(state: TableMergeState) -> None:
|
||||
state.body_span["html"] = str(state.soup)
|
||||
state.dirty = False
|
||||
|
||||
|
||||
def calculate_table_total_columns(soup):
|
||||
"""计算表格的总列数,通过分析整个表格结构来处理rowspan和colspan."""
|
||||
rows = soup.find_all("tr")
|
||||
return _scan_rows(rows).total_cols if rows else 0
|
||||
|
||||
|
||||
def build_table_occupied_matrix(soup):
|
||||
"""构建表格的占用矩阵,返回每行的有效列数
|
||||
|
||||
Args:
|
||||
soup: BeautifulSoup解析的表格
|
||||
|
||||
Returns:
|
||||
dict: {row_idx: effective_columns} 每行的有效列数(考虑rowspan占用)
|
||||
"""
|
||||
"""构建表格的占用矩阵,返回每行的有效列数."""
|
||||
rows = soup.find_all("tr")
|
||||
if not rows:
|
||||
return {}
|
||||
|
||||
occupied = {} # {row_idx: {col_idx: True}}
|
||||
row_effective_cols = {} # {row_idx: effective_columns}
|
||||
|
||||
for row_idx, row in enumerate(rows):
|
||||
col_idx = 0
|
||||
cells = row.find_all(["td", "th"])
|
||||
|
||||
if row_idx not in occupied:
|
||||
occupied[row_idx] = {}
|
||||
|
||||
for cell in cells:
|
||||
# 找到下一个未被占用的列位置
|
||||
while col_idx in occupied[row_idx]:
|
||||
col_idx += 1
|
||||
|
||||
colspan = int(cell.get("colspan", 1))
|
||||
rowspan = int(cell.get("rowspan", 1))
|
||||
|
||||
# 标记被这个单元格占用的所有位置
|
||||
for r in range(row_idx, row_idx + rowspan):
|
||||
if r not in occupied:
|
||||
occupied[r] = {}
|
||||
for c in range(col_idx, col_idx + colspan):
|
||||
occupied[r][c] = True
|
||||
|
||||
col_idx += colspan
|
||||
|
||||
# 该行的有效列数为已占用的最大列索引+1
|
||||
if occupied[row_idx]:
|
||||
row_effective_cols[row_idx] = max(occupied[row_idx].keys()) + 1
|
||||
else:
|
||||
row_effective_cols[row_idx] = 0
|
||||
|
||||
return row_effective_cols
|
||||
scan = _scan_rows(rows)
|
||||
return {
|
||||
row_idx: effective_cols
|
||||
for row_idx, effective_cols in enumerate(scan.row_effective_cols)
|
||||
}
|
||||
|
||||
|
||||
def calculate_row_effective_columns(soup, row_idx):
|
||||
"""计算指定行的有效列数(考虑rowspan占用)
|
||||
|
||||
Args:
|
||||
soup: BeautifulSoup解析的表格
|
||||
row_idx: 行索引
|
||||
|
||||
Returns:
|
||||
int: 该行的有效列数
|
||||
"""
|
||||
"""计算指定行的有效列数(考虑rowspan占用)."""
|
||||
row_effective_cols = build_table_occupied_matrix(soup)
|
||||
return row_effective_cols.get(row_idx, 0)
|
||||
|
||||
|
||||
def calculate_row_columns(row):
|
||||
"""
|
||||
计算表格行的实际列数,考虑colspan属性
|
||||
|
||||
Args:
|
||||
row: BeautifulSoup的tr元素对象
|
||||
|
||||
Returns:
|
||||
int: 行的实际列数
|
||||
"""
|
||||
"""计算表格行的实际列数,考虑colspan属性."""
|
||||
cells = row.find_all(["td", "th"])
|
||||
column_count = 0
|
||||
|
||||
@@ -154,126 +284,67 @@ def calculate_row_columns(row):
|
||||
|
||||
|
||||
def calculate_visual_columns(row):
|
||||
"""
|
||||
计算表格行的视觉列数(实际td/th单元格数量,不考虑colspan)
|
||||
|
||||
Args:
|
||||
row: BeautifulSoup的tr元素对象
|
||||
|
||||
Returns:
|
||||
int: 行的视觉列数(实际单元格数)
|
||||
"""
|
||||
"""计算表格行的视觉列数(实际td/th单元格数量,不考虑colspan)."""
|
||||
cells = row.find_all(["td", "th"])
|
||||
return len(cells)
|
||||
|
||||
|
||||
def detect_table_headers(soup1, soup2, max_header_rows=5):
|
||||
"""
|
||||
检测并比较两个表格的表头
|
||||
def detect_table_headers(state1: TableMergeState, state2: TableMergeState, max_header_rows: int = MAX_HEADER_ROWS):
|
||||
"""检测并比较两个表格的表头,仅扫描前几行."""
|
||||
front_rows1 = state1.front_header_info[:max_header_rows]
|
||||
front_rows2 = state2.front_header_info[:max_header_rows]
|
||||
|
||||
Args:
|
||||
soup1: 第一个表格的BeautifulSoup对象
|
||||
soup2: 第二个表格的BeautifulSoup对象
|
||||
max_header_rows: 最大可能的表头行数
|
||||
|
||||
Returns:
|
||||
tuple: (表头行数, 表头是否一致, 表头文本列表)
|
||||
"""
|
||||
rows1 = soup1.find_all("tr")
|
||||
rows2 = soup2.find_all("tr")
|
||||
|
||||
# 构建两个表格的有效列数矩阵
|
||||
effective_cols1 = build_table_occupied_matrix(soup1)
|
||||
effective_cols2 = build_table_occupied_matrix(soup2)
|
||||
|
||||
min_rows = min(len(rows1), len(rows2), max_header_rows)
|
||||
min_rows = min(len(front_rows1), len(front_rows2), max_header_rows)
|
||||
header_rows = 0
|
||||
headers_match = True
|
||||
header_texts = []
|
||||
|
||||
for i in range(min_rows):
|
||||
# 提取当前行的所有单元格
|
||||
cells1 = rows1[i].find_all(["td", "th"])
|
||||
cells2 = rows2[i].find_all(["td", "th"])
|
||||
|
||||
# 检查两行的结构和内容是否一致
|
||||
structure_match = True
|
||||
|
||||
# 首先检查单元格数量
|
||||
if len(cells1) != len(cells2):
|
||||
structure_match = False
|
||||
else:
|
||||
# 检查有效列数是否一致(考虑rowspan影响)
|
||||
if effective_cols1.get(i, 0) != effective_cols2.get(i, 0):
|
||||
structure_match = False
|
||||
else:
|
||||
# 然后检查单元格的属性和内容
|
||||
for cell1, cell2 in zip(cells1, cells2):
|
||||
colspan1 = int(cell1.get("colspan", 1))
|
||||
rowspan1 = int(cell1.get("rowspan", 1))
|
||||
colspan2 = int(cell2.get("colspan", 1))
|
||||
rowspan2 = int(cell2.get("rowspan", 1))
|
||||
|
||||
# 去除所有空白字符(包括空格、换行、制表符等)
|
||||
text1 = ''.join(full_to_half(cell1.get_text()).split())
|
||||
text2 = ''.join(full_to_half(cell2.get_text()).split())
|
||||
|
||||
if colspan1 != colspan2 or rowspan1 != rowspan2 or text1 != text2:
|
||||
structure_match = False
|
||||
break
|
||||
for row_idx in range(min_rows):
|
||||
row1 = front_rows1[row_idx]
|
||||
row2 = front_rows2[row_idx]
|
||||
structure_match = (
|
||||
row1.cell_count == row2.cell_count
|
||||
and row1.effective_cols == row2.effective_cols
|
||||
and row1.colspans == row2.colspans
|
||||
and row1.rowspans == row2.rowspans
|
||||
and row1.normalized_texts == row2.normalized_texts
|
||||
)
|
||||
|
||||
if structure_match:
|
||||
header_rows += 1
|
||||
row_texts = [full_to_half(cell.get_text().strip()) for cell in cells1]
|
||||
header_texts.append(row_texts) # 添加表头文本
|
||||
header_texts.append(list(row1.display_texts))
|
||||
else:
|
||||
headers_match = header_rows > 0 # 只有当至少匹配了一行时,才认为表头匹配
|
||||
headers_match = header_rows > 0
|
||||
break
|
||||
|
||||
# 如果严格匹配失败,尝试视觉一致性匹配(只比较文本内容)
|
||||
if header_rows == 0:
|
||||
header_rows, headers_match, header_texts = _detect_table_headers_visual(soup1, soup2, rows1, rows2, max_header_rows)
|
||||
header_rows, headers_match, header_texts = _detect_table_headers_visual(
|
||||
state1, state2, max_header_rows=max_header_rows
|
||||
)
|
||||
|
||||
return header_rows, headers_match, header_texts
|
||||
|
||||
|
||||
def _detect_table_headers_visual(soup1, soup2, rows1, rows2, max_header_rows=5):
|
||||
"""
|
||||
基于视觉一致性检测表头(只比较文本内容,忽略colspan/rowspan差异)
|
||||
def _detect_table_headers_visual(
|
||||
state1: TableMergeState,
|
||||
state2: TableMergeState,
|
||||
max_header_rows: int = MAX_HEADER_ROWS,
|
||||
):
|
||||
"""基于视觉一致性检测表头(只比较文本内容,忽略colspan/rowspan差异)."""
|
||||
front_rows1 = state1.front_header_info[:max_header_rows]
|
||||
front_rows2 = state2.front_header_info[:max_header_rows]
|
||||
|
||||
Args:
|
||||
soup1: 第一个表格的BeautifulSoup对象
|
||||
soup2: 第二个表格的BeautifulSoup对象
|
||||
rows1: 第一个表格的行列表
|
||||
rows2: 第二个表格的行列表
|
||||
max_header_rows: 最大可能的表头行数
|
||||
|
||||
Returns:
|
||||
tuple: (表头行数, 表头是否一致, 表头文本列表)
|
||||
"""
|
||||
# 构建两个表格的有效列数矩阵
|
||||
effective_cols1 = build_table_occupied_matrix(soup1)
|
||||
effective_cols2 = build_table_occupied_matrix(soup2)
|
||||
|
||||
min_rows = min(len(rows1), len(rows2), max_header_rows)
|
||||
min_rows = min(len(front_rows1), len(front_rows2), max_header_rows)
|
||||
header_rows = 0
|
||||
headers_match = True
|
||||
header_texts = []
|
||||
|
||||
for i in range(min_rows):
|
||||
cells1 = rows1[i].find_all(["td", "th"])
|
||||
cells2 = rows2[i].find_all(["td", "th"])
|
||||
|
||||
# 提取每行的文本内容列表(去除空白字符)
|
||||
texts1 = [''.join(full_to_half(cell.get_text()).split()) for cell in cells1]
|
||||
texts2 = [''.join(full_to_half(cell.get_text()).split()) for cell in cells2]
|
||||
|
||||
# 检查视觉一致性:文本内容完全相同,且有效列数一致
|
||||
effective_cols_match = effective_cols1.get(i, 0) == effective_cols2.get(i, 0)
|
||||
if texts1 == texts2 and effective_cols_match:
|
||||
for row_idx in range(min_rows):
|
||||
row1 = front_rows1[row_idx]
|
||||
row2 = front_rows2[row_idx]
|
||||
if row1.normalized_texts == row2.normalized_texts and row1.effective_cols == row2.effective_cols:
|
||||
header_rows += 1
|
||||
row_texts = [full_to_half(cell.get_text().strip()) for cell in cells1]
|
||||
header_texts.append(row_texts)
|
||||
header_texts.append(list(row1.display_texts))
|
||||
else:
|
||||
headers_match = header_rows > 0
|
||||
break
|
||||
@@ -284,126 +355,69 @@ def _detect_table_headers_visual(soup1, soup2, rows1, rows2, max_header_rows=5):
|
||||
return header_rows, headers_match, header_texts
|
||||
|
||||
|
||||
def can_merge_tables(current_table_block, previous_table_block):
|
||||
"""判断两个表格是否可以合并"""
|
||||
# 检查表格是否有caption和footnote
|
||||
# 计算previous_table_block中的footnote数量
|
||||
footnote_count = sum(1 for block in previous_table_block["blocks"] if block["type"] == BlockType.TABLE_FOOTNOTE)
|
||||
# 如果有TABLE_CAPTION类型的块,检查是否至少有一个以"(续)"结尾
|
||||
caption_blocks = [block for block in current_table_block["blocks"] if block["type"] == BlockType.TABLE_CAPTION]
|
||||
def can_merge_tables(current_state: TableMergeState, previous_state: TableMergeState):
|
||||
"""判断两个表格是否可以合并."""
|
||||
current_table_block = current_state.owner_block
|
||||
previous_table_block = previous_state.owner_block
|
||||
|
||||
footnote_count = sum(
|
||||
1 for block in previous_table_block["blocks"] if block["type"] == BlockType.TABLE_FOOTNOTE
|
||||
)
|
||||
caption_blocks = [
|
||||
block for block in current_table_block["blocks"] if block["type"] == BlockType.TABLE_CAPTION
|
||||
]
|
||||
if caption_blocks:
|
||||
# 检查是否至少有一个caption包含续表标识
|
||||
has_continuation_marker = False
|
||||
for block in caption_blocks:
|
||||
caption_text = full_to_half(merge_para_with_text(block).strip()).lower()
|
||||
if (
|
||||
any(caption_text.endswith(marker.lower()) for marker in CONTINUATION_END_MARKERS)
|
||||
or any(marker.lower() in caption_text for marker in CONTINUATION_INLINE_MARKERS)
|
||||
any(caption_text.endswith(marker.lower()) for marker in CONTINUATION_END_MARKERS)
|
||||
or any(marker.lower() in caption_text for marker in CONTINUATION_INLINE_MARKERS)
|
||||
):
|
||||
has_continuation_marker = True
|
||||
break
|
||||
|
||||
# 如果所有caption都不包含续表标识,则不允许合并
|
||||
if not has_continuation_marker:
|
||||
return False, None, None, None, None
|
||||
return False
|
||||
|
||||
# 如果current_table_block的caption存在续标识,放宽footnote的限制允许previous_table_block有最多一条footnote
|
||||
if footnote_count > 1:
|
||||
return False, None, None, None, None
|
||||
else:
|
||||
if footnote_count > 0:
|
||||
return False, None, None, None, None
|
||||
return False
|
||||
elif footnote_count > 0:
|
||||
return False
|
||||
|
||||
# 获取两个表格的HTML内容
|
||||
current_html = ""
|
||||
previous_html = ""
|
||||
|
||||
for block in current_table_block["blocks"]:
|
||||
if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
|
||||
current_html = block["lines"][0]["spans"][0].get("html", "")
|
||||
|
||||
for block in previous_table_block["blocks"]:
|
||||
if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
|
||||
previous_html = block["lines"][0]["spans"][0].get("html", "")
|
||||
|
||||
if not current_html or not previous_html:
|
||||
return False, None, None, None, None
|
||||
|
||||
# 检查表格宽度差异
|
||||
x0_t1, y0_t1, x1_t1, y1_t1 = current_table_block["bbox"]
|
||||
x0_t2, y0_t2, x1_t2, y1_t2 = previous_table_block["bbox"]
|
||||
x0_t1, _, x1_t1, _ = current_table_block["bbox"]
|
||||
x0_t2, _, x1_t2, _ = previous_table_block["bbox"]
|
||||
table1_width = x1_t1 - x0_t1
|
||||
table2_width = x1_t2 - x0_t2
|
||||
|
||||
if abs(table1_width - table2_width) / min(table1_width, table2_width) >= 0.1:
|
||||
return False, None, None, None, None
|
||||
|
||||
# 解析HTML并检查表格结构
|
||||
soup1 = BeautifulSoup(previous_html, "html.parser")
|
||||
soup2 = BeautifulSoup(current_html, "html.parser")
|
||||
|
||||
# 检查整体列数匹配
|
||||
table_cols1 = calculate_table_total_columns(soup1)
|
||||
table_cols2 = calculate_table_total_columns(soup2)
|
||||
# logger.debug(f"Table columns - Previous: {table_cols1}, Current: {table_cols2}")
|
||||
tables_match = table_cols1 == table_cols2
|
||||
|
||||
# 检查首末行列数匹配
|
||||
rows_match = check_rows_match(soup1, soup2)
|
||||
|
||||
return (tables_match or rows_match), soup1, soup2, current_html, previous_html
|
||||
|
||||
|
||||
def check_rows_match(soup1, soup2):
|
||||
"""检查表格行是否匹配"""
|
||||
rows1 = soup1.find_all("tr")
|
||||
rows2 = soup2.find_all("tr")
|
||||
|
||||
if not (rows1 and rows2):
|
||||
return False
|
||||
|
||||
# 获取第一个表的最后一行数据行索引
|
||||
last_row_idx = None
|
||||
last_row = None
|
||||
for idx in range(len(rows1) - 1, -1, -1):
|
||||
if rows1[idx].find_all(["td", "th"]):
|
||||
last_row_idx = idx
|
||||
last_row = rows1[idx]
|
||||
break
|
||||
if previous_state.total_cols == current_state.total_cols:
|
||||
return True
|
||||
|
||||
# 检测表头行数,以便获取第二个表的首个数据行
|
||||
header_count, _, _ = detect_table_headers(soup1, soup2)
|
||||
return check_rows_match(previous_state, current_state)
|
||||
|
||||
# 获取第二个表的首个数据行
|
||||
first_data_row_idx = None
|
||||
first_data_row = None
|
||||
if len(rows2) > header_count:
|
||||
first_data_row_idx = header_count
|
||||
first_data_row = rows2[header_count] # 第一个非表头行
|
||||
|
||||
if not (last_row and first_data_row):
|
||||
def check_rows_match(previous_state: TableMergeState, current_state: TableMergeState):
|
||||
"""检查表格边界行是否匹配."""
|
||||
last_row_metrics = previous_state.last_data_row_metrics
|
||||
if last_row_metrics is None:
|
||||
return False
|
||||
|
||||
# 计算有效列数(考虑rowspan和colspan)
|
||||
last_row_effective_cols = calculate_row_effective_columns(soup1, last_row_idx)
|
||||
first_row_effective_cols = calculate_row_effective_columns(soup2, first_data_row_idx)
|
||||
header_count, _, _ = detect_table_headers(previous_state, current_state)
|
||||
first_data_row_metrics = current_state.front_first_data_row_metrics.get(header_count)
|
||||
if first_data_row_metrics is None:
|
||||
return False
|
||||
|
||||
# 计算实际列数(仅考虑colspan)和视觉列数
|
||||
last_row_cols = calculate_row_columns(last_row)
|
||||
first_row_cols = calculate_row_columns(first_data_row)
|
||||
last_row_visual_cols = calculate_visual_columns(last_row)
|
||||
first_row_visual_cols = calculate_visual_columns(first_data_row)
|
||||
|
||||
# logger.debug(f"行列数 - 前表最后一行: {last_row_cols}(有效列数:{last_row_effective_cols}, 视觉列数:{last_row_visual_cols}), 当前表首行: {first_row_cols}(有效列数:{first_row_effective_cols}, 视觉列数:{first_row_visual_cols})")
|
||||
|
||||
# 同时考虑有效列数匹配、实际列数匹配和视觉列数匹配
|
||||
return (last_row_effective_cols == first_row_effective_cols or
|
||||
last_row_cols == first_row_cols or
|
||||
last_row_visual_cols == first_row_visual_cols)
|
||||
return (
|
||||
last_row_metrics.effective_cols == first_data_row_metrics.effective_cols
|
||||
or last_row_metrics.actual_cols == first_data_row_metrics.actual_cols
|
||||
or last_row_metrics.visual_cols == first_data_row_metrics.visual_cols
|
||||
)
|
||||
|
||||
|
||||
def check_row_columns_match(row1, row2):
|
||||
# 逐个cell检测colspan属性是否一致
|
||||
cells1 = row1.find_all(["td", "th"])
|
||||
cells2 = row2.find_all(["td", "th"])
|
||||
if len(cells1) != len(cells2):
|
||||
@@ -416,51 +430,40 @@ def check_row_columns_match(row1, row2):
|
||||
return True
|
||||
|
||||
|
||||
def adjust_table_rows_colspan(soup, rows, start_idx, end_idx,
|
||||
reference_structure, reference_visual_cols,
|
||||
target_cols, current_cols, reference_row):
|
||||
"""调整表格行的colspan属性以匹配目标列数
|
||||
def adjust_table_rows_colspan(
|
||||
rows,
|
||||
start_idx,
|
||||
end_idx,
|
||||
row_effective_cols,
|
||||
reference_structure,
|
||||
reference_visual_cols,
|
||||
target_cols,
|
||||
match_reference_row,
|
||||
):
|
||||
"""调整表格行的colspan属性以匹配目标列数."""
|
||||
reference_row_copy = deepcopy(match_reference_row)
|
||||
|
||||
Args:
|
||||
soup: BeautifulSoup解析的表格对象(用于计算有效列数)
|
||||
rows: 表格行列表
|
||||
start_idx: 起始行索引
|
||||
end_idx: 结束行索引(不包含)
|
||||
reference_structure: 参考行的colspan结构列表
|
||||
reference_visual_cols: 参考行的视觉列数
|
||||
target_cols: 目标总列数
|
||||
current_cols: 当前总列数
|
||||
reference_row: 参考行对象
|
||||
"""
|
||||
reference_row_copy = deepcopy(reference_row)
|
||||
|
||||
# 构建有效列数矩阵
|
||||
effective_cols_matrix = build_table_occupied_matrix(soup)
|
||||
|
||||
for i in range(start_idx, end_idx):
|
||||
row = rows[i]
|
||||
for row_idx in range(start_idx, end_idx):
|
||||
row = rows[row_idx]
|
||||
cells = row.find_all(["td", "th"])
|
||||
if not cells:
|
||||
continue
|
||||
|
||||
# 使用有效列数(考虑rowspan)判断是否需要调整
|
||||
current_row_effective_cols = effective_cols_matrix.get(i, 0)
|
||||
current_row_effective_cols = row_effective_cols[row_idx]
|
||||
current_row_cols = calculate_row_columns(row)
|
||||
|
||||
# 如果有效列数或实际列数已经达到目标,则跳过
|
||||
if current_row_effective_cols >= target_cols or current_row_cols >= target_cols:
|
||||
continue
|
||||
|
||||
# 检查是否与参考行结构匹配
|
||||
if calculate_visual_columns(row) == reference_visual_cols and check_row_columns_match(row, reference_row_copy):
|
||||
# 尝试应用参考结构
|
||||
if (
|
||||
calculate_visual_columns(row) == reference_visual_cols
|
||||
and check_row_columns_match(row, reference_row_copy)
|
||||
):
|
||||
if len(cells) <= len(reference_structure):
|
||||
for j, cell in enumerate(cells):
|
||||
if j < len(reference_structure) and reference_structure[j] > 1:
|
||||
cell["colspan"] = str(reference_structure[j])
|
||||
for cell_idx, cell in enumerate(cells):
|
||||
if cell_idx < len(reference_structure) and reference_structure[cell_idx] > 1:
|
||||
cell["colspan"] = str(reference_structure[cell_idx])
|
||||
else:
|
||||
# 扩展最后一个单元格以填补列数差异
|
||||
# 使用有效列数来计算差异
|
||||
cols_diff = target_cols - current_row_effective_cols
|
||||
if cols_diff > 0:
|
||||
last_cell = cells[-1]
|
||||
@@ -468,121 +471,144 @@ def adjust_table_rows_colspan(soup, rows, start_idx, end_idx,
|
||||
last_cell["colspan"] = str(current_last_span + cols_diff)
|
||||
|
||||
|
||||
def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_footnotes):
|
||||
"""执行表格合并操作"""
|
||||
# 检测表头有几行,并确认表头内容是否一致
|
||||
header_count, headers_match, header_texts = detect_table_headers(soup1, soup2)
|
||||
# logger.debug(f"检测到表头行数: {header_count}, 表头匹配: {headers_match}")
|
||||
# logger.debug(f"表头内容: {header_texts}")
|
||||
def perform_table_merge(
|
||||
previous_state: TableMergeState,
|
||||
current_state: TableMergeState,
|
||||
previous_table_block,
|
||||
wait_merge_table_footnotes,
|
||||
):
|
||||
"""执行表格合并操作."""
|
||||
header_count, _, _ = detect_table_headers(previous_state, current_state)
|
||||
|
||||
# 找到第一个表格的tbody,如果没有则查找table元素
|
||||
tbody1 = soup1.find("tbody") or soup1.find("table")
|
||||
|
||||
# 获取表1和表2的所有行
|
||||
rows1 = soup1.find_all("tr")
|
||||
rows2 = soup2.find_all("tr")
|
||||
rows1 = previous_state.rows
|
||||
rows2 = current_state.rows
|
||||
|
||||
previous_adjusted = False
|
||||
|
||||
if rows1 and rows2 and header_count < len(rows2):
|
||||
# 获取表1最后一行和表2第一个非表头行
|
||||
last_row1 = rows1[-1]
|
||||
first_data_row2 = rows2[header_count]
|
||||
table_cols1 = previous_state.total_cols
|
||||
table_cols2 = current_state.total_cols
|
||||
|
||||
# 计算表格总列数
|
||||
table_cols1 = calculate_table_total_columns(soup1)
|
||||
table_cols2 = calculate_table_total_columns(soup2)
|
||||
if table_cols1 >= table_cols2:
|
||||
reference_structure = [int(cell.get("colspan", 1)) for cell in last_row1.find_all(["td", "th"])]
|
||||
if table_cols1 > table_cols2:
|
||||
reference_structure = [
|
||||
int(cell.get("colspan", 1)) for cell in last_row1.find_all(["td", "th"])
|
||||
]
|
||||
reference_visual_cols = calculate_visual_columns(last_row1)
|
||||
# 以表1的最后一行为参考,调整表2的行
|
||||
adjust_table_rows_colspan(
|
||||
soup2, rows2, header_count, len(rows2),
|
||||
reference_structure, reference_visual_cols,
|
||||
table_cols1, table_cols2, first_data_row2
|
||||
rows2,
|
||||
header_count,
|
||||
len(rows2),
|
||||
current_state.row_effective_cols,
|
||||
reference_structure,
|
||||
reference_visual_cols,
|
||||
table_cols1,
|
||||
first_data_row2,
|
||||
)
|
||||
|
||||
else: # table_cols2 > table_cols1
|
||||
reference_structure = [int(cell.get("colspan", 1)) for cell in first_data_row2.find_all(["td", "th"])]
|
||||
elif table_cols2 > table_cols1:
|
||||
reference_structure = [
|
||||
int(cell.get("colspan", 1)) for cell in first_data_row2.find_all(["td", "th"])
|
||||
]
|
||||
reference_visual_cols = calculate_visual_columns(first_data_row2)
|
||||
# 以表2的第一个数据行为参考,调整表1的行
|
||||
adjust_table_rows_colspan(
|
||||
soup1, rows1, 0, len(rows1),
|
||||
reference_structure, reference_visual_cols,
|
||||
table_cols2, table_cols1, last_row1
|
||||
rows1,
|
||||
0,
|
||||
len(rows1),
|
||||
previous_state.row_effective_cols,
|
||||
reference_structure,
|
||||
reference_visual_cols,
|
||||
table_cols2,
|
||||
last_row1,
|
||||
)
|
||||
previous_adjusted = True
|
||||
|
||||
# 将第二个表格的行添加到第一个表格中
|
||||
if tbody1:
|
||||
tbody2 = soup2.find("tbody") or soup2.find("table")
|
||||
if tbody2:
|
||||
# 将第二个表格的行添加到第一个表格中(跳过表头行)
|
||||
for row in rows2[header_count:]:
|
||||
row.extract()
|
||||
tbody1.append(row)
|
||||
if previous_adjusted:
|
||||
_refresh_table_state_metrics(previous_state)
|
||||
|
||||
appended_rows = rows2[header_count:]
|
||||
append_start_idx = len(previous_state.rows)
|
||||
merged_rows = []
|
||||
|
||||
if previous_state.tbody and current_state.tbody:
|
||||
for row in appended_rows:
|
||||
row.extract()
|
||||
previous_state.tbody.append(row)
|
||||
merged_rows.append(row)
|
||||
|
||||
previous_state.rows.extend(merged_rows)
|
||||
|
||||
if merged_rows:
|
||||
appended_scan = _scan_rows(
|
||||
merged_rows,
|
||||
initial_occupied=previous_state.tail_occupied,
|
||||
start_row_idx=append_start_idx,
|
||||
)
|
||||
previous_state.row_effective_cols.extend(appended_scan.row_effective_cols)
|
||||
previous_state.total_cols = max(previous_state.total_cols, appended_scan.total_cols)
|
||||
if appended_scan.last_nonempty_row_metrics is not None:
|
||||
previous_state.last_data_row_metrics = appended_scan.last_nonempty_row_metrics
|
||||
previous_state.tail_occupied = appended_scan.tail_occupied
|
||||
|
||||
# 清空previous_table_block的footnote
|
||||
previous_table_block["blocks"] = [
|
||||
block for block in previous_table_block["blocks"]
|
||||
if block["type"] != BlockType.TABLE_FOOTNOTE
|
||||
block for block in previous_table_block["blocks"] if block["type"] != BlockType.TABLE_FOOTNOTE
|
||||
]
|
||||
# 添加待合并表格的footnote到前一个表格中
|
||||
for table_footnote in wait_merge_table_footnotes:
|
||||
temp_table_footnote = table_footnote.copy()
|
||||
temp_table_footnote[SplitFlag.CROSS_PAGE] = True
|
||||
previous_table_block["blocks"].append(temp_table_footnote)
|
||||
|
||||
return str(soup1)
|
||||
previous_state.dirty = True
|
||||
|
||||
|
||||
def merge_table(page_info_list):
|
||||
"""合并跨页表格"""
|
||||
# 倒序遍历每一页
|
||||
"""合并跨页表格."""
|
||||
state_cache: dict[int, TableMergeState] = {}
|
||||
merged_away_blocks: set[int] = set()
|
||||
|
||||
for page_idx in range(len(page_info_list) - 1, -1, -1):
|
||||
# 跳过第一页,因为它没有前一页
|
||||
if page_idx == 0:
|
||||
continue
|
||||
|
||||
page_info = page_info_list[page_idx]
|
||||
previous_page_info = page_info_list[page_idx - 1]
|
||||
|
||||
# 检查当前页是否有表格块
|
||||
if not (page_info["para_blocks"] and page_info["para_blocks"][0]["type"] == BlockType.TABLE):
|
||||
continue
|
||||
|
||||
current_table_block = page_info["para_blocks"][0]
|
||||
|
||||
# 检查上一页是否有表格块
|
||||
if not (previous_page_info["para_blocks"] and previous_page_info["para_blocks"][-1]["type"] == BlockType.TABLE):
|
||||
if not (
|
||||
previous_page_info["para_blocks"]
|
||||
and previous_page_info["para_blocks"][-1]["type"] == BlockType.TABLE
|
||||
):
|
||||
continue
|
||||
|
||||
current_table_block = page_info["para_blocks"][0]
|
||||
previous_table_block = previous_page_info["para_blocks"][-1]
|
||||
|
||||
# 收集待合并表格的footnote
|
||||
wait_merge_table_footnotes = [
|
||||
block for block in current_table_block["blocks"]
|
||||
if block["type"] == BlockType.TABLE_FOOTNOTE
|
||||
]
|
||||
|
||||
# 检查两个表格是否可以合并
|
||||
can_merge, soup1, soup2, current_html, previous_html = can_merge_tables(
|
||||
current_table_block, previous_table_block
|
||||
)
|
||||
|
||||
if not can_merge:
|
||||
current_state = _get_or_create_table_state(current_table_block, state_cache)
|
||||
previous_state = _get_or_create_table_state(previous_table_block, state_cache)
|
||||
if current_state is None or previous_state is None:
|
||||
continue
|
||||
|
||||
# 执行表格合并
|
||||
merged_html = perform_table_merge(
|
||||
soup1, soup2, previous_table_block, wait_merge_table_footnotes
|
||||
wait_merge_table_footnotes = [
|
||||
block for block in current_table_block["blocks"] if block["type"] == BlockType.TABLE_FOOTNOTE
|
||||
]
|
||||
|
||||
if not can_merge_tables(current_state, previous_state):
|
||||
continue
|
||||
|
||||
perform_table_merge(
|
||||
previous_state,
|
||||
current_state,
|
||||
previous_table_block,
|
||||
wait_merge_table_footnotes,
|
||||
)
|
||||
|
||||
# 更新previous_table_block的html
|
||||
for block in previous_table_block["blocks"]:
|
||||
if (block["type"] == BlockType.TABLE_BODY and block["lines"] and block["lines"][0]["spans"]):
|
||||
block["lines"][0]["spans"][0]["html"] = merged_html
|
||||
break
|
||||
|
||||
# 删除当前页的table
|
||||
merged_away_blocks.add(id(current_table_block))
|
||||
for block in current_table_block["blocks"]:
|
||||
block['lines'] = []
|
||||
block["lines"] = []
|
||||
block[SplitFlag.LINES_DELETED] = True
|
||||
|
||||
for state in state_cache.values():
|
||||
if state.dirty and id(state.owner_block) not in merged_away_blocks:
|
||||
_serialize_table_state_html(state)
|
||||
|
||||
@@ -1 +1 @@
|
||||
__version__ = "3.0.7"
|
||||
__version__ = "3.0.9"
|
||||
|
||||
Reference in New Issue
Block a user