| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262 |
- """Kind-specific validators for rendered templates."""
- from __future__ import annotations
- import os
- import shutil
- import subprocess
- import tempfile
- from pathlib import Path
- from .validation_runner import KindValidationFailure, KindValidationResult
class RenderedFilesValidator:
    """Base class for validators that run CLI tools against rendered files.

    Subclasses provide ``validator_name`` and ``unavailable_message`` and
    implement :meth:`is_available` and :meth:`validate_directory`.
    """

    validator_name: str
    unavailable_message: str

    def __init__(self, verbose: bool = False) -> None:
        self.verbose = verbose
        # Cache slot for the availability probe; ``None`` means "not probed yet".
        self._available: bool | None = None

    def command_available(self, command: str) -> bool:
        """Return True when *command* resolves to an executable on ``PATH``."""
        return shutil.which(command) is not None

    def validate_rendered_files(self, rendered_files: dict[str, str], case_name: str) -> KindValidationResult:
        """Write *rendered_files* into a scratch directory and validate it.

        Args:
            rendered_files: Mapping of relative file name to file content.
            case_name: Label of the case being validated; only used to build
                a recognisable temporary-directory prefix.

        Returns:
            The result of :meth:`validate_directory`, or a result flagged as
            unavailable (with ``unavailable_message`` in its details) when the
            required tool is missing.

        Raises:
            ValueError: If a rendered file name points outside the scratch
                directory.
        """
        result = KindValidationResult(validator=self.validator_name, available=self.is_available())
        if not result.available:
            result.details.append(self.unavailable_message)
            return result

        with tempfile.TemporaryDirectory(prefix=self._tempdir_prefix(case_name)) as tmp_dir:
            workdir = Path(tmp_dir)
            self._write_rendered_files(rendered_files, workdir)
            return self.validate_directory(workdir)

    def is_available(self) -> bool:
        """Report whether the underlying tool can be run (subclass hook)."""
        raise NotImplementedError

    def validate_directory(self, workdir: Path) -> KindValidationResult:
        """Validate the files found in *workdir* (subclass hook)."""
        raise NotImplementedError

    @staticmethod
    def _tempdir_prefix(case_name: str) -> str:
        """Build a filesystem-safe temporary-directory prefix from *case_name*.

        The prefix becomes part of a single directory name, so a path
        separator in the case name would make directory creation fail.
        Anything other than alphanumerics, ``.``, ``_`` and ``-`` is replaced
        with ``_``.
        """
        safe = "".join(ch if ch.isalnum() or ch in "._-" else "_" for ch in case_name[:20])
        return f"boilerplates-{safe}-"

    @staticmethod
    def _write_rendered_files(rendered_files: dict[str, str], workdir: Path) -> None:
        """Materialise *rendered_files* below *workdir*, creating parent directories.

        Raises:
            ValueError: If a file name (absolute, or using ``..``) resolves to
                a location outside *workdir*.
        """
        root = workdir.resolve()
        for filename, content in rendered_files.items():
            path = workdir / filename
            # An absolute name or "../" segments would escape the scratch
            # directory and leave files behind after it is cleaned up.
            try:
                path.resolve().relative_to(root)
            except ValueError:
                raise ValueError(
                    f"Rendered file path escapes the validation directory: {filename!r}"
                ) from None
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding="utf-8")

    def run_command(
        self,
        args: list[str],
        workdir: Path,
        *,
        env: dict[str, str] | None = None,
    ) -> subprocess.CompletedProcess[str]:
        """Run *args* with *workdir* as the current directory and capture its output.

        A non-zero exit status does not raise (``check=False``); callers
        inspect the returned process, e.g. via :meth:`failure_from_process`.
        """
        return subprocess.run(
            args,
            cwd=workdir,
            env=env,
            capture_output=True,
            text=True,
            check=False,
        )

    def failure_from_process(
        self,
        result: subprocess.CompletedProcess[str],
        file_path: str = "",
    ) -> KindValidationFailure | None:
        """Convert a finished process into a failure record.

        Returns:
            ``None`` when the process exited with status 0; otherwise a
            failure whose message is stderr, falling back to stdout and then
            to a generic ``"<validator_name> failed"`` text.
        """
        if result.returncode == 0:
            return None
        message = result.stderr.strip() or result.stdout.strip() or f"{self.validator_name} failed"
        return KindValidationFailure(file_path=file_path, validator=self.validator_name, message=message)
