| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
7037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153 |
- from __future__ import annotations
- from collections import OrderedDict
- from dataclasses import dataclass, field
- from typing import Any, Dict, List, Optional, Set, Union
- from urllib.parse import urlparse
- import logging
- import re
- logger = logging.getLogger(__name__)
- # -----------------------
- # SECTION: Constants
- # -----------------------
# Lower-case string spellings that are interpreted as boolean true / false.
TRUE_VALUES = {"1", "on", "true", "yes"}
FALSE_VALUES = {"0", "off", "false", "no"}
# Dot-separated labels of 1-63 characters (letters, digits, "_" or "-") that
# neither start nor end with "-"; the whole name is limited to 253 characters.
HOSTNAME_REGEX = re.compile(
    r"^(?=.{1,253}$)(?!-)[A-Za-z0-9_-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9_-]{1,63}(?<!-))*$"
)
# Minimal shape check: something, "@", something, ".", something (no whitespace).
EMAIL_REGEX = re.compile(
    r"^[^@\s]+@[^@\s]+\.[^@\s]+$"
)
- # !SECTION
- # ----------------------
- # SECTION: Variable Class
- # ----------------------
class Variable:
    """Represents a single templating variable with lightweight validation."""

    def __init__(self, data: Dict[str, Any]) -> None:
        """Initialize Variable from a dictionary containing variable specification.

        Args:
            data: Dictionary containing variable specification with required 'name' key
                and optional keys: description (or display), type, options, prompt,
                value, default, section, origin, sensitive, extra, autogenerated.

        Raises:
            ValueError: If data is not a dict, missing 'name' key, or has invalid default value
        """
        # Validate input
        if not isinstance(data, dict):
            raise ValueError("Variable data must be a dictionary")

        if "name" not in data:
            raise ValueError("Variable data must contain 'name' key")

        # Track which fields were explicitly provided in source data
        self._explicit_fields: Set[str] = set(data.keys())

        # Initialize fields
        self.name: str = data["name"]
        # 'display' is accepted as a fallback key for the description
        self.description: Optional[str] = data.get("description") or data.get("display", "")
        self.type: str = data.get("type", "str")
        self.options: Optional[List[Any]] = data.get("options", [])
        self.prompt: Optional[str] = data.get("prompt")
        # An explicit 'value' wins over 'default'; None means "not provided"
        explicit_value = data.get("value")
        self.value: Any = explicit_value if explicit_value is not None else data.get("default")
        self.section: Optional[str] = data.get("section")
        self.origin: Optional[str] = data.get("origin")
        self.sensitive: bool = data.get("sensitive", False)
        # Optional extra explanation used by interactive prompts
        self.extra: Optional[str] = data.get("extra")
        # Flag indicating this variable should be auto-generated when empty
        self.autogenerated: bool = data.get("autogenerated", False)

        # Validate and convert the default/initial value if present
        if self.value is not None:
            try:
                self.value = self.convert(self.value)
            except ValueError as exc:
                # Chain the cause so the original conversion error stays in the traceback
                raise ValueError(f"Invalid default for variable '{self.name}': {exc}") from exc

    # -------------------------
    # SECTION: Validation Helpers
    # -------------------------

    def _validate_not_empty(self, value: Any, converted_value: Any) -> None:
        """Validate that a value is not empty for non-boolean types.

        Raises:
            ValueError: If the variable is not a bool and converted_value is None or "".
        """
        if self.type not in ["bool"] and (converted_value is None or converted_value == ""):
            raise ValueError("value cannot be empty")

    def _validate_enum_option(self, value: str) -> None:
        """Validate that a value is in the allowed enum options.

        Raises:
            ValueError: If options are defined and value is not one of them.
        """
        if self.options and value not in self.options:
            # Options may be non-strings (e.g. ints parsed from YAML); stringify them so
            # building the message cannot raise TypeError instead of ValueError.
            allowed = ", ".join(str(option) for option in self.options)
            raise ValueError(f"value must be one of: {allowed}")

    def _validate_regex_pattern(self, value: str, pattern: re.Pattern, error_msg: str) -> None:
        """Validate that a value fully matches a compiled regex pattern.

        Raises:
            ValueError: With error_msg when the value does not match.
        """
        if not pattern.fullmatch(value):
            raise ValueError(error_msg)

    def _validate_url_structure(self, parsed_url) -> None:
        """Validate that a parsed URL has both a scheme and a network location.

        Raises:
            ValueError: If either component is empty.
        """
        if not (parsed_url.scheme and parsed_url.netloc):
            raise ValueError("value must be a valid URL (include scheme and host)")

    # !SECTION
    # -------------------------
    # SECTION: Type Conversion
    # -------------------------

    def convert(self, value: Any) -> Any:
        """Validate and convert a raw value based on the variable type.

        Returns:
            The converted value, or None for None / blank-string input.

        Raises:
            ValueError: If the value is not valid for the variable type.
        """
        if value is None:
            return None
        # Treat empty strings as None to avoid storing "" for missing values.
        if isinstance(value, str) and value.strip() == "":
            return None

        # Type conversion mapping; unknown types fall through to str()
        converters = {
            "bool": self._convert_bool,
            "int": self._convert_int,
            "float": self._convert_float,
            "enum": self._convert_enum,
            "hostname": self._convert_hostname,
            "url": self._convert_url,
            "email": self._convert_email,
        }

        converter = converters.get(self.type)
        if converter:
            return converter(value)

        # Default to string conversion
        return str(value)

    def _convert_bool(self, value: Any) -> bool:
        """Convert a bool or a recognised true/false string to boolean.

        Raises:
            ValueError: For any other input.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in TRUE_VALUES:
                return True
            if lowered in FALSE_VALUES:
                return False
        raise ValueError("value must be a boolean (true/false)")

    def _convert_int(self, value: Any) -> Optional[int]:
        """Convert value to integer; blank strings become None.

        Raises:
            ValueError: If the value cannot be interpreted as an integer.
        """
        if isinstance(value, int):
            return value
        if isinstance(value, str) and value.strip() == "":
            return None
        try:
            return int(value)
        except (TypeError, ValueError) as exc:
            raise ValueError("value must be an integer") from exc

    def _convert_float(self, value: Any) -> Optional[float]:
        """Convert value to float; blank strings become None.

        Raises:
            ValueError: If the value cannot be interpreted as a float.
        """
        if isinstance(value, float):
            return value
        if isinstance(value, str) and value.strip() == "":
            return None
        try:
            return float(value)
        except (TypeError, ValueError) as exc:
            raise ValueError("value must be a float") from exc

    def _convert_enum(self, value: Any) -> Optional[str]:
        """Convert value to its string form and check it against the options.

        Raises:
            ValueError: If options are defined and the value is not among them.
        """
        if value == "":
            return None
        val = str(value)
        self._validate_enum_option(val)
        return val

    def _convert_hostname(self, value: Any) -> Optional[str]:
        """Convert and validate a hostname ('localhost' is always accepted).

        Raises:
            ValueError: If the value does not match the hostname pattern.
        """
        val = str(value).strip()
        if not val:
            return None
        if val.lower() != "localhost":
            self._validate_regex_pattern(val, HOSTNAME_REGEX, "value must be a valid hostname")
        return val

    def _convert_url(self, value: Any) -> Optional[str]:
        """Convert and validate a URL (scheme and host are required).

        Raises:
            ValueError: If the scheme or the host is missing.
        """
        val = str(value).strip()
        if not val:
            return None
        parsed = urlparse(val)
        self._validate_url_structure(parsed)
        return val

    def _convert_email(self, value: Any) -> Optional[str]:
        """Convert and validate an email address.

        Raises:
            ValueError: If the value does not match the email pattern.
        """
        val = str(value).strip()
        if not val:
            return None
        self._validate_regex_pattern(val, EMAIL_REGEX, "value must be a valid email address")
        return val

    def get_typed_value(self) -> Any:
        """Return the stored value converted to the appropriate Python type."""
        return self.convert(self.value)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize Variable to a dictionary for storage.

        Only truthy fields are written; the current value is stored under 'default'.

        Returns:
            Dictionary representation of the variable with only relevant fields.
        """
        var_dict: Dict[str, Any] = {}

        if self.type:
            var_dict["type"] = self.type

        # Falsy values such as False or 0 are still meaningful, so compare against None
        if self.value is not None:
            var_dict["default"] = self.value

        for field_name in ("description", "prompt", "sensitive", "extra",
                           "autogenerated", "options", "origin"):
            field_value = getattr(self, field_name)
            if field_value:
                var_dict[field_name] = field_value

        return var_dict

    # -------------------------
    # SECTION: Display Methods
    # -------------------------

    def get_display_value(self, mask_sensitive: bool = True, max_length: int = 30) -> str:
        """Get formatted display value with optional masking and truncation.

        Args:
            mask_sensitive: If True, mask sensitive values with asterisks
            max_length: Maximum length before truncation (0 = no limit)

        Returns:
            Formatted string representation of the value, never longer than
            max_length when a positive limit is given.
        """
        if self.value is None:
            return ""

        # Mask sensitive values
        if self.sensitive and mask_sensitive:
            return "********"

        display = str(self.value)

        if max_length > 0 and len(display) > max_length:
            # Limits of 3 or less leave no room for text before "..."; cut hard so the
            # result never exceeds max_length (a negative slice would otherwise do so).
            if max_length <= 3:
                return display[:max_length]
            return display[:max_length - 3] + "..."

        return display

    def get_normalized_default(self) -> Any:
        """Get normalized default value suitable for prompts and display.

        Handles type conversion and provides sensible defaults for different types.
        Especially useful for enum, bool, and int types in interactive prompts.

        Returns:
            Normalized default value appropriate for the variable type
        """
        try:
            typed = self.get_typed_value()
        except Exception:
            # Best effort: fall back to the raw stored value when conversion fails
            typed = self.value

        # Enum: ensure default is valid option
        if self.type == "enum":
            if not self.options:
                return typed
            # If typed is invalid or missing, use first option
            if typed is None or str(typed) not in self.options:
                return self.options[0]
            return str(typed)

        # Boolean: return as bool type
        if self.type == "bool":
            if isinstance(typed, bool):
                return typed
            return None if typed is None else bool(typed)

        # Integer: return as int type
        if self.type == "int":
            try:
                return int(typed) if typed is not None and typed != "" else None
            except Exception:
                return None

        # Default: return string or None
        return None if typed is None else str(typed)

    def get_prompt_text(self) -> str:
        """Get formatted prompt text for interactive input.

        Returns:
            The prompt, description or name (first non-empty), with a type hint
            appended for hostname/email/url variables that have a value.
        """
        prompt_text = self.prompt or self.description or self.name

        # Add type hint for semantic types if there's a default
        if self.value is not None and self.type in ["hostname", "email", "url"]:
            prompt_text += f" ({self.type})"

        return prompt_text

    def get_validation_hint(self) -> Optional[str]:
        """Get validation hint for prompts (e.g., enum options).

        Returns:
            Formatted hint string or None if no hint needed
        """
        hints = []

        # Add enum options (stringified so non-string options cannot break the join)
        if self.type == "enum" and self.options:
            hints.append(f"Options: {', '.join(str(option) for option in self.options)}")

        # Add extra help text
        if self.extra:
            hints.append(self.extra)

        return " — ".join(hints) if hints else None

    def clone(self, update: Optional[Dict[str, Any]] = None) -> 'Variable':
        """Create a copy of the variable with optional field updates.

        The options list is copied; other field values are shared with the original.

        Args:
            update: Optional dictionary of field updates to apply to the clone

        Returns:
            New Variable instance with copied data

        Example:
            var2 = var1.clone(update={'origin': 'template'})
        """
        data = {
            'name': self.name,
            'type': self.type,
            'value': self.value,
            'description': self.description,
            'prompt': self.prompt,
            # Copy the list (an empty list stays an empty list rather than becoming None)
            'options': list(self.options) if self.options is not None else None,
            'section': self.section,
            'origin': self.origin,
            'sensitive': self.sensitive,
            'extra': self.extra,
            'autogenerated': self.autogenerated,
        }

        # Apply updates if provided
        if update:
            data.update(update)

        cloned = Variable(data)

        # Preserve explicit fields from original, and add any update keys
        cloned._explicit_fields = self._explicit_fields.copy()
        if update:
            cloned._explicit_fields.update(update.keys())

        return cloned
