#!/usr/bin/env python3
"""
Security validation utilities for documentation synchronization.
Provides input validation, path sanitization, and security checks.
"""
import os
import re
import json
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Set
import hashlib
import hmac
class SecurityValidator:
    """Validates and sanitizes inputs for documentation synchronization.

    The class-level constants define the size and count limits, the set of
    accepted file extensions, and the regular expressions that are treated
    as dangerous. ``validate_json_structure`` searches every JSON string
    value for each entry of ``DANGEROUS_PATTERNS`` (case-insensitively).
    """

    # Security constants
    MAX_FILE_SIZE_MB = 10
    MAX_FILES_PER_SYNC = 50
    MAX_PATH_LENGTH = 255
    MAX_CONTENT_LENGTH = 1024 * 1024 * 10  # 10MB

    # Allowed file extensions
    ALLOWED_EXTENSIONS = {'.md', '.mdx', '.json'}

    # Dangerous patterns to block. Every entry must be a non-empty regex:
    # an empty pattern matches any string and would reject all input.
    DANGEROUS_PATTERNS = [
        r'\.\.',  # Directory traversal
        r'^/',  # Absolute paths
        r'^~',  # Home directory
        r'\$\{',  # Variable expansion
        r'`',  # Command substitution
        r'<\s*script\b',  # Script tags
        r'on\w+\s*=\s*["\']',  # Event handlers
        r'javascript:',  # JavaScript protocol
        r'data:text/html',  # Data URLs with HTML
    ]