class TerraformValidator(RenderedFilesValidator):
    """Validate Terraform/OpenTofu configurations."""

    validator_name = "tofu validate"
    unavailable_message = "Required command is unavailable: tofu or terraform"

    def __init__(self, verbose: bool = False) -> None:
        """Pick the CLI to drive: ``tofu`` when it is on PATH, else ``terraform``."""
        super().__init__(verbose)
        if self.command_available("tofu"):
            self.command = "tofu"
        else:
            self.command = "terraform"
        # Report under the name of the binary that is actually used.
        self.validator_name = f"{self.command} validate"

    def is_available(self) -> bool:
        """Return whether the selected command is on PATH (probed once, then cached)."""
        cached = self._available
        if cached is None:
            cached = self._available = self.command_available(self.command)
        return cached

    def validate_directory(self, workdir: Path) -> KindValidationResult:
        """Run ``init`` and, if that succeeds, ``validate`` inside *workdir*.

        A failing ``init`` ends the run: provider-resolution problems mark the
        result as skipped with a warning, any other error is recorded as a
        failure.
        """
        outcome = KindValidationResult(validator=self.validator_name)

        init_args = [self.command, "init", "-backend=false", "-input=false", "-no-color"]
        init_failure = self.failure_from_process(self.run_command(init_args, workdir))

        if init_failure is None:
            validate_args = [self.command, "validate", "-no-color"]
            validate_failure = self.failure_from_process(self.run_command(validate_args, workdir))
            if validate_failure is not None:
                outcome.failures.append(validate_failure)
        elif self._is_provider_resolution_failure(init_failure.message):
            outcome.skipped = True
            outcome.warnings.append(init_failure.message)
        else:
            outcome.failures.append(init_failure)

        return outcome

    @staticmethod
    def _is_provider_resolution_failure(message: str) -> bool:
        """Return True when *message* contains a known provider/registry error text."""
        markers = (
            "Failed to resolve provider packages",
            "could not connect to registry",
        )
        return any(marker in message for marker in markers)
class KubernetesValidator(RenderedFilesValidator):
    """Validate Kubernetes manifests with kubectl client dry-run."""

    validator_name = "kubectl create --dry-run=client"
    unavailable_message = "Required command is unavailable: kubectl"

    def is_available(self) -> bool:
        """Return whether ``kubectl`` is on PATH (probed once, then cached)."""
        cached = self._available
        if cached is None:
            cached = self._available = self.command_available("kubectl")
        return cached

    def validate_directory(self, workdir: Path) -> KindValidationResult:
        """Run a client-side ``kubectl create`` dry-run against *workdir*.

        Errors recognised as cluster-discovery problems mark the result as
        skipped with a warning; every other error is recorded as a failure.
        """
        outcome = KindValidationResult(validator=self.validator_name)
        command = ["kubectl", "create", "--dry-run=client", "--validate=false", "-f", "."]
        failure = self.failure_from_process(self.run_command(command, workdir))

        if failure is None:
            return outcome

        if self._is_cluster_discovery_failure(failure.message):
            outcome.skipped = True
            outcome.warnings.append(failure.message)
        else:
            outcome.failures.append(failure)
        return outcome

    @staticmethod
    def _is_cluster_discovery_failure(message: str) -> bool:
        """Return True when *message* contains a known cluster-discovery error text."""
        markers = (
            "couldn't get current server API group list",
            "unable to recognize",
        )
        return any(marker in message for marker in markers)
class HelmValidator(RenderedFilesValidator):
    """Validate Helm chart files."""

    validator_name = "helm lint"
    unavailable_message = "Required command is unavailable: helm"

    def is_available(self) -> bool:
        """Return whether ``helm`` is on PATH (probed once, then cached)."""
        cached = self._available
        if cached is None:
            cached = self._available = self.command_available("helm")
        return cached

    def validate_directory(self, workdir: Path) -> KindValidationResult:
        """Run ``helm lint .`` in *workdir*.

        The run is marked as skipped (with a warning) when *workdir* has no
        top-level ``Chart.yaml``.
        """
        outcome = KindValidationResult(validator=self.validator_name)

        chart_file = workdir / "Chart.yaml"
        if chart_file.exists():
            lint_failure = self.failure_from_process(self.run_command(["helm", "lint", "."], workdir))
            if lint_failure is not None:
                outcome.failures.append(lint_failure)
        else:
            outcome.skipped = True
            outcome.warnings.append("Rendered files do not include Chart.yaml")

        return outcome
