From 7679eb4b291adc0a18fc22a42e5121ad41a1e0d6 Mon Sep 17 00:00:00 2001 From: Alter-xyz <88554920+alterxyz@users.noreply.github.com> Date: Sat, 17 May 2025 22:16:47 +0800 Subject: [PATCH] chore: tools update --- tools/1_rename_by_dimensions.py | 560 ------------- tools/2_apply_docs_json.py | 777 ------------------ tools/apply_docs_json.py | 698 ++++++++++++++++ ...ing_in_page.py => contributing_in_page.py} | 13 +- tools/rename_by_dimensions.py | 449 ++++++++++ ...ces.py => temp_add_deprecation_notices.py} | 0 6 files changed, 1158 insertions(+), 1339 deletions(-) delete mode 100644 tools/1_rename_by_dimensions.py delete mode 100644 tools/2_apply_docs_json.py create mode 100644 tools/apply_docs_json.py rename tools/{2_contributing_in_page.py => contributing_in_page.py} (97%) create mode 100644 tools/rename_by_dimensions.py rename tools/{add_deprecation_notices.py => temp_add_deprecation_notices.py} (100%) diff --git a/tools/1_rename_by_dimensions.py b/tools/1_rename_by_dimensions.py deleted file mode 100644 index 0058681e..00000000 --- a/tools/1_rename_by_dimensions.py +++ /dev/null @@ -1,560 +0,0 @@ -import yaml # pip install pyyaml -import re -import datetime -from pathlib import Path -import shutil - - -class Config: - # --- Path Setup --- - BASE_DIR = Path(__file__).resolve().parent.parent - LANGUAGES = ["zh", "en", "ja"] # Languages to process - # Still useful for potential internal archiving if needed - TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") - - # --- Directory Naming Templates --- - # This is the directory we look for, operate within, and is the final name. - LANG_DIR_TEMPLATE = "plugin_dev_{lang}" - - # Prefix for archiving a LANG_DIR_TEMPLATE if (for some external reason) - # we wanted to back it up before processing. Not used in the main flow currently - # but kept as a utility. 
- ARCHIVE_LANG_DIR_PREFIX_TEMPLATE = "plugin_dev_{lang}_archive_pre_processing_" - - # --- PWXY Mappings --- - PRIMARY_TYPE_MAP = { - "conceptual": 1, - "implementation": 2, - "operational": 3, - "reference": 4, - } - DEFAULT_W = 0 - DETAIL_TYPE_MAPS = { - "conceptual": {"introduction": 1, "principles": 2, "architecture": 3}, - "implementation": {"basic": 1, "standard": 2, "high": 3, "advanced": 4}, - "operational": {"setup": 1, "deployment": 2, "maintenance": 3}, - "reference": {"core": 1, "configuration": 2, "examples": 3}, - } - DEFAULT_X = 0 - LEVEL_MAP = { - "beginner": 1, - "intermediate": 2, - "advanced": 3, - } - DEFAULT_Y = 0 - PRIORITY_NORMAL = 0 - PRIORITY_HIGH = 9 - PRIORITY_ADVANCED_LEVEL_KEY = "advanced" - PRIORITY_IMPLEMENTATION_PRIMARY_KEY = "implementation" - PRIORITY_IMPLEMENTATION_DETAIL_KEYS = {"high", "advanced"} - -# --- Helper Functions --- - - -def extract_front_matter(content: str): - match = re.match(r"^\s*---\s*$(.*?)^---\s*$(.*)", - content, re.DOTALL | re.MULTILINE) - if match: - yaml_str = match.group(1).strip() - markdown_content = match.group(2).strip() - try: - front_matter = yaml.safe_load(yaml_str) - if front_matter is None: # Handles empty YAML (--- \n ---) - return {}, markdown_content - return ( - front_matter if isinstance(front_matter, dict) else {} - ), markdown_content - except yaml.YAMLError as e: - print(f" [Error] YAML Parsing Failed: {e}") - return None, content # Indicate error - else: - return {}, content # No front matter found - - -def sanitize_filename_part(part: str) -> str: - if not isinstance(part, str): - part = str(part) - part = part.lower() - part = part.replace("&", "and").replace("@", "at") - part = re.sub(r"\s+", "-", part) - # Allow dots for language suffix in stem - part = re.sub(r"[^\w\-.]+", "", part) - part = part.strip(".-_") - return part or "untitled" - - -def _calculate_pwxy_and_warnings(front_matter: dict, config: Config) -> tuple[int, int, int, int, list[str]]: - """Calculates P, W, X, Y values 
and generates warnings for missing/unmapped data.""" - warnings_messages = [] - dimensions = front_matter.get("dimensions", {}) - type_info = dimensions.get("type", {}) - primary = type_info.get("primary") - detail = type_info.get("detail") - level = dimensions.get("level") - - P = config.PRIORITY_NORMAL - if level == config.PRIORITY_ADVANCED_LEVEL_KEY: - P = config.PRIORITY_HIGH - if ( - primary == config.PRIORITY_IMPLEMENTATION_PRIMARY_KEY - and detail in config.PRIORITY_IMPLEMENTATION_DETAIL_KEYS - ): - P = config.PRIORITY_HIGH - - W = config.PRIMARY_TYPE_MAP.get(primary, config.DEFAULT_W) - primary_detail_map = config.DETAIL_TYPE_MAPS.get(primary, {}) - X = primary_detail_map.get(detail, config.DEFAULT_X) - Y = config.LEVEL_MAP.get(level, config.DEFAULT_Y) - - if primary is None: - warnings_messages.append("Missing dimensions.type.primary") - elif W == config.DEFAULT_W: - warnings_messages.append( - f"Unmapped primary type: '{primary}'. Using W={config.DEFAULT_W}") - if detail is None: - warnings_messages.append("Missing dimensions.type.detail") - elif X == config.DEFAULT_X and primary in config.DETAIL_TYPE_MAPS: - warnings_messages.append( - f"Unmapped detail type: '{detail}' for primary '{primary}'. Using X={config.DEFAULT_X}") - elif primary not in config.DETAIL_TYPE_MAPS and primary is not None: - warnings_messages.append( - f"No detail map defined for primary type: '{primary}'. Using X={config.DEFAULT_X}") - if level is None: - warnings_messages.append("Missing dimensions.level") - elif Y == config.DEFAULT_Y: - warnings_messages.append( - f"Unmapped level: '{level}'. 
Using Y={config.DEFAULT_Y}") - - return P, W, X, Y, warnings_messages - - -def _generate_filename_parts( - P: int, W: int, X: int, Y: int, - front_matter: dict, - original_filename_stem: str -) -> tuple[str | None, str, str, list[str]]: - """Generates padded prefix, sanitized title, language suffix, and any warnings.""" - warnings_messages = [] - - prefix_str = f"{P}{W}{X}{Y}" - try: - numeric_prefix = int(prefix_str) - padded_prefix = f"{numeric_prefix:04d}" - except ValueError: - warnings_messages.append( - f"Could not form numeric prefix from P={P},W={W},X={X},Y={Y}. Using '0000'.") - padded_prefix = "0000" - - standard_title = front_matter.get("standard_title") - title_part_to_use = standard_title - if not title_part_to_use: - warnings_messages.append( - "Missing 'standard_title'. Using original filename stem as fallback.") - title_part_to_use = original_filename_stem - sanitized_title = sanitize_filename_part(title_part_to_use) - - lang_suffix = "" - language_fm = front_matter.get("language") - if language_fm: - lang_code = str(language_fm).strip().lower() - if lang_code: - lang_suffix = f".{lang_code}" - else: - warnings_messages.append( - "Empty 'language' field in frontmatter. Omitting language suffix.") - else: - warnings_messages.append( - "Missing 'language' field in frontmatter. Omitting language suffix.") - - return padded_prefix, sanitized_title, lang_suffix, warnings_messages - -# --- Core Processing Functions --- - - -def get_or_create_lang_dir(lang: str, config: Config) -> tuple[Path | None, bool]: - """ - Identifies the language-specific directory. Creates it if it doesn't exist. - This directory will be processed in-place. - - Returns: - - Path | None: The path to the language directory, or None on critical error. - - bool: True if the directory was newly created (was_absent), False otherwise. 
- """ - lang_dir_name = config.LANG_DIR_TEMPLATE.format(lang=lang) - lang_dir_path = config.BASE_DIR / lang_dir_name - was_newly_created = False - - if lang_dir_path.exists(): - if not lang_dir_path.is_dir(): - print( - f"[ERROR] Path '{lang_dir_path}' exists but is not a directory. Skipping language '{lang}'.") - return None, False - print( - f"Using existing directory '{lang_dir_path.name}' for in-place processing of '{lang}'.") - else: - print( - f"Directory '{lang_dir_path.name}' not found for language '{lang}'. Creating it.") - try: - # exist_ok=False to ensure it's new - lang_dir_path.mkdir(parents=True, exist_ok=False) - was_newly_created = True - print(f"Created directory: '{lang_dir_path.name}' for '{lang}'.") - except FileExistsError: # Should not happen due to prior .exists() check, but for safety - print( - f"[ERROR] Directory '{lang_dir_path.name}' unexpectedly created by another process. Attempting to use it.") - if not lang_dir_path.is_dir(): # Verify it's a dir - print( - f"[ERROR] Path '{lang_dir_path}' is not a directory after attempted creation. Skipping language '{lang}'.") - return None, False - was_newly_created = False # It existed. - except OSError as e: - print( - f"[ERROR] Failed to create directory '{lang_dir_path}': {e}. Skipping language '{lang}'.") - return None, False - - return lang_dir_path, was_newly_created - - -def archive_existing_directory(path_to_archive: Path, archive_prefix_template: str, lang: str, timestamp: str) -> bool: - """ - Archives the given directory if it exists. - The archive_prefix_template should be like "plugin_dev_{lang}_archive_". - Returns True if path is clear for use (was archived or didn't exist), False on error or if path is not a dir. 
- """ - if path_to_archive.exists(): - if path_to_archive.is_dir(): - archive_base_name = archive_prefix_template.format(lang=lang) - archive_dir_name = f"{archive_base_name}{timestamp}" - archive_dir_path = path_to_archive.parent / archive_dir_name - try: - if archive_dir_path.exists(): # Safety: if target archive name exists, remove it. - print( - f" [Warning] Archive destination '{archive_dir_path}' already exists. Removing it first to avoid error during move.") - shutil.rmtree(archive_dir_path) - shutil.move(str(path_to_archive), str(archive_dir_path)) - print( - f" Archived existing directory '{path_to_archive.name}' to '{archive_dir_path.name}'.") - return True # Path is now clear because original was moved - except OSError as e: - print( - f" [Error] Failed to archive existing directory '{path_to_archive.name}' to '{archive_dir_path.name}': {e}") - return False - else: - print( - f" [Error] Path '{path_to_archive}' exists but is not a directory. Cannot archive.") - return False - return True # Path didn't exist, so it's clear - - -def process_single_mdx_file( - mdx_filepath: Path, - config: Config -) -> dict: - """ - Processes a single MDX file: extracts metadata, generates new filename, - and renames the file in place. - Returns stats, including old and new filename stems if renamed. 
- """ - stats = { - "status": "processed", - "warnings": [], - "error_message": None, - "old_filename_stem_for_replace": None, - "new_filename_stem_for_replace": None, - } - display_path = mdx_filepath.name - if mdx_filepath.parent != config.BASE_DIR: - try: - # Show relative path from the language directory's parent (BASE_DIR) - display_path = mdx_filepath.relative_to( - mdx_filepath.parent.parent).as_posix() - except ValueError: - display_path = mdx_filepath.relative_to(config.BASE_DIR).as_posix() - - file_warnings = [] - - try: - content = mdx_filepath.read_text(encoding="utf-8") - front_matter, _ = extract_front_matter(content) - - if front_matter is None: - stats["status"] = "error" - stats["error_message"] = "YAML Error in file." - print(f"\nProcessing: {display_path}") - print(f" [Skipping] {stats['error_message']}") - return stats - - P, W, X, Y, pwxy_warnings = _calculate_pwxy_and_warnings( - front_matter, config) - file_warnings.extend(pwxy_warnings) - - original_stem_for_title_fallback = mdx_filepath.stem # Used if standard_title is missing - - padded_prefix, sanitized_title, lang_suffix, fname_warnings = _generate_filename_parts( - P, W, X, Y, front_matter, original_stem_for_title_fallback - ) - file_warnings.extend(fname_warnings) - - # padded_prefix has a fallback to "0000", so it should not be None - new_filename = f"{padded_prefix}-{sanitized_title}{lang_suffix}.mdx" - new_filepath = mdx_filepath.with_name(new_filename) - - if new_filepath == mdx_filepath: - stats["status"] = "skipped_no_change" - elif new_filepath.exists(): - stats["status"] = "skipped_target_exists" - else: - try: - original_stem_before_rename = mdx_filepath.stem # Capture actual stem before rename - mdx_filepath.rename(new_filepath) - stats["status"] = "processed" - # Store stems for content replacement phase - stats["old_filename_stem_for_replace"] = original_stem_before_rename - stats["new_filename_stem_for_replace"] = new_filepath.stem - except Exception as rename_error: - 
stats["status"] = "error" - stats["error_message"] = f"Failed to rename file to '{new_filename}': {rename_error}" - # Defer printing to main loop for consistency - return stats - - stats["warnings"] = file_warnings - action_taken = new_filepath != mdx_filepath and stats["status"] == "processed" - - # Only print details if there are warnings or an actual change/error for this file - if file_warnings or action_taken or stats["status"].startswith("error") or stats["status"] == "skipped_target_exists": - print( - f"\nProcessing: {display_path} -> {new_filename if action_taken else '(no change or skipped)'}") - for warning_msg in file_warnings: - print(f" [Warning] {warning_msg}") - if stats["status"] == "skipped_target_exists": - print( - f" [Skipping] Target filename '{new_filename}' already exists in this directory.") - if stats["error_message"]: - print(f" [Error] {stats['error_message']}") - - except FileNotFoundError: - stats["status"] = "error" - stats["error_message"] = f"File not found during processing: {mdx_filepath}" - print(f"\nProcessing: {display_path}") - print(f" [Error] {stats['error_message']}") - except Exception as e: - stats["status"] = "error" - stats["error_message"] = f"Unexpected error: {e}" - print(f"\nProcessing: {display_path}") - print(f" [Error] Unexpected error processing file: {e}") - import traceback - traceback.print_exc() - return stats - - -def run_processing_for_language( - lang_dir_path: Path, - config: Config -) -> dict: - """Processes all MDX files in the lang_dir_path by renaming them in place, - then updates internal content references.""" - print(f"Starting in-place processing for: {lang_dir_path.name}") - - lang_stats = { - "processed_count": 0, - "skipped_no_change_count": 0, - "skipped_target_exists_count": 0, - "error_count": 0, - "warning_files_count": 0, - "status": "OK", - "dir_path_str": str(lang_dir_path.relative_to(config.BASE_DIR)), - "content_replacements_made_count": 0, - "content_replacement_errors_count": 0, - } 
- - if not lang_dir_path.exists() or not lang_dir_path.is_dir(): - print( - f"[Error] Language directory '{lang_dir_path.name}' does not exist or is not a directory.") - lang_stats["status"] = "LANG_DIR_ERROR" - return lang_stats - - # --- Phase 1: Rename files --- - print(f"\n--- Phase 1: Renaming files in '{lang_dir_path.name}' ---") - mdx_files = sorted(list(lang_dir_path.rglob("*.mdx"))) - total_files = len(mdx_files) - print(f"Found {total_files} MDX files to process for renaming.") - - rename_mappings = [] # List to store (old_stem, new_stem) for content replacement - - for i, mdx_filepath in enumerate(mdx_files): - result = process_single_mdx_file(mdx_filepath, config) - - if result["status"] == "processed": - lang_stats["processed_count"] += 1 - # Check if stems were provided and different (meaning a rename happened) - old_stem = result.get("old_filename_stem_for_replace") - new_stem = result.get("new_filename_stem_for_replace") - if old_stem and new_stem and old_stem != new_stem: - rename_mappings.append((old_stem, new_stem)) - elif result["status"] == "skipped_no_change": - lang_stats["skipped_no_change_count"] += 1 - elif result["status"] == "skipped_target_exists": - lang_stats["skipped_target_exists_count"] += 1 - elif result["status"] == "error": - lang_stats["error_count"] += 1 - - if result["warnings"]: - lang_stats["warning_files_count"] += 1 - - if total_files > 0: - progress = (i + 1) / total_files * 100 - print( - f"Rename Progress ({lang_dir_path.name}): {i+1}/{total_files} files ({progress:.1f}%) evaluated.", end="\r") - - if total_files > 0: - print() # Newline after progress bar - print("--- Phase 1: Renaming files complete. ---") - - # --- Phase 2: Update content references --- - if rename_mappings: - print(f"\n--- Phase 2: Updating content references in '{lang_dir_path.name}' ---") - print(f"Found {len(rename_mappings)} filename changes to propagate.") - # Re-glob for files, as their names might have changed. 
- # Also, we need to process all files, not just the renamed ones. - all_mdx_files_after_rename = sorted(list(lang_dir_path.rglob("*.mdx"))) - total_files_for_replacement = len(all_mdx_files_after_rename) - print(f"Scanning {total_files_for_replacement} .mdx files for content updates.") - - files_content_updated = 0 - for i, file_to_scan_path in enumerate(all_mdx_files_after_rename): - try: - original_content = file_to_scan_path.read_text(encoding="utf-8") - modified_content = original_content - file_actually_changed_by_replacement = False - - for old_stem, new_stem in rename_mappings: - if old_stem in modified_content: # Check if old_stem exists before replacing - temp_content = modified_content.replace(old_stem, new_stem) - if temp_content != modified_content: - modified_content = temp_content - file_actually_changed_by_replacement = True - - if file_actually_changed_by_replacement: - file_to_scan_path.write_text(modified_content, encoding="utf-8") - files_content_updated +=1 - print(f" Updated references in: {file_to_scan_path.relative_to(lang_dir_path)}") - except Exception as e: - print(f" [Error] Failed to update references in {file_to_scan_path.name}: {e}") - lang_stats["content_replacement_errors_count"] += 1 - - if total_files_for_replacement > 0: - progress = (i + 1) / total_files_for_replacement * 100 - print( - f"Content Update Progress ({lang_dir_path.name}): {i+1}/{total_files_for_replacement} files ({progress:.1f}%) scanned.", end="\r") - - if total_files_for_replacement > 0: - print() # Newline after progress bar - - lang_stats["content_replacements_made_count"] = files_content_updated - print(f"Content replacement phase: {files_content_updated} files had their content updated.") - print("--- Phase 2: Content references update complete. 
---") - else: - print("\nNo renames occurred, skipping content reference update phase.") - - - print("-" * 20) - print(f"Language Processing Summary ({lang_dir_path.name}):") - print(f" Successfully processed (renamed): {lang_stats['processed_count']}") - print(f" Checked (filename no change): {lang_stats['skipped_no_change_count']}") - print(f" Skipped (target filename exists): {lang_stats['skipped_target_exists_count']}") - print(f" Files with warnings: {lang_stats['warning_files_count']}") - print(f" Errors during file processing: {lang_stats['error_count']}") - if rename_mappings: # Only show if phase 2 ran - print(f" Files with content updated (references): {lang_stats['content_replacements_made_count']}") - print(f" Errors during content update: {lang_stats['content_replacement_errors_count']}") - print("-" * 20) - - if lang_stats["error_count"] > 0 or lang_stats["content_replacement_errors_count"] > 0: - lang_stats["status"] = "ERRORS_IN_PROCESSING" - return lang_stats - -# --- Main Orchestration --- - - -def main(): - config = Config() - print(f"Base directory: {config.BASE_DIR}") - print(f"Timestamp for this run: {config.TIMESTAMP}") - - overall_summary = {} - lang_dir_newly_created_flags = {} - lang_dirs_map = {} - - for lang in config.LANGUAGES: - print(f"\n{'='*10} Processing Language: {lang.upper()} {'='*10}") - - current_lang_dir, was_newly_created = get_or_create_lang_dir( - lang, config) - lang_dir_newly_created_flags[lang] = was_newly_created - lang_dirs_map[lang] = current_lang_dir - - if not current_lang_dir: - overall_summary[lang] = { - "status": "SETUP_ERROR", "message": f"Failed to get or create language directory for {lang}."} - continue - - lang_results = run_processing_for_language(current_lang_dir, config) - overall_summary[lang] = lang_results - - if current_lang_dir: - if lang_results["status"] in ["OK", "ERRORS_IN_PROCESSING"]: - if was_newly_created and current_lang_dir.exists() and not any(current_lang_dir.iterdir()): - try: - 
current_lang_dir.rmdir() - print( - f" Removed empty newly created language directory: {current_lang_dir.name}") - lang_dirs_map[lang] = None - lang_results["message"] = lang_results.get( - "message", "") + " Empty newly created directory removed." - except OSError as e: - print( - f" Note: Could not remove empty newly created directory '{current_lang_dir.name}': {e}") - - print("\n\n" + "=" * 20 + " Overall Script Summary " + "=" * 20) - for lang_code in config.LANGUAGES: - summary = overall_summary.get(lang_code, {}) - lang_dir_path_obj = lang_dirs_map.get(lang_code) - - print(f"\nLanguage: {lang_code.upper()}") - status = summary.get("status", "UNKNOWN") - print(f" Status: {status}") - - if "message" in summary: - print(f" Message: {summary['message']}") - - if status not in ["SETUP_ERROR", "SETUP_ERROR_POST_ARCHIVE", "PRE_ARCHIVE_ERROR", "LANG_DIR_ERROR"]: - print(f" Directory: {summary.get('dir_path_str', 'N/A')}") - print( - f" Processed (renamed): {summary.get('processed_count', 0)}") - print( - f" Checked (no name change): {summary.get('skipped_no_change_count', 0)}") - print( - f" Skipped (target exists): {summary.get('skipped_target_exists_count', 0)}") - print( - f" Files with Warnings: {summary.get('warning_files_count', 0)}") - print( - f" Errors during file processing: {summary.get('error_count', 0)}") - if summary.get('processed_count', 0) > 0 or "content_replacements_made_count" in summary : # Show only if relevant - print(f" Files with content updated (references): {summary.get('content_replacements_made_count',0)}") - print(f" Errors during content update: {summary.get('content_replacement_errors_count',0)}") - - - if lang_dir_path_obj and lang_dir_path_obj.exists(): - print(f" Final directory location: {lang_dir_path_obj.name}") - elif lang_dir_newly_created_flags.get(lang_code) and not lang_dir_path_obj: - print(" Note: Empty newly created directory was removed as expected.") - elif not lang_dir_path_obj and status != "SETUP_ERROR": - print( - 
f" Note: Language directory '{config.LANG_DIR_TEMPLATE.format(lang=lang_code)}' may have been archived or removed.") - - print("=" * (40 + len(" Overall Script Summary "))) - print("\nScript finished. Please review changes and commit to Git if satisfied.") - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/tools/2_apply_docs_json.py b/tools/2_apply_docs_json.py deleted file mode 100644 index 92a40935..00000000 --- a/tools/2_apply_docs_json.py +++ /dev/null @@ -1,777 +0,0 @@ -import json -import os -import re -from collections import defaultdict - -# --- 配置 --- -refresh = True # 如果为 True,将清空指定版本的 tabs -DOCS_JSON_PATH = "docs.json" - -# --- 简体中文配置(docs_config) --- -PLUGIN_DEV_ZH = { - "DOCS_DIR": "plugin_dev_zh", # 插件开发文档目录 - "LANGUAGE_CODE": "简体中文", # 注意:虽然变量名是 LANGUAGE_CODE,但会部署为 docs.json 中的 'version' 值。 - "FILE_EXTENSION": ".zh.mdx", - "TARGET_TAB_NAME": "插件开发", # 新增:目标 Tab 名称 - "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.zh\.mdx$"), # 新增:文件名匹配模式 - "PWX_TO_GROUP_MAP": { - # --- PWX 到 Group 名称的映射 (统一到 "插件开发" Tab) --- - # (P, W, X) -> (tab_name, group_name, nested_group_name) - # Tab: 插件开发 - # Group: 概念与入门 - ("0", "1", "1"): ("插件开发", "概念与入门", "概览"), - ("0", "1", "3"): ("插件开发", "概念与入门", None), - # Group: 开发实践 - ("0", "2", "1"): ("插件开发", "开发实践", "快速开始"), - ("0", "2", "2"): ("插件开发", "开发实践", "开发 Dify 插件"), - # Group: 贡献与发布 - ("0", "3", "1"): ("插件开发", "贡献与发布", "行为准则与规范"), - ("0", "3", "2"): ("插件开发", "贡献与发布", "发布与上架"), - ("0", "3", "3"): ("插件开发", "贡献与发布", "常见问题解答"), - # Group: 实践案例与示例 - ("0", "4", "3"): ("插件开发", "实践案例与示例", "开发示例"), - # Group: 高级开发 - ("9", "2", "2"): ("插件开发", "高级开发", "Extension 与 Agent"), - ("9", "2", "3"): ("插件开发", "高级开发", "Extension 与 Agent"), - ("9", "4", "3"): ("插件开发", "高级开发", "Extension 与 Agent"), - ("9", "2", "4"): ("插件开发", "高级开发", "反向调用"), - # Group: Reference & Specifications - ("0", "4", "1"): ("插件开发", "Reference & Specifications", "核心规范与功能"), - }, - "DESIRED_GROUP_ORDER": [ - "概念与入门", - "开发实践", - "贡献与发布", - 
"实践案例与示例", - "高级开发", - "Reference & Specifications", # 确保这个在最后 - ], -} - -# --- English Configuration --- -PLUGIN_DEV_EN = { - "DOCS_DIR": "plugin_dev_en", # Plugin development documentation directory - # Note: Although the variable name is LANGUAGE_CODE, it will be deployed as the 'version' value in docs.json. - "LANGUAGE_CODE": "English", - "FILE_EXTENSION": ".en.mdx", - "TARGET_TAB_NAME": "Plugin Development", - "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.en\.mdx$"), - "PWX_TO_GROUP_MAP": { - # --- PWX to Group Name Mapping (Unified under the "Plugin Development" Tab) --- - # (P, W, X) -> (tab_name, group_name, nested_group_name) - # Tab: Plugin Development - # Group: Concepts & Getting Started - ("0", "1", "1"): ( - "Plugin Development", - "Concepts & Getting Started", - "Overview", - ), - ("0", "1", "3"): ("Plugin Development", "Concepts & Getting Started", None), - # Group: Development Practices - ("0", "2", "1"): ("Plugin Development", "Development Practices", "Quick Start"), - ("0", "2", "2"): ( - "Plugin Development", - "Development Practices", - "Developing Dify Plugins", - ), - # Group: Contribution & Publishing - ("0", "3", "1"): ( - "Plugin Development", - "Contribution & Publishing", - "Code of Conduct & Standards", - ), - ("0", "3", "2"): ( - "Plugin Development", - "Contribution & Publishing", - "Publishing & Listing", - ), - ("0", "3", "3"): ("Plugin Development", "Contribution & Publishing", "FAQ"), - # Group: Examples & Use Cases - ("0", "4", "3"): ( - "Plugin Development", - "Examples & Use Cases", - "Development Examples", - ), - # Group: Advanced Development - ("9", "2", "2"): ( - "Plugin Development", - "Advanced Development", - "Extension & Agent", - ), - ("9", "2", "3"): ( - "Plugin Development", - "Advanced Development", - "Extension & Agent", - ), - ("9", "4", "3"): ( - "Plugin Development", - "Advanced Development", - "Extension & Agent", - ), - ("9", "2", "4"): ( - "Plugin Development", - "Advanced Development", - "Reverse 
Calling", - ), - # Group: Reference & Specifications - ("0", "4", "1"): ( - "Plugin Development", - "Reference & Specifications", - "Core Specifications & Features", - ), - }, - "DESIRED_GROUP_ORDER": [ - "Concepts & Getting Started", - "Development Practices", - "Contribution & Publishing", - "Examples & Use Cases", - "Advanced Development", - "Reference & Specifications", # Ensure this is last - ], -} - -# --- 日本語設定 (Japanese Configuration) --- -PLUGIN_DEV_JA = { - "DOCS_DIR": "plugin_dev_ja", # プラグイン開発ドキュメントディレクトリ - "LANGUAGE_CODE": "日本語", # 注意:変数名は LANGUAGE_CODE ですが、docs.json の 'version' 値としてデプロイされます。 - "FILE_EXTENSION": ".ja.mdx", - "TARGET_TAB_NAME": "プラグイン開発", # 対象タブ名 - "FILENAME_PATTERN": re.compile( - r"^(\d{4})-(.*?)\.ja\.mdx$" - ), # ファイル名照合パターン - "PWX_TO_GROUP_MAP": { - # --- PWX からグループ名へのマッピング(「プラグイン開発」タブに統一)--- - # (P, W, X) -> (tab_name, group_name, nested_group_name) - # Tab: プラグイン開発 - # Group: 概念と概要 - ("0", "1", "1"): ("プラグイン開発", "概念と概要", "概要"), - ("0", "1", "3"): ("プラグイン開発", "概念と概要", None), - # Group: 開発実践 - ("0", "2", "1"): ("プラグイン開発", "開発実践", "クイックスタート"), - ("0", "2", "2"): ("プラグイン開発", "開発実践", "Difyプラグインの開発"), - # Group: 貢献と公開 - ("0", "3", "1"): ("プラグイン開発", "貢献と公開", "行動規範と基準"), - ("0", "3", "2"): ("プラグイン開発", "貢献と公開", "公開と掲載"), - ("0", "3", "3"): ("プラグイン開発", "貢献と公開", "よくある質問 (FAQ)"), - # Group: 実践例とユースケース - ("0", "4", "3"): ("プラグイン開発", "実践例とユースケース", "開発例"), - # Group: 高度な開発 - ("9", "2", "2"): ("プラグイン開発", "高度な開発", "Extension と Agent"), - ("9", "2", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"), - ("9", "4", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"), - ("9", "2", "4"): ( - "プラグイン開発", - "高度な開発", - "リバースコール", - ), # Reverse Calling - # Group: リファレンスと仕様 - ("0", "4", "1"): ("プラグイン開発", "リファレンスと仕様", "コア仕様と機能"), - }, - "DESIRED_GROUP_ORDER": [ - "概念と概要", - "開発実践", - "貢献と公開", - "実践例とユースケース", - "高度な開発", - "リファレンスと仕様", # これが最後になるように確認 - ], -} - - -# --- 辅助函数 --- - - -def clear_tabs_if_refresh(navigation_data, version_code, target_tab_name, 
do_refresh): - """如果 do_refresh 为 True,则查找指定版本和目标 Tab,并清空其 groups 列表""" - if not do_refresh: - return False # 未执行清空 - - if not navigation_data or "versions" not in navigation_data: - print("警告: 'navigation.versions' 未找到,无法清空 tabs。") - return False - - version_found = False - tab_found_and_cleared = False - for version_nav in navigation_data.get("versions", []): - if version_nav.get("version") == version_code: - version_found = True - target_tab = None - if "tabs" in version_nav and isinstance(version_nav["tabs"], list): - for tab in version_nav["tabs"]: - if isinstance(tab, dict) and tab.get("tab") == target_tab_name: - target_tab = tab - break - - if target_tab: - if "groups" in target_tab: - target_tab["groups"] = [] - print( - f"信息: 已清空版本 '{version_code}' 下 Tab '{target_tab_name}' 的 groups (因为 refresh=True)。" - ) - tab_found_and_cleared = True - else: - # 如果 'groups' 不存在,也视为一种“清空”状态,或者可以创建一个空的 - target_tab["groups"] = [] - print( - f"信息: 版本 '{version_code}' 下 Tab '{target_tab_name}' 没有 'groups' 键,已确保其为空列表 (因为 refresh=True)。" - ) - tab_found_and_cleared = True - else: - print( - f"警告: 在版本 '{version_code}' 中未找到目标 Tab '{target_tab_name}',无法清空其 groups。" - ) - break # 找到版本后即可退出循环 - - if not version_found: - print(f"警告: 未找到版本 '{version_code}',无法清空任何 Tab。") - return False - - return tab_found_and_cleared - - -def get_page_path( - filename, docs_config -): # docs_config 参数保留,但 FILE_EXTENSION 不再用于此处的后缀移除 - """从 mdx 文件名获取 mintlify 页面路径 (固定去掉末尾 .mdx 后缀)""" - docs_dir = docs_config["DOCS_DIR"] - # 固定移除末尾的 .mdx,以保留 .zh 或 .en 等语言标识 - if filename.endswith(".mdx"): - base_filename = filename[: -len(".mdx")] - else: - # 如果不以 .mdx 结尾,则引发错误,因为这是预期格式 - raise ValueError(f"错误: 文件名 '{filename}' 不以 '.mdx' 结尾,无法处理。") - - return os.path.join(docs_dir, base_filename) - - -def extract_existing_pages(navigation_data, version_code, target_tab_name): - """递归提取指定版本和目标 Tab 下所有已存在的页面路径""" - existing_pages = set() - target_version_nav = None - target_tab_nav = None # 新增:用于存储找到的目标 Tab 对象 - - if 
not navigation_data or "versions" not in navigation_data: - print("警告: 'navigation.versions' 未找到") - return existing_pages, None, None # 返回三个值 - - # 查找目标版本 - for version_nav in navigation_data.get("versions", []): - if version_nav.get("version") == version_code: - target_version_nav = version_nav - break - - if not target_version_nav: - print(f"警告: 版本 '{version_code}' 在 docs.json 中未找到") - return existing_pages, None, None # 返回三个值 - - # 在目标版本中查找目标 Tab - if "tabs" in target_version_nav and isinstance(target_version_nav["tabs"], list): - for tab in target_version_nav["tabs"]: - if isinstance(tab, dict) and tab.get("tab") == target_tab_name: - target_tab_nav = tab # 存储找到的 Tab 对象 - # 仅从目标 Tab 中提取页面 - for group in tab.get("groups", []): - if isinstance(group, dict): - _recursive_extract(group, existing_pages) - break # 找到目标 Tab 后即可退出循环 - else: # 'tabs' might not exist or not be a list - target_version_nav["tabs"] = [] - - if not target_tab_nav: - print( - f"警告: 在版本 '{version_code}' 中未找到 Tab '{target_tab_name}',无法提取现有页面。" - ) - # 即使 Tab 不存在,也返回版本导航对象,以便后续可能创建 Tab - return existing_pages, target_version_nav, None - - # 返回提取到的页面、版本导航对象和目标 Tab 对象 - return existing_pages, target_version_nav, target_tab_nav - - -def _recursive_extract(group_item, pages_set): - """递归辅助函数""" - # Ensure group_item is a dictionary before proceeding - if not isinstance(group_item, dict): - return - - if "pages" in group_item and isinstance(group_item["pages"], list): - for page in group_item["pages"]: - if isinstance(page, str): - pages_set.add(page) - elif isinstance(page, dict) and "group" in page: - # Recurse into nested groups - _recursive_extract(page, pages_set) - - -def remove_obsolete_pages(target_tab_data, pages_to_remove): - """递归地从目标 Tab 的 groups 结构中移除失效页面路径。 - 注意:此函数直接修改传入的 target_tab_data 字典。 - """ - if not isinstance(target_tab_data, dict) or "groups" not in target_tab_data: - # 如果输入不是预期的 Tab 结构,则直接返回 - return - - groups = target_tab_data.get("groups", []) - if not isinstance(groups, 
list): - # 如果 groups 不是列表,也无法处理 - return - - # 使用索引迭代以安全地移除项 - i = 0 - while i < len(groups): - group_item = groups[i] - if isinstance(group_item, dict): - # 递归处理 group 内部的 pages - _remove_obsolete_from_group(group_item, pages_to_remove) - # 如果处理后 group 的 pages 为空(且没有嵌套 group),可以选择移除该 group - # 当前逻辑:保留空 group 结构 - if not group_item.get("pages"): - print( - f"信息: Group '{group_item.get('group')}' 清理后为空,已保留结构。" - ) - i += 1 - else: - # 如果 groups 列表中包含非字典项(不符合预期),则跳过 - i += 1 - - -def _remove_obsolete_from_group(group_dict, pages_to_remove): - """辅助函数,递归处理单个 group 或 nested group 内的 pages""" - if not isinstance(group_dict, dict) or "pages" not in group_dict: - return - - pages = group_dict.get("pages", []) - if not isinstance(pages, list): - return - - new_pages = [] - for page_item in pages: - if isinstance(page_item, str): - if page_item not in pages_to_remove: - new_pages.append(page_item) - else: - print(f" - {page_item} (从 Group '{group_dict.get('group')}' 移除)") - elif isinstance(page_item, dict) and "group" in page_item: - # 递归处理嵌套的 group - _remove_obsolete_from_group(page_item, pages_to_remove) - # 保留嵌套 group 结构,即使它变空 - if page_item or page_item.get("pages"): # 检查字典是否为空或 pages 是否存在 - new_pages.append(page_item) - else: - print( - f"信息: 嵌套 Group '{page_item.get('group')}' 清理后为空,已保留结构。" - ) - new_pages.append(page_item) # 仍然添加空的嵌套组结构 - else: - # 保留无法识别的项 - new_pages.append(page_item) - group_dict["pages"] = new_pages - - -def find_or_create_target_group( - target_version_nav, tab_name, group_name, nested_group_name -): - # 注意:target_version_nav 是特定版本对象,例如 {"version": "简体中文", "tabs": [...]} - target_tab = None - # Ensure 'tabs' exists and is a list - if "tabs" not in target_version_nav or not isinstance( - target_version_nav["tabs"], list - ): - target_version_nav["tabs"] = [] - - for tab in target_version_nav["tabs"]: - if isinstance(tab, dict) and tab.get("tab") == tab_name: - target_tab = tab - break - if target_tab is None: - target_tab = {"tab": tab_name, 
"groups": []} - target_version_nav["tabs"].append(target_tab) - - target_group = None - # Ensure 'groups' exists and is a list - if "groups" not in target_tab or not isinstance(target_tab["groups"], list): - target_tab["groups"] = [] - - for group in target_tab["groups"]: - if isinstance(group, dict) and group.get("group") == group_name: - target_group = group - break - if target_group is None: - target_group = {"group": group_name, "pages": []} - target_tab["groups"].append(target_group) - - # Ensure 'pages' exists in the target_group and is a list - if "pages" not in target_group or not isinstance(target_group["pages"], list): - target_group["pages"] = [] - - # Default container is the top-level group's pages list - target_pages_container = target_group["pages"] - - if nested_group_name: - target_nested_group = None - # Find existing nested group - for item in target_group["pages"]: - if isinstance(item, dict) and item.get("group") == nested_group_name: - target_nested_group = item - # Ensure pages list exists in nested group - target_pages_container = target_nested_group.setdefault("pages", []) - # Ensure it's actually a list after setdefault - if not isinstance(target_pages_container, list): - target_nested_group["pages"] = [] - target_pages_container = target_nested_group["pages"] - break - # If not found, create it - if target_nested_group is None: - target_nested_group = {"group": nested_group_name, "pages": []} - # Check if target_group['pages'] is already the container we want to add to - # This logic assumes nested groups are *always* dicts within the parent's 'pages' list - target_group["pages"].append(target_nested_group) - target_pages_container = target_nested_group["pages"] - - # Final check before returning - if not isinstance(target_pages_container, list): - # 这表示内部逻辑错误,应该引发异常 - raise RuntimeError( - f"内部错误: 无法为 Tab='{tab_name}', Group='{group_name}', Nested='{nested_group_name}' 获取有效的 pages 列表。" - ) - - return target_pages_container - - -# --- 主逻辑 
--- - - -def get_group_sort_key(group_dict, docs_config): - """为排序提供 key,根据 DESIRED_GROUP_ORDER 返回索引,未知组放在最后""" - group_name = group_dict.get("group", "") - desired_order = docs_config["DESIRED_GROUP_ORDER"] - try: - return desired_order.index(group_name) - except ValueError: - return len(desired_order) # 将未在列表中的组排在最后 - - -def main( - docs_config, navigation_data -): # navigation_data: 传入内存中的 navigation 字典供直接修改 - """处理单个文档配置,并直接修改传入的 navigation_data""" - print( - f"\n--- 开始处理版本: {docs_config['LANGUAGE_CODE']} / Tab: {docs_config['TARGET_TAB_NAME']} ---" - ) - - # 从 docs_config 获取配置值 - language_code = docs_config["LANGUAGE_CODE"] - docs_dir = docs_config["DOCS_DIR"] - file_extension = docs_config["FILE_EXTENSION"] - pwx_to_group_map = docs_config["PWX_TO_GROUP_MAP"] - filename_pattern = docs_config["FILENAME_PATTERN"] # 使用配置中的 pattern - target_tab_name = docs_config["TARGET_TAB_NAME"] # 使用配置中的 tab name - - # 1. 清理或准备版本导航 (不再加载 JSON,直接使用传入的 navigation_data) - navigation = navigation_data # 使用传入的 navigation 对象进行操作 - - # 使用 language_code 和 target_tab_name 清理目标 Tab - was_refreshed = clear_tabs_if_refresh( - navigation, language_code, target_tab_name, refresh - ) - if was_refreshed: - print(f"继续执行 Tab '{target_tab_name}' 的后续页面提取和添加操作...") - - # 2. 
提取目标 Tab 的现有页面或创建版本/Tab 导航 - existing_pages, target_version_nav, target_tab_nav = extract_existing_pages( - navigation, language_code, target_tab_name - ) - - if target_version_nav is None: - print(f"信息:在导航数据中未找到版本 '{language_code}',将创建。") - if "versions" not in navigation: # 确保 versions 列表存在 - navigation["versions"] = [] - target_version_nav = {"version": language_code, "tabs": []} - navigation["versions"].append(target_version_nav) - existing_pages = set() - target_tab_nav = None # 版本是新建的,Tab 肯定不存在 - - # 如果目标 Tab 不存在,需要创建它 - if target_tab_nav is None: - print( - f"信息: 在版本 '{language_code}' 中未找到 Tab '{target_tab_name}',将创建。" - ) - target_tab_nav = {"tab": target_tab_name, "groups": []} - # 确保 target_version_nav['tabs'] 是列表 - if "tabs" not in target_version_nav or not isinstance( - target_version_nav["tabs"], list - ): - target_version_nav["tabs"] = [] - target_version_nav["tabs"].append(target_tab_nav) - existing_pages = set() # 新 Tab 没有现有页面 - - print( - f"找到 {len(existing_pages)} 个已存在的页面 (版本: '{language_code}', Tab: '{target_tab_name}')。" - ) - - # 3. 扫描文件系统 (这部分不变,扫描目录下的所有匹配文件) - filesystem_pages = set() - valid_files = [] - if not os.path.isdir(docs_dir): - # 如果目录不存在,则无法继续处理此配置,引发错误 - raise FileNotFoundError( - f"错误: 配置 '{language_code}' 的文档目录 '{docs_dir}' 不存在。" - ) - else: - for filename in os.listdir(docs_dir): - # 使用配置中的 filename_pattern - if filename.endswith(file_extension) and filename_pattern.match(filename): - try: # 添加 try-except 块以捕获 get_page_path 可能引发的 ValueError - page_path = get_page_path(filename, docs_config) - filesystem_pages.add(page_path) - valid_files.append(filename) - except ValueError as e: - # 从 get_page_path 捕获到错误,打印并继续处理其他文件,或重新引发以停止 - print(f"错误处理文件 '{filename}': {e}。将跳过此文件。") - # 如果希望停止整个过程,取消注释下一行: - # raise e - print(f"在 '{docs_dir}' 找到 {len(filesystem_pages)} 个有效的文档文件。") - - # 4. 
计算差异 (相对于目标 Tab 的 existing_pages) - new_files_paths = filesystem_pages - existing_pages - removed_files_paths = existing_pages - filesystem_pages - - print(f"新增文件数 (相对于 Tab '{target_tab_name}'): {len(new_files_paths)}") - print(f"移除文件数 (相对于 Tab '{target_tab_name}'): {len(removed_files_paths)}") - - # 5. 移除失效页面 (仅从目标 Tab 移除) - if removed_files_paths and target_tab_nav: # 确保目标 Tab 存在 - print(f"正在从 Tab '{target_tab_name}' 移除失效页面...") - remove_obsolete_pages( - target_tab_nav, removed_files_paths - ) # 直接传入目标 Tab 对象 - print(f"已处理从 Tab '{target_tab_name}' 移除: {removed_files_paths}") - elif removed_files_paths: - print( - f"警告: 存在失效页面 {removed_files_paths},但未找到目标 Tab '{target_tab_name}' 进行移除。" - ) - - # 6. 添加新页面 (逻辑不变,但 find_or_create_target_group 会确保添加到正确的 Tab 和 Group) - if new_files_paths: - print(f"正在向 Tab '{target_tab_name}' 添加新页面...") - new_files_sorted = sorted( - [f for f in valid_files if get_page_path(f, docs_config) in new_files_paths] - ) - - groups_to_add = defaultdict(list) - for filename in new_files_sorted: - match = filename_pattern.match(filename) # 使用配置中的 pattern - if match: - pwxy = match.group(1) - if len(pwxy) >= 3: - p, w, x = pwxy[0], pwxy[1], pwxy[2] - try: # 包裹 get_page_path 调用 - page_path = get_page_path(filename, docs_config) - except ValueError as e: - print( - f"错误处理文件 '{filename}' (添加阶段): {e}。将跳过此文件。" - ) - continue # 跳过这个文件 - - group_key = (p, w, x) - if group_key in pwx_to_group_map: - map_result = pwx_to_group_map[group_key] - current_tab_name_from_map = map_result[0] - # 强制使用配置的目标 Tab 名称 - if current_tab_name_from_map != target_tab_name: - print( - f"警告: 文件 '{filename}' 根据 PWX 映射到 Tab '{current_tab_name_from_map}',但当前配置强制处理 Tab '{target_tab_name}'。将添加到 '{target_tab_name}'。" - ) - # 始终使用配置中定义的 target_tab_name - tab_name_to_use = target_tab_name - - if len(map_result) == 3: - _, group_name, nested_group_name = map_result - else: # 兼容旧格式或只有两项的情况 - if len(map_result) >= 2: - _, group_name = map_result[:2] # 取前两项 - else: - # 处理 map_result 
项数不足的情况 - print( - f"错误: PWX_TO_GROUP_MAP 中键 '{group_key}' 的值 '{map_result}' 格式不正确,至少需要两项。跳过文件 '{filename}'。" - ) - continue - nested_group_name = None # 假设没有嵌套组 - - groups_to_add[ - (tab_name_to_use, group_name, nested_group_name) - ].append(page_path) - else: - print( - f"警告: 文件 '{filename}' 的 PWX 前缀 ('{p}', '{w}', '{x}') 在 PWX_TO_GROUP_MAP 中没有找到映射,将跳过添加。" - ) - else: - # 数字前缀不足3位是文件名格式错误,应引发异常 - raise ValueError( - f"错误: 文件 '{filename}' 的数字前缀 '{pwxy}' 不足3位,无法解析 PWX。" - ) - - for ( - tab_name, - group_name, - nested_group_name, - ), pages_to_append in groups_to_add.items(): - # 确保只添加到目标 Tab 下 (此检查现在是多余的,因为上面强制使用了 target_tab_name) - # if tab_name == target_tab_name: - print( - f" 添加到 Tab='{tab_name}', Group='{group_name}', Nested='{nested_group_name or '[无]'}' : {len(pages_to_append)} 个页面" - ) - # find_or_create_target_group 现在需要 target_version_nav 来定位或创建 Tab - target_pages_list = find_or_create_target_group( - # tab_name 此时应等于 target_tab_name - target_version_nav, - tab_name, - group_name, - nested_group_name, - ) - - if isinstance(target_pages_list, list): - for new_page in pages_to_append: - if new_page not in target_pages_list: - target_pages_list.append(new_page) - print(f" + {new_page}") - else: - # find_or_create_target_group 内部出错时会抛出 RuntimeError - # 这里可以加日志,但理论上不应到达 - print( - f"错误: 未能为 Tab='{tab_name}', Group='{group_name}', Nested='{nested_group_name}' 获取有效的 pages 列表进行添加。" - ) - # else: # 这个 else 分支现在不会被触发 - # print(f"信息: 跳过向非目标 Tab '{tab_name}' 添加页面 (目标 Tab: '{target_tab_name}')。") - - # <-- 排序 Group (仅排序目标 Tab 内的 Group) --> - print(f"正在排序 Tab '{target_tab_name}' 内的 Group...") - if target_tab_nav and "groups" in target_tab_nav: # 确保目标 Tab 和 groups 存在 - groups_list = [g for g in target_tab_nav["groups"] if isinstance(g, dict)] - groups_list.sort(key=lambda g: get_group_sort_key(g, docs_config)) - target_tab_nav["groups"] = groups_list - print(f" 已对 Tab '{target_tab_name}' 中的 Group 进行排序。") - elif target_tab_nav: - print(f" Tab '{target_tab_name}' 中没有 
'groups' 或为空,无需排序。") - else: - print(f" 未找到 Tab '{target_tab_name}',无法排序 Group。") - - # 不再返回 docs_data,因为直接修改了传入的 navigation_data - print( - f"--- 完成处理版本: {docs_config['LANGUAGE_CODE']} / Tab: {docs_config['TARGET_TAB_NAME']} ---" - ) - - -def load_docs_data(path): - """加载 JSON 文件,处理文件不存在和格式错误的情况""" - try: - with open(path, "r", encoding="utf-8") as f: - return json.load(f) - except FileNotFoundError: - print(f"信息: {path} 未找到,将创建新的结构。") - return {"navigation": {"versions": []}} # 返回初始结构 - except json.JSONDecodeError as e: - # 引发更具体的错误,而不是返回 None - raise json.JSONDecodeError( - f"错误: {path} 格式错误。无法继续。- {e.msg}", e.doc, e.pos - ) - - -def save_docs_data(path, data): - """保存 JSON 数据到文件""" - try: - with open(path, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=4) - print(f"\n成功更新 {path},包含所有已处理的版本。") - # 不再需要返回 True/False,因为异常会处理失败情况 - except IOError as e: - # 引发 IO 错误 - raise IOError(f"错误: 无法写入 {path} - {e}") - except Exception as e: - # 引发其他未知错误 - raise Exception(f"写入 {path} 时发生未知错误: {e}") - - -def process_configurations(configs, docs_path): - """加载数据,处理所有有效配置,然后保存数据""" - # 1. 加载初始数据 - try: - current_docs_data = load_docs_data(docs_path) - except json.JSONDecodeError as e: - print(e) # 打印加载错误信息 - return # 加载失败则退出 - # current_docs_data 不会是 None,因为 load_docs_data 要么返回数据要么引发异常 - - # 2. 确保基本结构存在 - navigation_data = current_docs_data.setdefault( - "navigation", {} - ) # 获取 navigation 字典 - navigation_data.setdefault("versions", []) - - # 3. 
筛选有效配置 - valid_configs = [] - for config in configs: - required_keys = [ - "DOCS_DIR", - "LANGUAGE_CODE", - "FILE_EXTENSION", - "PWX_TO_GROUP_MAP", - "DESIRED_GROUP_ORDER", - "TARGET_TAB_NAME", - "FILENAME_PATTERN", - ] - if all(k in config for k in required_keys): - # 可选:检查 PWX_TO_GROUP_MAP 和 DESIRED_GROUP_ORDER 是否为空 - # 并且检查 FILENAME_PATTERN 是否是编译后的正则表达式对象 - if ( - config.get("PWX_TO_GROUP_MAP") - and config.get("DESIRED_GROUP_ORDER") - and isinstance(config.get("FILENAME_PATTERN"), re.Pattern) - ): - valid_configs.append(config) - else: - reason = [] - if not config.get("PWX_TO_GROUP_MAP"): - reason.append("PWX_TO_GROUP_MAP 为空或不存在") - if not config.get("DESIRED_GROUP_ORDER"): - reason.append("DESIRED_GROUP_ORDER 为空或不存在") - if not isinstance(config.get("FILENAME_PATTERN"), re.Pattern): - reason.append("FILENAME_PATTERN 不是有效的正则表达式对象") - print( - f"警告: 配置 {config.get('LANGUAGE_CODE', '未知')} 无效 ({'; '.join(reason)}),跳过处理。" - ) - else: - missing_keys = [k for k in required_keys if k not in config] - print( - f"警告: 配置 {config.get('LANGUAGE_CODE', '未知')} 不完整 (缺少: {', '.join(missing_keys)}),跳过处理。" - ) - - # 4. 处理有效配置 - if not valid_configs: - print("没有有效的配置可供处理。") - else: - try: # 包裹所有配置的处理过程 - for config in valid_configs: - # 将 navigation_data 传递给 main 函数进行修改 - # main 函数会直接修改这个 navigation_data 字典 - main(config, navigation_data) - - # 5. 
所有配置处理完毕后,统一写回文件 - save_docs_data(docs_path, current_docs_data) - except (FileNotFoundError, ValueError, RuntimeError, IOError, Exception) as e: - # 捕获 main 或 save_docs_data 中可能引发的已知错误 - print(f"\n处理过程中发生错误: {e}") - print("操作已终止,文件可能未完全更新。") - # 根据需要,可以在这里决定是否尝试保存部分结果或直接退出 - - -if __name__ == "__main__": - # 定义要处理的配置列表 - CONFIGS_TO_PROCESS = [ - PLUGIN_DEV_ZH, - PLUGIN_DEV_EN, - PLUGIN_DEV_JA, - ] - - # 调用主处理函数 - process_configurations(CONFIGS_TO_PROCESS, DOCS_JSON_PATH) diff --git a/tools/apply_docs_json.py b/tools/apply_docs_json.py new file mode 100644 index 00000000..8bce8a0c --- /dev/null +++ b/tools/apply_docs_json.py @@ -0,0 +1,698 @@ +import json +import os +import re +from collections import defaultdict +from pathlib import Path +import sys # Import sys for system-specific parameters and functions, e.g., sys.exit() + +# --- Script Base Paths --- +SCRIPT_DIR = Path(__file__).resolve().parent +BASE_DIR = SCRIPT_DIR.parent + +# --- Configuration --- +refresh = False # Flag to control whether to clear existing tabs before processing +DOCS_JSON_PATH = BASE_DIR / "docs.json" # Path to the main documentation structure JSON file + +# --- Language Configurations --- +# These configurations define how documentation files for different languages are processed. +# IMPORTANT: The string values for LANGUAGE_CODE, TARGET_TAB_NAME, and content within +# PWX_TO_GROUP_MAP and DESIRED_GROUP_ORDER are i18n-specific and MUST NOT be translated. 
+PLUGIN_DEV_ZH = { + "DOCS_DIR_RELATIVE": "plugin_dev_zh", "LANGUAGE_CODE": "简体中文", "FILE_EXTENSION_SUFFIX": ".zh", + "TARGET_TAB_NAME": "插件开发", "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.zh\.mdx$"), + "PWX_TO_GROUP_MAP": { # Maps (P, W, X) prefixes from filenames to (Tab Name, Group Name, Optional Nested Group Name) + ("0", "1", "1"): ("插件开发", "概念与入门", "概览"), ("0", "1", "3"): ("插件开发", "概念与入门", None), + ("0", "2", "1"): ("插件开发", "开发实践", "快速开始"),("0", "2", "2"): ("插件开发", "开发实践", "开发 Dify 插件"), + ("0", "3", "1"): ("插件开发", "贡献与发布", "行为准则与规范"),("0", "3", "2"): ("插件开发", "贡献与发布", "发布与上架"),("0", "3", "3"): ("插件开发", "贡献与发布", "常见问题解答"), + ("0", "4", "3"): ("插件开发", "实践案例与示例", "开发示例"), + ("9", "2", "2"): ("插件开发", "高级开发", "Extension 与 Agent"),("9", "2", "3"): ("插件开发", "高级开发", "Extension 与 Agent"),("9", "4", "3"): ("插件开发", "高级开发", "Extension 与 Agent"),("9", "2", "4"): ("插件开发", "高级开发", "反向调用"), + ("0", "4", "1"): ("插件开发", "Reference & Specifications", "核心规范与功能"), + }, + "DESIRED_GROUP_ORDER": ["概念与入门", "开发实践", "贡献与发布", "实践案例与示例", "高级开发", "Reference & Specifications"], +} +PLUGIN_DEV_EN = { + "DOCS_DIR_RELATIVE": "plugin_dev_en", "LANGUAGE_CODE": "English", "FILE_EXTENSION_SUFFIX": ".en", + "TARGET_TAB_NAME": "Plugin Development", "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.en\.mdx$"), + "PWX_TO_GROUP_MAP": { + ("0", "1", "1"): ("Plugin Development", "Concepts & Getting Started", "Overview"),("0", "1", "3"): ("Plugin Development", "Concepts & Getting Started", None), + ("0", "2", "1"): ("Plugin Development", "Development Practices", "Quick Start"),("0", "2", "2"): ("Plugin Development", "Development Practices", "Developing Dify Plugins"), + ("0", "3", "1"): ("Plugin Development", "Contribution & Publishing", "Code of Conduct & Standards"),("0", "3", "2"): ("Plugin Development", "Contribution & Publishing", "Publishing & Listing"),("0", "3", "3"): ("Plugin Development", "Contribution & Publishing", "FAQ"), + ("0", "4", "3"): ("Plugin Development", "Examples & Use 
Cases", "Development Examples"), + ("9", "2", "2"): ("Plugin Development", "Advanced Development", "Extension & Agent"),("9", "2", "3"): ("Plugin Development", "Advanced Development", "Extension & Agent"),("9", "4", "3"): ("Plugin Development", "Advanced Development", "Extension & Agent"),("9", "2", "4"): ("Plugin Development", "Advanced Development", "Reverse Calling"), + ("0", "4", "1"): ("Plugin Development", "Reference & Specifications", "Core Specifications & Features"), + }, + "DESIRED_GROUP_ORDER": ["Concepts & Getting Started", "Development Practices", "Contribution & Publishing", "Examples & Use Cases", "Advanced Development", "Reference & Specifications"], +} +PLUGIN_DEV_JA = { + "DOCS_DIR_RELATIVE": "plugin_dev_ja", "LANGUAGE_CODE": "日本語", "FILE_EXTENSION_SUFFIX": ".ja", + "TARGET_TAB_NAME": "プラグイン開発", "FILENAME_PATTERN": re.compile(r"^(\d{4})-(.*?)\.ja\.mdx$"), + "PWX_TO_GROUP_MAP": { + ("0", "1", "1"): ("プラグイン開発", "概念と概要", "概要"),("0", "1", "3"): ("プラグイン開発", "概念と概要", None), + ("0", "2", "1"): ("プラグイン開発", "開発実践", "クイックスタート"),("0", "2", "2"): ("プラグイン開発", "開発実践", "Difyプラグインの開発"), + ("0", "3", "1"): ("プラグイン開発", "貢献と公開", "行動規範と基準"),("0", "3", "2"): ("プラグイン開発", "貢献と公開", "公開と掲載"),("0", "3", "3"): ("プラグイン開発", "貢献と公開", "よくある質問 (FAQ)"), + ("0", "4", "3"): ("プラグイン開発", "実践例とユースケース", "開発例"), + ("9", "2", "2"): ("プラグイン開発", "高度な開発", "Extension と Agent"),("9", "2", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"),("9", "4", "3"): ("プラグイン開発", "高度な開発", "Extension と Agent"),("9", "2", "4"): ("プラグイン開発", "高度な開発", "リバースコール"), + ("0", "4", "1"): ("プラグイン開発", "リファレンスと仕様", "コア仕様と機能"), + }, + "DESIRED_GROUP_ORDER": ["概念と概要", "開発実践", "貢献と公開", "実践例とユースケース", "高度な開発", "リファレンスと仕様"], +} + +# --- Helper Functions --- + +# Defines log issue types considered critical enough to be included in the commit message summary. 
+CRITICAL_ISSUE_TYPES = {"Error", "Critical", "ConfigError", "SeriousWarning"} + +def _log_issue(reports_list_for_commit_message: list, lang_code: str, issue_type: str, message: str, details: str = ""): + """ + Logs a detailed message to the console and adds a concise version to a list for commit messages + if the issue_type is critical. + + Args: + reports_list_for_commit_message: List to accumulate messages for the commit summary. + lang_code: Language code or identifier for the context of the log (e.g., "简体中文", "GLOBAL"). + issue_type: Type of the issue (e.g., "Info", "Warning", "Error", "Critical"). + message: The main message of the log. + details: Optional additional details for the log. + """ + full_log_message = f"[{issue_type.upper()}] Lang '{lang_code}': {message}" + if details: + full_log_message += f" Details: {details}" + print(full_log_message) # Always print the detailed log message to console. + + if issue_type in CRITICAL_ISSUE_TYPES: + # Prepare a more concise message for the commit summary. + commit_msg_part = f"- Lang '{lang_code}': [{issue_type}] {message}" + reports_list_for_commit_message.append(commit_msg_part) + # INFO and non-critical Warning logs are only printed to console, not added to the commit summary list. + + +# Note: The following helper functions call `_log_issue`. Their docstrings will describe their primary purpose. +# The `commit_message_reports_list` parameter passed to them is for `_log_issue`. + +def clear_tabs_if_refresh(navigation_data: dict, version_code: str, target_tab_name: str, do_refresh: bool, commit_message_reports_list: list) -> bool: + """ + Clears groups within a specific tab in the navigation data if `do_refresh` is True. + + Args: + navigation_data: The main navigation data structure. + version_code: The language code or version identifier (e.g., "简体中文"). + target_tab_name: The name of the tab to clear. + do_refresh: Boolean flag; if True, groups in the tab will be cleared. 
+ commit_message_reports_list: List for accumulating critical issue messages. + + Returns: + True if the tab was found and cleared, False otherwise. + """ + if not do_refresh: + return False + if not navigation_data or "versions" not in navigation_data: + _log_issue(commit_message_reports_list, version_code, "Warning", "'navigation.versions' not found, cannot clear tabs.") + return False + + version_found, tab_cleared = False, False + for version_nav in navigation_data.get("versions", []): + if version_nav.get("version") == version_code: + version_found = True + target_tab = next((t for t in version_nav.get("tabs", []) if isinstance(t, dict) and t.get("tab") == target_tab_name), None) + if target_tab: + target_tab["groups"] = [] + _log_issue(commit_message_reports_list, version_code, "Info", f"Cleared groups for Tab '{target_tab_name}'.") + tab_cleared = True + else: + # This might be an Info log if we expect the tab to be created later. + # If refresh implies the tab must exist, it could be a Warning. + _log_issue(commit_message_reports_list, version_code, "Info", f"Tab '{target_tab_name}' not found to clear groups (will be created if needed).") + break + if not version_found: + _log_issue(commit_message_reports_list, version_code, "Warning", f"Version '{version_code}' not found, cannot clear any Tab.") + return tab_cleared + +def get_page_path_from_filename(filename: str, docs_dir_name: str) -> str: + """ + Constructs the documentation page path from its filename and directory name. + Example: "0001-intro.en.mdx", "plugin_dev_en" -> "plugin_dev_en/0001-intro.en" + + Args: + filename: The .mdx filename (e.g., "0001-intro.en.mdx"). + docs_dir_name: The relative directory name for this set of docs (e.g., "plugin_dev_en"). + + Returns: + The page path string used in docs.json. + + Raises: + ValueError: If the filename does not end with ".mdx" (internal error if this happens). 
+ """ + if not filename.endswith(".mdx"): + # This case should ideally be filtered out before calling this function. + # If it reaches here, it indicates an internal logic error. + raise ValueError(f"Internal Error: Filename '{filename}' received by get_page_path_from_filename does not end with '.mdx'.") + base_filename = filename[:-len(".mdx")] # Remove ".mdx" + return f"{docs_dir_name}/{base_filename}" + + +def extract_existing_pages(navigation_data: dict, version_code: str, target_tab_name: str, commit_message_reports_list: list): + """ + Extracts all existing page paths from a specific tab within a version in the navigation data. + + Args: + navigation_data: The main navigation data structure. + version_code: The language code or version identifier. + target_tab_name: The name of the tab to extract pages from. + commit_message_reports_list: List for accumulating critical issue messages (passed to helpers, not used directly). + + Returns: + A tuple: (set_of_existing_page_paths, target_version_nav_dict, target_tab_nav_dict). + Returns (set(), None, None) if the version or tab is not found. 
+ """ + existing_pages = set() + target_version_nav, target_tab_nav = None, None + + if not navigation_data or "versions" not in navigation_data: + return existing_pages, None, None # No versions structure, so no pages + + target_version_nav = next((v for v in navigation_data.get("versions", []) if v.get("version") == version_code), None) + if not target_version_nav: + return existing_pages, None, None # Version not found + + if "tabs" in target_version_nav and isinstance(target_version_nav["tabs"], list): + target_tab_nav = next((t for t in target_version_nav["tabs"] if isinstance(t,dict) and t.get("tab") == target_tab_name), None) + if target_tab_nav: + for group in target_tab_nav.get("groups", []): + if isinstance(group, dict): + _recursive_extract(group, existing_pages) + + return existing_pages, target_version_nav, target_tab_nav + +def _recursive_extract(group_item: dict, pages_set: set): + """ + Recursively extracts page paths from a group item and its nested groups. + (Helper for extract_existing_pages). + + Args: + group_item: A dictionary representing a group, which may contain pages or nested groups. + pages_set: A set to which extracted page paths are added. + """ + if not isinstance(group_item, dict): return # Safety check + for page in group_item.get("pages", []): + if isinstance(page, str): + pages_set.add(page) + elif isinstance(page, dict) and "group" in page: # It's a nested group + _recursive_extract(page, pages_set) + + +def remove_obsolete_pages(target_tab_data: dict, pages_to_remove: set, commit_message_reports_list: list, lang_code: str): + """ + Removes obsolete page paths from the groups within the target tab data. + Modifies `target_tab_data` in place. + + Args: + target_tab_data: The dictionary for the specific tab being processed. + pages_to_remove: A set of page path strings that should be removed. + commit_message_reports_list: List for accumulating critical issue messages. + lang_code: Language code for logging purposes. 
+ """ + if not isinstance(target_tab_data, dict) or "groups" not in target_tab_data or not isinstance(target_tab_data.get("groups"), list): + _log_issue(commit_message_reports_list, lang_code, "Warning", "Attempted to remove obsolete pages from invalid target_tab_data structure.", f"Tab data: {target_tab_data}") + return + + groups = target_tab_data["groups"] + i = 0 + while i < len(groups): # Iterate with index to handle potential removal of empty groups (currently retains structure) + group_item = groups[i] + if isinstance(group_item, dict): + _remove_obsolete_from_group(group_item, pages_to_remove, commit_message_reports_list, lang_code) + if not group_item.get("pages"): # Check if the group became empty + _log_issue(commit_message_reports_list, lang_code, "Info", f"Group '{group_item.get('group', 'Unknown')}' emptied after removing obsolete pages; structure retained.") + i += 1 + else: + _log_issue(commit_message_reports_list, lang_code, "Warning", f"Encountered non-dict item in groups list of Tab '{target_tab_data.get('tab','Unknown')}' during obsolete page removal. Item: {group_item}") + i += 1 + +def _remove_obsolete_from_group(group_dict: dict, pages_to_remove: set, commit_message_reports_list: list, lang_code: str): + """ + Recursively removes obsolete page paths from a group dictionary and its nested groups. + Modifies `group_dict` in place. (Helper for remove_obsolete_pages). + + Args: + group_dict: The dictionary representing a group. + pages_to_remove: A set of page path strings to remove. + commit_message_reports_list: List for accumulating critical issue messages. + lang_code: Language code for logging. 
+ """ + if not isinstance(group_dict, dict) or "pages" not in group_dict or not isinstance(group_dict.get("pages"), list): + group_name_for_log_err = group_dict.get('group', 'Unnamed Group with structural issue') if isinstance(group_dict, dict) else 'Non-dict item' + _log_issue(commit_message_reports_list, lang_code, "Warning", f"Group '{group_name_for_log_err}' has invalid 'pages' structure; cannot remove obsolete pages from it. Structure: {group_dict}") + return + + new_pages = [] + group_name_for_log = group_dict.get('group', 'Unknown') # For logging context + for page_item in group_dict["pages"]: + if isinstance(page_item, str): # It's a page path + if page_item not in pages_to_remove: + new_pages.append(page_item) + else: + _log_issue(commit_message_reports_list, lang_code, "Info", f"Removed obsolete page '{page_item}' from Group '{group_name_for_log}'.") + elif isinstance(page_item, dict) and "group" in page_item: # It's a nested group + _remove_obsolete_from_group(page_item, pages_to_remove, commit_message_reports_list, lang_code) + # Retain nested group even if it becomes empty. + if page_item.get("pages"): + new_pages.append(page_item) + else: + _log_issue(commit_message_reports_list, lang_code, "Info", f"Nested group '{page_item.get('group', 'Unknown')}' in Group '{group_name_for_log}' emptied; structure retained.") + new_pages.append(page_item) # Still append the empty nested group structure + else: # Unknown item type, preserve it + _log_issue(commit_message_reports_list, lang_code, "Warning", f"Encountered unexpected item type in 'pages' list of Group '{group_name_for_log}'. 
Preserving item: {page_item}") + new_pages.append(page_item) + group_dict["pages"] = new_pages + + +def find_or_create_target_group(target_version_nav: dict, tab_name: str, group_name: str, nested_group_name: str | None, commit_message_reports_list: list, lang_code: str) -> list: + """ + Finds or creates the target group (and nested group, if specified) within the navigation data + and returns the 'pages' list where new pages should be added. + Modifies `target_version_nav` in place by adding new structures if they don't exist. + + Args: + target_version_nav: The dictionary for the specific version being processed. + tab_name: The name of the target tab. + group_name: The name of the primary group. + nested_group_name: The name of the nested group (optional, can be None). + commit_message_reports_list: List for accumulating critical issue messages. + lang_code: Language code for logging. + + Returns: + The 'pages' list (mutable) of the target group or nested group. + """ + target_version_nav.setdefault("tabs", []) + if not isinstance(target_version_nav["tabs"], list): + _log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: version.tabs is not a list for version '{target_version_nav.get('version')}'. Attempting to recover by creating a new list.") + target_version_nav["tabs"] = [] + + target_tab = next((t for t in target_version_nav["tabs"] if isinstance(t,dict) and t.get("tab") == tab_name), None) + if not target_tab: + target_tab = {"tab": tab_name, "groups": []} + target_version_nav["tabs"].append(target_tab) + _log_issue(commit_message_reports_list, lang_code, "Info", f"Created new Tab '{tab_name}'.") + + target_tab.setdefault("groups", []) + if not isinstance(target_tab["groups"], list): + _log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: tab.groups is not a list for Tab '{tab_name}'. 
Attempting to recover.") + target_tab["groups"] = [] + + target_group = next((g for g in target_tab["groups"] if isinstance(g,dict) and g.get("group") == group_name), None) + if not target_group: + target_group = {"group": group_name, "pages": []} + target_tab["groups"].append(target_group) + _log_issue(commit_message_reports_list, lang_code, "Info", f"Created new Group '{group_name}' in Tab '{tab_name}'.") + + target_group.setdefault("pages", []) + if not isinstance(target_group["pages"], list): + _log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: group.pages is not a list for Group '{group_name}'. Attempting to recover.") + target_group["pages"] = [] + + container_for_pages = target_group["pages"] + + if nested_group_name: + nested_group = next((item for item in target_group["pages"] if isinstance(item, dict) and item.get("group") == nested_group_name), None) + if not nested_group: + nested_group = {"group": nested_group_name, "pages": []} + target_group["pages"].append(nested_group) + _log_issue(commit_message_reports_list, lang_code, "Info", f"Created new Nested Group '{nested_group_name}' in Group '{group_name}'.") + + nested_group.setdefault("pages", []) + if not isinstance(nested_group["pages"], list): + _log_issue(commit_message_reports_list, lang_code, "Critical", f"Internal state error: nested_group.pages is not a list for Nested Group '{nested_group_name}'. Attempting to recover.") + nested_group["pages"] = [] + container_for_pages = nested_group["pages"] + + return container_for_pages + +def get_group_sort_key(group_dict: dict, desired_order_list: list) -> int: + """ + Calculates a sort key for a group based on its desired order. + Groups not in the desired_order_list will be placed at the end. + + Args: + group_dict: The group dictionary, expected to have a "group" key with its name. + desired_order_list: A list of group names in their desired display order. + + Returns: + An integer sort key. 
# --- Main Logic ---
def process_single_config(docs_config: dict, navigation_data: dict, commit_message_reports_list: list) -> None:
    """
    Synchronise one documentation set's tab in the navigation data with the .mdx files on disk.

    Steps, in order:
      1. Optionally clear the tab (delegated to `clear_tabs_if_refresh`, driven by the
         module-level `refresh` value).
      2. Collect the page paths currently listed for the tab (`extract_existing_pages`).
      3. Scan the docs directory for .mdx files matching FILENAME_PATTERN.
      4. Remove listed pages that no longer exist on disk (`remove_obsolete_pages`).
      5. Add pages that exist on disk but are not listed, placing each one in the group
         selected by the first three characters of its filename prefix via PWX_TO_GROUP_MAP.
      6. Sort the tab's groups by DESIRED_GROUP_ORDER.

    `navigation_data` is mutated in place; nothing is returned. If the docs directory
    does not exist the function logs an error and returns after step 2.

    Args:
        docs_config: Configuration for one documentation set. The keys read here are
            LANGUAGE_CODE, DOCS_DIR_RELATIVE, PWX_TO_GROUP_MAP, FILENAME_PATTERN,
            TARGET_TAB_NAME and DESIRED_GROUP_ORDER.
        navigation_data: The mutable 'navigation' dict from docs.json.
        commit_message_reports_list: Report list handed through to `_log_issue`.
    """
    lang_code = docs_config["LANGUAGE_CODE"]
    docs_dir_relative = docs_config["DOCS_DIR_RELATIVE"]
    docs_dir_abs = BASE_DIR / docs_dir_relative
    pwx_map = docs_config["PWX_TO_GROUP_MAP"]
    filename_pattern = docs_config["FILENAME_PATTERN"]
    target_tab_name = docs_config["TARGET_TAB_NAME"]
    desired_group_order = docs_config["DESIRED_GROUP_ORDER"]

    _log_issue(commit_message_reports_list, lang_code, "Info", f"Processing Tab '{target_tab_name}'. Docs dir: '{docs_dir_abs}'")

    # `refresh` is a module-level name defined outside this function.
    clear_tabs_if_refresh(navigation_data, lang_code, target_tab_name, refresh, commit_message_reports_list)

    existing_pages, target_version_nav, target_tab_nav = extract_existing_pages(navigation_data, lang_code, target_tab_name, commit_message_reports_list)

    # No version entry for this language yet: create an empty one and register it.
    if target_version_nav is None:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"Version '{lang_code}' not found in docs.json, creating it.")
        navigation_data.setdefault("versions", [])
        if not isinstance(navigation_data["versions"], list):
            _log_issue(commit_message_reports_list, lang_code, "Critical", "Top-level 'navigation.versions' is not a list. Re-initializing.")
            navigation_data["versions"] = []
        target_version_nav = {"version": lang_code, "tabs": []}
        navigation_data["versions"].append(target_version_nav)
        existing_pages = set()
        target_tab_nav = None

    # The tab is not created here; find_or_create_target_group creates it when the
    # first page is added. If no page is added, the tab stays absent.
    if target_tab_nav is None:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"Tab '{target_tab_name}' not found in version '{lang_code}'. It will be created if pages are added to it.")
        existing_pages = set()
        # Make sure version.tabs is a list before find_or_create_target_group uses it.
        target_version_nav.setdefault("tabs", [])
        if not isinstance(target_version_nav["tabs"], list):
            _log_issue(commit_message_reports_list, lang_code, "Critical", f"Version '{lang_code}' 'tabs' attribute is not a list. Re-initializing.")
            target_version_nav["tabs"] = []

    _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(existing_pages)} existing pages found in docs.json for Tab '{target_tab_name}'.")

    # filename -> page path for every .mdx file that matches the pattern.
    filesystem_pages_map = {}
    # NOTE: this list is appended to below but never read within this function.
    valid_filenames_for_processing = []

    if not docs_dir_abs.is_dir():
        _log_issue(commit_message_reports_list, lang_code, "Error", f"Documentation directory '{docs_dir_abs}' not found. Skipping file processing for this configuration.")
        return

    for filename in os.listdir(docs_dir_abs):
        if not filename.endswith(".mdx"):
            continue

        if filename_pattern.match(filename):
            try:
                page_path = get_page_path_from_filename(filename, docs_dir_relative)
                filesystem_pages_map[filename] = page_path
                valid_filenames_for_processing.append(filename)
            except ValueError as e:
                _log_issue(commit_message_reports_list, lang_code, "Error", f"Error generating page path for '{filename}': {e}. Skipping this file.")
        else:
            _log_issue(commit_message_reports_list, lang_code, "SeriousWarning", f"File '{filename}' in '{docs_dir_relative}' is .mdx but does not match FILENAME_PATTERN. Skipping this file.")

    filesystem_page_paths_set = set(filesystem_pages_map.values())
    _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(filesystem_page_paths_set)} valid .mdx files matching pattern found in '{docs_dir_relative}'.")

    # Set difference in both directions: what to add and what to drop.
    new_page_paths = filesystem_page_paths_set - existing_pages
    removed_page_paths = existing_pages - filesystem_page_paths_set

    if new_page_paths:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(new_page_paths)} new page(s) to add to Tab '{target_tab_name}'.")
    if removed_page_paths:
        _log_issue(commit_message_reports_list, lang_code, "Info", f"{len(removed_page_paths)} obsolete page(s) to remove from Tab '{target_tab_name}'.")

    # Look the tab up again inside target_version_nav rather than reusing target_tab_nav,
    # so that remove_obsolete_pages operates on the tab object that is actually part of
    # the current version structure.
    _current_tab_for_removal = next((t for t in target_version_nav.get("tabs", []) if isinstance(t, dict) and t.get("tab") == target_tab_name), None)
    if removed_page_paths and _current_tab_for_removal:
        remove_obsolete_pages(_current_tab_for_removal, removed_page_paths, commit_message_reports_list, lang_code)
    elif removed_page_paths: # Means there were pages to remove, but the tab itself wasn't found (edge case)
        _log_issue(commit_message_reports_list, lang_code, "Warning", f"Obsolete pages detected for Tab '{target_tab_name}', but the tab was not found in the current version structure. Removal skipped.")

    if new_page_paths:
        # Sort by filename so pages are appended in a deterministic order
        # (os.listdir order is arbitrary).
        files_to_add_sorted = sorted([fn for fn, pp in filesystem_pages_map.items() if pp in new_page_paths])

        for filename in files_to_add_sorted:
            match = filename_pattern.match(filename)
            if not match:
                _log_issue(commit_message_reports_list, lang_code, "InternalError", f"File '{filename}' was marked for addition but failed pattern match. Skipping.")
                continue

            # Capture group 1 of FILENAME_PATTERN is the PWXY prefix.
            pwxy_str = match.group(1)
            page_path = filesystem_pages_map[filename]

            if len(pwxy_str) < 3:
                _log_issue(commit_message_reports_list, lang_code, "Error", f"File '{filename}' has an invalid PWXY prefix '{pwxy_str}' (too short). Skipping this file.")
                continue

            # Only the first three characters (P, W, X) select the group; any further
            # characters of the prefix are not used for grouping.
            p, w, x = pwxy_str[0], pwxy_str[1], pwxy_str[2]
            group_key = (p, w, x)

            if group_key in pwx_map:
                map_val = pwx_map[group_key]
                # Expected shape: (tab, group) or (tab, group, nested_group).
                if not (isinstance(map_val, tuple) and (len(map_val) == 2 or len(map_val) == 3)):
                    _log_issue(commit_message_reports_list, lang_code, "ConfigError", f"PWX_TO_GROUP_MAP entry for key {group_key} has invalid format: {map_val}. Expected tuple of 2 or 3 strings. Skipping file '{filename}'.")
                    continue

                _tab_name_in_map, group_name_from_map = map_val[0], map_val[1]
                nested_group_name_from_map = map_val[2] if len(map_val) == 3 else None

                # The tab named in the map is informational only: the page always goes
                # into the tab currently being processed.
                if _tab_name_in_map != target_tab_name:
                    _log_issue(commit_message_reports_list, lang_code, "Warning", f"File '{filename}' (PWX key {group_key}) maps to Tab '{_tab_name_in_map}' in PWX_TO_GROUP_MAP, but current processing is for Tab '{target_tab_name}'. Page will be added to '{target_tab_name}' under group '{group_name_from_map}'.")

                target_pages_container_list = find_or_create_target_group(
                    target_version_nav, target_tab_name, group_name_from_map, nested_group_name_from_map,
                    commit_message_reports_list, lang_code
                )
                if page_path not in target_pages_container_list:
                    target_pages_container_list.append(page_path)
                    _log_issue(commit_message_reports_list, lang_code, "Info", f"Added page '{page_path}' to Group '{group_name_from_map}' (Nested: {nested_group_name_from_map or 'No'}).")
                else:
                    _log_issue(commit_message_reports_list, lang_code, "Info", f"Page '{page_path}' already exists in Group '{group_name_from_map}' (Nested: {nested_group_name_from_map or 'No'}). Skipping addition.")
            else:
                _log_issue(commit_message_reports_list, lang_code, "SeriousWarning", f"File '{filename}' (PWX prefix ({p},{w},{x})) has no corresponding entry in PWX_TO_GROUP_MAP. Skipping this file.")

    # Look the tab up once more: it may have been created while adding pages above.
    final_target_tab_nav = next((t for t in target_version_nav.get("tabs", []) if isinstance(t, dict) and t.get("tab") == target_tab_name), None)

    if final_target_tab_nav and "groups" in final_target_tab_nav and isinstance(final_target_tab_nav["groups"], list):
        if final_target_tab_nav["groups"]:
            # Groups not named in desired_group_order sort after the named ones.
            final_target_tab_nav["groups"].sort(key=lambda g: get_group_sort_key(g, desired_group_order))
            _log_issue(commit_message_reports_list, lang_code, "Info", f"Sorted groups in Tab '{target_tab_name}'.")
        else:
            _log_issue(commit_message_reports_list, lang_code, "Info", f"No groups to sort in Tab '{target_tab_name}' (tab is empty or contains no group structures).")
    elif final_target_tab_nav:
        _log_issue(commit_message_reports_list, lang_code, "Warning", f"Tab '{target_tab_name}' exists but has no valid 'groups' list to sort.")
    else: # Tab was not created (e.g., no new pages and it didn't exist before)
        _log_issue(commit_message_reports_list, lang_code, "Info", f"Tab '{target_tab_name}' does not exist in the final structure; no sorting needed.")
def load_docs_data_robust(path: Path, commit_message_reports_list: list, lang_for_report: str = "GLOBAL") -> dict:
    """
    Read docs.json, falling back to an empty navigation skeleton on any problem.

    The fallback `{"navigation": {"versions": []}}` is returned when the file does
    not exist, cannot be parsed as JSON, does not have a dict at the root with a
    dict under 'navigation' holding a list under 'versions', or when any other
    exception occurs while loading. Each fallback is reported through `_log_issue`.

    Args:
        path: Path object pointing at the docs.json file.
        commit_message_reports_list: Report list handed through to `_log_issue`.
        lang_for_report: Context identifier used in log entries (defaults to "GLOBAL").

    Returns:
        The parsed JSON document, or the fallback structure described above.
    """
    fallback = {"navigation": {"versions": []}}

    def report(level: str, text: str) -> None:
        _log_issue(commit_message_reports_list, lang_for_report, level, text)

    try:
        if not path.exists():
            report("Info", f"File '{path}' not found. Initializing with a new default structure.")
            return fallback

        with path.open("r", encoding="utf-8") as handle:
            loaded = json.load(handle)

        # The root must look like {"navigation": {"versions": [...]}}.
        navigation = loaded.get("navigation") if isinstance(loaded, dict) else None
        well_formed = isinstance(navigation, dict) and isinstance(navigation.get("versions"), list)
        if well_formed:
            return loaded

        report("Error", f"File '{path}' has an invalid root structure. Key 'navigation.versions' (as a list) is missing or malformed. Using default structure.")
        return fallback
    except json.JSONDecodeError as e:
        report("Error", f"Failed to parse JSON from '{path}': {e}. Using default structure.")
        return fallback
    except Exception as e:
        report("Critical", f"Unexpected error loading file '{path}': {e}. Using default structure.")
        return fallback
def save_docs_data_robust(path: Path, data: dict, commit_message_reports_list: list, lang_for_report: str = "GLOBAL") -> bool:
    """
    Write `data` to docs.json as UTF-8 JSON (4-space indent, non-ASCII kept as-is).

    The document is serialized to a string *before* the target file is opened.
    Opening with mode "w" truncates the file immediately, so serializing first
    guarantees that a serialization failure (for example a value that is not
    JSON-serializable) leaves the existing docs.json untouched instead of empty
    or half-written.

    Args:
        path: Path object pointing at the docs.json file.
        data: The dictionary to save.
        commit_message_reports_list: Report list handed through to `_log_issue`.
        lang_for_report: Context identifier used in log entries.

    Returns:
        True if the file was written, False if serialization or writing failed
        (the failure is reported as "Critical").
    """
    try:
        # Same output as json.dump(data, f, ...), but produced before truncation.
        serialized = json.dumps(data, ensure_ascii=False, indent=4)
        with open(path, "w", encoding="utf-8") as f:
            f.write(serialized)
        _log_issue(commit_message_reports_list, lang_for_report, "Info", f"Successfully saved updates to '{path}'.")
        return True
    except Exception as e:
        _log_issue(commit_message_reports_list, lang_for_report, "Critical", f"Failed to save updates to '{path}': {e}.")
        return False
def validate_config(config: dict, config_name: str, commit_message_reports_list: list) -> bool:
    """
    Check that a documentation configuration has the keys and value types this script needs.

    Validation happens in two stages. First every required key must be present; if
    any is missing, each missing key is reported and validation stops. Then the
    values of DOCS_DIR_RELATIVE (non-empty string), FILENAME_PATTERN (compiled
    regular expression), PWX_TO_GROUP_MAP (non-empty dict) and DESIRED_GROUP_ORDER
    (list) are checked; all failures of this stage are reported together.

    Args:
        config: The configuration dictionary to validate.
        config_name: Identifier for the configuration, used as log context.
        commit_message_reports_list: Report list handed through to `_log_issue`.

    Returns:
        True if the configuration passed both stages, False otherwise.
    """

    def report(level: str, text: str) -> None:
        _log_issue(commit_message_reports_list, config_name, level, text)

    # Stage 1: presence of every required key.
    mandatory = (
        "DOCS_DIR_RELATIVE", "LANGUAGE_CODE", "FILE_EXTENSION_SUFFIX",
        "TARGET_TAB_NAME", "FILENAME_PATTERN", "PWX_TO_GROUP_MAP", "DESIRED_GROUP_ORDER",
    )
    absent = [name for name in mandatory if name not in config]
    if absent:
        for name in absent:
            report("ConfigError", f"Configuration is missing required key '{name}'.")
        report("Info", f"Skipping configuration '{config_name}' due to missing required keys.")
        return False

    # Stage 2: type/content checks, collected in the order they are reported.
    problems = []

    docs_dir = config["DOCS_DIR_RELATIVE"]
    if not isinstance(docs_dir, str) or not docs_dir:
        problems.append(f"Key 'DOCS_DIR_RELATIVE' must be a non-empty string. Found: '{config.get('DOCS_DIR_RELATIVE')}'.")

    if not isinstance(config["FILENAME_PATTERN"], re.Pattern):
        problems.append(f"Key 'FILENAME_PATTERN' must be a compiled regular expression (re.Pattern). Found type: {type(config.get('FILENAME_PATTERN'))}.")

    group_map = config["PWX_TO_GROUP_MAP"]
    if not isinstance(group_map, dict) or not group_map:
        problems.append(f"Key 'PWX_TO_GROUP_MAP' must be a non-empty dictionary. Found: '{config.get('PWX_TO_GROUP_MAP')}'.")

    if not isinstance(config["DESIRED_GROUP_ORDER"], list):
        problems.append(f"Key 'DESIRED_GROUP_ORDER' must be a list. Found type: {type(config.get('DESIRED_GROUP_ORDER'))}.")

    for text in problems:
        report("ConfigError", text)

    if problems:
        report("Info", f"Skipping configuration '{config_name}' due to type or content errors in its definition.")
        return False
    return True
def process_all_configs(configs_to_process: list[dict], docs_json_path: Path) -> list[str]:
    """
    Load docs.json, apply every valid configuration to its navigation, and save the result.

    Each configuration is validated with `validate_config` and, when valid, applied
    with `process_single_config`. An exception raised while applying one
    configuration is reported (and its traceback printed) without stopping the
    remaining configurations. docs.json is written back only if at least one
    configuration was applied without raising.

    Args:
        configs_to_process: A list of configuration dictionaries.
        docs_json_path: Path to the docs.json file.

    Returns:
        The report list that was passed to `_log_issue` throughout the run; its
        contents are whatever `_log_issue` chose to record.
    """
    reports = []

    def note(scope: str, level: str, text: str) -> None:
        _log_issue(reports, scope, level, text)

    docs_data = load_docs_data_robust(docs_json_path, reports)

    # Defensive normalisation of the two containers everything below relies on.
    navigation = docs_data.setdefault("navigation", {})
    if not isinstance(navigation, dict):
        note("GLOBAL", "Critical", "'navigation' key in docs.json is not a dictionary. Resetting to default structure.")
        navigation = docs_data["navigation"] = {"versions": []}

    navigation.setdefault("versions", [])
    if not isinstance(navigation.get("versions"), list):
        note("GLOBAL", "Error", "'navigation.versions' in docs.json was not a list. Resetting it to an empty list.")
        navigation["versions"] = []

    any_config_applied = False
    for position, cfg in enumerate(configs_to_process, start=1):
        cfg_id = cfg.get("LANGUAGE_CODE", f"UnnamedConfig_{position}")

        note(cfg_id, "Info", f"Starting validation for configuration '{cfg_id}'.")
        if not validate_config(cfg, cfg_id, reports):
            note(cfg_id, "Info", f"Configuration '{cfg_id}' failed validation. Skipping processing.")
            continue

        note(cfg_id, "Info", f"Configuration '{cfg_id}' validated successfully. Starting processing.")
        try:
            process_single_config(cfg, navigation, reports)
        except Exception as e:
            note(cfg_id, "Critical", f"Unhandled exception during processing of configuration '{cfg_id}': {e}.")
            import traceback
            print(f"TRACEBACK for configuration '{cfg_id}':\n{traceback.format_exc()}")
        else:
            any_config_applied = True

    if any_config_applied:
        note("GLOBAL", "Info", "Attempting to save changes to docs.json.")
        save_docs_data_robust(docs_json_path, docs_data, reports)
    elif not configs_to_process:
        note("GLOBAL", "Info", "No configurations were provided to process.")
    else:
        note("GLOBAL", "Info", "No valid configurations were processed successfully. docs.json will not be modified.")

    return reports
def main_apply_docs_json() -> str:
    """
    Run the docs.json update for the zh, en and ja plugin-dev configurations.

    Prints the base directory, docs.json path and refresh mode, processes the three
    module-level configurations, and condenses the resulting report list into a
    single status string.

    Returns:
        "success" when the report list is empty. Otherwise a multi-line summary:
        a headline with the number of reports, followed by at most the first ten
        report lines and, if more exist, a line stating how many were omitted.
    """
    print(f"Script base directory: {BASE_DIR}")
    print(f"Docs JSON path: {DOCS_JSON_PATH}")
    print(f"Refresh mode: {refresh}")

    reports = process_all_configs(
        [PLUGIN_DEV_ZH, PLUGIN_DEV_EN, PLUGIN_DEV_JA],
        DOCS_JSON_PATH,
    )
    if not reports:
        return "success"

    # Keep the commit message readable: show only the first few reports.
    detail_cap = 10
    shown = list(reports[:detail_cap])
    hidden = len(reports) - detail_cap
    if hidden > 0:
        shown.append(f"... and {hidden} more critical issues (see full console logs for details).")

    headline = f"docs.json processed with {len(reports)} critical issue(s) reported."
    details = "\n".join(shown)
    return f"{headline}\n\nDetails of critical issues:\n{details}"
class Config:
    """Static settings for renaming MDX files by their frontmatter dimensions.

    The four-digit filename prefix is P-W-X-Y:
    P = priority, W = primary type, X = detail type, Y = level.
    """

    # --- Paths and languages ---
    # Two levels above this file.
    BASE_DIR = Path(__file__).resolve().parent.parent
    # Languages to process.
    LANGUAGES = ["zh", "en", "ja"]
    # Captured once, when the class body is executed.
    TIMESTAMP = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")

    # --- Directory naming templates ---
    LANG_DIR_TEMPLATE = "plugin_dev_{lang}"
    ARCHIVE_LANG_DIR_PREFIX_TEMPLATE = "plugin_dev_{lang}_archive_pre_processing_"

    # --- W: dimensions.type.primary -> digit ---
    PRIMARY_TYPE_MAP = {
        "conceptual": 1,
        "implementation": 2,
        "operational": 3,
        "reference": 4,
    }
    DEFAULT_W = 0

    # --- X: dimensions.type.detail -> digit, looked up per primary type ---
    DETAIL_TYPE_MAPS = {
        "conceptual": {
            "introduction": 1,
            "principles": 2,
            "architecture": 3,
        },
        "implementation": {
            "basic": 1,
            "standard": 2,
            "high": 3,
            "advanced": 4,
        },
        "operational": {
            "setup": 1,
            "deployment": 2,
            "maintenance": 3,
        },
        "reference": {
            "core": 1,
            "configuration": 2,
            "examples": 3,
        },
    }
    DEFAULT_X = 0

    # --- Y: dimensions.level -> digit ---
    LEVEL_MAP = {
        "beginner": 1,
        "intermediate": 2,
        "advanced": 3,
    }
    DEFAULT_Y = 0

    # --- P: priority digit and the values that trigger the high priority ---
    PRIORITY_NORMAL = 0
    PRIORITY_HIGH = 9
    PRIORITY_ADVANCED_LEVEL_KEY = "advanced"
    PRIORITY_IMPLEMENTATION_PRIMARY_KEY = "implementation"
    PRIORITY_IMPLEMENTATION_DETAIL_KEYS = {"high", "advanced"}
def extract_front_matter(content: str):
    """Split an MDX/Markdown document into its YAML front matter and its body.

    Returns a `(front_matter, body)` tuple:

    * no `---` delimited front matter block found -> `({}, content)` with
      `content` returned unchanged;
    * front matter parsed -> `(mapping, body)` where `body` is the text after the
      closing delimiter, stripped of surrounding whitespace, and `mapping` is the
      parsed YAML if it is a dict, otherwise `{}` (covers empty or non-mapping YAML);
    * YAML parse error -> `(None, content)`; the error is printed.
    """
    found = re.match(r"^\s*---\s*$(.*?)^---\s*$(.*)",
                     content, re.DOTALL | re.MULTILINE)
    if found is None:
        return {}, content  # No front matter found

    raw_yaml, body = (piece.strip() for piece in found.groups())
    try:
        parsed = yaml.safe_load(raw_yaml)
    except yaml.YAMLError as e:
        # Printed here because it is an early, per-file parsing failure.
        print(f" [Error] YAML Parsing Failed: {e}")
        return None, content  # Indicate error

    if not isinstance(parsed, dict):
        # Empty YAML yields None; scalars and lists are not usable either.
        parsed = {}
    return parsed, body
def sanitize_filename_part(part: str) -> str:
    """Convert arbitrary text into a lowercase, filename-safe slug.

    Non-string input is converted with `str()`. "&" becomes "and", "@" becomes
    "at", whitespace runs become a single "-", and every character other than
    word characters, "-" and "." is dropped. Leading/trailing ".", "-" and "_"
    are stripped. Returns "untitled" if nothing is left.
    """
    if not isinstance(part, str):
        part = str(part)
    part = part.lower().replace("&", "and").replace("@", "at")
    part = re.sub(r"\s+", "-", part)
    part = re.sub(r"[^\w\-.]+", "", part)  # Allow dots
    part = part.strip(".-_")
    return part or "untitled"


def _mapping_or_empty(value) -> dict:
    """Return `value` if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _calculate_pwxy_and_warnings(front_matter: dict, config: Config) -> tuple[int, int, int, int, list[str]]:
    """Derive the P, W, X, Y prefix digits from a document's frontmatter.

    Reads `dimensions.type.primary`, `dimensions.type.detail` and
    `dimensions.level`. A `dimensions` or `dimensions.type` value that is present
    but not a mapping (for example a YAML key with no value, which loads as
    None) is treated as empty instead of raising AttributeError.

    Args:
        front_matter: Parsed frontmatter mapping.
        config: Object providing the PRIORITY_*, *_MAP(S) and DEFAULT_* settings.

    Returns:
        `(P, W, X, Y, warnings)`. P is PRIORITY_HIGH when the level is the
        advanced key, or the primary type is the implementation key with a detail
        in the high-priority set; otherwise PRIORITY_NORMAL. W, X and Y come from
        the mapping tables and fall back to the DEFAULT_* values. `warnings`
        lists every value that was present but fell back to a default.
    """
    warnings_messages = []
    dimensions = _mapping_or_empty(front_matter.get("dimensions"))
    type_info = _mapping_or_empty(dimensions.get("type"))
    primary = type_info.get("primary")
    detail = type_info.get("detail")
    level = dimensions.get("level")

    P = config.PRIORITY_NORMAL
    if level == config.PRIORITY_ADVANCED_LEVEL_KEY or \
       (primary == config.PRIORITY_IMPLEMENTATION_PRIMARY_KEY and detail in config.PRIORITY_IMPLEMENTATION_DETAIL_KEYS):
        P = config.PRIORITY_HIGH

    W = config.PRIMARY_TYPE_MAP.get(primary, config.DEFAULT_W)
    primary_detail_map = config.DETAIL_TYPE_MAPS.get(primary, {})
    X = primary_detail_map.get(detail, config.DEFAULT_X)
    Y = config.LEVEL_MAP.get(level, config.DEFAULT_Y)

    # Warn only about values that are present but unmapped; absent values are
    # the caller's compliance check to report.
    if W == config.DEFAULT_W and primary is not None:
        warnings_messages.append(f"Unmapped primary type: '{primary}'. Using W={config.DEFAULT_W}")

    if X == config.DEFAULT_X and detail is not None:
        if primary in config.DETAIL_TYPE_MAPS:
            warnings_messages.append(f"Unmapped detail type: '{detail}' for primary '{primary}'. Using X={config.DEFAULT_X}")
        elif primary is not None:
            warnings_messages.append(f"No detail map defined for primary type: '{primary}'. Using X={config.DEFAULT_X}")

    if Y == config.DEFAULT_Y and level is not None:
        warnings_messages.append(f"Unmapped level: '{level}'. Using Y={config.DEFAULT_Y}")

    return P, W, X, Y, warnings_messages


