# kind_validators.py
"""Kind-specific validators for rendered templates."""
from __future__ import annotations

import os
import re
import shutil
import subprocess
import tempfile
from pathlib import Path

from .validation_runner import KindValidationFailure, KindValidationResult
  9. class RenderedFilesValidator:
  10. """Base class for validators that run CLI tools against rendered files."""
  11. validator_name: str
  12. unavailable_message: str
  13. def __init__(self, verbose: bool = False) -> None:
  14. self.verbose = verbose
  15. self._available: bool | None = None
  16. def command_available(self, command: str) -> bool:
  17. return shutil.which(command) is not None
  18. def validate_rendered_files(self, rendered_files: dict[str, str], case_name: str) -> KindValidationResult:
  19. result = KindValidationResult(validator=self.validator_name, available=self.is_available())
  20. if not result.available:
  21. result.details.append(self.unavailable_message)
  22. return result
  23. with tempfile.TemporaryDirectory(prefix=f"boilerplates-{case_name[:20]}-") as tmp_dir:
  24. workdir = Path(tmp_dir)
  25. self._write_rendered_files(rendered_files, workdir)
  26. return self.validate_directory(workdir)
  27. def is_available(self) -> bool:
  28. raise NotImplementedError
  29. def validate_directory(self, workdir: Path) -> KindValidationResult:
  30. raise NotImplementedError
  31. @staticmethod
  32. def _write_rendered_files(rendered_files: dict[str, str], workdir: Path) -> None:
  33. for filename, content in rendered_files.items():
  34. path = workdir / filename
  35. path.parent.mkdir(parents=True, exist_ok=True)
  36. path.write_text(content, encoding="utf-8")
  37. def run_command(
  38. self,
  39. args: list[str],
  40. workdir: Path,
  41. *,
  42. env: dict[str, str] | None = None,
  43. ) -> subprocess.CompletedProcess[str]:
  44. return subprocess.run(
  45. args,
  46. cwd=workdir,
  47. env=env,
  48. capture_output=True,
  49. text=True,
  50. check=False,
  51. )
  52. def failure_from_process(
  53. self,
  54. result: subprocess.CompletedProcess[str],
  55. file_path: str = "",
  56. ) -> KindValidationFailure | None:
  57. if result.returncode == 0:
  58. return None
  59. message = result.stderr.strip() or result.stdout.strip() or f"{self.validator_name} failed"
  60. return KindValidationFailure(file_path=file_path, validator=self.validator_name, message=message)
  61. class TerraformValidator(RenderedFilesValidator):
  62. """Validate Terraform/OpenTofu configurations."""
  63. validator_name = "tofu validate"
  64. unavailable_message = "Required command is unavailable: tofu or terraform"
  65. def __init__(self, verbose: bool = False) -> None:
  66. super().__init__(verbose)
  67. self.command = "tofu" if self.command_available("tofu") else "terraform"
  68. self.validator_name = f"{self.command} validate"
  69. def is_available(self) -> bool:
  70. if self._available is None:
  71. self._available = self.command_available(self.command)
  72. return self._available
  73. def validate_directory(self, workdir: Path) -> KindValidationResult:
  74. result = KindValidationResult(validator=self.validator_name)
  75. init = self.run_command([self.command, "init", "-backend=false", "-input=false", "-no-color"], workdir)
  76. failure = self.failure_from_process(init)
  77. if failure is not None:
  78. if self._is_provider_resolution_failure(failure.message):
  79. result.skipped = True
  80. result.warnings.append(failure.message)
  81. else:
  82. result.failures.append(failure)
  83. return result
  84. validate = self.run_command([self.command, "validate", "-no-color"], workdir)
  85. failure = self.failure_from_process(validate)
  86. if failure is not None:
  87. result.failures.append(failure)
  88. return result
  89. @staticmethod
  90. def _is_provider_resolution_failure(message: str) -> bool:
  91. return "Failed to resolve provider packages" in message or "could not connect to registry" in message
  92. class KubernetesValidator(RenderedFilesValidator):
  93. """Validate Kubernetes manifests with kubectl client dry-run."""
  94. validator_name = "kubectl create --dry-run=client"
  95. unavailable_message = "Required command is unavailable: kubectl"
  96. def is_available(self) -> bool:
  97. if self._available is None:
  98. self._available = self.command_available("kubectl")
  99. return self._available
  100. def validate_directory(self, workdir: Path) -> KindValidationResult:
  101. result = KindValidationResult(validator=self.validator_name)
  102. process = self.run_command(["kubectl", "create", "--dry-run=client", "--validate=false", "-f", "."], workdir)
  103. failure = self.failure_from_process(process)
  104. if failure is not None:
  105. if self._is_cluster_discovery_failure(failure.message):
  106. result.skipped = True
  107. result.warnings.append(failure.message)
  108. else:
  109. result.failures.append(failure)
  110. return result
  111. @staticmethod
  112. def _is_cluster_discovery_failure(message: str) -> bool:
  113. return "couldn't get current server API group list" in message or "unable to recognize" in message
  114. class HelmValidator(RenderedFilesValidator):
  115. """Validate Helm chart files."""
  116. validator_name = "helm lint"
  117. unavailable_message = "Required command is unavailable: helm"
  118. def is_available(self) -> bool:
  119. if self._available is None:
  120. self._available = self.command_available("helm")
  121. return self._available
  122. def validate_directory(self, workdir: Path) -> KindValidationResult:
  123. result = KindValidationResult(validator=self.validator_name)
  124. if not (workdir / "Chart.yaml").exists():
  125. result.skipped = True
  126. result.warnings.append("Rendered files do not include Chart.yaml")
  127. return result
  128. process = self.run_command(["helm", "lint", "."], workdir)
  129. failure = self.failure_from_process(process)
  130. if failure is not None:
  131. result.failures.append(failure)
  132. return result
  133. class PackerValidator(RenderedFilesValidator):
  134. """Validate Packer templates."""
  135. validator_name = "packer validate"
  136. unavailable_message = "Required command is unavailable: packer"
  137. def is_available(self) -> bool:
  138. if self._available is None:
  139. self._available = self.command_available("packer")
  140. return self._available
  141. def validate_directory(self, workdir: Path) -> KindValidationResult:
  142. result = KindValidationResult(validator=self.validator_name)
  143. if list(workdir.glob("*.pkr.hcl")):
  144. target = "."
  145. else:
  146. candidates = sorted(path for path in workdir.rglob("*") if path.is_file() and path.suffix == ".json")
  147. if not candidates:
  148. result.skipped = True
  149. result.warnings.append("No Packer template files found")
  150. return result
  151. target = str(candidates[0].relative_to(workdir))
  152. process = self.run_command(["packer", "validate", target], workdir)
  153. failure = self.failure_from_process(process)
  154. if failure is not None:
  155. result.failures.append(failure)
  156. return result
  157. class AnsibleValidator(RenderedFilesValidator):
  158. """Validate Ansible playbooks with syntax-check."""
  159. validator_name = "ansible-playbook --syntax-check"
  160. unavailable_message = "Required command is unavailable: ansible-playbook"
  161. def is_available(self) -> bool:
  162. if self._available is None:
  163. self._available = self.command_available("ansible-playbook")
  164. return self._available
  165. def validate_directory(self, workdir: Path) -> KindValidationResult:
  166. result = KindValidationResult(validator=self.validator_name)
  167. playbooks = self._find_playbooks(workdir)
  168. if not playbooks:
  169. result.skipped = True
  170. result.warnings.append("No Ansible playbooks found")
  171. return result
  172. env = os.environ.copy()
  173. env["ANSIBLE_REMOTE_TEMP"] = "/tmp/.ansible-${USER}/tmp"
  174. with tempfile.TemporaryDirectory(prefix="ansible-local-") as ansible_local_temp:
  175. env["ANSIBLE_LOCAL_TEMP"] = ansible_local_temp
  176. for playbook in playbooks:
  177. process = self.run_command(
  178. ["ansible-playbook", "--syntax-check", str(playbook.relative_to(workdir))],
  179. workdir,
  180. env=env,
  181. )
  182. failure = self.failure_from_process(process, str(playbook.relative_to(workdir)))
  183. if failure is not None:
  184. if self._is_dependency_resolution_failure(failure.message):
  185. result.skipped = True
  186. result.warnings.append(failure.message)
  187. continue
  188. result.failures.append(failure)
  189. return result
  190. @staticmethod
  191. def _find_playbooks(workdir: Path) -> list[Path]:
  192. candidates = []
  193. for path in workdir.rglob("*"):
  194. if not path.is_file() or path.suffix.lower() not in {".yaml", ".yml"}:
  195. continue
  196. if "playbook" in path.name.lower() or AnsibleValidator._looks_like_playbook(path):
  197. candidates.append(path)
  198. return candidates
  199. @staticmethod
  200. def _looks_like_playbook(path: Path) -> bool:
  201. try:
  202. content = path.read_text(encoding="utf-8")
  203. except OSError:
  204. return False
  205. return any(line.lstrip().startswith("hosts:") for line in content.splitlines())
  206. @staticmethod
  207. def _is_dependency_resolution_failure(message: str) -> bool:
  208. return ("the role" in message and "was not found" in message) or "couldn't resolve module/action" in message