template.py 24 KB


  1. from __future__ import annotations
  2. from .variables import Variable, VariableCollection
  3. from pathlib import Path
  4. from typing import Any, Dict, List, Set, Optional, Literal
  5. from dataclasses import dataclass, field
  6. import logging
  7. import os
  8. import yaml
  9. from jinja2 import Environment, FileSystemLoader, meta
  10. from jinja2 import nodes
  11. from jinja2.visitor import NodeVisitor
  12. logger = logging.getLogger(__name__)
  13. # -----------------------
  14. # SECTION: TemplateFile Class
  15. # -----------------------
  16. @dataclass
  17. class TemplateFile:
  18. """Represents a single file within a template directory."""
  19. relative_path: Path
  20. file_type: Literal['j2', 'static']
  21. output_path: Path # The path it will have in the output directory
  22. # !SECTION
  23. # -----------------------
  24. # SECTION: Metadata Class
  25. # -----------------------
  26. @dataclass
  27. class TemplateMetadata:
  28. """Represents template metadata with proper typing."""
  29. name: str
  30. description: str
  31. author: str
  32. date: str
  33. version: str
  34. module: str = ""
  35. tags: List[str] = field(default_factory=list)
  36. # files: List[str] = field(default_factory=list) # No longer needed, as TemplateFile handles this
  37. library: str = "unknown"
  38. next_steps: str = ""
  39. draft: bool = False
  40. def __init__(self, template_data: dict, library_name: str | None = None) -> None:
  41. """Initialize TemplateMetadata from parsed YAML template data.
  42. Args:
  43. template_data: Parsed YAML data from template.yaml
  44. library_name: Name of the library this template belongs to
  45. """
  46. # Validate metadata format first
  47. self._validate_metadata(template_data)
  48. # Extract metadata section
  49. metadata_section = template_data.get("metadata", {})
  50. self.name = metadata_section.get("name", "")
  51. # YAML block scalar (|) preserves a trailing newline. Remove only trailing newlines
  52. # while preserving internal newlines/formatting.
  53. raw_description = metadata_section.get("description", "")
  54. if isinstance(raw_description, str):
  55. description = raw_description.rstrip("\n")
  56. else:
  57. description = str(raw_description)
  58. self.description = description or "No description available"
  59. self.author = metadata_section.get("author", "")
  60. self.date = metadata_section.get("date", "")
  61. self.version = metadata_section.get("version", "")
  62. self.module = metadata_section.get("module", "")
  63. self.tags = metadata_section.get("tags", []) or []
  64. # self.files = metadata_section.get("files", []) or [] # No longer needed
  65. self.library = library_name or "unknown"
  66. self.draft = metadata_section.get("draft", False)
  67. # Extract next_steps (optional)
  68. raw_next_steps = metadata_section.get("next_steps", "")
  69. if isinstance(raw_next_steps, str):
  70. next_steps = raw_next_steps.rstrip("\n")
  71. else:
  72. next_steps = str(raw_next_steps) if raw_next_steps else ""
  73. self.next_steps = next_steps
  74. @staticmethod
  75. def _validate_metadata(template_data: dict) -> None:
  76. """Validate that template has required 'metadata' section with all required fields.
  77. Args:
  78. template_data: Parsed YAML data from template.yaml
  79. Raises:
  80. ValueError: If metadata section is missing or incomplete
  81. """
  82. metadata_section = template_data.get("metadata")
  83. if metadata_section is None:
  84. raise ValueError("Template format error: missing 'metadata' section")
  85. # Validate that metadata section has all required fields
  86. required_fields = ["name", "author", "version", "date", "description"]
  87. missing_fields = [field for field in required_fields if not metadata_section.get(field)]
  88. if missing_fields:
  89. raise ValueError(f"Template format error: missing required metadata fields: {missing_fields}")
  90. # !SECTION
  91. # -----------------------
  92. # SECTION: Template Class
  93. # -----------------------
  94. @dataclass
  95. class Template:
  96. """Represents a template directory."""
  97. def __init__(self, template_dir: Path, library_name: str) -> None:
  98. """Create a Template instance from a directory path."""
  99. logger.debug(f"Loading template from directory: {template_dir}")
  100. self.template_dir = template_dir
  101. self.id = template_dir.name
  102. self.library_name = library_name
  103. # Initialize caches for lazy loading
  104. self.__module_specs: Optional[dict] = None
  105. self.__merged_specs: Optional[dict] = None
  106. self.__jinja_env: Optional[Environment] = None
  107. self.__used_variables: Optional[Set[str]] = None
  108. self.__variables: Optional[VariableCollection] = None
  109. self.__template_files: Optional[List[TemplateFile]] = None # New attribute
  110. try:
  111. # Find and parse the main template file (template.yaml or template.yml)
  112. main_template_path = self._find_main_template_file()
  113. with open(main_template_path, "r", encoding="utf-8") as f:
  114. # Load all YAML documents (handles templates with empty lines before ---)
  115. documents = list(yaml.safe_load_all(f))
  116. # Filter out None/empty documents and get the first non-empty one
  117. valid_docs = [doc for doc in documents if doc is not None]
  118. if not valid_docs:
  119. raise ValueError("Template file contains no valid YAML data")
  120. if len(valid_docs) > 1:
  121. logger.warning(f"Template file contains multiple YAML documents, using the first one")
  122. self._template_data = valid_docs[0]
  123. # Validate template data
  124. if not isinstance(self._template_data, dict):
  125. raise ValueError("Template file must contain a valid YAML dictionary")
  126. # Load metadata (always needed)
  127. self.metadata = TemplateMetadata(self._template_data, library_name)
  128. logger.debug(f"Loaded metadata: {self.metadata}")
  129. # Validate 'kind' field (always needed)
  130. self._validate_kind(self._template_data)
  131. # Collect file paths (relatively lightweight, needed for various lazy loads)
  132. # This will now populate self.template_files
  133. self._collect_template_files()
  134. logger.info(f"Loaded template '{self.id}' (v{self.metadata.version})")
  135. except (ValueError, FileNotFoundError) as e:
  136. logger.error(f"Error loading template from {template_dir}: {e}")
  137. raise
  138. except Exception as e:
  139. logger.error(f"An unexpected error occurred while loading template {template_dir}: {e}")
  140. raise
  141. def _find_main_template_file(self) -> Path:
  142. """Find the main template file (template.yaml or template.yml)."""
  143. for filename in ["template.yaml", "template.yml"]:
  144. path = self.template_dir / filename
  145. if path.exists():
  146. return path
  147. raise FileNotFoundError(f"Main template file (template.yaml or template.yml) not found in {self.template_dir}")
  148. def _load_module_specs(self, kind: str) -> dict:
  149. """Load specifications from the corresponding module."""
  150. if not kind:
  151. return {}
  152. try:
  153. import importlib
  154. module = importlib.import_module(f"..modules.{kind}", package=__package__)
  155. return getattr(module, 'spec', {})
  156. except Exception as e:
  157. raise ValueError(f"Error loading module specifications for kind '{kind}': {e}")
  158. def _merge_specs(self, module_specs: dict, template_specs: dict) -> dict:
  159. """Deep merge template specs with module specs using VariableCollection.
  160. Uses VariableCollection's native merge() method for consistent merging logic.
  161. Module specs are base, template specs override with origin tracking.
  162. """
  163. # Create VariableCollection from module specs (base)
  164. module_collection = VariableCollection(module_specs) if module_specs else VariableCollection({})
  165. # Set origin for module variables
  166. for section in module_collection.get_sections().values():
  167. for variable in section.variables.values():
  168. if not variable.origin:
  169. variable.origin = "module"
  170. # Merge template specs into module specs (template overrides)
  171. if template_specs:
  172. merged_collection = module_collection.merge(template_specs, origin="template")
  173. else:
  174. merged_collection = module_collection
  175. # Convert back to dict format
  176. merged_spec = {}
  177. for section_key, section in merged_collection.get_sections().items():
  178. merged_spec[section_key] = section.to_dict()
  179. return merged_spec
  180. def _collect_template_files(self) -> None:
  181. """Collects all TemplateFile objects in the template directory."""
  182. template_files: List[TemplateFile] = []
  183. for root, _, files in os.walk(self.template_dir):
  184. for filename in files:
  185. file_path = Path(root) / filename
  186. relative_path = file_path.relative_to(self.template_dir)
  187. # Skip the main template file
  188. if filename in ["template.yaml", "template.yml"]:
  189. continue
  190. if filename.endswith(".j2"):
  191. file_type: Literal['j2', 'static'] = 'j2'
  192. output_path = relative_path.with_suffix('') # Remove .j2 suffix
  193. else:
  194. file_type = 'static'
  195. output_path = relative_path # Static files keep their name
  196. template_files.append(TemplateFile(relative_path=relative_path, file_type=file_type, output_path=output_path))
  197. self.__template_files = template_files
  198. def _extract_all_used_variables(self) -> Set[str]:
  199. """Extract all undeclared variables from all .j2 files in the template directory.
  200. Raises:
  201. ValueError: If any Jinja2 template has syntax errors
  202. """
  203. used_variables: Set[str] = set()
  204. syntax_errors = []
  205. for template_file in self.template_files: # Iterate over TemplateFile objects
  206. if template_file.file_type == 'j2':
  207. file_path = self.template_dir / template_file.relative_path
  208. try:
  209. with open(file_path, "r", encoding="utf-8") as f:
  210. content = f.read()
  211. ast = self.jinja_env.parse(content) # Use lazy-loaded jinja_env
  212. used_variables.update(meta.find_undeclared_variables(ast))
  213. except Exception as e:
  214. # Collect syntax errors instead of just warning
  215. relative_path = file_path.relative_to(self.template_dir)
  216. syntax_errors.append(f" - {relative_path}: {e}")
  217. # Raise error if any syntax errors were found
  218. if syntax_errors:
  219. error_msg = (
  220. f"Jinja2 syntax errors found in template '{self.id}':\n" +
  221. "\n".join(syntax_errors) +
  222. "\n\nPlease fix the syntax errors in the template files."
  223. )
  224. logger.error(error_msg)
  225. raise ValueError(error_msg)
  226. return used_variables
  227. def _extract_jinja_default_values(self) -> dict[str, object]:
  228. """Scan all .j2 files and extract literal arguments to the `default` filter.
  229. Returns a mapping var_name -> literal_value for simple cases like
  230. {{ var | default("value") }} or {{ var | default(123) }}.
  231. This does not attempt to evaluate complex expressions.
  232. """
  233. defaults: dict[str, object] = {}
  234. class _DefaultVisitor(NodeVisitor):
  235. def __init__(self):
  236. self.found: dict[str, object] = {}
  237. def visit_Filter(self, node: nodes.Filter) -> None: # type: ignore[override]
  238. try:
  239. if getattr(node, 'name', None) == 'default' and node.args:
  240. # target variable name when filter is applied directly to a Name
  241. target = None
  242. if isinstance(node.node, nodes.Name):
  243. target = node.node.name
  244. # first arg literal
  245. first = node.args[0]
  246. if isinstance(first, nodes.Const) and target:
  247. self.found[target] = first.value
  248. except Exception:
  249. # Be resilient to unexpected node shapes
  250. pass
  251. # continue traversal
  252. self.generic_visit(node)
  253. visitor = _DefaultVisitor()
  254. for template_file in self.template_files:
  255. if template_file.file_type != 'j2':
  256. continue
  257. file_path = self.template_dir / template_file.relative_path
  258. try:
  259. with open(file_path, 'r', encoding='utf-8') as f:
  260. content = f.read()
  261. ast = self.jinja_env.parse(content)
  262. visitor.visit(ast)
  263. except Exception:
  264. # skip failures - this extraction is best-effort only
  265. continue
  266. return visitor.found
  267. def _filter_specs_to_used(self, used_variables: set, merged_specs: dict, module_specs: dict, template_specs: dict) -> dict:
  268. """Filter specs to only include variables used in templates using VariableCollection.
  269. Uses VariableCollection's native filter_to_used() method.
  270. Keeps sensitive variables only if they're defined in the template spec or actually used.
  271. """
  272. # Build set of variables explicitly defined in template spec
  273. template_defined_vars = set()
  274. for section_data in (template_specs or {}).values():
  275. if isinstance(section_data, dict) and 'vars' in section_data:
  276. template_defined_vars.update(section_data['vars'].keys())
  277. # Create VariableCollection from merged specs
  278. merged_collection = VariableCollection(merged_specs)
  279. # Filter to only used variables (and sensitive ones that are template-defined)
  280. # We keep sensitive variables that are either:
  281. # 1. Actually used in template files, OR
  282. # 2. Explicitly defined in the template spec (even if not yet used)
  283. variables_to_keep = used_variables | template_defined_vars
  284. filtered_collection = merged_collection.filter_to_used(variables_to_keep, keep_sensitive=False)
  285. # Convert back to dict format
  286. filtered_specs = {}
  287. for section_key, section in filtered_collection.get_sections().items():
  288. filtered_specs[section_key] = section.to_dict()
  289. return filtered_specs
  290. # ---------------------------
  291. # SECTION: Validation Methods
  292. # ---------------------------
  293. @staticmethod
  294. def _validate_kind(template_data: dict) -> None:
  295. """Validate that template has required 'kind' field.
  296. Args:
  297. template_data: Parsed YAML data from template.yaml
  298. Raises:
  299. ValueError: If 'kind' field is missing
  300. """
  301. if not template_data.get("kind"):
  302. raise ValueError("Template format error: missing 'kind' field")
  303. def _validate_variable_definitions(self, used_variables: set[str], merged_specs: dict[str, Any]) -> None:
  304. """Validate that all variables used in Jinja2 content are defined in the spec."""
  305. defined_variables = set()
  306. for section_data in merged_specs.values():
  307. if "vars" in section_data and isinstance(section_data["vars"], dict):
  308. defined_variables.update(section_data["vars"].keys())
  309. undefined_variables = used_variables - defined_variables
  310. if undefined_variables:
  311. undefined_list = sorted(undefined_variables)
  312. error_msg = (
  313. f"Template validation error in '{self.id}': "
  314. f"Variables used in template content but not defined in spec: {undefined_list}\n\n"
  315. f"Please add these variables to your template's template.yaml spec. "
  316. f"Each variable must have a default value.\n\n"
  317. f"Example:\n"
  318. f"spec:\n"
  319. f" general:\n"
  320. f" vars:\n"
  321. )
  322. for var_name in undefined_list:
  323. error_msg += (
  324. f" {var_name}:\n"
  325. f" type: str\n"
  326. f" description: Description for {var_name}\n"
  327. f" default: <your_default_value_here>\n"
  328. )
  329. logger.error(error_msg)
  330. raise ValueError(error_msg)
  331. # !SECTION
  332. # ---------------------------------
  333. # SECTION: Jinja2 Rendering Methods
  334. # ---------------------------------
  335. @staticmethod
  336. def _create_jinja_env(searchpath: Path) -> Environment:
  337. """Create standardized Jinja2 environment for consistent template processing.
  338. Includes custom filters for generating random values:
  339. - random_string(length): Generate random alphanumeric string
  340. - random_hex(length): Generate random hexadecimal string
  341. - random_base64(length): Generate random base64 string
  342. - random_uuid: Generate a UUID4
  343. """
  344. import secrets
  345. import string
  346. import base64
  347. import uuid
  348. env = Environment(
  349. loader=FileSystemLoader(searchpath),
  350. trim_blocks=True,
  351. lstrip_blocks=True,
  352. keep_trailing_newline=False,
  353. )
  354. # Add custom filters for generating random values
  355. def random_string(value: str = '', length: int = 32) -> str:
  356. """Generate a random alphanumeric string of specified length.
  357. Usage: {{ '' | random_string(64) }} or {{ 'ignored' | random_string(32) }}
  358. """
  359. alphabet = string.ascii_letters + string.digits
  360. return ''.join(secrets.choice(alphabet) for _ in range(length))
  361. def pwgen(value: str = '', length: int = 50) -> str:
  362. """Generate a secure random string (mimics pwgen -s).
  363. Default length is 50 (matching Authentik recommendation).
  364. Usage: {{ '' | pwgen }} or {{ '' | pwgen(64) }}
  365. """
  366. alphabet = string.ascii_letters + string.digits
  367. return ''.join(secrets.choice(alphabet) for _ in range(length))
  368. def random_hex(value: str = '', length: int = 32) -> str:
  369. """Generate a random hexadecimal string of specified length.
  370. Usage: {{ '' | random_hex(64) }}
  371. """
  372. return secrets.token_hex(length // 2)
  373. def random_base64(value: str = '', length: int = 32) -> str:
  374. """Generate a random base64 string of specified length.
  375. Usage: {{ '' | random_base64(64) }}
  376. """
  377. num_bytes = (length * 3) // 4 # Convert length to approximate bytes
  378. return base64.b64encode(secrets.token_bytes(num_bytes)).decode('utf-8')[:length]
  379. def random_uuid(value: str = '') -> str:
  380. """Generate a random UUID4.
  381. Usage: {{ '' | random_uuid }}
  382. """
  383. return str(uuid.uuid4())
  384. # Register filters
  385. env.filters['random_string'] = random_string
  386. env.filters['pwgen'] = pwgen
  387. env.filters['random_hex'] = random_hex
  388. env.filters['random_base64'] = random_base64
  389. env.filters['random_uuid'] = random_uuid
  390. return env
  391. def render(self, variables: VariableCollection) -> Dict[str, str]:
  392. """Render all .j2 files in the template directory."""
  393. # Use get_satisfied_values() to exclude variables from sections with unsatisfied dependencies
  394. variable_values = variables.get_satisfied_values()
  395. logger.debug(f"Rendering template '{self.id}' with variables: {variable_values}")
  396. rendered_files = {}
  397. for template_file in self.template_files: # Iterate over TemplateFile objects
  398. if template_file.file_type == 'j2':
  399. try:
  400. template = self.jinja_env.get_template(str(template_file.relative_path)) # Use lazy-loaded jinja_env
  401. rendered_content = template.render(**variable_values)
  402. # Sanitize the rendered content to remove excessive blank lines
  403. rendered_content = self._sanitize_content(rendered_content, template_file.output_path)
  404. rendered_files[str(template_file.output_path)] = rendered_content
  405. except Exception as e:
  406. logger.error(f"Error rendering template file {template_file.relative_path}: {e}")
  407. raise
  408. elif template_file.file_type == 'static':
  409. # For static files, just read their content and add to rendered_files
  410. # This ensures static files are also part of the output dictionary
  411. file_path = self.template_dir / template_file.relative_path
  412. try:
  413. with open(file_path, "r", encoding="utf-8") as f:
  414. content = f.read()
  415. rendered_files[str(template_file.output_path)] = content
  416. except Exception as e:
  417. logger.error(f"Error reading static file {file_path}: {e}")
  418. raise
  419. return rendered_files
  420. def _sanitize_content(self, content: str, file_path: Path) -> str:
  421. """Sanitize rendered content by removing excessive blank lines.
  422. This function:
  423. - Reduces multiple consecutive blank lines to a maximum of one blank line
  424. - Preserves file structure and readability
  425. - Removes trailing whitespace from lines
  426. - Ensures file ends with a single newline
  427. Args:
  428. content: The rendered content to sanitize
  429. file_path: Path to the output file (used for file-type detection)
  430. Returns:
  431. Sanitized content with cleaned up blank lines
  432. """
  433. if not content:
  434. return content
  435. # Split content into lines
  436. lines = content.split('\n')
  437. sanitized_lines = []
  438. blank_line_count = 0
  439. for line in lines:
  440. # Remove trailing whitespace from the line
  441. cleaned_line = line.rstrip()
  442. # Check if this is a blank line
  443. if not cleaned_line:
  444. blank_line_count += 1
  445. # Only keep the first blank line in a sequence
  446. if blank_line_count == 1:
  447. sanitized_lines.append('')
  448. else:
  449. # Reset counter when we hit a non-blank line
  450. blank_line_count = 0
  451. sanitized_lines.append(cleaned_line)
  452. # Join lines back together
  453. result = '\n'.join(sanitized_lines)
  454. # Remove leading blank lines
  455. result = result.lstrip('\n')
  456. # Ensure file ends with exactly one newline
  457. result = result.rstrip('\n') + '\n'
  458. return result
  459. def mask_sensitive_values(self, rendered_files: Dict[str, str], variables: VariableCollection) -> Dict[str, str]:
  460. """Mask sensitive values in rendered files using Variable's native masking."""
  461. masked_files = {}
  462. # Get all variables (not just sensitive ones) to use their native get_display_value()
  463. for file_path, content in rendered_files.items():
  464. # Iterate through all sections and variables
  465. for section in variables.get_sections().values():
  466. for variable in section.variables.values():
  467. if variable.sensitive and variable.value:
  468. # Use variable's native masking - always returns "********" for sensitive vars
  469. masked_value = variable.get_display_value(mask_sensitive=True)
  470. content = content.replace(str(variable.value), masked_value)
  471. masked_files[file_path] = content
  472. return masked_files
  473. # !SECTION
  474. # ---------------------------
  475. # SECTION: Lazy Loaded Properties
  476. # ---------------------------
  477. @property
  478. def template_files(self) -> List[TemplateFile]:
  479. if self.__template_files is None:
  480. self._collect_template_files() # Populate self.__template_files
  481. return self.__template_files
  482. @property
  483. def template_specs(self) -> dict:
  484. """Get the spec section from template YAML data."""
  485. return self._template_data.get("spec", {})
  486. @property
  487. def module_specs(self) -> dict:
  488. """Get the spec from the module definition."""
  489. if self.__module_specs is None:
  490. kind = self._template_data.get("kind")
  491. self.__module_specs = self._load_module_specs(kind)
  492. return self.__module_specs
  493. @property
  494. def merged_specs(self) -> dict:
  495. if self.__merged_specs is None:
  496. self.__merged_specs = self._merge_specs(self.module_specs, self.template_specs)
  497. return self.__merged_specs
  498. @property
  499. def jinja_env(self) -> Environment:
  500. if self.__jinja_env is None:
  501. self.__jinja_env = self._create_jinja_env(self.template_dir)
  502. return self.__jinja_env
  503. @property
  504. def used_variables(self) -> Set[str]:
  505. if self.__used_variables is None:
  506. self.__used_variables = self._extract_all_used_variables()
  507. return self.__used_variables
  508. @property
  509. def variables(self) -> VariableCollection:
  510. if self.__variables is None:
  511. # Validate that all used variables are defined
  512. self._validate_variable_definitions(self.used_variables, self.merged_specs)
  513. # Filter specs to only used variables
  514. filtered_specs = self._filter_specs_to_used(self.used_variables, self.merged_specs, self.module_specs, self.template_specs)
  515. # Best-effort: extract literal defaults from Jinja `default()` filter and
  516. # merge them into the filtered_specs when no default exists there.
  517. try:
  518. jinja_defaults = self._extract_jinja_default_values()
  519. for section_key, section_data in filtered_specs.items():
  520. # Guard against None from empty YAML sections
  521. vars_dict = section_data.get('vars') or {}
  522. for var_name, var_data in vars_dict.items():
  523. if 'default' not in var_data or var_data.get('default') in (None, ''):
  524. if var_name in jinja_defaults:
  525. var_data['default'] = jinja_defaults[var_name]
  526. except Exception:
  527. # keep behavior stable on any extraction errors
  528. pass
  529. self.__variables = VariableCollection(filtered_specs)
  530. # Sort sections: required first, then enabled, then disabled
  531. self.__variables.sort_sections()
  532. return self.__variables