variable_collection.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124
  1. from __future__ import annotations
  2. import logging
  3. from collections import defaultdict
  4. from typing import Any
  5. from .variable import Variable
  6. from .variable_section import VariableSection
  7. logger = logging.getLogger(__name__)
  8. class VariableCollection:
  9. """Manages variables grouped by sections and builds Jinja context."""
  10. def __init__(self, spec: dict[str, Any]) -> None:
  11. """Initialize VariableCollection from a specification dictionary.
  12. Args:
  13. spec: Dictionary containing the complete variable specification structure
  14. Expected format (as used in compose.py):
  15. {
  16. "section_key": {
  17. "title": "Section Title",
  18. "prompt": "Optional prompt text",
  19. "toggle": "optional_toggle_var_name",
  20. "description": "Optional description",
  21. "vars": {
  22. "var_name": {
  23. "description": "Variable description",
  24. "type": "str",
  25. "default": "default_value",
  26. ...
  27. }
  28. }
  29. }
  30. }
  31. """
  32. if not isinstance(spec, dict):
  33. raise ValueError("Spec must be a dictionary")
  34. self._sections: dict[str, VariableSection] = {}
  35. # NOTE: The _variable_map provides a flat, O(1) lookup for any variable by its name,
  36. # avoiding the need to iterate through sections. It stores references to the same
  37. # Variable objects contained in the _set structure.
  38. self._variable_map: dict[str, Variable] = {}
  39. self._initialize_sections(spec)
  40. # Validate dependencies after all sections are loaded
  41. self._validate_dependencies()
  42. @classmethod
  43. def from_json(cls, json_spec: list[dict[str, Any]]) -> VariableCollection:
  44. """Create VariableCollection from JSON array format.
  45. Args:
  46. json_spec: List of section specifications in JSON format.
  47. Expected format:
  48. [
  49. {
  50. "key": "section_key",
  51. "title": "Section Title",
  52. "description": "Optional description",
  53. "toggle": "optional_toggle_var_name",
  54. "required": true,
  55. "needs": "dependency_section",
  56. "vars": [
  57. {
  58. "name": "var_name",
  59. "description": "Variable description",
  60. "type": "str",
  61. "default": "default_value",
  62. ...
  63. }
  64. ]
  65. }
  66. ]
  67. Returns:
  68. VariableCollection initialized from JSON spec
  69. Raises:
  70. ValueError: If json_spec is not a list or has invalid structure
  71. """
  72. if not isinstance(json_spec, list):
  73. raise ValueError("JSON spec must be a list")
  74. # Convert JSON array format to dict format expected by __init__
  75. dict_spec = {}
  76. for section_data in json_spec:
  77. section_key = cls._validate_and_extract_section_key(section_data)
  78. section_dict = cls._build_section_dict(section_data)
  79. vars_dict = cls._convert_vars_to_dict(section_data, section_key)
  80. section_dict["vars"] = vars_dict
  81. dict_spec[section_key] = section_dict
  82. # Create and return VariableCollection using standard __init__
  83. return cls(dict_spec)
  84. @staticmethod
  85. def _validate_and_extract_section_key(section_data: Any) -> str:
  86. """Validate section data and extract the section key.
  87. Args:
  88. section_data: Section data to validate
  89. Returns:
  90. The section key
  91. Raises:
  92. ValueError: If validation fails
  93. """
  94. if not isinstance(section_data, dict):
  95. raise ValueError(f"Section must be a dict, got {type(section_data).__name__}")
  96. if "key" not in section_data:
  97. raise ValueError("Section missing required 'key' field")
  98. if "vars" not in section_data:
  99. raise ValueError(f"Section '{section_data['key']}' missing required 'vars' field")
  100. return section_data["key"]
  101. @staticmethod
  102. def _build_section_dict(section_data: dict[str, Any]) -> dict[str, Any]:
  103. """Build section dictionary with optional fields.
  104. Args:
  105. section_data: Source section data
  106. Returns:
  107. Dictionary with only present optional fields
  108. """
  109. section_dict = {}
  110. optional_fields = ["title", "description", "toggle", "needs"]
  111. for field in optional_fields:
  112. if field in section_data:
  113. section_dict[field] = section_data[field]
  114. return section_dict
  115. @staticmethod
  116. def _convert_vars_to_dict(section_data: dict[str, Any], section_key: str) -> dict[str, Any]:
  117. """Convert vars array to dictionary format.
  118. Args:
  119. section_data: Section data containing vars array
  120. section_key: Section key for error messages
  121. Returns:
  122. Dictionary mapping variable names to their specifications
  123. Raises:
  124. ValueError: If vars format is invalid
  125. """
  126. if not isinstance(section_data["vars"], list):
  127. raise ValueError(f"Section '{section_key}' vars must be a list")
  128. vars_dict = {}
  129. for var_data in section_data["vars"]:
  130. if not isinstance(var_data, dict):
  131. raise ValueError(f"Variable in section '{section_key}' must be a dict")
  132. if "name" not in var_data:
  133. raise ValueError(f"Variable in section '{section_key}' missing 'name' field")
  134. var_name = var_data["name"]
  135. # Copy all fields except 'name' to the var dict
  136. var_dict = {k: v for k, v in var_data.items() if k != "name"}
  137. vars_dict[var_name] = var_dict
  138. return vars_dict
  139. def _initialize_sections(self, spec: dict[str, Any]) -> None:
  140. """Initialize sections from the spec."""
  141. for section_key, section_data in spec.items():
  142. if not isinstance(section_data, dict):
  143. continue
  144. section = self._create_section(section_key, section_data)
  145. # Guard against None from empty YAML sections (vars: with no content)
  146. vars_data = section_data.get("vars") or {}
  147. self._initialize_variables(section, vars_data)
  148. self._sections[section_key] = section
  149. # Validate all variable names are unique across sections
  150. self._validate_unique_variable_names()
  151. def _create_section(self, key: str, data: dict[str, Any]) -> VariableSection:
  152. """Create a VariableSection from data."""
  153. # Build section init data with only explicitly provided fields
  154. # This prevents None values from overriding module spec values during merge
  155. section_init_data = {
  156. "key": key,
  157. "title": data.get("title", key.replace("_", " ").title()),
  158. }
  159. # Only add optional fields if explicitly provided in the source data
  160. if "description" in data:
  161. section_init_data["description"] = data["description"]
  162. if "toggle" in data:
  163. section_init_data["toggle"] = data["toggle"]
  164. if "needs" in data:
  165. section_init_data["needs"] = data["needs"]
  166. return VariableSection(section_init_data)
  167. def _initialize_variables(self, section: VariableSection, vars_data: dict[str, Any]) -> None:
  168. """Initialize variables for a section."""
  169. # Guard against None from empty YAML sections
  170. if vars_data is None:
  171. vars_data = {}
  172. for var_name, var_data in vars_data.items():
  173. var_init_data = {"name": var_name, "parent_section": section, **var_data}
  174. variable = Variable(var_init_data)
  175. section.variables[var_name] = variable
  176. # NOTE: Populate the direct lookup map for efficient access.
  177. self._variable_map[var_name] = variable
  178. # Validate toggle variable after all variables are added
  179. self._validate_section_toggle(section)
  180. # TODO: Add more section-level validation:
  181. # - Validate that required sections have at least one non-toggle variable
  182. # - Validate that enum variables have non-empty options lists
  183. # - Validate that variable names follow naming conventions (e.g., lowercase_with_underscores)
  184. # - Validate that default values are compatible with their type definitions
  185. def _validate_unique_variable_names(self) -> None:
  186. """Validate that all variable names are unique across all sections."""
  187. var_to_sections: dict[str, list[str]] = defaultdict(list)
  188. # Build mapping of variable names to sections
  189. for section_key, section in self._sections.items():
  190. for var_name in section.variables:
  191. var_to_sections[var_name].append(section_key)
  192. # Find duplicates and format error
  193. duplicates = {var: sections for var, sections in var_to_sections.items() if len(sections) > 1}
  194. if duplicates:
  195. errors = ["Variable names must be unique across all sections, but found duplicates:"]
  196. errors.extend(
  197. f" - '{var}' appears in sections: {', '.join(secs)}" for var, secs in sorted(duplicates.items())
  198. )
  199. errors.append("\nPlease rename variables to be unique or consolidate them into a single section.")
  200. error_msg = "\n".join(errors)
  201. logger.error(error_msg)
  202. raise ValueError(error_msg)
  203. def _validate_section_toggle(self, section: VariableSection) -> None:
  204. """Validate that toggle variable is of type bool if it exists.
  205. If the toggle variable doesn't exist (e.g., filtered out), removes the toggle.
  206. Args:
  207. section: The section to validate
  208. Raises:
  209. ValueError: If toggle variable exists but is not boolean type
  210. """
  211. if not section.toggle:
  212. return
  213. toggle_var = section.variables.get(section.toggle)
  214. if not toggle_var:
  215. # Toggle variable doesn't exist (e.g., was filtered out) - remove toggle metadata
  216. section.toggle = None
  217. return
  218. if toggle_var.type != "bool":
  219. raise ValueError(
  220. f"Section '{section.key}' toggle variable '{section.toggle}' must be type 'bool', "
  221. f"but is type '{toggle_var.type}'"
  222. )
  223. @staticmethod
  224. def _parse_need(need_str: str) -> tuple[str, bool, Any | None]:
  225. """Parse a need string into variable name, operator, and expected value(s).
  226. Supports four formats:
  227. 1. Negation with multiple values: "variable_name!=value1,value2" - checks if variable does NOT equal any value
  228. 2. Negation with single value: "variable_name!=value" - checks if variable does NOT equal value
  229. 3. Equality with multiple values: "variable_name=value1,value2" - checks if variable equals any value
  230. 4. Equality with single value: "variable_name=value" - checks if variable equals value
  231. 5. Old format (backwards compatibility): "section_name" - checks if section is enabled
  232. Args:
  233. need_str: Need specification string
  234. Returns:
  235. Tuple of (variable_or_section_name, is_positive, expected_value)
  236. - is_positive: True for '=' (must match), False for '!=' (must NOT match)
  237. - For old format, expected_value is None (means check section enabled) and is_positive is True
  238. - For new format, expected_value is the string value(s) after operator (string or list)
  239. Examples:
  240. "traefik_enabled=true" -> ("traefik_enabled", True, "true")
  241. "storage_mode=nfs" -> ("storage_mode", True, "nfs")
  242. "network_mode=bridge,macvlan" -> ("network_mode", True, ["bridge", "macvlan"])
  243. "network_mode!=host,macvlan" -> ("network_mode", False, ["host", "macvlan"])
  244. "network_mode!=host" -> ("network_mode", False, "host")
  245. "traefik" -> ("traefik", True, None) # Old format: section name
  246. """
  247. # Check for != operator first (must check before = to avoid false positive)
  248. if "!=" in need_str:
  249. # Negation format: variable!=value or variable!=value1,value2
  250. parts = need_str.split("!=", 1)
  251. var_name = parts[0].strip()
  252. value_part = parts[1].strip()
  253. # Check if multiple values are provided (comma-separated)
  254. if "," in value_part:
  255. values = [v.strip() for v in value_part.split(",")]
  256. return (var_name, False, values)
  257. return (var_name, False, value_part)
  258. if "=" in need_str:
  259. # Equality format: variable=value or variable=value1,value2
  260. parts = need_str.split("=", 1)
  261. var_name = parts[0].strip()
  262. value_part = parts[1].strip()
  263. # Check if multiple values are provided (comma-separated)
  264. if "," in value_part:
  265. values = [v.strip() for v in value_part.split(",")]
  266. return (var_name, True, values)
  267. return (var_name, True, value_part)
  268. # Old format: section name (backwards compatibility)
  269. return (need_str.strip(), True, None)
  270. def _is_need_satisfied(self, need_str: str) -> bool:
  271. """Check if a single need condition is satisfied.
  272. Args:
  273. need_str: Need specification ("variable=value", "variable!=value",
  274. "variable=value1,value2" or "section_name")
  275. Returns:
  276. True if need is satisfied, False otherwise
  277. """
  278. var_or_section, is_positive, expected_value = self._parse_need(need_str)
  279. # Old format: check if section is enabled (backwards compatibility)
  280. if expected_value is None:
  281. result = self._check_section_need(var_or_section)
  282. section = self._sections.get(var_or_section)
  283. if section:
  284. logger.debug(
  285. f"Checking section need '{need_str}': "
  286. f"exists=True, enabled={section.is_enabled()}, satisfied={result}"
  287. )
  288. else:
  289. logger.debug(f"Checking section need '{need_str}': exists=False, satisfied={result}")
  290. return result
  291. # New format: check if variable has expected value(s)
  292. result = self._check_variable_need(var_or_section, is_positive, expected_value, need_str)
  293. variable = self._variable_map.get(var_or_section)
  294. if variable:
  295. operator = "=" if is_positive else "!="
  296. logger.debug(
  297. f"Checking variable need '{need_str}': "
  298. f"var_value={variable.value} {operator} expected={expected_value}, satisfied={result}"
  299. )
  300. else:
  301. logger.debug(f"Checking variable need '{need_str}': variable not found, satisfied={result}")
  302. return result
  303. def _check_section_need(self, section_name: str) -> bool:
  304. """Check if a section-based need is satisfied."""
  305. section = self._sections.get(section_name)
  306. if not section:
  307. logger.warning(f"Need references missing section '{section_name}'")
  308. return False
  309. return section.is_enabled()
  310. def _check_variable_need(self, var_name: str, is_positive: bool, expected_value: Any, need_str: str) -> bool:
  311. """Check if a variable-based need is satisfied.
  312. Args:
  313. var_name: Variable name to check
  314. is_positive: True for '=' (must match), False for '!=' (must NOT match)
  315. expected_value: Expected value(s) to compare against
  316. need_str: Original need string for logging
  317. Returns:
  318. True if need is satisfied, False otherwise
  319. """
  320. variable = self._variable_map.get(var_name)
  321. if not variable:
  322. # Variable doesn't exist - ignore the constraint and treat as satisfied
  323. # This allows templates to override sections without breaking needs constraints
  324. logger.debug(
  325. f"Need '{need_str}' references missing variable '{var_name}' - "
  326. f"ignoring constraint and treating as satisfied"
  327. )
  328. return True
  329. try:
  330. actual_value = variable.convert(variable.value)
  331. # Handle multiple expected values
  332. if isinstance(expected_value, list):
  333. matches = self._matches_any_value(variable, actual_value, expected_value)
  334. else:
  335. # Single expected value
  336. matches = self._matches_single_value(variable, actual_value, expected_value)
  337. # For positive checks (=), return match result directly
  338. # For negative checks (!=), invert the result
  339. return matches if is_positive else not matches
  340. except Exception as e:
  341. logger.debug(f"Failed to compare need '{need_str}': {e}")
  342. return False
  343. def _matches_any_value(self, variable: Variable, actual_value: Any, expected_values: list) -> bool:
  344. """Check if actual value matches any of the expected values."""
  345. for expected in expected_values:
  346. expected_converted = variable.convert(expected)
  347. if self._values_match(variable, actual_value, expected_converted):
  348. return True
  349. return False
  350. def _matches_single_value(self, variable: Variable, actual_value: Any, expected_value: Any) -> bool:
  351. """Check if actual value matches the expected value."""
  352. expected_converted = variable.convert(expected_value)
  353. return self._values_match(variable, actual_value, expected_converted)
  354. def _values_match(self, variable: Variable, actual: Any, expected: Any) -> bool:
  355. """Compare two values based on variable type."""
  356. if variable.type == "bool":
  357. return bool(actual) == bool(expected)
  358. return actual is not None and str(actual) == str(expected)
  359. def _validate_dependencies(self) -> None:
  360. """Validate section dependencies for cycles.
  361. Missing section references are logged as warnings but do not raise errors,
  362. allowing templates to be modified without breaking when dependencies are removed.
  363. Raises:
  364. ValueError: If circular dependencies are found
  365. """
  366. # Check for missing dependencies in sections
  367. for section_key, section in self._sections.items():
  368. for dep in section.needs:
  369. var_or_section, _is_positive, expected_value = self._parse_need(dep)
  370. if expected_value is None:
  371. # Old format: validate section exists
  372. if var_or_section not in self._sections:
  373. logger.warning(
  374. f"Section '{section_key}' depends on '{var_or_section}', "
  375. f"but '{var_or_section}' does not exist. Ignoring this dependency."
  376. )
  377. # New format: validate variable exists
  378. # NOTE: We only warn here, not raise an error, because the variable might be
  379. # added later during merge with module spec. The actual runtime check in
  380. # _is_need_satisfied() will handle missing variables gracefully.
  381. elif var_or_section not in self._variable_map:
  382. logger.debug(
  383. f"Section '{section_key}' has need '{dep}', but variable '{var_or_section}' "
  384. f"not found (might be added during merge)"
  385. )
  386. # Check for missing dependencies in variables
  387. for var_name, variable in self._variable_map.items():
  388. for dep in variable.needs:
  389. dep_var, _is_positive, expected_value = self._parse_need(dep)
  390. # Only validate new format and check if variable is missing
  391. if expected_value is not None and dep_var not in self._variable_map:
  392. # NOTE: We only warn here, not raise an error, because the variable might be
  393. # added later during merge with module spec. The actual runtime check in
  394. # _is_need_satisfied() will handle missing variables gracefully.
  395. logger.debug(
  396. f"Variable '{var_name}' has need '{dep}', but variable '{dep_var}' "
  397. f"not found (might be added during merge)"
  398. )
  399. # Check for circular dependencies using depth-first search
  400. # Note: Only checks section-level dependencies in old format (section names)
  401. # Variable-level dependencies (variable=value) don't create cycles in the same way
  402. visited = set()
  403. rec_stack = set()
  404. def has_cycle(section_key: str) -> bool:
  405. visited.add(section_key)
  406. rec_stack.add(section_key)
  407. section = self._sections[section_key]
  408. for dep in section.needs:
  409. # Only check circular deps for old format (section references)
  410. dep_name, _is_positive, expected_value = self._parse_need(dep)
  411. # Old format section dependency - check for cycles
  412. if expected_value is None and dep_name in self._sections:
  413. if dep_name not in visited:
  414. if has_cycle(dep_name):
  415. return True
  416. elif dep_name in rec_stack:
  417. raise ValueError(
  418. f"Circular dependency detected: '{section_key}' depends on '{dep_name}', "
  419. f"which creates a cycle"
  420. )
  421. rec_stack.remove(section_key)
  422. return False
  423. for section_key in self._sections:
  424. if section_key not in visited:
  425. has_cycle(section_key)
  426. def is_section_satisfied(self, section_key: str) -> bool:
  427. """Check if all dependencies for a section are satisfied.
  428. Supports both formats:
  429. - Old format: "section_name" - checks if section is enabled (backwards compatible)
  430. - New format: "variable=value" - checks if variable has specific value
  431. Args:
  432. section_key: The key of the section to check
  433. Returns:
  434. True if all dependencies are satisfied, False otherwise
  435. """
  436. section = self._sections.get(section_key)
  437. if not section:
  438. return False
  439. # No dependencies = always satisfied
  440. if not section.needs:
  441. return True
  442. # Check each dependency using the unified need satisfaction logic
  443. for need in section.needs:
  444. if not self._is_need_satisfied(need):
  445. logger.debug(f"Section '{section_key}' need '{need}' is not satisfied")
  446. return False
  447. return True
  448. def is_variable_satisfied(self, var_name: str) -> bool:
  449. """Check if all dependencies for a variable are satisfied.
  450. A variable is satisfied if all its needs are met.
  451. Needs are specified as "variable_name=value".
  452. Args:
  453. var_name: The name of the variable to check
  454. Returns:
  455. True if all dependencies are satisfied, False otherwise
  456. """
  457. variable = self._variable_map.get(var_name)
  458. if not variable:
  459. return False
  460. # No dependencies = always satisfied
  461. if not variable.needs:
  462. return True
  463. # Check each dependency
  464. for need in variable.needs:
  465. if not self._is_need_satisfied(need):
  466. logger.debug(f"Variable '{var_name}' need '{need}' is not satisfied")
  467. return False
  468. return True
  469. def reset_disabled_bool_variables(self) -> list[str]:
  470. """Reset bool variables with unsatisfied dependencies to False.
  471. This ensures that disabled bool variables don't accidentally remain True
  472. and cause confusion in templates or configuration.
  473. Note: CLI-provided variables are NOT reset here - they are validated
  474. later in validate_all() to provide better error messages.
  475. Returns:
  476. List of variable names that were reset
  477. """
  478. reset_vars = []
  479. logger.debug("Starting reset of disabled bool variables")
  480. for section_key, section in self._sections.items():
  481. # Check if section dependencies are satisfied
  482. section_satisfied = self.is_section_satisfied(section_key)
  483. is_enabled = section.is_enabled()
  484. for var_name, variable in section.variables.items():
  485. # Only process bool variables
  486. if variable.type != "bool":
  487. continue
  488. # Check if variable's own dependencies are satisfied
  489. var_satisfied = self.is_variable_satisfied(var_name)
  490. # If section is disabled OR variable dependencies aren't met, reset to False
  491. if (
  492. (not section_satisfied or not is_enabled or not var_satisfied)
  493. and variable.value is not False
  494. and variable.origin != "cli"
  495. ):
  496. # Store original value if not already stored (for display purposes)
  497. if not hasattr(variable, "_original_disabled"):
  498. variable._original_disabled = variable.value
  499. variable.value = False
  500. reset_vars.append(var_name)
  501. logger.debug(
  502. f"Reset disabled bool variable '{var_name}' to False "
  503. f"(section satisfied: {section_satisfied}, enabled: {is_enabled}, "
  504. f"var satisfied: {var_satisfied})"
  505. )
  506. if reset_vars:
  507. logger.debug(f"Reset {len(reset_vars)} disabled bool variables: {', '.join(reset_vars)}")
  508. else:
  509. logger.debug("No bool variables needed reset")
  510. return reset_vars
  511. def sort_sections(self) -> None:
  512. """Sort sections with the following priority:
  513. 1. Dependencies come before dependents (topological sort)
  514. 2. Enabled sections with satisfied dependencies first (in their original order)
  515. 3. Disabled sections or sections with unsatisfied dependencies last (in their original order)
  516. This maintains the original ordering within each group while organizing
  517. sections logically for display and user interaction, and ensures that
  518. sections are prompted in the correct dependency order.
  519. """
  520. # First, perform topological sort to respect dependencies
  521. sorted_keys = self._topological_sort()
  522. # Then apply priority sorting within dependency groups
  523. section_items = [(key, self._sections[key]) for key in sorted_keys]
  524. # Define sort key: (priority, original_index)
  525. # Priority: 0 = enabled with satisfied dependencies, 1 = disabled or unsatisfied dependencies
  526. def get_sort_key(item_with_index):
  527. index, (key, section) = item_with_index
  528. priority = 0 if section.is_enabled() and self.is_section_satisfied(key) else 1
  529. return (priority, index)
  530. # Sort with original index to maintain order within each priority group
  531. # Note: This preserves the topological order from earlier
  532. sorted_items = sorted(enumerate(section_items), key=get_sort_key)
  533. # Rebuild _sections dict in new order
  534. self._sections = {key: section for _, (key, section) in sorted_items}
  535. # NOTE: Sort variables within each section by their dependencies.
  536. # This is critical for correct behavior in both display and prompts:
  537. # 1. DISPLAY: Variables are shown in logical order (dependencies before dependents)
  538. # 2. PROMPTS: Users are asked for dependency values BEFORE dependent values
  539. # Example: network_mode (bridge/host/macvlan) is prompted before
  540. # network_macvlan_ipv4_address (which needs network_mode=macvlan)
  541. # 3. VALIDATION: Ensures config/CLI overrides can be checked in correct order
  542. # Without this sorting, users would be prompted for irrelevant variables or see
  543. # confusing variable order in the UI.
  544. for section in self._sections.values():
  545. section.sort_variables(self._is_need_satisfied)
  546. def _topological_sort(self) -> list[str]:
  547. """Perform topological sort on sections based on dependencies using Kahn's algorithm."""
  548. in_degree = {key: len(section.needs) for key, section in self._sections.items()}
  549. queue = [key for key, degree in in_degree.items() if degree == 0]
  550. queue.sort(key=lambda k: list(self._sections.keys()).index(k)) # Preserve original order
  551. result = []
  552. while queue:
  553. current = queue.pop(0)
  554. result.append(current)
  555. # Update in-degree for dependent sections
  556. for key, section in self._sections.items():
  557. if current in section.needs:
  558. in_degree[key] -= 1
  559. if in_degree[key] == 0:
  560. queue.append(key)
  561. # Fallback to original order if cycle detected
  562. if len(result) != len(self._sections):
  563. missing = set(self._sections.keys()) - set(result)
  564. # Identify which sections have circular dependencies
  565. circular_deps = []
  566. for section_key in missing:
  567. section = self._sections[section_key]
  568. if section.needs:
  569. circular_deps.append(f"{section_key} (needs: {', '.join(section.needs)})")
  570. logger.warning(
  571. f"Topological sort incomplete - circular dependency detected. "
  572. f"Missing sections: {', '.join(missing)}. "
  573. f"Circular dependencies: {'; '.join(circular_deps) if circular_deps else 'none identified'}. "
  574. f"Using original order."
  575. )
  576. return list(self._sections.keys())
  577. return result
  578. def get_sections(self) -> dict[str, VariableSection]:
  579. """Get all sections in the collection."""
  580. return self._sections.copy()
  581. def get_section(self, key: str) -> VariableSection | None:
  582. """Get a specific section by its key."""
  583. return self._sections.get(key)
  584. def has_sections(self) -> bool:
  585. """Check if the collection has any sections."""
  586. return bool(self._sections)
  587. def get_all_values(self) -> dict[str, Any]:
  588. """Get all variable values as a dictionary."""
  589. # NOTE: Uses _variable_map for O(1) access
  590. return {name: var.convert(var.value) for name, var in self._variable_map.items()}
  591. def get_satisfied_values(self) -> dict[str, Any]:
  592. """Get variable values only from sections with satisfied dependencies.
  593. This respects both toggle states and section dependencies, ensuring that:
  594. - Variables from disabled sections (toggle=false) are excluded EXCEPT required variables
  595. - Variables from sections with unsatisfied dependencies are excluded
  596. - Required variables are always included if their section dependencies are satisfied
  597. Returns:
  598. Dictionary of variable names to values for satisfied sections only
  599. """
  600. satisfied_values = {}
  601. for section_key, section in self._sections.items():
  602. # Skip sections with unsatisfied dependencies (even required variables need satisfied deps)
  603. if not self.is_section_satisfied(section_key):
  604. logger.debug(f"Excluding variables from section '{section_key}' - dependencies not satisfied")
  605. continue
  606. # Check if section is enabled
  607. is_enabled = section.is_enabled()
  608. if is_enabled:
  609. # Include all variables from enabled section
  610. for var_name, variable in section.variables.items():
  611. satisfied_values[var_name] = variable.convert(variable.value)
  612. else:
  613. # Section is disabled - exclude all variables
  614. logger.debug(f"Section '{section_key}' is disabled - excluding all variables")
  615. return satisfied_values
  616. def get_sensitive_variables(self) -> dict[str, Any]:
  617. """Get only the sensitive variables with their values."""
  618. return {name: var.value for name, var in self._variable_map.items() if var.sensitive and var.value}
  619. def apply_defaults(self, defaults: dict[str, Any], origin: str = "cli") -> list[str]:
  620. """Apply default values to variables, updating their origin.
  621. Args:
  622. defaults: Dictionary mapping variable names to their default values
  623. origin: Source of these defaults (e.g., 'config', 'cli')
  624. Returns:
  625. List of variable names that were successfully updated
  626. """
  627. # NOTE: This method uses the _variable_map for a significant performance gain,
  628. # as it allows direct O(1) lookup of variables instead of iterating
  629. # through all sections to find a match.
  630. successful = []
  631. errors = []
  632. for var_name, value in defaults.items():
  633. try:
  634. variable = self._variable_map.get(var_name)
  635. if not variable:
  636. logger.debug(
  637. f"Default value for '{var_name}' not applicable to this template (variable not defined)"
  638. )
  639. continue
  640. # Check if variable's needs are satisfied
  641. # If not, warn that the override will have no effect
  642. if not self.is_variable_satisfied(var_name):
  643. # Build a friendly message about which needs aren't satisfied
  644. unmet_needs = []
  645. for need in variable.needs:
  646. if not self._is_need_satisfied(need):
  647. unmet_needs.append(need)
  648. needs_str = ", ".join(unmet_needs) if unmet_needs else "unknown"
  649. logger.warning(
  650. f"Setting '{var_name}' via {origin} will have no effect - needs not satisfied: {needs_str}"
  651. )
  652. # Continue anyway to store the value (it might become relevant later)
  653. # Store original value before overriding (for display purposes)
  654. # Only store if this is the first time config is being applied
  655. if origin == "config" and not hasattr(variable, "_original_stored"):
  656. variable.original_value = variable.value
  657. variable._original_stored = True
  658. # Convert and set the new value
  659. converted_value = variable.convert(value)
  660. variable.value = converted_value
  661. # Set origin to the current source (not a chain)
  662. variable.origin = origin
  663. successful.append(var_name)
  664. except ValueError as e:
  665. error_msg = f"Invalid value for '{var_name}': {value} - {e}"
  666. errors.append(error_msg)
  667. logger.error(error_msg)
  668. if errors:
  669. # Raise exception to halt execution on validation errors
  670. raise ValueError(f"Variable validation failed: {'; '.join(errors)}")
  671. return successful
  672. def validate_all(self) -> None:
  673. """Validate all variables in the collection.
  674. Validates:
  675. - All variables in enabled sections with satisfied dependencies
  676. - Required variables even if their section is disabled (but dependencies must be satisfied)
  677. - CLI-provided bool variables with unsatisfied dependencies
  678. """
  679. errors: list[str] = []
  680. # First, check for CLI-provided bool variables with unsatisfied dependencies
  681. self._validate_cli_bool_variables(errors)
  682. # Then validate all other variables
  683. self._validate_section_variables(errors)
  684. if errors:
  685. error_msg = "Variable validation failed: " + ", ".join(errors)
  686. logger.error(error_msg)
  687. raise ValueError(error_msg)
  688. def _validate_cli_bool_variables(self, errors: list[str]) -> None:
  689. """Validate CLI-provided bool variables with unsatisfied dependencies."""
  690. for section_key, section in self._sections.items():
  691. section_satisfied = self.is_section_satisfied(section_key)
  692. is_enabled = section.is_enabled()
  693. for var_name, variable in section.variables.items():
  694. # Check CLI-provided bool variables with unsatisfied dependencies
  695. if not self._is_cli_bool_variable(variable):
  696. continue
  697. var_satisfied = self.is_variable_satisfied(var_name)
  698. if section_satisfied and is_enabled and var_satisfied:
  699. continue
  700. # Build error message with unmet needs
  701. unmet_needs = self._collect_unmet_needs(section, variable, section_satisfied, var_satisfied)
  702. needs_str = ", ".join(sorted(unmet_needs)) if unmet_needs else "dependencies not satisfied"
  703. errors.append(f"{section.key}.{var_name} (set via CLI to {variable.value} but requires: {needs_str})")
  704. def _is_cli_bool_variable(self, variable: Variable) -> bool:
  705. """Check if variable is a CLI-provided boolean."""
  706. return variable.type == "bool" and variable.origin == "cli" and variable.value is not False
  707. def _collect_unmet_needs(
  708. self, section, variable: Variable, section_satisfied: bool, var_satisfied: bool
  709. ) -> set[str]:
  710. """Collect all unmet needs from section and variable."""
  711. unmet_needs = set()
  712. if not section_satisfied:
  713. for need in section.needs:
  714. if not self._is_need_satisfied(need):
  715. unmet_needs.add(need)
  716. if not var_satisfied:
  717. for need in variable.needs:
  718. if not self._is_need_satisfied(need):
  719. unmet_needs.add(need)
  720. return unmet_needs
  721. def _validate_section_variables(self, errors: list[str]) -> None:
  722. """Validate all variables in each section."""
  723. for section_key, section in self._sections.items():
  724. # Skip sections with unsatisfied dependencies
  725. if not self.is_section_satisfied(section_key):
  726. logger.debug(f"Skipping validation for section '{section_key}' - dependencies not satisfied")
  727. continue
  728. # Check if section is enabled
  729. is_enabled = section.is_enabled()
  730. if not is_enabled:
  731. logger.debug(f"Section '{section_key}' is disabled - skipping all variables")
  732. continue
  733. # Validate variables in the section
  734. for var_name, variable in section.variables.items():
  735. self._validate_single_variable(section, var_name, variable, errors)
  736. def _validate_single_variable(self, section, var_name: str, variable: Variable, errors: list[str]) -> None:
  737. """Validate a single variable and append errors."""
  738. try:
  739. # Skip autogenerated variables when empty
  740. if variable.autogenerated and not variable.value:
  741. return
  742. # Skip variables with unsatisfied needs (even if required)
  743. if not self.is_variable_satisfied(var_name):
  744. logger.debug(f"Skipping validation for variable '{var_name}' - needs not satisfied")
  745. return
  746. # Check required fields
  747. if variable.value is None:
  748. if variable.is_required():
  749. # Enhanced error message with context
  750. origin_info = f" from {variable.origin}" if variable.origin else ""
  751. logger.debug(
  752. f"Required variable validation failed: '{var_name}'{origin_info} "
  753. f"in section '{section.key}' has no value and no default"
  754. )
  755. errors.append(f"{section.key}.{var_name} (required{origin_info} - no default provided)")
  756. return
  757. typed = variable.convert(variable.value)
  758. if variable.type not in ("bool",) and not typed:
  759. msg = f"{section.key}.{var_name}"
  760. error = f"{msg} (required - cannot be empty)" if variable.is_required() else f"{msg} (empty)"
  761. errors.append(error)
  762. except ValueError as e:
  763. errors.append(f"{section.key}.{var_name} (invalid format: {e})")
  764. def merge(
  765. self,
  766. other_spec: dict[str, Any] | VariableCollection,
  767. origin: str = "override",
  768. ) -> VariableCollection:
  769. """Merge another spec or VariableCollection into this one with precedence tracking.
  770. OPTIMIZED: Works directly on objects without dict conversions for better performance.
  771. The other spec/collection has higher precedence and will override values in self.
  772. Creates a new VariableCollection with merged data.
  773. Args:
  774. other_spec: Either a spec dictionary or another VariableCollection to merge
  775. origin: Origin label for variables from other_spec (e.g., 'template', 'config')
  776. Returns:
  777. New VariableCollection with merged data
  778. Example:
  779. module_vars = VariableCollection(module_spec)
  780. template_vars = module_vars.merge(template_spec, origin='template')
  781. # Variables from template_spec override module_spec
  782. # Origins tracked: 'module' or 'module -> template'
  783. """
  784. logger.debug(f"Starting merge operation with origin '{origin}'")
  785. # Convert dict to VariableCollection if needed (only once)
  786. other = VariableCollection(other_spec) if isinstance(other_spec, dict) else other_spec
  787. # Create new collection without calling __init__ (optimization)
  788. merged = VariableCollection.__new__(VariableCollection)
  789. merged._sections = {}
  790. merged._variable_map = {}
  791. # First pass: clone sections from self
  792. for section_key, self_section in self._sections.items():
  793. if section_key in other._sections:
  794. # Section exists in both - will merge
  795. merged._sections[section_key] = self._merge_sections(self_section, other._sections[section_key], origin)
  796. else:
  797. # Section only in self - clone it
  798. merged._sections[section_key] = self_section.clone()
  799. # Second pass: add sections that only exist in other
  800. for section_key, other_section in other._sections.items():
  801. if section_key not in merged._sections:
  802. # New section from other - clone with origin update
  803. merged._sections[section_key] = other_section.clone(origin_update=origin)
  804. # Rebuild variable map for O(1) lookups
  805. for section in merged._sections.values():
  806. for var_name, variable in section.variables.items():
  807. merged._variable_map[var_name] = variable
  808. # Log merge statistics
  809. self_var_count = sum(len(s.variables) for s in self._sections.values())
  810. other_var_count = sum(len(s.variables) for s in other._sections.values())
  811. merged_var_count = len(merged._variable_map)
  812. logger.debug(
  813. f"Merge complete: {len(self._sections)} sections (base) + {len(other._sections)} sections (override) = "
  814. f"{len(merged._sections)} sections, {self_var_count} vars + "
  815. f"{other_var_count} vars = {merged_var_count} vars"
  816. )
  817. # Validate dependencies after merge is complete
  818. merged._validate_dependencies()
  819. return merged
  820. def _merge_sections(
  821. self, self_section: VariableSection, other_section: VariableSection, origin: str
  822. ) -> VariableSection:
  823. """Merge two sections, with other_section taking precedence."""
  824. merged_section = self_section.clone()
  825. # Update section metadata from other (other takes precedence)
  826. # Explicit null/empty values clear the property (reset mechanism)
  827. for attr in ("title", "description", "toggle"):
  828. if hasattr(other_section, "_explicit_fields") and attr in other_section._explicit_fields:
  829. # Set to the other value even if null/empty (enables explicit reset)
  830. setattr(merged_section, attr, getattr(other_section, attr))
  831. # Respect explicit clears for dependencies (explicit null/empty clears, missing field preserves)
  832. if hasattr(other_section, "_explicit_fields") and "needs" in other_section._explicit_fields:
  833. merged_section.needs = other_section.needs.copy() if other_section.needs else []
  834. # Merge variables
  835. for var_name, other_var in other_section.variables.items():
  836. if var_name in merged_section.variables:
  837. # Variable exists in both - merge with other taking precedence
  838. self_var = merged_section.variables[var_name]
  839. # Build update dict with ONLY explicitly provided fields from other
  840. update = {"origin": origin}
  841. field_map = {
  842. "type": other_var.type,
  843. "description": other_var.description,
  844. "prompt": other_var.prompt,
  845. "options": other_var.options,
  846. "sensitive": other_var.sensitive,
  847. "extra": other_var.extra,
  848. }
  849. # Add fields that were explicitly provided, even if falsy/empty
  850. for field, value in field_map.items():
  851. if field in other_var._explicit_fields:
  852. update[field] = value
  853. # For boolean flags, only copy if explicitly provided in other
  854. # This prevents False defaults from overriding True values
  855. for bool_field in ("autogenerated", "required"):
  856. if bool_field in other_var._explicit_fields:
  857. update[bool_field] = getattr(other_var, bool_field)
  858. # Special handling for needs (allow explicit null/empty to clear)
  859. if "needs" in other_var._explicit_fields:
  860. update["needs"] = other_var.needs.copy() if other_var.needs else []
  861. # Special handling for value/default (allow explicit null to clear)
  862. if "value" in other_var._explicit_fields or "default" in other_var._explicit_fields:
  863. update["value"] = other_var.value
  864. merged_section.variables[var_name] = self_var.clone(update=update)
  865. else:
  866. # New variable from other - clone with origin
  867. merged_section.variables[var_name] = other_var.clone(update={"origin": origin})
  868. return merged_section
  869. def filter_to_used(self, used_variables: set[str], keep_sensitive: bool = True) -> VariableCollection:
  870. """Filter collection to only variables that are used (or sensitive).
  871. OPTIMIZED: Works directly on objects without dict conversions for better performance.
  872. Creates a new VariableCollection containing only the variables in used_variables.
  873. Sections with no remaining variables are removed.
  874. Args:
  875. used_variables: Set of variable names that are actually used
  876. keep_sensitive: If True, also keep sensitive variables even if not in used set
  877. Returns:
  878. New VariableCollection with filtered variables
  879. Example:
  880. all_vars = VariableCollection(spec)
  881. used_vars = all_vars.filter_to_used({'var1', 'var2', 'var3'})
  882. # Only var1, var2, var3 (and any sensitive vars) remain
  883. """
  884. # Create new collection without calling __init__ (optimization)
  885. filtered = VariableCollection.__new__(VariableCollection)
  886. filtered._sections = {}
  887. filtered._variable_map = {}
  888. # Filter each section
  889. for section_key, section in self._sections.items():
  890. # Create a new section with same metadata
  891. filtered_section = VariableSection(
  892. {
  893. "key": section.key,
  894. "title": section.title,
  895. "description": section.description,
  896. "toggle": section.toggle,
  897. "needs": section.needs.copy() if section.needs else None,
  898. }
  899. )
  900. # Clone only the variables that should be included
  901. for var_name, variable in section.variables.items():
  902. # Include if used OR if sensitive (and keep_sensitive is True)
  903. should_include = var_name in used_variables or (keep_sensitive and variable.sensitive)
  904. if should_include:
  905. filtered_section.variables[var_name] = variable.clone()
  906. # Only add section if it has variables
  907. if filtered_section.variables:
  908. filtered._sections[section_key] = filtered_section
  909. # Add variables to map
  910. for var_name, variable in filtered_section.variables.items():
  911. filtered._variable_map[var_name] = variable
  912. return filtered
  913. def get_all_variable_names(self) -> set[str]:
  914. """Get set of all variable names across all sections.
  915. Returns:
  916. Set of all variable names
  917. """
  918. return set(self._variable_map.keys())