-
- # !SECTION
- # !SECTION
- # ----------------------------
- # SECTION: VariableSection Class
- # ----------------------------
class VariableSection:
    """Groups variables together with shared metadata for presentation."""

    def __init__(self, data: Dict[str, Any]) -> None:
        """Initialize VariableSection from a dictionary.

        Args:
            data: Dictionary containing section specification with required 'key' and
                'title' keys and optional 'description', 'toggle', 'required', 'needs'.

        Raises:
            ValueError: If data is not a dict, lacks 'key' or 'title', or 'needs' is
                neither a string nor a list.
        """
        if not isinstance(data, dict):
            raise ValueError("VariableSection data must be a dictionary")

        if "key" not in data:
            raise ValueError("VariableSection data must contain 'key'")

        if "title" not in data:
            raise ValueError("VariableSection data must contain 'title'")

        self.key: str = data["key"]
        self.title: str = data["title"]
        self.variables: OrderedDict[str, Variable] = OrderedDict()
        self.description: Optional[str] = data.get("description")
        self.toggle: Optional[str] = data.get("toggle")
        # Default "general" section to required=True, all others to required=False
        self.required: bool = data.get("required", data["key"] == "general")

        # Section dependencies - can be string or list of strings
        needs_value = data.get("needs")
        self.needs: List[str] = []
        if needs_value:
            if isinstance(needs_value, str):
                self.needs = [needs_value]
            elif isinstance(needs_value, list):
                # Copy so later changes to the source spec cannot silently alter
                # this section's dependencies.
                self.needs = list(needs_value)
            else:
                raise ValueError(f"Section '{self.key}' has invalid 'needs' value: must be string or list")

    def variable_names(self) -> List[str]:
        """Return the names of the section's variables in insertion order."""
        return list(self.variables.keys())

    def to_dict(self) -> Dict[str, Any]:
        """Serialize VariableSection to a dictionary for storage.

        Returns:
            Dictionary representation of the section with all metadata and variables.
        """
        section_dict: Dict[str, Any] = {}

        if self.title:
            section_dict["title"] = self.title

        if self.description:
            section_dict["description"] = self.description

        if self.toggle:
            section_dict["toggle"] = self.toggle

        # Always store required flag
        section_dict["required"] = self.required

        # Store dependencies if any: a single dependency is written as a plain string,
        # several as a (copied) list
        if self.needs:
            section_dict["needs"] = list(self.needs) if len(self.needs) > 1 else self.needs[0]

        # Serialize all variables using their own to_dict method
        section_dict["vars"] = {
            var_name: variable.to_dict() for var_name, variable in self.variables.items()
        }

        return section_dict

    # -------------------------
    # SECTION: State Methods
    # -------------------------

    def is_enabled(self) -> bool:
        """Check if section is currently enabled based on toggle variable.

        Returns:
            True if there is no toggle, the toggle variable is not present, or its
            typed value is truthy; False otherwise (including when reading the
            toggle value fails).
        """
        if not self.toggle:
            return True

        toggle_var = self.variables.get(self.toggle)
        if not toggle_var:
            return True

        try:
            return bool(toggle_var.get_typed_value())
        except Exception:
            # Deliberate best effort: an unreadable toggle counts as disabled
            return False

    def get_toggle_value(self) -> Optional[bool]:
        """Get the current value of the toggle variable.

        Returns:
            Boolean value of toggle variable, or None if no toggle exists, the
            toggle variable is not present, or its value cannot be read.
        """
        if not self.toggle:
            return None

        toggle_var = self.variables.get(self.toggle)
        if not toggle_var:
            return None

        try:
            return bool(toggle_var.get_typed_value())
        except Exception:
            return None

    def clone(self, origin_update: Optional[str] = None) -> 'VariableSection':
        """Create a copy of the section with all variables cloned.

        Args:
            origin_update: Optional origin string to apply to all cloned variables

        Returns:
            New VariableSection instance whose variables are clones of the originals

        Example:
            section2 = section1.clone(origin_update='template')
        """
        # Create new section with same metadata
        cloned = VariableSection({
            'key': self.key,
            'title': self.title,
            'description': self.description,
            'toggle': self.toggle,
            'required': self.required,
            'needs': self.needs.copy() if self.needs else None,
        })

        # Clone every variable, optionally re-labelling its origin
        for var_name, variable in self.variables.items():
            if origin_update:
                cloned.variables[var_name] = variable.clone(update={'origin': origin_update})
            else:
                cloned.variables[var_name] = variable.clone()

        return cloned