for pattern in dangerous_content_patterns:
if re.search(pattern, content, re.IGNORECASE | re.DOTALL):
return False, f"Dangerous content pattern detected"
return True, None
def validate_json_structure(self, json_data: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """
    Walk a decoded JSON document and reject unsafe content.

    Strings are searched (case-insensitively) for every regex in
    ``self.DANGEROUS_PATTERNS``; dict keys must be strings; containers
    nested more than 10 levels below the root are refused.

    Args:
        json_data: The JSON data to validate
    Returns:
        Tuple of (is_valid, error_message); error_message is None on success
    """
    depth_limit = 10

    def _scan(node: Any, level: int = 0) -> Optional[str]:
        """Return the first problem found at or below ``node``, else None."""
        if level > depth_limit:
            return "JSON nesting too deep"

        if isinstance(node, str):
            # First pattern (in declaration order) that matches this string.
            offending = next(
                (
                    regex
                    for regex in self.DANGEROUS_PATTERNS
                    if re.search(regex, node, re.IGNORECASE)
                ),
                None,
            )
            if offending is not None:
                return f"Dangerous pattern in JSON value: {offending}"
            return None

        if isinstance(node, dict):
            # Key type and value are checked pair by pair, in dict order.
            for key, child in node.items():
                if not isinstance(key, str):
                    return "Non-string key in JSON"
                problem = _scan(child, level + 1)
                if problem:
                    return problem
            return None

        if isinstance(node, list):
            for child in node:
                problem = _scan(child, level + 1)
                if problem:
                    return problem

        # Numbers, booleans, None and any other type are accepted as-is.
        return None

    failure = _scan(json_data)
    return (False, failure) if failure else (True, None)
def validate_sync_plan(self, sync_plan: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
    """
    Validate a synchronization plan.

    The plan is treated as untrusted input: a malformed shape (wrong
    container type, non-numeric size, non-string path or language) is
    reported as a validation failure instead of raising ``TypeError``.

    Args:
        sync_plan: The sync plan to validate
    Returns:
        Tuple of (is_valid, error_message); error_message is None on success
    """
    if not isinstance(sync_plan, dict):
        return False, "Sync plan must be a mapping"

    # Check required fields
    for field in ('files_to_sync', 'target_languages', 'metadata'):
        if field not in sync_plan:
            return False, f"Missing required field: {field}"

    # Validate file count
    files = sync_plan['files_to_sync']
    if not isinstance(files, (list, tuple)):
        return False, "Invalid files_to_sync: expected a list"
    if len(files) > self.MAX_FILES_PER_SYNC:
        return False, f"Too many files: {len(files)} > {self.MAX_FILES_PER_SYNC}"

    # Validate each file
    max_size = self.MAX_FILE_SIZE_MB * 1024 * 1024
    for file_info in files:
        if not isinstance(file_info, dict):
            return False, "Invalid file info structure"

        file_path = file_info.get('path')
        if not file_path:
            return False, "File path missing in sync plan"
        if not isinstance(file_path, str):
            return False, "Invalid file path in sync plan: path must be a string"

        valid, error = self.validate_file_path(file_path)
        if not valid:
            return False, f"Invalid file path in sync plan: {error}"

        # Validate file size if present
        if 'size' in file_info:
            size = file_info['size']
            # bool is a subclass of int, so exclude it explicitly.
            is_number = isinstance(size, (int, float)) and not isinstance(size, bool)
            if not is_number or size < 0:
                return False, f"Invalid file size: {file_path}"
            if size > max_size:
                return False, f"File too large: {file_path}"

    # Validate target languages
    target_langs = sync_plan['target_languages']
    if not isinstance(target_langs, (list, tuple, set, frozenset)):
        return False, "Invalid target_languages: expected a list"
    for lang in target_langs:
        # The str check comes first so unhashable entries never reach the
        # membership test against self.valid_languages.
        if not isinstance(lang, str) or lang not in self.valid_languages:
            return False, f"Invalid target language: {lang}"

    return True, None
def sanitize_path(self, file_path: str) -> Optional[str]:
    """
    Sanitize a file path by removing dangerous elements.

    Steps, in order: drop NUL bytes, trim surrounding whitespace, convert
    backslashes to forward slashes, collapse repeated slashes, then run the
    result through ``self.validate_file_path``.

    Args:
        file_path: The file path to sanitize
    Returns:
        Sanitized path or None if path cannot be sanitized
    """
    if not isinstance(file_path, str):
        return None

    # Remove NUL bytes before trimming so that whitespace they were hiding
    # at either end is stripped as well.
    cleaned = file_path.replace('\x00', '').strip()

    # Normalize path separators
    cleaned = cleaned.replace('\\', '/')

    # Collapse any run of slashes into a single one
    cleaned = re.sub(r'/{2,}', '/', cleaned)

    # Validate the sanitized path
    valid, _ = self.validate_file_path(cleaned)
    return cleaned if valid else None
def create_safe_temp_dir(self) -> Path:
    """
    Create a safe temporary directory for operations.

    The directory is created with ``tempfile.mkdtemp`` using a
    ``-sync-<random hex>`` suffix, then chmod-ed to 0o700 on a best-effort
    basis. The caller is responsible for removing it.

    Returns:
        Path to the temporary directory
    """
    import secrets
    import tempfile

    # Create temp dir with random suffix
    suffix = secrets.token_hex(8)
    temp_dir = Path(tempfile.mkdtemp(suffix=f'-sync-{suffix}'))

    # Set restrictive permissions. This stays best-effort: platforms or
    # filesystems that cannot apply the mode must not abort the sync. Only
    # the errors chmod itself can raise are ignored, so KeyboardInterrupt
    # and SystemExit still propagate.
    try:
        os.chmod(temp_dir, 0o700)
    except (OSError, NotImplementedError):
        pass

    return temp_dir
def calculate_file_hash(self, file_path: Path) -> str:
    """
    Compute the SHA-256 digest of a file's contents.

    The file is opened in binary mode and read in 4096-byte chunks, so the
    whole file is never held in memory at once.

    Args:
        file_path: Path to the file
    Returns:
        Hex digest of the file hash
    """
    chunk_size = 4096
    digest = hashlib.sha256()
    with open(file_path, "rb") as stream:
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                # An empty read signals end of file.
                break
            digest.update(chunk)
    return digest.hexdigest()
def verify_artifact_integrity(self, artifact_data: bytes, expected_hash: Optional[str] = None) -> bool:
    """
    Verify the integrity of an artifact.

    With an expected hash, the SHA-256 hex digest of ``artifact_data`` is
    compared to it in constant time. The expected value is trimmed and
    lower-cased first, because ``hexdigest()`` is always lowercase while
    externally supplied digests are often uppercase or carry a trailing
    newline. Without an expected hash (None or empty), only the size limit
    ``MAX_CONTENT_LENGTH`` is enforced.

    Args:
        artifact_data: The artifact data
        expected_hash: Optional expected SHA-256 hex digest
    Returns:
        True if artifact is valid
    """
    if expected_hash:
        actual_hash = hashlib.sha256(artifact_data).hexdigest()
        normalized = expected_hash.strip().lower()
        # Compare as bytes: compare_digest raises TypeError for str
        # arguments containing non-ASCII characters, and a malformed
        # expected hash should simply fail verification.
        return hmac.compare_digest(
            actual_hash.encode("ascii"),
            normalized.encode("utf-8"),
        )

    # Basic validation if no hash provided
    return len(artifact_data) < self.MAX_CONTENT_LENGTH
def is_trusted_contributor(self, username: str, trusted_list: List[str] = None) -> bool:
    """
    Report whether ``username`` appears in the trusted list.

    The match is an exact membership test. When no list (or an empty list)
    is supplied there are no built-in trusted users, so the result is
    always False.

    Args:
        username: GitHub username
        trusted_list: Optional list of trusted usernames
    Returns:
        True if user is trusted
    """
    # Default trusted contributors (should be configured): none.
    allowed = trusted_list or []
    return username in allowed
def rate_limit_check(self, identifier: str, max_requests: int = 10, window_seconds: int = 60) -> bool:
    """
    Placeholder rate limiter that currently allows every request.

    No request history is recorded, so none of the arguments influence the
    result. A real implementation would need persistent storage (Redis or
    similar).

    Args:
        identifier: Unique identifier (e.g., PR number)
        max_requests: Maximum requests allowed
        window_seconds: Time window in seconds
    Returns:
        True if within rate limit (always True for now)
    """
    within_limit = True
    return within_limit
def create_validator(base_dir: Optional[Path] = None) -> SecurityValidator:
    """
    Build a SecurityValidator rooted at ``base_dir``.

    Args:
        base_dir: Optional base directory; when None, the directory three
            levels above this file is used
    Returns:
        SecurityValidator instance
    """
    root = Path(__file__).parent.parent.parent if base_dir is None else base_dir
    return SecurityValidator(root)
# Example usage and tests
if __name__ == "__main__":
    validator = create_validator()

    # Test path validation (trailing comments state the expected outcome)
    test_paths = [
        "en/docs/test.md",  # Valid
        "../../../etc/passwd",  # Invalid - directory traversal
        "/etc/passwd",  # Invalid - absolute path
        "en/test.exe",  # Invalid - wrong extension
        "zh/docs/test.mdx",  # Valid
        "docs.json",  # Valid - special case
    ]

    print("Path Validation Tests:")
    for path in test_paths:
        valid, error = validator.validate_file_path(path)
        status = "✓" if valid else "✗"
        print(f"  {status} {path}: {error if error else 'Valid'}")

    print("\nContent Validation Tests:")
    test_contents = [
        "# Normal markdown content",  # Valid
        # The script-tag sample must contain an actual tag; an empty string
        # would not exercise script detection at all.
        "<script>alert('xss')</script>",  # Invalid - script tag
        "Normal text with onclick='alert()'",  # Invalid - event handler
    ]

    for content in test_contents:
        valid, error = validator.validate_file_content(content)
        status = "✓" if valid else "✗"
        # Show at most the first 30 characters of the sample.
        preview = content[:30] + "..." if len(content) > 30 else content
        print(f"  {status} {preview}: {error if error else 'Valid'}")