from __future__ import annotations

from collections import OrderedDict
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Set, Union
from urllib.parse import urlparse
import logging
import re

logger = logging.getLogger(__name__)

# -----------------------
# SECTION: Constants
# -----------------------

# Case-insensitive string spellings accepted by Variable._convert_bool.
TRUE_VALUES = {"true", "1", "yes", "on"}
FALSE_VALUES = {"false", "0", "no", "off"}

# NOTE(review): in the reviewed copy of this file the text between
# `[A-Za-z0-9_-]{1,63}(?` and the `-> None:` of Variable.__init__ was lost.
# The tail of HOSTNAME_REGEX, the whole EMAIL_REGEX pattern, the
# `class Variable` header and the `__init__` signature below are
# reconstructions -- confirm them against version control.
#
# Dot-separated labels of 1-63 characters (letters, digits, "_" or "-") that
# neither start nor end with "-"; at most 253 characters in total.
HOSTNAME_REGEX = re.compile(
    r"^(?=.{1,253}$)(?!-)[A-Za-z0-9_-]{1,63}(?<!-)"
    r"(?:\.(?!-)[A-Za-z0-9_-]{1,63}(?<!-))*$"
)
# Deliberately loose shape check: something@something.something, no spaces.
EMAIL_REGEX = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")

# !SECTION

# ----------------------
# SECTION: Variable Class
# ----------------------


