template.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493
  1. from __future__ import annotations
  2. from .variables import Variable, VariableCollection
  3. from pathlib import Path
  4. from typing import Any, Dict, List, Set, Optional, Literal
  5. from dataclasses import dataclass, field
  6. import logging
  7. import os
  8. import yaml
  9. from jinja2 import Environment, FileSystemLoader, meta
  10. from jinja2 import nodes
  11. from jinja2.visitor import NodeVisitor
  12. logger = logging.getLogger(__name__)
  13. # -----------------------
  14. # SECTION: TemplateFile Class
  15. # -----------------------
  16. @dataclass
  17. class TemplateFile:
  18. """Represents a single file within a template directory."""
  19. relative_path: Path
  20. file_type: Literal['j2', 'static']
  21. output_path: Path # The path it will have in the output directory
  22. # !SECTION
  23. # -----------------------
  24. # SECTION: Metadata Class
  25. # -----------------------
  26. @dataclass
  27. class TemplateMetadata:
  28. """Represents template metadata with proper typing."""
  29. name: str
  30. description: str
  31. author: str
  32. date: str
  33. version: str
  34. module: str = ""
  35. tags: List[str] = field(default_factory=list)
  36. # files: List[str] = field(default_factory=list) # No longer needed, as TemplateFile handles this
  37. library: str = "unknown"
  38. def __init__(self, template_data: dict, library_name: str | None = None) -> None:
  39. """Initialize TemplateMetadata from parsed YAML template data.
  40. Args:
  41. template_data: Parsed YAML data from template.yaml
  42. library_name: Name of the library this template belongs to
  43. """
  44. # Validate metadata format first
  45. self._validate_metadata(template_data)
  46. # Extract metadata section
  47. metadata_section = template_data.get("metadata", {})
  48. self.name = metadata_section.get("name", "")
  49. # YAML block scalar (|) preserves a trailing newline. Remove only trailing newlines
  50. # while preserving internal newlines/formatting.
  51. raw_description = metadata_section.get("description", "")
  52. if isinstance(raw_description, str):
  53. description = raw_description.rstrip("\n")
  54. else:
  55. description = str(raw_description)
  56. self.description = description or "No description available"
  57. self.author = metadata_section.get("author", "")
  58. self.date = metadata_section.get("date", "")
  59. self.version = metadata_section.get("version", "")
  60. self.module = metadata_section.get("module", "")
  61. self.tags = metadata_section.get("tags", []) or []
  62. # self.files = metadata_section.get("files", []) or [] # No longer needed
  63. self.library = library_name or "unknown"
  64. @staticmethod
  65. def _validate_metadata(template_data: dict) -> None:
  66. """Validate that template has required 'metadata' section with all required fields.
  67. Args:
  68. template_data: Parsed YAML data from template.yaml
  69. Raises:
  70. ValueError: If metadata section is missing or incomplete
  71. """
  72. metadata_section = template_data.get("metadata")
  73. if metadata_section is None:
  74. raise ValueError("Template format error: missing 'metadata' section")
  75. # Validate that metadata section has all required fields
  76. required_fields = ["name", "author", "version", "date", "description"]
  77. missing_fields = [field for field in required_fields if not metadata_section.get(field)]
  78. if missing_fields:
  79. raise ValueError(f"Template format error: missing required metadata fields: {missing_fields}")
  80. # !SECTION
  81. # -----------------------
  82. # SECTION: Template Class
  83. # -----------------------
  84. @dataclass
  85. class Template:
  86. """Represents a template directory."""
  87. def __init__(self, template_dir: Path, library_name: str) -> None:
  88. """Create a Template instance from a directory path."""
  89. logger.debug(f"Loading template from directory: {template_dir}")
  90. self.template_dir = template_dir
  91. self.id = template_dir.name
  92. self.library_name = library_name
  93. # Initialize caches for lazy loading
  94. self.__module_specs: Optional[dict] = None
  95. self.__merged_specs: Optional[dict] = None
  96. self.__jinja_env: Optional[Environment] = None
  97. self.__used_variables: Optional[Set[str]] = None
  98. self.__variables: Optional[VariableCollection] = None
  99. self.__template_files: Optional[List[TemplateFile]] = None # New attribute
  100. try:
  101. # Find and parse the main template file (template.yaml or template.yml)
  102. main_template_path = self._find_main_template_file()
  103. with open(main_template_path, "r", encoding="utf-8") as f:
  104. # Load all YAML documents (handles templates with empty lines before ---)
  105. documents = list(yaml.safe_load_all(f))
  106. # Filter out None/empty documents and get the first non-empty one
  107. valid_docs = [doc for doc in documents if doc is not None]
  108. if not valid_docs:
  109. raise ValueError("Template file contains no valid YAML data")
  110. if len(valid_docs) > 1:
  111. logger.warning(f"Template file contains multiple YAML documents, using the first one")
  112. self._template_data = valid_docs[0]
  113. # Validate template data
  114. if not isinstance(self._template_data, dict):
  115. raise ValueError("Template file must contain a valid YAML dictionary")
  116. # Load metadata (always needed)
  117. self.metadata = TemplateMetadata(self._template_data, library_name)
  118. logger.debug(f"Loaded metadata: {self.metadata}")
  119. # Validate 'kind' field (always needed)
  120. self._validate_kind(self._template_data)
  121. # Collect file paths (relatively lightweight, needed for various lazy loads)
  122. # This will now populate self.template_files
  123. self._collect_template_files()
  124. logger.info(f"Loaded template '{self.id}' (v{self.metadata.version})")
  125. except (ValueError, FileNotFoundError) as e:
  126. logger.error(f"Error loading template from {template_dir}: {e}")
  127. raise
  128. except Exception as e:
  129. logger.error(f"An unexpected error occurred while loading template {template_dir}: {e}")
  130. raise
  131. def _find_main_template_file(self) -> Path:
  132. """Find the main template file (template.yaml or template.yml)."""
  133. for filename in ["template.yaml", "template.yml"]:
  134. path = self.template_dir / filename
  135. if path.exists():
  136. return path
  137. raise FileNotFoundError(f"Main template file (template.yaml or template.yml) not found in {self.template_dir}")
  138. def _load_module_specs(self, kind: str) -> dict:
  139. """Load specifications from the corresponding module."""
  140. if not kind:
  141. return {}
  142. try:
  143. import importlib
  144. module = importlib.import_module(f"..modules.{kind}", package=__package__)
  145. return getattr(module, 'spec', {})
  146. except Exception as e:
  147. raise ValueError(f"Error loading module specifications for kind '{kind}': {e}")
  148. def _merge_specs(self, module_specs: dict, template_specs: dict) -> dict:
  149. """Deep merge template specs with module specs using VariableCollection.
  150. Uses VariableCollection's native merge() method for consistent merging logic.
  151. Module specs are base, template specs override with origin tracking.
  152. """
  153. # Create VariableCollection from module specs (base)
  154. module_collection = VariableCollection(module_specs) if module_specs else VariableCollection({})
  155. # Set origin for module variables
  156. for section in module_collection.get_sections().values():
  157. for variable in section.variables.values():
  158. if not variable.origin:
  159. variable.origin = "module"
  160. # Merge template specs into module specs (template overrides)
  161. if template_specs:
  162. merged_collection = module_collection.merge(template_specs, origin="template")
  163. else:
  164. merged_collection = module_collection
  165. # Convert back to dict format
  166. merged_spec = {}
  167. for section_key, section in merged_collection.get_sections().items():
  168. merged_spec[section_key] = section.to_dict()
  169. return merged_spec
  170. def _collect_template_files(self) -> None:
  171. """Collects all TemplateFile objects in the template directory."""
  172. template_files: List[TemplateFile] = []
  173. for root, _, files in os.walk(self.template_dir):
  174. for filename in files:
  175. file_path = Path(root) / filename
  176. relative_path = file_path.relative_to(self.template_dir)
  177. # Skip the main template file
  178. if filename in ["template.yaml", "template.yml"]:
  179. continue
  180. if filename.endswith(".j2"):
  181. file_type: Literal['j2', 'static'] = 'j2'
  182. output_path = relative_path.with_suffix('') # Remove .j2 suffix
  183. else:
  184. file_type = 'static'
  185. output_path = relative_path # Static files keep their name
  186. template_files.append(TemplateFile(relative_path=relative_path, file_type=file_type, output_path=output_path))
  187. self.__template_files = template_files
  188. def _extract_all_used_variables(self) -> Set[str]:
  189. """Extract all undeclared variables from all .j2 files in the template directory."""
  190. used_variables: Set[str] = set()
  191. for template_file in self.template_files: # Iterate over TemplateFile objects
  192. if template_file.file_type == 'j2':
  193. file_path = self.template_dir / template_file.relative_path
  194. try:
  195. with open(file_path, "r", encoding="utf-8") as f:
  196. content = f.read()
  197. ast = self.jinja_env.parse(content) # Use lazy-loaded jinja_env
  198. used_variables.update(meta.find_undeclared_variables(ast))
  199. except Exception as e:
  200. logger.warning(f"Could not parse Jinja2 variables from {file_path}: {e}")
  201. return used_variables
  202. def _extract_jinja_default_values(self) -> dict[str, object]:
  203. """Scan all .j2 files and extract literal arguments to the `default` filter.
  204. Returns a mapping var_name -> literal_value for simple cases like
  205. {{ var | default("value") }} or {{ var | default(123) }}.
  206. This does not attempt to evaluate complex expressions.
  207. """
  208. defaults: dict[str, object] = {}
  209. class _DefaultVisitor(NodeVisitor):
  210. def __init__(self):
  211. self.found: dict[str, object] = {}
  212. def visit_Filter(self, node: nodes.Filter) -> None: # type: ignore[override]
  213. try:
  214. if getattr(node, 'name', None) == 'default' and node.args:
  215. # target variable name when filter is applied directly to a Name
  216. target = None
  217. if isinstance(node.node, nodes.Name):
  218. target = node.node.name
  219. # first arg literal
  220. first = node.args[0]
  221. if isinstance(first, nodes.Const) and target:
  222. self.found[target] = first.value
  223. except Exception:
  224. # Be resilient to unexpected node shapes
  225. pass
  226. # continue traversal
  227. self.generic_visit(node)
  228. visitor = _DefaultVisitor()
  229. for template_file in self.template_files:
  230. if template_file.file_type != 'j2':
  231. continue
  232. file_path = self.template_dir / template_file.relative_path
  233. try:
  234. with open(file_path, 'r', encoding='utf-8') as f:
  235. content = f.read()
  236. ast = self.jinja_env.parse(content)
  237. visitor.visit(ast)
  238. except Exception:
  239. # skip failures - this extraction is best-effort only
  240. continue
  241. return visitor.found
  242. def _filter_specs_to_used(self, used_variables: set, merged_specs: dict, module_specs: dict, template_specs: dict) -> dict:
  243. """Filter specs to only include variables used in templates using VariableCollection.
  244. Uses VariableCollection's native filter_to_used() method.
  245. Keeps sensitive variables even if not used.
  246. """
  247. # Create VariableCollection from merged specs
  248. merged_collection = VariableCollection(merged_specs)
  249. # Filter to only used variables (and sensitive ones)
  250. filtered_collection = merged_collection.filter_to_used(used_variables, keep_sensitive=True)
  251. # Convert back to dict format
  252. filtered_specs = {}
  253. for section_key, section in filtered_collection.get_sections().items():
  254. filtered_specs[section_key] = section.to_dict()
  255. return filtered_specs
  256. # ---------------------------
  257. # SECTION: Validation Methods
  258. # ---------------------------
  259. @staticmethod
  260. def _validate_kind(template_data: dict) -> None:
  261. """Validate that template has required 'kind' field.
  262. Args:
  263. template_data: Parsed YAML data from template.yaml
  264. Raises:
  265. ValueError: If 'kind' field is missing
  266. """
  267. if not template_data.get("kind"):
  268. raise ValueError("Template format error: missing 'kind' field")
  269. def _validate_variable_definitions(self, used_variables: set[str], merged_specs: dict[str, Any]) -> None:
  270. """Validate that all variables used in Jinja2 content are defined in the spec."""
  271. defined_variables = set()
  272. for section_data in merged_specs.values():
  273. if "vars" in section_data and isinstance(section_data["vars"], dict):
  274. defined_variables.update(section_data["vars"].keys())
  275. undefined_variables = used_variables - defined_variables
  276. if undefined_variables:
  277. undefined_list = sorted(undefined_variables)
  278. error_msg = (
  279. f"Template validation error in '{self.id}': "
  280. f"Variables used in template content but not defined in spec: {undefined_list}\n\n"
  281. f"Please add these variables to your template's template.yaml spec. "
  282. f"Each variable must have a default value.\n\n"
  283. f"Example:\n"
  284. f"spec:\n"
  285. f" general:\n"
  286. f" vars:\n"
  287. )
  288. for var_name in undefined_list:
  289. error_msg += (
  290. f" {var_name}:\n"
  291. f" type: str\n"
  292. f" description: Description for {var_name}\n"
  293. f" default: <your_default_value_here>\n"
  294. )
  295. logger.error(error_msg)
  296. raise ValueError(error_msg)
  297. # !SECTION
  298. # ---------------------------------
  299. # SECTION: Jinja2 Rendering Methods
  300. # ---------------------------------
  301. @staticmethod
  302. def _create_jinja_env(searchpath: Path) -> Environment:
  303. """Create standardized Jinja2 environment for consistent template processing."""
  304. return Environment(
  305. loader=FileSystemLoader(searchpath),
  306. trim_blocks=True,
  307. lstrip_blocks=True,
  308. keep_trailing_newline=False,
  309. )
  310. def render(self, variables: VariableCollection) -> Dict[str, str]:
  311. """Render all .j2 files in the template directory."""
  312. variable_values = variables.get_all_values()
  313. logger.debug(f"Rendering template '{self.id}' with variables: {variable_values}")
  314. rendered_files = {}
  315. for template_file in self.template_files: # Iterate over TemplateFile objects
  316. if template_file.file_type == 'j2':
  317. try:
  318. template = self.jinja_env.get_template(str(template_file.relative_path)) # Use lazy-loaded jinja_env
  319. rendered_content = template.render(**variable_values)
  320. rendered_files[str(template_file.output_path)] = rendered_content
  321. except Exception as e:
  322. logger.error(f"Error rendering template file {template_file.relative_path}: {e}")
  323. raise
  324. elif template_file.file_type == 'static':
  325. # For static files, just read their content and add to rendered_files
  326. # This ensures static files are also part of the output dictionary
  327. file_path = self.template_dir / template_file.relative_path
  328. try:
  329. with open(file_path, "r", encoding="utf-8") as f:
  330. content = f.read()
  331. rendered_files[str(template_file.output_path)] = content
  332. except Exception as e:
  333. logger.error(f"Error reading static file {file_path}: {e}")
  334. raise
  335. return rendered_files
  336. def mask_sensitive_values(self, rendered_files: Dict[str, str], variables: VariableCollection) -> Dict[str, str]:
  337. """Mask sensitive values in rendered files using Variable's native masking."""
  338. masked_files = {}
  339. # Get all variables (not just sensitive ones) to use their native get_display_value()
  340. for file_path, content in rendered_files.items():
  341. # Iterate through all sections and variables
  342. for section in variables.get_sections().values():
  343. for variable in section.variables.values():
  344. if variable.sensitive and variable.value:
  345. # Use variable's native masking - always returns "********" for sensitive vars
  346. masked_value = variable.get_display_value(mask_sensitive=True)
  347. content = content.replace(str(variable.value), masked_value)
  348. masked_files[file_path] = content
  349. return masked_files
  350. # !SECTION
  351. # ---------------------------
  352. # SECTION: Lazy Loaded Properties
  353. # ---------------------------
  354. @property
  355. def template_files(self) -> List[TemplateFile]:
  356. if self.__template_files is None:
  357. self._collect_template_files() # Populate self.__template_files
  358. return self.__template_files
  359. @property
  360. def template_specs(self) -> dict:
  361. """Get the spec section from template YAML data."""
  362. return self._template_data.get("spec", {})
  363. @property
  364. def module_specs(self) -> dict:
  365. """Get the spec from the module definition."""
  366. if self.__module_specs is None:
  367. kind = self._template_data.get("kind")
  368. self.__module_specs = self._load_module_specs(kind)
  369. return self.__module_specs
  370. @property
  371. def merged_specs(self) -> dict:
  372. if self.__merged_specs is None:
  373. self.__merged_specs = self._merge_specs(self.module_specs, self.template_specs)
  374. return self.__merged_specs
  375. @property
  376. def jinja_env(self) -> Environment:
  377. if self.__jinja_env is None:
  378. self.__jinja_env = self._create_jinja_env(self.template_dir)
  379. return self.__jinja_env
  380. @property
  381. def used_variables(self) -> Set[str]:
  382. if self.__used_variables is None:
  383. self.__used_variables = self._extract_all_used_variables()
  384. return self.__used_variables
  385. @property
  386. def variables(self) -> VariableCollection:
  387. if self.__variables is None:
  388. # Validate that all used variables are defined
  389. self._validate_variable_definitions(self.used_variables, self.merged_specs)
  390. # Filter specs to only used variables
  391. filtered_specs = self._filter_specs_to_used(self.used_variables, self.merged_specs, self.module_specs, self.template_specs)
  392. # Best-effort: extract literal defaults from Jinja `default()` filter and
  393. # merge them into the filtered_specs when no default exists there.
  394. try:
  395. jinja_defaults = self._extract_jinja_default_values()
  396. for section_key, section_data in filtered_specs.items():
  397. # Guard against None from empty YAML sections
  398. vars_dict = section_data.get('vars') or {}
  399. for var_name, var_data in vars_dict.items():
  400. if 'default' not in var_data or var_data.get('default') in (None, ''):
  401. if var_name in jinja_defaults:
  402. var_data['default'] = jinja_defaults[var_name]
  403. except Exception:
  404. # keep behavior stable on any extraction errors
  405. pass
  406. self.__variables = VariableCollection(filtered_specs)
  407. return self.__variables