Files
SuperClaude_Framework/scripts/sync_from_framework.py

960 lines
33 KiB
Python
Executable File

#!/usr/bin/env python3
"""
SuperClaude Framework Sync Script
Automated pull-sync with namespace isolation for Plugin distribution
This script synchronizes content from SuperClaude_Framework repository and
transforms it for distribution as a Claude Code plugin with proper namespace
isolation (sc: prefix for commands, sc- prefix for filenames).
Usage:
python scripts/sync_from_framework.py [OPTIONS]
Options:
--framework-repo URL Framework repository URL
--plugin-root PATH Plugin repository root path
--dry-run Preview changes without applying
--output-report PATH Save sync report to file
"""
import sys
import argparse
import tempfile
import shutil
import hashlib
from pathlib import Path
from typing import Dict, List, Tuple, Optional
import json
import re
import subprocess
from dataclasses import dataclass, asdict
from datetime import datetime
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class ProtectionViolationError(RuntimeError):
"""Raised when sync would overwrite a Plugin-owned file listed in PROTECTED_PATHS."""
pass
@dataclass
class SyncResult:
"""Results from sync operation."""
success: bool
timestamp: str
framework_commit: str
framework_version: str
files_synced: int
files_modified: int
commands_transformed: int
agents_transformed: int
mcp_servers_merged: int
warnings: List[str]
errors: List[str]
def to_dict(self) -> dict:
return asdict(self)
class ContentTransformer:
"""Transforms Framework content for Plugin namespace."""
# Regex patterns for transformation
COMMAND_HEADER_PATTERN = re.compile(r'^(#+\s+)/(\w+)', re.MULTILINE)
COMMAND_REF_PATTERN = re.compile(r'(?<![/\w])/(\w+)(?=\s|$|:|`|\)|\])')
LINK_REF_PATTERN = re.compile(r'\[/(\w+)\]')
FRONTMATTER_NAME_PATTERN = re.compile(r'^name:\s*(.+)$', re.MULTILINE)
@staticmethod
def transform_command(content: str, filename: str) -> str:
"""
Transform command content for sc: namespace.
Transformations:
- Header: # /brainstorm → # /sc:brainstorm
- References: /analyze → /sc:analyze
- Links: [/task] → [/sc:task]
Args:
content: Original command file content
filename: Command filename (for logging)
Returns:
Transformed content with sc: namespace
"""
logger.debug(f"Transforming command: {filename}")
# Transform main header
content = ContentTransformer.COMMAND_HEADER_PATTERN.sub(
r'\1/sc:\2',
content
)
# Transform command references in text
content = ContentTransformer.COMMAND_REF_PATTERN.sub(
r'/sc:\1',
content
)
# Transform command references in links
content = ContentTransformer.LINK_REF_PATTERN.sub(
r'[/sc:\1]',
content
)
return content
@staticmethod
def transform_agent(content: str, filename: str) -> str:
"""
Transform agent frontmatter name.
Transformations:
- name: backend-architect → name: sc-backend-architect
Args:
content: Original agent file content
filename: Agent filename (for logging)
Returns:
Transformed content with sc- prefix in name field
"""
logger.debug(f"Transforming agent: {filename}")
# Parse frontmatter
frontmatter_pattern = re.compile(
r'^---\n(.*?)\n---',
re.DOTALL | re.MULTILINE
)
match = frontmatter_pattern.search(content)
if not match:
logger.warning(f"No frontmatter found in agent: {filename}")
return content
frontmatter = match.group(1)
# Transform name field (add sc- prefix if not already present)
def add_prefix(match):
name = match.group(1).strip()
if not name.startswith('sc-'):
return f'name: sc-{name}'
return match.group(0)
frontmatter = ContentTransformer.FRONTMATTER_NAME_PATTERN.sub(
add_prefix,
frontmatter
)
# Replace frontmatter
content = frontmatter_pattern.sub(
f'---\n{frontmatter}\n---',
content,
count=1
)
return content
class FileSyncer:
"""Handles file synchronization with git integration."""
def __init__(self, plugin_root: Path, dry_run: bool = False):
self.plugin_root = plugin_root
self.dry_run = dry_run
self.git_available = self._check_git()
def _check_git(self) -> bool:
"""Check if git is available and repo is initialized."""
try:
subprocess.run(
['git', 'rev-parse', '--git-dir'],
cwd=self.plugin_root,
capture_output=True,
check=True
)
return True
except (subprocess.CalledProcessError, FileNotFoundError):
logger.warning("Git not available - file operations will not preserve history")
return False
def sync_directory(
self,
source_dir: Path,
dest_dir: Path,
filename_prefix: str = "",
transform_fn=None
) -> Dict[str, int]:
"""
Sync directory with namespace prefix and transformation.
Args:
source_dir: Source directory path
dest_dir: Destination directory path
filename_prefix: Prefix to add to filenames (e.g., 'sc-')
transform_fn: Optional content transformation function
Returns:
Statistics dict with counts of synced/modified files
"""
stats = {'synced': 0, 'modified': 0, 'renamed': 0}
if not source_dir.exists():
logger.warning(f"Source directory not found: {source_dir}")
return stats
dest_dir.mkdir(parents=True, exist_ok=True)
# Get existing files in dest (with sc- prefix)
existing_files = {f.name: f for f in dest_dir.glob('*.md')}
synced_files = set()
for source_file in source_dir.glob('*.md'):
# Apply filename prefix
new_name = f"{filename_prefix}{source_file.name}"
synced_files.add(new_name)
dest_file = dest_dir / new_name
# Read and transform content
content = source_file.read_text(encoding='utf-8')
if transform_fn:
content = transform_fn(content, source_file.name)
# Check if file exists with different name (needs git mv)
old_unprefixed = source_file.name
old_file_path = dest_dir / old_unprefixed
if old_file_path.exists() and new_name != old_unprefixed:
# File needs renaming: use git mv to preserve history
if self.git_available:
self._git_mv(old_file_path, dest_file)
stats['renamed'] += 1
else:
# Fallback to regular rename
if not self.dry_run:
old_file_path.rename(dest_file)
stats['renamed'] += 1
logger.info(f" 📝 Renamed: {old_unprefixed}{new_name}")
# Write content
if not self.dry_run:
dest_file.write_text(content, encoding='utf-8')
if dest_file.exists():
stats['modified'] += 1
else:
stats['synced'] += 1
# Remove files that no longer exist in source
# (only remove files with prefix that aren't in synced set)
for filename, filepath in existing_files.items():
if filename.startswith(filename_prefix) and filename not in synced_files:
if not self.dry_run:
filepath.unlink()
logger.info(f" 🗑️ Removed: {filepath.relative_to(self.plugin_root)}")
return stats
def _git_mv(self, old_path: Path, new_path: Path):
"""Use git mv to preserve history."""
if self.dry_run:
logger.info(f" [DRY RUN] git mv {old_path.name} {new_path.name}")
return
try:
subprocess.run(
['git', 'mv', str(old_path), str(new_path)],
cwd=self.plugin_root,
check=True,
capture_output=True
)
logger.info(f" 📝 Renamed (git mv): {old_path.name}{new_path.name}")
except subprocess.CalledProcessError as e:
# Fallback to regular rename
logger.warning(f" ⚠️ Git mv failed, using regular rename: {e}")
old_path.rename(new_path)
def copy_directory(self, source_dir: Path, dest_dir: Path) -> int:
"""
Copy directory contents as-is (no transformation).
Args:
source_dir: Source directory path
dest_dir: Destination directory path
Returns:
Number of files copied
"""
if not source_dir.exists():
logger.warning(f"Source directory not found: {source_dir}")
return 0
dest_dir.mkdir(parents=True, exist_ok=True)
count = 0
for source_file in source_dir.glob('**/*'):
if source_file.is_file():
rel_path = source_file.relative_to(source_dir)
dest_file = dest_dir / rel_path
dest_file.parent.mkdir(parents=True, exist_ok=True)
if not self.dry_run:
shutil.copy2(source_file, dest_file)
count += 1
logger.debug(f" 📄 Copied: {rel_path}")
return count
class PluginJsonGenerator:
"""Generates .claude-plugin/plugin.json from synced commands."""
def __init__(self, plugin_root: Path):
self.plugin_root = plugin_root
def generate(self, framework_version: str) -> dict:
"""
Generate plugin.json with command mappings.
Args:
framework_version: Version from Framework repository
Returns:
Complete plugin.json dictionary
"""
commands_dir = self.plugin_root / 'commands'
# Base metadata from existing plugin.json
root_plugin_json = self.plugin_root / 'plugin.json'
if root_plugin_json.exists():
base_metadata = json.loads(root_plugin_json.read_text())
else:
base_metadata = {
"name": "sc",
"description": "SuperClaude Plugin",
"author": {"name": "SuperClaude Team"},
"license": "MIT"
}
# Build command mappings
commands = {}
if commands_dir.exists():
for cmd_file in sorted(commands_dir.glob('sc-*.md')):
# Extract command name from filename
# sc-brainstorm.md → brainstorm
cmd_name = cmd_file.stem.replace('sc-', '')
# Map sc:brainstorm to path
commands[f"sc:{cmd_name}"] = f"commands/{cmd_file.name}"
plugin_json = {
"name": "sc",
"version": framework_version,
"description": base_metadata.get("description", ""),
"author": base_metadata.get("author", {}),
"homepage": base_metadata.get("homepage", ""),
"repository": base_metadata.get("repository", ""),
"license": base_metadata.get("license", "MIT"),
"keywords": base_metadata.get("keywords", [])
}
logger.info(f"✅ Generated plugin.json with {len(commands)} commands")
return plugin_json
def write(self, plugin_json: dict, dry_run: bool = False):
"""Write plugin.json to .claude-plugin/ directory."""
output_path = self.plugin_root / '.claude-plugin' / 'plugin.json'
output_path.parent.mkdir(parents=True, exist_ok=True)
if dry_run:
logger.info(f"[DRY RUN] Would write plugin.json to: {output_path}")
logger.info(json.dumps(plugin_json, indent=2))
return
output_path.write_text(
json.dumps(plugin_json, indent=2) + '\n',
encoding='utf-8'
)
logger.info(f"✅ Written: {output_path}")
class McpMerger:
"""Safely merges MCP server configurations."""
def __init__(self, plugin_root: Path):
self.plugin_root = plugin_root
def merge(
self,
framework_mcp: dict,
plugin_mcp: dict
) -> Tuple[dict, List[str]]:
"""
Merge MCP configurations with conflict detection.
Strategy:
- Framework servers take precedence
- Preserve Plugin-specific servers
- Log warnings for conflicts
Args:
framework_mcp: MCP servers from Framework
plugin_mcp: MCP servers from Plugin
Returns:
(merged_config, warnings)
"""
merged = {}
warnings = []
# Add Framework servers (source of truth)
for name, config in framework_mcp.items():
merged[name] = config
# Add Plugin-specific servers if not in Framework
for name, config in plugin_mcp.items():
if name not in merged:
merged[name] = config
warnings.append(
f"Preserved plugin-specific MCP server: {name}"
)
else:
# Check if configurations differ
if config != merged[name]:
warnings.append(
f"MCP server '{name}' conflict - using Framework version"
)
return merged, warnings
def backup_current(self) -> Optional[Path]:
"""Create backup of current plugin.json."""
plugin_json = self.plugin_root / 'plugin.json'
if not plugin_json.exists():
return None
backup_dir = self.plugin_root / 'backups'
backup_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
backup_path = backup_dir / f'plugin.json.{timestamp}.backup'
shutil.copy2(plugin_json, backup_path)
logger.info(f"📦 Backup created: {backup_path}")
return backup_path
class FrameworkSyncer:
"""Main orchestrator for Framework → Plugin sync."""
# ── SYNC MAPPINGS ──────────────────────────────────────────────────────────
# What to pull from Framework and transform for Plugin distribution.
# Symmetric pair with PROTECTED_PATHS below: a path appears in one or the other,
# never both.
SYNC_MAPPINGS = {
"src/superclaude/commands": "commands", # /cmd → /sc:cmd, sc- prefix
"src/superclaude/agents": "agents", # name → sc-name in frontmatter
# core/ and modes/ are intentionally absent — they live in PROTECTED_PATHS
}
# ── PROTECTED PATHS ────────────────────────────────────────────────────────
# Plugin-owned files and directories that must NEVER be overwritten by sync,
# regardless of what the Framework contains.
#
# Algorithm: before sync → hash all protected paths → after sync → re-hash
# and raise ProtectionViolationError if anything changed.
#
# To move a path from protected to synced: remove it here, add to SYNC_MAPPINGS.
PROTECTED_PATHS: List[str] = [
# Plugin-specific documentation (Plugin spec, not Framework spec)
"README.md",
"README-ja.md",
"README-zh.md",
"BACKUP_GUIDE.md",
"MIGRATION_GUIDE.md",
"SECURITY.md",
"CLAUDE.md",
"LICENSE",
".gitignore",
# Plugin configuration & marketplace metadata
".claude-plugin/",
# Plugin infrastructure (workflows, scripts, tests are Plugin-owned)
".github/",
"docs/",
"scripts/",
"tests/",
"backups/",
# Plugin-customized behavioral content
# Plugin maintains its own tuned versions; Framework versions are ignored.
"core/",
"modes/",
]
def __init__(
self,
framework_repo: str,
plugin_root: Path,
dry_run: bool = False
):
self.framework_repo = framework_repo
self.plugin_root = plugin_root
self.dry_run = dry_run
self.temp_dir = None
self.warnings = []
self.errors = []
def sync(self) -> SyncResult:
"""Execute full sync workflow."""
try:
logger.info("🔄 Starting Framework sync...")
# Step 1: Clone Framework
framework_path = self._clone_framework()
framework_commit = self._get_commit_hash(framework_path)
framework_version = self._get_version(framework_path)
logger.info(f"📦 Framework version: {framework_version}")
logger.info(f"📝 Framework commit: {framework_commit[:8]}")
# Step 2: Snapshot protected files BEFORE any changes
protection_snapshot = self._snapshot_protected_files()
# Step 3: Create backup
self._create_backup()
# Step 4: Transform and sync content
stats = self._sync_content(framework_path)
# Step 5: Verify protected files were NOT touched
self._validate_protected_files(protection_snapshot)
# Step 6: Generate plugin.json
self._generate_plugin_json(framework_version)
# Step 7: Merge MCP configurations
mcp_merged = self._merge_mcp_configs(framework_path)
# Step 8: Validate sync results
self._validate_sync()
logger.info("✅ Sync completed successfully!")
return SyncResult(
success=True,
timestamp=datetime.now().isoformat(),
framework_commit=framework_commit,
framework_version=framework_version,
files_synced=stats['files_synced'],
files_modified=stats['files_modified'],
commands_transformed=stats['commands'],
agents_transformed=stats['agents'],
mcp_servers_merged=mcp_merged,
warnings=self.warnings,
errors=self.errors
)
except ProtectionViolationError as e:
# Protection violations are logged already; surface them clearly in the report
self.errors.append(str(e))
return SyncResult(
success=False,
timestamp=datetime.now().isoformat(),
framework_commit="",
framework_version="",
files_synced=0,
files_modified=0,
commands_transformed=0,
agents_transformed=0,
mcp_servers_merged=0,
warnings=self.warnings,
errors=self.errors
)
except Exception as e:
logger.error(f"❌ Sync failed: {e}", exc_info=True)
self.errors.append(str(e))
return SyncResult(
success=False,
timestamp=datetime.now().isoformat(),
framework_commit="",
framework_version="",
files_synced=0,
files_modified=0,
commands_transformed=0,
agents_transformed=0,
mcp_servers_merged=0,
warnings=self.warnings,
errors=self.errors
)
finally:
self._cleanup()
# ── Protection helpers ─────────────────────────────────────────────────────
@staticmethod
def _hash_file(path: Path) -> str:
"""Return SHA-256 hex digest of a file's contents."""
h = hashlib.sha256()
h.update(path.read_bytes())
return h.hexdigest()
def _snapshot_protected_files(self) -> Dict[str, str]:
"""
Hash every file that lives under a PROTECTED_PATHS entry.
Called BEFORE sync begins so we have a baseline to compare against.
Returns:
Mapping of relative-path-string → SHA-256 hex digest.
"""
snapshot: Dict[str, str] = {}
for protected in self.PROTECTED_PATHS:
target = self.plugin_root / protected
if target.is_file():
rel = protected
snapshot[rel] = self._hash_file(target)
elif target.is_dir():
for f in sorted(target.rglob('*')):
if f.is_file():
rel = str(f.relative_to(self.plugin_root))
snapshot[rel] = self._hash_file(f)
logger.info(f"🔒 Protection snapshot: {len(snapshot)} Plugin-owned files hashed")
return snapshot
def _validate_protected_files(self, snapshot: Dict[str, str]) -> None:
"""
Re-hash every file from the snapshot and compare.
Called AFTER sync to verify no protected file was touched.
Raises:
ProtectionViolationError: if any protected file was modified or deleted.
"""
violations: List[str] = []
for rel_path, original_hash in snapshot.items():
current = self.plugin_root / rel_path
if not current.exists():
violations.append(f"DELETED : {rel_path}")
else:
current_hash = self._hash_file(current)
if current_hash != original_hash:
violations.append(f"MODIFIED : {rel_path}")
if violations:
msg = (
"🚨 PROTECTION VIOLATION — sync modified Plugin-owned files:\n"
+ "\n".join(f"{v}" for v in violations)
+ "\n\nFix: ensure SYNC_MAPPINGS does not target any path in PROTECTED_PATHS."
)
logger.error(msg)
raise ProtectionViolationError(msg)
logger.info(f"🔒 Protection check passed — {len(snapshot)} Plugin-owned files unchanged")
# ── Core sync workflow ─────────────────────────────────────────────────────
def _clone_framework(self) -> Path:
"""Clone Framework repository to temp directory."""
logger.info(f"📥 Cloning Framework: {self.framework_repo}")
self.temp_dir = tempfile.mkdtemp(prefix='superclaude_framework_')
framework_path = Path(self.temp_dir) / 'framework'
try:
subprocess.run(
['git', 'clone', '--depth', '1', self.framework_repo, str(framework_path)],
check=True,
capture_output=True,
text=True
)
logger.info(f"✅ Cloned to: {framework_path}")
return framework_path
except subprocess.CalledProcessError as e:
logger.error(f"Failed to clone Framework: {e.stderr}")
raise
def _get_commit_hash(self, repo_path: Path) -> str:
"""Get current commit hash from repository."""
try:
result = subprocess.run(
['git', 'rev-parse', 'HEAD'],
cwd=repo_path,
check=True,
capture_output=True,
text=True
)
return result.stdout.strip()
except subprocess.CalledProcessError:
return "unknown"
def _get_version(self, framework_path: Path) -> str:
"""Extract version from Framework."""
# Try to read version from plugin.json or package.json
for version_file in ['plugin.json', 'package.json']:
version_path = framework_path / version_file
if version_path.exists():
try:
data = json.loads(version_path.read_text())
if 'version' in data:
return data['version']
except (json.JSONDecodeError, KeyError):
continue
# Fallback to current Plugin version
plugin_json = self.plugin_root / 'plugin.json'
if plugin_json.exists():
try:
data = json.loads(plugin_json.read_text())
return data.get('version', '1.0.0')
except json.JSONDecodeError:
pass
return '1.0.0'
def _create_backup(self):
"""Create backup of current plugin state."""
logger.info("📦 Creating backup...")
mcp_merger = McpMerger(self.plugin_root)
backup_path = mcp_merger.backup_current()
if backup_path:
logger.info(f"✅ Backup created: {backup_path}")
def _sync_content(self, framework_path: Path) -> Dict[str, int]:
"""Sync and transform content from Framework."""
logger.info("🔄 Syncing content...")
file_syncer = FileSyncer(self.plugin_root, self.dry_run)
stats = {
'files_synced': 0,
'files_modified': 0,
'commands': 0,
'agents': 0
}
# Sync commands with transformation
logger.info("📝 Syncing commands...")
source_commands = framework_path / 'src/superclaude/commands'
dest_commands = self.plugin_root / 'commands'
if source_commands.exists():
cmd_stats = file_syncer.sync_directory(
source_commands,
dest_commands,
filename_prefix='sc-',
transform_fn=ContentTransformer.transform_command
)
stats['commands'] = cmd_stats['synced'] + cmd_stats['modified']
stats['files_synced'] += cmd_stats['synced']
stats['files_modified'] += cmd_stats['modified']
logger.info(f"✅ Commands: {stats['commands']} transformed")
# Sync agents with transformation
logger.info("📝 Syncing agents...")
source_agents = framework_path / 'src/superclaude/agents'
dest_agents = self.plugin_root / 'agents'
if source_agents.exists():
agent_stats = file_syncer.sync_directory(
source_agents,
dest_agents,
filename_prefix='sc-',
transform_fn=ContentTransformer.transform_agent
)
stats['agents'] = agent_stats['synced'] + agent_stats['modified']
stats['files_synced'] += agent_stats['synced']
stats['files_modified'] += agent_stats['modified']
logger.info(f"✅ Agents: {stats['agents']} transformed")
# core/ and modes/ are in PROTECTED_PATHS — Plugin maintains its own versions.
# They are intentionally excluded from SYNC_MAPPINGS and will never be
# overwritten here. To re-enable Framework sync for either directory,
# remove it from PROTECTED_PATHS and add it back to SYNC_MAPPINGS.
logger.info("🔒 core/ and modes/ are Plugin-owned (PROTECTED_PATHS) — skipping")
return stats
def _generate_plugin_json(self, framework_version: str):
"""Generate plugin.json from synced commands."""
logger.info("📄 Generating plugin.json...")
generator = PluginJsonGenerator(self.plugin_root)
plugin_json = generator.generate(framework_version)
generator.write(plugin_json, self.dry_run)
def _merge_mcp_configs(self, framework_path: Path) -> int:
"""Merge MCP configurations from Framework."""
logger.info("🔗 Merging MCP configurations...")
# Read Framework MCP config
framework_plugin_json = framework_path / 'plugin.json'
framework_mcp = {}
if framework_plugin_json.exists():
try:
data = json.loads(framework_plugin_json.read_text())
framework_mcp = data.get('mcpServers', {})
except json.JSONDecodeError:
logger.warning("Failed to read Framework plugin.json")
# Read Plugin MCP config
plugin_json_path = self.plugin_root / 'plugin.json'
plugin_mcp = {}
if plugin_json_path.exists():
try:
data = json.loads(plugin_json_path.read_text())
plugin_mcp = data.get('mcpServers', {})
except json.JSONDecodeError:
logger.warning("Failed to read Plugin plugin.json")
# Merge configurations
merger = McpMerger(self.plugin_root)
merged_mcp, warnings = merger.merge(framework_mcp, plugin_mcp)
# Log warnings
for warning in warnings:
logger.warning(f"⚠️ {warning}")
self.warnings.append(warning)
# Update plugin.json with merged MCP config
if not self.dry_run and plugin_json_path.exists():
data = json.loads(plugin_json_path.read_text())
data['mcpServers'] = merged_mcp
plugin_json_path.write_text(
json.dumps(data, indent=2) + '\n',
encoding='utf-8'
)
logger.info(f"✅ MCP servers merged: {len(merged_mcp)}")
return len(merged_mcp)
def _validate_sync(self):
"""Validate sync results."""
logger.info("🔍 Validating sync...")
# Check commands directory
commands_dir = self.plugin_root / 'commands'
if commands_dir.exists():
sc_commands = list(commands_dir.glob('sc-*.md'))
logger.info(f"✅ Found {len(sc_commands)} sc- prefixed commands")
# Check agents directory
agents_dir = self.plugin_root / 'agents'
if agents_dir.exists():
sc_agents = list(agents_dir.glob('sc-*.md'))
logger.info(f"✅ Found {len(sc_agents)} sc- prefixed agents")
# Check plugin.json
plugin_json_path = self.plugin_root / '.claude-plugin' / 'plugin.json'
if plugin_json_path.exists():
logger.info(f"✅ plugin.json exists at {plugin_json_path}")
else:
logger.warning("⚠️ plugin.json not found")
def _cleanup(self):
"""Clean up temporary directories."""
if self.temp_dir and Path(self.temp_dir).exists():
shutil.rmtree(self.temp_dir)
logger.debug(f"🧹 Cleaned up temp directory: {self.temp_dir}")
def main():
"""Main entry point."""
parser = argparse.ArgumentParser(
description='Sync SuperClaude Framework to Plugin with namespace isolation'
)
parser.add_argument(
'--framework-repo',
default='https://github.com/SuperClaude-Org/SuperClaude_Framework',
help='Framework repository URL'
)
parser.add_argument(
'--plugin-root',
type=Path,
default=Path.cwd(),
help='Plugin repository root path'
)
parser.add_argument(
'--dry-run',
type=lambda x: x.lower() in ('true', '1', 'yes'),
default=False,
help='Preview changes without applying'
)
parser.add_argument(
'--output-report',
type=Path,
help='Save sync report to file'
)
parser.add_argument(
'--verbose',
action='store_true',
help='Enable verbose logging'
)
args = parser.parse_args()
if args.verbose:
logging.getLogger().setLevel(logging.DEBUG)
if args.dry_run:
logger.info("🔍 DRY RUN MODE - No changes will be applied")
# Run sync
syncer = FrameworkSyncer(
framework_repo=args.framework_repo,
plugin_root=args.plugin_root,
dry_run=args.dry_run
)
result = syncer.sync()
# Output report
if args.output_report:
args.output_report.write_text(
json.dumps(result.to_dict(), indent=2) + '\n'
)
logger.info(f"📊 Report saved to: {args.output_report}")
# Print summary
print("\n" + "=" * 60)
print("SYNC SUMMARY")
print("=" * 60)
print(f"Success: {result.success}")
print(f"Framework Version: {result.framework_version}")
print(f"Framework Commit: {result.framework_commit[:8]}")
print(f"Files Synced: {result.files_synced}")
print(f"Files Modified: {result.files_modified}")
print(f"Commands Transformed: {result.commands_transformed}")
print(f"Agents Transformed: {result.agents_transformed}")
print(f"MCP Servers Merged: {result.mcp_servers_merged}")
if result.warnings:
print(f"\n⚠️ Warnings: {len(result.warnings)}")
for warning in result.warnings:
print(f" - {warning}")
if result.errors:
print(f"\n❌ Errors: {len(result.errors)}")
for error in result.errors:
print(f" - {error}")
print("=" * 60)
sys.exit(0 if result.success else 1)
if __name__ == '__main__':
main()