| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894 |
- from __future__ import annotations
- from collections import OrderedDict
- from dataclasses import dataclass, field
- from typing import Any, Dict, List, Optional, Set, Union
- from urllib.parse import urlparse
- import logging
- import re
- logger = logging.getLogger(__name__)
- # -----------------------
- # SECTION: Constants
- # -----------------------
- TRUE_VALUES = {"true", "1", "yes", "on"}
- FALSE_VALUES = {"false", "0", "no", "off"}
- HOSTNAME_REGEX = re.compile(r"^(?=.{1,253}$)(?!-)[A-Za-z0-9_-]{1,63}(?<!-)(\.(?!-)[A-Za-z0-9_-]{1,63}(?<!-))*$")
- EMAIL_REGEX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
- # !SECTION
- # ----------------------
- # SECTION: Variable Class
- # ----------------------
- class Variable:
- """Represents a single templating variable with lightweight validation."""
- def __init__(self, data: dict[str, Any]) -> None:
- """Initialize Variable from a dictionary containing variable specification.
-
- Args:
- data: Dictionary containing variable specification with required 'name' key
- and optional keys: description, type, options, prompt, value, default, section, origin
-
- Raises:
- ValueError: If data is not a dict, missing 'name' key, or has invalid default value
- """
- # Validate input
- if not isinstance(data, dict):
- raise ValueError("Variable data must be a dictionary")
-
- if "name" not in data:
- raise ValueError("Variable data must contain 'name' key")
-
- # Initialize fields
- self.name: str = data["name"]
- self.description: Optional[str] = data.get("description") or data.get("display", "")
- self.type: str = data.get("type", "str")
- self.options: Optional[List[Any]] = data.get("options", [])
- self.prompt: Optional[str] = data.get("prompt")
- self.value: Any = data.get("value") if data.get("value") is not None else data.get("default")
- self.section: Optional[str] = data.get("section")
- self.origin: Optional[str] = data.get("origin")
- self.sensitive: bool = data.get("sensitive", False)
- # Optional extra explanation used by interactive prompts
- self.extra: Optional[str] = data.get("extra")
- # Validate and convert the default/initial value if present
- if self.value is not None:
- try:
- self.value = self.convert(self.value)
- except ValueError as exc:
- raise ValueError(f"Invalid default for variable '{self.name}': {exc}")
- # -------------------------
- # SECTION: Validation Helpers
- # -------------------------
- def _validate_not_empty(self, value: Any, converted_value: Any) -> None:
- """Validate that a value is not empty for non-boolean types."""
- if self.type not in ["bool"] and (converted_value is None or converted_value == ""):
- raise ValueError("value cannot be empty")
- def _validate_enum_option(self, value: str) -> None:
- """Validate that a value is in the allowed enum options."""
- if self.options and value not in self.options:
- raise ValueError(f"value must be one of: {', '.join(self.options)}")
- def _validate_regex_pattern(self, value: str, pattern: re.Pattern, error_msg: str) -> None:
- """Validate that a value matches a regex pattern."""
- if not pattern.fullmatch(value):
- raise ValueError(error_msg)
- def _validate_url_structure(self, parsed_url) -> None:
- """Validate that a parsed URL has required components."""
- if not (parsed_url.scheme and parsed_url.netloc):
- raise ValueError("value must be a valid URL (include scheme and host)")
- # !SECTION
- # -------------------------
- # SECTION: Type Conversion
- # -------------------------
- def convert(self, value: Any) -> Any:
- """Validate and convert a raw value based on the variable type."""
- if value is None:
- return None
- # Treat empty strings as None to avoid storing "" for missing values.
- if isinstance(value, str) and value.strip() == "":
- return None
- # Type conversion mapping for cleaner code
- converters = {
- "bool": self._convert_bool,
- "int": self._convert_int,
- "float": self._convert_float,
- "enum": self._convert_enum,
- "hostname": self._convert_hostname,
- "url": self._convert_url,
- "email": self._convert_email,
- }
-
- converter = converters.get(self.type)
- if converter:
- return converter(value)
-
- # Default to string conversion
- return str(value)
- def _convert_bool(self, value: Any) -> bool:
- """Convert value to boolean."""
- if isinstance(value, bool):
- return value
- if isinstance(value, str):
- lowered = value.strip().lower()
- if lowered in TRUE_VALUES:
- return True
- if lowered in FALSE_VALUES:
- return False
- raise ValueError("value must be a boolean (true/false)")
- def _convert_int(self, value: Any) -> Optional[int]:
- """Convert value to integer."""
- if isinstance(value, int):
- return value
- if isinstance(value, str) and value.strip() == "":
- return None
- try:
- return int(value)
- except (TypeError, ValueError) as exc:
- raise ValueError("value must be an integer") from exc
- def _convert_float(self, value: Any) -> Optional[float]:
- """Convert value to float."""
- if isinstance(value, float):
- return value
- if isinstance(value, str) and value.strip() == "":
- return None
- try:
- return float(value)
- except (TypeError, ValueError) as exc:
- raise ValueError("value must be a float") from exc
- def _convert_enum(self, value: Any) -> Optional[str]:
- """Convert value to enum option."""
- if value == "":
- return None
- val = str(value)
- self._validate_enum_option(val)
- return val
- def _convert_hostname(self, value: Any) -> str:
- """Convert and validate hostname."""
- val = str(value).strip()
- if not val:
- return None
- if val.lower() != "localhost":
- self._validate_regex_pattern(val, HOSTNAME_REGEX, "value must be a valid hostname")
- return val
- def _convert_url(self, value: Any) -> str:
- """Convert and validate URL."""
- val = str(value).strip()
- if not val:
- return None
- parsed = urlparse(val)
- self._validate_url_structure(parsed)
- return val
- def _convert_email(self, value: Any) -> str:
- """Convert and validate email."""
- val = str(value).strip()
- if not val:
- return None
- self._validate_regex_pattern(val, EMAIL_REGEX, "value must be a valid email address")
- return val
- def get_typed_value(self) -> Any:
- """Return the stored value converted to the appropriate Python type."""
- return self.convert(self.value)
-
- def to_dict(self) -> Dict[str, Any]:
- """Serialize Variable to a dictionary for storage.
-
- Returns:
- Dictionary representation of the variable with only relevant fields.
- """
- var_dict = {}
-
- if self.type:
- var_dict["type"] = self.type
-
- if self.value is not None:
- var_dict["default"] = self.value
-
- if self.description:
- var_dict["description"] = self.description
-
- if self.prompt:
- var_dict["prompt"] = self.prompt
-
- if self.sensitive:
- var_dict["sensitive"] = self.sensitive
-
- if self.extra:
- var_dict["extra"] = self.extra
-
- if self.options:
- var_dict["options"] = self.options
-
- if self.origin:
- var_dict["origin"] = self.origin
-
- return var_dict
-
- # -------------------------
- # SECTION: Display Methods
- # -------------------------
-
- def get_display_value(self, mask_sensitive: bool = True, max_length: int = 30) -> str:
- """Get formatted display value with optional masking and truncation.
-
- Args:
- mask_sensitive: If True, mask sensitive values with asterisks
- max_length: Maximum length before truncation (0 = no limit)
-
- Returns:
- Formatted string representation of the value
- """
- if self.value is None:
- return ""
-
- # Mask sensitive values
- if self.sensitive and mask_sensitive:
- return "********"
-
- # Convert to string
- display = str(self.value)
-
- # Truncate if needed
- if max_length > 0 and len(display) > max_length:
- return display[:max_length - 3] + "..."
-
- return display
-
- def get_normalized_default(self) -> Any:
- """Get normalized default value suitable for prompts and display.
-
- Handles type conversion and provides sensible defaults for different types.
- Especially useful for enum, bool, and int types in interactive prompts.
-
- Returns:
- Normalized default value appropriate for the variable type
- """
- try:
- typed = self.get_typed_value()
- except Exception:
- typed = self.value
-
- # Enum: ensure default is valid option
- if self.type == "enum":
- if not self.options:
- return typed
- # If typed is invalid or missing, use first option
- if typed is None or str(typed) not in self.options:
- return self.options[0]
- return str(typed)
-
- # Boolean: return as bool type
- if self.type == "bool":
- if isinstance(typed, bool):
- return typed
- return None if typed is None else bool(typed)
-
- # Integer: return as int type
- if self.type == "int":
- try:
- return int(typed) if typed is not None and typed != "" else None
- except Exception:
- return None
-
- # Default: return string or None
- return None if typed is None else str(typed)
-
- def get_prompt_text(self) -> str:
- """Get formatted prompt text for interactive input.
-
- Returns:
- Prompt text with optional type hints and descriptions
- """
- prompt_text = self.prompt or self.description or self.name
-
- # Add type hint for semantic types if there's a default
- if self.value is not None and self.type in ["hostname", "email", "url"]:
- prompt_text += f" ({self.type})"
-
- return prompt_text
-
- def get_validation_hint(self) -> Optional[str]:
- """Get validation hint for prompts (e.g., enum options).
-
- Returns:
- Formatted hint string or None if no hint needed
- """
- hints = []
-
- # Add enum options
- if self.type == "enum" and self.options:
- hints.append(f"Options: {', '.join(self.options)}")
-
- # Add extra help text
- if self.extra:
- hints.append(self.extra)
-
- return " — ".join(hints) if hints else None
-
- def clone(self, update: Optional[Dict[str, Any]] = None) -> 'Variable':
- """Create a deep copy of the variable with optional field updates.
-
- This is more efficient than converting to dict and back when copying variables.
-
- Args:
- update: Optional dictionary of field updates to apply to the clone
-
- Returns:
- New Variable instance with copied data
-
- Example:
- var2 = var1.clone(update={'origin': 'template'})
- """
- data = {
- 'name': self.name,
- 'type': self.type,
- 'value': self.value,
- 'description': self.description,
- 'prompt': self.prompt,
- 'options': self.options.copy() if self.options else None,
- 'section': self.section,
- 'origin': self.origin,
- 'sensitive': self.sensitive,
- 'extra': self.extra,
- }
-
- # Apply updates if provided
- if update:
- data.update(update)
-
- return Variable(data)
-
- # !SECTION
- # !SECTION
- # ----------------------------
- # SECTION: VariableSection Class
- # ----------------------------
- class VariableSection:
- """Groups variables together with shared metadata for presentation."""
- def __init__(self, data: dict[str, Any]) -> None:
- """Initialize VariableSection from a dictionary.
-
- Args:
- data: Dictionary containing section specification with required 'key' and 'title' keys
- """
- if not isinstance(data, dict):
- raise ValueError("VariableSection data must be a dictionary")
-
- if "key" not in data:
- raise ValueError("VariableSection data must contain 'key'")
-
- if "title" not in data:
- raise ValueError("VariableSection data must contain 'title'")
-
- self.key: str = data["key"]
- self.title: str = data["title"]
- self.variables: OrderedDict[str, Variable] = OrderedDict()
- self.prompt: Optional[str] = data.get("prompt")
- self.description: Optional[str] = data.get("description")
- self.toggle: Optional[str] = data.get("toggle")
- # Default "general" section to required=True, all others to required=False
- self.required: bool = data.get("required", data["key"] == "general")
- def variable_names(self) -> list[str]:
- return list(self.variables.keys())
-
- def to_dict(self) -> Dict[str, Any]:
- """Serialize VariableSection to a dictionary for storage.
-
- Returns:
- Dictionary representation of the section with all metadata and variables.
- """
- section_dict = {}
-
- if self.title:
- section_dict["title"] = self.title
-
- if self.description:
- section_dict["description"] = self.description
-
- if self.prompt:
- section_dict["prompt"] = self.prompt
-
- if self.toggle:
- section_dict["toggle"] = self.toggle
-
- # Always store required flag
- section_dict["required"] = self.required
-
- # Serialize all variables using their own to_dict method
- section_dict["vars"] = {}
- for var_name, variable in self.variables.items():
- section_dict["vars"][var_name] = variable.to_dict()
-
- return section_dict
-
- # -------------------------
- # SECTION: State Methods
- # -------------------------
-
- def is_enabled(self) -> bool:
- """Check if section is currently enabled based on toggle variable.
-
- Returns:
- True if section is enabled (no toggle or toggle is True), False otherwise
- """
- if not self.toggle:
- return True
-
- toggle_var = self.variables.get(self.toggle)
- if not toggle_var:
- return True
-
- try:
- return bool(toggle_var.get_typed_value())
- except Exception:
- return False
-
- def get_toggle_value(self) -> Optional[bool]:
- """Get the current value of the toggle variable.
-
- Returns:
- Boolean value of toggle variable, or None if no toggle exists
- """
- if not self.toggle:
- return None
-
- toggle_var = self.variables.get(self.toggle)
- if not toggle_var:
- return None
-
- try:
- return bool(toggle_var.get_typed_value())
- except Exception:
- return None
-
- def clone(self, origin_update: Optional[str] = None) -> 'VariableSection':
- """Create a deep copy of the section with all variables.
-
- This is more efficient than converting to dict and back when copying sections.
-
- Args:
- origin_update: Optional origin string to apply to all cloned variables
-
- Returns:
- New VariableSection instance with deep-copied variables
-
- Example:
- section2 = section1.clone(origin_update='template')
- """
- # Create new section with same metadata
- cloned = VariableSection({
- 'key': self.key,
- 'title': self.title,
- 'prompt': self.prompt,
- 'description': self.description,
- 'toggle': self.toggle,
- 'required': self.required,
- })
-
- # Deep copy all variables
- for var_name, variable in self.variables.items():
- if origin_update:
- cloned.variables[var_name] = variable.clone(update={'origin': origin_update})
- else:
- cloned.variables[var_name] = variable.clone()
-
- return cloned
-
- # !SECTION
- # !SECTION
- # --------------------------------
- # SECTION: VariableCollection Class
- # --------------------------------
- class VariableCollection:
- """Manages variables grouped by sections and builds Jinja context."""
- def __init__(self, spec: dict[str, Any]) -> None:
- """Initialize VariableCollection from a specification dictionary.
-
- Args:
- spec: Dictionary containing the complete variable specification structure
- Expected format (as used in compose.py):
- {
- "section_key": {
- "title": "Section Title",
- "prompt": "Optional prompt text",
- "toggle": "optional_toggle_var_name",
- "description": "Optional description",
- "vars": {
- "var_name": {
- "description": "Variable description",
- "type": "str",
- "default": "default_value",
- ...
- }
- }
- }
- }
- """
- if not isinstance(spec, dict):
- raise ValueError("Spec must be a dictionary")
-
- self._sections: Dict[str, VariableSection] = {}
- # NOTE: The _variable_map provides a flat, O(1) lookup for any variable by its name,
- # avoiding the need to iterate through sections. It stores references to the same
- # Variable objects contained in the _set structure.
- self._variable_map: Dict[str, Variable] = {}
- self._initialize_sections(spec)
- def _initialize_sections(self, spec: dict[str, Any]) -> None:
- """Initialize sections from the spec."""
- for section_key, section_data in spec.items():
- if not isinstance(section_data, dict):
- continue
-
- section = self._create_section(section_key, section_data)
- # Guard against None from empty YAML sections (vars: with no content)
- vars_data = section_data.get("vars") or {}
- self._initialize_variables(section, vars_data)
- self._sections[section_key] = section
- def _create_section(self, key: str, data: dict[str, Any]) -> VariableSection:
- """Create a VariableSection from data."""
- section_init_data = {
- "key": key,
- "title": data.get("title", key.replace("_", " ").title()),
- "prompt": data.get("prompt"),
- "description": data.get("description"),
- "toggle": data.get("toggle"),
- "required": data.get("required", key == "general")
- }
- return VariableSection(section_init_data)
- def _initialize_variables(self, section: VariableSection, vars_data: dict[str, Any]) -> None:
- """Initialize variables for a section."""
- # Guard against None from empty YAML sections
- if vars_data is None:
- vars_data = {}
-
- for var_name, var_data in vars_data.items():
- var_init_data = {"name": var_name, **var_data}
- variable = Variable(var_init_data)
- section.variables[var_name] = variable
- # NOTE: Populate the direct lookup map for efficient access.
- self._variable_map[var_name] = variable
- # -------------------------
- # SECTION: Public API Methods
- # -------------------------
- def get_sections(self) -> Dict[str, VariableSection]:
- """Get all sections in the collection."""
- return self._sections.copy()
-
- def get_section(self, key: str) -> Optional[VariableSection]:
- """Get a specific section by its key."""
- return self._sections.get(key)
-
- def has_sections(self) -> bool:
- """Check if the collection has any sections."""
- return bool(self._sections)
- def get_all_values(self) -> dict[str, Any]:
- """Get all variable values as a dictionary."""
- # NOTE: This method is optimized to use the _variable_map for direct O(1) access
- # to each variable, which is much faster than iterating through sections.
- all_values = {}
- for var_name, variable in self._variable_map.items():
- all_values[var_name] = variable.get_typed_value()
- return all_values
- def get_sensitive_variables(self) -> Dict[str, Any]:
- """Get only the sensitive variables with their values."""
- return {name: var.value for name, var in self._variable_map.items() if var.sensitive and var.value}
- # !SECTION
- # -------------------------
- # SECTION: Helper Methods
- # -------------------------
- # NOTE: These helper methods reduce code duplication across module.py and prompt.py
- # by centralizing common variable collection operations
- def apply_defaults(self, defaults: dict[str, Any], origin: str = "cli") -> list[str]:
- """Apply default values to variables, updating their origin.
-
- Args:
- defaults: Dictionary mapping variable names to their default values
- origin: Source of these defaults (e.g., 'config', 'cli')
-
- Returns:
- List of variable names that were successfully updated
- """
- # NOTE: This method uses the _variable_map for a significant performance gain,
- # as it allows direct O(1) lookup of variables instead of iterating
- # through all sections to find a match.
- successful = []
- errors = []
-
- for var_name, value in defaults.items():
- try:
- variable = self._variable_map.get(var_name)
- if not variable:
- logger.warning(f"Variable '{var_name}' not found in template")
- continue
-
- # Convert and set the new value
- converted_value = variable.convert(value)
- variable.value = converted_value
-
- # Set origin to the current source (not a chain)
- variable.origin = origin
-
- successful.append(var_name)
-
- except ValueError as e:
- error_msg = f"Invalid value for '{var_name}': {value} - {e}"
- errors.append(error_msg)
- logger.error(error_msg)
-
- if errors:
- logger.warning(f"Some defaults failed to apply: {'; '.join(errors)}")
-
- return successful
-
- def validate_all(self) -> None:
- """Validate all variables in the collection, skipping disabled sections."""
- errors: list[str] = []
- for section in self._sections.values():
- # Check if the section is disabled by a toggle
- if section.toggle:
- toggle_var = section.variables.get(section.toggle)
- if toggle_var and not toggle_var.get_typed_value():
- logger.debug(f"Skipping validation for disabled section: '{section.key}'")
- continue # Skip this entire section
- # Validate each variable in the section
- for var_name, variable in section.variables.items():
- try:
- # If value is None, treat as missing
- if variable.value is None:
- errors.append(f"{section.key}.{var_name} (missing)")
- continue
- # Attempt to convert/validate typed value
- typed = variable.get_typed_value()
- # For non-boolean types, treat None or empty string as invalid
- if variable.type not in ("bool",) and (typed is None or typed == ""):
- errors.append(f"{section.key}.{var_name} (empty)")
- except ValueError as e:
- errors.append(f"{section.key}.{var_name} (invalid: {e})")
- if errors:
- error_msg = "Variable validation failed: " + ", ".join(errors)
- logger.error(error_msg)
- raise ValueError(error_msg)
- def merge(self, other_spec: Union[Dict[str, Any], 'VariableCollection'], origin: str = "override") -> 'VariableCollection':
- """Merge another spec or VariableCollection into this one with precedence tracking.
-
- OPTIMIZED: Works directly on objects without dict conversions for better performance.
-
- The other spec/collection has higher precedence and will override values in self.
- Creates a new VariableCollection with merged data.
-
- Args:
- other_spec: Either a spec dictionary or another VariableCollection to merge
- origin: Origin label for variables from other_spec (e.g., 'template', 'config')
-
- Returns:
- New VariableCollection with merged data
-
- Example:
- module_vars = VariableCollection(module_spec)
- template_vars = module_vars.merge(template_spec, origin='template')
- # Variables from template_spec override module_spec
- # Origins tracked: 'module' or 'module -> template'
- """
- # Convert dict to VariableCollection if needed (only once)
- if isinstance(other_spec, dict):
- other = VariableCollection(other_spec)
- else:
- other = other_spec
-
- # Create new collection without calling __init__ (optimization)
- merged = VariableCollection.__new__(VariableCollection)
- merged._sections = {}
- merged._variable_map = {}
-
- # First pass: clone sections from self
- for section_key, self_section in self._sections.items():
- if section_key in other._sections:
- # Section exists in both - will merge
- merged._sections[section_key] = self._merge_sections(
- self_section,
- other._sections[section_key],
- origin
- )
- else:
- # Section only in self - clone it
- merged._sections[section_key] = self_section.clone()
-
- # Second pass: add sections that only exist in other
- for section_key, other_section in other._sections.items():
- if section_key not in merged._sections:
- # New section from other - clone with origin update
- merged._sections[section_key] = other_section.clone(origin_update=origin)
-
- # Rebuild variable map for O(1) lookups
- for section in merged._sections.values():
- for var_name, variable in section.variables.items():
- merged._variable_map[var_name] = variable
-
- return merged
-
- def _infer_origin_from_context(self) -> str:
- """Infer origin from existing variables (fallback)."""
- for section in self._sections.values():
- for variable in section.variables.values():
- if variable.origin:
- return variable.origin
- return "template"
-
- def _merge_sections(self, self_section: VariableSection, other_section: VariableSection, origin: str) -> VariableSection:
- """Merge two sections, with other_section taking precedence.
-
- Args:
- self_section: Base section
- other_section: Section to merge in (takes precedence)
- origin: Origin label for merged variables
-
- Returns:
- New merged VariableSection
- """
- # Start with a clone of self_section
- merged_section = self_section.clone()
-
- # Update section metadata from other (other takes precedence)
- if other_section.title:
- merged_section.title = other_section.title
- if other_section.prompt:
- merged_section.prompt = other_section.prompt
- if other_section.description:
- merged_section.description = other_section.description
- if other_section.toggle:
- merged_section.toggle = other_section.toggle
- # Required flag always updated
- merged_section.required = other_section.required
-
- # Merge variables
- for var_name, other_var in other_section.variables.items():
- if var_name in merged_section.variables:
- # Variable exists in both - merge with other taking precedence
- self_var = merged_section.variables[var_name]
-
- # Build update dict with other's values taking precedence
- update = {}
- if other_var.type:
- update['type'] = other_var.type
- if other_var.value is not None:
- update['value'] = other_var.value
- if other_var.description:
- update['description'] = other_var.description
- if other_var.prompt:
- update['prompt'] = other_var.prompt
- if other_var.options:
- update['options'] = other_var.options
- if other_var.sensitive:
- update['sensitive'] = other_var.sensitive
- if other_var.extra:
- update['extra'] = other_var.extra
-
- # Update origin tracking (only keep the current source, not the chain)
- update['origin'] = origin
-
- # Clone with updates
- merged_section.variables[var_name] = self_var.clone(update=update)
- else:
- # New variable from other - clone with origin
- merged_section.variables[var_name] = other_var.clone(update={'origin': origin})
-
- return merged_section
-
- def filter_to_used(self, used_variables: Set[str], keep_sensitive: bool = True) -> 'VariableCollection':
- """Filter collection to only variables that are used (or sensitive).
-
- OPTIMIZED: Works directly on objects without dict conversions for better performance.
-
- Creates a new VariableCollection containing only the variables in used_variables.
- Sections with no remaining variables are removed.
-
- Args:
- used_variables: Set of variable names that are actually used
- keep_sensitive: If True, also keep sensitive variables even if not in used set
-
- Returns:
- New VariableCollection with filtered variables
-
- Example:
- all_vars = VariableCollection(spec)
- used_vars = all_vars.filter_to_used({'var1', 'var2', 'var3'})
- # Only var1, var2, var3 (and any sensitive vars) remain
- """
- # Create new collection without calling __init__ (optimization)
- filtered = VariableCollection.__new__(VariableCollection)
- filtered._sections = {}
- filtered._variable_map = {}
-
- # Filter each section
- for section_key, section in self._sections.items():
- # Create a new section with same metadata
- filtered_section = VariableSection({
- 'key': section.key,
- 'title': section.title,
- 'prompt': section.prompt,
- 'description': section.description,
- 'toggle': section.toggle,
- 'required': section.required,
- })
-
- # Clone only the variables that should be included
- for var_name, variable in section.variables.items():
- # Include if used OR if sensitive (and keep_sensitive is True)
- should_include = (
- var_name in used_variables or
- (keep_sensitive and variable.sensitive)
- )
-
- if should_include:
- filtered_section.variables[var_name] = variable.clone()
-
- # Only add section if it has variables
- if filtered_section.variables:
- filtered._sections[section_key] = filtered_section
- # Add variables to map
- for var_name, variable in filtered_section.variables.items():
- filtered._variable_map[var_name] = variable
-
- return filtered
-
- def get_all_variable_names(self) -> Set[str]:
- """Get set of all variable names across all sections.
-
- Returns:
- Set of all variable names
- """
- return set(self._variable_map.keys())
- # !SECTION
- # !SECTION
|