-
- # !SECTION
- # !SECTION
- # --------------------------------
- # SECTION: VariableCollection Class
- # --------------------------------
- class VariableCollection:
- """Manages variables grouped by sections and builds Jinja context."""
def __init__(self, spec: dict[str, Any]) -> None:
    """Build the collection from a nested section/variable specification.

    Args:
        spec: Mapping of section key to section definition. Expected format
            (as used in compose.py):
            {
                "section_key": {
                    "title": "Section Title",
                    "prompt": "Optional prompt text",
                    "toggle": "optional_toggle_var_name",
                    "description": "Optional description",
                    "vars": {
                        "var_name": {
                            "description": "Variable description",
                            "type": "str",
                            "default": "default_value",
                            ...
                        }
                    }
                }
            }

    Raises:
        ValueError: If spec is not a dictionary.
    """
    if not isinstance(spec, dict):
        raise ValueError("Spec must be a dictionary")

    # NOTE: _variable_map is a flat name -> Variable lookup that avoids iterating
    # through sections. It is meant to reference the same Variable objects that
    # are stored inside the sections.
    self._variable_map: Dict[str, Variable] = {}
    self._sections: Dict[str, VariableSection] = {}

    # Load every section first; dependencies can only be checked once all exist.
    self._initialize_sections(spec)
    self._validate_dependencies()
