chore: tools update

This commit is contained in:
Alter-xyz
2025-05-17 22:16:47 +08:00
parent 10f1f354db
commit 7679eb4b29
6 changed files with 1158 additions and 1339 deletions

View File

@@ -1,560 +0,0 @@
import yaml # pip install pyyaml
import re
import datetime
from pathlib import Path
import shutil
class Config:
    """Central configuration: paths, language list, and PWXY mapping tables.

    The PWXY scheme encodes each document's front-matter metadata into a
    4-digit filename prefix: P (priority), W (primary type), X (detail type),
    Y (level).
    """
    # --- Path Setup ---
    # Repo root: two levels above this script's location.
    BASE_DIR = Path(__file__).resolve().parent.parent
    LANGUAGES = ["zh", "en", "ja"]  # Languages to process
    # Still useful for potential internal archiving if needed
    TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    # --- Directory Naming Templates ---
    # This is the directory we look for, operate within, and is the final name.
    LANG_DIR_TEMPLATE = "plugin_dev_{lang}"
    # Prefix for archiving a LANG_DIR_TEMPLATE if (for some external reason)
    # we wanted to back it up before processing. Not used in the main flow currently
    # but kept as a utility.
    ARCHIVE_LANG_DIR_PREFIX_TEMPLATE = "plugin_dev_{lang}_archive_pre_processing_"
    # --- PWXY Mappings ---
    # W digit: dimensions.type.primary -> value.
    PRIMARY_TYPE_MAP = {
        "conceptual": 1,
        "implementation": 2,
        "operational": 3,
        "reference": 4,
    }
    DEFAULT_W = 0  # Fallback when the primary type is missing or unmapped.
    # X digit: per-primary dimensions.type.detail -> value.
    DETAIL_TYPE_MAPS = {
        "conceptual": {"introduction": 1, "principles": 2, "architecture": 3},
        "implementation": {"basic": 1, "standard": 2, "high": 3, "advanced": 4},
        "operational": {"setup": 1, "deployment": 2, "maintenance": 3},
        "reference": {"core": 1, "configuration": 2, "examples": 3},
    }
    DEFAULT_X = 0  # Fallback when the detail type is missing or unmapped.
    # Y digit: dimensions.level -> value.
    LEVEL_MAP = {
        "beginner": 1,
        "intermediate": 2,
        "advanced": 3,
    }
    DEFAULT_Y = 0  # Fallback when the level is missing or unmapped.
    # P digit: priority; raised to PRIORITY_HIGH for advanced-level docs or
    # high/advanced implementation docs (see _calculate_pwxy_and_warnings).
    PRIORITY_NORMAL = 0
    PRIORITY_HIGH = 9
    PRIORITY_ADVANCED_LEVEL_KEY = "advanced"
    PRIORITY_IMPLEMENTATION_PRIMARY_KEY = "implementation"
    PRIORITY_IMPLEMENTATION_DETAIL_KEYS = {"high", "advanced"}
# --- Helper Functions ---
def extract_front_matter(content: str):
    """Split a document into (front_matter_dict, markdown_body).

    Returns ({}, body) when no YAML front matter is present or the YAML block
    is empty/non-dict, and (None, original_content) as an error sentinel when
    the YAML fails to parse.
    """
    match = re.match(r"^\s*---\s*$(.*?)^---\s*$(.*)",
                     content, re.DOTALL | re.MULTILINE)
    if not match:
        return {}, content  # No front matter found

    yaml_block = match.group(1).strip()
    body = match.group(2).strip()
    try:
        parsed = yaml.safe_load(yaml_block)
    except yaml.YAMLError as e:
        print(f" [Error] YAML Parsing Failed: {e}")
        return None, content  # Indicate error

    if parsed is None:  # Empty YAML (--- \n ---)
        return {}, body
    # Non-dict YAML (e.g. a bare list) is treated as no usable metadata.
    return (parsed if isinstance(parsed, dict) else {}), body
def sanitize_filename_part(part: str) -> str:
    """Normalize an arbitrary value into a safe, lowercase filename fragment."""
    text = part if isinstance(part, str) else str(part)
    text = text.lower().replace("&", "and").replace("@", "at")
    # Collapse whitespace runs into single hyphens.
    text = re.sub(r"\s+", "-", text)
    # Keep only word chars, hyphens and dots (dots allow language suffixes in stems).
    text = re.sub(r"[^\w\-.]+", "", text)
    # Trim leading/trailing separators; fall back when nothing remains.
    return text.strip(".-_") or "untitled"
def _calculate_pwxy_and_warnings(front_matter: dict, config: Config) -> tuple[int, int, int, int, list[str]]:
    """Derive the P/W/X/Y filename digits from front-matter 'dimensions'.

    Returns (P, W, X, Y, warnings) where warnings lists every missing or
    unmapped metadata field encountered.
    """
    warnings_messages: list[str] = []
    dims = front_matter.get("dimensions", {})
    type_block = dims.get("type", {})
    primary = type_block.get("primary")
    detail = type_block.get("detail")
    level = dims.get("level")

    # P: high priority for advanced-level docs, or implementation docs
    # whose detail is high/advanced.
    is_high_priority = (
        level == config.PRIORITY_ADVANCED_LEVEL_KEY
        or (
            primary == config.PRIORITY_IMPLEMENTATION_PRIMARY_KEY
            and detail in config.PRIORITY_IMPLEMENTATION_DETAIL_KEYS
        )
    )
    P = config.PRIORITY_HIGH if is_high_priority else config.PRIORITY_NORMAL

    W = config.PRIMARY_TYPE_MAP.get(primary, config.DEFAULT_W)
    X = config.DETAIL_TYPE_MAPS.get(primary, {}).get(detail, config.DEFAULT_X)
    Y = config.LEVEL_MAP.get(level, config.DEFAULT_Y)

    # Warn about anything missing or that fell through to a default digit.
    if primary is None:
        warnings_messages.append("Missing dimensions.type.primary")
    elif W == config.DEFAULT_W:
        warnings_messages.append(
            f"Unmapped primary type: '{primary}'. Using W={config.DEFAULT_W}")

    if detail is None:
        warnings_messages.append("Missing dimensions.type.detail")
    elif X == config.DEFAULT_X and primary in config.DETAIL_TYPE_MAPS:
        warnings_messages.append(
            f"Unmapped detail type: '{detail}' for primary '{primary}'. Using X={config.DEFAULT_X}")
    elif primary not in config.DETAIL_TYPE_MAPS and primary is not None:
        warnings_messages.append(
            f"No detail map defined for primary type: '{primary}'. Using X={config.DEFAULT_X}")

    if level is None:
        warnings_messages.append("Missing dimensions.level")
    elif Y == config.DEFAULT_Y:
        warnings_messages.append(
            f"Unmapped level: '{level}'. Using Y={config.DEFAULT_Y}")

    return P, W, X, Y, warnings_messages
def _generate_filename_parts(
    P: int, W: int, X: int, Y: int,
    front_matter: dict,
    original_filename_stem: str
) -> tuple[str | None, str, str, list[str]]:
    """Build the zero-padded PWXY prefix, sanitized title and language suffix.

    Returns (padded_prefix, sanitized_title, lang_suffix, warnings).
    padded_prefix always falls back to "0000" on error, so it is never None
    in practice.
    """
    warnings_messages: list[str] = []

    # Prefix: concatenate the four digits, then left-pad to four characters.
    try:
        padded_prefix = f"{int(f'{P}{W}{X}{Y}'):04d}"
    except ValueError:
        warnings_messages.append(
            f"Could not form numeric prefix from P={P},W={W},X={X},Y={Y}. Using '0000'.")
        padded_prefix = "0000"

    # Title: prefer front-matter 'standard_title', else the original stem.
    title_source = front_matter.get("standard_title")
    if not title_source:
        warnings_messages.append(
            "Missing 'standard_title'. Using original filename stem as fallback.")
        title_source = original_filename_stem
    sanitized_title = sanitize_filename_part(title_source)

    # Language suffix, e.g. ".zh"; omitted (with a warning) when absent/blank.
    lang_suffix = ""
    language_fm = front_matter.get("language")
    if language_fm:
        lang_code = str(language_fm).strip().lower()
        if lang_code:
            lang_suffix = f".{lang_code}"
        else:
            warnings_messages.append(
                "Empty 'language' field in frontmatter. Omitting language suffix.")
    else:
        warnings_messages.append(
            "Missing 'language' field in frontmatter. Omitting language suffix.")

    return padded_prefix, sanitized_title, lang_suffix, warnings_messages
# --- Core Processing Functions ---
def get_or_create_lang_dir(lang: str, config: Config) -> tuple[Path | None, bool]:
    """
    Locate the per-language working directory, creating it when absent.
    This directory is then processed in-place by later phases.
    Returns:
        - Path | None: the language directory, or None on critical error.
        - bool: True when the directory was newly created, False otherwise.
    """
    lang_dir_path = config.BASE_DIR / config.LANG_DIR_TEMPLATE.format(lang=lang)

    if lang_dir_path.exists():
        if not lang_dir_path.is_dir():
            print(
                f"[ERROR] Path '{lang_dir_path}' exists but is not a directory. Skipping language '{lang}'.")
            return None, False
        print(
            f"Using existing directory '{lang_dir_path.name}' for in-place processing of '{lang}'.")
        return lang_dir_path, False

    print(
        f"Directory '{lang_dir_path.name}' not found for language '{lang}'. Creating it.")
    try:
        # exist_ok=False so a concurrent creation surfaces as FileExistsError.
        lang_dir_path.mkdir(parents=True, exist_ok=False)
    except FileExistsError:
        # Raced by another process between exists() and mkdir(); try to use it.
        print(
            f"[ERROR] Directory '{lang_dir_path.name}' unexpectedly created by another process. Attempting to use it.")
        if not lang_dir_path.is_dir():
            print(
                f"[ERROR] Path '{lang_dir_path}' is not a directory after attempted creation. Skipping language '{lang}'.")
            return None, False
        return lang_dir_path, False  # It existed after all.
    except OSError as e:
        print(
            f"[ERROR] Failed to create directory '{lang_dir_path}': {e}. Skipping language '{lang}'.")
        return None, False

    print(f"Created directory: '{lang_dir_path.name}' for '{lang}'.")
    return lang_dir_path, True
def archive_existing_directory(path_to_archive: Path, archive_prefix_template: str, lang: str, timestamp: str) -> bool:
    """
    Move an existing directory aside to a timestamped archive name.
    archive_prefix_template should look like "plugin_dev_{lang}_archive_".
    Returns True when the original path is clear for use (archived or never
    existed), False on error or when the path is not a directory.
    """
    if not path_to_archive.exists():
        return True  # Nothing to archive; the path is already clear.
    if not path_to_archive.is_dir():
        print(
            f" [Error] Path '{path_to_archive}' exists but is not a directory. Cannot archive.")
        return False

    archive_dir_name = f"{archive_prefix_template.format(lang=lang)}{timestamp}"
    archive_dir_path = path_to_archive.parent / archive_dir_name
    try:
        if archive_dir_path.exists():
            # Safety: a stale archive with the same name would break the move.
            print(
                f" [Warning] Archive destination '{archive_dir_path}' already exists. Removing it first to avoid error during move.")
            shutil.rmtree(archive_dir_path)
        shutil.move(str(path_to_archive), str(archive_dir_path))
        print(
            f" Archived existing directory '{path_to_archive.name}' to '{archive_dir_path.name}'.")
        return True  # Path is now clear because original was moved
    except OSError as e:
        print(
            f" [Error] Failed to archive existing directory '{path_to_archive.name}' to '{archive_dir_path.name}': {e}")
        return False
def process_single_mdx_file(
    mdx_filepath: Path,
    config: Config
) -> dict:
    """
    Processes a single MDX file: extracts metadata, generates new filename,
    and renames the file in place.
    Returns stats, including old and new filename stems if renamed.
    "status" is one of: "processed", "skipped_no_change",
    "skipped_target_exists", "error".
    """
    stats = {
        "status": "processed",
        "warnings": [],
        "error_message": None,
        "old_filename_stem_for_replace": None,
        "new_filename_stem_for_replace": None,
    }
    # Build a friendlier path string for log output.
    display_path = mdx_filepath.name
    if mdx_filepath.parent != config.BASE_DIR:
        try:
            # Show relative path from the language directory's parent (BASE_DIR)
            display_path = mdx_filepath.relative_to(
                mdx_filepath.parent.parent).as_posix()
        except ValueError:
            # Grandparent is not an ancestor; fall back to BASE_DIR-relative.
            display_path = mdx_filepath.relative_to(config.BASE_DIR).as_posix()
    file_warnings = []
    try:
        content = mdx_filepath.read_text(encoding="utf-8")
        front_matter, _ = extract_front_matter(content)
        if front_matter is None:
            # None is extract_front_matter's sentinel for a YAML parse failure.
            stats["status"] = "error"
            stats["error_message"] = "YAML Error in file."
            print(f"\nProcessing: {display_path}")
            print(f" [Skipping] {stats['error_message']}")
            return stats
        # Derive the PWXY digits and the new filename parts from front matter.
        P, W, X, Y, pwxy_warnings = _calculate_pwxy_and_warnings(
            front_matter, config)
        file_warnings.extend(pwxy_warnings)
        original_stem_for_title_fallback = mdx_filepath.stem  # Used if standard_title is missing
        padded_prefix, sanitized_title, lang_suffix, fname_warnings = _generate_filename_parts(
            P, W, X, Y, front_matter, original_stem_for_title_fallback
        )
        file_warnings.extend(fname_warnings)
        # padded_prefix has a fallback to "0000", so it should not be None
        new_filename = f"{padded_prefix}-{sanitized_title}{lang_suffix}.mdx"
        new_filepath = mdx_filepath.with_name(new_filename)
        if new_filepath == mdx_filepath:
            # Already correctly named; nothing to do.
            stats["status"] = "skipped_no_change"
        elif new_filepath.exists():
            # Never overwrite an existing sibling file.
            stats["status"] = "skipped_target_exists"
        else:
            try:
                original_stem_before_rename = mdx_filepath.stem  # Capture actual stem before rename
                mdx_filepath.rename(new_filepath)
                stats["status"] = "processed"
                # Store stems for content replacement phase
                stats["old_filename_stem_for_replace"] = original_stem_before_rename
                stats["new_filename_stem_for_replace"] = new_filepath.stem
            except Exception as rename_error:
                stats["status"] = "error"
                stats["error_message"] = f"Failed to rename file to '{new_filename}': {rename_error}"
                # Defer printing to main loop for consistency
                # NOTE(review): this early return also skips the reporting block
                # below, and the caller only counts errors without printing the
                # message, so rename failures appear silent -- confirm intended.
                return stats
        stats["warnings"] = file_warnings
        action_taken = new_filepath != mdx_filepath and stats["status"] == "processed"
        # Only print details if there are warnings or an actual change/error for this file
        if file_warnings or action_taken or stats["status"].startswith("error") or stats["status"] == "skipped_target_exists":
            print(
                f"\nProcessing: {display_path} -> {new_filename if action_taken else '(no change or skipped)'}")
            for warning_msg in file_warnings:
                print(f" [Warning] {warning_msg}")
            if stats["status"] == "skipped_target_exists":
                print(
                    f" [Skipping] Target filename '{new_filename}' already exists in this directory.")
            if stats["error_message"]:
                print(f" [Error] {stats['error_message']}")
    except FileNotFoundError:
        # File vanished between globbing and reading.
        stats["status"] = "error"
        stats["error_message"] = f"File not found during processing: {mdx_filepath}"
        print(f"\nProcessing: {display_path}")
        print(f" [Error] {stats['error_message']}")
    except Exception as e:
        # Catch-all boundary: record, report with traceback, keep the batch going.
        stats["status"] = "error"
        stats["error_message"] = f"Unexpected error: {e}"
        print(f"\nProcessing: {display_path}")
        print(f" [Error] Unexpected error processing file: {e}")
        import traceback
        traceback.print_exc()
    return stats
