feat: enhance table merging logic with effective column calculations and visual consistency checks

This commit is contained in:
myhloli
2026-01-16 18:58:10 +08:00
parent c73c1d3847
commit df07baea6c

View File

@@ -70,6 +70,69 @@ def calculate_table_total_columns(soup):
return max_cols
def build_table_occupied_matrix(soup):
"""构建表格的占用矩阵,返回每行的有效列数
Args:
soup: BeautifulSoup解析的表格
Returns:
dict: {row_idx: effective_columns} 每行的有效列数考虑rowspan占用
"""
rows = soup.find_all("tr")
if not rows:
return {}
occupied = {} # {row_idx: {col_idx: True}}
row_effective_cols = {} # {row_idx: effective_columns}
for row_idx, row in enumerate(rows):
col_idx = 0
cells = row.find_all(["td", "th"])
if row_idx not in occupied:
occupied[row_idx] = {}
for cell in cells:
# 找到下一个未被占用的列位置
while col_idx in occupied[row_idx]:
col_idx += 1
colspan = int(cell.get("colspan", 1))
rowspan = int(cell.get("rowspan", 1))
# 标记被这个单元格占用的所有位置
for r in range(row_idx, row_idx + rowspan):
if r not in occupied:
occupied[r] = {}
for c in range(col_idx, col_idx + colspan):
occupied[r][c] = True
col_idx += colspan
# 该行的有效列数为已占用的最大列索引+1
if occupied[row_idx]:
row_effective_cols[row_idx] = max(occupied[row_idx].keys()) + 1
else:
row_effective_cols[row_idx] = 0
return row_effective_cols
def calculate_row_effective_columns(soup, row_idx):
"""计算指定行的有效列数考虑rowspan占用
Args:
soup: BeautifulSoup解析的表格
row_idx: 行索引
Returns:
int: 该行的有效列数
"""
row_effective_cols = build_table_occupied_matrix(soup)
return row_effective_cols.get(row_idx, 0)
def calculate_row_columns(row):
"""
计算表格行的实际列数考虑colspan属性
@@ -119,6 +182,10 @@ def detect_table_headers(soup1, soup2, max_header_rows=5):
rows1 = soup1.find_all("tr")
rows2 = soup2.find_all("tr")
# 构建两个表格的有效列数矩阵
effective_cols1 = build_table_occupied_matrix(soup1)
effective_cols2 = build_table_occupied_matrix(soup2)
min_rows = min(len(rows1), len(rows2), max_header_rows)
header_rows = 0
headers_match = True
@@ -136,20 +203,24 @@ def detect_table_headers(soup1, soup2, max_header_rows=5):
if len(cells1) != len(cells2):
structure_match = False
else:
# 然后检查单元格的属性和内容
for cell1, cell2 in zip(cells1, cells2):
colspan1 = int(cell1.get("colspan", 1))
rowspan1 = int(cell1.get("rowspan", 1))
colspan2 = int(cell2.get("colspan", 1))
rowspan2 = int(cell2.get("rowspan", 1))
# 检查有效列数是否一致考虑rowspan影响
if effective_cols1.get(i, 0) != effective_cols2.get(i, 0):
structure_match = False
else:
# 然后检查单元格的属性和内容
for cell1, cell2 in zip(cells1, cells2):
colspan1 = int(cell1.get("colspan", 1))
rowspan1 = int(cell1.get("rowspan", 1))
colspan2 = int(cell2.get("colspan", 1))
rowspan2 = int(cell2.get("rowspan", 1))
# 去除所有空白字符(包括空格、换行、制表符等)
text1 = ''.join(full_to_half(cell1.get_text()).split())
text2 = ''.join(full_to_half(cell2.get_text()).split())
# 去除所有空白字符(包括空格、换行、制表符等)
text1 = ''.join(full_to_half(cell1.get_text()).split())
text2 = ''.join(full_to_half(cell2.get_text()).split())
if colspan1 != colspan2 or rowspan1 != rowspan2 or text1 != text2:
structure_match = False
break
if colspan1 != colspan2 or rowspan1 != rowspan2 or text1 != text2:
structure_match = False
break
if structure_match:
header_rows += 1
@@ -159,7 +230,54 @@ def detect_table_headers(soup1, soup2, max_header_rows=5):
headers_match = header_rows > 0 # 只有当至少匹配了一行时,才认为表头匹配
break
# 如果没有找到匹配的表头行,则返回失败
# 如果严格匹配失败,尝试视觉一致性匹配(只比较文本内容)
if header_rows == 0:
header_rows, headers_match, header_texts = _detect_table_headers_visual(soup1, soup2, rows1, rows2, max_header_rows)
return header_rows, headers_match, header_texts
def _detect_table_headers_visual(soup1, soup2, rows1, rows2, max_header_rows=5):
"""
基于视觉一致性检测表头只比较文本内容忽略colspan/rowspan差异
Args:
soup1: 第一个表格的BeautifulSoup对象
soup2: 第二个表格的BeautifulSoup对象
rows1: 第一个表格的行列表
rows2: 第二个表格的行列表
max_header_rows: 最大可能的表头行数
Returns:
tuple: (表头行数, 表头是否一致, 表头文本列表)
"""
# 构建两个表格的有效列数矩阵
effective_cols1 = build_table_occupied_matrix(soup1)
effective_cols2 = build_table_occupied_matrix(soup2)
min_rows = min(len(rows1), len(rows2), max_header_rows)
header_rows = 0
headers_match = True
header_texts = []
for i in range(min_rows):
cells1 = rows1[i].find_all(["td", "th"])
cells2 = rows2[i].find_all(["td", "th"])
# 提取每行的文本内容列表(去除空白字符)
texts1 = [''.join(full_to_half(cell.get_text()).split()) for cell in cells1]
texts2 = [''.join(full_to_half(cell.get_text()).split()) for cell in cells2]
# 检查视觉一致性:文本内容完全相同,且有效列数一致
effective_cols_match = effective_cols1.get(i, 0) == effective_cols2.get(i, 0)
if texts1 == texts2 and effective_cols_match:
header_rows += 1
row_texts = [full_to_half(cell.get_text().strip()) for cell in cells1]
header_texts.append(row_texts)
else:
headers_match = header_rows > 0
break
if header_rows == 0:
headers_match = False
@@ -244,34 +362,44 @@ def check_rows_match(soup1, soup2):
if not (rows1 and rows2):
return False
# 获取第一个表的最后一行数据行
# 获取第一个表的最后一行数据行索引
last_row_idx = None
last_row = None
for row in reversed(rows1):
if row.find_all(["td", "th"]):
last_row = row
for idx in range(len(rows1) - 1, -1, -1):
if rows1[idx].find_all(["td", "th"]):
last_row_idx = idx
last_row = rows1[idx]
break
# 检测表头行数,以便获取第二个表的首个数据行
header_count, _, _ = detect_table_headers(soup1, soup2)
# 获取第二个表的首个数据行
first_data_row_idx = None
first_data_row = None
if len(rows2) > header_count:
first_data_row_idx = header_count
first_data_row = rows2[header_count] # 第一个非表头行
if not (last_row and first_data_row):
return False
# 计算实际列数考虑colspan和视觉列数
# 计算有效列数(考虑rowspan和colspan
last_row_effective_cols = calculate_row_effective_columns(soup1, last_row_idx)
first_row_effective_cols = calculate_row_effective_columns(soup2, first_data_row_idx)
# 计算实际列数仅考虑colspan和视觉列数
last_row_cols = calculate_row_columns(last_row)
first_row_cols = calculate_row_columns(first_data_row)
last_row_visual_cols = calculate_visual_columns(last_row)
first_row_visual_cols = calculate_visual_columns(first_data_row)
# logger.debug(f"行列数 - 前表最后一行: {last_row_cols}(视觉列数:{last_row_visual_cols}), 当前表首行: {first_row_cols}(视觉列数:{first_row_visual_cols})")
# logger.debug(f"行列数 - 前表最后一行: {last_row_cols}(有效列数:{last_row_effective_cols}, 视觉列数:{last_row_visual_cols}), 当前表首行: {first_row_cols}(有效列数:{first_row_effective_cols}, 视觉列数:{first_row_visual_cols})")
# 同时考虑实际列数匹配和视觉列数匹配
return last_row_cols == first_row_cols or last_row_visual_cols == first_row_visual_cols
# 同时考虑有效列数匹配、实际列数匹配和视觉列数匹配
return (last_row_effective_cols == first_row_effective_cols or
last_row_cols == first_row_cols or
last_row_visual_cols == first_row_visual_cols)
def check_row_columns_match(row1, row2):
@@ -288,12 +416,13 @@ def check_row_columns_match(row1, row2):
return True
def adjust_table_rows_colspan(rows, start_idx, end_idx,
def adjust_table_rows_colspan(soup, rows, start_idx, end_idx,
reference_structure, reference_visual_cols,
target_cols, current_cols, reference_row):
"""调整表格行的colspan属性以匹配目标列数
Args:
soup: BeautifulSoup解析的表格对象用于计算有效列数
rows: 表格行列表
start_idx: 起始行索引
end_idx: 结束行索引(不包含)
@@ -305,14 +434,21 @@ def adjust_table_rows_colspan(rows, start_idx, end_idx,
"""
reference_row_copy = deepcopy(reference_row)
# 构建有效列数矩阵
effective_cols_matrix = build_table_occupied_matrix(soup)
for i in range(start_idx, end_idx):
row = rows[i]
cells = row.find_all(["td", "th"])
if not cells:
continue
# 使用有效列数考虑rowspan判断是否需要调整
current_row_effective_cols = effective_cols_matrix.get(i, 0)
current_row_cols = calculate_row_columns(row)
if current_row_cols >= target_cols:
# 如果有效列数或实际列数已经达到目标,则跳过
if current_row_effective_cols >= target_cols or current_row_cols >= target_cols:
continue
# 检查是否与参考行结构匹配
@@ -324,9 +460,12 @@ def adjust_table_rows_colspan(rows, start_idx, end_idx,
cell["colspan"] = str(reference_structure[j])
else:
# 扩展最后一个单元格以填补列数差异
last_cell = cells[-1]
current_last_span = int(last_cell.get("colspan", 1))
last_cell["colspan"] = str(current_last_span + (target_cols - current_cols))
# 使用有效列数来计算差异
cols_diff = target_cols - current_row_effective_cols
if cols_diff > 0:
last_cell = cells[-1]
current_last_span = int(last_cell.get("colspan", 1))
last_cell["colspan"] = str(current_last_span + cols_diff)
def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_footnotes):
@@ -357,7 +496,7 @@ def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_foo
reference_visual_cols = calculate_visual_columns(last_row1)
# 以表1的最后一行为参考调整表2的行
adjust_table_rows_colspan(
rows2, header_count, len(rows2),
soup2, rows2, header_count, len(rows2),
reference_structure, reference_visual_cols,
table_cols1, table_cols2, first_data_row2
)
@@ -367,7 +506,7 @@ def perform_table_merge(soup1, soup2, previous_table_block, wait_merge_table_foo
reference_visual_cols = calculate_visual_columns(first_data_row2)
# 以表2的第一个数据行为参考调整表1的行
adjust_table_rows_colspan(
rows1, 0, len(rows1),
soup1, rows1, 0, len(rows1),
reference_structure, reference_visual_cols,
table_cols2, table_cols1, last_row1
)
@@ -446,4 +585,10 @@ def merge_table(page_info_list):
# 删除当前页的table
for block in current_table_block["blocks"]:
block['lines'] = []
block[SplitFlag.LINES_DELETED] = True
block[SplitFlag.LINES_DELETED] = True