- def _initialize_sections(self, spec: dict[str, Any]) -> None:
- """Initialize sections from the spec."""
- for section_key, section_data in spec.items():
- if not isinstance(section_data, dict):
- continue
-
- section = self._create_section(section_key, section_data)
- # Guard against None from empty YAML sections (vars: with no content)
- vars_data = section_data.get("vars") or {}
- self._initialize_variables(section, vars_data)
- self._sections[section_key] = section
def _create_section(self, key: str, data: dict[str, Any]) -> VariableSection:
    """Build a VariableSection from one raw section entry of the spec.

    A missing title is derived from the key (underscores become spaces, title
    case); a missing 'required' flag is True only for the "general" section.
    """
    derived_title = key.replace("_", " ").title()
    required_by_default = key == "general"
    return VariableSection(
        {
            "key": key,
            "title": data.get("title", derived_title),
            "description": data.get("description"),
            "toggle": data.get("toggle"),
            "required": data.get("required", required_by_default),
            "needs": data.get("needs"),
        }
    )
def _initialize_variables(self, section: VariableSection, vars_data: dict[str, Any]) -> None:
    """Create the Variable objects of a section and index them by name.

    Each variable is stored in the section and in the flat lookup map; the
    section's toggle is validated once all variables are in place.
    """
    # Guard against None from empty YAML sections
    entries = {} if vars_data is None else vars_data

    for var_name, var_data in entries.items():
        created = Variable({"name": var_name, **var_data})
        section.variables[var_name] = created
        # NOTE: Populate the direct lookup map for efficient access.
        self._variable_map[var_name] = created

    # The toggle can only be checked after every variable has been added
    self._validate_section_toggle(section)
