variables.py 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132
  1. from __future__ import annotations
  2. from collections import OrderedDict
  3. from dataclasses import dataclass, field
  4. from typing import Any, Dict, List, Optional, Set, Union
  5. from urllib.parse import urlparse
  6. import logging
  7. import re
  8. logger = logging.getLogger(__name__)
  9. TRUE_VALUES = {"true", "1", "yes", "on"}
  10. FALSE_VALUES = {"false", "0", "no", "off"}
  11. HOSTNAME_REGEX = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9_-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9_-]{1,63}(?<!-))*$")
  12. EMAIL_REGEX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
  13. class Variable:
  14. """Represents a single templating variable with lightweight validation."""
  15. def __init__(self, data: dict[str, Any]) -> None:
  16. """Initialize Variable from a dictionary containing variable specification.
  17. Args:
  18. data: Dictionary containing variable specification with required 'name' key
  19. and optional keys: description, type, options, prompt, value, default, section, origin
  20. Raises:
  21. ValueError: If data is not a dict, missing 'name' key, or has invalid default value
  22. """
  23. # Validate input
  24. if not isinstance(data, dict):
  25. raise ValueError("Variable data must be a dictionary")
  26. if "name" not in data:
  27. raise ValueError("Variable data must contain 'name' key")
  28. # Track which fields were explicitly provided in source data
  29. self._explicit_fields: Set[str] = set(data.keys())
  30. # Initialize fields
  31. self.name: str = data["name"]
  32. self.description: Optional[str] = data.get("description") or data.get("display", "")
  33. self.type: str = data.get("type", "str")
  34. self.options: Optional[List[Any]] = data.get("options", [])
  35. self.prompt: Optional[str] = data.get("prompt")
  36. self.value: Any = data.get("value") if data.get("value") is not None else data.get("default")
  37. self.section: Optional[str] = data.get("section")
  38. self.origin: Optional[str] = data.get("origin")
  39. self.sensitive: bool = data.get("sensitive", False)
  40. # Optional extra explanation used by interactive prompts
  41. self.extra: Optional[str] = data.get("extra")
  42. # Flag indicating this variable should be auto-generated when empty
  43. self.autogenerated: bool = data.get("autogenerated", False)
  44. # Original value before config override (used for display)
  45. self.original_value: Optional[Any] = data.get("original_value")
  46. # Validate and convert the default/initial value if present
  47. if self.value is not None:
  48. try:
  49. self.value = self.convert(self.value)
  50. except ValueError as exc:
  51. raise ValueError(f"Invalid default for variable '{self.name}': {exc}")
  52. def _validate_not_empty(self, value: Any, converted_value: Any) -> None:
  53. """Validate that a value is not empty for non-boolean types."""
  54. if self.type not in ["bool"] and (converted_value is None or converted_value == ""):
  55. raise ValueError("value cannot be empty")
  56. def _validate_enum_option(self, value: str) -> None:
  57. """Validate that a value is in the allowed enum options."""
  58. if self.options and value not in self.options:
  59. raise ValueError(f"value must be one of: {', '.join(self.options)}")
  60. def _validate_regex_pattern(self, value: str, pattern: re.Pattern, error_msg: str) -> None:
  61. """Validate that a value matches a regex pattern."""
  62. if not pattern.fullmatch(value):
  63. raise ValueError(error_msg)
  64. def _validate_url_structure(self, parsed_url) -> None:
  65. """Validate that a parsed URL has required components."""
  66. if not (parsed_url.scheme and parsed_url.netloc):
  67. raise ValueError("value must be a valid URL (include scheme and host)")
  68. def convert(self, value: Any) -> Any:
  69. """Validate and convert a raw value based on the variable type."""
  70. if value is None:
  71. return None
  72. # Treat empty strings as None to avoid storing "" for missing values.
  73. if isinstance(value, str) and value.strip() == "":
  74. return None
  75. # Type conversion mapping for cleaner code
  76. converters = {
  77. "bool": self._convert_bool,
  78. "int": self._convert_int,
  79. "float": self._convert_float,
  80. "enum": self._convert_enum,
  81. "hostname": self._convert_hostname,
  82. "url": self._convert_url,
  83. "email": self._convert_email,
  84. }
  85. converter = converters.get(self.type)
  86. if converter:
  87. return converter(value)
  88. # Default to string conversion
  89. return str(value)
  90. def _convert_bool(self, value: Any) -> bool:
  91. """Convert value to boolean."""
  92. if isinstance(value, bool):
  93. return value
  94. if isinstance(value, str):
  95. lowered = value.strip().lower()
  96. if lowered in TRUE_VALUES:
  97. return True
  98. if lowered in FALSE_VALUES:
  99. return False
  100. raise ValueError("value must be a boolean (true/false)")
  101. def _convert_int(self, value: Any) -> Optional[int]:
  102. """Convert value to integer."""
  103. if isinstance(value, int):
  104. return value
  105. if isinstance(value, str) and value.strip() == "":
  106. return None
  107. try:
  108. return int(value)
  109. except (TypeError, ValueError) as exc:
  110. raise ValueError("value must be an integer") from exc
  111. def _convert_float(self, value: Any) -> Optional[float]:
  112. """Convert value to float."""
  113. if isinstance(value, float):
  114. return value
  115. if isinstance(value, str) and value.strip() == "":
  116. return None
  117. try:
  118. return float(value)
  119. except (TypeError, ValueError) as exc:
  120. raise ValueError("value must be a float") from exc
  121. def _convert_enum(self, value: Any) -> Optional[str]:
  122. """Convert value to enum option."""
  123. if value == "":
  124. return None
  125. val = str(value)
  126. self._validate_enum_option(val)
  127. return val
  128. def _convert_hostname(self, value: Any) -> str:
  129. """Convert and validate hostname."""
  130. val = str(value).strip()
  131. if not val:
  132. return None
  133. if val.lower() != "localhost":
  134. self._validate_regex_pattern(val, HOSTNAME_REGEX, "value must be a valid hostname")
  135. return val
  136. def _convert_url(self, value: Any) -> str:
  137. """Convert and validate URL."""
  138. val = str(value).strip()
  139. if not val:
  140. return None
  141. parsed = urlparse(val)
  142. self._validate_url_structure(parsed)
  143. return val
  144. def _convert_email(self, value: Any) -> str:
  145. """Convert and validate email."""
  146. val = str(value).strip()
  147. if not val:
  148. return None
  149. self._validate_regex_pattern(val, EMAIL_REGEX, "value must be a valid email address")
  150. return val
  151. def get_typed_value(self) -> Any:
  152. """Return the stored value converted to the appropriate Python type."""
  153. return self.convert(self.value)
  154. def to_dict(self) -> Dict[str, Any]:
  155. """Serialize Variable to a dictionary for storage.
  156. Returns:
  157. Dictionary representation of the variable with only relevant fields.
  158. """
  159. var_dict = {}
  160. if self.type:
  161. var_dict["type"] = self.type
  162. if self.value is not None:
  163. var_dict["default"] = self.value
  164. if self.description:
  165. var_dict["description"] = self.description
  166. if self.prompt:
  167. var_dict["prompt"] = self.prompt
  168. if self.sensitive:
  169. var_dict["sensitive"] = self.sensitive
  170. if self.extra:
  171. var_dict["extra"] = self.extra
  172. if self.autogenerated:
  173. var_dict["autogenerated"] = self.autogenerated
  174. if self.options:
  175. var_dict["options"] = self.options
  176. if self.origin:
  177. var_dict["origin"] = self.origin
  178. return var_dict
  179. def get_display_value(self, mask_sensitive: bool = True, max_length: int = 30, show_none: bool = True) -> str:
  180. """Get formatted display value with optional masking and truncation.
  181. Args:
  182. mask_sensitive: If True, mask sensitive values with asterisks
  183. max_length: Maximum length before truncation (0 = no limit)
  184. show_none: If True, display "(none)" for None values instead of empty string
  185. Returns:
  186. Formatted string representation of the value
  187. """
  188. if self.value is None or self.value == "":
  189. return "[dim](none)[/dim]" if show_none else ""
  190. # Mask sensitive values
  191. if self.sensitive and mask_sensitive:
  192. return "********"
  193. # Convert to string
  194. display = str(self.value)
  195. # Truncate if needed
  196. if max_length > 0 and len(display) > max_length:
  197. return display[:max_length - 3] + "..."
  198. return display
  199. def get_normalized_default(self) -> Any:
  200. """Get normalized default value suitable for prompts and display.
  201. Handles type conversion and provides sensible defaults for different types.
  202. Especially useful for enum, bool, and int types in interactive prompts.
  203. Returns:
  204. Normalized default value appropriate for the variable type
  205. """
  206. try:
  207. typed = self.get_typed_value()
  208. except Exception:
  209. typed = self.value
  210. # Enum: ensure default is valid option
  211. if self.type == "enum":
  212. if not self.options:
  213. return typed
  214. # If typed is invalid or missing, use first option
  215. if typed is None or str(typed) not in self.options:
  216. return self.options[0]
  217. return str(typed)
  218. # Boolean: return as bool type
  219. if self.type == "bool":
  220. if isinstance(typed, bool):
  221. return typed
  222. return None if typed is None else bool(typed)
  223. # Integer: return as int type
  224. if self.type == "int":
  225. try:
  226. return int(typed) if typed is not None and typed != "" else None
  227. except Exception:
  228. return None
  229. # Default: return string or None
  230. return None if typed is None else str(typed)
  231. def get_prompt_text(self) -> str:
  232. """Get formatted prompt text for interactive input.
  233. Returns:
  234. Prompt text with optional type hints and descriptions
  235. """
  236. prompt_text = self.prompt or self.description or self.name
  237. # Add type hint for semantic types if there's a default
  238. if self.value is not None and self.type in ["hostname", "email", "url"]:
  239. prompt_text += f" ({self.type})"
  240. return prompt_text
  241. def get_validation_hint(self) -> Optional[str]:
  242. """Get validation hint for prompts (e.g., enum options).
  243. Returns:
  244. Formatted hint string or None if no hint needed
  245. """
  246. hints = []
  247. # Add enum options
  248. if self.type == "enum" and self.options:
  249. hints.append(f"Options: {', '.join(self.options)}")
  250. # Add extra help text
  251. if self.extra:
  252. hints.append(self.extra)
  253. return " — ".join(hints) if hints else None
  254. def is_required(self) -> bool:
  255. """Check if this variable requires a value (cannot be empty/None).
  256. A variable is considered required if:
  257. - It doesn't have a default value (value is None)
  258. - It's not marked as autogenerated (which can be empty and generated later)
  259. - It's not a boolean type (booleans default to False if not set)
  260. Returns:
  261. True if the variable must have a non-empty value, False otherwise
  262. """
  263. # Autogenerated variables can be empty (will be generated later)
  264. if self.autogenerated:
  265. return False
  266. # Boolean variables always have a value (True or False)
  267. if self.type == "bool":
  268. return False
  269. # Variables with a default value are not required
  270. if self.value is not None:
  271. return False
  272. # No default value and not autogenerated = required
  273. return True
  274. def clone(self, update: Optional[Dict[str, Any]] = None) -> 'Variable':
  275. """Create a deep copy of the variable with optional field updates.
  276. This is more efficient than converting to dict and back when copying variables.
  277. Args:
  278. update: Optional dictionary of field updates to apply to the clone
  279. Returns:
  280. New Variable instance with copied data
  281. Example:
  282. var2 = var1.clone(update={'origin': 'template'})
  283. """
  284. data = {
  285. 'name': self.name,
  286. 'type': self.type,
  287. 'value': self.value,
  288. 'description': self.description,
  289. 'prompt': self.prompt,
  290. 'options': self.options.copy() if self.options else None,
  291. 'section': self.section,
  292. 'origin': self.origin,
  293. 'sensitive': self.sensitive,
  294. 'extra': self.extra,
  295. 'autogenerated': self.autogenerated,
  296. 'original_value': self.original_value,
  297. }
  298. # Apply updates if provided
  299. if update:
  300. data.update(update)
  301. # Create new variable
  302. cloned = Variable(data)
  303. # Preserve explicit fields from original, and add any update keys
  304. cloned._explicit_fields = self._explicit_fields.copy()
  305. if update:
  306. cloned._explicit_fields.update(update.keys())
  307. return cloned
  308. class VariableSection:
  309. """Groups variables together with shared metadata for presentation."""
  310. def __init__(self, data: dict[str, Any]) -> None:
  311. """Initialize VariableSection from a dictionary.
  312. Args:
  313. data: Dictionary containing section specification with required 'key' and 'title' keys
  314. """
  315. if not isinstance(data, dict):
  316. raise ValueError("VariableSection data must be a dictionary")
  317. if "key" not in data:
  318. raise ValueError("VariableSection data must contain 'key'")
  319. if "title" not in data:
  320. raise ValueError("VariableSection data must contain 'title'")
  321. self.key: str = data["key"]
  322. self.title: str = data["title"]
  323. self.variables: OrderedDict[str, Variable] = OrderedDict()
  324. self.description: Optional[str] = data.get("description")
  325. self.toggle: Optional[str] = data.get("toggle")
  326. # Default "general" section to required=True, all others to required=False
  327. self.required: bool = data.get("required", data["key"] == "general")
  328. # Section dependencies - can be string or list of strings
  329. needs_value = data.get("needs")
  330. if needs_value:
  331. if isinstance(needs_value, str):
  332. self.needs: List[str] = [needs_value]
  333. elif isinstance(needs_value, list):
  334. self.needs: List[str] = needs_value
  335. else:
  336. raise ValueError(f"Section '{self.key}' has invalid 'needs' value: must be string or list")
  337. else:
  338. self.needs: List[str] = []
  339. def variable_names(self) -> list[str]:
  340. return list(self.variables.keys())
  341. def to_dict(self) -> Dict[str, Any]:
  342. """Serialize VariableSection to a dictionary for storage.
  343. Returns:
  344. Dictionary representation of the section with all metadata and variables.
  345. """
  346. section_dict = {}
  347. if self.title:
  348. section_dict["title"] = self.title
  349. if self.description:
  350. section_dict["description"] = self.description
  351. if self.toggle:
  352. section_dict["toggle"] = self.toggle
  353. # Always store required flag
  354. section_dict["required"] = self.required
  355. # Store dependencies if any
  356. if self.needs:
  357. section_dict["needs"] = self.needs if len(self.needs) > 1 else self.needs[0]
  358. # Serialize all variables using their own to_dict method
  359. section_dict["vars"] = {}
  360. for var_name, variable in self.variables.items():
  361. section_dict["vars"][var_name] = variable.to_dict()
  362. return section_dict
  363. def is_enabled(self) -> bool:
  364. """Check if section is currently enabled based on toggle variable.
  365. Returns:
  366. True if section is enabled (no toggle or toggle is True), False otherwise
  367. """
  368. if not self.toggle:
  369. return True
  370. toggle_var = self.variables.get(self.toggle)
  371. if not toggle_var:
  372. return True
  373. try:
  374. return bool(toggle_var.get_typed_value())
  375. except Exception:
  376. return False
  377. def get_toggle_value(self) -> Optional[bool]:
  378. """Get the current value of the toggle variable.
  379. Returns:
  380. Boolean value of toggle variable, or None if no toggle exists
  381. """
  382. if not self.toggle:
  383. return None
  384. toggle_var = self.variables.get(self.toggle)
  385. if not toggle_var:
  386. return None
  387. try:
  388. return bool(toggle_var.get_typed_value())
  389. except Exception:
  390. return None
  391. def clone(self, origin_update: Optional[str] = None) -> 'VariableSection':
  392. """Create a deep copy of the section with all variables.
  393. This is more efficient than converting to dict and back when copying sections.
  394. Args:
  395. origin_update: Optional origin string to apply to all cloned variables
  396. Returns:
  397. New VariableSection instance with deep-copied variables
  398. Example:
  399. section2 = section1.clone(origin_update='template')
  400. """
  401. # Create new section with same metadata
  402. cloned = VariableSection({
  403. 'key': self.key,
  404. 'title': self.title,
  405. 'description': self.description,
  406. 'toggle': self.toggle,
  407. 'required': self.required,
  408. 'needs': self.needs.copy() if self.needs else None,
  409. })
  410. # Deep copy all variables
  411. for var_name, variable in self.variables.items():
  412. if origin_update:
  413. cloned.variables[var_name] = variable.clone(update={'origin': origin_update})
  414. else:
  415. cloned.variables[var_name] = variable.clone()
  416. return cloned
  417. class VariableCollection:
  418. """Manages variables grouped by sections and builds Jinja context."""
  419. def __init__(self, spec: dict[str, Any]) -> None:
  420. """Initialize VariableCollection from a specification dictionary.
  421. Args:
  422. spec: Dictionary containing the complete variable specification structure
  423. Expected format (as used in compose.py):
  424. {
  425. "section_key": {
  426. "title": "Section Title",
  427. "prompt": "Optional prompt text",
  428. "toggle": "optional_toggle_var_name",
  429. "description": "Optional description",
  430. "vars": {
  431. "var_name": {
  432. "description": "Variable description",
  433. "type": "str",
  434. "default": "default_value",
  435. ...
  436. }
  437. }
  438. }
  439. }
  440. """
  441. if not isinstance(spec, dict):
  442. raise ValueError("Spec must be a dictionary")
  443. self._sections: Dict[str, VariableSection] = {}
  444. # NOTE: The _variable_map provides a flat, O(1) lookup for any variable by its name,
  445. # avoiding the need to iterate through sections. It stores references to the same
  446. # Variable objects contained in the _set structure.
  447. self._variable_map: Dict[str, Variable] = {}
  448. self._initialize_sections(spec)
  449. # Validate dependencies after all sections are loaded
  450. self._validate_dependencies()
  451. def _initialize_sections(self, spec: dict[str, Any]) -> None:
  452. """Initialize sections from the spec."""
  453. for section_key, section_data in spec.items():
  454. if not isinstance(section_data, dict):
  455. continue
  456. section = self._create_section(section_key, section_data)
  457. # Guard against None from empty YAML sections (vars: with no content)
  458. vars_data = section_data.get("vars") or {}
  459. self._initialize_variables(section, vars_data)
  460. self._sections[section_key] = section
  461. def _create_section(self, key: str, data: dict[str, Any]) -> VariableSection:
  462. """Create a VariableSection from data."""
  463. section_init_data = {
  464. "key": key,
  465. "title": data.get("title", key.replace("_", " ").title()),
  466. "description": data.get("description"),
  467. "toggle": data.get("toggle"),
  468. "required": data.get("required", key == "general"),
  469. "needs": data.get("needs")
  470. }
  471. return VariableSection(section_init_data)
  472. def _initialize_variables(self, section: VariableSection, vars_data: dict[str, Any]) -> None:
  473. """Initialize variables for a section."""
  474. # Guard against None from empty YAML sections
  475. if vars_data is None:
  476. vars_data = {}
  477. for var_name, var_data in vars_data.items():
  478. var_init_data = {"name": var_name, **var_data}
  479. variable = Variable(var_init_data)
  480. section.variables[var_name] = variable
  481. # NOTE: Populate the direct lookup map for efficient access.
  482. self._variable_map[var_name] = variable
  483. # Validate toggle variable after all variables are added
  484. self._validate_section_toggle(section)
  485. # FIXME: Add more section-level validation here as needed:
  486. # - Validate that variable names don't conflict across sections (currently allowed but could be confusing)
  487. # - Validate that required sections have at least one non-toggle variable
  488. # - Validate that enum variables have non-empty options lists
  489. # - Validate that variable names follow naming conventions (e.g., lowercase_with_underscores)
  490. # - Validate that default values are compatible with their type definitions
  491. def _validate_section_toggle(self, section: VariableSection) -> None:
  492. """Validate that toggle variable is of type bool if it exists.
  493. If the toggle variable doesn't exist (e.g., filtered out), removes the toggle.
  494. Args:
  495. section: The section to validate
  496. Raises:
  497. ValueError: If toggle variable exists but is not boolean type
  498. """
  499. if not section.toggle:
  500. return
  501. toggle_var = section.variables.get(section.toggle)
  502. if not toggle_var:
  503. # Toggle variable doesn't exist (e.g., was filtered out) - remove toggle metadata
  504. section.toggle = None
  505. return
  506. if toggle_var.type != "bool":
  507. raise ValueError(
  508. f"Section '{section.key}' toggle variable '{section.toggle}' must be type 'bool', "
  509. f"but is type '{toggle_var.type}'"
  510. )
  511. def _validate_dependencies(self) -> None:
  512. """Validate section dependencies for cycles and missing references.
  513. Raises:
  514. ValueError: If circular dependencies or missing section references are found
  515. """
  516. # Check for missing dependencies
  517. for section_key, section in self._sections.items():
  518. for dep in section.needs:
  519. if dep not in self._sections:
  520. raise ValueError(
  521. f"Section '{section_key}' depends on '{dep}', but '{dep}' does not exist"
  522. )
  523. # Check for circular dependencies using depth-first search
  524. visited = set()
  525. rec_stack = set()
  526. def has_cycle(section_key: str) -> bool:
  527. visited.add(section_key)
  528. rec_stack.add(section_key)
  529. section = self._sections[section_key]
  530. for dep in section.needs:
  531. if dep not in visited:
  532. if has_cycle(dep):
  533. return True
  534. elif dep in rec_stack:
  535. raise ValueError(
  536. f"Circular dependency detected: '{section_key}' depends on '{dep}', "
  537. f"which creates a cycle"
  538. )
  539. rec_stack.remove(section_key)
  540. return False
  541. for section_key in self._sections:
  542. if section_key not in visited:
  543. has_cycle(section_key)
  544. def is_section_satisfied(self, section_key: str) -> bool:
  545. """Check if all dependencies for a section are satisfied.
  546. A dependency is satisfied if:
  547. 1. The dependency section exists
  548. 2. The dependency section is enabled (if it has a toggle)
  549. Args:
  550. section_key: The key of the section to check
  551. Returns:
  552. True if all dependencies are satisfied, False otherwise
  553. """
  554. section = self._sections.get(section_key)
  555. if not section:
  556. return False
  557. # No dependencies = always satisfied
  558. if not section.needs:
  559. return True
  560. # Check each dependency
  561. for dep_key in section.needs:
  562. dep_section = self._sections.get(dep_key)
  563. if not dep_section:
  564. logger.warning(f"Section '{section_key}' depends on missing section '{dep_key}'")
  565. return False
  566. # Check if dependency is enabled
  567. if not dep_section.is_enabled():
  568. logger.debug(f"Section '{section_key}' dependency '{dep_key}' is disabled")
  569. return False
  570. return True
  571. def sort_sections(self) -> None:
  572. """Sort sections with the following priority:
  573. 1. Dependencies come before dependents (topological sort)
  574. 2. Required sections first (in their original order)
  575. 3. Enabled sections with satisfied dependencies next (in their original order)
  576. 4. Disabled sections or sections with unsatisfied dependencies last (in their original order)
  577. This maintains the original ordering within each group while organizing
  578. sections logically for display and user interaction, and ensures that
  579. sections are prompted in the correct dependency order.
  580. """
  581. # First, perform topological sort to respect dependencies
  582. sorted_keys = self._topological_sort()
  583. # Then apply priority sorting within dependency groups
  584. section_items = [(key, self._sections[key]) for key in sorted_keys]
  585. # Define sort key: (priority, original_index)
  586. # Priority: 0 = required, 1 = enabled with satisfied dependencies, 2 = disabled or unsatisfied dependencies
  587. def get_sort_key(item_with_index):
  588. index, (key, section) = item_with_index
  589. if section.required:
  590. priority = 0
  591. elif section.is_enabled() and self.is_section_satisfied(key):
  592. priority = 1
  593. else:
  594. priority = 2
  595. return (priority, index)
  596. # Sort with original index to maintain order within each priority group
  597. # Note: This preserves the topological order from earlier
  598. sorted_items = sorted(
  599. enumerate(section_items),
  600. key=get_sort_key
  601. )
  602. # Rebuild _sections dict in new order
  603. self._sections = {key: section for _, (key, section) in sorted_items}
  604. def _topological_sort(self) -> List[str]:
  605. """Perform topological sort on sections based on dependencies.
  606. Uses Kahn's algorithm to ensure dependencies come before dependents.
  607. Preserves original order when no dependencies exist.
  608. Returns:
  609. List of section keys in topologically sorted order
  610. """
  611. # Calculate in-degree (number of dependencies) for each section
  612. in_degree = {key: len(section.needs) for key, section in self._sections.items()}
  613. # Find all sections with no dependencies
  614. queue = [key for key, degree in in_degree.items() if degree == 0]
  615. result = []
  616. # Process sections in order
  617. while queue:
  618. # Sort queue to preserve original order when possible
  619. queue.sort(key=lambda k: list(self._sections.keys()).index(k))
  620. current = queue.pop(0)
  621. result.append(current)
  622. # Find sections that depend on current
  623. for key, section in self._sections.items():
  624. if current in section.needs:
  625. in_degree[key] -= 1
  626. if in_degree[key] == 0:
  627. queue.append(key)
  628. # If not all sections processed, there's a cycle (shouldn't happen due to validation)
  629. if len(result) != len(self._sections):
  630. logger.warning("Topological sort incomplete - possible dependency cycle")
  631. return list(self._sections.keys())
  632. return result
  633. def get_sections(self) -> Dict[str, VariableSection]:
  634. """Get all sections in the collection."""
  635. return self._sections.copy()
  636. def get_section(self, key: str) -> Optional[VariableSection]:
  637. """Get a specific section by its key."""
  638. return self._sections.get(key)
  639. def has_sections(self) -> bool:
  640. """Check if the collection has any sections."""
  641. return bool(self._sections)
  642. def get_all_values(self) -> dict[str, Any]:
  643. """Get all variable values as a dictionary."""
  644. # NOTE: This method is optimized to use the _variable_map for direct O(1) access
  645. # to each variable, which is much faster than iterating through sections.
  646. all_values = {}
  647. for var_name, variable in self._variable_map.items():
  648. all_values[var_name] = variable.get_typed_value()
  649. return all_values
  def get_satisfied_values(self) -> dict[str, Any]:
      """Collect typed values from the sections that are currently active.

      A section contributes its variables only when both checks pass:
      - ``self.is_section_satisfied(section_key)`` returns a truthy value
      - the section's own ``is_enabled()`` returns a truthy value

      Sections failing either check are skipped and a debug message is logged.

      Returns:
          Dictionary of variable names to typed values for active sections only
      """
      active_values: dict[str, Any] = {}

      for section_key, section in self._sections.items():
          # The dependency check runs first; the enabled check is only
          # consulted for sections whose dependencies are satisfied.
          if not self.is_section_satisfied(section_key):
              logger.debug(f"Excluding variables from section '{section_key}' - dependencies not satisfied")
          elif not section.is_enabled():
              logger.debug(f"Excluding variables from section '{section_key}' - section is disabled")
          else:
              active_values.update(
                  (name, var.get_typed_value())
                  for name, var in section.variables.items()
              )

      return active_values
  def get_sensitive_variables(self) -> Dict[str, Any]:
      """Return the raw values of sensitive variables that hold a truthy value.

      Returns:
          Dictionary mapping variable name to ``value`` for every variable in
          ``_variable_map`` whose ``sensitive`` flag and ``value`` are both
          truthy; sensitive variables with a falsy value are left out.
      """
      secrets: Dict[str, Any] = {}
      for name, var in self._variable_map.items():
          if not var.sensitive:
              continue
          if not var.value:
              continue
          secrets[name] = var.value
      return secrets
  def apply_defaults(self, defaults: dict[str, Any], origin: str = "cli") -> list[str]:
      """Apply default values to known variables and stamp them with ``origin``.

      Names that are not present in ``_variable_map`` are skipped with a
      warning. A value whose conversion raises ``ValueError`` is logged and
      skipped; the remaining entries are still processed.

      Args:
          defaults: Dictionary mapping variable names to their default values
          origin: Source of these defaults (e.g., 'config', 'cli')

      Returns:
          List of variable names that were successfully updated
      """
      applied: list[str] = []
      errors: list[str] = []

      for var_name, value in defaults.items():
          # Name-keyed lookup in the variable map; no section traversal needed.
          variable = self._variable_map.get(var_name)
          if not variable:
              logger.warning(f"Variable '{var_name}' not found in template")
              continue

          try:
              # Remember the pre-config value once, on the first 'config'
              # application, so the original can still be shown later.
              first_config_pass = origin == "config" and not hasattr(variable, '_original_stored')
              if first_config_pass:
                  variable.original_value = variable.value
                  variable._original_stored = True

              variable.value = variable.convert(value)
              # Origin records only the latest source, not a chain of sources.
              variable.origin = origin
          except ValueError as e:
              error_msg = f"Invalid value for '{var_name}': {value} - {e}"
              errors.append(error_msg)
              logger.error(error_msg)
          else:
              applied.append(var_name)

      if errors:
          logger.warning(f"Some defaults failed to apply: {'; '.join(errors)}")

      return applied
  def validate_all(self) -> None:
      """Validate the variables of every active section.

      Sections are skipped entirely when ``is_section_satisfied`` is false
      for them, or when they name a toggle variable whose typed value is falsy.
      Within the remaining sections each variable is checked as follows:
      - autogenerated variables whose value is None or "" are skipped
      - a None value is an error only if the variable is required
      - for non-"bool" types, a typed value of None or "" is an error
      - a ``ValueError`` raised during these checks is recorded as an
        invalid-format error

      Raises:
          ValueError: If any problem was recorded; the message lists every
              offending ``section.variable`` with its reason.
      """
      errors: list[str] = []

      for section_key, section in self._sections.items():
          if not self.is_section_satisfied(section_key):
              logger.debug(f"Skipping validation for section '{section_key}' - dependencies not satisfied")
              continue

          # A section is switched off when it names a toggle variable that
          # exists and evaluates to a falsy typed value.
          switch = section.variables.get(section.toggle) if section.toggle else None
          if switch and not switch.get_typed_value():
              logger.debug(f"Skipping validation for disabled section: '{section.key}'")
              continue

          for var_name, variable in section.variables.items():
              try:
                  # Empty autogenerated variables are not validated here.
                  if variable.autogenerated and (variable.value is None or variable.value == ""):
                      logger.debug(f"Skipping validation for autogenerated variable: '{section.key}.{var_name}'")
                      continue

                  # No value at all: only a problem when the variable is required.
                  if variable.value is None:
                      if variable.is_required():
                          errors.append(f"{section.key}.{var_name} (required - no default provided)")
                      continue

                  # May raise ValueError when the raw value cannot be converted.
                  typed_value = variable.get_typed_value()

                  # Booleans are exempt from the emptiness check; anything
                  # else must not be None or an empty string.
                  if variable.type == "bool" or not (typed_value is None or typed_value == ""):
                      continue

                  if variable.is_required():
                      errors.append(f"{section.key}.{var_name} (required - cannot be empty)")
                  else:
                      errors.append(f"{section.key}.{var_name} (empty)")
              except ValueError as e:
                  errors.append(f"{section.key}.{var_name} (invalid format: {e})")

      if not errors:
          return

      error_msg = "Variable validation failed: " + ", ".join(errors)
      logger.error(error_msg)
      raise ValueError(error_msg)
  752. def merge(self, other_spec: Union[Dict[str, Any], 'VariableCollection'], origin: str = "override") -> 'VariableCollection':
  753. """Merge another spec or VariableCollection into this one with precedence tracking.
  754. OPTIMIZED: Works directly on objects without dict conversions for better performance.
  755. The other spec/collection has higher precedence and will override values in self.
  756. Creates a new VariableCollection with merged data.
  757. Args:
  758. other_spec: Either a spec dictionary or another VariableCollection to merge
  759. origin: Origin label for variables from other_spec (e.g., 'template', 'config')
  760. Returns:
  761. New VariableCollection with merged data
  762. Example:
  763. module_vars = VariableCollection(module_spec)
  764. template_vars = module_vars.merge(template_spec, origin='template')
  765. # Variables from template_spec override module_spec
  766. # Origins tracked: 'module' or 'module -> template'
  767. """
  768. # Convert dict to VariableCollection if needed (only once)
  769. if isinstance(other_spec, dict):
  770. other = VariableCollection(other_spec)
  771. else:
  772. other = other_spec
  773. # Create new collection without calling __init__ (optimization)
  774. merged = VariableCollection.__new__(VariableCollection)
  775. merged._sections = {}
  776. merged._variable_map = {}
  777. # First pass: clone sections from self
  778. for section_key, self_section in self._sections.items():
  779. if section_key in other._sections:
  780. # Section exists in both - will merge
  781. merged._sections[section_key] = self._merge_sections(
  782. self_section,
  783. other._sections[section_key],
  784. origin
  785. )
  786. else:
  787. # Section only in self - clone it
  788. merged._sections[section_key] = self_section.clone()
  789. # Second pass: add sections that only exist in other
  790. for section_key, other_section in other._sections.items():
  791. if section_key not in merged._sections:
  792. # New section from other - clone with origin update
  793. merged._sections[section_key] = other_section.clone(origin_update=origin)
  794. # Rebuild variable map for O(1) lookups
  795. for section in merged._sections.values():
  796. for var_name, variable in section.variables.items():
  797. merged._variable_map[var_name] = variable
  798. return merged
  799. def _infer_origin_from_context(self) -> str:
  800. """Infer origin from existing variables (fallback)."""
  801. for section in self._sections.values():
  802. for variable in section.variables.values():
  803. if variable.origin:
  804. return variable.origin
  805. return "template"
  806. def _merge_sections(self, self_section: VariableSection, other_section: VariableSection, origin: str) -> VariableSection:
  807. """Merge two sections, with other_section taking precedence.
  808. Args:
  809. self_section: Base section
  810. other_section: Section to merge in (takes precedence)
  811. origin: Origin label for merged variables
  812. Returns:
  813. New merged VariableSection
  814. """
  815. # Start with a clone of self_section
  816. merged_section = self_section.clone()
  817. # Update section metadata from other (other takes precedence)
  818. if other_section.title:
  819. merged_section.title = other_section.title
  820. if other_section.description:
  821. merged_section.description = other_section.description
  822. if other_section.toggle:
  823. merged_section.toggle = other_section.toggle
  824. # Required flag always updated
  825. merged_section.required = other_section.required
  826. # Needs/dependencies always updated
  827. if other_section.needs:
  828. merged_section.needs = other_section.needs.copy()
  829. # Merge variables
  830. for var_name, other_var in other_section.variables.items():
  831. if var_name in merged_section.variables:
  832. # Variable exists in both - merge with other taking precedence
  833. self_var = merged_section.variables[var_name]
  834. # Build update dict with ONLY explicitly provided fields from other
  835. update = {}
  836. if 'type' in other_var._explicit_fields and other_var.type:
  837. update['type'] = other_var.type
  838. if ('value' in other_var._explicit_fields or 'default' in other_var._explicit_fields) and other_var.value is not None:
  839. update['value'] = other_var.value
  840. if 'description' in other_var._explicit_fields and other_var.description:
  841. update['description'] = other_var.description
  842. if 'prompt' in other_var._explicit_fields and other_var.prompt:
  843. update['prompt'] = other_var.prompt
  844. if 'options' in other_var._explicit_fields and other_var.options:
  845. update['options'] = other_var.options
  846. if 'sensitive' in other_var._explicit_fields and other_var.sensitive:
  847. update['sensitive'] = other_var.sensitive
  848. if 'extra' in other_var._explicit_fields and other_var.extra:
  849. update['extra'] = other_var.extra
  850. # Update origin tracking (only keep the current source, not the chain)
  851. update['origin'] = origin
  852. # Clone with updates
  853. merged_section.variables[var_name] = self_var.clone(update=update)
  854. else:
  855. # New variable from other - clone with origin
  856. merged_section.variables[var_name] = other_var.clone(update={'origin': origin})
  857. return merged_section
  def filter_to_used(self, used_variables: Set[str], keep_sensitive: bool = True) -> 'VariableCollection':
      """Build a new collection restricted to the variables that are in use.

      A variable is kept when its name is in ``used_variables`` or, if
      ``keep_sensitive`` is true, when its ``sensitive`` flag is truthy. Kept
      variables are cloned into fresh sections that carry the same metadata;
      sections that end up without any variables are left out of the result.

      Args:
          used_variables: Set of variable names that are actually used
          keep_sensitive: If True, also keep sensitive variables even if not in used set

      Returns:
          New VariableCollection with filtered variables

      Example:
          all_vars = VariableCollection(spec)
          used_vars = all_vars.filter_to_used({'var1', 'var2', 'var3'})
          # Only var1, var2, var3 (and any sensitive vars) remain
      """
      # Allocate the result without running __init__; its two containers are
      # filled in by hand below.
      filtered = VariableCollection.__new__(VariableCollection)
      filtered._sections = {}
      filtered._variable_map = {}

      for section_key, section in self._sections.items():
          # Fresh section carrying the same metadata as the source section.
          filtered_section = VariableSection({
              'key': section.key,
              'title': section.title,
              'description': section.description,
              'toggle': section.toggle,
              'required': section.required,
              'needs': section.needs.copy() if section.needs else None,
          })

          # Clone the variables that are used, plus sensitive ones on request.
          kept = {
              var_name: variable.clone()
              for var_name, variable in section.variables.items()
              if var_name in used_variables or (keep_sensitive and variable.sensitive)
          }
          filtered_section.variables.update(kept)

          # Sections left without variables are dropped from the result.
          if not filtered_section.variables:
              continue

          filtered._sections[section_key] = filtered_section
          filtered._variable_map.update(filtered_section.variables)

      return filtered
  def get_all_variable_names(self) -> Set[str]:
      """Return the names of all variables known to the collection.

      Returns:
          A new set built from the keys of ``_variable_map``
      """
      return set(self._variable_map)