def _generate_filename_parts(
    P: int, W: int, X: int, Y: int,
    front_matter: dict,
    original_filename_stem: str
) -> tuple[str | None, str, str, list[str]]:
    """Build the pieces of the new filename: prefix, title slug and language suffix.

    Args:
        P, W, X, Y: Prefix digits, concatenated in this order.
        front_matter: Parsed frontmatter; `standard_title` and `language` are read.
        original_filename_stem: Used as the title when `standard_title` is empty.

    Returns:
        `(padded_prefix, sanitized_title, lang_suffix, warnings)`.
        `padded_prefix` is the concatenated digits zero-padded to four characters
        ("0000" if they do not form an integer). `lang_suffix` is "." plus the
        lower-cased language code, or "" when the language is missing or blank.
    """
    warnings_messages = []
    prefix_str = f"{P}{W}{X}{Y}"
    try:
        padded_prefix = f"{int(prefix_str):04d}"
    except ValueError:
        warnings_messages.append(f"Could not form numeric prefix from P{P}W{W}X{X}Y{Y}. Using '0000'.")
        padded_prefix = "0000"

    title_part_to_use = front_matter.get("standard_title")
    if not title_part_to_use:  # e.g. standard_title: ""
        warnings_messages.append("Empty 'standard_title'. Using original filename stem as fallback.")
        title_part_to_use = original_filename_stem
    sanitized_title = sanitize_filename_part(title_part_to_use)

    lang_suffix = ""
    language_fm = front_matter.get("language")
    if language_fm is not None:
        lang_code = str(language_fm).strip().lower()
        if lang_code:
            lang_suffix = f".{lang_code}"
        else:
            warnings_messages.append("Empty 'language' field in frontmatter. Omitting language suffix.")
    else:
        warnings_messages.append("Missing 'language' field in frontmatter. Omitting language suffix.")
    return padded_prefix, sanitized_title, lang_suffix, warnings_messages