def run_processing_for_language(
    lang_dir_path: Path,
    config: Config
) -> dict:
    """Processes all MDX files in the lang_dir_path by renaming them in place,
    then updates internal content references.

    Phase 1 renames every .mdx file to its PWXY-prefixed name.
    Phase 2 rewrites occurrences of each old filename stem inside all .mdx
    files so internal references keep working.
    Returns a per-language stats dict consumed by main()'s summary.
    """
    print(f"Starting in-place processing for: {lang_dir_path.name}")
    lang_stats = {
        "processed_count": 0,
        "skipped_no_change_count": 0,
        "skipped_target_exists_count": 0,
        "error_count": 0,
        "warning_files_count": 0,
        "status": "OK",
        "dir_path_str": str(lang_dir_path.relative_to(config.BASE_DIR)),
        "content_replacements_made_count": 0,
        "content_replacement_errors_count": 0,
    }
    if not lang_dir_path.exists() or not lang_dir_path.is_dir():
        print(
            f"[Error] Language directory '{lang_dir_path.name}' does not exist or is not a directory.")
        lang_stats["status"] = "LANG_DIR_ERROR"
        return lang_stats
    # --- Phase 1: Rename files ---
    print(f"\n--- Phase 1: Renaming files in '{lang_dir_path.name}' ---")
    # Sorted for a deterministic processing order across runs.
    mdx_files = sorted(list(lang_dir_path.rglob("*.mdx")))
    total_files = len(mdx_files)
    print(f"Found {total_files} MDX files to process for renaming.")
    rename_mappings = []  # List to store (old_stem, new_stem) for content replacement
    for i, mdx_filepath in enumerate(mdx_files):
        result = process_single_mdx_file(mdx_filepath, config)
        if result["status"] == "processed":
            lang_stats["processed_count"] += 1
            # Check if stems were provided and different (meaning a rename happened)
            old_stem = result.get("old_filename_stem_for_replace")
            new_stem = result.get("new_filename_stem_for_replace")
            if old_stem and new_stem and old_stem != new_stem:
                rename_mappings.append((old_stem, new_stem))
        elif result["status"] == "skipped_no_change":
            lang_stats["skipped_no_change_count"] += 1
        elif result["status"] == "skipped_target_exists":
            lang_stats["skipped_target_exists_count"] += 1
        elif result["status"] == "error":
            lang_stats["error_count"] += 1
        if result["warnings"]:
            lang_stats["warning_files_count"] += 1
        if total_files > 0:
            progress = (i + 1) / total_files * 100
            # end="\r" keeps the progress line updating in place.
            print(
                f"Rename Progress ({lang_dir_path.name}): {i+1}/{total_files} files ({progress:.1f}%) evaluated.", end="\r")
    if total_files > 0:
        print()  # Newline after progress bar
    print("--- Phase 1: Renaming files complete. ---")
    # --- Phase 2: Update content references ---
    if rename_mappings:
        print(f"\n--- Phase 2: Updating content references in '{lang_dir_path.name}' ---")
        print(f"Found {len(rename_mappings)} filename changes to propagate.")
        # Re-glob for files, as their names might have changed.
        # Also, we need to process all files, not just the renamed ones.
        all_mdx_files_after_rename = sorted(list(lang_dir_path.rglob("*.mdx")))
        total_files_for_replacement = len(all_mdx_files_after_rename)
        print(f"Scanning {total_files_for_replacement} .mdx files for content updates.")
        files_content_updated = 0
        for i, file_to_scan_path in enumerate(all_mdx_files_after_rename):
            try:
                original_content = file_to_scan_path.read_text(encoding="utf-8")
                modified_content = original_content
                file_actually_changed_by_replacement = False
                # NOTE(review): plain substring replace -- an old stem that is a
                # substring of another stem could over-match; presumably the
                # PWXY prefixes make stems distinctive enough. Confirm.
                for old_stem, new_stem in rename_mappings:
                    if old_stem in modified_content:  # Check if old_stem exists before replacing
                        temp_content = modified_content.replace(old_stem, new_stem)
                        if temp_content != modified_content:
                            modified_content = temp_content
                            file_actually_changed_by_replacement = True
                # Only rewrite the file when a replacement actually happened.
                if file_actually_changed_by_replacement:
                    file_to_scan_path.write_text(modified_content, encoding="utf-8")
                    files_content_updated += 1
                    print(f" Updated references in: {file_to_scan_path.relative_to(lang_dir_path)}")
            except Exception as e:
                print(f" [Error] Failed to update references in {file_to_scan_path.name}: {e}")
                lang_stats["content_replacement_errors_count"] += 1
            if total_files_for_replacement > 0:
                progress = (i + 1) / total_files_for_replacement * 100
                print(
                    f"Content Update Progress ({lang_dir_path.name}): {i+1}/{total_files_for_replacement} files ({progress:.1f}%) scanned.", end="\r")
        if total_files_for_replacement > 0:
            print()  # Newline after progress bar
        lang_stats["content_replacements_made_count"] = files_content_updated
        print(f"Content replacement phase: {files_content_updated} files had their content updated.")
        print("--- Phase 2: Content references update complete. ---")
    else:
        print("\nNo renames occurred, skipping content reference update phase.")
    # --- Per-language summary ---
    print("-" * 20)
    print(f"Language Processing Summary ({lang_dir_path.name}):")
    print(f" Successfully processed (renamed): {lang_stats['processed_count']}")
    print(f" Checked (filename no change): {lang_stats['skipped_no_change_count']}")
    print(f" Skipped (target filename exists): {lang_stats['skipped_target_exists_count']}")
    print(f" Files with warnings: {lang_stats['warning_files_count']}")
    print(f" Errors during file processing: {lang_stats['error_count']}")
    if rename_mappings:  # Only show if phase 2 ran
        print(f" Files with content updated (references): {lang_stats['content_replacements_made_count']}")
        print(f" Errors during content update: {lang_stats['content_replacement_errors_count']}")
    print("-" * 20)
    if lang_stats["error_count"] > 0 or lang_stats["content_replacement_errors_count"] > 0:
        lang_stats["status"] = "ERRORS_IN_PROCESSING"
    return lang_stats
# --- Main Orchestration ---
def main():
    """Entry point: for each configured language, get/create its directory,
    run the rename + reference-update pipeline, then print an overall summary."""
    config = Config()
    print(f"Base directory: {config.BASE_DIR}")
    print(f"Timestamp for this run: {config.TIMESTAMP}")
    overall_summary = {}  # lang -> stats dict from run_processing_for_language
    lang_dir_newly_created_flags = {}  # lang -> bool (dir created this run)
    lang_dirs_map = {}  # lang -> Path | None (None once removed)
    for lang in config.LANGUAGES:
        print(f"\n{'='*10} Processing Language: {lang.upper()} {'='*10}")
        current_lang_dir, was_newly_created = get_or_create_lang_dir(
            lang, config)
        lang_dir_newly_created_flags[lang] = was_newly_created
        lang_dirs_map[lang] = current_lang_dir
        if not current_lang_dir:
            overall_summary[lang] = {
                "status": "SETUP_ERROR", "message": f"Failed to get or create language directory for {lang}."}
            continue
        lang_results = run_processing_for_language(current_lang_dir, config)
        overall_summary[lang] = lang_results
        if current_lang_dir:
            if lang_results["status"] in ["OK", "ERRORS_IN_PROCESSING"]:
                # Cleanup: a directory we created that ended up empty is removed.
                if was_newly_created and current_lang_dir.exists() and not any(current_lang_dir.iterdir()):
                    try:
                        current_lang_dir.rmdir()
                        print(
                            f" Removed empty newly created language directory: {current_lang_dir.name}")
                        lang_dirs_map[lang] = None
                        lang_results["message"] = lang_results.get(
                            "message", "") + " Empty newly created directory removed."
                    except OSError as e:
                        print(
                            f" Note: Could not remove empty newly created directory '{current_lang_dir.name}': {e}")
    # --- Final cross-language report ---
    print("\n\n" + "=" * 20 + " Overall Script Summary " + "=" * 20)
    for lang_code in config.LANGUAGES:
        summary = overall_summary.get(lang_code, {})
        lang_dir_path_obj = lang_dirs_map.get(lang_code)
        print(f"\nLanguage: {lang_code.upper()}")
        status = summary.get("status", "UNKNOWN")
        print(f" Status: {status}")
        if "message" in summary:
            print(f" Message: {summary['message']}")
        # Detailed counters only make sense when the language dir was usable.
        if status not in ["SETUP_ERROR", "SETUP_ERROR_POST_ARCHIVE", "PRE_ARCHIVE_ERROR", "LANG_DIR_ERROR"]:
            print(f" Directory: {summary.get('dir_path_str', 'N/A')}")
            print(
                f" Processed (renamed): {summary.get('processed_count', 0)}")
            print(
                f" Checked (no name change): {summary.get('skipped_no_change_count', 0)}")
            print(
                f" Skipped (target exists): {summary.get('skipped_target_exists_count', 0)}")
            print(
                f" Files with Warnings: {summary.get('warning_files_count', 0)}")
            print(
                f" Errors during file processing: {summary.get('error_count', 0)}")
            if summary.get('processed_count', 0) > 0 or "content_replacements_made_count" in summary:  # Show only if relevant
                print(f" Files with content updated (references): {summary.get('content_replacements_made_count',0)}")
                print(f" Errors during content update: {summary.get('content_replacement_errors_count',0)}")
        if lang_dir_path_obj and lang_dir_path_obj.exists():
            print(f" Final directory location: {lang_dir_path_obj.name}")
        elif lang_dir_newly_created_flags.get(lang_code) and not lang_dir_path_obj:
            print(" Note: Empty newly created directory was removed as expected.")
        elif not lang_dir_path_obj and status != "SETUP_ERROR":
            print(
                f" Note: Language directory '{config.LANG_DIR_TEMPLATE.format(lang=lang_code)}' may have been archived or removed.")
    print("=" * (40 + len(" Overall Script Summary ")))
    print("\nScript finished. Please review changes and commit to Git if satisfied.")
if __name__ == "__main__":
    main()

View File