- # FIXME: Add more section-level validation here as needed:
- # - Validate that variable names don't conflict across sections (currently allowed but could be confusing)
- # - Validate that required sections have at least one non-toggle variable
- # - Validate that enum variables have non-empty options lists
- # - Validate that variable names follow naming conventions (e.g., lowercase_with_underscores)
- # - Validate that default values are compatible with their type definitions
- def _validate_section_toggle(self, section: VariableSection) -> None:
- """Validate that toggle variable is of type bool if it exists.
-
- If the toggle variable doesn't exist (e.g., filtered out), removes the toggle.
-
- Args:
- section: The section to validate
-
- Raises:
- ValueError: If toggle variable exists but is not boolean type
- """
- if not section.toggle:
- return
-
- toggle_var = section.variables.get(section.toggle)
- if not toggle_var:
- # Toggle variable doesn't exist (e.g., was filtered out) - remove toggle metadata
- section.toggle = None
- return
-
- if toggle_var.type != "bool":
- raise ValueError(
- f"Section '{section.key}' toggle variable '{section.toggle}' must be type 'bool', "
- f"but is type '{toggle_var.type}'"
- )
-
- def _validate_dependencies(self) -> None:
- """Validate section dependencies for cycles and missing references.
-
- Raises:
- ValueError: If circular dependencies or missing section references are found
- """
- # Check for missing dependencies
- for section_key, section in self._sections.items():
- for dep in section.needs:
- if dep not in self._sections:
- raise ValueError(
- f"Section '{section_key}' depends on '{dep}', but '{dep}' does not exist"
- )
-
- # Check for circular dependencies using depth-first search
- visited = set()
- rec_stack = set()
-
- def has_cycle(section_key: str) -> bool:
- visited.add(section_key)
- rec_stack.add(section_key)
-
- section = self._sections[section_key]
- for dep in section.needs:
- if dep not in visited:
- if has_cycle(dep):
- return True
- elif dep in rec_stack:
- raise ValueError(
- f"Circular dependency detected: '{section_key}' depends on '{dep}', "
- f"which creates a cycle"
- )
-
- rec_stack.remove(section_key)
- return False
-
- for section_key in self._sections:
- if section_key not in visited:
- has_cycle(section_key)
-
def is_section_satisfied(self, section_key: str) -> bool:
    """Tell whether every dependency of a section is present and enabled.

    Args:
        section_key: The key of the section to check

    Returns:
        False if the section itself is unknown, or if any section it needs is
        missing or disabled; True otherwise (including when it needs nothing).
    """
    section = self._sections.get(section_key)
    if not section:
        return False

    # An empty dependency list means the loop body never runs
    for dep_key in section.needs or ():
        dep_section = self._sections.get(dep_key)

        if not dep_section:
            logger.warning(f"Section '{section_key}' depends on missing section '{dep_key}'")
            return False

        if dep_section.is_enabled():
            continue

        logger.debug(f"Section '{section_key}' dependency '{dep_key}' is disabled")
        return False

    return True