class Variable:
    """A single template variable with type-aware validation and conversion."""

    def __init__(self, data: dict[str, Any]) -> None:
        """Initialize Variable from a dictionary containing variable specification.

        Args:
            data: Dictionary containing variable specification with required
                'name' key and optional keys: description (or 'display'), type,
                options, prompt, value, default, section, origin, sensitive,
                extra.

        Raises:
            ValueError: If data is not a dict, is missing the 'name' key, or
                carries an initial value that is invalid for its type.
        """
        # Validate input
        if not isinstance(data, dict):
            raise ValueError("Variable data must be a dictionary")

        if "name" not in data:
            raise ValueError("Variable data must contain 'name' key")

        # Remember which keys the source actually supplied.  Merging uses this
        # to tell "type was declared as str" apart from "type was omitted and
        # merely defaulted to str".
        self._explicit_fields: Set[str] = set(data)

        # Initialize fields
        self.name: str = data["name"]
        self.description: Optional[str] = data.get("description") or data.get("display", "")
        self.type: str = data.get("type", "str")
        self.options: Optional[List[Any]] = data.get("options", [])
        self.prompt: Optional[str] = data.get("prompt")
        # An explicit 'value' wins over 'default'.
        self.value: Any = data.get("value") if data.get("value") is not None else data.get("default")
        self.section: Optional[str] = data.get("section")
        self.origin: Optional[str] = data.get("origin")
        self.sensitive: bool = data.get("sensitive", False)
        # Optional extra explanation used by interactive prompts
        self.extra: Optional[str] = data.get("extra")

        # Validate and convert the default/initial value if present
        if self.value is not None:
            try:
                self.value = self.convert(self.value)
            except ValueError as exc:
                # Chain the cause so the underlying conversion error stays visible.
                raise ValueError(f"Invalid default for variable '{self.name}': {exc}") from exc

    # -------------------------
    # SECTION: Validation Helpers
    # -------------------------

    def _validate_not_empty(self, value: Any, converted_value: Any) -> None:
        """Validate that a value is not empty for non-boolean types.

        Args:
            value: The raw value (currently unused; kept for interface stability).
            converted_value: The value after type conversion.

        Raises:
            ValueError: If the variable is not a bool and the converted value is
                None or an empty string.
        """
        if self.type not in ["bool"] and (converted_value is None or converted_value == ""):
            raise ValueError("value cannot be empty")

    def _validate_enum_option(self, value: str) -> None:
        """Validate that a value is in the allowed enum options.

        Raises:
            ValueError: If options are defined and value is not one of them.
        """
        if self.options and value not in self.options:
            # Stringify for the message only, so non-string options (e.g. ints
            # loaded from YAML) produce the intended ValueError rather than a
            # TypeError from str.join.
            allowed = ", ".join(str(option) for option in self.options)
            raise ValueError(f"value must be one of: {allowed}")

    def _validate_regex_pattern(self, value: str, pattern: re.Pattern, error_msg: str) -> None:
        """Validate that the whole of value matches a regex pattern.

        Raises:
            ValueError: With error_msg if the pattern does not fully match.
        """
        if not pattern.fullmatch(value):
            raise ValueError(error_msg)

    def _validate_url_structure(self, parsed_url) -> None:
        """Validate that a parsed URL has both a scheme and a network location.

        Args:
            parsed_url: Result of urllib.parse.urlparse.

        Raises:
            ValueError: If either the scheme or the netloc is empty.
        """
        if not (parsed_url.scheme and parsed_url.netloc):
            raise ValueError("value must be a valid URL (include scheme and host)")

    # !SECTION

    # -------------------------
    # SECTION: Type Conversion
    # -------------------------

    def convert(self, value: Any) -> Any:
        """Validate and convert a raw value based on the variable type.

        Args:
            value: Raw value to convert.

        Returns:
            The converted value, or None when value is None or a blank string.
            Types without a dedicated converter are converted with str().

        Raises:
            ValueError: If the value is invalid for the variable's type.
        """
        if value is None:
            return None

        # Treat empty strings as None to avoid storing "" for missing values.
        if isinstance(value, str) and value.strip() == "":
            return None

        # Type conversion mapping for cleaner code
        converters = {
            "bool": self._convert_bool,
            "int": self._convert_int,
            "float": self._convert_float,
            "enum": self._convert_enum,
            "hostname": self._convert_hostname,
            "url": self._convert_url,
            "email": self._convert_email,
        }

        converter = converters.get(self.type)
        if converter:
            return converter(value)

        # Default to string conversion
        return str(value)

    def _convert_bool(self, value: Any) -> bool:
        """Convert value to boolean.

        Accepts real booleans and the (case-insensitive) strings listed in
        TRUE_VALUES / FALSE_VALUES.

        Raises:
            ValueError: For anything else.
        """
        if isinstance(value, bool):
            return value
        if isinstance(value, str):
            lowered = value.strip().lower()
            if lowered in TRUE_VALUES:
                return True
            if lowered in FALSE_VALUES:
                return False
        raise ValueError("value must be a boolean (true/false)")

    def _convert_int(self, value: Any) -> Optional[int]:
        """Convert value to integer; blank strings become None.

        Raises:
            ValueError: If int() cannot convert the value.
        """
        if isinstance(value, int):
            return value
        if isinstance(value, str) and value.strip() == "":
            return None
        try:
            return int(value)
        except (TypeError, ValueError) as exc:
            raise ValueError("value must be an integer") from exc

    def _convert_float(self, value: Any) -> Optional[float]:
        """Convert value to float; blank strings become None.

        Raises:
            ValueError: If float() cannot convert the value.
        """
        if isinstance(value, float):
            return value
        if isinstance(value, str) and value.strip() == "":
            return None
        try:
            return float(value)
        except (TypeError, ValueError) as exc:
            raise ValueError("value must be a float") from exc

    def _convert_enum(self, value: Any) -> Optional[str]:
        """Convert value to a string and check it against the enum options.

        Raises:
            ValueError: If options are defined and the value is not among them.
        """
        if value == "":
            return None
        val = str(value)
        self._validate_enum_option(val)
        return val

    def _convert_hostname(self, value: Any) -> Optional[str]:
        """Convert and validate hostname.

        "localhost" (any case) is accepted without consulting HOSTNAME_REGEX.

        Raises:
            ValueError: If the stripped value does not match HOSTNAME_REGEX.
        """
        val = str(value).strip()
        if not val:
            return None
        if val.lower() != "localhost":
            self._validate_regex_pattern(val, HOSTNAME_REGEX, "value must be a valid hostname")
        return val

    def _convert_url(self, value: Any) -> Optional[str]:
        """Convert and validate URL (must include scheme and host).

        Raises:
            ValueError: If the parsed URL lacks a scheme or a netloc.
        """
        val = str(value).strip()
        if not val:
            return None
        parsed = urlparse(val)
        self._validate_url_structure(parsed)
        return val

    def _convert_email(self, value: Any) -> Optional[str]:
        """Convert and validate email.

        Raises:
            ValueError: If the stripped value does not match EMAIL_REGEX.
        """
        val = str(value).strip()
        if not val:
            return None
        self._validate_regex_pattern(val, EMAIL_REGEX, "value must be a valid email address")
        return val

    def get_typed_value(self) -> Any:
        """Return the stored value converted to the appropriate Python type.

        Raises:
            ValueError: If the stored value is invalid for the variable's type.
        """
        return self.convert(self.value)

    def to_dict(self) -> Dict[str, Any]:
        """Serialize Variable to a dictionary for storage.

        Only truthy fields are emitted (the value is emitted whenever it is not
        None, under the key 'default').  'name' and 'section' are not included.

        Returns:
            Dictionary representation of the variable with only relevant fields.
        """
        var_dict = {}

        if self.type:
            var_dict["type"] = self.type

        if self.value is not None:
            var_dict["default"] = self.value

        if self.description:
            var_dict["description"] = self.description

        if self.prompt:
            var_dict["prompt"] = self.prompt

        if self.sensitive:
            var_dict["sensitive"] = self.sensitive

        if self.extra:
            var_dict["extra"] = self.extra

        if self.options:
            var_dict["options"] = self.options

        if self.origin:
            var_dict["origin"] = self.origin

        return var_dict

    # -------------------------
    # SECTION: Display Methods
    # -------------------------

    def get_display_value(self, mask_sensitive: bool = True, max_length: int = 30) -> str:
        """Get formatted display value with optional masking and truncation.

        Args:
            mask_sensitive: If True, mask sensitive values with asterisks
            max_length: Maximum length before truncation (0 = no limit)

        Returns:
            Formatted string representation of the value; "" when no value is set.
        """
        if self.value is None:
            return ""

        # Mask sensitive values
        if self.sensitive and mask_sensitive:
            return "********"

        # Convert to string
        display = str(self.value)

        # Truncate if needed
        if max_length > 0 and len(display) > max_length:
            if max_length < 3:
                # No room for an ellipsis: a negative slice bound would return
                # more characters than the limit, so hard-truncate instead.
                return display[:max_length]
            return display[:max_length - 3] + "..."

        return display

    def get_normalized_default(self) -> Any:
        """Get normalized default value suitable for prompts and display.

        Handles type conversion and provides sensible defaults for different
        types. Especially useful for enum, bool, and int types in interactive
        prompts.

        Returns:
            Normalized default value appropriate for the variable type
        """
        # Best effort: fall back to the raw stored value if it fails validation.
        try:
            typed = self.get_typed_value()
        except Exception:
            typed = self.value

        # Enum: ensure default is valid option
        if self.type == "enum":
            if not self.options:
                return typed
            # If typed is invalid or missing, use first option
            if typed is None or str(typed) not in self.options:
                return self.options[0]
            return str(typed)

        # Boolean: return as bool type
        if self.type == "bool":
            if isinstance(typed, bool):
                return typed
            return None if typed is None else bool(typed)

        # Integer: return as int type
        if self.type == "int":
            try:
                return int(typed) if typed is not None and typed != "" else None
            except Exception:
                return None

        # Default: return string or None
        return None if typed is None else str(typed)

    def get_prompt_text(self) -> str:
        """Get formatted prompt text for interactive input.

        Returns:
            The prompt, description or name (first one set), followed by a
            "(type)" hint for hostname/email/url variables that have a value.
        """
        prompt_text = self.prompt or self.description or self.name

        # Add type hint for semantic types if there's a default
        if self.value is not None and self.type in ["hostname", "email", "url"]:
            prompt_text += f" ({self.type})"

        return prompt_text

    def get_validation_hint(self) -> Optional[str]:
        """Get validation hint for prompts (e.g., enum options).

        Returns:
            Formatted hint string or None if no hint needed
        """
        hints = []

        # Add enum options (stringified so non-string options do not break join)
        if self.type == "enum" and self.options:
            hints.append(f"Options: {', '.join(str(option) for option in self.options)}")

        # Add extra help text
        if self.extra:
            hints.append(self.extra)

        return " — ".join(hints) if hints else None

    def clone(self, update: Optional[Dict[str, Any]] = None) -> 'Variable':
        """Create a copy of the variable with optional field updates.

        The copy is built through the constructor, so the (possibly updated)
        value is re-validated against the (possibly updated) type.

        Args:
            update: Optional dictionary of field updates to apply to the clone

        Returns:
            New Variable instance with copied data

        Raises:
            ValueError: If the resulting value is invalid for the resulting type.

        Example:
            var2 = var1.clone(update={'origin': 'template'})
        """
        data = {
            'name': self.name,
            'type': self.type,
            'value': self.value,
            'description': self.description,
            'prompt': self.prompt,
            # Copy so the clone never shares its options list with the source.
            'options': list(self.options) if self.options else None,
            'section': self.section,
            'origin': self.origin,
            'sensitive': self.sensitive,
            'extra': self.extra,
        }

        # Apply updates if provided
        if update:
            data.update(update)

        cloned = Variable(data)

        # The constructor saw every key, so restore the source's record of which
        # fields were really declared, plus whatever the caller just overrode.
        inherited = getattr(self, "_explicit_fields", None)
        declared = set(data) if inherited is None else set(inherited)
        cloned._explicit_fields = declared | set(update or ())
        return cloned

    # !SECTION