class PackerValidator(RenderedFilesValidator):
    """Validate Packer templates."""

    validator_name = "packer validate"
    unavailable_message = "Required command is unavailable: packer"

    def is_available(self) -> bool:
        """Return whether ``packer`` is on PATH (probed once, then cached)."""
        cached = self._available
        if cached is None:
            cached = self._available = self.command_available("packer")
        return cached

    def validate_directory(self, workdir: Path) -> KindValidationResult:
        """Run ``packer validate`` on the template found in *workdir*.

        Target selection: the directory itself when it holds top-level
        ``*.pkr.hcl`` files; otherwise the first ``.json`` file (in sorted
        path order) found anywhere below *workdir*. With neither present the
        run is marked as skipped with a warning.
        """
        outcome = KindValidationResult(validator=self.validator_name)

        has_hcl = any(workdir.glob("*.pkr.hcl"))
        if has_hcl:
            target = "."
        else:
            json_files = [entry for entry in workdir.rglob("*") if entry.is_file() and entry.suffix == ".json"]
            if not json_files:
                outcome.skipped = True
                outcome.warnings.append("No Packer template files found")
                return outcome
            # Smallest path == first element of the sorted candidate list.
            target = str(min(json_files).relative_to(workdir))

        failure = self.failure_from_process(self.run_command(["packer", "validate", target], workdir))
        if failure is not None:
            outcome.failures.append(failure)
        return outcome
class AnsibleValidator(RenderedFilesValidator):
    """Validate Ansible playbooks with syntax-check."""

    validator_name = "ansible-playbook --syntax-check"
    unavailable_message = "Required command is unavailable: ansible-playbook"

    def is_available(self) -> bool:
        """Return whether ``ansible-playbook`` is on PATH (probed once, then cached)."""
        if self._available is None:
            self._available = self.command_available("ansible-playbook")
        return self._available

    def validate_directory(self, workdir: Path) -> KindValidationResult:
        """Syntax-check every playbook found below *workdir*.

        The run is marked as skipped when no playbook is found. A playbook
        whose error looks like a missing role/module dependency also marks the
        result as skipped and is reported as a warning; any other error is
        recorded as a failure tagged with the playbook's relative path.
        """
        result = KindValidationResult(validator=self.validator_name)
        playbooks = self._find_playbooks(workdir)
        if not playbooks:
            result.skipped = True
            result.warnings.append("No Ansible playbooks found")
            return result

        env = os.environ.copy()
        # NOTE(review): "${USER}" is passed through literally, not expanded
        # here; presumably Ansible/the target shell expands it — confirm.
        env["ANSIBLE_REMOTE_TEMP"] = "/tmp/.ansible-${USER}/tmp"
        with tempfile.TemporaryDirectory(prefix="ansible-local-") as ansible_local_temp:
            # Point Ansible's local temp at a throwaway directory for this run.
            env["ANSIBLE_LOCAL_TEMP"] = ansible_local_temp
            for playbook in playbooks:
                relative_path = str(playbook.relative_to(workdir))
                process = self.run_command(
                    ["ansible-playbook", "--syntax-check", relative_path],
                    workdir,
                    env=env,
                )
                failure = self.failure_from_process(process, relative_path)
                if failure is None:
                    continue
                if self._is_dependency_resolution_failure(failure.message):
                    result.skipped = True
                    result.warnings.append(failure.message)
                    continue
                result.failures.append(failure)
        return result

    @staticmethod
    def _find_playbooks(workdir: Path) -> list[Path]:
        """Return the YAML files below *workdir* that look like playbooks.

        A ``.yaml``/``.yml`` file qualifies when its name contains
        ``playbook`` or its content passes :meth:`_looks_like_playbook`.
        The result is sorted so checks and reported failures come in a
        stable order regardless of filesystem enumeration order.
        """
        candidates = []
        for path in workdir.rglob("*"):
            if not path.is_file() or path.suffix.lower() not in {".yaml", ".yml"}:
                continue
            if "playbook" in path.name.lower() or AnsibleValidator._looks_like_playbook(path):
                candidates.append(path)
        return sorted(candidates)

    @staticmethod
    def _looks_like_playbook(path: Path) -> bool:
        """Return True when *path* contains a ``hosts:`` key.

        Both ``hosts: ...`` and the list-item form ``- hosts: ...`` (a play
        that starts with its ``hosts`` key) are recognised. Unreadable or
        non-UTF-8 files are treated as "not a playbook".
        """
        try:
            content = path.read_text(encoding="utf-8")
        except (OSError, UnicodeDecodeError):
            return False
        for line in content.splitlines():
            stripped = line.lstrip()
            if stripped.startswith("- "):
                # Drop the YAML list marker so "- hosts: all" is detected too.
                stripped = stripped[2:].lstrip()
            if stripped.startswith("hosts:"):
                return True
        return False

    @staticmethod
    def _is_dependency_resolution_failure(message: str) -> bool:
        """Return True when *message* reports a missing role or an unresolvable module/action."""
        return ("the role" in message and "was not found" in message) or "couldn't resolve module/action" in message
|