def sort_sections(self) -> None:
    """Reorder the sections for display and prompting.

    Sections are first put into dependency order (see _topological_sort) and
    then stably grouped by priority:

    0. Required sections
    1. Enabled sections whose dependencies are satisfied
    2. Disabled sections or sections with unsatisfied dependencies

    Within a priority group the dependency order is kept.

    NOTE(review): the priority grouping is applied after the dependency sort,
    so a required section appears ahead of a non-required section it depends
    on - confirm this is intended.
    """
    dependency_order = self._topological_sort()

    def priority_of(key: str) -> int:
        section = self._sections[key]
        if section.required:
            return 0
        if section.is_enabled() and self.is_section_satisfied(key):
            return 1
        return 2

    # sorted() is stable, so ties keep their position from dependency_order
    final_order = sorted(dependency_order, key=priority_of)

    # Rebuild _sections dict in new order
    self._sections = {key: self._sections[key] for key in final_order}
-
- def _topological_sort(self) -> List[str]:
- """Perform topological sort on sections based on dependencies.
-
- Uses Kahn's algorithm to ensure dependencies come before dependents.
- Preserves original order when no dependencies exist.
-
- Returns:
- List of section keys in topologically sorted order
- """
- # Calculate in-degree (number of dependencies) for each section
- in_degree = {key: len(section.needs) for key, section in self._sections.items()}
-
- # Find all sections with no dependencies
- queue = [key for key, degree in in_degree.items() if degree == 0]
- result = []
-
- # Process sections in order
- while queue:
- # Sort queue to preserve original order when possible
- queue.sort(key=lambda k: list(self._sections.keys()).index(k))
-
- current = queue.pop(0)
- result.append(current)
-
- # Find sections that depend on current
- for key, section in self._sections.items():
- if current in section.needs:
- in_degree[key] -= 1
- if in_degree[key] == 0:
- queue.append(key)
-
- # If not all sections processed, there's a cycle (shouldn't happen due to validation)
- if len(result) != len(self._sections):
- logger.warning("Topological sort incomplete - possible dependency cycle")
- return list(self._sections.keys())
-
- return result
- # -------------------------
- # SECTION: Public API Methods
- # -------------------------
- def get_sections(self) -> Dict[str, VariableSection]:
- """Get all sections in the collection."""
- return self._sections.copy()
-
- def get_section(self, key: str) -> Optional[VariableSection]:
- """Get a specific section by its key."""
- return self._sections.get(key)
-
def has_sections(self) -> bool:
    """Report whether at least one section is registered."""
    return len(self._sections) > 0
def get_all_values(self) -> dict[str, Any]:
    """Return the typed value of every variable, keyed by variable name.

    Reads from the flat variable map rather than walking the sections.
    """
    return {
        var_name: variable.get_typed_value()
        for var_name, variable in self._variable_map.items()
    }
-
def get_satisfied_values(self) -> dict[str, Any]:
    """Collect typed values from sections that are both satisfied and enabled.

    A section contributes its variables only when:
    - all of its dependencies are satisfied, and
    - it is not disabled by its toggle.

    Returns:
        Dictionary of variable names to values for satisfied sections only
    """
    collected = {}

    for section_key, section in self._sections.items():
        if not self.is_section_satisfied(section_key):
            logger.debug(f"Excluding variables from section '{section_key}' - dependencies not satisfied")
        elif not section.is_enabled():
            logger.debug(f"Excluding variables from section '{section_key}' - section is disabled")
        else:
            for var_name, variable in section.variables.items():
                collected[var_name] = variable.get_typed_value()

    return collected
def get_sensitive_variables(self) -> Dict[str, Any]:
    """Return the raw values of sensitive variables that have a truthy value."""
    secrets_found: Dict[str, Any] = {}
    for name, var in self._variable_map.items():
        if not var.sensitive:
            continue
        if var.value:
            secrets_found[name] = var.value
    return secrets_found