# !SECTION

# ----------------------------
# SECTION: VariableSection Class
# ----------------------------


class VariableSection:
    """Groups variables together with shared metadata for presentation."""

    def __init__(self, data: dict[str, Any]) -> None:
        """Initialize VariableSection from a dictionary.

        Args:
            data: Dictionary containing section specification with required
                'key' and 'title' keys and optional 'prompt', 'description',
                'toggle' and 'required' keys.

        Raises:
            ValueError: If data is not a dict or lacks 'key' or 'title'.
        """
        if not isinstance(data, dict):
            raise ValueError("VariableSection data must be a dictionary")

        if "key" not in data:
            raise ValueError("VariableSection data must contain 'key'")

        if "title" not in data:
            raise ValueError("VariableSection data must contain 'title'")

        self.key: str = data["key"]
        self.title: str = data["title"]
        self.variables: OrderedDict[str, Variable] = OrderedDict()
        self.prompt: Optional[str] = data.get("prompt")
        self.description: Optional[str] = data.get("description")
        # Name of the variable (within this section) that switches it on or off.
        self.toggle: Optional[str] = data.get("toggle")
        # Default "general" section to required=True, all others to required=False
        self.required: bool = data.get("required", data["key"] == "general")

    def variable_names(self) -> list[str]:
        """Return the names of the section's variables in insertion order."""
        return list(self.variables.keys())

    def to_dict(self) -> Dict[str, Any]:
        """Serialize VariableSection to a dictionary for storage.

        Returns:
            Dictionary with the truthy metadata fields, the 'required' flag and
            a 'vars' mapping of variable name to Variable.to_dict() output.
        """
        section_dict = {}

        if self.title:
            section_dict["title"] = self.title

        if self.description:
            section_dict["description"] = self.description

        if self.prompt:
            section_dict["prompt"] = self.prompt

        if self.toggle:
            section_dict["toggle"] = self.toggle

        # Always store required flag
        section_dict["required"] = self.required

        # Serialize all variables using their own to_dict method
        section_dict["vars"] = {}
        for var_name, variable in self.variables.items():
            section_dict["vars"][var_name] = variable.to_dict()

        return section_dict

    # -------------------------
    # SECTION: State Methods
    # -------------------------

    def is_enabled(self) -> bool:
        """Check if section is currently enabled based on toggle variable.

        Returns:
            True if there is no toggle, the toggle variable is not in this
            section, or its typed value is truthy; False if the value is falsy
            or cannot be converted.
        """
        if not self.toggle:
            return True

        toggle_var = self.variables.get(self.toggle)
        if not toggle_var:
            return True

        try:
            return bool(toggle_var.get_typed_value())
        except Exception:
            return False

    def get_toggle_value(self) -> Optional[bool]:
        """Get the current value of the toggle variable.

        Returns:
            Boolean value of toggle variable, or None if no toggle is configured,
            the toggle variable is missing, or its value cannot be converted.
        """
        if not self.toggle:
            return None

        toggle_var = self.variables.get(self.toggle)
        if not toggle_var:
            return None

        try:
            return bool(toggle_var.get_typed_value())
        except Exception:
            return None

    def clone(self, origin_update: Optional[str] = None) -> 'VariableSection':
        """Create a copy of the section with all of its variables cloned.

        Args:
            origin_update: Optional origin string to apply to all cloned variables

        Returns:
            New VariableSection instance with cloned variables

        Example:
            section2 = section1.clone(origin_update='template')
        """
        # Create new section with same metadata
        cloned = VariableSection({
            'key': self.key,
            'title': self.title,
            'prompt': self.prompt,
            'description': self.description,
            'toggle': self.toggle,
            'required': self.required,
        })

        # Clone every variable, stamping the new origin when one is given
        for var_name, variable in self.variables.items():
            if origin_update:
                cloned.variables[var_name] = variable.clone(update={'origin': origin_update})
            else:
                cloned.variables[var_name] = variable.clone()

        return cloned

    # !SECTION