def get_or_create_lang_dir(lang: str, config: Config) -> tuple[Path | None, bool]:
    """Resolve the working directory for a language, creating it when absent.

    The directory is `config.BASE_DIR / config.LANG_DIR_TEMPLATE.format(lang=lang)`.

    Returns:
        `(path, created)`:
        * `(path, False)` when the directory already existed;
        * `(path, True)` when it was created by this call;
        * `(None, False)` when the path exists but is not a directory, or when
          creating it failed. The reason is printed.
    """
    target = config.BASE_DIR / config.LANG_DIR_TEMPLATE.format(lang=lang)

    if target.exists():
        if target.is_dir():
            print(f"Using existing directory '{target.name}' for '{lang}'.")
            return target, False
        print(f"[ERROR] Path '{target}' exists but is not a directory. Skipping '{lang}'.")
        return None, False

    print(f"Directory '{target.name}' not found for '{lang}'. Creating it.")
    try:
        target.mkdir(parents=True, exist_ok=False)
        print(f"Created directory: '{target.name}' for '{lang}'.")
    except Exception as e:  # Catch any OS or File system error
        print(f"[ERROR] Failed to create directory '{target}': {e}. Skipping '{lang}'.")
        return None, False
    return target, True
def process_single_mdx_file(mdx_filepath: Path, config: Config) -> dict:
    """Rename one MDX file to `<PWXY>-<title-slug><.lang>.mdx` based on its frontmatter.

    The file is read, its frontmatter extracted, and the target name computed.
    The file is renamed in place only if the target name differs from the current
    one and no file with the target name exists yet. Progress and problems are
    printed.

    Frontmatter whose `dimensions` or `dimensions.type` is present but not a
    mapping (for example a YAML key with no value) is treated like missing
    fields, so the file is reported as non-compliant rather than failing with
    an unexpected AttributeError.

    Args:
        mdx_filepath: Path of the .mdx file to process.
        config: Settings object; `BASE_DIR` is read here, the rest is passed on
            to `_calculate_pwxy_and_warnings`.

    Returns:
        A stats dict with these keys:
        * "status": "processed", "skipped_no_change", "skipped_target_exists",
          "skipped_non_compliant" or "error";
        * "all_file_warnings": warnings gathered while computing the name;
        * "error_message": set when status is "error";
        * "old_filename_stem_for_replace" / "new_filename_stem_for_replace":
          set only after a successful rename;
        * "problem_file_display_path": path used in printed output;
        * "problem_file_target_name": the computed target filename, if reached;
        * "non_compliant_reason": set when status is "skipped_non_compliant".
    """
    stats = {
        "status": "processed", "all_file_warnings": [], "error_message": None,
        "old_filename_stem_for_replace": None, "new_filename_stem_for_replace": None,
        "problem_file_display_path": None, "problem_file_target_name": None,
        "non_compliant_reason": None,
    }

    try:
        if mdx_filepath.is_relative_to(config.BASE_DIR):
            # Relative to BASE_DIR's parent so the project directory name is included.
            display_path = mdx_filepath.relative_to(config.BASE_DIR.parent).as_posix()
        else:
            display_path = mdx_filepath.name  # Fallback
    except ValueError:
        display_path = mdx_filepath.name
    stats["problem_file_display_path"] = display_path

    try:
        content = mdx_filepath.read_text(encoding="utf-8")
        front_matter, _ = extract_front_matter(content)

        if front_matter is None:  # YAML parsing error; details already printed by extract_front_matter
            stats["status"] = "error"
            stats["error_message"] = "YAML Error in file."
            print(f"\nProcessing: {display_path} -> (skipped due to YAML error)")
            return stats

        # --- Check for critical missing frontmatter for renaming compliance ---
        # Non-mapping values are normalised to {} so they surface as missing fields.
        fm_dimensions = front_matter.get("dimensions")
        if not isinstance(fm_dimensions, dict):
            fm_dimensions = {}
        fm_type = fm_dimensions.get("type")
        if not isinstance(fm_type, dict):
            fm_type = {}

        missing_critical_fields = []
        if fm_type.get("primary") is None:
            missing_critical_fields.append("dimensions.type.primary")
        if fm_type.get("detail") is None:
            missing_critical_fields.append("dimensions.type.detail")
        if fm_dimensions.get("level") is None:
            missing_critical_fields.append("dimensions.level")
        if front_matter.get("standard_title") is None:
            missing_critical_fields.append("standard_title")

        if missing_critical_fields:
            stats["status"] = "skipped_non_compliant"
            reason = f"Missing critical frontmatter fields for renaming: {', '.join(missing_critical_fields)}."
            stats["non_compliant_reason"] = reason
            print(f"\nProcessing: {display_path} -> (skipped, non-compliant)")
            print(f" [Skipping Reason] {reason}")
            return stats  # No further processing or warning generation for this file

        # --- Compliant: compute the target filename ---
        P, W, X, Y, pwxy_warnings = _calculate_pwxy_and_warnings(front_matter, config)
        stats["all_file_warnings"].extend(pwxy_warnings)

        original_stem_for_title_fallback = mdx_filepath.stem
        padded_prefix, sanitized_title, lang_suffix, fname_warnings = _generate_filename_parts(
            P, W, X, Y, front_matter, original_stem_for_title_fallback)
        stats["all_file_warnings"].extend(fname_warnings)

        new_filename = f"{padded_prefix}-{sanitized_title}{lang_suffix}.mdx"
        stats["problem_file_target_name"] = new_filename
        new_filepath = mdx_filepath.with_name(new_filename)

        if new_filepath == mdx_filepath:
            stats["status"] = "skipped_no_change"
        elif new_filepath.exists():
            # Never overwrite an existing file.
            stats["status"] = "skipped_target_exists"
        else:
            try:
                original_stem_before_rename = mdx_filepath.stem
                mdx_filepath.rename(new_filepath)
                stats["status"] = "processed"
                stats["old_filename_stem_for_replace"] = original_stem_before_rename
                stats["new_filename_stem_for_replace"] = new_filepath.stem
            except Exception as rename_error:
                stats["status"] = "error"
                stats["error_message"] = f"Failed to rename to '{new_filename}': {rename_error}"

        # Print details only for warnings, actual renames, errors, or target-exists skips.
        action_taken = new_filepath != mdx_filepath and stats["status"] == "processed"
        if stats["all_file_warnings"] or action_taken or stats["status"].startswith("error") or stats["status"] == "skipped_target_exists":
            print(f"\nProcessing: {display_path} -> {new_filename if action_taken else '(no change or skipped/error)'}")
            for warning_msg in stats["all_file_warnings"]:
                print(f" [Warning] {warning_msg}")
            if stats["status"] == "skipped_target_exists":
                print(f" [Skipping] Target '{new_filename}' already exists.")
            if stats["error_message"]:
                print(f" [Error] {stats['error_message']}")

    except FileNotFoundError:
        stats["status"] = "error"
        stats["error_message"] = f"File not found: {mdx_filepath.name}"
        print(f"\nProcessing: {display_path}")
        print(f" [Error] {stats['error_message']}")
    except Exception as e:
        stats["status"] = "error"
        stats["error_message"] = f"Unexpected error: {e}"
        print(f"\nProcessing: {display_path}")
        print(f" [Error] Unexpected error processing file: {e}")
        import traceback
        traceback.print_exc()
    return stats
