Files
dify-docs/tools/translate/pr_analyzer.py
Chenhe Gu 61466c3f45 support configuring ignored files in auto sync (#629)
* Add ignore_files config to exclude specific files from translation

Adds ability to specify source language files that should not be translated:
- New `ignore_files` array in config.json
- Validation ensures paths start with source dir, have valid extension, no traversal
- Filtering applied in PRAnalyzer.categorize_files() and SyncPlanGenerator.generate_sync_plan()

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

* update config

---------

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-23 15:45:28 -08:00

611 lines
24 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
PR Analyzer for Documentation Translation Workflow
This utility analyzes pull request changes to categorize them and validate
they follow the proper workflow requirements for source vs translation content.
"""
import fnmatch
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
class PRAnalyzer:
    """Analyzes PR changes to categorize and validate translation workflow requirements."""

    def __init__(self, base_sha: str, head_sha: str, repo_root: Optional[str] = None):
        """Create an analyzer for the commit range base_sha..head_sha.

        Args:
            base_sha: Base commit SHA of the PR diff.
            head_sha: Head commit SHA of the PR diff.
            repo_root: Repository root; defaults to three directories above this
                script (the script presumably lives at tools/translate/ — the
                default resolves to the repo checkout root).
        """
        self.base_sha = base_sha
        self.head_sha = head_sha
        self.repo_root = Path(repo_root) if repo_root else Path(__file__).parent.parent.parent
        self.docs_json_path = self.repo_root / "docs.json"
        self.config = self._load_config()
        # Initialize language settings from config; defaults mirror the docs
        # layout used when config.json is absent or incomplete.
        self.source_language = self.config.get('source_language', 'en')
        self.target_languages = self.config.get('target_languages', ['zh', 'ja'])
        # Load and validate ignore files (source files excluded from translation).
        self.ignore_files = self._load_ignore_files()
def _load_config(self) -> Dict:
"""Load translation configuration."""
config_path = Path(__file__).parent / "config.json"
if config_path.exists():
with open(config_path, 'r', encoding='utf-8') as f:
return json.load(f)
return {}
def _load_ignore_files(self) -> List[str]:
"""Load and validate ignore_files configuration.
Validates that:
- Each path starts with source language directory
- No directory traversal (..)
- Valid file extension (.md, .mdx)
Returns:
List of validated ignore file paths
"""
ignore_files = self.config.get('ignore_files', [])
if not ignore_files:
return []
validated = []
source_dir = self.get_language_directory(self.source_language)
for path in ignore_files:
# Must start with source language directory
if not path.startswith(f"{source_dir}/"):
print(f"Warning: Ignore path must start with '{source_dir}/': {path} (skipping)")
continue
# No directory traversal
if ".." in path:
print(f"Warning: Invalid ignore path (contains '..'): {path} (skipping)")
continue
# Must have valid extension
if not any(path.endswith(ext) for ext in ['.md', '.mdx']):
print(f"Warning: Ignore path must end with .md or .mdx: {path} (skipping)")
continue
validated.append(path)
return validated
def _is_file_ignored(self, file_path: str) -> bool:
"""Check if a file should be ignored from translation.
Args:
file_path: Path to check (e.g., 'en/guides/some-file.md')
Returns:
True if file is in ignore list, False otherwise
"""
return file_path in self.ignore_files
def get_language_directory(self, lang_code: str) -> str:
"""Get directory name for a language code from config."""
if 'languages' in self.config and lang_code in self.config['languages']:
return self.config['languages'][lang_code].get('directory', lang_code)
return lang_code
def get_changed_files(self) -> List[str]:
"""Get list of changed files between base and head commits."""
try:
result = subprocess.run([
"git", "diff", "--name-only", self.base_sha, self.head_sha
], capture_output=True, text=True, check=True, cwd=self.repo_root)
files = [f.strip() for f in result.stdout.strip().split('\n') if f.strip()]
return files
except subprocess.CalledProcessError as e:
print(f"Error getting changed files: {e}")
return []
def get_docs_json_at_sha(self, sha: str) -> Optional[Dict]:
"""Get docs.json content at a specific commit."""
try:
result = subprocess.run([
"git", "show", f"{sha}:docs.json"
], capture_output=True, text=True, check=True, cwd=self.repo_root)
return json.loads(result.stdout)
except (subprocess.CalledProcessError, json.JSONDecodeError) as e:
print(f"Error loading docs.json at {sha}: {e}")
return None
def extract_language_navigation(self, docs_data: Dict, language: str) -> Optional[Dict]:
"""Extract navigation structure for a specific language from docs.json."""
if not docs_data or 'navigation' not in docs_data:
return None
navigation = docs_data['navigation']
# Handle both direct languages and versions structure
if 'languages' in navigation:
languages = navigation['languages']
elif 'versions' in navigation and len(navigation['versions']) > 0:
languages = navigation['versions'][0].get('languages', [])
else:
return None
for lang_data in languages:
if lang_data.get('language') == language:
return lang_data
return None
def analyze_docs_json_changes(self) -> Dict[str, bool]:
"""Analyze which language sections changed in docs.json."""
base_docs = self.get_docs_json_at_sha(self.base_sha)
head_docs = self.get_docs_json_at_sha(self.head_sha)
changes = {
'source_section': False,
'translation_sections': False,
'any_docs_json_changes': False
}
if not base_docs or not head_docs:
return changes
# Check if docs.json changed at all
if base_docs != head_docs:
changes['any_docs_json_changes'] = True
# Check source language navigation section
source_lang = self.config['source_language']
base_source = self.extract_language_navigation(base_docs, source_lang)
head_source = self.extract_language_navigation(head_docs, source_lang)
if base_source != head_source:
changes['source_section'] = True
# Check translation sections
for lang in self.config['target_languages']:
base_lang = self.extract_language_navigation(base_docs, lang)
head_lang = self.extract_language_navigation(head_docs, lang)
if base_lang != head_lang:
changes['translation_sections'] = True
break
return changes
def is_openapi_file(self, file_path: str) -> bool:
"""Check if file matches OpenAPI patterns from config."""
openapi_config = self.config.get('openapi', {})
if not openapi_config.get('enabled', False):
return False
patterns = openapi_config.get('file_patterns', [])
directories = openapi_config.get('directories', [])
# Check if in allowed directory
path_parts = Path(file_path).parts
if len(path_parts) < 3: # e.g., en/api-reference/file.json
return False
dir_name = path_parts[1] # Get directory after language code
if dir_name not in directories:
return False
# Check if matches any pattern
file_name = Path(file_path).name
for pattern in patterns:
if self._match_pattern(file_name, pattern):
return True
return False
def _match_pattern(self, filename: str, pattern: str) -> bool:
"""Simple glob-like pattern matching."""
regex = pattern.replace('*', '.*').replace('?', '.')
return bool(re.match(f'^{regex}$', filename))
def categorize_files(self, files: List[str]) -> Dict[str, List[str]]:
"""Categorize changed files by type."""
categories = {
'source': [],
'source_openapi': [], # NEW category
'translation': [],
'translation_openapi': [], # NEW category
'docs_json': [],
'other': []
}
# Get source and target language directories from config
source_dir = self.config.get('source_language', 'en')
if 'languages' in self.config and source_dir in self.config['languages']:
source_dir = self.config['languages'][source_dir].get('directory', 'en')
target_dirs = []
for lang_code in self.config.get('target_languages', []):
if 'languages' in self.config and lang_code in self.config['languages']:
target_dir = self.config['languages'][lang_code].get('directory', lang_code)
target_dirs.append(target_dir)
# Fallback if config not properly loaded
if not target_dirs:
target_dirs = ['zh', 'ja']
for file in files:
if file == 'docs.json':
categories['docs_json'].append(file)
elif file.startswith(f'{source_dir}/'):
# Check if file is in ignore list
if self._is_file_ignored(file):
categories['other'].append(file) # Treat as 'other' so it's not processed
elif file.endswith(('.md', '.mdx')):
categories['source'].append(file)
elif self.is_openapi_file(file):
categories['source_openapi'].append(file)
else:
categories['other'].append(file)
elif any(file.startswith(f'{target_dir}/') for target_dir in target_dirs):
if file.endswith(('.md', '.mdx')):
categories['translation'].append(file)
elif self.is_openapi_file(file):
categories['translation_openapi'].append(file)
else:
categories['other'].append(file)
else:
categories['other'].append(file)
return categories
def categorize_pr(self) -> Dict[str, any]:
"""Categorize the PR based on changed files and docs.json sections."""
changed_files = self.get_changed_files()
if not changed_files:
return {
'type': 'none',
'should_skip': True,
'error': None,
'files': {'source': [], 'translation': [], 'docs_json': [], 'other': []},
'docs_json_changes': {'source_section': False, 'translation_sections': False, 'any_docs_json_changes': False}
}
file_categories = self.categorize_files(changed_files)
docs_json_changes = self.analyze_docs_json_changes()
# Determine if there are source language content changes (including OpenAPI)
has_source_files = len(file_categories['source']) > 0 or len(file_categories['source_openapi']) > 0
has_source_docs_changes = docs_json_changes['source_section']
# Determine if there are translation changes (including OpenAPI)
has_translation_files = len(file_categories['translation']) > 0 or len(file_categories['translation_openapi']) > 0
has_translation_docs_changes = docs_json_changes['translation_sections']
# Filter out non-documentation changes from consideration
relevant_source_changes = has_source_files or has_source_docs_changes
relevant_translation_changes = has_translation_files or has_translation_docs_changes
# Categorize PR type
if relevant_source_changes and relevant_translation_changes:
pr_type = 'mixed'
should_skip = False
error = self.generate_mixed_pr_error(file_categories, docs_json_changes)
elif relevant_source_changes:
pr_type = 'source'
should_skip = False
error = None
elif relevant_translation_changes:
pr_type = 'translation'
should_skip = True
error = None
else:
pr_type = 'none'
should_skip = True
error = None
return {
'type': pr_type,
'should_skip': should_skip,
'error': error,
'files': file_categories,
'docs_json_changes': docs_json_changes
}
    def generate_mixed_pr_error(self, file_categories: Dict[str, List[str]], docs_json_changes: Dict[str, bool]) -> str:
        """Generate comprehensive error message for mixed PRs.

        Args:
            file_categories: Output of categorize_files().
            docs_json_changes: Output of analyze_docs_json_changes().

        Returns:
            GitHub-flavored markdown intended to be posted as a PR comment.
        """
        # Helper: render up to `max_files` paths as markdown bullets, then a
        # "... and N more" line so large PRs don't flood the comment.
        def format_file_list(files: List[str], max_files: int = 10) -> str:
            if not files:
                return " - (none)"
            formatted = []
            for file in files[:max_files]:
                formatted.append(f" - `{file}`")
            if len(files) > max_files:
                formatted.append(f" - ... and {len(files) - max_files} more")
            return '\n'.join(formatted)
        # Helper: summarize which docs.json navigation sections changed.
        def format_docs_json_changes(changes: Dict[str, bool]) -> str:
            parts = []
            if changes['source_section']:
                source_lang = self.config.get('source_language', 'en')
                parts.append(f" - ✅ {source_lang.upper()} navigation section")
            if changes['translation_sections']:
                target_langs = ', '.join(self.config.get('target_languages', []))
                parts.append(f" - ✅ Translation navigation sections ({target_langs})")
            if not parts:
                parts.append(" - (no navigation changes)")
            return '\n'.join(parts)
        # NOTE: the template is kept flush-left on purpose — any indentation
        # would become part of the markdown and break its rendering.
        error_msg = f"""❌ **Mixed Content PR Detected**
This PR contains changes to both source language content and translations, which violates our automated workflow requirements.
**🔧 Required Action: Separate into Two PRs**
Please create two separate pull requests:
### 1⃣ **Source Language Content PR**
Create a PR containing only:
- Changes to source language files (`{self.get_language_directory(self.source_language)}/`)
- Changes to source language navigation in `docs.json`
- This will trigger automatic translation generation
### 2⃣ **Translation Improvement PR**
Create a PR containing only:
- Changes to translation language files ({self._get_translation_dirs_display()})
- Changes to translation navigation sections in `docs.json`
- This will go through direct review (no automation)
---
**📋 Files Detected in This PR:**
**📝 Source Language Content Files ({len(file_categories['source'])} files):**
{format_file_list(file_categories['source'])}
**🌐 Translation Files ({len(file_categories['translation'])} files):**
{format_file_list(file_categories['translation'])}
**📋 docs.json Navigation Changes:**
{format_docs_json_changes(docs_json_changes)}
---
**💡 Why This Separation is Required:**
- **Proper Review Process**: Source language content and translations have different review requirements
- **Automation Conflicts**: Mixed PRs break the automated translation workflow
- **Independent Merging**: Content and translations can be merged independently
- **Clear History**: Maintains clean git history for content vs translation changes
**🤖 What Happens Next:**
1. **Source Language PR**: Will automatically generate translations and create a linked translation PR
2. **Translation PR**: Will go through standard review process
3. **Both PRs**: Can be reviewed and merged independently
Please separate your changes and resubmit as two focused PRs. Thank you! 🙏"""
        return error_msg
def _get_translation_dirs_display(self) -> str:
"""Get formatted display of translation directories for error messages."""
dirs = []
for lang_code in self.config.get('target_languages', []):
if 'languages' in self.config and lang_code in self.config['languages']:
dir_name = self.config['languages'][lang_code].get('directory', lang_code)
dirs.append(f"`{dir_name}/`")
if not dirs:
dirs = ["`zh/`", "`ja/`"] # Fallback
return ' and '.join(dirs)
class SyncPlanGenerator:
    """
    Generates sync_plan.json with identical logic for both execute and update workflows.
    Extracts the sync plan generation logic from the analyze workflow to ensure
    both workflows use the same file filtering and structure change detection.
    """

    def __init__(self, base_sha: str, head_sha: str, repo_root: Optional[str] = None):
        """Create a generator for the range base_sha..head_sha.

        Args:
            base_sha: Base commit SHA of the diff.
            head_sha: Head commit SHA of the diff.
            repo_root: Repository root; defaults like PRAnalyzer's.
        """
        self.base_sha = base_sha
        self.head_sha = head_sha
        self.repo_root = Path(repo_root) if repo_root else Path(__file__).parent.parent.parent
        # Delegate config loading and ignore-list handling to PRAnalyzer so
        # both workflows share one source of truth.
        self.analyzer = PRAnalyzer(base_sha, head_sha, repo_root)
        self.config = self.analyzer.config
def get_changed_files_with_status(self) -> List[Tuple[str, str]]:
"""
Get list of changed files with their status (A=added, M=modified, D=deleted, etc).
Returns list of tuples: [(status, filepath), ...]
Only returns A (added) and M (modified) files for translation.
Filters out files that don't exist at head_sha (handles add-then-delete scenario).
"""
try:
result = subprocess.run([
"git", "diff", "--name-status", "--diff-filter=AM",
self.base_sha, self.head_sha
], capture_output=True, text=True, check=True, cwd=self.repo_root)
files_with_status = []
for line in result.stdout.strip().split('\n'):
if line.strip():
parts = line.split('\t', 1)
if len(parts) == 2:
status, filepath = parts[0], parts[1]
# Verify file exists at head_sha (handles add-then-delete scenario)
if self._file_exists_at_commit(filepath, self.head_sha):
files_with_status.append((status, filepath))
else:
print(f"Skipping {filepath}: added then deleted in same PR")
return files_with_status
except subprocess.CalledProcessError as e:
print(f"Error getting changed files with status: {e}")
return []
def _file_exists_at_commit(self, filepath: str, commit_sha: str) -> bool:
"""Check if a file exists at a specific commit."""
try:
subprocess.run([
"git", "cat-file", "-e", f"{commit_sha}:{filepath}"
], capture_output=True, check=True, cwd=self.repo_root)
return True
except subprocess.CalledProcessError:
return False
def get_file_size(self, filepath: str) -> int:
"""Get file size in bytes."""
full_path = self.repo_root / filepath
try:
return full_path.stat().st_size if full_path.exists() else 0
except:
return 0
def is_openapi_file(self, filepath: str) -> bool:
"""Check if file matches OpenAPI JSON pattern."""
openapi_config = self.config.get("openapi", {})
if not openapi_config.get("enabled", False):
return False
file_patterns = openapi_config.get("file_patterns", ["openapi*.json"])
directories = openapi_config.get("directories", ["api-reference"])
# Check if file is in allowed directories
if not any(f"/{dir}/" in filepath or filepath.startswith(f"{dir}/") for dir in directories):
return False
# Check if filename matches patterns
filename = Path(filepath).name
for pattern in file_patterns:
regex = pattern.replace('*', '.*').replace('?', '.')
if re.match(f'^{regex}$', filename):
return True
return False
def generate_sync_plan(self) -> Dict:
"""
Generate sync plan with identical logic to analyze workflow.
Returns sync_plan dict with:
- metadata: PR context and commit info
- files_to_sync: Source language markdown files (A/M only)
- openapi_files_to_sync: Source language OpenAPI JSON files (A/M only)
- structure_changes: docs.json change analysis
- target_languages: Languages to translate to
- sync_required: Whether any sync is needed
"""
# Get changed files with status
files_with_status = self.get_changed_files_with_status()
# Categorize files for translation
files_to_sync = []
openapi_files_to_sync = []
docs_json_changed = False
for status, filepath in files_with_status:
# Check for docs.json
if filepath == 'docs.json':
docs_json_changed = True
continue
# Skip ignored files
if self.analyzer._is_file_ignored(filepath):
continue
# Process source language markdown files
if filepath.startswith('en/') and filepath.endswith(('.md', '.mdx')):
file_size = self.get_file_size(filepath)
file_type = 'mdx' if filepath.endswith('.mdx') else 'md'
files_to_sync.append({
"path": filepath,
"size": file_size,
"type": file_type,
"status": status
})
# Process source language OpenAPI JSON files
elif filepath.startswith('en/') and self.is_openapi_file(filepath):
file_size = self.get_file_size(filepath)
openapi_files_to_sync.append({
"path": filepath,
"size": file_size,
"type": "openapi_json",
"status": status
})
# Analyze docs.json changes (if changed)
if docs_json_changed:
docs_changes = self.analyzer.analyze_docs_json_changes()
structure_changes = {
"structure_changed": docs_changes["any_docs_json_changes"],
"navigation_modified": docs_changes["source_section"],
"languages_affected": self.config["target_languages"] if docs_changes["source_section"] else []
}
else:
structure_changes = {
"structure_changed": False,
"navigation_modified": False,
"languages_affected": []
}
# Create metadata
metadata = {
"base_sha": self.base_sha,
"head_sha": self.head_sha,
"comparison": f"{self.base_sha[:8]}...{self.head_sha[:8]}"
}
# Build sync plan
sync_plan = {
"metadata": metadata,
"files_to_sync": files_to_sync,
"openapi_files_to_sync": openapi_files_to_sync,
"structure_changes": structure_changes,
"target_languages": self.config["target_languages"],
"sync_required": len(files_to_sync) > 0 or len(openapi_files_to_sync) > 0 or structure_changes.get("structure_changed", False)
}
return sync_plan
def main():
    """CLI entry point: analyze a PR range and print GitHub Actions outputs."""
    if len(sys.argv) != 3:
        print("Usage: python pr_analyzer.py <base_sha> <head_sha>")
        sys.exit(1)
    base_sha, head_sha = sys.argv[1], sys.argv[2]
    result = PRAnalyzer(base_sha, head_sha).categorize_pr()
    # Primary outputs consumed by the workflow.
    print(f"pr_type={result['type']}")
    print(f"should_skip={str(result['should_skip']).lower()}")
    if result['error']:
        # Mixed PR: surface the explanation and fail the job.
        print(f"error_message={result['error']}")
        sys.exit(1)
    # Secondary detail outputs.
    files = result['files']
    docs_changes = result['docs_json_changes']
    print(f"source_files_count={len(files['source'])}")
    print(f"translation_files_count={len(files['translation'])}")
    print(f"docs_json_source_changes={str(docs_changes['source_section']).lower()}")
    print(f"docs_json_translation_changes={str(docs_changes['translation_sections']).lower()}")
    print(f"any_docs_json_changes={str(docs_changes['any_docs_json_changes']).lower()}")


if __name__ == "__main__":
    main()