| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369 |
- """Repository management module for syncing library repositories."""
- from __future__ import annotations
- import logging
- import subprocess
- from pathlib import Path
- from typing import Optional
- from rich.console import Console
- from rich.panel import Panel
- from rich.progress import Progress, SpinnerColumn, TextColumn
- from rich.table import Table
- from typer import Argument, Option, Typer
- from ..core.config import ConfigManager
- from ..core.exceptions import ConfigError
- logger = logging.getLogger(__name__)
- console = Console()
- console_err = Console(stderr=True)
- app = Typer(help="Manage library repositories")
- def _run_git_command(args: list[str], cwd: Optional[Path] = None) -> tuple[bool, str, str]:
- """Run a git command and return the result.
-
- Args:
- args: Git command arguments (without 'git' prefix)
- cwd: Working directory for the command
-
- Returns:
- Tuple of (success, stdout, stderr)
- """
- try:
- result = subprocess.run(
- ["git"] + args,
- cwd=cwd,
- capture_output=True,
- text=True,
- timeout=300 # 5 minute timeout
- )
- return result.returncode == 0, result.stdout, result.stderr
- except subprocess.TimeoutExpired:
- return False, "", "Command timed out after 5 minutes"
- except FileNotFoundError:
- return False, "", "Git command not found. Please install git."
- except Exception as e:
- return False, "", str(e)
- def _clone_or_pull_repo(name: str, url: str, target_path: Path, branch: Optional[str] = None, sparse_dir: Optional[str] = None) -> tuple[bool, str]:
- """Clone or pull a git repository with optional sparse-checkout.
-
- Args:
- name: Library name
- url: Git repository URL
- target_path: Target directory for the repository
- branch: Git branch to clone/pull (optional)
- sparse_dir: Directory to sparse-checkout (optional, use None or "." for full clone)
-
- Returns:
- Tuple of (success, message)
- """
- if target_path.exists() and (target_path / ".git").exists():
- # Repository exists, pull updates
- logger.debug(f"Pulling updates for library '{name}' at {target_path}")
-
- # If branch is specified, checkout the branch first
- if branch:
- success, stdout, stderr = _run_git_command(["checkout", branch], cwd=target_path)
- if not success:
- logger.warning(f"Failed to checkout branch '{branch}' for library '{name}': {stderr}")
-
- success, stdout, stderr = _run_git_command(["pull", "--ff-only"], cwd=target_path)
-
- if success:
- # Check if anything was updated
- if "Already up to date" in stdout or "Already up-to-date" in stdout:
- return True, "Already up to date"
- else:
- return True, "Updated successfully"
- else:
- error_msg = stderr or stdout
- logger.error(f"Failed to pull library '{name}': {error_msg}")
- return False, f"Pull failed: {error_msg}"
- else:
- # Repository doesn't exist, clone it
- logger.debug(f"Cloning library '{name}' from {url} to {target_path}")
-
- # Ensure parent directory exists
- target_path.parent.mkdir(parents=True, exist_ok=True)
-
- # Determine if we should use sparse-checkout
- use_sparse = sparse_dir and sparse_dir != "."
-
- if use_sparse:
- # Use sparse-checkout to clone only specific directory
- logger.debug(f"Using sparse-checkout for directory: {sparse_dir}")
-
- # Initialize empty repo
- success, stdout, stderr = _run_git_command(["init"], cwd=None)
- if success:
- # Create target directory
- target_path.mkdir(parents=True, exist_ok=True)
-
- # Initialize git repo
- success, stdout, stderr = _run_git_command(["init"], cwd=target_path)
- if not success:
- return False, f"Failed to initialize repo: {stderr or stdout}"
-
- # Add remote
- success, stdout, stderr = _run_git_command(["remote", "add", "origin", url], cwd=target_path)
- if not success:
- return False, f"Failed to add remote: {stderr or stdout}"
-
- # Enable sparse-checkout
- success, stdout, stderr = _run_git_command(["config", "core.sparseCheckout", "true"], cwd=target_path)
- if not success:
- return False, f"Failed to enable sparse-checkout: {stderr or stdout}"
-
- # Create sparse-checkout file
- sparse_checkout_file = target_path / ".git" / "info" / "sparse-checkout"
- sparse_checkout_file.parent.mkdir(parents=True, exist_ok=True)
- with open(sparse_checkout_file, "w") as f:
- f.write(f"{sparse_dir}/*\n")
-
- # Pull specific branch with sparse-checkout
- pull_args = ["pull", "--depth", "1", "origin"]
- if branch:
- pull_args.append(branch)
- else:
- pull_args.append("main")
-
- success, stdout, stderr = _run_git_command(pull_args, cwd=target_path)
- if not success:
- return False, f"Sparse-checkout failed: {stderr or stdout}"
-
- # Move contents of sparse directory to root
- sparse_path = target_path / sparse_dir
- if sparse_path.exists() and sparse_path.is_dir():
- # Move all contents from sparse_dir to target_path root
- import shutil
- for item in sparse_path.iterdir():
- dest = target_path / item.name
- if dest.exists():
- if dest.is_dir():
- shutil.rmtree(dest)
- else:
- dest.unlink()
- shutil.move(str(item), str(target_path))
-
- # Remove empty sparse directory
- sparse_path.rmdir()
-
- return True, "Cloned successfully (sparse)"
- else:
- return False, f"Failed to initialize: {stderr or stdout}"
- else:
- # Regular full clone
- clone_args = ["clone", "--depth", "1"]
- if branch:
- clone_args.extend(["--branch", branch])
- clone_args.extend([url, str(target_path)])
-
- success, stdout, stderr = _run_git_command(clone_args)
-
- if success:
- return True, "Cloned successfully"
- else:
- error_msg = stderr or stdout
- logger.error(f"Failed to clone library '{name}': {error_msg}")
- return False, f"Clone failed: {error_msg}"
- @app.command()
- def update(
- library_name: Optional[str] = Argument(
- None,
- help="Name of specific library to update (updates all if not specified)"
- ),
- verbose: bool = Option(False, "--verbose", "-v", help="Show detailed output")
- ) -> None:
- """Update library repositories by cloning or pulling from git.
-
- This command syncs all configured libraries from their git repositories.
- If a library doesn't exist locally, it will be cloned. If it exists, it will be pulled.
- """
- config = ConfigManager()
- libraries = config.get_libraries()
-
- if not libraries:
- console.print("[yellow]No libraries configured.[/yellow]")
- console.print("Libraries are auto-configured on first run with a default library.")
- return
-
- # Filter to specific library if requested
- if library_name:
- libraries = [lib for lib in libraries if lib.get("name") == library_name]
- if not libraries:
- console_err.print(f"[red]Error:[/red] Library '{library_name}' not found in configuration")
- return
-
- libraries_path = config.get_libraries_path()
-
- # Create results table
- results = []
-
- with Progress(
- SpinnerColumn(),
- TextColumn("[progress.description]{task.description}"),
- console=console,
- ) as progress:
- for lib in libraries:
- name = lib.get("name")
- url = lib.get("url")
- branch = lib.get("branch")
- directory = lib.get("directory", "library")
- enabled = lib.get("enabled", True)
-
- if not enabled:
- if verbose:
- console.print(f"[dim]Skipping disabled library: {name}[/dim]")
- results.append((name, "Skipped (disabled)", False))
- continue
-
- task = progress.add_task(f"Updating {name}...", total=None)
-
- # Target path: ~/.config/boilerplates/libraries/{name}/
- target_path = libraries_path / name
-
- # Clone or pull the repository with sparse-checkout if directory is specified
- success, message = _clone_or_pull_repo(name, url, target_path, branch, directory)
-
- results.append((name, message, success))
- progress.remove_task(task)
-
- if verbose:
- status = "[green]✓[/green]" if success else "[red]✗[/red]"
- console.print(f"{status} {name}: {message}")
-
- # Display summary table
- if not verbose:
- table = Table(title="Library Update Summary", show_header=True)
- table.add_column("Library", style="cyan", no_wrap=True)
- table.add_column("Status")
-
- for name, message, success in results:
- status_style = "green" if success else "red"
- status_icon = "✓" if success else "✗"
- table.add_row(name, f"[{status_style}]{status_icon}[/{status_style}] {message}")
-
- console.print(table)
-
- # Summary
- total = len(results)
- successful = sum(1 for _, _, success in results if success)
-
- if successful == total:
- console.print(f"\n[green]All libraries updated successfully ({successful}/{total})[/green]")
- elif successful > 0:
- console.print(f"\n[yellow]Partially successful: {successful}/{total} libraries updated[/yellow]")
- else:
- console.print(f"\n[red]Failed to update libraries[/red]")
- @app.command()
- def list() -> None:
- """List all configured libraries."""
- config = ConfigManager()
- libraries = config.get_libraries()
-
- if not libraries:
- console.print("[yellow]No libraries configured.[/yellow]")
- return
-
- table = Table(title="Configured Libraries", show_header=True)
- table.add_column("Name", style="cyan", no_wrap=True)
- table.add_column("URL", style="blue")
- table.add_column("Branch", style="yellow")
- table.add_column("Directory", style="magenta")
- table.add_column("Status", style="green")
-
- libraries_path = config.get_libraries_path()
-
- for lib in libraries:
- name = lib.get("name", "")
- url = lib.get("url", "")
- branch = lib.get("branch", "main")
- directory = lib.get("directory", "library")
- enabled = lib.get("enabled", True)
-
- # Check if library exists locally (check base path, not directory subdirectory)
- library_path = libraries_path / name
- exists = library_path.exists()
-
- status_parts = []
- if not enabled:
- status_parts.append("[dim]disabled[/dim]")
- elif exists:
- status_parts.append("[green]synced[/green]")
- else:
- status_parts.append("[yellow]not synced[/yellow]")
-
- status = " ".join(status_parts)
-
- table.add_row(name, url, branch, directory, status)
-
- console.print(table)
- @app.command()
- def add(
- name: str = Argument(..., help="Unique name for the library"),
- url: str = Argument(..., help="Git repository URL"),
- branch: str = Option("main", "--branch", "-b", help="Git branch to use"),
- directory: str = Option("library", "--directory", "-d", help="Directory within repo containing templates (metadata only)"),
- enabled: bool = Option(True, "--enabled/--disabled", help="Enable or disable the library"),
- sync: bool = Option(True, "--sync/--no-sync", help="Sync the library after adding")
- ) -> None:
- """Add a new library to the configuration."""
- config = ConfigManager()
-
- try:
- config.add_library(name, url, directory, branch, enabled)
- console.print(f"[green]✓[/green] Added library '{name}'")
-
- if sync and enabled:
- console.print(f"\nSyncing library '{name}'...")
- # Call update for this specific library
- update(library_name=name, verbose=True)
- except ConfigError as e:
- console_err.print(f"[red]Error:[/red] {e}")
- @app.command()
- def remove(
- name: str = Argument(..., help="Name of the library to remove"),
- keep_files: bool = Option(False, "--keep-files", help="Keep the local library files (don't delete)")
- ) -> None:
- """Remove a library from the configuration and delete its local files."""
- config = ConfigManager()
-
- try:
- # Remove from config
- config.remove_library(name)
- console.print(f"[green]✓[/green] Removed library '{name}' from configuration")
-
- # Delete local files unless --keep-files is specified
- if not keep_files:
- libraries_path = config.get_libraries_path()
- library_path = libraries_path / name
-
- if library_path.exists():
- import shutil
- shutil.rmtree(library_path)
- console.print(f"[green]✓[/green] Deleted local files at {library_path}")
- else:
- console.print(f"[dim]No local files found at {library_path}[/dim]")
- except ConfigError as e:
- console_err.print(f"[red]Error:[/red] {e}")
- # Register the repo command with the CLI
- def register_cli(parent_app: Typer) -> None:
- """Register the repo command with the parent Typer app."""
- parent_app.add_typer(app, name="repo")
|