|
|
@@ -1,6 +1,6 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
-import logging
|
|
|
+from dataclasses import dataclass
|
|
|
from typing import TYPE_CHECKING, Any
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
@@ -9,105 +9,369 @@ from email_validator import EmailNotValidError, validate_email
|
|
|
if TYPE_CHECKING:
|
|
|
from cli.core.template.variable_section import VariableSection
|
|
|
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
-
|
|
|
-# Constants
|
|
|
DEFAULT_AUTOGENERATED_LENGTH = 32
|
|
|
+DEFAULT_AUTOGENERATED_BYTES = 32
|
|
|
TRUE_VALUES = {"true", "1", "yes", "on"}
|
|
|
FALSE_VALUES = {"false", "0", "no", "off"}
|
|
|
+SECRET_TYPE = "secret"
|
|
|
+SECRET_AUTOGENERATED_KIND_CHARACTERS = "characters"
|
|
|
+SECRET_AUTOGENERATED_KIND_BASE64 = "base64"
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class SecretAutogeneratedConfig:
|
|
|
+ """Structured autogeneration settings for secret variables."""
|
|
|
+
|
|
|
+ kind: str = SECRET_AUTOGENERATED_KIND_CHARACTERS
|
|
|
+ length: int | None = None
|
|
|
+ characters: list[str] | None = None
|
|
|
+ bytes: int | None = None
|
|
|
+
|
|
|
+ def clone(self) -> SecretAutogeneratedConfig:
|
|
|
+ return SecretAutogeneratedConfig(
|
|
|
+ kind=self.kind,
|
|
|
+ length=self.length,
|
|
|
+ characters=self.characters.copy() if self.characters else None,
|
|
|
+ bytes=self.bytes,
|
|
|
+ )
|
|
|
+
|
|
|
+ def length_or_default(self) -> int:
|
|
|
+ return self.length if self.length is not None else DEFAULT_AUTOGENERATED_LENGTH
|
|
|
+
|
|
|
+ def bytes_or_default(self) -> int:
|
|
|
+ return self.bytes if self.bytes is not None else DEFAULT_AUTOGENERATED_BYTES
|
|
|
+
|
|
|
+
|
|
|
+@dataclass
|
|
|
+class VariableConfig:
|
|
|
+ """Type-specific variable configuration."""
|
|
|
+
|
|
|
+ placeholder: str | None = None
|
|
|
+ textarea: bool = False
|
|
|
+ unit: str | None = None
|
|
|
+ options: list[str] | None = None
|
|
|
+ slider: bool = False
|
|
|
+ min: int | None = None
|
|
|
+ max: int | None = None
|
|
|
+ step: int | None = None
|
|
|
+ autogenerated: SecretAutogeneratedConfig | None = None
|
|
|
+
|
|
|
+ def clone(self) -> VariableConfig:
|
|
|
+ return VariableConfig(
|
|
|
+ placeholder=self.placeholder,
|
|
|
+ textarea=self.textarea,
|
|
|
+ unit=self.unit,
|
|
|
+ options=self.options.copy() if self.options else None,
|
|
|
+ slider=self.slider,
|
|
|
+ min=self.min,
|
|
|
+ max=self.max,
|
|
|
+ step=self.step,
|
|
|
+ autogenerated=self.autogenerated.clone() if self.autogenerated else None,
|
|
|
+ )
|
|
|
+
|
|
|
+ def is_empty(self) -> bool:
|
|
|
+ return (
|
|
|
+ not self.placeholder
|
|
|
+ and not self.textarea
|
|
|
+ and not self.unit
|
|
|
+ and not self.options
|
|
|
+ and not self.slider
|
|
|
+ and self.min is None
|
|
|
+ and self.max is None
|
|
|
+ and self.step is None
|
|
|
+ and self.autogenerated is None
|
|
|
+ )
|
|
|
|
|
|
|
|
|
class Variable:
|
|
|
"""Represents a single templating variable with lightweight validation."""
|
|
|
|
|
|
def __init__(self, data: dict[str, Any]) -> None:
|
|
|
- """Initialize Variable from a dictionary containing variable specification.
|
|
|
-
|
|
|
- Args:
|
|
|
- data: Dictionary containing variable specification with required
|
|
|
- 'name' key and optional keys: description, type, options,
|
|
|
- prompt, value, default, section, origin
|
|
|
-
|
|
|
- Raises:
|
|
|
- ValueError: If data is not a dict, missing 'name' key, or has invalid default value
|
|
|
- """
|
|
|
- # Validate input
|
|
|
- if not isinstance(data, dict):
|
|
|
- raise ValueError("Variable data must be a dictionary")
|
|
|
-
|
|
|
- if "name" not in data:
|
|
|
- raise ValueError("Variable data must contain 'name' key")
|
|
|
-
|
|
|
- # Track which fields were explicitly provided in source data
|
|
|
+ self._validate_input_data(data)
|
|
|
self._explicit_fields: set[str] = set(data.keys())
|
|
|
|
|
|
- # Initialize fields
|
|
|
self.name: str = data["name"]
|
|
|
- # Reference to parent section (set by VariableCollection)
|
|
|
self.parent_section: VariableSection | None = data.get("parent_section")
|
|
|
self.description: str | None = data.get("description") or data.get("display", "")
|
|
|
self.type: str = data.get("type", "str")
|
|
|
- self.options: list[Any] | None = data.get("options", [])
|
|
|
self.prompt: str | None = data.get("prompt")
|
|
|
- if "value" in data:
|
|
|
- self.value: Any = data.get("value")
|
|
|
- elif "default" in data:
|
|
|
- self.value: Any = data.get("default")
|
|
|
- else:
|
|
|
- # RULE: If bool variables don't have any default value or value at all,
|
|
|
- # automatically set them to false
|
|
|
- self.value: Any = False if self.type == "bool" else None
|
|
|
+ self.value: Any = self._resolve_initial_value(data)
|
|
|
self.origin: str | None = data.get("origin")
|
|
|
- self.sensitive: bool = data.get("sensitive", False)
|
|
|
- # Optional extra explanation used by interactive prompts
|
|
|
self.extra: str | None = data.get("extra")
|
|
|
- # Flag indicating this variable should be auto-generated when empty
|
|
|
- self.autogenerated: bool = data.get("autogenerated", False)
|
|
|
- # Length of auto-generated value
|
|
|
- self.autogenerated_length: int = data.get("autogenerated_length", DEFAULT_AUTOGENERATED_LENGTH)
|
|
|
- # Flag indicating if autogenerated value should be base64 encoded
|
|
|
- self.autogenerated_base64: bool = data.get("autogenerated_base64", False)
|
|
|
- # Flag indicating this variable is required (must have a value)
|
|
|
self.required: bool = data.get("required", False)
|
|
|
- # Original value before config override (used for display)
|
|
|
self.original_value: Any | None = data.get("original_value")
|
|
|
- # Variable dependencies - can be string or list of strings in format "var_name=value"
|
|
|
- # Supports semicolon-separated multiple conditions: "var1=value1;var2=value2,value3"
|
|
|
- needs_value = data.get("needs")
|
|
|
- if needs_value:
|
|
|
- if isinstance(needs_value, str):
|
|
|
- # Split by semicolon to support multiple AND conditions in a single string
|
|
|
- # Example: "traefik_enabled=true;network_mode=bridge,macvlan"
|
|
|
- self.needs: list[str] = [need.strip() for need in needs_value.split(";") if need.strip()]
|
|
|
- elif isinstance(needs_value, list):
|
|
|
- self.needs: list[str] = needs_value
|
|
|
- else:
|
|
|
- raise ValueError(f"Variable '{self.name}' has invalid 'needs' value: must be string or list")
|
|
|
+
|
|
|
+ self.config = self._normalize_config(self.name, self.type, data.get("config"))
|
|
|
+ self._apply_config_state()
|
|
|
+ self._validate_secret_defaults(data)
|
|
|
+ self.needs = self._parse_needs(data.get("needs"))
|
|
|
+ self._validate_initial_value()
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _validate_input_data(data: dict[str, Any]) -> None:
|
|
|
+ if not isinstance(data, dict):
|
|
|
+ raise ValueError("Variable data must be a dictionary")
|
|
|
+ if "name" not in data:
|
|
|
+ raise ValueError("Variable data must contain 'name' key")
|
|
|
+
|
|
|
+ def _resolve_initial_value(self, data: dict[str, Any]) -> Any:
|
|
|
+ if "value" in data:
|
|
|
+ return data.get("value")
|
|
|
+ if "default" in data:
|
|
|
+ return data.get("default")
|
|
|
+ return False if self.type == "bool" else None
|
|
|
+
|
|
|
+ def _apply_config_state(self) -> None:
|
|
|
+ self.options: list[str] | None = self.config.options.copy() if self.config.options else None
|
|
|
+ self.autogenerated_config: SecretAutogeneratedConfig | None = (
|
|
|
+ self.config.autogenerated.clone() if self.config.autogenerated else None
|
|
|
+ )
|
|
|
+ self.autogenerated: bool = self.autogenerated_config is not None
|
|
|
+ self.autogenerated_length: int = (
|
|
|
+ self.autogenerated_config.length_or_default() if self.autogenerated_config else DEFAULT_AUTOGENERATED_LENGTH
|
|
|
+ )
|
|
|
+ self.autogenerated_base64: bool = bool(
|
|
|
+ self.autogenerated_config and self.autogenerated_config.kind == SECRET_AUTOGENERATED_KIND_BASE64
|
|
|
+ )
|
|
|
+
|
|
|
+ def _validate_secret_defaults(self, data: dict[str, Any]) -> None:
|
|
|
+ if self.type == SECRET_TYPE and self.autogenerated and "default" in data:
|
|
|
+ raise ValueError(
|
|
|
+ f"Invalid default for variable '{self.name}': autogenerated secrets cannot define defaults"
|
|
|
+ )
|
|
|
+
|
|
|
+ def _parse_needs(self, needs_value: Any) -> list[str]:
|
|
|
+ if not needs_value:
|
|
|
+ return []
|
|
|
+ if isinstance(needs_value, str):
|
|
|
+ return [need.strip() for need in needs_value.split(";") if need.strip()]
|
|
|
+ if isinstance(needs_value, list):
|
|
|
+ return needs_value
|
|
|
+ raise ValueError(f"Variable '{self.name}' has invalid 'needs' value: must be string or list")
|
|
|
+
|
|
|
+ def _validate_initial_value(self) -> None:
|
|
|
+ if self.value is None:
|
|
|
+ return
|
|
|
+ try:
|
|
|
+ self.value = self.convert(self.value)
|
|
|
+ if self.type == "int" and self.value is not None and self.config.slider:
|
|
|
+ self._validate_slider_value(self.value)
|
|
|
+ except ValueError as exc:
|
|
|
+ raise ValueError(f"Invalid default for variable '{self.name}': {exc}") from exc
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _normalize_str_list(values: list[Any] | None) -> list[str] | None:
|
|
|
+ if not values:
|
|
|
+ return None
|
|
|
+
|
|
|
+ normalized: list[str] = []
|
|
|
+ seen: set[str] = set()
|
|
|
+ for value in values:
|
|
|
+ item = str(value).strip()
|
|
|
+ if not item or item in seen:
|
|
|
+ continue
|
|
|
+ seen.add(item)
|
|
|
+ normalized.append(item)
|
|
|
+ return normalized or None
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _parse_secret_autogenerated(
|
|
|
+ cls,
|
|
|
+ variable_name: str,
|
|
|
+ autogenerated_input: Any,
|
|
|
+ legacy_length: Any = None,
|
|
|
+ legacy_base64: bool = False,
|
|
|
+ ) -> SecretAutogeneratedConfig | None:
|
|
|
+ if autogenerated_input in (None, False):
|
|
|
+ return cls._legacy_secret_autogenerated(legacy_length, legacy_base64)
|
|
|
+
|
|
|
+ if autogenerated_input is True:
|
|
|
+ return cls._boolean_secret_autogenerated(variable_name, legacy_length, legacy_base64)
|
|
|
+
|
|
|
+ if not isinstance(autogenerated_input, dict):
|
|
|
+ raise ValueError("autogenerated must be a boolean or object")
|
|
|
+
|
|
|
+ config = cls._dict_secret_autogenerated(autogenerated_input, legacy_length, legacy_base64)
|
|
|
+ return cls._validate_secret_autogenerated(variable_name, config)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _legacy_secret_autogenerated(
|
|
|
+ legacy_length: Any,
|
|
|
+ legacy_base64: bool,
|
|
|
+ ) -> SecretAutogeneratedConfig | None:
|
|
|
+ if not legacy_base64:
|
|
|
+ return None
|
|
|
+ return SecretAutogeneratedConfig(
|
|
|
+ kind=SECRET_AUTOGENERATED_KIND_BASE64,
|
|
|
+ bytes=int(legacy_length) if legacy_length is not None else DEFAULT_AUTOGENERATED_BYTES,
|
|
|
+ )
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _boolean_secret_autogenerated(
|
|
|
+ cls,
|
|
|
+ variable_name: str,
|
|
|
+ legacy_length: Any,
|
|
|
+ legacy_base64: bool,
|
|
|
+ ) -> SecretAutogeneratedConfig:
|
|
|
+ legacy_config = cls._legacy_secret_autogenerated(legacy_length, legacy_base64)
|
|
|
+ if legacy_config is not None:
|
|
|
+ return legacy_config
|
|
|
+ config = SecretAutogeneratedConfig()
|
|
|
+ if legacy_length is not None:
|
|
|
+ config.length = int(legacy_length)
|
|
|
+ return cls._validate_secret_autogenerated(variable_name, config)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _dict_secret_autogenerated(
|
|
|
+ cls,
|
|
|
+ autogenerated_input: dict[str, Any],
|
|
|
+ legacy_length: Any,
|
|
|
+ legacy_base64: bool,
|
|
|
+ ) -> SecretAutogeneratedConfig:
|
|
|
+ kind = str(
|
|
|
+ autogenerated_input.get("kind")
|
|
|
+ or (SECRET_AUTOGENERATED_KIND_BASE64 if legacy_base64 else SECRET_AUTOGENERATED_KIND_CHARACTERS)
|
|
|
+ ).strip()
|
|
|
+ config = SecretAutogeneratedConfig(kind=kind)
|
|
|
+
|
|
|
+ if autogenerated_input.get("length") is not None:
|
|
|
+ config.length = int(autogenerated_input["length"])
|
|
|
+ elif legacy_length is not None and kind != SECRET_AUTOGENERATED_KIND_BASE64:
|
|
|
+ config.length = int(legacy_length)
|
|
|
+
|
|
|
+ if autogenerated_input.get("bytes") is not None:
|
|
|
+ config.bytes = int(autogenerated_input["bytes"])
|
|
|
+ if autogenerated_input.get("characters") is not None:
|
|
|
+ config.characters = cls._normalize_secret_characters(autogenerated_input["characters"])
|
|
|
+ return config
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _normalize_secret_characters(characters: Any) -> list[str] | None:
|
|
|
+ if not isinstance(characters, list):
|
|
|
+ raise ValueError("autogenerated.characters must be a list")
|
|
|
+
|
|
|
+ normalized_characters = []
|
|
|
+ seen: set[str] = set()
|
|
|
+ for char in characters:
|
|
|
+ item = str(char).strip()
|
|
|
+ if not item:
|
|
|
+ continue
|
|
|
+ if len(item) != 1:
|
|
|
+ raise ValueError("autogenerated.characters entries must each be exactly one character")
|
|
|
+ if item in seen:
|
|
|
+ continue
|
|
|
+ seen.add(item)
|
|
|
+ normalized_characters.append(item)
|
|
|
+ return normalized_characters or None
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _validate_secret_autogenerated(
|
|
|
+ variable_name: str,
|
|
|
+ config: SecretAutogeneratedConfig,
|
|
|
+ ) -> SecretAutogeneratedConfig:
|
|
|
+ kind = (config.kind or SECRET_AUTOGENERATED_KIND_CHARACTERS).strip()
|
|
|
+ if kind not in (SECRET_AUTOGENERATED_KIND_CHARACTERS, SECRET_AUTOGENERATED_KIND_BASE64):
|
|
|
+ raise ValueError(
|
|
|
+ f"variable '{variable_name}' autogenerated.kind must be one of "
|
|
|
+ f"'{SECRET_AUTOGENERATED_KIND_CHARACTERS}' or '{SECRET_AUTOGENERATED_KIND_BASE64}'"
|
|
|
+ )
|
|
|
+ config.kind = kind
|
|
|
+
|
|
|
+ if kind == SECRET_AUTOGENERATED_KIND_CHARACTERS:
|
|
|
+ if config.bytes is not None:
|
|
|
+ raise ValueError(f"variable '{variable_name}' character autogenerated secrets cannot define bytes")
|
|
|
+ if config.length is not None and config.length <= 0:
|
|
|
+ raise ValueError(f"variable '{variable_name}' autogenerated.length must be greater than 0")
|
|
|
+ if config.characters is not None and not config.characters:
|
|
|
+ raise ValueError(f"variable '{variable_name}' autogenerated.characters must not be empty")
|
|
|
+ return config
|
|
|
+
|
|
|
+ if config.length is not None:
|
|
|
+ raise ValueError(
|
|
|
+ f"variable '{variable_name}' base64 autogenerated secrets must use bytes instead of length"
|
|
|
+ )
|
|
|
+ if config.characters is not None:
|
|
|
+ raise ValueError(f"variable '{variable_name}' base64 autogenerated secrets cannot define characters")
|
|
|
+ if config.bytes is not None and config.bytes <= 0:
|
|
|
+ raise ValueError(f"variable '{variable_name}' autogenerated.bytes must be greater than 0")
|
|
|
+ return config
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def _normalize_config(
|
|
|
+ cls,
|
|
|
+ variable_name: str,
|
|
|
+ variable_type: str,
|
|
|
+ config_input: Any,
|
|
|
+ ) -> VariableConfig:
|
|
|
+ if config_input is None:
|
|
|
+ config_data: dict[str, Any] = {}
|
|
|
+ elif isinstance(config_input, dict):
|
|
|
+ config_data = config_input.copy()
|
|
|
else:
|
|
|
- self.needs: list[str] = []
|
|
|
+ raise ValueError(f"Variable '{variable_name}' config must be a dictionary")
|
|
|
+
|
|
|
+ options = cls._normalize_str_list(config_data.get("options"))
|
|
|
+
|
|
|
+ placeholder = config_data.get("placeholder")
|
|
|
+ placeholder = str(placeholder).strip() if placeholder not in (None, "") else None
|
|
|
+ textarea = bool(config_data.get("textarea", False))
|
|
|
+ unit = config_data.get("unit")
|
|
|
+ unit = str(unit).strip() if unit not in (None, "") else None
|
|
|
+ slider = bool(config_data.get("slider", False))
|
|
|
+
|
|
|
+ min_value = config_data.get("min")
|
|
|
+ max_value = config_data.get("max")
|
|
|
+ step_value = config_data.get("step")
|
|
|
+
|
|
|
+ min_int = int(min_value) if min_value is not None else None
|
|
|
+ max_int = int(max_value) if max_value is not None else None
|
|
|
+ step_int = int(step_value) if step_value is not None else None
|
|
|
+
|
|
|
+ autogenerated_input = config_data.get("autogenerated")
|
|
|
+ autogenerated_config = None
|
|
|
+ if autogenerated_input not in (None, False) and variable_type != SECRET_TYPE:
|
|
|
+ raise ValueError("autogenerated is only supported for secret variables")
|
|
|
+ if variable_type == SECRET_TYPE:
|
|
|
+ autogenerated_config = cls._parse_secret_autogenerated(
|
|
|
+ variable_name,
|
|
|
+ autogenerated_input,
|
|
|
+ )
|
|
|
|
|
|
- # Validate and convert the default/initial value if present
|
|
|
- if self.value is not None:
|
|
|
- try:
|
|
|
- self.value = self.convert(self.value)
|
|
|
- except ValueError as exc:
|
|
|
- raise ValueError(f"Invalid default for variable '{self.name}': {exc}") from exc
|
|
|
+ config = VariableConfig(
|
|
|
+ placeholder=placeholder,
|
|
|
+ textarea=textarea,
|
|
|
+ unit=unit,
|
|
|
+ options=options,
|
|
|
+ slider=slider,
|
|
|
+ min=min_int,
|
|
|
+ max=max_int,
|
|
|
+ step=step_int,
|
|
|
+ autogenerated=autogenerated_config,
|
|
|
+ )
|
|
|
+
|
|
|
+ if variable_type == "enum" and not config.options:
|
|
|
+ raise ValueError("enum variables require non-empty options")
|
|
|
+
|
|
|
+ if variable_type == "int" and config.slider:
|
|
|
+ if config.min is None or config.max is None:
|
|
|
+ raise ValueError("slider variables require min and max")
|
|
|
+ if config.max < config.min:
|
|
|
+ raise ValueError("slider variables require max >= min")
|
|
|
+ if config.step is not None and config.step <= 0:
|
|
|
+ raise ValueError("slider variables require step > 0")
|
|
|
+
|
|
|
+ return config
|
|
|
+
|
|
|
+ def is_secret(self) -> bool:
|
|
|
+ return self.type == SECRET_TYPE
|
|
|
|
|
|
def convert(self, value: Any) -> Any:
|
|
|
- """Validate and convert a raw value based on the variable type.
|
|
|
-
|
|
|
- This method performs type conversion but does NOT check if the value
|
|
|
- is required. Use validate_and_convert() for full validation including
|
|
|
- required field checks.
|
|
|
- """
|
|
|
if value is None:
|
|
|
return None
|
|
|
|
|
|
- # Treat empty strings as None to avoid storing "" for missing values.
|
|
|
if isinstance(value, str) and value.strip() == "":
|
|
|
return None
|
|
|
|
|
|
- # Type conversion mapping for cleaner code
|
|
|
converters = {
|
|
|
"bool": self._convert_bool,
|
|
|
"int": self._convert_int,
|
|
|
@@ -121,44 +385,14 @@ class Variable:
|
|
|
if converter:
|
|
|
return converter(value)
|
|
|
|
|
|
- # Default to string conversion
|
|
|
return str(value)
|
|
|
|
|
|
def validate_and_convert(self, value: Any, check_required: bool = True) -> Any:
|
|
|
- """Validate and convert a value with comprehensive checks.
|
|
|
-
|
|
|
- This method combines type conversion with validation logic including
|
|
|
- required field checks. It's the recommended method for user input validation.
|
|
|
-
|
|
|
- Args:
|
|
|
- value: The raw value to validate and convert
|
|
|
- check_required: If True, raises ValueError for required fields with empty values
|
|
|
-
|
|
|
- Returns:
|
|
|
- The converted and validated value
|
|
|
-
|
|
|
- Raises:
|
|
|
- ValueError: If validation fails (invalid format, required field empty, etc.)
|
|
|
-
|
|
|
- Examples:
|
|
|
- # Basic validation
|
|
|
- var.validate_and_convert("example@email.com") # Returns validated email
|
|
|
-
|
|
|
- # Required field validation
|
|
|
- var.validate_and_convert("", check_required=True) # Raises ValueError if required
|
|
|
-
|
|
|
- # Autogenerated variables - allow empty values
|
|
|
- var.validate_and_convert("", check_required=False) # Returns None for autogeneration
|
|
|
- """
|
|
|
- # First, convert the value using standard type conversion
|
|
|
converted = self.convert(value)
|
|
|
|
|
|
- # Special handling for autogenerated variables
|
|
|
- # Allow empty values as they will be auto-generated later
|
|
|
- if self.autogenerated and (converted is None or (isinstance(converted, str) and (converted in {"", "*auto"}))):
|
|
|
- return None # Signal that auto-generation should happen
|
|
|
+ if self.autogenerated and (converted is None or (isinstance(converted, str) and converted in {"", "*auto"})):
|
|
|
+ return None
|
|
|
|
|
|
- # Check if this is a required field and the value is empty
|
|
|
if (
|
|
|
check_required
|
|
|
and self.is_required()
|
|
|
@@ -166,10 +400,12 @@ class Variable:
|
|
|
):
|
|
|
raise ValueError("This field is required and cannot be empty")
|
|
|
|
|
|
+ if self.type == "int" and converted is not None and self.config.slider:
|
|
|
+ self._validate_slider_value(converted)
|
|
|
+
|
|
|
return converted
|
|
|
|
|
|
def _convert_bool(self, value: Any) -> bool:
|
|
|
- """Convert value to boolean."""
|
|
|
if isinstance(value, bool):
|
|
|
return value
|
|
|
if isinstance(value, str):
|
|
|
@@ -181,7 +417,8 @@ class Variable:
|
|
|
raise ValueError("value must be a boolean (true/false)")
|
|
|
|
|
|
def _convert_int(self, value: Any) -> int | None:
|
|
|
- """Convert value to integer."""
|
|
|
+ if isinstance(value, bool):
|
|
|
+ raise ValueError("value must be an integer")
|
|
|
if isinstance(value, int):
|
|
|
return value
|
|
|
if isinstance(value, str) and value.strip() == "":
|
|
|
@@ -192,9 +429,10 @@ class Variable:
|
|
|
raise ValueError("value must be an integer") from exc
|
|
|
|
|
|
def _convert_float(self, value: Any) -> float | None:
|
|
|
- """Convert value to float."""
|
|
|
- if isinstance(value, float):
|
|
|
- return value
|
|
|
+ if isinstance(value, bool):
|
|
|
+ raise ValueError("value must be a float")
|
|
|
+ if isinstance(value, (int, float)):
|
|
|
+ return float(value)
|
|
|
if isinstance(value, str) and value.strip() == "":
|
|
|
return None
|
|
|
try:
|
|
|
@@ -210,7 +448,7 @@ class Variable:
|
|
|
raise ValueError(f"value must be one of: {', '.join(self.options)}")
|
|
|
return val
|
|
|
|
|
|
- def _convert_url(self, value: Any) -> str:
|
|
|
+ def _convert_url(self, value: Any) -> str | None:
|
|
|
val = str(value).strip()
|
|
|
if not val:
|
|
|
return None
|
|
|
@@ -219,213 +457,241 @@ class Variable:
|
|
|
raise ValueError("value must be a valid URL (include scheme and host)")
|
|
|
return val
|
|
|
|
|
|
- def _convert_email(self, value: Any) -> str:
|
|
|
+ def _convert_email(self, value: Any) -> str | None:
|
|
|
val = str(value).strip()
|
|
|
if not val:
|
|
|
return None
|
|
|
try:
|
|
|
- # Validate email using RFC 5321/5322 compliant parser
|
|
|
validated = validate_email(val, check_deliverability=False)
|
|
|
return validated.normalized
|
|
|
except EmailNotValidError as exc:
|
|
|
raise ValueError(f"value must be a valid email address: {exc}") from exc
|
|
|
|
|
|
+ def _validate_slider_value(self, value: int) -> None:
|
|
|
+ if not self.config.slider:
|
|
|
+ return
|
|
|
+
|
|
|
+ min_value = self.config.min
|
|
|
+ max_value = self.config.max
|
|
|
+ if min_value is None or max_value is None:
|
|
|
+ return
|
|
|
+
|
|
|
+ if value < min_value:
|
|
|
+ raise ValueError(f"value must be at least {min_value}")
|
|
|
+ if value > max_value:
|
|
|
+ raise ValueError(f"value must be at most {max_value}")
|
|
|
+
|
|
|
+ step = self.config.step if self.config.step is not None else 1
|
|
|
+ if step > 0 and (value - min_value) % step != 0:
|
|
|
+ raise ValueError(f"value must align with step {step} starting at {min_value}")
|
|
|
+
|
|
|
def to_dict(self) -> dict[str, Any]:
|
|
|
- """Serialize Variable to a dictionary for storage."""
|
|
|
- result = {}
|
|
|
+ result: dict[str, Any] = {}
|
|
|
|
|
|
- # Always include type
|
|
|
if self.type:
|
|
|
result["type"] = self.type
|
|
|
|
|
|
- # Include value/default if not None
|
|
|
if self.value is not None:
|
|
|
result["default"] = self.value
|
|
|
|
|
|
- # Include string fields if truthy
|
|
|
for field in ("description", "prompt", "extra", "origin"):
|
|
|
if value := getattr(self, field):
|
|
|
result[field] = value
|
|
|
|
|
|
- # Include boolean/list fields if truthy (but empty list is OK for options)
|
|
|
- if self.sensitive:
|
|
|
- result["sensitive"] = True
|
|
|
- if self.autogenerated:
|
|
|
- result["autogenerated"] = True
|
|
|
- # Only include length if not default
|
|
|
- if self.autogenerated_length != DEFAULT_AUTOGENERATED_LENGTH:
|
|
|
- result["autogenerated_length"] = self.autogenerated_length
|
|
|
- # Include base64 flag if enabled
|
|
|
- if self.autogenerated_base64:
|
|
|
- result["autogenerated_base64"] = True
|
|
|
if self.required:
|
|
|
result["required"] = True
|
|
|
- if self.options is not None: # Allow empty list
|
|
|
- result["options"] = self.options
|
|
|
+ config_dict = self._serialize_config()
|
|
|
+ if config_dict:
|
|
|
+ result["config"] = config_dict
|
|
|
|
|
|
- # Store dependencies (single value if only one, list otherwise)
|
|
|
if self.needs:
|
|
|
result["needs"] = self.needs[0] if len(self.needs) == 1 else self.needs
|
|
|
|
|
|
return result
|
|
|
|
|
|
- def get_display_value(self, mask_sensitive: bool = True, max_length: int = 30, show_none: bool = True) -> str:
|
|
|
- """Get formatted display value with optional masking and truncation.
|
|
|
+ def _serialize_config(self) -> dict[str, Any]:
|
|
|
+ if not self.config or self.config.is_empty():
|
|
|
+ return {}
|
|
|
+
|
|
|
+ config_dict: dict[str, Any] = {}
|
|
|
+ value_fields = {
|
|
|
+ "placeholder": self.config.placeholder,
|
|
|
+ "unit": self.config.unit,
|
|
|
+ "options": self.config.options,
|
|
|
+ }
|
|
|
+ for field_name, field_value in value_fields.items():
|
|
|
+ if field_value:
|
|
|
+ config_dict[field_name] = field_value
|
|
|
+
|
|
|
+ if self.config.textarea:
|
|
|
+ config_dict["textarea"] = True
|
|
|
+ if self.config.slider:
|
|
|
+ config_dict["slider"] = True
|
|
|
+
|
|
|
+ for field_name in ("min", "max", "step"):
|
|
|
+ field_value = getattr(self.config, field_name)
|
|
|
+ if field_value is not None:
|
|
|
+ config_dict[field_name] = field_value
|
|
|
+
|
|
|
+ autogenerated_dict = self._serialize_autogenerated_config()
|
|
|
+ if autogenerated_dict is not None:
|
|
|
+ config_dict["autogenerated"] = autogenerated_dict
|
|
|
|
|
|
- Args:
|
|
|
- mask_sensitive: If True, mask sensitive values with asterisks
|
|
|
- max_length: Maximum length before truncation (0 = no limit)
|
|
|
- show_none: If True, display "(none)" for None values instead of empty string
|
|
|
+ return config_dict
|
|
|
|
|
|
- Returns:
|
|
|
- Formatted string representation of the value
|
|
|
- """
|
|
|
+ def _serialize_autogenerated_config(self) -> bool | dict[str, Any] | None:
|
|
|
+ autogenerated = self.config.autogenerated
|
|
|
+ if not autogenerated:
|
|
|
+ return None
|
|
|
+ if self._is_default_character_autogenerated(autogenerated):
|
|
|
+ return True
|
|
|
+
|
|
|
+ autogenerated_dict = {"kind": autogenerated.kind}
|
|
|
+ for field_name in ("length", "characters", "bytes"):
|
|
|
+ field_value = getattr(autogenerated, field_name)
|
|
|
+ if field_value is not None:
|
|
|
+ autogenerated_dict[field_name] = field_value
|
|
|
+ return autogenerated_dict
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _is_default_character_autogenerated(autogenerated: SecretAutogeneratedConfig) -> bool:
|
|
|
+ return (
|
|
|
+ autogenerated.kind == SECRET_AUTOGENERATED_KIND_CHARACTERS
|
|
|
+ and autogenerated.length is None
|
|
|
+ and autogenerated.characters is None
|
|
|
+ and autogenerated.bytes is None
|
|
|
+ )
|
|
|
+
|
|
|
+ def get_display_value(self, mask_secret: bool = True, max_length: int = 30, show_none: bool = True) -> str:
|
|
|
if self.value is None or self.value == "":
|
|
|
- # Show (*auto) for autogenerated variables instead of (none)
|
|
|
if self.autogenerated:
|
|
|
return "[dim](*auto)[/dim]" if show_none else ""
|
|
|
return "[dim](none)[/dim]" if show_none else ""
|
|
|
|
|
|
- # Mask sensitive values
|
|
|
- if self.sensitive and mask_sensitive:
|
|
|
+ if self.is_secret() and mask_secret:
|
|
|
return "********"
|
|
|
|
|
|
- # Convert to string
|
|
|
display = str(self.value)
|
|
|
-
|
|
|
- # Truncate if needed
|
|
|
if max_length > 0 and len(display) > max_length:
|
|
|
return display[: max_length - 3] + "..."
|
|
|
-
|
|
|
return display
|
|
|
|
|
|
def get_normalized_default(self) -> Any:
|
|
|
- """Get normalized default value suitable for prompts and display."""
|
|
|
- try:
|
|
|
- typed = self.convert(self.value)
|
|
|
- except Exception:
|
|
|
- typed = self.value
|
|
|
-
|
|
|
- # Autogenerated: return display hint
|
|
|
+ typed = self._coerce_default_value()
|
|
|
if self.autogenerated and not typed:
|
|
|
return "*auto"
|
|
|
+ normalizers = {
|
|
|
+ "enum": self._normalize_enum_default,
|
|
|
+ "bool": self._normalize_bool_default,
|
|
|
+ "int": lambda value: self._normalize_numeric_default(value, int),
|
|
|
+ "float": lambda value: self._normalize_numeric_default(value, float),
|
|
|
+ }
|
|
|
+ normalizer = normalizers.get(self.type, self._normalize_string_default)
|
|
|
+ return normalizer(typed)
|
|
|
|
|
|
- # Type-specific handlers
|
|
|
- if self.type == "enum":
|
|
|
- return (
|
|
|
- typed
|
|
|
- if not self.options
|
|
|
- else (self.options[0] if typed is None or str(typed) not in self.options else str(typed))
|
|
|
- )
|
|
|
-
|
|
|
- if self.type == "bool":
|
|
|
- return typed if isinstance(typed, bool) else (None if typed is None else bool(typed))
|
|
|
+ def _coerce_default_value(self) -> Any:
|
|
|
+ try:
|
|
|
+ return self.convert(self.value)
|
|
|
+ except Exception:
|
|
|
+ return self.value
|
|
|
+
|
|
|
+ def _normalize_enum_default(self, typed: Any) -> Any:
|
|
|
+ if not self.options:
|
|
|
+ return typed
|
|
|
+ if typed is None or str(typed) not in self.options:
|
|
|
+ return self.options[0]
|
|
|
+ return str(typed)
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _normalize_bool_default(typed: Any) -> bool | None:
|
|
|
+ if isinstance(typed, bool):
|
|
|
+ return typed
|
|
|
+ if typed is None:
|
|
|
+ return None
|
|
|
+ return bool(typed)
|
|
|
|
|
|
- if self.type == "int":
|
|
|
- try:
|
|
|
- return int(typed) if typed not in (None, "") else None
|
|
|
- except Exception:
|
|
|
- return None
|
|
|
+ @staticmethod
|
|
|
+ def _normalize_numeric_default(typed: Any, caster) -> Any:
|
|
|
+ try:
|
|
|
+ return caster(typed) if typed not in (None, "") else None
|
|
|
+ except Exception:
|
|
|
+ return None
|
|
|
|
|
|
- # Default: return string or None
|
|
|
+ @staticmethod
|
|
|
+ def _normalize_string_default(typed: Any) -> str | None:
|
|
|
return None if typed is None else str(typed)
|
|
|
|
|
|
def get_prompt_text(self) -> str:
|
|
|
- """Get formatted prompt text for interactive input.
|
|
|
-
|
|
|
- Returns:
|
|
|
- Prompt text with optional type hints and descriptions
|
|
|
- """
|
|
|
prompt_text = self.prompt or self.description or self.name
|
|
|
-
|
|
|
- # Add type hint for semantic types if there's a default
|
|
|
if self.value is not None and self.type in ["email", "url"]:
|
|
|
prompt_text += f" ({self.type})"
|
|
|
-
|
|
|
return prompt_text
|
|
|
|
|
|
def get_validation_hint(self) -> str | None:
|
|
|
- """Get validation hint for prompts (e.g., enum options).
|
|
|
-
|
|
|
- Returns:
|
|
|
- Formatted hint string or None if no hint needed
|
|
|
- """
|
|
|
hints = []
|
|
|
|
|
|
- # Add enum options
|
|
|
if self.type == "enum" and self.options:
|
|
|
hints.append(f"Options: {', '.join(self.options)}")
|
|
|
|
|
|
- # Add extra help text
|
|
|
+ if self.type == "int" and self.config.slider and self.config.min is not None and self.config.max is not None:
|
|
|
+ slider_hint = f"Range: {self.config.min}..{self.config.max}"
|
|
|
+ step = self.config.step if self.config.step is not None else 1
|
|
|
+ if step != 1:
|
|
|
+ slider_hint += f", step {step}"
|
|
|
+ if self.config.unit:
|
|
|
+ slider_hint += f" {self.config.unit}"
|
|
|
+ hints.append(slider_hint)
|
|
|
+ elif self.type == "int" and self.config.unit:
|
|
|
+ hints.append(f"Unit: {self.config.unit}")
|
|
|
+
|
|
|
+ if self.autogenerated:
|
|
|
+ if self.autogenerated_base64:
|
|
|
+ bytes_value = (
|
|
|
+ self.autogenerated_config.bytes_or_default()
|
|
|
+ if self.autogenerated_config
|
|
|
+ else DEFAULT_AUTOGENERATED_BYTES
|
|
|
+ )
|
|
|
+ hints.append(f"Auto-generated base64 secret ({bytes_value} bytes) if empty")
|
|
|
+ else:
|
|
|
+ length = (
|
|
|
+ self.autogenerated_config.length_or_default()
|
|
|
+ if self.autogenerated_config
|
|
|
+ else DEFAULT_AUTOGENERATED_LENGTH
|
|
|
+ )
|
|
|
+ hints.append(f"Auto-generated secret (length {length}) if empty")
|
|
|
+
|
|
|
if self.extra:
|
|
|
hints.append(self.extra)
|
|
|
|
|
|
return " — ".join(hints) if hints else None
|
|
|
|
|
|
def is_required(self) -> bool:
|
|
|
- """Check if this variable requires a value (cannot be empty/None).
|
|
|
-
|
|
|
- A variable is considered required ONLY if it has an explicit 'required: true' flag.
|
|
|
- All other variables are optional by default.
|
|
|
-
|
|
|
- Returns:
|
|
|
- True if the variable must have a non-empty value, False otherwise
|
|
|
- """
|
|
|
- # Only explicitly marked required variables are required
|
|
|
- # Autogenerated variables can still be empty (will be generated later)
|
|
|
return self.required and not self.autogenerated
|
|
|
|
|
|
def get_parent(self) -> VariableSection | None:
|
|
|
- """Get the parent VariableSection that contains this variable.
|
|
|
-
|
|
|
- Returns:
|
|
|
- The parent VariableSection if set, None otherwise
|
|
|
- """
|
|
|
return self.parent_section
|
|
|
|
|
|
def clone(self, update: dict[str, Any] | None = None) -> Variable:
|
|
|
- """Create a deep copy of the variable with optional field updates.
|
|
|
-
|
|
|
- This is more efficient than converting to dict and back when copying variables.
|
|
|
-
|
|
|
- Args:
|
|
|
- update: Optional dictionary of field updates to apply to the clone
|
|
|
-
|
|
|
- Returns:
|
|
|
- New Variable instance with copied data
|
|
|
-
|
|
|
- Example:
|
|
|
- var2 = var1.clone(update={'origin': 'template'})
|
|
|
- """
|
|
|
data = {
|
|
|
"name": self.name,
|
|
|
"type": self.type,
|
|
|
"value": self.value,
|
|
|
"description": self.description,
|
|
|
"prompt": self.prompt,
|
|
|
- "options": self.options.copy() if self.options else None,
|
|
|
"origin": self.origin,
|
|
|
- "sensitive": self.sensitive,
|
|
|
"extra": self.extra,
|
|
|
- "autogenerated": self.autogenerated,
|
|
|
- "autogenerated_length": self.autogenerated_length,
|
|
|
- "autogenerated_base64": self.autogenerated_base64,
|
|
|
"required": self.required,
|
|
|
"original_value": self.original_value,
|
|
|
"needs": self.needs.copy() if self.needs else None,
|
|
|
"parent_section": self.parent_section,
|
|
|
+ "config": self.config.clone() if self.config else None,
|
|
|
}
|
|
|
|
|
|
- # Apply updates if provided
|
|
|
if update:
|
|
|
data.update(update)
|
|
|
|
|
|
- # Create new variable
|
|
|
cloned = Variable(data)
|
|
|
-
|
|
|
- # Preserve explicit fields from original, and add any update keys
|
|
|
cloned._explicit_fields = self._explicit_fields.copy()
|
|
|
if update:
|
|
|
cloned._explicit_fields.update(update.keys())
|
|
|
-
|
|
|
return cloned
|