Files
dify-docs/tools/translate/security_validator.py
Gu 33455cd05e test: internal contributor workflow test
- Add test documentation file
- Update docs.json navigation
- Testing two-workflow pattern for internal PRs
2025-08-23 10:24:37 +08:00

384 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Security validation utilities for documentation synchronization.
Provides input validation, path sanitization, and security checks.
"""
import os
import re
import json
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
import hashlib
import hmac
class SecurityValidator:
"""Validates and sanitizes inputs for documentation synchronization"""
# Security constants
MAX_FILE_SIZE_MB = 10
MAX_FILES_PER_SYNC = 50
MAX_PATH_LENGTH = 255
MAX_CONTENT_LENGTH = 1024 * 1024 * 10 # 10MB
# Allowed file extensions
ALLOWED_EXTENSIONS = {'.md', '.mdx', '.json'}
# Allowed base directories
ALLOWED_BASE_DIRS = {'en', 'zh-hans', 'ja-jp'}
# Dangerous patterns to block
DANGEROUS_PATTERNS = [
r'\.\.', # Directory traversal
r'^/', # Absolute paths
r'^~', # Home directory
r'\$\{', # Variable expansion
r'`', # Command substitution
r'<script', # Script tags
r'javascript:', # JavaScript protocol
r'data:text/html', # Data URLs with HTML
]
def __init__(self, base_dir: Path):
"""
Initialize the security validator.
Args:
base_dir: The base directory for all operations
"""
self.base_dir = Path(base_dir).resolve()
def validate_file_path(self, file_path: str) -> Tuple[bool, Optional[str]]:
"""
Validate a file path for security issues.
Args:
file_path: The file path to validate
Returns:
Tuple of (is_valid, error_message)
"""
# Check path length
if len(file_path) > self.MAX_PATH_LENGTH:
return False, f"Path too long: {len(file_path)} > {self.MAX_PATH_LENGTH}"
# Check for dangerous patterns
for pattern in self.DANGEROUS_PATTERNS:
if re.search(pattern, file_path, re.IGNORECASE):
return False, f"Dangerous pattern detected: {pattern}"
# Parse path
path = Path(file_path)
# Check for absolute path
if path.is_absolute():
return False, "Absolute paths not allowed"
# Check file extension
if path.suffix not in self.ALLOWED_EXTENSIONS:
return False, f"File extension not allowed: {path.suffix}"
# Check if path starts with allowed directory
parts = path.parts
if not parts:
return False, "Empty path"
if parts[0] not in self.ALLOWED_BASE_DIRS and not file_path == 'docs.json':
return False, f"Path must start with allowed directory: {self.ALLOWED_BASE_DIRS}"
# Resolve and check if path stays within base directory
try:
full_path = (self.base_dir / path).resolve()
if not full_path.is_relative_to(self.base_dir):
return False, "Path escapes base directory"
except (ValueError, RuntimeError) as e:
return False, f"Invalid path: {e}"
return True, None
def validate_file_content(self, content: str) -> Tuple[bool, Optional[str]]:
"""
Validate file content for security issues.
Args:
content: The file content to validate
Returns:
Tuple of (is_valid, error_message)
"""
# Check content length
if len(content) > self.MAX_CONTENT_LENGTH:
return False, f"Content too large: {len(content)} > {self.MAX_CONTENT_LENGTH}"
# Check for script injections in content
dangerous_content_patterns = [
r'<script[^>]*>.*?</script>', # Script tags
r'on\w+\s*=\s*["\']', # Event handlers
r'javascript:', # JavaScript protocol
r'data:text/html', # Data URLs with HTML
]
for pattern in dangerous_content_patterns:
if re.search(pattern, content, re.IGNORECASE | re.DOTALL):
return False, f"Dangerous content pattern detected"
return True, None
def validate_json_structure(self, json_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Validate JSON structure for security issues.
Args:
json_data: The JSON data to validate
Returns:
Tuple of (is_valid, error_message)
"""
def check_value(value: Any, depth: int = 0) -> Optional[str]:
"""Recursively check JSON values"""
if depth > 10:
return "JSON nesting too deep"
if isinstance(value, str):
# Check for dangerous patterns in string values
for pattern in self.DANGEROUS_PATTERNS:
if re.search(pattern, value, re.IGNORECASE):
return f"Dangerous pattern in JSON value: {pattern}"
elif isinstance(value, dict):
for k, v in value.items():
if not isinstance(k, str):
return "Non-string key in JSON"
error = check_value(v, depth + 1)
if error:
return error
elif isinstance(value, list):
for item in value:
error = check_value(item, depth + 1)
if error:
return error
return None
error = check_value(json_data)
if error:
return False, error
return True, None
def validate_sync_plan(self, sync_plan: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Validate a synchronization plan.
Args:
sync_plan: The sync plan to validate
Returns:
Tuple of (is_valid, error_message)
"""
# Check required fields
required_fields = ['files_to_sync', 'target_languages', 'metadata']
for field in required_fields:
if field not in sync_plan:
return False, f"Missing required field: {field}"
# Validate file count
files = sync_plan.get('files_to_sync', [])
if len(files) > self.MAX_FILES_PER_SYNC:
return False, f"Too many files: {len(files)} > {self.MAX_FILES_PER_SYNC}"
# Validate each file
for file_info in files:
if not isinstance(file_info, dict):
return False, "Invalid file info structure"
file_path = file_info.get('path')
if not file_path:
return False, "File path missing in sync plan"
valid, error = self.validate_file_path(file_path)
if not valid:
return False, f"Invalid file path in sync plan: {error}"
# Validate file size if present
if 'size' in file_info:
max_size = self.MAX_FILE_SIZE_MB * 1024 * 1024
if file_info['size'] > max_size:
return False, f"File too large: {file_path}"
# Validate target languages
valid_languages = {'zh-hans', 'ja-jp'}
target_langs = sync_plan.get('target_languages', [])
for lang in target_langs:
if lang not in valid_languages:
return False, f"Invalid target language: {lang}"
return True, None
def sanitize_path(self, file_path: str) -> Optional[str]:
"""
Sanitize a file path by removing dangerous elements.
Args:
file_path: The file path to sanitize
Returns:
Sanitized path or None if path cannot be sanitized
"""
# Remove leading/trailing whitespace
file_path = file_path.strip()
# Remove any null bytes
file_path = file_path.replace('\x00', '')
# Normalize path separators
file_path = file_path.replace('\\', '/')
# Remove double slashes
while '//' in file_path:
file_path = file_path.replace('//', '/')
# Validate the sanitized path
valid, _ = self.validate_file_path(file_path)
if not valid:
return None
return file_path
def create_safe_temp_dir(self) -> Path:
"""
Create a safe temporary directory for operations.
Returns:
Path to the temporary directory
"""
import tempfile
import secrets
# Create temp dir with random suffix
suffix = secrets.token_hex(8)
temp_dir = Path(tempfile.mkdtemp(suffix=f'-sync-{suffix}'))
# Set restrictive permissions (Unix only)
try:
os.chmod(temp_dir, 0o700)
except:
pass # Windows doesn't support chmod
return temp_dir
def calculate_file_hash(self, file_path: Path) -> str:
"""
Calculate SHA-256 hash of a file.
Args:
file_path: Path to the file
Returns:
Hex digest of the file hash
"""
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
def verify_artifact_integrity(self, artifact_data: bytes, expected_hash: Optional[str] = None) -> bool:
"""
Verify the integrity of an artifact.
Args:
artifact_data: The artifact data
expected_hash: Optional expected hash
Returns:
True if artifact is valid
"""
if expected_hash:
actual_hash = hashlib.sha256(artifact_data).hexdigest()
return hmac.compare_digest(actual_hash, expected_hash)
# Basic validation if no hash provided
return len(artifact_data) < self.MAX_CONTENT_LENGTH
def is_trusted_contributor(self, username: str, trusted_list: List[str] = None) -> bool:
"""
Check if a user is a trusted contributor.
Args:
username: GitHub username
trusted_list: Optional list of trusted usernames
Returns:
True if user is trusted
"""
if not trusted_list:
# Default trusted contributors (should be configured)
trusted_list = []
return username in trusted_list
def rate_limit_check(self, identifier: str, max_requests: int = 10, window_seconds: int = 60) -> bool:
"""
Simple rate limiting check (would need persistent storage in production).
Args:
identifier: Unique identifier (e.g., PR number)
max_requests: Maximum requests allowed
window_seconds: Time window in seconds
Returns:
True if within rate limit
"""
# This is a placeholder - in production, you'd use Redis or similar
# For now, always return True
return True
def create_validator(base_dir: Optional[Path] = None) -> SecurityValidator:
"""
Create a security validator instance.
Args:
base_dir: Optional base directory (defaults to script parent)
Returns:
SecurityValidator instance
"""
if base_dir is None:
base_dir = Path(__file__).parent.parent.parent
return SecurityValidator(base_dir)
# Example usage and tests
if __name__ == "__main__":
validator = create_validator()
# Test path validation
test_paths = [
"en/docs/test.md", # Valid
"../../../etc/passwd", # Invalid - directory traversal
"/etc/passwd", # Invalid - absolute path
"en/test.exe", # Invalid - wrong extension
"zh-hans/docs/test.mdx", # Valid
"docs.json", # Valid - special case
]
print("Path Validation Tests:")
for path in test_paths:
valid, error = validator.validate_file_path(path)
status = "" if valid else ""
print(f" {status} {path}: {error if error else 'Valid'}")
print("\nContent Validation Tests:")
test_contents = [
"# Normal markdown content", # Valid
"<script>alert('xss')</script>", # Invalid - script tag
"Normal text with onclick='alert()'", # Invalid - event handler
]
for content in test_contents:
valid, error = validator.validate_file_content(content)
status = "" if valid else ""
preview = content[:30] + "..." if len(content) > 30 else content
print(f" {status} {preview}: {error if error else 'Valid'}")