validators.py 9.8 KB


  1. """Semantic validators for template content.
  2. This module provides validators for specific file types and formats,
  3. enabling semantic validation beyond Jinja2 syntax checking.
  4. """
  5. from __future__ import annotations
  6. import logging
  7. from abc import ABC, abstractmethod
  8. from pathlib import Path
  9. from typing import TYPE_CHECKING, Any, ClassVar
  10. if TYPE_CHECKING:
  11. pass
  12. import yaml
  13. from .display import DisplayManager
  14. logger = logging.getLogger(__name__)
  class ValidationResult:
      """Accumulates the messages produced by one validation run.

      Messages are kept in three severity buckets: ``errors``, ``warnings``
      and ``info``. Each ``add_*`` method appends to its bucket and forwards
      the same text to the module logger at the matching level.
      """

      def __init__(self):
          self.errors: list[str] = []
          self.warnings: list[str] = []
          self.info: list[str] = []

      def add_error(self, message: str) -> None:
          """Record *message* as an error and log it at ERROR level."""
          self.errors.append(message)
          logger.error(f"Validation error: {message}")

      def add_warning(self, message: str) -> None:
          """Record *message* as a warning and log it at WARNING level."""
          self.warnings.append(message)
          logger.warning(f"Validation warning: {message}")

      def add_info(self, message: str) -> None:
          """Record *message* as an informational note and log it at INFO level."""
          self.info.append(message)
          logger.info(f"Validation info: {message}")

      @property
      def is_valid(self) -> bool:
          """True while no error has been recorded."""
          return not self.errors

      @property
      def has_warnings(self) -> bool:
          """True once at least one warning has been recorded."""
          return bool(self.warnings)

      def display(self, context: str = "Validation") -> None:
          """Print the collected messages through a ``DisplayManager``.

          Sections are emitted in the order errors, warnings, info. The
          closing "Passed" line is shown only when there are neither errors
          nor warnings.

          Args:
              context: Label used in each section heading.
          """
          out = DisplayManager()

          if self.errors:
              out.error(f"\n✗ {context} Failed:")
              for message in self.errors:
                  out.error(f" • {message}")

          if self.warnings:
              out.warning(f"\n⚠ {context} Warnings:")
              for message in self.warnings:
                  out.warning(f" • {message}")

          if self.info:
              out.text(f"\n[blue]i {context} Info:[/blue]")
              for message in self.info:
                  out.text(f" [blue]• {message}[/blue]")

          clean_pass = self.is_valid and not self.has_warnings
          if clean_pass:
              out.text(f"\n[green]✓ {context} Passed[/green]")
  class ContentValidator(ABC):
      """Interface that every content validator implements.

      Subclasses must provide both :meth:`validate` and :meth:`can_validate`;
      the class cannot be instantiated until they do.
      """

      @abstractmethod
      def validate(self, content: str, _file_path: str) -> ValidationResult:
          """Inspect *content* and report what was found.

          Args:
              content: Text of the file to validate.
              _file_path: Path to the file (unused in the base class, kept
                  for API compatibility).

          Returns:
              A ValidationResult holding errors, warnings and info messages.
          """

      @abstractmethod
      def can_validate(self, file_path: str) -> bool:
          """Tell whether this validator handles the given file.

          Args:
              file_path: Path to the file.

          Returns:
              True when this validator can handle the file.
          """
  class DockerComposeValidator(ContentValidator):
      """Validator for Docker Compose files.

      Parses the content as YAML and checks the top-level structure
      (``services`` is required; ``version``, ``networks`` and ``volumes``
      are only reported on), then inspects each service for an image/build
      source, a plausible restart policy, duplicate list-style environment
      variables and a list-typed ``ports`` entry.
      """

      COMPOSE_FILENAMES: ClassVar[set[str]] = {
          "docker-compose.yml",
          "docker-compose.yaml",
          "compose.yml",
          "compose.yaml",
      }

      # Restart policies accepted verbatim. "on-failure" may additionally
      # carry a ":<max-retries>" suffix, see _is_valid_restart_policy().
      VALID_RESTART_POLICIES: ClassVar[tuple[str, ...]] = (
          "no",
          "always",
          "on-failure",
          "unless-stopped",
      )

      def can_validate(self, file_path: str) -> bool:
          """Return True when the file name is a known Compose file name.

          The comparison is case-insensitive and ignores the directory part.
          """
          return Path(file_path).name.lower() in self.COMPOSE_FILENAMES

      def validate(self, content: str, _file_path: str) -> ValidationResult:
          """Validate Docker Compose file structure.

          Args:
              content: Raw text of the Compose file.
              _file_path: Path to the file (not used by this validator).

          Returns:
              ValidationResult with every finding. Parsing problems and any
              unexpected exception are reported as errors in the result
              rather than raised.
          """
          result = ValidationResult()
          try:
              data = yaml.safe_load(content)
              if not isinstance(data, dict):
                  result.add_error("Docker Compose file must be a YAML dictionary")
                  return result

              # 'version' is optional in Compose v2, so its absence is only a note.
              if "version" not in data:
                  result.add_info("No 'version' field specified (using Compose v2 format)")

              if "services" not in data:
                  result.add_error("Missing required 'services' section")
                  return result

              services = data["services"]
              if not isinstance(services, dict):
                  result.add_error("'services' must be a dictionary")
                  return result
              if not services:
                  result.add_warning("No services defined")

              for service_name, service_config in services.items():
                  self._validate_service(service_name, service_config, result)

              # Optional sections: only reported when present and non-empty.
              networks = data.get("networks")
              if networks and isinstance(networks, dict):
                  result.add_info(f"Defines {len(networks)} network(s)")

              volumes = data.get("volumes")
              if volumes and isinstance(volumes, dict):
                  result.add_info(f"Defines {len(volumes)} volume(s)")
          except yaml.YAMLError as e:
              result.add_error(f"YAML parsing error: {e}")
          except Exception as e:
              # Deliberate catch-all: validation must never crash the caller,
              # so anything unexpected is surfaced as an error in the result.
              result.add_error(f"Unexpected validation error: {e}")
          return result

      def _validate_service(self, name: str, config: Any, result: ValidationResult) -> None:
          """Validate a single service configuration.

          Args:
              name: Key of the service inside the ``services`` mapping.
              config: Parsed value for that service; must be a dictionary.
              result: Collector that receives errors and warnings.
          """
          if not isinstance(config, dict):
              result.add_error(f"Service '{name}': configuration must be a dictionary")
              return

          # At least one of image/build is needed to know what to run.
          if "image" not in config and "build" not in config:
              result.add_error(f"Service '{name}': must specify 'image' or 'build'")

          if "restart" in config:
              restart_value = config["restart"]
              if not self._is_valid_restart_policy(restart_value):
                  result.add_warning(
                      f"Service '{name}': restart policy '{restart_value}' may be invalid. "
                      f"Valid values: {', '.join(self.VALID_RESTART_POLICIES)}"
                  )

          # Only the list form ("KEY=value") is checked for duplicate keys.
          env = config.get("environment")
          if isinstance(env, list):
              duplicates = self._duplicate_env_keys(env)
              if duplicates:
                  dups = ", ".join(duplicates)
                  result.add_warning(f"Service '{name}': duplicate environment variables: {dups}")

          if "ports" in config and not isinstance(config["ports"], list):
              result.add_warning(f"Service '{name}': 'ports' should be a list")

      @classmethod
      def _is_valid_restart_policy(cls, value: Any) -> bool:
          """Return True for a known restart policy or ``on-failure:<N>``."""
          if value in cls.VALID_RESTART_POLICIES:
              return True
          if isinstance(value, str):
              policy, sep, retries = value.partition(":")
              # The retry count must be a plain non-negative decimal integer.
              return (
                  policy == "on-failure"
                  and sep == ":"
                  and retries.isascii()
                  and retries.isdigit()
              )
          return False

      @staticmethod
      def _duplicate_env_keys(entries: list[Any]) -> list[str]:
          """Return the sorted keys that occur more than once in ``KEY=value`` entries.

          Entries that are not strings or contain no ``=`` are ignored.
          Sorting keeps the resulting warning text deterministic.
          """
          seen: set[str] = set()
          repeated: set[str] = set()
          for entry in entries:
              if not isinstance(entry, str) or "=" not in entry:
                  continue
              key = entry.partition("=")[0]
              if key in seen:
                  repeated.add(key)
              else:
                  seen.add(key)
          return sorted(repeated)
  class YAMLValidator(ContentValidator):
      """Basic YAML syntax validator.

      Only checks that the content parses; it accepts both single-document
      files and multi-document streams separated by ``---``.
      """

      def can_validate(self, file_path: str) -> bool:
          """Return True when the file has a ``.yml`` or ``.yaml`` suffix (any case)."""
          return Path(file_path).suffix.lower() in (".yml", ".yaml")

      def validate(self, content: str, _file_path: str) -> ValidationResult:
          """Validate YAML syntax.

          Args:
              content: Raw text of the YAML file.
              _file_path: Path to the file (not used by this validator).

          Returns:
              ValidationResult with one info message when parsing succeeds,
              or one error describing the parser failure.
          """
          result = ValidationResult()
          try:
              # safe_load() rejects streams holding more than one document, so
              # parse with safe_load_all(). It is lazy: the generator must be
              # exhausted for every document to actually be parsed.
              for _document in yaml.safe_load_all(content):
                  pass
          except yaml.YAMLError as e:
              result.add_error(f"YAML parsing error: {e}")
          else:
              result.add_info("YAML syntax is valid")
          return result
  class ValidatorRegistry:
      """Keeps the known content validators and routes files to them."""

      def __init__(self):
          self.validators: list[ContentValidator] = []
          self._register_default_validators()

      def _register_default_validators(self) -> None:
          """Install the validators defined in this module.

          Lookup returns the first match, so the Docker Compose validator is
          added ahead of the generic YAML one.
          """
          for validator_cls in (DockerComposeValidator, YAMLValidator):
              self.register(validator_cls())

      def register(self, validator: ContentValidator) -> None:
          """Append a validator to the end of the lookup order.

          Args:
              validator: The validator to register.
          """
          self.validators.append(validator)
          validator_name = validator.__class__.__name__
          logger.debug(f"Registered validator: {validator_name}")

      def get_validator(self, file_path: str) -> ContentValidator | None:
          """Find the first registered validator that accepts the file.

          Args:
              file_path: Path to the file.

          Returns:
              The matching ContentValidator, or None when nothing matches.
          """
          # Registration order is the priority order: first match wins.
          matches = (
              candidate
              for candidate in self.validators
              if candidate.can_validate(file_path)
          )
          return next(matches, None)

      def validate_file(self, content: str, file_path: str) -> ValidationResult:
          """Validate file content with whichever validator accepts the path.

          Args:
              content: The file content.
              file_path: Path to the file.

          Returns:
              The chosen validator's ValidationResult, or a result carrying a
              single info message when no validator matches.
          """
          chosen = self.get_validator(file_path)
          if not chosen:
              # Nothing matched: report that as information, not as a failure.
              fallback = ValidationResult()
              fallback.add_info(f"No semantic validator available for {Path(file_path).suffix} files")
              return fallback

          logger.debug(f"Validating {file_path} with {chosen.__class__.__name__}")
          return chosen.validate(content, file_path)
  # Single registry shared by the whole module; built once at import time.
  _registry = ValidatorRegistry()


  def get_validator_registry() -> ValidatorRegistry:
      """Return the module-level ValidatorRegistry instance.

      Every call returns the same object.
      """
      return _registry