warnings. + "status": "OK", + "dir_path_str": str(lang_dir_path.relative_to(config.BASE_DIR)), # Path relative to project root + "content_replacements_made_count": 0, "content_replacement_errors_count": 0, + "error_file_details": [], "skipped_target_exists_details": [], + "content_replacement_error_details": [], + "skipped_non_compliant_count": 0, + "skipped_non_compliant_details": [], + "files_with_processing_warnings_details": [], # New: to store path and specific warnings + } + + if not lang_dir_path.exists() or not lang_dir_path.is_dir(): + lang_stats["status"] = "LANG_DIR_ERROR" + print(f"[Error] Language directory '{lang_dir_path.name}' issue (not found or not a dir).") + return lang_stats + + print(f"\n--- Phase 1: Renaming files in '{lang_dir_path.name}' ---") + mdx_files = sorted(list(lang_dir_path.rglob("*.mdx"))) + total_files = len(mdx_files) + print(f"Found {total_files} MDX files for renaming phase.") + rename_mappings = [] + + for i, mdx_filepath in enumerate(mdx_files): + result = process_single_mdx_file(mdx_filepath, config) + status = result["status"] + + if status == "processed": + lang_stats["processed_count"] += 1 + old, new = result.get("old_filename_stem_for_replace"), result.get("new_filename_stem_for_replace") + if old and new and old != new: rename_mappings.append((old, new)) + elif status == "skipped_no_change": lang_stats["skipped_no_change_count"] += 1 + elif status == "skipped_target_exists": + lang_stats["skipped_target_exists_count"] += 1 + lang_stats["skipped_target_exists_details"].append({ + "original_display_path": result["problem_file_display_path"], + "target_name": result["problem_file_target_name"] + }) + elif status == "skipped_non_compliant": + lang_stats["skipped_non_compliant_count"] += 1 + lang_stats["skipped_non_compliant_details"].append({ + "path": result["problem_file_display_path"], + "reason": result["non_compliant_reason"] + }) + elif status == "error": + lang_stats["error_count"] += 1 + 
lang_stats["error_file_details"].append({ + "path": result["problem_file_display_path"], "message": result["error_message"] + }) + + if result["all_file_warnings"]: # If there were any warnings for this file + lang_stats["warning_files_count"] += 1 # Increment count of files with warnings + lang_stats["files_with_processing_warnings_details"].append({ # Store details for problem reporting + "path": result["problem_file_display_path"], + "warnings": result["all_file_warnings"] + }) + + if total_files > 0: print(f"Rename Progress ({lang_dir_path.name}): {i+1}/{total_files} ({((i+1)/total_files*100):.1f}%)", end="\r") + + if total_files > 0: print() # Newline after progress + print("--- Phase 1: Renaming files complete. ---") + + if rename_mappings: + print(f"\n--- Phase 2: Updating content references in '{lang_dir_path.name}' ({len(rename_mappings)} filename changes to propagate) ---") + all_mdx_after_rename = sorted(list(lang_dir_path.rglob("*.mdx"))) + total_replace_scan = len(all_mdx_after_rename) + print(f"Scanning {total_replace_scan} .mdx files for content updates.") + updated_count = 0 + for i, scan_path in enumerate(all_mdx_after_rename): + display_scan_path = scan_path.relative_to(config.BASE_DIR.parent).as_posix() # Consistent display path + try: + content, changed = scan_path.read_text(encoding="utf-8"), False + mod_content = content + for old, new in rename_mappings: + if old in mod_content: mod_content, changed = mod_content.replace(old, new), True + if changed: + scan_path.write_text(mod_content, encoding="utf-8") + updated_count +=1; print(f" Updated references in: {display_scan_path}") + except Exception as e: + err_msg = f"Failed to update references in {display_scan_path}: {e}" + print(f" [Error] {err_msg}") + lang_stats["content_replacement_errors_count"] += 1 + lang_stats["content_replacement_error_details"].append({"path": display_scan_path, "error": str(e)}) + if total_replace_scan > 0: print(f"Content Update Progress ({lang_dir_path.name}): 
{i+1}/{total_replace_scan} ({((i+1)/total_replace_scan*100):.1f}%)", end="\r") + if total_replace_scan > 0: print() # Newline after progress + lang_stats["content_replacements_made_count"] = updated_count + print(f"Content replacement phase: {updated_count} files had their content updated.") + print("--- Phase 2: Content references update complete. ---") + else: print("\nNo renames occurred, skipping content reference update phase.") + + print("-" * 20 + f"\nLanguage Processing Summary ({lang_dir_path.name}):") + print(f" Processed (renamed): {lang_stats['processed_count']}") + print(f" Skipped (no change): {lang_stats['skipped_no_change_count']}") + print(f" Skipped (target exists): {lang_stats['skipped_target_exists_count']}") + print(f" Skipped (non-compliant for rename): {lang_stats['skipped_non_compliant_count']}") + print(f" Files generating warnings: {lang_stats['warning_files_count']}") # Renamed for clarity + print(f" Errors (renaming phase): {lang_stats['error_count']}") + if rename_mappings or lang_stats['content_replacement_errors_count'] > 0 or lang_stats['content_replacements_made_count'] > 0: + print(f" Content updated (references): {lang_stats['content_replacements_made_count']}") + print(f" Errors (content update): {lang_stats['content_replacement_errors_count']}") + print("-" * 20) + + # A language dir has errors if file errors or content replacement errors occurred. + # Non-compliant skips or warnings are now also reported as "problems" at main level, + # but don't change the "ERRORS_IN_PROCESSING" status of the language itself here. + # The main problem report will cover those. 
def main_rename_by_dimensions() -> str:  # Return type is now str
    """Run the rename-by-dimensions pipeline for every configured language.

    For each language in ``Config.LANGUAGES`` the language directory is
    obtained via ``get_or_create_lang_dir`` and processed with
    ``run_processing_for_language``.  A directory that was newly created for
    this run and is still empty afterwards is removed again.  Finally an
    overall summary is printed and every problem found (setup errors,
    language-directory errors, rename errors, target-exists skips,
    non-compliant skips, warnings, content-replacement errors) is collected.

    Returns:
        ``"success"`` when no problem was collected; otherwise the problem
        report lines joined with newlines.
    """
    config = Config()
    print(f"Base directory: {config.BASE_DIR}\nTimestamp for this run: {config.TIMESTAMP}")
    overall_summary, lang_dir_created_flags, lang_dirs_map = {}, {}, {}
    problem_reports_list = []  # Problem strings accumulated across all languages

    for lang in config.LANGUAGES:
        print(f"\n{'='*10} Processing Language: {lang.upper()} {'='*10}")
        current_lang_dir, was_newly_created = get_or_create_lang_dir(lang, config)
        lang_dir_created_flags[lang], lang_dirs_map[lang] = was_newly_created, current_lang_dir

        if not current_lang_dir:
            msg = f"Failed to get or create language directory for '{lang}'."
            overall_summary[lang] = {"status": "SETUP_ERROR", "message": msg}
            problem_reports_list.append(f"- Lang '{lang}': Setup Error - {msg}")
            continue

        lang_results = run_processing_for_language(current_lang_dir, config)
        overall_summary[lang] = lang_results

        # Clean up a directory that this run created and that is still empty.
        if was_newly_created and current_lang_dir.exists() and not any(current_lang_dir.iterdir()):
            try:
                current_lang_dir.rmdir()
                print(f" Removed empty newly created language directory: {current_lang_dir.name}")
                lang_dirs_map[lang] = None  # Mark as gone
            except OSError as e:
                # Normal cleanup only; failing to remove it is not a problem.
                print(f" Note: Could not remove empty newly created directory '{current_lang_dir.name}': {e}")

    print("\n\n" + "=" * 20 + " Overall Script Summary " + "=" * 20)
    setup_failure_statuses = ("SETUP_ERROR", "LANG_DIR_ERROR")
    for lang_code in config.LANGUAGES:
        summary = overall_summary.get(lang_code, {})
        lang_dir_path_obj = lang_dirs_map.get(lang_code)
        status = summary.get("status")

        print(f"\nLanguage: {lang_code.upper()}\n Status: {summary.get('status', 'UNKNOWN')}")

        if "message" in summary and status in setup_failure_statuses:  # Critical setup messages
            print(f" Message: {summary['message']}")

        # SETUP_ERROR is reported in the loop above.  LANG_DIR_ERROR must be
        # reported here, otherwise a language that was never processed would
        # still let the script return "success".
        if status == "LANG_DIR_ERROR":
            dir_error_msg = summary.get(
                "message", "Language directory not found or not a directory."
            )
            problem_reports_list.append(
                f"- Lang '{lang_code}': Language Directory Error - {dir_error_msg}"
            )

        if status not in setup_failure_statuses:
            print(f" Directory: {summary.get('dir_path_str', 'N/A')}")
            for key, label in [
                ("processed_count", "Processed (renamed)"),
                ("skipped_no_change_count", "Skipped (no change)"),
                ("skipped_target_exists_count", "Skipped (target exists)"),
                ("skipped_non_compliant_count", "Skipped (non-compliant for rename)"),
                ("warning_files_count", "Files generating warnings"),
                ("error_count", "Errors (renaming phase)"),
                ("content_replacements_made_count", "Content updated (references)"),
                ("content_replacement_errors_count", "Errors (content update)"),
            ]:
                if key in summary:
                    print(f" {label}: {summary.get(key, 0)}")

            # Turn every recorded detail into a line of the returned problem report.
            for detail in summary.get("error_file_details", []):
                problem_reports_list.append(f"- Lang '{lang_code}': File '{detail['path']}' - Renaming error: {detail['message']}")
            for detail in summary.get("skipped_target_exists_details", []):
                problem_reports_list.append(f"- Lang '{lang_code}': File '{detail['original_display_path']}' could not be renamed to '{detail['target_name']}' (target exists).")
            for detail in summary.get("skipped_non_compliant_details", []):
                problem_reports_list.append(f"- Lang '{lang_code}': File '{detail['path']}' - Skipped (non-compliant): {detail['reason']}")
            for detail in summary.get("files_with_processing_warnings_details", []):
                warnings_str = "; ".join(detail['warnings'])
                problem_reports_list.append(f"- Lang '{lang_code}': File '{detail['path']}' - Processing Warnings: {warnings_str}")
            for detail in summary.get("content_replacement_error_details", []):
                problem_reports_list.append(f"- Lang '{lang_code}': File '{detail['path']}' - Content replacement error: {detail['error']}")

        if lang_dir_path_obj and lang_dir_path_obj.exists():
            print(f" Final directory location: {lang_dir_path_obj.name}")
        elif lang_dir_created_flags.get(lang_code) and not lang_dir_path_obj:  # Was new, now gone
            print(" Note: Empty newly created directory was removed as expected.")
        elif not lang_dir_path_obj and summary.get('status') != "SETUP_ERROR":  # Not a setup error, but dir is gone
            print(f" Note: Language directory '{config.LANG_DIR_TEMPLATE.format(lang=lang_code)}' may have been archived or removed by other means.")

    print("=" * (40 + len(" Overall Script Summary ")))

    if not problem_reports_list:
        return "success"
    return "\n".join(problem_reports_list)


if __name__ == "__main__":
    result_message = main_rename_by_dimensions()
    print("\n--- Script Execution Result ---")
    print(result_message)