- # !SECTION
- # -------------------------
- # SECTION: Helper Methods
- # -------------------------
- # NOTE: These helper methods reduce code duplication across module.py and prompt.py
- # by centralizing common variable collection operations
def apply_defaults(self, defaults: dict[str, Any], origin: str = "cli") -> list[str]:
    """Overwrite variable values from a name -> value mapping and tag their origin.

    Unknown names are logged and skipped; values that fail conversion are
    logged and leave the variable untouched.

    Args:
        defaults: Dictionary mapping variable names to their default values
        origin: Source of these defaults (e.g., 'config', 'cli')

    Returns:
        List of variable names that were successfully updated
    """
    # NOTE: Variables are looked up directly in _variable_map instead of
    # searching through every section.
    applied = []
    failures = []

    for var_name, value in defaults.items():
        variable = self._variable_map.get(var_name)
        if not variable:
            logger.warning(f"Variable '{var_name}' not found in template")
            continue

        try:
            converted_value = variable.convert(value)
        except ValueError as e:
            error_msg = f"Invalid value for '{var_name}': {value} - {e}"
            failures.append(error_msg)
            logger.error(error_msg)
            continue

        variable.value = converted_value
        # The origin is replaced by the current source (not appended as a chain)
        variable.origin = origin
        applied.append(var_name)

    if failures:
        logger.warning(f"Some defaults failed to apply: {'; '.join(failures)}")

    return applied
-
def validate_all(self) -> None:
    """Validate every variable of the active sections.

    Sections with unsatisfied dependencies and sections switched off by their
    toggle are skipped. All problems are gathered and reported together.

    Raises:
        ValueError: Listing each variable that is missing, empty or invalid.
    """
    problems: list[str] = []

    for section_key, section in self._sections.items():
        if not self.is_section_satisfied(section_key):
            logger.debug(f"Skipping validation for section '{section_key}' - dependencies not satisfied")
            continue

        # A section whose toggle variable evaluates falsy is disabled as a whole
        if section.toggle:
            toggle_var = section.variables.get(section.toggle)
            if toggle_var and not toggle_var.get_typed_value():
                logger.debug(f"Skipping validation for disabled section: '{section.key}'")
                continue

        for var_name, variable in section.variables.items():
            label = f"{section.key}.{var_name}"
            try:
                current = variable.value

                # Autogenerated variables may legitimately be empty at this point
                if variable.autogenerated and (current is None or current == ""):
                    logger.debug(f"Skipping validation for autogenerated variable: '{label}'")
                    continue

                if current is None:
                    problems.append(f"{label} (missing)")
                    continue

                # Converting the value doubles as type validation
                typed = variable.get_typed_value()
                if variable.type != "bool" and (typed is None or typed == ""):
                    problems.append(f"{label} (empty)")
            except ValueError as e:
                problems.append(f"{label} (invalid: {e})")

    if problems:
        error_msg = "Variable validation failed: " + ", ".join(problems)
        logger.error(error_msg)
        raise ValueError(error_msg)
def merge(self, other_spec: Union[Dict[str, Any], 'VariableCollection'], origin: str = "override") -> 'VariableCollection':
    """Combine this collection with a higher-precedence spec into a new one.

    Neither ``self`` nor ``other_spec`` is modified: sections are cloned
    or merged into a freshly built collection. Sections present on both
    sides are combined via ``_merge_sections``; sections present on one
    side only are cloned as-is (those from ``other_spec`` are cloned with
    ``origin_update=origin``).

    Args:
        other_spec: A spec dictionary (wrapped in a ``VariableCollection``
            first) or an existing ``VariableCollection``. Its values win
            over those in ``self``.
        origin: Origin label applied to variables coming from
            ``other_spec`` (e.g. 'template', 'config').

    Returns:
        New ``VariableCollection`` holding the merged sections, with
        sections of ``self`` first, followed by sections only found in
        ``other_spec``.

    Example:
        module_vars = VariableCollection(module_spec)
        template_vars = module_vars.merge(template_spec, origin='template')
        # Variables touched by template_spec now carry origin 'template';
        # the previous origin is replaced, not chained.
    """
    # Normalise the argument once so the rest works on objects only.
    incoming = VariableCollection(other_spec) if isinstance(other_spec, dict) else other_spec

    # Built via __new__, so only the two attributes below get initialised.
    # NOTE(review): confirm __init__ sets no further attributes that a
    # merged collection would need.
    result = VariableCollection.__new__(VariableCollection)
    result._sections = {}
    result._variable_map = {}

    # Pass 1: everything self knows about, merged where other overlaps.
    for key, base_section in self._sections.items():
        if key not in incoming._sections:
            result._sections[key] = base_section.clone()
            continue
        result._sections[key] = self._merge_sections(
            base_section,
            incoming._sections[key],
            origin,
        )

    # Pass 2: sections that only the incoming side defines.
    for key, extra_section in incoming._sections.items():
        if key in result._sections:
            continue
        result._sections[key] = extra_section.clone(origin_update=origin)

    # Index every variable by name for direct lookups.
    for section in result._sections.values():
        result._variable_map.update(section.variables)

    return result
