template.py 22 KB


  1. from __future__ import annotations
  2. from .variable import Variable
  3. from .collection import VariableCollection
  4. from .exceptions import (
  5. TemplateError,
  6. TemplateLoadError,
  7. TemplateSyntaxError,
  8. TemplateValidationError,
  9. TemplateRenderError,
  10. YAMLParseError,
  11. ModuleLoadError
  12. )
  13. from pathlib import Path
  14. from typing import Any, Dict, List, Set, Optional, Literal
  15. from dataclasses import dataclass, field
  16. from functools import lru_cache
  17. import logging
  18. import os
  19. import yaml
  20. from jinja2 import Environment, FileSystemLoader, meta
  21. from jinja2.sandbox import SandboxedEnvironment
  22. from jinja2 import nodes
  23. from jinja2.visitor import NodeVisitor
  24. logger = logging.getLogger(__name__)
  25. @dataclass
  26. class TemplateFile:
  27. """Represents a single file within a template directory."""
  28. relative_path: Path
  29. file_type: Literal['j2', 'static']
  30. output_path: Path # The path it will have in the output directory
  31. @dataclass
  32. class TemplateMetadata:
  33. """Represents template metadata with proper typing."""
  34. name: str
  35. description: str
  36. author: str
  37. date: str
  38. version: str
  39. module: str = ""
  40. tags: List[str] = field(default_factory=list)
  41. library: str = "unknown"
  42. next_steps: str = ""
  43. draft: bool = False
  44. def __init__(self, template_data: dict, library_name: str | None = None) -> None:
  45. """Initialize TemplateMetadata from parsed YAML template data.
  46. Args:
  47. template_data: Parsed YAML data from template.yaml
  48. library_name: Name of the library this template belongs to
  49. """
  50. # Validate metadata format first
  51. self._validate_metadata(template_data)
  52. # Extract metadata section
  53. metadata_section = template_data.get("metadata", {})
  54. self.name = metadata_section.get("name", "")
  55. # YAML block scalar (|) preserves a trailing newline. Remove only trailing newlines
  56. # while preserving internal newlines/formatting.
  57. raw_description = metadata_section.get("description", "")
  58. if isinstance(raw_description, str):
  59. description = raw_description.rstrip("\n")
  60. else:
  61. description = str(raw_description)
  62. self.description = description or "No description available"
  63. self.author = metadata_section.get("author", "")
  64. self.date = metadata_section.get("date", "")
  65. self.version = metadata_section.get("version", "")
  66. self.module = metadata_section.get("module", "")
  67. self.tags = metadata_section.get("tags", []) or []
  68. self.library = library_name or "unknown"
  69. self.draft = metadata_section.get("draft", False)
  70. # Extract next_steps (optional)
  71. raw_next_steps = metadata_section.get("next_steps", "")
  72. if isinstance(raw_next_steps, str):
  73. next_steps = raw_next_steps.rstrip("\n")
  74. else:
  75. next_steps = str(raw_next_steps) if raw_next_steps else ""
  76. self.next_steps = next_steps
  77. @staticmethod
  78. def _validate_metadata(template_data: dict) -> None:
  79. """Validate that template has required 'metadata' section with all required fields.
  80. Args:
  81. template_data: Parsed YAML data from template.yaml
  82. Raises:
  83. ValueError: If metadata section is missing or incomplete
  84. """
  85. metadata_section = template_data.get("metadata")
  86. if metadata_section is None:
  87. raise ValueError("Template format error: missing 'metadata' section")
  88. # Validate that metadata section has all required fields
  89. required_fields = ["name", "author", "version", "date", "description"]
  90. missing_fields = [field for field in required_fields if not metadata_section.get(field)]
  91. if missing_fields:
  92. raise ValueError(f"Template format error: missing required metadata fields: {missing_fields}")
  93. @dataclass
  94. class Template:
  95. """Represents a template directory."""
  96. def __init__(self, template_dir: Path, library_name: str) -> None:
  97. """Create a Template instance from a directory path."""
  98. logger.debug(f"Loading template from directory: {template_dir}")
  99. self.template_dir = template_dir
  100. self.id = template_dir.name
  101. self.library_name = library_name
  102. # Initialize caches for lazy loading
  103. self.__module_specs: Optional[dict] = None
  104. self.__merged_specs: Optional[dict] = None
  105. self.__jinja_env: Optional[Environment] = None
  106. self.__used_variables: Optional[Set[str]] = None
  107. self.__variables: Optional[VariableCollection] = None
  108. self.__template_files: Optional[List[TemplateFile]] = None # New attribute
  109. try:
  110. # Find and parse the main template file (template.yaml or template.yml)
  111. main_template_path = self._find_main_template_file()
  112. with open(main_template_path, "r", encoding="utf-8") as f:
  113. # Load all YAML documents (handles templates with empty lines before ---)
  114. documents = list(yaml.safe_load_all(f))
  115. # Filter out None/empty documents and get the first non-empty one
  116. valid_docs = [doc for doc in documents if doc is not None]
  117. if not valid_docs:
  118. raise ValueError("Template file contains no valid YAML data")
  119. if len(valid_docs) > 1:
  120. logger.warning(f"Template file contains multiple YAML documents, using the first one")
  121. self._template_data = valid_docs[0]
  122. # Validate template data
  123. if not isinstance(self._template_data, dict):
  124. raise ValueError("Template file must contain a valid YAML dictionary")
  125. # Load metadata (always needed)
  126. self.metadata = TemplateMetadata(self._template_data, library_name)
  127. logger.debug(f"Loaded metadata: {self.metadata}")
  128. # Validate 'kind' field (always needed)
  129. self._validate_kind(self._template_data)
  130. # NOTE: File collection is now lazy-loaded via the template_files property
  131. # This significantly improves performance when listing many templates
  132. logger.info(f"Loaded template '{self.id}' (v{self.metadata.version})")
  133. except (ValueError, FileNotFoundError) as e:
  134. logger.error(f"Error loading template from {template_dir}: {e}")
  135. raise TemplateLoadError(f"Error loading template from {template_dir}: {e}")
  136. except yaml.YAMLError as e:
  137. logger.error(f"YAML parsing error in template {template_dir}: {e}")
  138. raise YAMLParseError(str(template_dir / "template.y*ml"), e)
  139. except (IOError, OSError) as e:
  140. logger.error(f"File I/O error loading template {template_dir}: {e}")
  141. raise TemplateLoadError(f"File I/O error loading template from {template_dir}: {e}")
  142. def _find_main_template_file(self) -> Path:
  143. """Find the main template file (template.yaml or template.yml)."""
  144. for filename in ["template.yaml", "template.yml"]:
  145. path = self.template_dir / filename
  146. if path.exists():
  147. return path
  148. raise FileNotFoundError(f"Main template file (template.yaml or template.yml) not found in {self.template_dir}")
  149. @staticmethod
  150. @lru_cache(maxsize=32)
  151. def _load_module_specs(kind: str) -> dict:
  152. """Load specifications from the corresponding module with caching.
  153. Uses LRU cache to avoid re-loading the same module spec multiple times.
  154. This significantly improves performance when listing many templates of the same kind.
  155. Args:
  156. kind: The module kind (e.g., 'compose', 'terraform')
  157. Returns:
  158. Dictionary containing the module's spec, or empty dict if kind is empty
  159. Raises:
  160. ValueError: If module cannot be loaded or spec is invalid
  161. """
  162. if not kind:
  163. return {}
  164. try:
  165. import importlib
  166. module = importlib.import_module(f"cli.modules.{kind}")
  167. spec = getattr(module, 'spec', {})
  168. logger.debug(f"Loaded and cached module spec for kind '{kind}'")
  169. return spec
  170. except Exception as e:
  171. raise ValueError(f"Error loading module specifications for kind '{kind}': {e}")
  172. def _merge_specs(self, module_specs: dict, template_specs: dict) -> dict:
  173. """Deep merge template specs with module specs using VariableCollection.
  174. Uses VariableCollection's native merge() method for consistent merging logic.
  175. Module specs are base, template specs override with origin tracking.
  176. """
  177. # Create VariableCollection from module specs (base)
  178. module_collection = VariableCollection(module_specs) if module_specs else VariableCollection({})
  179. # Set origin for module variables
  180. for section in module_collection.get_sections().values():
  181. for variable in section.variables.values():
  182. if not variable.origin:
  183. variable.origin = "module"
  184. # Merge template specs into module specs (template overrides)
  185. if template_specs:
  186. merged_collection = module_collection.merge(template_specs, origin="template")
  187. else:
  188. merged_collection = module_collection
  189. # Convert back to dict format
  190. merged_spec = {}
  191. for section_key, section in merged_collection.get_sections().items():
  192. merged_spec[section_key] = section.to_dict()
  193. return merged_spec
  194. def _collect_template_files(self) -> None:
  195. """Collects all TemplateFile objects in the template directory."""
  196. template_files: List[TemplateFile] = []
  197. for root, _, files in os.walk(self.template_dir):
  198. for filename in files:
  199. file_path = Path(root) / filename
  200. relative_path = file_path.relative_to(self.template_dir)
  201. # Skip the main template file
  202. if filename in ["template.yaml", "template.yml"]:
  203. continue
  204. if filename.endswith(".j2"):
  205. file_type: Literal['j2', 'static'] = 'j2'
  206. output_path = relative_path.with_suffix('') # Remove .j2 suffix
  207. else:
  208. file_type = 'static'
  209. output_path = relative_path # Static files keep their name
  210. template_files.append(TemplateFile(relative_path=relative_path, file_type=file_type, output_path=output_path))
  211. self.__template_files = template_files
  212. def _extract_all_used_variables(self) -> Set[str]:
  213. """Extract all undeclared variables from all .j2 files in the template directory.
  214. Raises:
  215. ValueError: If any Jinja2 template has syntax errors
  216. """
  217. used_variables: Set[str] = set()
  218. syntax_errors = []
  219. for template_file in self.template_files: # Iterate over TemplateFile objects
  220. if template_file.file_type == 'j2':
  221. file_path = self.template_dir / template_file.relative_path
  222. try:
  223. with open(file_path, "r", encoding="utf-8") as f:
  224. content = f.read()
  225. ast = self.jinja_env.parse(content) # Use lazy-loaded jinja_env
  226. used_variables.update(meta.find_undeclared_variables(ast))
  227. except (IOError, OSError) as e:
  228. relative_path = file_path.relative_to(self.template_dir)
  229. syntax_errors.append(f" - {relative_path}: File I/O error: {e}")
  230. except Exception as e:
  231. # Collect syntax errors for Jinja2 issues
  232. relative_path = file_path.relative_to(self.template_dir)
  233. syntax_errors.append(f" - {relative_path}: {e}")
  234. # Raise error if any syntax errors were found
  235. if syntax_errors:
  236. logger.error(f"Jinja2 syntax errors found in template '{self.id}'")
  237. raise TemplateSyntaxError(self.id, syntax_errors)
  238. return used_variables
  239. def _extract_jinja_default_values(self) -> dict[str, object]:
  240. """Scan all .j2 files and extract literal arguments to the `default` filter.
  241. Returns a mapping var_name -> literal_value for simple cases like
  242. {{ var | default("value") }} or {{ var | default(123) }}.
  243. This does not attempt to evaluate complex expressions.
  244. """
  245. defaults: dict[str, object] = {}
  246. class _DefaultVisitor(NodeVisitor):
  247. def __init__(self):
  248. self.found: dict[str, object] = {}
  249. def visit_Filter(self, node: nodes.Filter) -> None: # type: ignore[override]
  250. try:
  251. if getattr(node, 'name', None) == 'default' and node.args:
  252. # target variable name when filter is applied directly to a Name
  253. target = None
  254. if isinstance(node.node, nodes.Name):
  255. target = node.node.name
  256. # first arg literal
  257. first = node.args[0]
  258. if isinstance(first, nodes.Const) and target:
  259. self.found[target] = first.value
  260. except Exception:
  261. # Be resilient to unexpected node shapes
  262. pass
  263. # continue traversal
  264. self.generic_visit(node)
  265. visitor = _DefaultVisitor()
  266. for template_file in self.template_files:
  267. if template_file.file_type != 'j2':
  268. continue
  269. file_path = self.template_dir / template_file.relative_path
  270. try:
  271. with open(file_path, 'r', encoding='utf-8') as f:
  272. content = f.read()
  273. ast = self.jinja_env.parse(content)
  274. visitor.visit(ast)
  275. except (IOError, OSError, yaml.YAMLError):
  276. # Skip failures - this extraction is best-effort only
  277. continue
  278. return visitor.found
  279. def _filter_specs_to_used(self, used_variables: set, merged_specs: dict, module_specs: dict, template_specs: dict) -> dict:
  280. """Filter specs to only include variables used in templates using VariableCollection.
  281. Uses VariableCollection's native filter_to_used() method.
  282. Keeps sensitive variables only if they're defined in the template spec or actually used.
  283. """
  284. # Build set of variables explicitly defined in template spec
  285. template_defined_vars = set()
  286. for section_data in (template_specs or {}).values():
  287. if isinstance(section_data, dict) and 'vars' in section_data:
  288. template_defined_vars.update(section_data['vars'].keys())
  289. # Create VariableCollection from merged specs
  290. merged_collection = VariableCollection(merged_specs)
  291. # Filter to only used variables (and sensitive ones that are template-defined)
  292. # We keep sensitive variables that are either:
  293. # 1. Actually used in template files, OR
  294. # 2. Explicitly defined in the template spec (even if not yet used)
  295. variables_to_keep = used_variables | template_defined_vars
  296. filtered_collection = merged_collection.filter_to_used(variables_to_keep, keep_sensitive=False)
  297. # Convert back to dict format
  298. filtered_specs = {}
  299. for section_key, section in filtered_collection.get_sections().items():
  300. filtered_specs[section_key] = section.to_dict()
  301. return filtered_specs
  302. @staticmethod
  303. def _validate_kind(template_data: dict) -> None:
  304. """Validate that template has required 'kind' field.
  305. Args:
  306. template_data: Parsed YAML data from template.yaml
  307. Raises:
  308. ValueError: If 'kind' field is missing
  309. """
  310. if not template_data.get("kind"):
  311. raise TemplateValidationError("Template format error: missing 'kind' field")
  312. def _validate_variable_definitions(self, used_variables: set[str], merged_specs: dict[str, Any]) -> None:
  313. """Validate that all variables used in Jinja2 content are defined in the spec."""
  314. defined_variables = set()
  315. for section_data in merged_specs.values():
  316. if "vars" in section_data and isinstance(section_data["vars"], dict):
  317. defined_variables.update(section_data["vars"].keys())
  318. undefined_variables = used_variables - defined_variables
  319. if undefined_variables:
  320. undefined_list = sorted(undefined_variables)
  321. error_msg = (
  322. f"Template validation error in '{self.id}': "
  323. f"Variables used in template content but not defined in spec: {undefined_list}\n\n"
  324. f"Please add these variables to your template's template.yaml spec. "
  325. f"Each variable must have a default value.\n\n"
  326. f"Example:\n"
  327. f"spec:\n"
  328. f" general:\n"
  329. f" vars:\n"
  330. )
  331. for var_name in undefined_list:
  332. error_msg += (
  333. f" {var_name}:\n"
  334. f" type: str\n"
  335. f" description: Description for {var_name}\n"
  336. f" default: <your_default_value_here>\n"
  337. )
  338. logger.error(error_msg)
  339. raise TemplateValidationError(error_msg)
  340. @staticmethod
  341. def _create_jinja_env(searchpath: Path) -> Environment:
  342. """Create sandboxed Jinja2 environment for secure template processing.
  343. Uses SandboxedEnvironment to prevent code injection vulnerabilities
  344. when processing untrusted templates. This restricts access to dangerous
  345. operations while still allowing safe template rendering.
  346. Returns:
  347. SandboxedEnvironment configured for template processing.
  348. """
  349. # NOTE Use SandboxedEnvironment for security - prevents arbitrary code execution
  350. return SandboxedEnvironment(
  351. loader=FileSystemLoader(searchpath),
  352. trim_blocks=True,
  353. lstrip_blocks=True,
  354. keep_trailing_newline=False,
  355. )
  356. def render(self, variables: VariableCollection) -> tuple[Dict[str, str], Dict[str, Any]]:
  357. """Render all .j2 files in the template directory.
  358. Returns:
  359. Tuple of (rendered_files, variable_values) where variable_values includes autogenerated values
  360. """
  361. # Use get_satisfied_values() to exclude variables from sections with unsatisfied dependencies
  362. variable_values = variables.get_satisfied_values()
  363. # Auto-generate values for autogenerated variables that are empty
  364. import secrets
  365. import string
  366. for section in variables.get_sections().values():
  367. for var_name, variable in section.variables.items():
  368. if variable.autogenerated and (variable.value is None or variable.value == ""):
  369. # Generate a secure random string (32 characters by default)
  370. alphabet = string.ascii_letters + string.digits
  371. generated_value = ''.join(secrets.choice(alphabet) for _ in range(32))
  372. variable_values[var_name] = generated_value
  373. logger.debug(f"Auto-generated value for variable '{var_name}'")
  374. logger.debug(f"Rendering template '{self.id}' with variables: {variable_values}")
  375. rendered_files = {}
  376. for template_file in self.template_files: # Iterate over TemplateFile objects
  377. if template_file.file_type == 'j2':
  378. try:
  379. template = self.jinja_env.get_template(str(template_file.relative_path)) # Use lazy-loaded jinja_env
  380. rendered_content = template.render(**variable_values)
  381. # Sanitize the rendered content to remove excessive blank lines
  382. rendered_content = self._sanitize_content(rendered_content, template_file.output_path)
  383. rendered_files[str(template_file.output_path)] = rendered_content
  384. except Exception as e:
  385. logger.error(f"Error rendering template file {template_file.relative_path}: {e}")
  386. raise TemplateRenderError(f"Error rendering {template_file.relative_path}: {e}")
  387. elif template_file.file_type == 'static':
  388. # For static files, just read their content and add to rendered_files
  389. # This ensures static files are also part of the output dictionary
  390. file_path = self.template_dir / template_file.relative_path
  391. try:
  392. with open(file_path, "r", encoding="utf-8") as f:
  393. content = f.read()
  394. rendered_files[str(template_file.output_path)] = content
  395. except (IOError, OSError) as e:
  396. logger.error(f"Error reading static file {file_path}: {e}")
  397. raise TemplateRenderError(f"Error reading static file {file_path}: {e}")
  398. return rendered_files, variable_values
  399. def _sanitize_content(self, content: str, file_path: Path) -> str:
  400. """Sanitize rendered content by removing excessive blank lines and trailing whitespace."""
  401. if not content:
  402. return content
  403. lines = [line.rstrip() for line in content.split('\n')]
  404. sanitized = []
  405. prev_blank = False
  406. for line in lines:
  407. is_blank = not line
  408. if is_blank and prev_blank:
  409. continue # Skip consecutive blank lines
  410. sanitized.append(line)
  411. prev_blank = is_blank
  412. # Remove leading blanks and ensure single trailing newline
  413. return '\n'.join(sanitized).lstrip('\n').rstrip('\n') + '\n'
  414. @property
  415. def template_files(self) -> List[TemplateFile]:
  416. if self.__template_files is None:
  417. self._collect_template_files() # Populate self.__template_files
  418. return self.__template_files
  419. @property
  420. def template_specs(self) -> dict:
  421. """Get the spec section from template YAML data."""
  422. return self._template_data.get("spec", {})
  423. @property
  424. def module_specs(self) -> dict:
  425. """Get the spec from the module definition."""
  426. if self.__module_specs is None:
  427. kind = self._template_data.get("kind")
  428. self.__module_specs = self._load_module_specs(kind)
  429. return self.__module_specs
  430. @property
  431. def merged_specs(self) -> dict:
  432. if self.__merged_specs is None:
  433. self.__merged_specs = self._merge_specs(self.module_specs, self.template_specs)
  434. return self.__merged_specs
  435. @property
  436. def jinja_env(self) -> Environment:
  437. if self.__jinja_env is None:
  438. self.__jinja_env = self._create_jinja_env(self.template_dir)
  439. return self.__jinja_env
  440. @property
  441. def used_variables(self) -> Set[str]:
  442. if self.__used_variables is None:
  443. self.__used_variables = self._extract_all_used_variables()
  444. return self.__used_variables
  445. @property
  446. def variables(self) -> VariableCollection:
  447. if self.__variables is None:
  448. # Validate that all used variables are defined
  449. self._validate_variable_definitions(self.used_variables, self.merged_specs)
  450. # Filter specs to only used variables
  451. filtered_specs = self._filter_specs_to_used(self.used_variables, self.merged_specs, self.module_specs, self.template_specs)
  452. # Best-effort: extract literal defaults from Jinja `default()` filter and
  453. # merge them into the filtered_specs when no default exists there.
  454. try:
  455. jinja_defaults = self._extract_jinja_default_values()
  456. for section_key, section_data in filtered_specs.items():
  457. # Guard against None from empty YAML sections
  458. vars_dict = section_data.get('vars') or {}
  459. for var_name, var_data in vars_dict.items():
  460. if 'default' not in var_data or var_data.get('default') in (None, ''):
  461. if var_name in jinja_defaults:
  462. var_data['default'] = jinja_defaults[var_name]
  463. except (KeyError, TypeError, AttributeError):
  464. # Keep behavior stable on any extraction errors
  465. pass
  466. self.__variables = VariableCollection(filtered_specs)
  467. # Sort sections: required first, then enabled, then disabled
  468. self.__variables.sort_sections()
  469. return self.__variables