# !SECTION

# --------------------------------
# SECTION: VariableCollection Class
# --------------------------------


class VariableCollection:
    """Manages variables grouped by sections and builds Jinja context."""

    def __init__(self, spec: dict[str, Any]) -> None:
        """Initialize VariableCollection from a specification dictionary.

        Args:
            spec: Dictionary containing the complete variable specification structure
                Expected format (as used in compose.py):
                {
                  "section_key": {
                    "title": "Section Title",
                    "prompt": "Optional prompt text",
                    "toggle": "optional_toggle_var_name",
                    "description": "Optional description",
                    "vars": {
                      "var_name": {
                        "description": "Variable description",
                        "type": "str",
                        "default": "default_value",
                        ...
                      }
                    }
                  }
                }

        Raises:
            ValueError: If spec is not a dictionary or a variable entry is invalid.
        """
        if not isinstance(spec, dict):
            raise ValueError("Spec must be a dictionary")

        self._sections: Dict[str, VariableSection] = {}
        # NOTE: The _variable_map provides a flat, O(1) lookup for any variable by its name,
        # avoiding the need to iterate through sections. It stores references to the same
        # Variable objects contained in the _sections structure.
        self._variable_map: Dict[str, Variable] = {}
        self._initialize_sections(spec)

    def _initialize_sections(self, spec: dict[str, Any]) -> None:
        """Initialize sections from the spec; non-dict section entries are skipped."""
        for section_key, section_data in spec.items():
            if not isinstance(section_data, dict):
                continue

            section = self._create_section(section_key, section_data)
            # Guard against None from empty YAML sections (vars: with no content)
            vars_data = section_data.get("vars") or {}
            self._initialize_variables(section, vars_data)
            self._sections[section_key] = section

    def _create_section(self, key: str, data: dict[str, Any]) -> VariableSection:
        """Create a VariableSection from data.

        The title defaults to the key with underscores replaced by spaces,
        title-cased; 'required' defaults to True only for the "general" section.
        """
        section_init_data = {
            "key": key,
            "title": data.get("title", key.replace("_", " ").title()),
            "prompt": data.get("prompt"),
            "description": data.get("description"),
            "toggle": data.get("toggle"),
            "required": data.get("required", key == "general")
        }
        return VariableSection(section_init_data)

    def _initialize_variables(self, section: VariableSection, vars_data: dict[str, Any]) -> None:
        """Initialize variables for a section.

        Args:
            section: Section that receives the created variables.
            vars_data: Mapping of variable name to its specification; None is
                treated as empty.

        Raises:
            ValueError: If a variable specification is neither a mapping nor
                None, or if its default value is invalid.
        """
        # Guard against None from empty YAML sections
        if vars_data is None:
            vars_data = {}

        for var_name, var_data in vars_data.items():
            # An empty YAML entry ("var_name:") loads as None; treat it as a
            # variable with no attributes instead of crashing on **None.
            if var_data is None:
                var_data = {}
            if not isinstance(var_data, dict):
                raise ValueError(
                    f"Variable '{var_name}' in section '{section.key}' must be a mapping"
                )

            var_init_data = {"name": var_name, **var_data}
            variable = Variable(var_init_data)
            section.variables[var_name] = variable
            # NOTE: Populate the direct lookup map for efficient access.
            self._variable_map[var_name] = variable

    # -------------------------
    # SECTION: Public API Methods
    # -------------------------

    def get_sections(self) -> Dict[str, VariableSection]:
        """Get all sections in the collection (shallow copy of the mapping)."""
        return self._sections.copy()

    def get_section(self, key: str) -> Optional[VariableSection]:
        """Get a specific section by its key, or None if it does not exist."""
        return self._sections.get(key)

    def has_sections(self) -> bool:
        """Check if the collection has any sections."""
        return bool(self._sections)

    def get_all_values(self) -> dict[str, Any]:
        """Get all variable values as a dictionary of name -> typed value.

        Raises:
            ValueError: If a stored value is invalid for its variable's type.
        """
        # NOTE: This method is optimized to use the _variable_map for direct O(1) access
        # to each variable, which is much faster than iterating through sections.
        all_values = {}
        for var_name, variable in self._variable_map.items():
            all_values[var_name] = variable.get_typed_value()
        return all_values

    def get_sensitive_variables(self) -> Dict[str, Any]:
        """Get only the sensitive variables that have a truthy value."""
        return {name: var.value for name, var in self._variable_map.items() if var.sensitive and var.value}

    # !SECTION

    # -------------------------
    # SECTION: Helper Methods
    # -------------------------

    # NOTE: These helper methods reduce code duplication across module.py and prompt.py
    # by centralizing common variable collection operations

    def apply_defaults(self, defaults: dict[str, Any], origin: str = "cli") -> list[str]:
        """Apply default values to variables, updating their origin.

        Unknown variable names are logged and skipped; values that fail
        conversion are logged and leave the variable untouched.

        Args:
            defaults: Dictionary mapping variable names to their default values
            origin: Source of these defaults (e.g., 'config', 'cli')

        Returns:
            List of variable names that were successfully updated
        """
        # NOTE: This method uses the _variable_map for a significant performance gain,
        # as it allows direct O(1) lookup of variables instead of iterating
        # through all sections to find a match.
        successful = []
        errors = []

        for var_name, value in defaults.items():
            try:
                variable = self._variable_map.get(var_name)
                if variable is None:
                    logger.warning(f"Variable '{var_name}' not found in template")
                    continue

                # Convert and set the new value
                converted_value = variable.convert(value)
                variable.value = converted_value

                # Set origin to the current source (not a chain)
                variable.origin = origin

                successful.append(var_name)

            except ValueError as e:
                error_msg = f"Invalid value for '{var_name}': {value} - {e}"
                errors.append(error_msg)
                logger.error(error_msg)

        if errors:
            logger.warning(f"Some defaults failed to apply: {'; '.join(errors)}")

        return successful

    def validate_all(self) -> None:
        """Validate all variables in the collection, skipping disabled sections.

        Raises:
            ValueError: Listing every missing, empty or invalid variable as
                "section.name (reason)".
        """
        errors: list[str] = []

        for section in self._sections.values():
            # Check if the section is disabled by a toggle
            if section.toggle:
                toggle_var = section.variables.get(section.toggle)
                if toggle_var and not toggle_var.get_typed_value():
                    logger.debug(f"Skipping validation for disabled section: '{section.key}'")
                    continue  # Skip this entire section

            # Validate each variable in the section
            for var_name, variable in section.variables.items():
                try:
                    # If value is None, treat as missing
                    if variable.value is None:
                        errors.append(f"{section.key}.{var_name} (missing)")
                        continue

                    # Attempt to convert/validate typed value
                    typed = variable.get_typed_value()

                    # For non-boolean types, treat None or empty string as invalid
                    if variable.type not in ("bool",) and (typed is None or typed == ""):
                        errors.append(f"{section.key}.{var_name} (empty)")

                except ValueError as e:
                    errors.append(f"{section.key}.{var_name} (invalid: {e})")

        if errors:
            error_msg = "Variable validation failed: " + ", ".join(errors)
            logger.error(error_msg)
            raise ValueError(error_msg)

    def merge(self, other_spec: Union[Dict[str, Any], 'VariableCollection'], origin: str = "override") -> 'VariableCollection':
        """Merge another spec or VariableCollection into this one.

        The other spec/collection has higher precedence and overrides values in
        self. Neither input is modified; a new VariableCollection is returned.

        Args:
            other_spec: Either a spec dictionary or another VariableCollection to merge
            origin: Origin label for variables from other_spec (e.g., 'template', 'config')

        Returns:
            New VariableCollection with merged data

        Raises:
            ValueError: If other_spec is an invalid spec dictionary, or an
                overriding value is invalid for the resulting variable type.

        Example:
            module_vars = VariableCollection(module_spec)
            template_vars = module_vars.merge(template_spec, origin='template')
            # Variables present in template_spec override module_spec and get
            # origin 'template'; untouched variables keep their previous origin.
        """
        # Convert dict to VariableCollection if needed (only once)
        if isinstance(other_spec, dict):
            other = VariableCollection(other_spec)
        else:
            other = other_spec

        # Create new collection without calling __init__ (optimization)
        merged = VariableCollection.__new__(VariableCollection)
        merged._sections = {}
        merged._variable_map = {}

        # First pass: clone sections from self
        for section_key, self_section in self._sections.items():
            if section_key in other._sections:
                # Section exists in both - will merge
                merged._sections[section_key] = self._merge_sections(
                    self_section,
                    other._sections[section_key],
                    origin
                )
            else:
                # Section only in self - clone it
                merged._sections[section_key] = self_section.clone()

        # Second pass: add sections that only exist in other
        for section_key, other_section in other._sections.items():
            if section_key not in merged._sections:
                # New section from other - clone with origin update
                merged._sections[section_key] = other_section.clone(origin_update=origin)

        # Rebuild variable map for O(1) lookups
        for section in merged._sections.values():
            for var_name, variable in section.variables.items():
                merged._variable_map[var_name] = variable

        return merged

    def _infer_origin_from_context(self) -> str:
        """Return the first non-empty variable origin found, or "template"."""
        for section in self._sections.values():
            for variable in section.variables.values():
                if variable.origin:
                    return variable.origin
        return "template"

    def _merge_sections(self, self_section: VariableSection, other_section: VariableSection, origin: str) -> VariableSection:
        """Merge two sections, with other_section taking precedence.

        Args:
            self_section: Base section
            other_section: Section to merge in (takes precedence)
            origin: Origin label for merged variables

        Returns:
            New merged VariableSection

        Raises:
            ValueError: If an overriding value is invalid for the merged
                variable's type.
        """
        # Start with a clone of self_section
        merged_section = self_section.clone()

        # Update section metadata from other (other takes precedence)
        if other_section.title:
            merged_section.title = other_section.title
        if other_section.prompt:
            merged_section.prompt = other_section.prompt
        if other_section.description:
            merged_section.description = other_section.description
        if other_section.toggle:
            merged_section.toggle = other_section.toggle
        # Required flag always updated
        merged_section.required = other_section.required

        # Merge variables
        for var_name, other_var in other_section.variables.items():
            if var_name in merged_section.variables:
                # Variable exists in both - merge with other taking precedence
                self_var = merged_section.variables[var_name]

                # Build update dict with other's values taking precedence
                update = {}

                # Variable.type defaults to "str" when a spec omits it.  Only
                # let other's type win when it was really declared; otherwise an
                # override that merely changes the default would silently turn
                # an int/bool/enum variable into a string.  Variables without
                # the bookkeeping attribute keep the old always-override rule.
                declared = getattr(other_var, "_explicit_fields", None)
                type_declared = declared is None or "type" in declared
                if other_var.type and type_declared:
                    update['type'] = other_var.type
                if other_var.value is not None:
                    update['value'] = other_var.value
                if other_var.description:
                    update['description'] = other_var.description
                if other_var.prompt:
                    update['prompt'] = other_var.prompt
                if other_var.options:
                    # Copy so the merged variable does not alias other's list.
                    update['options'] = list(other_var.options)
                if other_var.sensitive:
                    update['sensitive'] = other_var.sensitive
                if other_var.extra:
                    update['extra'] = other_var.extra

                # Update origin tracking (only keep the current source, not the chain)
                update['origin'] = origin

                # Clone with updates (re-validates the value against the type)
                merged_section.variables[var_name] = self_var.clone(update=update)
            else:
                # New variable from other - clone with origin
                merged_section.variables[var_name] = other_var.clone(update={'origin': origin})

        return merged_section

    def filter_to_used(self, used_variables: Set[str], keep_sensitive: bool = True) -> 'VariableCollection':
        """Filter collection to only variables that are used (or sensitive).

        Creates a new VariableCollection containing only the variables in
        used_variables. Sections with no remaining variables are removed.

        Args:
            used_variables: Set of variable names that are actually used
            keep_sensitive: If True, also keep sensitive variables even if not in used set

        Returns:
            New VariableCollection with filtered variables

        Example:
            all_vars = VariableCollection(spec)
            used_vars = all_vars.filter_to_used({'var1', 'var2', 'var3'})
            # Only var1, var2, var3 (and any sensitive vars) remain
        """
        # Create new collection without calling __init__ (optimization)
        filtered = VariableCollection.__new__(VariableCollection)
        filtered._sections = {}
        filtered._variable_map = {}

        # Filter each section
        for section_key, section in self._sections.items():
            # Create a new section with same metadata
            filtered_section = VariableSection({
                'key': section.key,
                'title': section.title,
                'prompt': section.prompt,
                'description': section.description,
                'toggle': section.toggle,
                'required': section.required,
            })

            # Clone only the variables that should be included
            for var_name, variable in section.variables.items():
                # Include if used OR if sensitive (and keep_sensitive is True)
                should_include = (
                    var_name in used_variables or
                    (keep_sensitive and variable.sensitive)
                )

                if should_include:
                    filtered_section.variables[var_name] = variable.clone()

            # Only add section if it has variables
            if filtered_section.variables:
                filtered._sections[section_key] = filtered_section
                # Add variables to map
                for var_name, variable in filtered_section.variables.items():
                    filtered._variable_map[var_name] = variable

        return filtered

    def get_all_variable_names(self) -> Set[str]:
        """Get set of all variable names across all sections.

        Returns:
            Set of all variable names
        """
        return set(self._variable_map.keys())

    # !SECTION

# !SECTION