validators.py 9.9 KB


  1. """Semantic validators for template content.
  2. This module provides validators for specific file types and formats,
  3. enabling semantic validation beyond Jinja2 syntax checking.
  4. """
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from collections import Counter
from pathlib import Path
from typing import Any, List, Optional

import yaml
from rich.console import Console
from rich.markup import escape
  12. logger = logging.getLogger(__name__)
  13. console = Console()
  14. class ValidationResult:
  15. """Represents the result of a validation operation."""
  16. def __init__(self):
  17. self.errors: List[str] = []
  18. self.warnings: List[str] = []
  19. self.info: List[str] = []
  20. def add_error(self, message: str) -> None:
  21. """Add an error message."""
  22. self.errors.append(message)
  23. logger.error(f"Validation error: {message}")
  24. def add_warning(self, message: str) -> None:
  25. """Add a warning message."""
  26. self.warnings.append(message)
  27. logger.warning(f"Validation warning: {message}")
  28. def add_info(self, message: str) -> None:
  29. """Add an info message."""
  30. self.info.append(message)
  31. logger.info(f"Validation info: {message}")
  32. @property
  33. def is_valid(self) -> bool:
  34. """Check if validation passed (no errors)."""
  35. return len(self.errors) == 0
  36. @property
  37. def has_warnings(self) -> bool:
  38. """Check if validation has warnings."""
  39. return len(self.warnings) > 0
  40. def display(self, context: str = "Validation") -> None:
  41. """Display validation results to console."""
  42. if self.errors:
  43. console.print(f"\n[red]✗ {context} Failed:[/red]")
  44. for error in self.errors:
  45. console.print(f" [red]• {error}[/red]")
  46. if self.warnings:
  47. console.print(f"\n[yellow]⚠ {context} Warnings:[/yellow]")
  48. for warning in self.warnings:
  49. console.print(f" [yellow]• {warning}[/yellow]")
  50. if self.info:
  51. console.print(f"\n[blue]ℹ {context} Info:[/blue]")
  52. for info_msg in self.info:
  53. console.print(f" [blue]• {info_msg}[/blue]")
  54. if self.is_valid and not self.has_warnings:
  55. console.print(f"\n[green]✓ {context} Passed[/green]")
  56. class ContentValidator(ABC):
  57. """Abstract base class for content validators."""
  58. @abstractmethod
  59. def validate(self, content: str, file_path: str) -> ValidationResult:
  60. """Validate content and return results.
  61. Args:
  62. content: The file content to validate
  63. file_path: Path to the file (for error messages)
  64. Returns:
  65. ValidationResult with errors, warnings, and info
  66. """
  67. pass
  68. @abstractmethod
  69. def can_validate(self, file_path: str) -> bool:
  70. """Check if this validator can validate the given file.
  71. Args:
  72. file_path: Path to the file
  73. Returns:
  74. True if this validator can handle the file
  75. """
  76. pass
  77. class DockerComposeValidator(ContentValidator):
  78. """Validator for Docker Compose files."""
  79. COMPOSE_FILENAMES = {
  80. "docker-compose.yml",
  81. "docker-compose.yaml",
  82. "compose.yml",
  83. "compose.yaml",
  84. }
  85. def can_validate(self, file_path: str) -> bool:
  86. """Check if file is a Docker Compose file."""
  87. filename = Path(file_path).name.lower()
  88. return filename in self.COMPOSE_FILENAMES
  89. def validate(self, content: str, file_path: str) -> ValidationResult:
  90. """Validate Docker Compose file structure."""
  91. result = ValidationResult()
  92. try:
  93. # Parse YAML
  94. data = yaml.safe_load(content)
  95. if not isinstance(data, dict):
  96. result.add_error("Docker Compose file must be a YAML dictionary")
  97. return result
  98. # Check for version (optional in Compose v2, but good practice)
  99. if "version" not in data:
  100. result.add_info(
  101. "No 'version' field specified (using Compose v2 format)"
  102. )
  103. # Check for services (required)
  104. if "services" not in data:
  105. result.add_error("Missing required 'services' section")
  106. return result
  107. services = data.get("services", {})
  108. if not isinstance(services, dict):
  109. result.add_error("'services' must be a dictionary")
  110. return result
  111. if not services:
  112. result.add_warning("No services defined")
  113. # Validate each service
  114. for service_name, service_config in services.items():
  115. self._validate_service(service_name, service_config, result)
  116. # Check for networks (optional but recommended)
  117. if "networks" in data:
  118. networks = data.get("networks", {})
  119. if networks and isinstance(networks, dict):
  120. result.add_info(f"Defines {len(networks)} network(s)")
  121. # Check for volumes (optional)
  122. if "volumes" in data:
  123. volumes = data.get("volumes", {})
  124. if volumes and isinstance(volumes, dict):
  125. result.add_info(f"Defines {len(volumes)} volume(s)")
  126. except yaml.YAMLError as e:
  127. result.add_error(f"YAML parsing error: {e}")
  128. except Exception as e:
  129. result.add_error(f"Unexpected validation error: {e}")
  130. return result
  131. def _validate_service(
  132. self, name: str, config: Any, result: ValidationResult
  133. ) -> None:
  134. """Validate a single service configuration."""
  135. if not isinstance(config, dict):
  136. result.add_error(f"Service '{name}': configuration must be a dictionary")
  137. return
  138. # Check for image or build (at least one required)
  139. has_image = "image" in config
  140. has_build = "build" in config
  141. if not has_image and not has_build:
  142. result.add_error(f"Service '{name}': must specify 'image' or 'build'")
  143. # Warn about common misconfigurations
  144. if "restart" in config:
  145. restart_value = config["restart"]
  146. valid_restart_policies = ["no", "always", "on-failure", "unless-stopped"]
  147. if restart_value not in valid_restart_policies:
  148. result.add_warning(
  149. f"Service '{name}': restart policy '{restart_value}' may be invalid. "
  150. f"Valid values: {', '.join(valid_restart_policies)}"
  151. )
  152. # Check for environment variables
  153. if "environment" in config:
  154. env = config["environment"]
  155. if isinstance(env, list):
  156. # Check for duplicate keys in list format
  157. keys = [e.split("=")[0] for e in env if isinstance(e, str) and "=" in e]
  158. duplicates = {k for k in keys if keys.count(k) > 1}
  159. if duplicates:
  160. result.add_warning(
  161. f"Service '{name}': duplicate environment variables: {', '.join(duplicates)}"
  162. )
  163. # Check for ports
  164. if "ports" in config:
  165. ports = config["ports"]
  166. if not isinstance(ports, list):
  167. result.add_warning(f"Service '{name}': 'ports' should be a list")
  168. class YAMLValidator(ContentValidator):
  169. """Basic YAML syntax validator."""
  170. def can_validate(self, file_path: str) -> bool:
  171. """Check if file is a YAML file."""
  172. return Path(file_path).suffix.lower() in [".yml", ".yaml"]
  173. def validate(self, content: str, file_path: str) -> ValidationResult:
  174. """Validate YAML syntax."""
  175. result = ValidationResult()
  176. try:
  177. yaml.safe_load(content)
  178. result.add_info("YAML syntax is valid")
  179. except yaml.YAMLError as e:
  180. result.add_error(f"YAML parsing error: {e}")
  181. return result
  182. class ValidatorRegistry:
  183. """Registry for content validators."""
  184. def __init__(self):
  185. self.validators: List[ContentValidator] = []
  186. self._register_default_validators()
  187. def _register_default_validators(self) -> None:
  188. """Register built-in validators."""
  189. self.register(DockerComposeValidator())
  190. self.register(YAMLValidator())
  191. def register(self, validator: ContentValidator) -> None:
  192. """Register a validator.
  193. Args:
  194. validator: The validator to register
  195. """
  196. self.validators.append(validator)
  197. logger.debug(f"Registered validator: {validator.__class__.__name__}")
  198. def get_validator(self, file_path: str) -> Optional[ContentValidator]:
  199. """Get the most appropriate validator for a file.
  200. Args:
  201. file_path: Path to the file
  202. Returns:
  203. ContentValidator if found, None otherwise
  204. """
  205. # Try specific validators first (e.g., DockerComposeValidator before YAMLValidator)
  206. for validator in self.validators:
  207. if validator.can_validate(file_path):
  208. return validator
  209. return None
  210. def validate_file(self, content: str, file_path: str) -> ValidationResult:
  211. """Validate file content using appropriate validator.
  212. Args:
  213. content: The file content
  214. file_path: Path to the file
  215. Returns:
  216. ValidationResult with validation results
  217. """
  218. validator = self.get_validator(file_path)
  219. if validator:
  220. logger.debug(f"Validating {file_path} with {validator.__class__.__name__}")
  221. return validator.validate(content, file_path)
  222. # No validator found - return empty result
  223. result = ValidationResult()
  224. result.add_info(
  225. f"No semantic validator available for {Path(file_path).suffix} files"
  226. )
  227. return result
  228. # Global registry instance
  229. _registry = ValidatorRegistry()
  230. def get_validator_registry() -> ValidatorRegistry:
  231. """Get the global validator registry."""
  232. return _registry