-
def _infer_origin_from_context(self) -> str:
    """Return the first truthy variable origin found, or "template".

    Sections and their variables are scanned in iteration order; the
    literal "template" is the fallback when no variable has an origin.
    """
    origins = (
        var.origin
        for sect in self._sections.values()
        for var in sect.variables.values()
    )
    return next((found for found in origins if found), "template")
-
def _merge_sections(self, self_section: VariableSection, other_section: VariableSection, origin: str) -> VariableSection:
    """Merge two sections into a new one, with ``other_section`` taking precedence.

    The result starts as a clone of ``self_section``; neither input is
    modified. Section metadata and variables from ``other_section`` are
    then layered on top.

    Args:
        self_section: Base section.
        other_section: Section to merge in (takes precedence).
        origin: Origin label stamped on every variable that
            ``other_section`` defines, whether merged or newly added.

    Returns:
        New merged VariableSection.
    """
    merged_section = self_section.clone()

    # Metadata: a non-empty value on the override side wins.
    if other_section.title:
        merged_section.title = other_section.title
    if other_section.description:
        merged_section.description = other_section.description
    if other_section.toggle:
        merged_section.toggle = other_section.toggle
    # The required flag is taken from the override unconditionally.
    merged_section.required = other_section.required
    # Dependencies are replaced only when the override declares some.
    if other_section.needs:
        merged_section.needs = other_section.needs.copy()

    for var_name, other_var in other_section.variables.items():
        if var_name not in merged_section.variables:
            # Variable only exists on the override side: adopt it.
            merged_section.variables[var_name] = other_var.clone(update={'origin': origin})
            continue

        self_var = merged_section.variables[var_name]
        explicit = other_var._explicit_fields

        # Only fields the override spelled out may replace base values.
        update = {}
        if 'type' in explicit and other_var.type:
            update['type'] = other_var.type
        if ('value' in explicit or 'default' in explicit) and other_var.value is not None:
            update['value'] = other_var.value
        if 'description' in explicit and other_var.description:
            update['description'] = other_var.description
        if 'prompt' in explicit and other_var.prompt:
            update['prompt'] = other_var.prompt
        if 'options' in explicit and other_var.options:
            update['options'] = other_var.options
        # An explicit False is a real override here: it must be able to
        # un-mark a variable the base spec flagged as sensitive.
        if 'sensitive' in explicit and other_var.sensitive is not None:
            update['sensitive'] = other_var.sensitive
        if 'extra' in explicit and other_var.extra:
            update['extra'] = other_var.extra

        # Origin records only the current source, not the chain.
        update['origin'] = origin

        merged_section.variables[var_name] = self_var.clone(update=update)

    return merged_section
-
def filter_to_used(self, used_variables: Set[str], keep_sensitive: bool = True) -> 'VariableCollection':
    """Build a reduced copy holding only the variables that are in use.

    Each section is rebuilt with the same metadata (key, title,
    description, toggle, required, needs) and clones of the variables
    that pass the filter. Sections left without any variable are dropped.
    ``self`` is not modified.

    Args:
        used_variables: Names of the variables that are actually used.
        keep_sensitive: When True, variables flagged sensitive are kept
            even if their name is not in ``used_variables``.

    Returns:
        New ``VariableCollection`` containing the surviving sections and
        a name-to-variable map covering exactly their variables.

    Example:
        all_vars = VariableCollection(spec)
        used_vars = all_vars.filter_to_used({'var1', 'var2', 'var3'})
        # Only var1, var2, var3 (and any sensitive vars) remain
    """
    # Built via __new__, so only the two attributes below get initialised.
    # NOTE(review): confirm __init__ sets no further attributes that a
    # filtered collection would need.
    result = VariableCollection.__new__(VariableCollection)
    result._sections = {}
    result._variable_map = {}

    for key, section in self._sections.items():
        # Fresh section shell carrying over the metadata only.
        trimmed = VariableSection({
            'key': section.key,
            'title': section.title,
            'description': section.description,
            'toggle': section.toggle,
            'required': section.required,
            'needs': section.needs.copy() if section.needs else None,
        })

        # Keep a variable when it is used, or sensitive and we were asked to keep those.
        trimmed.variables.update({
            name: var.clone()
            for name, var in section.variables.items()
            if name in used_variables or (keep_sensitive and var.sensitive)
        })

        # Sections that ended up empty are left out of the result.
        if not trimmed.variables:
            continue

        result._sections[key] = trimmed
        result._variable_map.update(trimmed.variables)

    return result
-
def get_all_variable_names(self) -> Set[str]:
    """Collect the names of all variables known to this collection.

    Returns:
        A new set built from the keys of the variable map; mutating it
        does not affect the collection.
    """
    return set(self._variable_map)
- # !SECTION
- # !SECTION
|