validators.py 10 KB


  1. """Semantic validators for template content.
  2. This module provides validators for specific file types and formats,
  3. enabling semantic validation beyond Jinja2 syntax checking.
  4. """
from __future__ import annotations

import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any, Dict, List, Optional

import yaml
from rich.console import Console
from rich.markup import escape
  12. logger = logging.getLogger(__name__)
  13. console = Console()
  14. class ValidationResult:
  15. """Represents the result of a validation operation."""
  16. def __init__(self):
  17. self.errors: List[str] = []
  18. self.warnings: List[str] = []
  19. self.info: List[str] = []
  20. def add_error(self, message: str) -> None:
  21. """Add an error message."""
  22. self.errors.append(message)
  23. logger.error(f"Validation error: {message}")
  24. def add_warning(self, message: str) -> None:
  25. """Add a warning message."""
  26. self.warnings.append(message)
  27. logger.warning(f"Validation warning: {message}")
  28. def add_info(self, message: str) -> None:
  29. """Add an info message."""
  30. self.info.append(message)
  31. logger.info(f"Validation info: {message}")
  32. @property
  33. def is_valid(self) -> bool:
  34. """Check if validation passed (no errors)."""
  35. return len(self.errors) == 0
  36. @property
  37. def has_warnings(self) -> bool:
  38. """Check if validation has warnings."""
  39. return len(self.warnings) > 0
  40. def display(self, context: str = "Validation") -> None:
  41. """Display validation results to console."""
  42. if self.errors:
  43. console.print(f"\n[red]✗ {context} Failed:[/red]")
  44. for error in self.errors:
  45. console.print(f" [red]• {error}[/red]")
  46. if self.warnings:
  47. console.print(f"\n[yellow]⚠ {context} Warnings:[/yellow]")
  48. for warning in self.warnings:
  49. console.print(f" [yellow]• {warning}[/yellow]")
  50. if self.info:
  51. console.print(f"\n[blue]ℹ {context} Info:[/blue]")
  52. for info_msg in self.info:
  53. console.print(f" [blue]• {info_msg}[/blue]")
  54. if self.is_valid and not self.has_warnings:
  55. console.print(f"\n[green]✓ {context} Passed[/green]")
  56. class ContentValidator(ABC):
  57. """Abstract base class for content validators."""
  58. @abstractmethod
  59. def validate(self, content: str, file_path: str) -> ValidationResult:
  60. """Validate content and return results.
  61. Args:
  62. content: The file content to validate
  63. file_path: Path to the file (for error messages)
  64. Returns:
  65. ValidationResult with errors, warnings, and info
  66. """
  67. pass
  68. @abstractmethod
  69. def can_validate(self, file_path: str) -> bool:
  70. """Check if this validator can validate the given file.
  71. Args:
  72. file_path: Path to the file
  73. Returns:
  74. True if this validator can handle the file
  75. """
  76. pass
  77. class DockerComposeValidator(ContentValidator):
  78. """Validator for Docker Compose files."""
  79. COMPOSE_FILENAMES = {
  80. "docker-compose.yml",
  81. "docker-compose.yaml",
  82. "compose.yml",
  83. "compose.yaml",
  84. }
  85. def can_validate(self, file_path: str) -> bool:
  86. """Check if file is a Docker Compose file."""
  87. filename = Path(file_path).name.lower()
  88. return filename in self.COMPOSE_FILENAMES
  89. def validate(self, content: str, file_path: str) -> ValidationResult:
  90. """Validate Docker Compose file structure."""
  91. result = ValidationResult()
  92. try:
  93. # Parse YAML
  94. data = yaml.safe_load(content)
  95. if not isinstance(data, dict):
  96. result.add_error("Docker Compose file must be a YAML dictionary")
  97. return result
  98. # Check for version (optional in Compose v2, but good practice)
  99. if "version" not in data:
  100. result.add_info("No 'version' field specified (using Compose v2 format)")
  101. # Check for services (required)
  102. if "services" not in data:
  103. result.add_error("Missing required 'services' section")
  104. return result
  105. services = data.get("services", {})
  106. if not isinstance(services, dict):
  107. result.add_error("'services' must be a dictionary")
  108. return result
  109. if not services:
  110. result.add_warning("No services defined")
  111. # Validate each service
  112. for service_name, service_config in services.items():
  113. self._validate_service(service_name, service_config, result)
  114. # Check for networks (optional but recommended)
  115. if "networks" in data:
  116. networks = data.get("networks", {})
  117. if networks and isinstance(networks, dict):
  118. result.add_info(f"Defines {len(networks)} network(s)")
  119. # Check for volumes (optional)
  120. if "volumes" in data:
  121. volumes = data.get("volumes", {})
  122. if volumes and isinstance(volumes, dict):
  123. result.add_info(f"Defines {len(volumes)} volume(s)")
  124. except yaml.YAMLError as e:
  125. result.add_error(f"YAML parsing error: {e}")
  126. except Exception as e:
  127. result.add_error(f"Unexpected validation error: {e}")
  128. return result
  129. def _validate_service(self, name: str, config: Any, result: ValidationResult) -> None:
  130. """Validate a single service configuration."""
  131. if not isinstance(config, dict):
  132. result.add_error(f"Service '{name}': configuration must be a dictionary")
  133. return
  134. # Check for image or build (at least one required)
  135. has_image = "image" in config
  136. has_build = "build" in config
  137. if not has_image and not has_build:
  138. result.add_error(f"Service '{name}': must specify 'image' or 'build'")
  139. # Warn about common misconfigurations
  140. if "restart" in config:
  141. restart_value = config["restart"]
  142. valid_restart_policies = ["no", "always", "on-failure", "unless-stopped"]
  143. if restart_value not in valid_restart_policies:
  144. result.add_warning(
  145. f"Service '{name}': restart policy '{restart_value}' may be invalid. "
  146. f"Valid values: {', '.join(valid_restart_policies)}"
  147. )
  148. # Check for environment variables
  149. if "environment" in config:
  150. env = config["environment"]
  151. if isinstance(env, list):
  152. # Check for duplicate keys in list format
  153. keys = [e.split("=")[0] for e in env if isinstance(e, str) and "=" in e]
  154. duplicates = {k for k in keys if keys.count(k) > 1}
  155. if duplicates:
  156. result.add_warning(
  157. f"Service '{name}': duplicate environment variables: {', '.join(duplicates)}"
  158. )
  159. # Check for ports
  160. if "ports" in config:
  161. ports = config["ports"]
  162. if not isinstance(ports, list):
  163. result.add_warning(f"Service '{name}': 'ports' should be a list")
  164. class YAMLValidator(ContentValidator):
  165. """Basic YAML syntax validator."""
  166. def can_validate(self, file_path: str) -> bool:
  167. """Check if file is a YAML file."""
  168. return Path(file_path).suffix.lower() in [".yml", ".yaml"]
  169. def validate(self, content: str, file_path: str) -> ValidationResult:
  170. """Validate YAML syntax."""
  171. result = ValidationResult()
  172. try:
  173. yaml.safe_load(content)
  174. result.add_info("YAML syntax is valid")
  175. except yaml.YAMLError as e:
  176. result.add_error(f"YAML parsing error: {e}")
  177. return result
  178. class ValidatorRegistry:
  179. """Registry for content validators."""
  180. def __init__(self):
  181. self.validators: List[ContentValidator] = []
  182. self._register_default_validators()
  183. def _register_default_validators(self) -> None:
  184. """Register built-in validators."""
  185. self.register(DockerComposeValidator())
  186. self.register(YAMLValidator())
  187. def register(self, validator: ContentValidator) -> None:
  188. """Register a validator.
  189. Args:
  190. validator: The validator to register
  191. """
  192. self.validators.append(validator)
  193. logger.debug(f"Registered validator: {validator.__class__.__name__}")
  194. def get_validator(self, file_path: str) -> Optional[ContentValidator]:
  195. """Get the most appropriate validator for a file.
  196. Args:
  197. file_path: Path to the file
  198. Returns:
  199. ContentValidator if found, None otherwise
  200. """
  201. # Try specific validators first (e.g., DockerComposeValidator before YAMLValidator)
  202. for validator in self.validators:
  203. if validator.can_validate(file_path):
  204. return validator
  205. return None
  206. def validate_file(self, content: str, file_path: str) -> ValidationResult:
  207. """Validate file content using appropriate validator.
  208. Args:
  209. content: The file content
  210. file_path: Path to the file
  211. Returns:
  212. ValidationResult with validation results
  213. """
  214. validator = self.get_validator(file_path)
  215. if validator:
  216. logger.debug(f"Validating {file_path} with {validator.__class__.__name__}")
  217. return validator.validate(content, file_path)
  218. # No validator found - return empty result
  219. result = ValidationResult()
  220. result.add_info(f"No semantic validator available for {Path(file_path).suffix} files")
  221. return result
  222. # Global registry instance
  223. _registry = ValidatorRegistry()
  224. def get_validator_registry() -> ValidatorRegistry:
  225. """Get the global validator registry."""
  226. return _registry