@@ -1,777 +0,0 @@
import json
import os
import re
from collections import defaultdict
# --- Configuration ---
refresh = True  # If True, the target tab's groups are cleared for each processed version.
DOCS_JSON_PATH = "docs.json"  # NOTE(review): not referenced in this visible chunk -- presumably loaded/saved by the caller; confirm.
# --- Simplified Chinese configuration (docs_config) ---
PLUGIN_DEV_ZH = {
    "DOCS_DIR": "plugin_dev_zh",  # Plugin development documentation directory
    # Note: despite the name LANGUAGE_CODE, this is deployed as the 'version' value in docs.json.
    "LANGUAGE_CODE": "简体中文",
    "FILE_EXTENSION": ".zh.mdx",
    "TARGET_TAB_NAME": "插件开发",  # Target tab name
    "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.zh\.mdx$"),  # Filename matching pattern
    "PWX_TO_GROUP_MAP": {
        # --- PWX -> group-name mapping (unified under the "插件开发" tab) ---
        # (P, W, X) -> (tab_name, group_name, nested_group_name)
        # Group: Concepts & Getting Started
        ("0", "1", "1"): ("插件开发", "概念与入门", "概览"),
        ("0", "1", "3"): ("插件开发", "概念与入门", None),
        # Group: Development Practices
        ("0", "2", "1"): ("插件开发", "开发实践", "快速开始"),
        ("0", "2", "2"): ("插件开发", "开发实践", "开发 Dify 插件"),
        # Group: Contribution & Publishing
        ("0", "3", "1"): ("插件开发", "贡献与发布", "行为准则与规范"),
        ("0", "3", "2"): ("插件开发", "贡献与发布", "发布与上架"),
        ("0", "3", "3"): ("插件开发", "贡献与发布", "常见问题解答"),
        # Group: Examples & Use Cases
        ("0", "4", "3"): ("插件开发", "实践案例与示例", "开发示例"),
        # Group: Advanced Development
        ("9", "2", "2"): ("插件开发", "高级开发", "Extension 与 Agent"),
        ("9", "2", "3"): ("插件开发", "高级开发", "Extension 与 Agent"),
        ("9", "4", "3"): ("插件开发", "高级开发", "Extension 与 Agent"),
        ("9", "2", "4"): ("插件开发", "高级开发", "反向调用"),
        # Group: Reference & Specifications
        ("0", "4", "1"): ("插件开发", "Reference & Specifications", "核心规范与功能"),
    },
    "DESIRED_GROUP_ORDER": [
        "概念与入门",
        "开发实践",
        "贡献与发布",
        "实践案例与示例",
        "高级开发",
        "Reference & Specifications",  # Ensure this one stays last
    ],
}
# --- English Configuration ---
PLUGIN_DEV_EN = {
    "DOCS_DIR": "plugin_dev_en",  # Plugin development documentation directory
    # Note: Although the variable name is LANGUAGE_CODE, it will be deployed as the 'version' value in docs.json.
    "LANGUAGE_CODE": "English",
    "FILE_EXTENSION": ".en.mdx",
    "TARGET_TAB_NAME": "Plugin Development",
    "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.en\.mdx$"),  # Filename matching pattern
    "PWX_TO_GROUP_MAP": {
        # --- PWX to Group Name Mapping (Unified under the "Plugin Development" Tab) ---
        # (P, W, X) -> (tab_name, group_name, nested_group_name)
        # Group: Concepts & Getting Started
        ("0", "1", "1"): (
            "Plugin Development",
            "Concepts & Getting Started",
            "Overview",
        ),
        ("0", "1", "3"): ("Plugin Development", "Concepts & Getting Started", None),
        # Group: Development Practices
        ("0", "2", "1"): ("Plugin Development", "Development Practices", "Quick Start"),
        ("0", "2", "2"): (
            "Plugin Development",
            "Development Practices",
            "Developing Dify Plugins",
        ),
        # Group: Contribution & Publishing
        ("0", "3", "1"): (
            "Plugin Development",
            "Contribution & Publishing",
            "Code of Conduct & Standards",
        ),
        ("0", "3", "2"): (
            "Plugin Development",
            "Contribution & Publishing",
            "Publishing & Listing",
        ),
        ("0", "3", "3"): ("Plugin Development", "Contribution & Publishing", "FAQ"),
        # Group: Examples & Use Cases
        ("0", "4", "3"): (
            "Plugin Development",
            "Examples & Use Cases",
            "Development Examples",
        ),
        # Group: Advanced Development
        ("9", "2", "2"): (
            "Plugin Development",
            "Advanced Development",
            "Extension & Agent",
        ),
        ("9", "2", "3"): (
            "Plugin Development",
            "Advanced Development",
            "Extension & Agent",
        ),
        ("9", "4", "3"): (
            "Plugin Development",
            "Advanced Development",
            "Extension & Agent",
        ),
        ("9", "2", "4"): (
            "Plugin Development",
            "Advanced Development",
            "Reverse Calling",
        ),
        # Group: Reference & Specifications
        ("0", "4", "1"): (
            "Plugin Development",
            "Reference & Specifications",
            "Core Specifications & Features",
        ),
    },
    "DESIRED_GROUP_ORDER": [
        "Concepts & Getting Started",
        "Development Practices",
        "Contribution & Publishing",
        "Examples & Use Cases",
        "Advanced Development",
        "Reference & Specifications",  # Ensure this is last
    ],
}
# --- Japanese Configuration ---
PLUGIN_DEV_JA = {
    "DOCS_DIR": "plugin_dev_ja",  # Plugin development documentation directory
    # Note: despite the name LANGUAGE_CODE, this is deployed as the 'version' value in docs.json.
    "LANGUAGE_CODE": "日本語",
    "FILE_EXTENSION": ".ja.mdx",
    "TARGET_TAB_NAME": "プラグイン開発",  # Target tab name
    "FILENAME_PATTERN": re.compile(
        r"^(\d{4})-(.*?)\.ja\.mdx$"
    ),  # Filename matching pattern
    "PWX_TO_GROUP_MAP": {
        # --- PWX -> group-name mapping (unified under the "プラグイン開発" tab) ---
        # (P, W, X) -> (tab_name, group_name, nested_group_name)
        # Group: Concepts & Overview
        ("0", "1", "1"): ("プラグイン開発", "概念と概要", "概要"),
        ("0", "1", "3"): ("プラグイン開発", "概念と概要", None),
        # Group: Development Practices
        ("0", "2", "1"): ("プラグイン開発", "開発実践", "クイックスタート"),
        ("0", "2", "2"): ("プラグイン開発", "開発実践", "Difyプラグインの開発"),
        # Group: Contribution & Publishing
        ("0", "3", "1"): ("プラグイン開発", "貢献と公開", "行動規範と基準"),
        ("0", "3", "2"): ("プラグイン開発", "貢献と公開", "公開と掲載"),
        ("0", "3", "3"): ("プラグイン開発", "貢献と公開", "よくある質問 (FAQ)"),
        # Group: Examples & Use Cases
        ("0", "4", "3"): ("プラグイン開発", "実践例とユースケース", "開発例"),
        # Group: Advanced Development
        ("9", "2", "2"): ("プラグイン開発", "高度な開発", "Extension と Agent"),
        ("9", "2", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"),
        ("9", "4", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"),
        ("9", "2", "4"): (
            "プラグイン開発",
            "高度な開発",
            "リバースコール",
        ),  # Reverse Calling
        # Group: Reference & Specifications
        ("0", "4", "1"): ("プラグイン開発", "リファレンスと仕様", "コア仕様と機能"),
    },
    "DESIRED_GROUP_ORDER": [
        "概念と概要",
        "開発実践",
        "貢献と公開",
        "実践例とユースケース",
        "高度な開発",
        "リファレンスと仕様",  # Ensure this one stays last
    ],
}
# --- 辅助函数 ---
def clear_tabs_if_refresh(navigation_data, version_code, target_tab_name, do_refresh):
    """If do_refresh is True, locate the given version and target tab and empty
    that tab's groups list. Returns True only when a tab was found and cleared."""
    if not do_refresh:
        return False  # Refresh not requested; nothing was cleared.
    if not navigation_data or "versions" not in navigation_data:
        print("警告: 'navigation.versions' 未找到,无法清空 tabs。")
        return False
    version_found = False
    tab_found_and_cleared = False
    for version_nav in navigation_data.get("versions", []):
        if version_nav.get("version") == version_code:
            version_found = True
            target_tab = None
            # Locate the target tab inside this version (tabs may be absent or malformed).
            if "tabs" in version_nav and isinstance(version_nav["tabs"], list):
                for tab in version_nav["tabs"]:
                    if isinstance(tab, dict) and tab.get("tab") == target_tab_name:
                        target_tab = tab
                        break
            if target_tab:
                if "groups" in target_tab:
                    target_tab["groups"] = []
                    print(
                        f"信息: 已清空版本 '{version_code}' 下 Tab '{target_tab_name}' 的 groups (因为 refresh=True)。"
                    )
                    tab_found_and_cleared = True
                else:
                    # A missing 'groups' key also counts as "cleared"; normalize to an empty list.
                    target_tab["groups"] = []
                    print(
                        f"信息: 版本 '{version_code}' 下 Tab '{target_tab_name}' 没有 'groups' 键,已确保其为空列表 (因为 refresh=True)。"
                    )
                    tab_found_and_cleared = True
            else:
                print(
                    f"警告: 在版本 '{version_code}' 中未找到目标 Tab '{target_tab_name}',无法清空其 groups。"
                )
            break  # The requested version was found; stop searching.
    if not version_found:
        print(f"警告: 未找到版本 '{version_code}',无法清空任何 Tab。")
        return False
    return tab_found_and_cleared
def get_page_path(
    filename, docs_config
):  # docs_config kept for interface stability; FILE_EXTENSION is no longer used here
    """Derive the Mintlify page path for an .mdx filename.

    Only the trailing ".mdx" is stripped (NOT docs_config["FILE_EXTENSION"]),
    so language markers such as ".zh" / ".en" remain part of the path.

    Raises:
        ValueError: if filename does not end with ".mdx".
    """
    docs_dir = docs_config["DOCS_DIR"]
    if filename.endswith(".mdx"):
        base_filename = filename[: -len(".mdx")]
    else:
        # Bug fix: the f-string previously contained no placeholder, so the
        # offending filename was never included in the error message.
        raise ValueError(f"错误: 文件名 '{filename}' 不以 '.mdx' 结尾,无法处理。")
    return os.path.join(docs_dir, base_filename)
def extract_existing_pages(navigation_data, version_code, target_tab_name):
    """Recursively collect every page path under the target tab of a version.

    Returns a 3-tuple (existing_pages, target_version_nav, target_tab_nav);
    the latter two are None when the version / tab cannot be found, which
    lets the caller create them.
    """
    existing_pages = set()
    target_version_nav = None
    target_tab_nav = None  # Holds the located target tab object, if any.
    if not navigation_data or "versions" not in navigation_data:
        print("警告: 'navigation.versions' 未找到")
        return existing_pages, None, None  # Always return a 3-tuple.
    # Find the target version.
    for version_nav in navigation_data.get("versions", []):
        if version_nav.get("version") == version_code:
            target_version_nav = version_nav
            break
    if not target_version_nav:
        print(f"警告: 版本 '{version_code}' 在 docs.json 中未找到")
        return existing_pages, None, None  # Always return a 3-tuple.
    # Find the target tab inside the version.
    if "tabs" in target_version_nav and isinstance(target_version_nav["tabs"], list):
        for tab in target_version_nav["tabs"]:
            if isinstance(tab, dict) and tab.get("tab") == target_tab_name:
                target_tab_nav = tab  # Remember the located tab object.
                # Only extract pages belonging to the target tab.
                for group in tab.get("groups", []):
                    if isinstance(group, dict):
                        _recursive_extract(group, existing_pages)
                break  # Stop once the target tab is found.
    else:  # 'tabs' might not exist or not be a list
        target_version_nav["tabs"] = []
    if not target_tab_nav:
        print(
            f"警告: 在版本 '{version_code}' 中未找到 Tab '{target_tab_name}',无法提取现有页面。"
        )
        # Return the version object even without the tab, so the caller can create it.
        return existing_pages, target_version_nav, None
    # Pages found, plus the version and tab objects for later in-place edits.
    return existing_pages, target_version_nav, target_tab_nav
def _recursive_extract(group_item, pages_set):
    """Recursive helper: collect page-path strings from a group dict into pages_set."""
    if not isinstance(group_item, dict):
        return  # Malformed entry; nothing to collect.
    pages = group_item.get("pages")
    if not isinstance(pages, list):
        return
    for entry in pages:
        if isinstance(entry, str):
            pages_set.add(entry)
        elif isinstance(entry, dict) and "group" in entry:
            # Descend into a nested group.
            _recursive_extract(entry, pages_set)
def remove_obsolete_pages(target_tab_data, pages_to_remove):
    """Recursively drop obsolete page paths from a tab's groups structure.

    Note: mutates target_tab_data in place. Groups that end up empty are
    kept (their structure is preserved, with an informational log line).
    """
    if not isinstance(target_tab_data, dict) or "groups" not in target_tab_data:
        return  # Not the expected tab shape; nothing to do.
    groups = target_tab_data.get("groups", [])
    if not isinstance(groups, list):
        return  # A non-list groups value cannot be processed.
    for group_item in groups:
        # Non-dict entries are unexpected; leave them untouched.
        if not isinstance(group_item, dict):
            continue
        # Recursively clean the pages inside this group.
        _remove_obsolete_from_group(group_item, pages_to_remove)
        # Current policy: keep the (now empty) group structure.
        if not group_item.get("pages"):
            print(
                f"信息: Group '{group_item.get('group')}' 清理后为空,已保留结构。"
            )
def _remove_obsolete_from_group(group_dict, pages_to_remove):
    """Helper: recursively filter obsolete page paths out of one group's pages.

    Mutates group_dict["pages"] in place. Nested group dicts are recursed
    into and always kept, even when the cleanup empties them.
    """
    if not isinstance(group_dict, dict) or "pages" not in group_dict:
        return
    pages = group_dict.get("pages", [])
    if not isinstance(pages, list):
        return
    new_pages = []
    for page_item in pages:
        if isinstance(page_item, str):
            if page_item not in pages_to_remove:
                new_pages.append(page_item)
            else:
                # Log each removed page with its parent group for traceability.
                print(f" - {page_item} (从 Group '{group_dict.get('group')}' 移除)")
        elif isinstance(page_item, dict) and "group" in page_item:
            # Recurse into the nested group, then keep its structure even if
            # it was emptied. (Fix: the previous version guarded this append
            # with `if page_item or page_item.get("pages")` — a dict holding
            # a "group" key is always truthy, so the else branch was dead
            # code, and both branches appended the item anyway.)
            _remove_obsolete_from_group(page_item, pages_to_remove)
            new_pages.append(page_item)
        else:
            # Preserve unrecognized entries as-is.
            new_pages.append(page_item)
    group_dict["pages"] = new_pages
def find_or_create_target_group(
    target_version_nav, tab_name, group_name, nested_group_name
):
    """Return the mutable pages list for (tab, group[, nested group]),
    creating any missing levels inside target_version_nav along the way.

    target_version_nav is a single version object such as
    {"version": "...", "tabs": [...]}; it is mutated in place.
    """
    # Normalize the tabs container, then find or create the tab.
    if not isinstance(target_version_nav.get("tabs"), list):
        target_version_nav["tabs"] = []
    tabs = target_version_nav["tabs"]
    target_tab = next(
        (t for t in tabs if isinstance(t, dict) and t.get("tab") == tab_name),
        None,
    )
    if target_tab is None:
        target_tab = {"tab": tab_name, "groups": []}
        tabs.append(target_tab)

    # Normalize the groups container, then find or create the group.
    if not isinstance(target_tab.get("groups"), list):
        target_tab["groups"] = []
    groups = target_tab["groups"]
    target_group = next(
        (g for g in groups if isinstance(g, dict) and g.get("group") == group_name),
        None,
    )
    if target_group is None:
        target_group = {"group": group_name, "pages": []}
        groups.append(target_group)
    if not isinstance(target_group.get("pages"), list):
        target_group["pages"] = []

    # Default container is the top-level group's pages list.
    target_pages_container = target_group["pages"]
    if nested_group_name:
        # Nested groups live as dicts inside the parent group's pages list.
        target_nested_group = next(
            (
                item
                for item in target_group["pages"]
                if isinstance(item, dict) and item.get("group") == nested_group_name
            ),
            None,
        )
        if target_nested_group is None:
            target_nested_group = {"group": nested_group_name, "pages": []}
            target_group["pages"].append(target_nested_group)
        # Ensure the nested pages list exists and really is a list.
        target_pages_container = target_nested_group.setdefault("pages", [])
        if not isinstance(target_pages_container, list):
            target_nested_group["pages"] = []
            target_pages_container = target_nested_group["pages"]

    if not isinstance(target_pages_container, list):
        # Internal invariant violated -- fail loudly rather than corrupt docs.json.
        raise RuntimeError(
            f"内部错误: 无法为 Tab='{tab_name}', Group='{group_name}', Nested='{nested_group_name}' 获取有效的 pages 列表。"
        )
    return target_pages_container
# --- Main logic ---
def get_group_sort_key(group_dict, docs_config):
    """Sort key for a group: its index in DESIRED_GROUP_ORDER; unknown groups sort last."""
    name = group_dict.get("group", "")
    order = docs_config["DESIRED_GROUP_ORDER"]
    # Membership test instead of try/except around list.index().
    return order.index(name) if name in order else len(order)
def main(
    docs_config, navigation_data
):  # navigation_data: the in-memory navigation dict, modified in place
    """Process one documentation configuration, mutating ``navigation_data``.

    Scans the configured docs directory, diffs it against the pages already
    recorded under the target tab, removes obsolete pages, adds new ones into
    their mapped groups, and finally sorts the tab's groups. Nothing is
    returned; all changes happen in place so the caller can save once at the
    end.
    """
    print(
        f"\n--- 开始处理版本: {docs_config['LANGUAGE_CODE']} / Tab: {docs_config['TARGET_TAB_NAME']} ---"
    )
    # Pull the configuration values out of docs_config
    language_code = docs_config["LANGUAGE_CODE"]
    docs_dir = docs_config["DOCS_DIR"]
    file_extension = docs_config["FILE_EXTENSION"]
    pwx_to_group_map = docs_config["PWX_TO_GROUP_MAP"]
    filename_pattern = docs_config["FILENAME_PATTERN"]  # pattern from the config
    target_tab_name = docs_config["TARGET_TAB_NAME"]  # tab name from the config
    # 1. Prepare the version navigation (no JSON load here; operate directly
    #    on the navigation_data passed in by the caller)
    navigation = navigation_data  # work on the caller's navigation object
    # Clear the target tab, keyed by language_code and target_tab_name
    was_refreshed = clear_tabs_if_refresh(
        navigation, language_code, target_tab_name, refresh
    )
    if was_refreshed:
        print(f"继续执行 Tab '{target_tab_name}' 的后续页面提取和添加操作...")
    # 2. Extract existing pages of the target tab, or create the version/tab navigation
    existing_pages, target_version_nav, target_tab_nav = extract_existing_pages(
        navigation, language_code, target_tab_name
    )
    if target_version_nav is None:
        print(f"信息:在导航数据中未找到版本 '{language_code}',将创建。")
        if "versions" not in navigation:  # make sure the versions list exists
            navigation["versions"] = []
        target_version_nav = {"version": language_code, "tabs": []}
        navigation["versions"].append(target_version_nav)
        existing_pages = set()
        target_tab_nav = None  # the version is brand new, so the tab cannot exist yet
    # If the target tab does not exist, it has to be created
    if target_tab_nav is None:
        print(
            f"信息: 在版本 '{language_code}' 中未找到 Tab '{target_tab_name}',将创建。"
        )
        target_tab_nav = {"tab": target_tab_name, "groups": []}
        # Make sure target_version_nav['tabs'] is a list
        if "tabs" not in target_version_nav or not isinstance(
            target_version_nav["tabs"], list
        ):
            target_version_nav["tabs"] = []
        target_version_nav["tabs"].append(target_tab_nav)
        existing_pages = set()  # a freshly created tab has no existing pages
    print(
        f"找到 {len(existing_pages)} 个已存在的页面 (版本: '{language_code}', Tab: '{target_tab_name}')。"
    )
    # 3. Scan the filesystem (unchanged: every matching file in the directory)
    filesystem_pages = set()
    valid_files = []
    if not os.path.isdir(docs_dir):
        # Without the directory this configuration cannot be processed; raise
        raise FileNotFoundError(
            f"错误: 配置 '{language_code}' 的文档目录 '{docs_dir}' 不存在。"
        )
    else:
        for filename in os.listdir(docs_dir):
            # Use the filename_pattern supplied by the configuration
            if filename.endswith(file_extension) and filename_pattern.match(filename):
                try:  # guard against the ValueError that get_page_path may raise
                    page_path = get_page_path(filename, docs_config)
                    filesystem_pages.add(page_path)
                    valid_files.append(filename)
                except ValueError as e:
                    # Error from get_page_path: report and keep going, or re-raise to stop
                    print(f"错误处理文件 '(unknown)': {e}。将跳过此文件。")
                    # Uncomment the next line to abort the whole run instead:
                    # raise e
    print(f"在 '{docs_dir}' 找到 {len(filesystem_pages)} 个有效的文档文件。")
    # 4. Compute the diffs (against the target tab's existing_pages)
    new_files_paths = filesystem_pages - existing_pages
    removed_files_paths = existing_pages - filesystem_pages
    print(f"新增文件数 (相对于 Tab '{target_tab_name}'): {len(new_files_paths)}")
    print(f"移除文件数 (相对于 Tab '{target_tab_name}'): {len(removed_files_paths)}")
    # 5. Remove obsolete pages (only from the target tab)
    if removed_files_paths and target_tab_nav:  # the target tab must exist
        print(f"正在从 Tab '{target_tab_name}' 移除失效页面...")
        remove_obsolete_pages(
            target_tab_nav, removed_files_paths
        )  # pass the target tab object directly
        print(f"已处理从 Tab '{target_tab_name}' 移除: {removed_files_paths}")
    elif removed_files_paths:
        print(
            f"警告: 存在失效页面 {removed_files_paths},但未找到目标 Tab '{target_tab_name}' 进行移除。"
        )
    # 6. Add new pages (unchanged logic; find_or_create_target_group guarantees
    #    they land in the correct tab and group)
    if new_files_paths:
        print(f"正在向 Tab '{target_tab_name}' 添加新页面...")
        new_files_sorted = sorted(
            [f for f in valid_files if get_page_path(f, docs_config) in new_files_paths]
        )
        groups_to_add = defaultdict(list)
        for filename in new_files_sorted:
            match = filename_pattern.match(filename)  # pattern from the config
            if match:
                pwxy = match.group(1)
                if len(pwxy) >= 3:
                    p, w, x = pwxy[0], pwxy[1], pwxy[2]
                    try:  # wrap the get_page_path call
                        page_path = get_page_path(filename, docs_config)
                    except ValueError as e:
                        print(
                            f"错误处理文件 '(unknown)' (添加阶段): {e}。将跳过此文件。"
                        )
                        continue  # skip this file
                    group_key = (p, w, x)
                    if group_key in pwx_to_group_map:
                        map_result = pwx_to_group_map[group_key]
                        current_tab_name_from_map = map_result[0]
                        # Force the configured target tab name
                        if current_tab_name_from_map != target_tab_name:
                            print(
                                f"警告: 文件 '(unknown)' 根据 PWX 映射到 Tab '{current_tab_name_from_map}',但当前配置强制处理 Tab '{target_tab_name}'。将添加到 '{target_tab_name}'。"
                            )
                        # Always use the target_tab_name defined in the configuration
                        tab_name_to_use = target_tab_name
                        if len(map_result) == 3:
                            _, group_name, nested_group_name = map_result
                        else:  # backward compatibility for old / two-item map values
                            if len(map_result) >= 2:
                                _, group_name = map_result[:2]  # take the first two items
                            else:
                                # Handle a map_result with too few items
                                print(
                                    f"错误: PWX_TO_GROUP_MAP 中键 '{group_key}' 的值 '{map_result}' 格式不正确,至少需要两项。跳过文件 '(unknown)'。"
                                )
                                continue
                            nested_group_name = None  # assume there is no nested group
                        groups_to_add[
                            (tab_name_to_use, group_name, nested_group_name)
                        ].append(page_path)
                    else:
                        print(
                            f"警告: 文件 '(unknown)' 的 PWX 前缀 ('{p}', '{w}', '{x}') 在 PWX_TO_GROUP_MAP 中没有找到映射,将跳过添加。"
                        )
                else:
                    # A numeric prefix shorter than 3 digits is a filename format error; raise
                    raise ValueError(
                        f"错误: 文件 '(unknown)' 的数字前缀 '{pwxy}' 不足3位无法解析 PWX。"
                    )
        for (
            tab_name,
            group_name,
            nested_group_name,
        ), pages_to_append in groups_to_add.items():
            # Only add under the target tab (this check is redundant now that
            # target_tab_name was forced above)
            # if tab_name == target_tab_name:
            print(
                f" 添加到 Tab='{tab_name}', Group='{group_name}', Nested='{nested_group_name or '[无]'}' : {len(pages_to_append)} 个页面"
            )
            # find_or_create_target_group needs target_version_nav to locate or create the tab
            target_pages_list = find_or_create_target_group(
                # tab_name should equal target_tab_name at this point
                target_version_nav,
                tab_name,
                group_name,
                nested_group_name,
            )
            if isinstance(target_pages_list, list):
                for new_page in pages_to_append:
                    if new_page not in target_pages_list:
                        target_pages_list.append(new_page)
                        print(f" + {new_page}")
            else:
                # find_or_create_target_group raises RuntimeError on internal
                # failure, so this branch should be unreachable; log just in case
                print(
                    f"错误: 未能为 Tab='{tab_name}', Group='{group_name}', Nested='{nested_group_name}' 获取有效的 pages 列表进行添加。"
                )
            # else:  # this branch can no longer be reached
            #     print(f"信息: 跳过向非目标 Tab '{tab_name}' 添加页面 (目标 Tab: '{target_tab_name}')。")
    # <-- Sort groups (only those inside the target tab) -->
    print(f"正在排序 Tab '{target_tab_name}' 内的 Group...")
    if target_tab_nav and "groups" in target_tab_nav:  # the target tab and its groups must exist
        groups_list = [g for g in target_tab_nav["groups"] if isinstance(g, dict)]
        groups_list.sort(key=lambda g: get_group_sort_key(g, docs_config))
        target_tab_nav["groups"] = groups_list
        print(f" 已对 Tab '{target_tab_name}' 中的 Group 进行排序。")
    elif target_tab_nav:
        print(f" Tab '{target_tab_name}' 中没有 'groups' 或为空,无需排序。")
    else:
        print(f" 未找到 Tab '{target_tab_name}',无法排序 Group。")
    # No return value: navigation_data was modified in place
    print(
        f"--- 完成处理版本: {docs_config['LANGUAGE_CODE']} / Tab: {docs_config['TARGET_TAB_NAME']} ---"
    )
def load_docs_data(path):
    """Load the docs JSON file.

    Returns:
        The parsed JSON object, or a fresh skeleton structure
        ``{"navigation": {"versions": []}}`` when the file does not exist yet.

    Raises:
        json.JSONDecodeError: When the file exists but is malformed. The
            original decode error is chained as ``__cause__`` so the full
            context is preserved in the traceback.
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        print(f"信息: {path} 未找到,将创建新的结构。")
        return {"navigation": {"versions": []}}  # initial skeleton structure
    except json.JSONDecodeError as e:
        # Re-raise with a clearer message; chain the original error so the
        # underlying parse position/traceback is not lost.
        raise json.JSONDecodeError(
            f"错误: {path} 格式错误。无法继续。- {e.msg}", e.doc, e.pos
        ) from e
def save_docs_data(path, data):
    """Serialize ``data`` to ``path`` as pretty-printed UTF-8 JSON.

    Args:
        path: Destination file path.
        data: JSON-serializable object (the full docs structure).

    Raises:
        IOError: When the file cannot be written.
        Exception: For any other failure during serialization.
        Both re-raises chain the original error as ``__cause__`` instead of
        discarding it (the previous code lost the original traceback).
    """
    try:
        with open(path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=4)
        print(f"\n成功更新 {path},包含所有已处理的版本。")
        # No boolean return value: failures are reported via exceptions.
    except IOError as e:
        # Chain so the underlying OS error stays visible to the caller.
        raise IOError(f"错误: 无法写入 {path} - {e}") from e
    except Exception as e:
        raise Exception(f"写入 {path} 时发生未知错误: {e}") from e
def _is_processable_config(config):
    """Validate one configuration dict; print a warning and return False if unusable."""
    required_keys = [
        "DOCS_DIR",
        "LANGUAGE_CODE",
        "FILE_EXTENSION",
        "PWX_TO_GROUP_MAP",
        "DESIRED_GROUP_ORDER",
        "TARGET_TAB_NAME",
        "FILENAME_PATTERN",
    ]
    missing_keys = [k for k in required_keys if k not in config]
    if missing_keys:
        print(
            f"警告: 配置 {config.get('LANGUAGE_CODE', '未知')} 不完整 (缺少: {', '.join(missing_keys)}),跳过处理。"
        )
        return False
    # All keys present; now check that the values are actually usable.
    reason = []
    if not config.get("PWX_TO_GROUP_MAP"):
        reason.append("PWX_TO_GROUP_MAP 为空或不存在")
    if not config.get("DESIRED_GROUP_ORDER"):
        reason.append("DESIRED_GROUP_ORDER 为空或不存在")
    if not isinstance(config.get("FILENAME_PATTERN"), re.Pattern):
        reason.append("FILENAME_PATTERN 不是有效的正则表达式对象")
    if reason:
        print(
            f"警告: 配置 {config.get('LANGUAGE_CODE', '未知')} 无效 ({'; '.join(reason)}),跳过处理。"
        )
        return False
    return True


def process_configurations(configs, docs_path):
    """Load the docs data, process every valid configuration, then save once.

    Args:
        configs: Iterable of per-language configuration dicts.
        docs_path: Path of the docs JSON file to load and (on success) rewrite.
    """
    # 1. Load the initial data
    try:
        current_docs_data = load_docs_data(docs_path)
    except json.JSONDecodeError as e:
        print(e)  # report the load error
        return  # abort on a malformed file
    # load_docs_data either returns data or raises, so current_docs_data is never None.
    # 2. Make sure the basic structure exists
    navigation_data = current_docs_data.setdefault("navigation", {})
    navigation_data.setdefault("versions", [])
    # 3. Filter out incomplete or invalid configurations (warnings are printed inline)
    valid_configs = [c for c in configs if _is_processable_config(c)]
    # 4. Process the valid configurations
    if not valid_configs:
        print("没有有效的配置可供处理。")
        return
    try:
        for config in valid_configs:
            # main() mutates navigation_data in place for each configuration.
            main(config, navigation_data)
        # 5. Write the file back once, after all configurations are done.
        save_docs_data(docs_path, current_docs_data)
    except Exception as e:
        # A single `except Exception` suffices: the previous tuple
        # (FileNotFoundError, ValueError, RuntimeError, IOError, Exception)
        # was redundant since every member is an Exception subclass.
        print(f"\n处理过程中发生错误: {e}")
        print("操作已终止,文件可能未完全更新。")
if __name__ == "__main__":
    # The language configurations to process, in order.
    CONFIGS_TO_PROCESS = [
        PLUGIN_DEV_ZH,
        PLUGIN_DEV_EN,
        PLUGIN_DEV_JA,
    ]
    # Run the full pipeline: load docs.json, apply every config, save once.
    process_configurations(CONFIGS_TO_PROCESS, DOCS_JSON_PATH)

698
tools/apply_docs_json.py Normal file
View File

@@ -0,0 +1,698 @@
import json
import os
import re
from collections import defaultdict
from pathlib import Path
import sys # Import sys for system-specific parameters and functions, e.g., sys.exit()
# --- Script Base Paths ---
SCRIPT_DIR = Path(__file__).resolve().parent  # directory containing this script (tools/)
BASE_DIR = SCRIPT_DIR.parent  # repository root, one level above tools/
# --- Configuration ---
refresh = False  # Flag to control whether to clear existing tabs before processing
DOCS_JSON_PATH = BASE_DIR / "docs.json"  # Path to the main documentation structure JSON file
# --- Language Configurations ---
# Each entry describes how one language's documentation files are discovered
# and where they are mounted inside the docs.json navigation tree.
# IMPORTANT: The string values for LANGUAGE_CODE, TARGET_TAB_NAME, and content
# within PWX_TO_GROUP_MAP and DESIRED_GROUP_ORDER are i18n-specific and MUST
# NOT be translated.
PLUGIN_DEV_ZH = {
    "DOCS_DIR_RELATIVE": "plugin_dev_zh",
    "LANGUAGE_CODE": "简体中文",
    "FILE_EXTENSION_SUFFIX": ".zh",
    "TARGET_TAB_NAME": "插件开发",
    "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.zh\.mdx$"),
    # Maps (P, W, X) filename prefixes to (tab, group, optional nested group).
    "PWX_TO_GROUP_MAP": {
        ("0", "1", "1"): ("插件开发", "概念与入门", "概览"),
        ("0", "1", "3"): ("插件开发", "概念与入门", None),
        ("0", "2", "1"): ("插件开发", "开发实践", "快速开始"),
        ("0", "2", "2"): ("插件开发", "开发实践", "开发 Dify 插件"),
        ("0", "3", "1"): ("插件开发", "贡献与发布", "行为准则与规范"),
        ("0", "3", "2"): ("插件开发", "贡献与发布", "发布与上架"),
        ("0", "3", "3"): ("插件开发", "贡献与发布", "常见问题解答"),
        ("0", "4", "3"): ("插件开发", "实践案例与示例", "开发示例"),
        ("9", "2", "2"): ("插件开发", "高级开发", "Extension 与 Agent"),
        ("9", "2", "3"): ("插件开发", "高级开发", "Extension 与 Agent"),
        ("9", "4", "3"): ("插件开发", "高级开发", "Extension 与 Agent"),
        ("9", "2", "4"): ("插件开发", "高级开发", "反向调用"),
        ("0", "4", "1"): ("插件开发", "Reference & Specifications", "核心规范与功能"),
    },
    "DESIRED_GROUP_ORDER": [
        "概念与入门",
        "开发实践",
        "贡献与发布",
        "实践案例与示例",
        "高级开发",
        "Reference & Specifications",
    ],
}
PLUGIN_DEV_EN = {
    "DOCS_DIR_RELATIVE": "plugin_dev_en",
    "LANGUAGE_CODE": "English",
    "FILE_EXTENSION_SUFFIX": ".en",
    "TARGET_TAB_NAME": "Plugin Development",
    "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.en\.mdx$"),
    "PWX_TO_GROUP_MAP": {
        ("0", "1", "1"): ("Plugin Development", "Concepts & Getting Started", "Overview"),
        ("0", "1", "3"): ("Plugin Development", "Concepts & Getting Started", None),
        ("0", "2", "1"): ("Plugin Development", "Development Practices", "Quick Start"),
        ("0", "2", "2"): ("Plugin Development", "Development Practices", "Developing Dify Plugins"),
        ("0", "3", "1"): ("Plugin Development", "Contribution & Publishing", "Code of Conduct & Standards"),
        ("0", "3", "2"): ("Plugin Development", "Contribution & Publishing", "Publishing & Listing"),
        ("0", "3", "3"): ("Plugin Development", "Contribution & Publishing", "FAQ"),
        ("0", "4", "3"): ("Plugin Development", "Examples & Use Cases", "Development Examples"),
        ("9", "2", "2"): ("Plugin Development", "Advanced Development", "Extension & Agent"),
        ("9", "2", "3"): ("Plugin Development", "Advanced Development", "Extension & Agent"),
        ("9", "4", "3"): ("Plugin Development", "Advanced Development", "Extension & Agent"),
        ("9", "2", "4"): ("Plugin Development", "Advanced Development", "Reverse Calling"),
        ("0", "4", "1"): ("Plugin Development", "Reference & Specifications", "Core Specifications & Features"),
    },
    "DESIRED_GROUP_ORDER": [
        "Concepts & Getting Started",
        "Development Practices",
        "Contribution & Publishing",
        "Examples & Use Cases",
        "Advanced Development",
        "Reference & Specifications",
    ],
}
PLUGIN_DEV_JA = {
    "DOCS_DIR_RELATIVE": "plugin_dev_ja",
    "LANGUAGE_CODE": "日本語",
    "FILE_EXTENSION_SUFFIX": ".ja",
    "TARGET_TAB_NAME": "プラグイン開発",
    "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.ja\.mdx$"),
    "PWX_TO_GROUP_MAP": {
        ("0", "1", "1"): ("プラグイン開発", "概念と概要", "概要"),
        ("0", "1", "3"): ("プラグイン開発", "概念と概要", None),
        ("0", "2", "1"): ("プラグイン開発", "開発実践", "クイックスタート"),
        ("0", "2", "2"): ("プラグイン開発", "開発実践", "Difyプラグインの開発"),
        ("0", "3", "1"): ("プラグイン開発", "貢献と公開", "行動規範と基準"),
        ("0", "3", "2"): ("プラグイン開発", "貢献と公開", "公開と掲載"),
        ("0", "3", "3"): ("プラグイン開発", "貢献と公開", "よくある質問 (FAQ)"),
        ("0", "4", "3"): ("プラグイン開発", "実践例とユースケース", "開発例"),
        ("9", "2", "2"): ("プラグイン開発", "高度な開発", "Extension と Agent"),
        ("9", "2", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"),
        ("9", "4", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"),
        ("9", "2", "4"): ("プラグイン開発", "高度な開発", "リバースコール"),
        ("0", "4", "1"): ("プラグイン開発", "リファレンスと仕様", "コア仕様と機能"),
    },
    "DESIRED_GROUP_ORDER": [
        "概念と概要",
        "開発実践",
        "貢献と公開",
        "実践例とユースケース",
        "高度な開発",
        "リファレンスと仕様",
    ],
}
# --- Helper Functions ---
# Defines log issue types considered critical enough to be included in the commit message summary.
CRITICAL_ISSUE_TYPES = {"Error", "Critical", "ConfigError", "SeriousWarning"}
def _log_issue(reports_list_for_commit_message: list, lang_code: str, issue_type: str, message: str, details: str = ""):
"""
Logs a detailed message to the console and adds a concise version to a list for commit messages
if the issue_type is critical.
Args:
reports_list_for_commit_message: List to accumulate messages for the commit summary.
lang_code: Language code or identifier for the context of the log (e.g., "简体中文", "GLOBAL").
issue_type: Type of the issue (e.g., "Info", "Warning", "Error", "Critical").
message: The main message of the log.
details: Optional additional details for the log.
"""
full_log_message = f"[{issue_type.upper()}] Lang '{lang_code}': {message}"
if details:
full_log_message += f" Details: {details}"
print(full_log_message) # Always print the detailed log message to console.
if issue_type in CRITICAL_ISSUE_TYPES:
# Prepare a more concise message for the commit summary.
commit_msg_part = f"- Lang '{lang_code}': [{issue_type}] {message}"
reports_list_for_commit_message.append(commit_msg_part)
# INFO and non-critical Warning logs are only printed to console, not added to the commit summary list.
# Note: The following helper functions call `_log_issue`. Their docstrings will describe their primary purpose.
# The `commit_message_reports_list` parameter passed to them is for `_log_issue`.
def clear_tabs_if_refresh(navigation_data: dict, version_code: str, target_tab_name: str, do_refresh: bool, commit_message_reports_list: list) -> bool:
    """Empty the groups of one tab in one version when a refresh is requested.

    Args:
        navigation_data: The main navigation data structure.
        version_code: Language code / version identifier (e.g., "简体中文").
        target_tab_name: Name of the tab whose groups should be cleared.
        do_refresh: When False this function is a no-op.
        commit_message_reports_list: Accumulator for critical issue messages.

    Returns:
        True if the tab was found and its groups were cleared, False otherwise.
    """
    if not do_refresh:
        return False
    if not navigation_data or "versions" not in navigation_data:
        _log_issue(commit_message_reports_list, version_code, "Warning",
                   "'navigation.versions' not found, cannot clear tabs.")
        return False
    for version_nav in navigation_data.get("versions", []):
        if version_nav.get("version") != version_code:
            continue
        # Version located; now look for the tab inside it.
        for tab in version_nav.get("tabs", []):
            if isinstance(tab, dict) and tab.get("tab") == target_tab_name:
                tab["groups"] = []
                _log_issue(commit_message_reports_list, version_code, "Info",
                           f"Cleared groups for Tab '{target_tab_name}'.")
                return True
        # Tab absent: informational only, since it may be created later.
        _log_issue(commit_message_reports_list, version_code, "Info",
                   f"Tab '{target_tab_name}' not found to clear groups (will be created if needed).")
        return False
    _log_issue(commit_message_reports_list, version_code, "Warning",
               f"Version '{version_code}' not found, cannot clear any Tab.")
    return False
def get_page_path_from_filename(filename: str, docs_dir_name: str) -> str:
    """Construct the docs.json page path from a filename and its directory name.

    Example: ("0001-intro.en.mdx", "plugin_dev_en") -> "plugin_dev_en/0001-intro.en"

    Args:
        filename: The .mdx filename (e.g., "0001-intro.en.mdx").
        docs_dir_name: The relative directory name for this set of docs
            (e.g., "plugin_dev_en").

    Returns:
        The page path string used in docs.json.

    Raises:
        ValueError: If the filename does not end with ".mdx". Callers are
            expected to have filtered such names out already, so hitting this
            indicates an internal logic error.
    """
    if not filename.endswith(".mdx"):
        # Bug fix: the previous message printed the literal text "(unknown)"
        # instead of the offending filename, making the error untraceable.
        raise ValueError(
            f"Internal Error: Filename '{filename}' received by "
            "get_page_path_from_filename does not end with '.mdx'."
        )
    # removesuffix strips exactly one trailing ".mdx" (Python 3.9+).
    return f"{docs_dir_name}/{filename.removesuffix('.mdx')}"
def extract_existing_pages(navigation_data: dict, version_code: str, target_tab_name: str, commit_message_reports_list: list):
    """Collect every page path currently recorded under one tab of one version.

    Args:
        navigation_data: The main navigation data structure.
        version_code: The language code or version identifier.
        target_tab_name: The name of the tab to extract pages from.
        commit_message_reports_list: Accepted for signature parity with the
            other helpers; not used directly here.

    Returns:
        (existing_page_paths, version_nav_or_None, tab_nav_or_None). When the
        version or tab is missing, the corresponding dict is None and the set
        reflects whatever could be collected (empty in those cases).
    """
    collected: set = set()
    if not navigation_data or "versions" not in navigation_data:
        return collected, None, None  # no versions structure at all
    version_nav = None
    for candidate in navigation_data.get("versions", []):
        if candidate.get("version") == version_code:
            version_nav = candidate
            break
    if version_nav is None:
        return collected, None, None  # version not present
    tab_nav = None
    tabs = version_nav.get("tabs")
    if isinstance(tabs, list):
        for candidate in tabs:
            if isinstance(candidate, dict) and candidate.get("tab") == target_tab_name:
                tab_nav = candidate
                break
    if tab_nav is not None:
        # Walk each group (including nested groups) and gather page paths.
        for group in tab_nav.get("groups", []):
            if isinstance(group, dict):
                _recursive_extract(group, collected)
    return collected, version_nav, tab_nav
def _recursive_extract(group_item: dict, pages_set: set):
"""
Recursively extracts page paths from a group item and its nested groups.
(Helper for extract_existing_pages).
Args:
group_item: A dictionary representing a group, which may contain pages or nested groups.
pages_set: A set to which extracted page paths are added.
"""
if not isinstance(group_item, dict): return # Safety check
for page in group_item.get("pages", []):
if isinstance(page, str):
pages_set.add(page)
elif isinstance(page, dict) and "group" in page: # It's a nested group
_recursive_extract(page, pages_set)
def remove_obsolete_pages(target_tab_data: dict, pages_to_remove: set, commit_message_reports_list: list, lang_code: str):
    """Drop obsolete page paths from every group of the tab, in place.

    Group dicts are retained even when they end up empty; only page path
    strings are removed.

    Args:
        target_tab_data: Dict for the tab being processed (mutated in place).
        pages_to_remove: Page path strings that should disappear.
        commit_message_reports_list: Accumulator for critical issue messages.
        lang_code: Language code, used only for logging context.
    """
    groups = target_tab_data.get("groups") if isinstance(target_tab_data, dict) else None
    if not isinstance(groups, list):
        _log_issue(commit_message_reports_list, lang_code, "Warning",
                   "Attempted to remove obsolete pages from invalid target_tab_data structure.",
                   f"Tab data: {target_tab_data}")
        return
    # A plain for-loop: the original while/index loop never removed entries,
    # so the two forms are equivalent.
    for group_item in groups:
        if not isinstance(group_item, dict):
            _log_issue(commit_message_reports_list, lang_code, "Warning",
                       f"Encountered non-dict item in groups list of Tab '{target_tab_data.get('tab','Unknown')}' during obsolete page removal. Item: {group_item}")
            continue
        _remove_obsolete_from_group(group_item, pages_to_remove, commit_message_reports_list, lang_code)
        if not group_item.get("pages"):
            # The group became (or already was) empty; keep its structure.
            _log_issue(commit_message_reports_list, lang_code, "Info",
                       f"Group '{group_item.get('group', 'Unknown')}' emptied after removing obsolete pages; structure retained.")
def _remove_obsolete_from_group(group_dict: dict, pages_to_remove: set, commit_message_reports_list: list, lang_code: str):
"""
Recursively removes obsolete page paths from a group dictionary and its nested groups.
Modifies `group_dict` in place. (Helper for remove_obsolete_pages).
Args:
group_dict: The dictionary representing a group.
pages_to_remove: A set of page path strings to remove.
commit_message_reports_list: List for accumulating critical issue messages.
lang_code: Language code for logging.
"""
if not isinstance(group_dict, dict) or "pages" not in group_dict or not isinstance(group_dict.get("pages"), list):
group_name_for_log_err = group_dict.get('group', 'Unnamed Group with structural issue') if isinstance(group_dict, dict) else 'Non-dict item'
_log_issue(commit_message_reports_list, lang_code, "Warning", f"Group '{group_name_for_log_err}' has invalid 'pages' structure; cannot remove obsolete pages from it. Structure: {group_dict}")
return
new_pages = []
group_name_for_log = group_dict.get('group', 'Unknown') # For logging context
for page_item in group_dict["pages"]:
if isinstance(page_item, str): # It's a page path
if page_item not in pages_to_remove:
new_pages.append(page_item)
else:
_log_issue(commit_message_reports_list, lang_code, "Info", f"Removed obsolete page '{page_item}' from Group '{group_name_for_log}'.")
elif isinstance(page_item, dict) and "group" in page_item: # It's a nested group
_remove_obsolete_from_group(page_item, pages_to_remove, commit_message_reports_list, lang_code)
# Retain nested group even if it becomes empty.
if page_item.get("pages"):
new_pages.append(page_item)
else:
_log_issue(commit_message_reports_list, lang_code, "Info", f"Nested group '{page_item.get('group', 'Unknown')}' in Group '{group_name_for_log}' emptied; structure retained.")
new_pages.append(page_item) # Still append the empty nested group structure
else: # Unknown item type, preserve it
_log_issue(commit_message_reports_list, lang_code, "Warning", f"Encountered unexpected item type in 'pages' list of Group '{group_name_for_log}'. Preserving item: {page_item}")
new_pages.append(page_item)
group_dict["pages"] = new_pages
def find_or_create_target_group(target_version_nav: dict, tab_name: str, group_name: str, nested_group_name: str | None, commit_message_reports_list: list, lang_code: str) -> list:
"""
Finds or creates the target group (and nested group, if specified) within the navigation data
and returns the 'pages' list where new pages should be added.
Modifies `target_version_nav` in place by adding new structures if they don't exist.
Args:
target_version_nav: The dictionary for the specific version being processed.
tab_name: The name of the target tab.
group_name: The name of the primary group.
nested_group_name: The name of the nested group (optional, can be None).
commit_message_reports_list: List for accumulating critical issue messages.
lang_code: Language code for logging.
Returns:
The 'pages' list (mutable) of the target group or nested group.
"""
target_version_nav.setdefault("tabs", [])
if not isinstance(target_version_nav["tabs"], list):
_log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: version.tabs is not a list for version '{target_version_nav.get('version')}'. Attempting to recover by creating a new list.")
target_version_nav["tabs"] = []
target_tab = next((t for t in target_version_nav["tabs"] if isinstance(t,dict) and t.get("tab") == tab_name), None)
if not target_tab:
target_tab = {"tab": tab_name, "groups": []}
target_version_nav["tabs"].append(target_tab)
_log_issue(commit_message_reports_list, lang_code, "Info", f"Created new Tab '{tab_name}'.")
target_tab.setdefault("groups", [])
if not isinstance(target_tab["groups"], list):
_log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: tab.groups is not a list for Tab '{tab_name}'. Attempting to recover.")
target_tab["groups"] = []
target_group = next((g for g in target_tab["groups"] if isinstance(g,dict) and g.get("group") == group_name), None)
if not target_group:
target_group = {"group": group_name, "pages": []}
target_tab["groups"].append(target_group)
_log_issue(commit_message_reports_list, lang_code, "Info", f"Created new Group '{group_name}' in Tab '{tab_name}'.")
target_group.setdefault("pages", [])
if not isinstance(target_group["pages"], list):
_log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: group.pages is not a list for Group '{group_name}'. Attempting to recover.")
target_group["pages"] = []
container_for_pages = target_group["pages"]
if nested_group_name:
nested_group = next((item for item in target_group["pages"] if isinstance(item, dict) and item.get("group") == nested_group_name), None)
if not nested_group:
nested_group = {"group": nested_group_name, "pages": []}
target_group["pages"].append(nested_group)
_log_issue(commit_message_reports_list, lang_code, "Info", f"Created new Nested Group '{nested_group_name}' in Group '{group_name}'.")
nested_group.setdefault("pages", [])
if not isinstance(nested_group["pages"], list):
_log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: nested_group.pages is not a list for Nested Group '{nested_group_name}'. Attempting to recover.")
nested_group["pages"] = []
container_for_pages = nested_group["pages"]
return container_for_pages
def get_group_sort_key(group_dict: dict, desired_order_list: list) -> int:
    """Sort key: the group's position in ``desired_order_list``; unknown names last.

    Args:
        group_dict: Group dict, expected to carry its name under "group".
        desired_order_list: Group names in their desired display order.

    Returns:
        An integer sort key; lower values sort earlier.
    """
    name = group_dict.get("group", "")
    # Membership test instead of catching ValueError from list.index().
    if name in desired_order_list:
        return desired_order_list.index(name)
    return len(desired_order_list)
# --- Main Logic ---
def process_single_config(docs_config: dict, navigation_data: dict, commit_message_reports_list: list):
    """
    Processes a single language/documentation configuration.
    It updates the `navigation_data` by adding new pages, removing obsolete ones,
    and structuring them according to the configuration.
    Args:
        docs_config: A dictionary containing the configuration for a specific documentation set (e.g., PLUGIN_DEV_EN).
        navigation_data: The mutable main navigation data structure (specifically, the 'navigation' dict from docs_data).
        commit_message_reports_list: List for accumulating critical issue messages.
    """
    # NOTE(review): several log messages below literally contain '(unknown)' where a
    # filename interpolation would be expected — this looks like a lost f-string
    # placeholder; confirm against version history before changing the messages.
    lang_code = docs_config["LANGUAGE_CODE"]
    docs_dir_relative = docs_config["DOCS_DIR_RELATIVE"]
    docs_dir_abs = BASE_DIR / docs_dir_relative
    pwx_map = docs_config["PWX_TO_GROUP_MAP"]
    filename_pattern = docs_config["FILENAME_PATTERN"]
    target_tab_name = docs_config["TARGET_TAB_NAME"]
    desired_group_order = docs_config["DESIRED_GROUP_ORDER"]
    _log_issue(commit_message_reports_list, lang_code, "Info", f"Processing Tab '{target_tab_name}'. Docs dir: '{docs_dir_abs}'")
    # 'refresh' is a module-level flag; when set, the target tab is cleared first.
    clear_tabs_if_refresh(navigation_data, lang_code, target_tab_name, refresh, commit_message_reports_list)
    existing_pages, target_version_nav, target_tab_nav = extract_existing_pages(navigation_data, lang_code, target_tab_name, commit_message_reports_list)
    # Create the per-language "version" entry on first encounter.
    if target_version_nav is None:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"Version '{lang_code}' not found in docs.json, creating it.")
        navigation_data.setdefault("versions", [])
        if not isinstance(navigation_data["versions"], list):
            _log_issue(commit_message_reports_list, lang_code, "Critical", "Top-level 'navigation.versions' is not a list. Re-initializing.")
            navigation_data["versions"] = []
        target_version_nav = {"version": lang_code, "tabs": []}
        navigation_data["versions"].append(target_version_nav)
        existing_pages = set()
        target_tab_nav = None
    if target_tab_nav is None:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"Tab '{target_tab_name}' not found in version '{lang_code}'. It will be created if pages are added to it.")
        existing_pages = set()
        # Ensure target_version_nav.tabs exists for find_or_create_target_group
        target_version_nav.setdefault("tabs", [])
        if not isinstance(target_version_nav["tabs"], list):
            _log_issue(commit_message_reports_list, lang_code, "Critical", f"Version '{lang_code}' 'tabs' attribute is not a list. Re-initializing.")
            target_version_nav["tabs"] = []
        # Tab structure will be fully created by find_or_create_target_group when the first page is added.
        # If no pages are added, the tab might not appear unless explicitly created empty for sorting.
        # For now, rely on find_or_create_target_group.
    _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(existing_pages)} existing pages found in docs.json for Tab '{target_tab_name}'.")
    # Scan the language's docs directory for .mdx files that match the PWXY naming pattern.
    filesystem_pages_map = {}
    valid_filenames_for_processing = []
    if not docs_dir_abs.is_dir():
        _log_issue(commit_message_reports_list, lang_code, "Error", f"Documentation directory '{docs_dir_abs}' not found. Skipping file processing for this configuration.")
        return
    for filename in os.listdir(docs_dir_abs):
        if not filename.endswith(".mdx"):
            continue
        if filename_pattern.match(filename):
            try:
                page_path = get_page_path_from_filename(filename, docs_dir_relative)
                filesystem_pages_map[filename] = page_path
                valid_filenames_for_processing.append(filename)
            except ValueError as e:
                _log_issue(commit_message_reports_list, lang_code, "Error", f"Error generating page path for '(unknown)': {e}. Skipping this file.")
        else:
            _log_issue(commit_message_reports_list, lang_code, "SeriousWarning", f"File '(unknown)' in '{docs_dir_relative}' is .mdx but does not match FILENAME_PATTERN. Skipping this file.")
    # Diff the filesystem state against what docs.json currently lists.
    filesystem_page_paths_set = set(filesystem_pages_map.values())
    _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(filesystem_page_paths_set)} valid .mdx files matching pattern found in '{docs_dir_relative}'.")
    new_page_paths = filesystem_page_paths_set - existing_pages
    removed_page_paths = existing_pages - filesystem_page_paths_set
    if new_page_paths:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(new_page_paths)} new page(s) to add to Tab '{target_tab_name}'.")
    if removed_page_paths:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(removed_page_paths)} obsolete page(s) to remove from Tab '{target_tab_name}'.")
    # Re-fetch target_tab_nav as it might have been None if the tab was new
    # This ensures we operate on the correct tab structure, especially if it was just created by find_or_create_target_group
    # or if it was pre-existing.
    # This ensures 'remove_obsolete_pages' gets the correct tab object.
    # Note: find_or_create_target_group modifies target_version_nav in-place.
    # We need to find the tab object within target_version_nav *after* any potential modifications.
    # This will be done before adding new pages and before sorting groups.
    _current_tab_for_removal = next((t for t in target_version_nav.get("tabs", []) if isinstance(t, dict) and t.get("tab") == target_tab_name), None)
    if removed_page_paths and _current_tab_for_removal:
        remove_obsolete_pages(_current_tab_for_removal, removed_page_paths, commit_message_reports_list, lang_code)
    elif removed_page_paths:  # Means there were pages to remove, but the tab itself wasn't found (edge case)
        _log_issue(commit_message_reports_list, lang_code, "Warning", f"Obsolete pages detected for Tab '{target_tab_name}', but the tab was not found in the current version structure. Removal skipped.")
    if new_page_paths:
        # Deterministic insertion order: sort by filename (PWXY prefix sorts first).
        files_to_add_sorted = sorted([fn for fn, pp in filesystem_pages_map.items() if pp in new_page_paths])
        for filename in files_to_add_sorted:
            match = filename_pattern.match(filename)
            if not match:
                _log_issue(commit_message_reports_list, lang_code, "InternalError", f"File '(unknown)' was marked for addition but failed pattern match. Skipping.")
                continue
            pwxy_str = match.group(1)
            page_path = filesystem_pages_map[filename]
            if len(pwxy_str) < 3:
                _log_issue(commit_message_reports_list, lang_code, "Error", f"File '(unknown)' has an invalid PWXY prefix '{pwxy_str}' (too short). Skipping this file.")
                continue
            # Group placement is keyed by the first three prefix digits (P, W, X).
            p, w, x = pwxy_str[0], pwxy_str[1], pwxy_str[2]
            group_key = (p, w, x)
            if group_key in pwx_map:
                map_val = pwx_map[group_key]
                if not (isinstance(map_val, tuple) and (len(map_val) == 2 or len(map_val) == 3)):
                    _log_issue(commit_message_reports_list, lang_code, "ConfigError", f"PWX_TO_GROUP_MAP entry for key {group_key} has invalid format: {map_val}. Expected tuple of 2 or 3 strings. Skipping file '(unknown)'.")
                    continue
                # Tuple is (tab, group) or (tab, group, nested_group).
                _tab_name_in_map, group_name_from_map = map_val[0], map_val[1]
                nested_group_name_from_map = map_val[2] if len(map_val) == 3 else None
                if _tab_name_in_map != target_tab_name:
                    _log_issue(commit_message_reports_list, lang_code, "Warning", f"File '(unknown)' (PWX key {group_key}) maps to Tab '{_tab_name_in_map}' in PWX_TO_GROUP_MAP, but current processing is for Tab '{target_tab_name}'. Page will be added to '{target_tab_name}' under group '{group_name_from_map}'.")
                target_pages_container_list = find_or_create_target_group(
                    target_version_nav, target_tab_name, group_name_from_map, nested_group_name_from_map,
                    commit_message_reports_list, lang_code
                )
                if page_path not in target_pages_container_list:
                    target_pages_container_list.append(page_path)
                    _log_issue(commit_message_reports_list, lang_code, "Info", f"Added page '{page_path}' to Group '{group_name_from_map}' (Nested: {nested_group_name_from_map or 'No'}).")
                else:
                    _log_issue(commit_message_reports_list, lang_code, "Info", f"Page '{page_path}' already exists in Group '{group_name_from_map}' (Nested: {nested_group_name_from_map or 'No'}). Skipping addition.")
            else:
                _log_issue(commit_message_reports_list, lang_code, "SeriousWarning", f"File '(unknown)' (PWX prefix ({p},{w},{x})) has no corresponding entry in PWX_TO_GROUP_MAP. Skipping this file.")
    # Final check for sorting: target_tab_nav needs to be the current state of the tab object.
    final_target_tab_nav = next((t for t in target_version_nav.get("tabs", []) if isinstance(t, dict) and t.get("tab") == target_tab_name), None)
    if final_target_tab_nav and "groups" in final_target_tab_nav and isinstance(final_target_tab_nav["groups"], list):
        if final_target_tab_nav["groups"]:
            final_target_tab_nav["groups"].sort(key=lambda g: get_group_sort_key(g, desired_group_order))
            _log_issue(commit_message_reports_list, lang_code, "Info", f"Sorted groups in Tab '{target_tab_name}'.")
        else:
            _log_issue(commit_message_reports_list, lang_code, "Info", f"No groups to sort in Tab '{target_tab_name}' (tab is empty or contains no group structures).")
    elif final_target_tab_nav:
        _log_issue(commit_message_reports_list, lang_code, "Warning", f"Tab '{target_tab_name}' exists but has no valid 'groups' list to sort.")
    else:  # Tab was not created (e.g., no new pages and it didn't exist before)
        _log_issue(commit_message_reports_list, lang_code, "Info", f"Tab '{target_tab_name}' does not exist in the final structure; no sorting needed.")
def load_docs_data_robust(path: Path, commit_message_reports_list: list, lang_for_report: str = "GLOBAL") -> dict:
    """
    Loads docs.json data robustly. If file doesn't exist or is invalid, returns a default structure.
    Args:
        path: Path object to the docs.json file.
        commit_message_reports_list: List for accumulating critical issue messages.
        lang_for_report: Identifier for logging context (defaults to "GLOBAL").
    Returns:
        A dictionary with the loaded data or a default structure on failure.
    """
    fallback = {"navigation": {"versions": []}}
    try:
        if not path.exists():
            _log_issue(commit_message_reports_list, lang_for_report, "Info", f"File '{path}' not found. Initializing with a new default structure.")
            return fallback
        with open(path, "r", encoding="utf-8") as handle:
            data = json.load(handle)
    except json.JSONDecodeError as e:
        _log_issue(commit_message_reports_list, lang_for_report, "Error", f"Failed to parse JSON from '{path}': {e}. Using default structure.")
        return fallback
    except Exception as e:
        _log_issue(commit_message_reports_list, lang_for_report, "Critical", f"Unexpected error loading file '{path}': {e}. Using default structure.")
        return fallback
    # Sanity-check the shape: navigation must be a dict holding a 'versions' list.
    nav = data.get("navigation") if isinstance(data, dict) else None
    if not (isinstance(nav, dict) and isinstance(nav.get("versions"), list)):
        _log_issue(commit_message_reports_list, lang_for_report, "Error", f"File '{path}' has an invalid root structure. Key 'navigation.versions' (as a list) is missing or malformed. Using default structure.")
        return fallback
    return data
def save_docs_data_robust(path: Path, data: dict, commit_message_reports_list: list, lang_for_report: str = "GLOBAL") -> bool:
    """
    Saves data to docs.json robustly.
    Args:
        path: Path object to the docs.json file.
        data: The dictionary data to save.
        commit_message_reports_list: List for accumulating critical issue messages.
        lang_for_report: Identifier for logging context.
    Returns:
        True if save was successful, False otherwise.
    """
    try:
        # indent=4 / ensure_ascii=False keeps the file human-readable with unicode intact.
        with open(path, "w", encoding="utf-8") as handle:
            json.dump(data, handle, ensure_ascii=False, indent=4)
        _log_issue(commit_message_reports_list, lang_for_report, "Info", f"Successfully saved updates to '{path}'.")
        return True
    except Exception as exc:
        _log_issue(commit_message_reports_list, lang_for_report, "Critical", f"Failed to save updates to '{path}': {exc}.")
        return False
def validate_config(config: dict, config_name: str, commit_message_reports_list: list) -> bool:
    """
    Validates a single documentation configuration dictionary.

    First verifies that every required key is present; only then checks that the
    structurally important values have usable types/contents.

    Args:
        config: The configuration dictionary to validate.
        config_name: A name/identifier for the configuration (e.g., language code), used for logging.
        commit_message_reports_list: List for accumulating critical issue messages.
    Returns:
        True if the configuration is valid, False otherwise.
    """
    required_keys = [
        "DOCS_DIR_RELATIVE", "LANGUAGE_CODE", "FILE_EXTENSION_SUFFIX",
        "TARGET_TAB_NAME", "FILENAME_PATTERN", "PWX_TO_GROUP_MAP", "DESIRED_GROUP_ORDER",
    ]
    missing_keys = [key for key in required_keys if key not in config]
    for key in missing_keys:
        _log_issue(commit_message_reports_list, config_name, "ConfigError", f"Configuration is missing required key '{key}'.")
    if missing_keys:
        _log_issue(commit_message_reports_list, config_name, "Info", f"Skipping configuration '{config_name}' due to missing required keys.")
        return False
    ok = True
    if not (isinstance(config["DOCS_DIR_RELATIVE"], str) and config["DOCS_DIR_RELATIVE"]):
        _log_issue(commit_message_reports_list, config_name, "ConfigError", f"Key 'DOCS_DIR_RELATIVE' must be a non-empty string. Found: '{config.get('DOCS_DIR_RELATIVE')}'.")
        ok = False
    if not isinstance(config["FILENAME_PATTERN"], re.Pattern):
        _log_issue(commit_message_reports_list, config_name, "ConfigError", f"Key 'FILENAME_PATTERN' must be a compiled regular expression (re.Pattern). Found type: {type(config.get('FILENAME_PATTERN'))}.")
        ok = False
    if not (isinstance(config["PWX_TO_GROUP_MAP"], dict) and config["PWX_TO_GROUP_MAP"]):
        _log_issue(commit_message_reports_list, config_name, "ConfigError", f"Key 'PWX_TO_GROUP_MAP' must be a non-empty dictionary. Found: '{config.get('PWX_TO_GROUP_MAP')}'.")
        ok = False
    if not isinstance(config["DESIRED_GROUP_ORDER"], list):
        _log_issue(commit_message_reports_list, config_name, "ConfigError", f"Key 'DESIRED_GROUP_ORDER' must be a list. Found type: {type(config.get('DESIRED_GROUP_ORDER'))}.")
        ok = False
    if not ok:
        _log_issue(commit_message_reports_list, config_name, "Info", f"Skipping configuration '{config_name}' due to type or content errors in its definition.")
    return ok
def process_all_configs(configs_to_process: list[dict], docs_json_path: Path) -> list[str]:
    """
    Main orchestrator for processing all provided documentation configurations.
    Loads existing docs.json, processes each config, and saves the result.
    Args:
        configs_to_process: A list of configuration dictionaries.
        docs_json_path: Path to the docs.json file.
    Returns:
        A list of strings, where each string is a critical issue message formatted for a commit summary.
        Returns an empty list if no critical issues occurred.
    """
    commit_message_reports = []
    docs_data = load_docs_data_robust(docs_json_path, commit_message_reports)
    # Repair the top-level shape in place before any per-config processing mutates it.
    navigation_data_to_modify = docs_data.setdefault("navigation", {})
    if not isinstance(navigation_data_to_modify, dict):
        _log_issue(commit_message_reports, "GLOBAL", "Critical", "'navigation' key in docs.json is not a dictionary. Resetting to default structure.")
        docs_data["navigation"] = {"versions": []}
        navigation_data_to_modify = docs_data["navigation"]
    navigation_data_to_modify.setdefault("versions", [])
    if not isinstance(navigation_data_to_modify.get("versions"), list):
        _log_issue(commit_message_reports, "GLOBAL", "Error", "'navigation.versions' in docs.json was not a list. Resetting it to an empty list.")
        navigation_data_to_modify["versions"] = []
    processed_any_config_successfully = False
    for i, config_item in enumerate(configs_to_process):
        config_id = config_item.get("LANGUAGE_CODE", f"UnnamedConfig_{i+1}")
        _log_issue(commit_message_reports, config_id, "Info", f"Starting validation for configuration '{config_id}'.")
        if validate_config(config_item, config_id, commit_message_reports):
            _log_issue(commit_message_reports, config_id, "Info", f"Configuration '{config_id}' validated successfully. Starting processing.")
            try:
                process_single_config(config_item, navigation_data_to_modify, commit_message_reports)
                processed_any_config_successfully = True
            except Exception as e:
                # One failing config must not abort the others; record and continue.
                _log_issue(commit_message_reports, config_id, "Critical", f"Unhandled exception during processing of configuration '{config_id}': {e}.")
                import traceback
                tb_str = traceback.format_exc()
                print(f"TRACEBACK for configuration '{config_id}':\n{tb_str}")
        else:
            _log_issue(commit_message_reports, config_id, "Info", f"Configuration '{config_id}' failed validation. Skipping processing.")
    # Only write back when at least one configuration was processed without raising.
    if processed_any_config_successfully:
        _log_issue(commit_message_reports, "GLOBAL", "Info", "Attempting to save changes to docs.json.")
        save_docs_data_robust(docs_json_path, docs_data, commit_message_reports)
    elif not configs_to_process:
        _log_issue(commit_message_reports, "GLOBAL", "Info", "No configurations were provided to process.")
    else:
        _log_issue(commit_message_reports, "GLOBAL", "Info", "No valid configurations were processed successfully. docs.json will not be modified.")
    return commit_message_reports
def main_apply_docs_json() -> str:
    """
    Entry point for the script. Initializes configurations, processes them,
    and returns a status message for commit purposes.
    Returns:
        "success" if no critical issues were reported, otherwise a formatted string
        summarizing critical issues for a commit message.
    """
    print(f"Script base directory: {BASE_DIR}")
    print(f"Docs JSON path: {DOCS_JSON_PATH}")
    print(f"Refresh mode: {refresh}")
    configs = [
        PLUGIN_DEV_ZH,
        PLUGIN_DEV_EN,
        PLUGIN_DEV_JA,
    ]
    issues = process_all_configs(configs, DOCS_JSON_PATH)
    if not issues:
        return "success"
    issue_count = len(issues)
    summary_line = f"docs.json processed with {issue_count} critical issue(s) reported."
    # Cap the detail section so commit messages stay readable.
    detail_limit = 10
    if issue_count > detail_limit:
        detailed = "\n".join(issues[:detail_limit]) + \
            f"\n... and {issue_count - detail_limit} more critical issues (see full console logs for details)."
    else:
        detailed = "\n".join(issues)
    return f"{summary_line}\n\nDetails of critical issues:\n{detailed}"
# Script entry point: run the docs.json update and echo the outcome.
if __name__ == "__main__":
    outcome = main_apply_docs_json()
    print("\n--- Script Execution Result ---")
    print(outcome)

View File

@@ -222,7 +222,7 @@ def loop(dict):
)
if __name__ == "__main__":
def main_contributing_in_page():
process = {
# Help Documentation
"zh_help": {
@@ -263,4 +263,13 @@ if __name__ == "__main__":
"language": "ja"
},
}
loop(process)
try:
loop(process)
return "success"
except Exception as e:
return (f"{str(e)}")
if __name__ == "__main__":
result_message = main_contributing_in_page()
print("\n--- Script Execution Result ---")
print(result_message)

View File

@@ -0,0 +1,449 @@
import yaml # pip install pyyaml
import re
import datetime
from pathlib import Path
import shutil
import sys
class Config:
    """Static configuration for the PWXY-based MDX renaming tool."""

    # --- Path Setup ---
    # Project root: two levels above this script's location.
    BASE_DIR = Path(__file__).resolve().parent.parent
    LANGUAGES = ["zh", "en", "ja"]  # Languages to process
    # Timestamp identifying this run (used for archive naming).
    TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    # --- Directory Naming Templates ---
    LANG_DIR_TEMPLATE = "plugin_dev_{lang}"
    ARCHIVE_LANG_DIR_PREFIX_TEMPLATE = "plugin_dev_{lang}_archive_pre_processing_"

    # --- PWXY Mappings ---
    # W digit: document's primary type.
    PRIMARY_TYPE_MAP = {
        "conceptual": 1,
        "implementation": 2,
        "operational": 3,
        "reference": 4,
    }
    DEFAULT_W = 0
    # X digit: detail type, keyed per primary type.
    DETAIL_TYPE_MAPS = {
        "conceptual": {"introduction": 1, "principles": 2, "architecture": 3},
        "implementation": {"basic": 1, "standard": 2, "high": 3, "advanced": 4},
        "operational": {"setup": 1, "deployment": 2, "maintenance": 3},
        "reference": {"core": 1, "configuration": 2, "examples": 3},
    }
    DEFAULT_X = 0
    # Y digit: audience level.
    LEVEL_MAP = {
        "beginner": 1,
        "intermediate": 2,
        "advanced": 3,
    }
    DEFAULT_Y = 0
    # P digit: sort priority (high-priority docs get 9, others 0).
    PRIORITY_NORMAL = 0
    PRIORITY_HIGH = 9
    PRIORITY_ADVANCED_LEVEL_KEY = "advanced"
    PRIORITY_IMPLEMENTATION_PRIMARY_KEY = "implementation"
    PRIORITY_IMPLEMENTATION_DETAIL_KEYS = {"high", "advanced"}
# --- Helper Functions ---
def extract_front_matter(content: str):
    """
    Split *content* into (front_matter, markdown_body).

    Returns:
        (dict, str)  — parsed YAML front matter (empty dict when absent or when
                       the YAML is not a mapping) and the stripped body.
        (None, str)  — on a YAML parse error (caller treats None as failure).
    """
    fm_match = re.match(r"^\s*---\s*$(.*?)^---\s*$(.*)",
                        content, re.DOTALL | re.MULTILINE)
    if fm_match is None:
        return {}, content  # No front matter delimiters present.
    yaml_block = fm_match.group(1).strip()
    body = fm_match.group(2).strip()
    try:
        parsed = yaml.safe_load(yaml_block)
    except yaml.YAMLError as e:
        # This print is fine here as it's an early, critical parsing failure for a single file.
        print(f" [Error] YAML Parsing Failed: {e}")
        return None, content  # Indicate error
    # Empty YAML (--- \n ---) parses to None; non-mapping YAML is discarded.
    if not isinstance(parsed, dict):
        parsed = {}
    return parsed, body
def sanitize_filename_part(part: str) -> str:
    """
    Normalize *part* into a filesystem-friendly, lowercase slug.

    Non-strings are stringified first; '&'/'@' become words, whitespace runs
    become single dashes, and anything outside [A-Za-z0-9_-.] is dropped.
    Returns "untitled" when nothing survives sanitization.
    """
    text = part if isinstance(part, str) else str(part)
    text = text.lower().replace("&", "and").replace("@", "at")
    text = re.sub(r"\s+", "-", text)
    text = re.sub(r"[^\w\-.]+", "", text)  # Allow dots
    text = text.strip(".-_")
    return text if text else "untitled"
def _calculate_pwxy_and_warnings(front_matter: dict, config: Config) -> tuple[int, int, int, int, list[str]]:
    """
    Derive the P/W/X/Y filename digits from a file's front matter.

    Callers run a compliance check first, so primary/detail/level are expected
    to be present; unmapped values fall back to the configured defaults and
    produce a warning message.

    Returns:
        (P, W, X, Y, warnings) — priority digit, primary-type digit,
        detail-type digit, level digit, and human-readable warning strings.
    """
    notes = []
    dims = front_matter.get("dimensions", {})
    type_block = dims.get("type", {})
    primary = type_block.get("primary")  # Will be present due to compliance check if we reach here
    detail = type_block.get("detail")  # Will be present
    level = dims.get("level")  # Will be present
    # P: high priority for advanced-level docs and high/advanced implementation docs.
    is_high_priority = (
        level == config.PRIORITY_ADVANCED_LEVEL_KEY
        or (primary == config.PRIORITY_IMPLEMENTATION_PRIMARY_KEY
            and detail in config.PRIORITY_IMPLEMENTATION_DETAIL_KEYS)
    )
    P = config.PRIORITY_HIGH if is_high_priority else config.PRIORITY_NORMAL
    W = config.PRIMARY_TYPE_MAP.get(primary, config.DEFAULT_W)
    X = config.DETAIL_TYPE_MAPS.get(primary, {}).get(detail, config.DEFAULT_X)
    Y = config.LEVEL_MAP.get(level, config.DEFAULT_Y)
    # Warnings for unmapped values (fields are assumed present from compliance check).
    if W == config.DEFAULT_W and primary is not None:  # primary being None shouldn't happen if compliant
        notes.append(f"Unmapped primary type: '{primary}'. Using W={config.DEFAULT_W}")
    if X == config.DEFAULT_X and detail is not None:  # detail being None shouldn't happen
        if primary in config.DETAIL_TYPE_MAPS:
            notes.append(f"Unmapped detail type: '{detail}' for primary '{primary}'. Using X={config.DEFAULT_X}")
        elif primary is not None:  # Should always be true
            notes.append(f"No detail map defined for primary type: '{primary}'. Using X={config.DEFAULT_X}")
    if Y == config.DEFAULT_Y and level is not None:  # level being None shouldn't happen
        notes.append(f"Unmapped level: '{level}'. Using Y={config.DEFAULT_Y}")
    return P, W, X, Y, notes
def _generate_filename_parts(
    P: int, W: int, X: int, Y: int,
    front_matter: dict,
    original_filename_stem: str
) -> tuple[str | None, str, str, list[str]]:
    """
    Build the three pieces of the new filename from PWXY digits and front matter.

    Returns:
        (padded_prefix, sanitized_title, lang_suffix, warnings) — a zero-padded
        4-digit numeric prefix, the slugified title (falling back to the
        original stem when 'standard_title' is empty), an optional ".<lang>"
        suffix, and any warning messages generated along the way.
    """
    notes = []
    raw_prefix = f"{P}{W}{X}{Y}"
    try:
        padded_prefix = f"{int(raw_prefix):04d}"
    except ValueError:
        notes.append(f"Could not form numeric prefix from P{P}W{W}X{X}Y{Y}. Using '0000'.")
        padded_prefix = "0000"
    # 'standard_title' is assumed present (not None) by the caller's compliance check;
    # an empty string still falls back to the original filename stem.
    title_source = front_matter.get("standard_title")
    if not title_source:
        notes.append("Empty 'standard_title'. Using original filename stem as fallback.")
        title_source = original_filename_stem
    sanitized_title = sanitize_filename_part(title_source)
    lang_suffix = ""
    language_fm = front_matter.get("language")
    if language_fm is None:  # language field is missing
        notes.append("Missing 'language' field in frontmatter. Omitting language suffix.")
    else:
        lang_code = str(language_fm).strip().lower()
        if lang_code:
            lang_suffix = f".{lang_code}"
        else:
            notes.append("Empty 'language' field in frontmatter. Omitting language suffix.")
    return padded_prefix, sanitized_title, lang_suffix, notes
# --- Core Processing Functions ---
def get_or_create_lang_dir(lang: str, config: Config) -> tuple[Path | None, bool]:
    """
    Locate (or create) the working directory for one language.

    Returns:
        (path, was_newly_created) — the directory path, or (None, False) when
        the path exists but is not a directory or creation fails.
    """
    lang_dir_path = config.BASE_DIR / config.LANG_DIR_TEMPLATE.format(lang=lang)
    if lang_dir_path.is_dir():
        print(f"Using existing directory '{lang_dir_path.name}' for '{lang}'.")
        return lang_dir_path, False
    if lang_dir_path.exists():
        # Exists but is a file/symlink/etc. — cannot use it.
        print(f"[ERROR] Path '{lang_dir_path}' exists but is not a directory. Skipping '{lang}'.")
        return None, False
    print(f"Directory '{lang_dir_path.name}' not found for '{lang}'. Creating it.")
    try:
        lang_dir_path.mkdir(parents=True, exist_ok=False)
    except Exception as e:  # Catch any OS or File system error
        print(f"[ERROR] Failed to create directory '{lang_dir_path}': {e}. Skipping '{lang}'.")
        return None, False
    print(f"Created directory: '{lang_dir_path.name}' for '{lang}'.")
    return lang_dir_path, True
def process_single_mdx_file(mdx_filepath: Path, config: Config) -> dict:
    """
    Rename one MDX file according to its front-matter PWXY classification.

    Returns a stats dict whose "status" is one of: "processed",
    "skipped_no_change", "skipped_target_exists", "skipped_non_compliant",
    or "error"; the remaining keys carry detail for the caller's reporting.
    """
    stats = {
        "status": "processed", "all_file_warnings": [], "error_message": None,  # all_file_warnings replaces warnings
        "old_filename_stem_for_replace": None, "new_filename_stem_for_replace": None,
        "problem_file_display_path": None, "problem_file_target_name": None,
        "non_compliant_reason": None,
    }
    # Build a short path for log output.
    try:
        if mdx_filepath.is_relative_to(config.BASE_DIR):
            # Show path relative to BASE_DIR's parent for context (e.g. "lang_dir/file.mdx")
            display_path = mdx_filepath.relative_to(config.BASE_DIR.parent).as_posix()
        else:
            display_path = mdx_filepath.name  # Fallback
    except ValueError:  # Should not happen if path is under BASE_DIR
        display_path = mdx_filepath.name
    stats["problem_file_display_path"] = display_path
    try:
        content = mdx_filepath.read_text(encoding="utf-8")
        front_matter, _ = extract_front_matter(content)
        if front_matter is None:  # YAML Parsing Error from extract_front_matter
            stats["status"] = "error"; stats["error_message"] = "YAML Error in file."
            # The extract_front_matter already printed details
            print(f"\nProcessing: {display_path} -> (skipped due to YAML error)")
            return stats
        # --- Check for critical missing frontmatter for renaming compliance ---
        missing_critical_fields = []
        fm_dimensions = front_matter.get("dimensions", {})
        fm_type = fm_dimensions.get("type", {})
        if fm_type.get("primary") is None: missing_critical_fields.append("dimensions.type.primary")
        if fm_type.get("detail") is None: missing_critical_fields.append("dimensions.type.detail")
        if fm_dimensions.get("level") is None: missing_critical_fields.append("dimensions.level")
        if front_matter.get("standard_title") is None: missing_critical_fields.append("standard_title")
        if missing_critical_fields:
            stats["status"] = "skipped_non_compliant"
            reason = f"Missing critical frontmatter fields for renaming: {', '.join(missing_critical_fields)}."
            stats["non_compliant_reason"] = reason
            # This skip will be reported as a problem, so print concise info here
            print(f"\nProcessing: {display_path} -> (skipped, non-compliant)")
            print(f" [Skipping Reason] {reason}")
            return stats  # No further processing or warning generation for this file
        # --- If compliant, proceed to calculate and generate parts ---
        P, W, X, Y, pwxy_warnings = _calculate_pwxy_and_warnings(front_matter, config)
        stats["all_file_warnings"].extend(pwxy_warnings)
        original_stem_for_title_fallback = mdx_filepath.stem
        padded_prefix, sanitized_title, lang_suffix, fname_warnings = _generate_filename_parts(
            P, W, X, Y, front_matter, original_stem_for_title_fallback)
        stats["all_file_warnings"].extend(fname_warnings)
        new_filename = f"{padded_prefix}-{sanitized_title}{lang_suffix}.mdx"
        stats["problem_file_target_name"] = new_filename
        new_filepath = mdx_filepath.with_name(new_filename)
        # Rename only when the name actually changes and the target is free.
        if new_filepath == mdx_filepath:
            stats["status"] = "skipped_no_change"
        elif new_filepath.exists():
            stats["status"] = "skipped_target_exists"
        else:
            try:
                original_stem_before_rename = mdx_filepath.stem
                mdx_filepath.rename(new_filepath)
                stats["status"] = "processed"
                # Old/new stems feed the phase-2 content-reference rewrite.
                stats["old_filename_stem_for_replace"] = original_stem_before_rename
                stats["new_filename_stem_for_replace"] = new_filepath.stem
            except Exception as rename_error:
                stats["status"] = "error"
                stats["error_message"] = f"Failed to rename to '{new_filename}': {rename_error}"
        # Print details if warnings, actual change, error, or specific skips (except non-compliant already printed)
        action_taken = new_filepath != mdx_filepath and stats["status"] == "processed"
        if stats["all_file_warnings"] or action_taken or stats["status"].startswith("error") or stats["status"] == "skipped_target_exists":
            print(f"\nProcessing: {display_path} -> {new_filename if action_taken else '(no change or skipped/error)'}")
            for warning_msg in stats["all_file_warnings"]: print(f" [Warning] {warning_msg}")  # These will now be problem reports
            if stats["status"] == "skipped_target_exists": print(f" [Skipping] Target '{new_filename}' already exists.")
            if stats["error_message"]: print(f" [Error] {stats['error_message']}")
    except FileNotFoundError:  # Should be rare if mdx_filepath comes from rglob
        stats["status"] = "error"; stats["error_message"] = f"File not found: {mdx_filepath.name}"
        print(f"\nProcessing: {display_path}"); print(f" [Error] {stats['error_message']}")
    except Exception as e:
        stats["status"] = "error"; stats["error_message"] = f"Unexpected error: {e}"
        print(f"\nProcessing: {display_path}"); print(f" [Error] Unexpected error processing file: {e}")
        import traceback; traceback.print_exc()
    return stats
def run_processing_for_language(lang_dir_path: Path, config: Config) -> dict:
    """
    Process one language directory in place, in two phases:
    phase 1 renames each .mdx file per its front matter; phase 2 rewrites
    references to the renamed stems inside all remaining .mdx files.
    Returns an accumulated stats dict (counts plus per-problem detail lists).
    """
    print(f"Starting in-place processing for: {lang_dir_path.name}")
    lang_stats = {
        "processed_count": 0, "skipped_no_change_count": 0,
        "skipped_target_exists_count": 0, "error_count": 0,
        "warning_files_count": 0,  # This counts files that HAD warnings.
        "status": "OK",
        "dir_path_str": str(lang_dir_path.relative_to(config.BASE_DIR)),  # Path relative to project root
        "content_replacements_made_count": 0, "content_replacement_errors_count": 0,
        "error_file_details": [], "skipped_target_exists_details": [],
        "content_replacement_error_details": [],
        "skipped_non_compliant_count": 0,
        "skipped_non_compliant_details": [],
        "files_with_processing_warnings_details": [],  # New: to store path and specific warnings
    }
    if not lang_dir_path.exists() or not lang_dir_path.is_dir():
        lang_stats["status"] = "LANG_DIR_ERROR"
        print(f"[Error] Language directory '{lang_dir_path.name}' issue (not found or not a dir).")
        return lang_stats
    print(f"\n--- Phase 1: Renaming files in '{lang_dir_path.name}' ---")
    mdx_files = sorted(list(lang_dir_path.rglob("*.mdx")))
    total_files = len(mdx_files)
    print(f"Found {total_files} MDX files for renaming phase.")
    rename_mappings = []
    for i, mdx_filepath in enumerate(mdx_files):
        result = process_single_mdx_file(mdx_filepath, config)
        status = result["status"]
        # Tally each outcome and collect detail records for problem reporting.
        if status == "processed":
            lang_stats["processed_count"] += 1
            old, new = result.get("old_filename_stem_for_replace"), result.get("new_filename_stem_for_replace")
            if old and new and old != new: rename_mappings.append((old, new))
        elif status == "skipped_no_change": lang_stats["skipped_no_change_count"] += 1
        elif status == "skipped_target_exists":
            lang_stats["skipped_target_exists_count"] += 1
            lang_stats["skipped_target_exists_details"].append({
                "original_display_path": result["problem_file_display_path"],
                "target_name": result["problem_file_target_name"]
            })
        elif status == "skipped_non_compliant":
            lang_stats["skipped_non_compliant_count"] += 1
            lang_stats["skipped_non_compliant_details"].append({
                "path": result["problem_file_display_path"],
                "reason": result["non_compliant_reason"]
            })
        elif status == "error":
            lang_stats["error_count"] += 1
            lang_stats["error_file_details"].append({
                "path": result["problem_file_display_path"], "message": result["error_message"]
            })
        if result["all_file_warnings"]:  # If there were any warnings for this file
            lang_stats["warning_files_count"] += 1  # Increment count of files with warnings
            lang_stats["files_with_processing_warnings_details"].append({  # Store details for problem reporting
                "path": result["problem_file_display_path"],
                "warnings": result["all_file_warnings"]
            })
        if total_files > 0: print(f"Rename Progress ({lang_dir_path.name}): {i+1}/{total_files} ({((i+1)/total_files*100):.1f}%)", end="\r")
    if total_files > 0: print()  # Newline after progress
    print("--- Phase 1: Renaming files complete. ---")
    if rename_mappings:
        print(f"\n--- Phase 2: Updating content references in '{lang_dir_path.name}' ({len(rename_mappings)} filename changes to propagate) ---")
        # Re-scan: phase 1 changed filenames, so the file list must be rebuilt.
        all_mdx_after_rename = sorted(list(lang_dir_path.rglob("*.mdx")))
        total_replace_scan = len(all_mdx_after_rename)
        print(f"Scanning {total_replace_scan} .mdx files for content updates.")
        updated_count = 0
        for i, scan_path in enumerate(all_mdx_after_rename):
            display_scan_path = scan_path.relative_to(config.BASE_DIR.parent).as_posix()  # Consistent display path
            try:
                content, changed = scan_path.read_text(encoding="utf-8"), False
                mod_content = content
                # Plain substring replacement of old stems with new ones.
                for old, new in rename_mappings:
                    if old in mod_content: mod_content, changed = mod_content.replace(old, new), True
                if changed:
                    scan_path.write_text(mod_content, encoding="utf-8")
                    updated_count +=1; print(f" Updated references in: {display_scan_path}")
            except Exception as e:
                err_msg = f"Failed to update references in {display_scan_path}: {e}"
                print(f" [Error] {err_msg}")
                lang_stats["content_replacement_errors_count"] += 1
                lang_stats["content_replacement_error_details"].append({"path": display_scan_path, "error": str(e)})
            if total_replace_scan > 0: print(f"Content Update Progress ({lang_dir_path.name}): {i+1}/{total_replace_scan} ({((i+1)/total_replace_scan*100):.1f}%)", end="\r")
        if total_replace_scan > 0: print()  # Newline after progress
        lang_stats["content_replacements_made_count"] = updated_count
        print(f"Content replacement phase: {updated_count} files had their content updated.")
        print("--- Phase 2: Content references update complete. ---")
    else: print("\nNo renames occurred, skipping content reference update phase.")
    print("-" * 20 + f"\nLanguage Processing Summary ({lang_dir_path.name}):")
    print(f" Processed (renamed): {lang_stats['processed_count']}")
    print(f" Skipped (no change): {lang_stats['skipped_no_change_count']}")
    print(f" Skipped (target exists): {lang_stats['skipped_target_exists_count']}")
    print(f" Skipped (non-compliant for rename): {lang_stats['skipped_non_compliant_count']}")
    print(f" Files generating warnings: {lang_stats['warning_files_count']}")  # Renamed for clarity
    print(f" Errors (renaming phase): {lang_stats['error_count']}")
    if rename_mappings or lang_stats['content_replacement_errors_count'] > 0 or lang_stats['content_replacements_made_count'] > 0:
        print(f" Content updated (references): {lang_stats['content_replacements_made_count']}")
        print(f" Errors (content update): {lang_stats['content_replacement_errors_count']}")
    print("-" * 20)
    # A language dir has errors if file errors or content replacement errors occurred.
    # Non-compliant skips or warnings are now also reported as "problems" at main level,
    # but don't change the "ERRORS_IN_PROCESSING" status of the language itself here.
    # The main problem report will cover those.
    if lang_stats["error_count"] > 0 or lang_stats["content_replacement_errors_count"] > 0:
        lang_stats["status"] = "ERRORS_IN_PROCESSING"
    return lang_stats
def _print_language_stats(summary: dict) -> None:
    """Print the per-language counters present in *summary*, in a fixed order.

    Keys absent from *summary* (e.g. for a language that failed setup) are
    silently skipped, matching the sparse dicts produced upstream.
    """
    for key, label in [
        ("processed_count", "Processed (renamed)"),
        ("skipped_no_change_count", "Skipped (no change)"),
        ("skipped_target_exists_count", "Skipped (target exists)"),
        ("skipped_non_compliant_count", "Skipped (non-compliant for rename)"),
        ("warning_files_count", "Files generating warnings"),
        ("error_count", "Errors (renaming phase)"),
        ("content_replacements_made_count", "Content updated (references)"),
        ("content_replacement_errors_count", "Errors (content update)"),
    ]:
        if key in summary:
            print(f" {label}: {summary.get(key, 0)}")


def _collect_problem_reports(lang_code: str, summary: dict) -> list:
    """Build the human-readable problem lines for one language's results.

    Each detail list may be missing from *summary*; missing lists contribute
    nothing. Returns the problem strings in reporting order (rename errors,
    target-exists skips, non-compliant skips, warnings, content errors).
    """
    problems = []
    for detail in summary.get("error_file_details", []):
        problems.append(f"- Lang '{lang_code}': File '{detail['path']}' - Renaming error: {detail['message']}")
    for detail in summary.get("skipped_target_exists_details", []):
        problems.append(f"- Lang '{lang_code}': File '{detail['original_display_path']}' could not be renamed to '{detail['target_name']}' (target exists).")
    for detail in summary.get("skipped_non_compliant_details", []):
        problems.append(f"- Lang '{lang_code}': File '{detail['path']}' - Skipped (non-compliant): {detail['reason']}")
    for detail in summary.get("files_with_processing_warnings_details", []):
        warnings_str = "; ".join(detail['warnings'])
        problems.append(f"- Lang '{lang_code}': File '{detail['path']}' - Processing Warnings: {warnings_str}")
    for detail in summary.get("content_replacement_error_details", []):
        problems.append(f"- Lang '{lang_code}': File '{detail['path']}' - Content replacement error: {detail['error']}")
    return problems


def _print_final_location_note(lang_code: str, summary: dict, lang_dir_path_obj, was_newly_created, config) -> None:
    """Print where the language directory ended up (or why it is gone).

    *lang_dir_path_obj* is None either when setup failed or when an empty
    newly-created directory was removed after processing.
    """
    if lang_dir_path_obj and lang_dir_path_obj.exists():
        print(f" Final directory location: {lang_dir_path_obj.name}")
    elif was_newly_created and not lang_dir_path_obj:
        # The directory was created by this run and removed again because it
        # stayed empty — expected, not a problem.
        print(" Note: Empty newly created directory was removed as expected.")
    elif not lang_dir_path_obj and summary.get('status') != "SETUP_ERROR":
        print(f" Note: Language directory '{config.LANG_DIR_TEMPLATE.format(lang=lang_code)}' may have been archived or removed by other means.")


def main_rename_by_dimensions() -> str:
    """Process every configured language directory and report the outcome.

    For each language in ``Config.LANGUAGES``: obtain (or create) its
    directory, run the rename/content-update pipeline, and remove the
    directory again if this run created it and it ended up empty. Then print
    an overall summary per language.

    Returns:
        The literal string ``"success"`` when no problems were recorded,
        otherwise a newline-joined report with one line per problem
        (setup errors, rename errors, non-compliant skips, warnings, and
        content-replacement failures).
    """
    config = Config()
    print(f"Base directory: {config.BASE_DIR}\nTimestamp for this run: {config.TIMESTAMP}")
    overall_summary = {}
    lang_dir_created_flags = {}
    lang_dirs_map = {}
    problem_reports_list = []

    # --- Phase 1: per-language processing ---
    for lang in config.LANGUAGES:
        print(f"\n{'='*10} Processing Language: {lang.upper()} {'='*10}")
        current_lang_dir, was_newly_created = get_or_create_lang_dir(lang, config)
        lang_dir_created_flags[lang] = was_newly_created
        lang_dirs_map[lang] = current_lang_dir
        if not current_lang_dir:
            msg = f"Failed to get or create language directory for '{lang}'."
            overall_summary[lang] = {"status": "SETUP_ERROR", "message": msg}
            problem_reports_list.append(f"- Lang '{lang}': Setup Error - {msg}")
            continue
        overall_summary[lang] = run_processing_for_language(current_lang_dir, config)
        # Clean up a directory this run created if processing left it empty,
        # so empty lang dirs don't accumulate in the tree.
        if was_newly_created and current_lang_dir.exists() and not any(current_lang_dir.iterdir()):
            try:
                current_lang_dir.rmdir()
                print(f" Removed empty newly created language directory: {current_lang_dir.name}")
                lang_dirs_map[lang] = None  # Mark as gone for the summary phase.
            except OSError as e:
                print(f" Note: Could not remove empty newly created directory '{current_lang_dir.name}': {e}")

    # --- Phase 2: overall summary and problem collection ---
    print("\n\n" + "=" * 20 + " Overall Script Summary " + "=" * 20)
    for lang_code in config.LANGUAGES:
        summary = overall_summary.get(lang_code, {})
        lang_dir_path_obj = lang_dirs_map.get(lang_code)
        print(f"\nLanguage: {lang_code.upper()}\n Status: {summary.get('status', 'UNKNOWN')}")
        if "message" in summary and summary['status'] in ["SETUP_ERROR", "LANG_DIR_ERROR"]:
            # Critical setup messages only; normal runs carry no message.
            print(f" Message: {summary['message']}")
        if summary.get('status') not in ["SETUP_ERROR", "LANG_DIR_ERROR"]:
            print(f" Directory: {summary.get('dir_path_str', 'N/A')}")
            _print_language_stats(summary)
        # A setup-error summary has no detail lists, so this is a no-op then.
        problem_reports_list.extend(_collect_problem_reports(lang_code, summary))
        _print_final_location_note(lang_code, summary, lang_dir_path_obj,
                                   lang_dir_created_flags.get(lang_code), config)
    print("=" * (40 + len(" Overall Script Summary ")))

    if not problem_reports_list:
        return "success"
    return "\n".join(problem_reports_list)
if __name__ == "__main__":
result_message = main_rename_by_dimensions()
print("\n--- Script Execution Result ---")
print(result_message)