|
|
@@ -1,12 +1,11 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
-import logging
|
|
|
from collections import defaultdict
|
|
|
-from typing import Any
|
|
|
+from typing import Any, Dict, List, Optional, Set, Union
|
|
|
+import logging
|
|
|
|
|
|
-from ..exceptions import VariableError, VariableValidationError
|
|
|
from .variable import Variable
|
|
|
-from .variable_section import VariableSection
|
|
|
+from .section import VariableSection
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@@ -40,11 +39,11 @@ class VariableCollection:
|
|
|
if not isinstance(spec, dict):
|
|
|
raise ValueError("Spec must be a dictionary")
|
|
|
|
|
|
- self._sections: dict[str, VariableSection] = {}
|
|
|
+ self._sections: Dict[str, VariableSection] = {}
|
|
|
# NOTE: The _variable_map provides a flat, O(1) lookup for any variable by its name,
|
|
|
# avoiding the need to iterate through sections. It stores references to the same
|
|
|
# Variable objects contained in the _set structure.
|
|
|
- self._variable_map: dict[str, Variable] = {}
|
|
|
+ self._variable_map: Dict[str, Variable] = {}
|
|
|
self._initialize_sections(spec)
|
|
|
# Validate dependencies after all sections are loaded
|
|
|
self._validate_dependencies()
|
|
|
@@ -112,7 +111,7 @@ class VariableCollection:
|
|
|
|
|
|
def _validate_unique_variable_names(self) -> None:
|
|
|
"""Validate that all variable names are unique across all sections."""
|
|
|
- var_to_sections: dict[str, list[str]] = defaultdict(list)
|
|
|
+ var_to_sections: Dict[str, List[str]] = defaultdict(list)
|
|
|
|
|
|
# Build mapping of variable names to sections
|
|
|
for section_key, section in self._sections.items():
|
|
|
@@ -139,7 +138,7 @@ class VariableCollection:
|
|
|
)
|
|
|
error_msg = "\n".join(errors)
|
|
|
logger.error(error_msg)
|
|
|
- raise VariableValidationError("__collection__", error_msg)
|
|
|
+ raise ValueError(error_msg)
|
|
|
|
|
|
def _validate_section_toggle(self, section: VariableSection) -> None:
|
|
|
"""Validate that toggle variable is of type bool if it exists.
|
|
|
@@ -162,13 +161,13 @@ class VariableCollection:
|
|
|
return
|
|
|
|
|
|
if toggle_var.type != "bool":
|
|
|
- raise VariableValidationError(
|
|
|
- section.toggle,
|
|
|
- f"Section '{section.key}' toggle variable must be type 'bool', but is type '{toggle_var.type}'",
|
|
|
+ raise ValueError(
|
|
|
+ f"Section '{section.key}' toggle variable '{section.toggle}' must be type 'bool', "
|
|
|
+ f"but is type '{toggle_var.type}'"
|
|
|
)
|
|
|
|
|
|
@staticmethod
|
|
|
- def _parse_need(need_str: str) -> tuple[str, Any | None]:
|
|
|
+ def _parse_need(need_str: str) -> tuple[str, Optional[Any]]:
|
|
|
"""Parse a need string into variable name and expected value(s).
|
|
|
|
|
|
Supports three formats:
|
|
|
@@ -206,48 +205,6 @@ class VariableCollection:
|
|
|
# Old format: section name (backwards compatibility)
|
|
|
return (need_str.strip(), None)
|
|
|
|
|
|
- def _check_section_need(self, section_name: str) -> bool:
|
|
|
- """Check if a section dependency is satisfied."""
|
|
|
- section = self._sections.get(section_name)
|
|
|
- if not section:
|
|
|
- logger.warning(f"Need references missing section '{section_name}'")
|
|
|
- return False
|
|
|
- return section.is_enabled()
|
|
|
-
|
|
|
- def _check_value_match(
|
|
|
- self, actual_value: Any, expected: Any, var_type: str
|
|
|
- ) -> bool:
|
|
|
- """Check if actual value matches expected value based on variable type."""
|
|
|
- if var_type == "bool":
|
|
|
- return bool(actual_value) == bool(expected)
|
|
|
- return actual_value is not None and str(actual_value) == str(expected)
|
|
|
-
|
|
|
- def _check_variable_need(
|
|
|
- self, var_name: str, expected_value: Any, variable
|
|
|
- ) -> bool:
|
|
|
- """Check if a variable dependency is satisfied."""
|
|
|
- try:
|
|
|
- actual_value = variable.convert(variable.value)
|
|
|
-
|
|
|
- # Handle multiple expected values
|
|
|
- if isinstance(expected_value, list):
|
|
|
- for expected in expected_value:
|
|
|
- expected_converted = variable.convert(expected)
|
|
|
- if self._check_value_match(
|
|
|
- actual_value, expected_converted, variable.type
|
|
|
- ):
|
|
|
- return True
|
|
|
- return False
|
|
|
-
|
|
|
- # Single expected value
|
|
|
- expected_converted = variable.convert(expected_value)
|
|
|
- return self._check_value_match(
|
|
|
- actual_value, expected_converted, variable.type
|
|
|
- )
|
|
|
- except Exception as e:
|
|
|
- logger.debug(f"Failed to compare need for '{var_name}': {e}")
|
|
|
- return False
|
|
|
-
|
|
|
def _is_need_satisfied(self, need_str: str) -> bool:
|
|
|
"""Check if a single need condition is satisfied.
|
|
|
|
|
|
@@ -259,17 +216,58 @@ class VariableCollection:
|
|
|
"""
|
|
|
var_or_section, expected_value = self._parse_need(need_str)
|
|
|
|
|
|
- # Old format: section name check
|
|
|
if expected_value is None:
|
|
|
- return self._check_section_need(var_or_section)
|
|
|
-
|
|
|
- # New format: variable value check
|
|
|
- variable = self._variable_map.get(var_or_section)
|
|
|
- if not variable:
|
|
|
- logger.warning(f"Need references missing variable '{var_or_section}'")
|
|
|
- return False
|
|
|
+ # Old format: check if section is enabled (backwards compatibility)
|
|
|
+ section = self._sections.get(var_or_section)
|
|
|
+ if not section:
|
|
|
+ logger.warning(f"Need references missing section '{var_or_section}'")
|
|
|
+ return False
|
|
|
+ return section.is_enabled()
|
|
|
+ else:
|
|
|
+ # New format: check if variable has expected value(s)
|
|
|
+ variable = self._variable_map.get(var_or_section)
|
|
|
+ if not variable:
|
|
|
+ logger.warning(f"Need references missing variable '{var_or_section}'")
|
|
|
+ return False
|
|
|
|
|
|
- return self._check_variable_need(var_or_section, expected_value, variable)
|
|
|
+ # Convert actual value for comparison
|
|
|
+ try:
|
|
|
+ actual_value = variable.convert(variable.value)
|
|
|
+
|
|
|
+ # Handle multiple expected values (comma-separated in needs)
|
|
|
+ if isinstance(expected_value, list):
|
|
|
+ # Check if actual value matches any of the expected values
|
|
|
+ for expected in expected_value:
|
|
|
+ expected_converted = variable.convert(expected)
|
|
|
+
|
|
|
+ # Handle boolean comparisons specially
|
|
|
+ if variable.type == "bool":
|
|
|
+ if bool(actual_value) == bool(expected_converted):
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ # String comparison for other types
|
|
|
+ if actual_value is not None and str(actual_value) == str(
|
|
|
+ expected_converted
|
|
|
+ ):
|
|
|
+ return True
|
|
|
+ return False # None of the expected values matched
|
|
|
+ else:
|
|
|
+ # Single expected value (original behavior)
|
|
|
+ expected_converted = variable.convert(expected_value)
|
|
|
+
|
|
|
+ # Handle boolean comparisons specially
|
|
|
+ if variable.type == "bool":
|
|
|
+ return bool(actual_value) == bool(expected_converted)
|
|
|
+
|
|
|
+ # String comparison for other types
|
|
|
+ return (
|
|
|
+ str(actual_value) == str(expected_converted)
|
|
|
+ if actual_value is not None
|
|
|
+ else False
|
|
|
+ )
|
|
|
+ except Exception as e:
|
|
|
+ logger.debug(f"Failed to compare need '{need_str}': {e}")
|
|
|
+ return False
|
|
|
|
|
|
def _validate_dependencies(self) -> None:
|
|
|
"""Validate section dependencies for cycles and missing references.
|
|
|
@@ -285,35 +283,33 @@ class VariableCollection:
|
|
|
if expected_value is None:
|
|
|
# Old format: validate section exists
|
|
|
if var_or_section not in self._sections:
|
|
|
- raise VariableError(
|
|
|
+ raise ValueError(
|
|
|
f"Section '{section_key}' depends on '{var_or_section}', but '{var_or_section}' does not exist"
|
|
|
)
|
|
|
- # New format: validate variable exists
|
|
|
- # NOTE: We only warn here, not raise an error, because the variable might be
|
|
|
- # added later during merge with module spec. The actual runtime check in
|
|
|
- # _is_need_satisfied() will handle missing variables gracefully.
|
|
|
- elif (
|
|
|
- expected_value is not None
|
|
|
- and var_or_section not in self._variable_map
|
|
|
- ):
|
|
|
- logger.debug(
|
|
|
- f"Section '{section_key}' has need '{dep}', but variable '{var_or_section}' "
|
|
|
- f"not found (might be added during merge)"
|
|
|
- )
|
|
|
+ else:
|
|
|
+ # New format: validate variable exists
|
|
|
+ # NOTE: We only warn here, not raise an error, because the variable might be
|
|
|
+ # added later during merge with module spec. The actual runtime check in
|
|
|
+ # _is_need_satisfied() will handle missing variables gracefully.
|
|
|
+ if var_or_section not in self._variable_map:
|
|
|
+ logger.debug(
|
|
|
+ f"Section '{section_key}' has need '{dep}', but variable '{var_or_section}' "
|
|
|
+ f"not found (might be added during merge)"
|
|
|
+ )
|
|
|
|
|
|
# Check for missing dependencies in variables
|
|
|
for var_name, variable in self._variable_map.items():
|
|
|
for dep in variable.needs:
|
|
|
dep_var, expected_value = self._parse_need(dep)
|
|
|
- # Only validate new format
|
|
|
- if expected_value is not None and dep_var not in self._variable_map:
|
|
|
- # NOTE: We only warn here, not raise an error, because the variable might be
|
|
|
- # added later during merge with module spec. The actual runtime check in
|
|
|
- # _is_need_satisfied() will handle missing variables gracefully.
|
|
|
- logger.debug(
|
|
|
- f"Variable '{var_name}' has need '{dep}', but variable '{dep_var}' "
|
|
|
- f"not found (might be added during merge)"
|
|
|
- )
|
|
|
+ if expected_value is not None: # Only validate new format
|
|
|
+ if dep_var not in self._variable_map:
|
|
|
+ # NOTE: We only warn here, not raise an error, because the variable might be
|
|
|
+ # added later during merge with module spec. The actual runtime check in
|
|
|
+ # _is_need_satisfied() will handle missing variables gracefully.
|
|
|
+ logger.debug(
|
|
|
+ f"Variable '{var_name}' has need '{dep}', but variable '{dep_var}' "
|
|
|
+ f"not found (might be added during merge)"
|
|
|
+ )
|
|
|
|
|
|
# Check for circular dependencies using depth-first search
|
|
|
# Note: Only checks section-level dependencies in old format (section names)
|
|
|
@@ -335,7 +331,7 @@ class VariableCollection:
|
|
|
if has_cycle(dep_name):
|
|
|
return True
|
|
|
elif dep_name in rec_stack:
|
|
|
- raise VariableError(
|
|
|
+ raise ValueError(
|
|
|
f"Circular dependency detected: '{section_key}' depends on '{dep_name}', "
|
|
|
f"which creates a cycle"
|
|
|
)
|
|
|
@@ -418,14 +414,10 @@ class VariableCollection:
|
|
|
"""
|
|
|
reset_vars = []
|
|
|
|
|
|
- # Pre-compute satisfaction states to avoid repeated lookups
|
|
|
- section_states = {
|
|
|
- key: (self.is_section_satisfied(key), section.is_enabled())
|
|
|
- for key, section in self._sections.items()
|
|
|
- }
|
|
|
-
|
|
|
for section_key, section in self._sections.items():
|
|
|
- section_satisfied, is_enabled = section_states[section_key]
|
|
|
+ # Check if section dependencies are satisfied
|
|
|
+ section_satisfied = self.is_section_satisfied(section_key)
|
|
|
+ is_enabled = section.is_enabled()
|
|
|
|
|
|
for var_name, variable in section.variables.items():
|
|
|
# Only process bool variables
|
|
|
@@ -436,23 +428,24 @@ class VariableCollection:
|
|
|
var_satisfied = self.is_variable_satisfied(var_name)
|
|
|
|
|
|
# If section is disabled OR variable dependencies aren't met, reset to False
|
|
|
- # Only reset if current value is not already False and not CLI-provided
|
|
|
- if (
|
|
|
- (not section_satisfied or not is_enabled or not var_satisfied)
|
|
|
- and variable.value is not False
|
|
|
- and variable.origin != "cli"
|
|
|
- ):
|
|
|
- # Store original value if not already stored (for display purposes)
|
|
|
- if not hasattr(variable, "_original_disabled"):
|
|
|
- variable._original_disabled = variable.value
|
|
|
-
|
|
|
- variable.value = False
|
|
|
- reset_vars.append(var_name)
|
|
|
- logger.debug(
|
|
|
- f"Reset disabled bool variable '{var_name}' to False "
|
|
|
- f"(section satisfied: {section_satisfied}, enabled: {is_enabled}, "
|
|
|
- f"var satisfied: {var_satisfied})"
|
|
|
- )
|
|
|
+ if not section_satisfied or not is_enabled or not var_satisfied:
|
|
|
+ # Only reset if current value is not already False
|
|
|
+ if variable.value is not False:
|
|
|
+ # Don't reset CLI-provided variables - they'll be validated later
|
|
|
+ if variable.origin == "cli":
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Store original value if not already stored (for display purposes)
|
|
|
+ if not hasattr(variable, "_original_disabled"):
|
|
|
+ variable._original_disabled = variable.value
|
|
|
+
|
|
|
+ variable.value = False
|
|
|
+ reset_vars.append(var_name)
|
|
|
+ logger.debug(
|
|
|
+ f"Reset disabled bool variable '{var_name}' to False "
|
|
|
+ f"(section satisfied: {section_satisfied}, enabled: {is_enabled}, "
|
|
|
+ f"var satisfied: {var_satisfied})"
|
|
|
+ )
|
|
|
|
|
|
return reset_vars
|
|
|
|
|
|
@@ -505,7 +498,7 @@ class VariableCollection:
|
|
|
for section in self._sections.values():
|
|
|
section.sort_variables(self._is_need_satisfied)
|
|
|
|
|
|
- def _topological_sort(self) -> list[str]:
|
|
|
+ def _topological_sort(self) -> List[str]:
|
|
|
"""Perform topological sort on sections based on dependencies using Kahn's algorithm."""
|
|
|
in_degree = {key: len(section.needs) for key, section in self._sections.items()}
|
|
|
queue = [key for key, degree in in_degree.items() if degree == 0]
|
|
|
@@ -532,52 +525,11 @@ class VariableCollection:
|
|
|
|
|
|
return result
|
|
|
|
|
|
- def iter_active_sections(
|
|
|
- self,
|
|
|
- include_disabled: bool = False,
|
|
|
- include_unsatisfied: bool = False,
|
|
|
- ):
|
|
|
- """Iterate over sections respecting dependencies and toggles.
|
|
|
-
|
|
|
- This is the centralized iterator for processing sections with proper
|
|
|
- filtering. It eliminates duplicate iteration logic across the codebase.
|
|
|
-
|
|
|
- Args:
|
|
|
- include_disabled: If True, include sections that are disabled via toggle
|
|
|
- include_unsatisfied: If True, include sections with unsatisfied dependencies
|
|
|
-
|
|
|
- Yields:
|
|
|
- Tuple of (section_key, section) for each active section
|
|
|
-
|
|
|
- Examples:
|
|
|
- # Only enabled sections with satisfied dependencies (default)
|
|
|
- for key, section in variables.iter_active_sections():
|
|
|
- process(section)
|
|
|
-
|
|
|
- # Include disabled sections but skip unsatisfied dependencies
|
|
|
- for key, section in variables.iter_active_sections(include_disabled=True):
|
|
|
- process(section)
|
|
|
- """
|
|
|
- for section_key, section in self._sections.items():
|
|
|
- # Check dependencies first
|
|
|
- if not include_unsatisfied and not self.is_section_satisfied(section_key):
|
|
|
- logger.debug(
|
|
|
- f"Skipping section '{section_key}' - dependencies not satisfied"
|
|
|
- )
|
|
|
- continue
|
|
|
-
|
|
|
- # Check enabled status
|
|
|
- if not include_disabled and not section.is_enabled():
|
|
|
- logger.debug(f"Skipping section '{section_key}' - section is disabled")
|
|
|
- continue
|
|
|
-
|
|
|
- yield section_key, section
|
|
|
-
|
|
|
- def get_sections(self) -> dict[str, VariableSection]:
|
|
|
+ def get_sections(self) -> Dict[str, VariableSection]:
|
|
|
"""Get all sections in the collection."""
|
|
|
return self._sections.copy()
|
|
|
|
|
|
- def get_section(self, key: str) -> VariableSection | None:
|
|
|
+ def get_section(self, key: str) -> Optional[VariableSection]:
|
|
|
"""Get a specific section by its key."""
|
|
|
return self._sections.get(key)
|
|
|
|
|
|
@@ -634,7 +586,7 @@ class VariableCollection:
|
|
|
|
|
|
return satisfied_values
|
|
|
|
|
|
- def get_sensitive_variables(self) -> dict[str, Any]:
|
|
|
+ def get_sensitive_variables(self) -> Dict[str, Any]:
|
|
|
"""Get only the sensitive variables with their values."""
|
|
|
return {
|
|
|
name: var.value
|
|
|
@@ -725,32 +677,23 @@ class VariableCollection:
|
|
|
|
|
|
return successful
|
|
|
|
|
|
- def _collect_unmet_needs(self, section, var_name: str, variable) -> set[str]:
|
|
|
- """Collect unmet needs for a variable."""
|
|
|
- section_satisfied = self.is_section_satisfied(section.key)
|
|
|
- var_satisfied = self.is_variable_satisfied(var_name)
|
|
|
- unmet_needs = set()
|
|
|
-
|
|
|
- if not section_satisfied:
|
|
|
- for need in section.needs:
|
|
|
- if not self._is_need_satisfied(need):
|
|
|
- unmet_needs.add(need)
|
|
|
- if not var_satisfied:
|
|
|
- for need in variable.needs:
|
|
|
- if not self._is_need_satisfied(need):
|
|
|
- unmet_needs.add(need)
|
|
|
-
|
|
|
- return unmet_needs
|
|
|
-
|
|
|
- def _validate_cli_bool_variables(self) -> list[str]:
|
|
|
- """Validate CLI-provided bool variables with unsatisfied dependencies."""
|
|
|
+ def validate_all(self) -> None:
|
|
|
+ """Validate all variables in the collection.
|
|
|
+
|
|
|
+ Validates:
|
|
|
+ - All variables in enabled sections with satisfied dependencies
|
|
|
+ - Required variables even if their section is disabled (but dependencies must be satisfied)
|
|
|
+ - CLI-provided bool variables with unsatisfied dependencies
|
|
|
+ """
|
|
|
errors: list[str] = []
|
|
|
|
|
|
+ # First, check for CLI-provided bool variables with unsatisfied dependencies
|
|
|
for section_key, section in self._sections.items():
|
|
|
section_satisfied = self.is_section_satisfied(section_key)
|
|
|
is_enabled = section.is_enabled()
|
|
|
|
|
|
for var_name, variable in section.variables.items():
|
|
|
+ # Check CLI-provided bool variables with unsatisfied dependencies
|
|
|
if (
|
|
|
variable.type == "bool"
|
|
|
and variable.origin == "cli"
|
|
|
@@ -759,9 +702,17 @@ class VariableCollection:
|
|
|
var_satisfied = self.is_variable_satisfied(var_name)
|
|
|
|
|
|
if not section_satisfied or not is_enabled or not var_satisfied:
|
|
|
- unmet_needs = self._collect_unmet_needs(
|
|
|
- section, var_name, variable
|
|
|
- )
|
|
|
+ # Build error message with unmet needs (use set to avoid duplicates)
|
|
|
+ unmet_needs = set()
|
|
|
+ if not section_satisfied:
|
|
|
+ for need in section.needs:
|
|
|
+ if not self._is_need_satisfied(need):
|
|
|
+ unmet_needs.add(need)
|
|
|
+ if not var_satisfied:
|
|
|
+ for need in variable.needs:
|
|
|
+ if not self._is_need_satisfied(need):
|
|
|
+ unmet_needs.add(need)
|
|
|
+
|
|
|
needs_str = (
|
|
|
", ".join(sorted(unmet_needs))
|
|
|
if unmet_needs
|
|
|
@@ -771,102 +722,69 @@ class VariableCollection:
|
|
|
f"{section.key}.{var_name} (set via CLI to {variable.value} but requires: {needs_str})"
|
|
|
)
|
|
|
|
|
|
- return errors
|
|
|
-
|
|
|
- def _validate_variable(self, section, var_name: str, variable) -> str | None:
|
|
|
- """Validate a single variable and return error message if invalid."""
|
|
|
- try:
|
|
|
- # Skip autogenerated variables when empty
|
|
|
- if variable.autogenerated and not variable.value:
|
|
|
- return None
|
|
|
-
|
|
|
- # Check required fields
|
|
|
- if variable.value is None:
|
|
|
- return self._validate_none_value(section, var_name, variable)
|
|
|
-
|
|
|
- # Validate typed value
|
|
|
- typed = variable.convert(variable.value)
|
|
|
- if variable.type not in ("bool",) and not typed:
|
|
|
- return self._validate_empty_value(section, var_name, variable)
|
|
|
-
|
|
|
- except ValueError as e:
|
|
|
- return f"{section.key}.{var_name} (invalid format: {e})"
|
|
|
-
|
|
|
- return None
|
|
|
-
|
|
|
- def _validate_none_value(self, section, var_name: str, variable) -> str | None:
|
|
|
- """Validate when variable value is None."""
|
|
|
- # Optional variables can be None/empty
|
|
|
- if hasattr(variable, "optional") and variable.optional:
|
|
|
- return None
|
|
|
- if variable.is_required():
|
|
|
- return f"{section.key}.{var_name} (required - no default provided)"
|
|
|
- return None
|
|
|
-
|
|
|
- def _validate_empty_value(self, section, var_name: str, variable) -> str | None:
|
|
|
- """Validate when variable value is empty."""
|
|
|
- msg = f"{section.key}.{var_name}"
|
|
|
- if variable.is_required():
|
|
|
- return f"{msg} (required - cannot be empty)"
|
|
|
- return f"{msg} (empty)"
|
|
|
-
|
|
|
- def _validate_section_variables(self, section_key: str, section) -> list[str]:
|
|
|
- """Validate all variables in a section."""
|
|
|
- errors: list[str] = []
|
|
|
-
|
|
|
- # Skip sections with unsatisfied dependencies
|
|
|
- if not self.is_section_satisfied(section_key):
|
|
|
- logger.debug(
|
|
|
- f"Skipping validation for section '{section_key}' - dependencies not satisfied"
|
|
|
- )
|
|
|
- return errors
|
|
|
-
|
|
|
- is_enabled = section.is_enabled()
|
|
|
-
|
|
|
- if not is_enabled:
|
|
|
- logger.debug(
|
|
|
- f"Section '{section_key}' is disabled - validating only required variables"
|
|
|
- )
|
|
|
-
|
|
|
- # Validate variables in the section
|
|
|
- for var_name, variable in section.variables.items():
|
|
|
- # Skip all variables in disabled sections
|
|
|
- if not is_enabled:
|
|
|
+ # Then validate all other variables
|
|
|
+ for section_key, section in self._sections.items():
|
|
|
+ # Skip sections with unsatisfied dependencies (even for required variables)
|
|
|
+ if not self.is_section_satisfied(section_key):
|
|
|
+ logger.debug(
|
|
|
+ f"Skipping validation for section '{section_key}' - dependencies not satisfied"
|
|
|
+ )
|
|
|
continue
|
|
|
|
|
|
- error = self._validate_variable(section, var_name, variable)
|
|
|
- if error:
|
|
|
- errors.append(error)
|
|
|
-
|
|
|
- return errors
|
|
|
+ # Check if section is enabled
|
|
|
+ is_enabled = section.is_enabled()
|
|
|
|
|
|
- def validate_all(self) -> None:
|
|
|
- """Validate all variables in the collection.
|
|
|
+ if not is_enabled:
|
|
|
+ logger.debug(
|
|
|
+ f"Section '{section_key}' is disabled - validating only required variables"
|
|
|
+ )
|
|
|
|
|
|
- Validates:
|
|
|
- - All variables in enabled sections with satisfied dependencies
|
|
|
- - Required variables even if their section is disabled (but dependencies must be satisfied)
|
|
|
- - CLI-provided bool variables with unsatisfied dependencies
|
|
|
- """
|
|
|
- errors: list[str] = []
|
|
|
+ # Validate variables in the section
|
|
|
+ for var_name, variable in section.variables.items():
|
|
|
+ # Skip all variables (including required ones) in disabled sections
|
|
|
+ # Required variables are only required when their section is actually enabled
|
|
|
+ if not is_enabled:
|
|
|
+ continue
|
|
|
|
|
|
- # First, check for CLI-provided bool variables with unsatisfied dependencies
|
|
|
- errors.extend(self._validate_cli_bool_variables())
|
|
|
+ try:
|
|
|
+ # Skip autogenerated variables when empty
|
|
|
+ if variable.autogenerated and not variable.value:
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Check required fields
|
|
|
+ if variable.value is None:
|
|
|
+ # Optional variables can be None/empty
|
|
|
+ if hasattr(variable, "optional") and variable.optional:
|
|
|
+ continue
|
|
|
+ if variable.is_required():
|
|
|
+ errors.append(
|
|
|
+ f"{section.key}.{var_name} (required - no default provided)"
|
|
|
+ )
|
|
|
+ continue
|
|
|
+
|
|
|
+ # Validate typed value
|
|
|
+ typed = variable.convert(variable.value)
|
|
|
+ if variable.type not in ("bool",) and not typed:
|
|
|
+ msg = f"{section.key}.{var_name}"
|
|
|
+ errors.append(
|
|
|
+ f"{msg} (required - cannot be empty)"
|
|
|
+ if variable.is_required()
|
|
|
+ else f"{msg} (empty)"
|
|
|
+ )
|
|
|
|
|
|
- # Then validate all other variables
|
|
|
- for section_key, section in self._sections.items():
|
|
|
- errors.extend(self._validate_section_variables(section_key, section))
|
|
|
+ except ValueError as e:
|
|
|
+ errors.append(f"{section.key}.{var_name} (invalid format: {e})")
|
|
|
|
|
|
if errors:
|
|
|
error_msg = "Variable validation failed: " + ", ".join(errors)
|
|
|
logger.error(error_msg)
|
|
|
- raise VariableValidationError("__multiple__", ", ".join(errors))
|
|
|
+ raise ValueError(error_msg)
|
|
|
|
|
|
def merge(
|
|
|
self,
|
|
|
- other_spec: dict[str, Any] | VariableCollection,
|
|
|
+ other_spec: Union[Dict[str, Any], "VariableCollection"],
|
|
|
origin: str = "override",
|
|
|
- ) -> VariableCollection:
|
|
|
+ ) -> "VariableCollection":
|
|
|
"""Merge another spec or VariableCollection into this one with precedence tracking.
|
|
|
|
|
|
OPTIMIZED: Works directly on objects without dict conversions for better performance.
|
|
|
@@ -986,10 +904,9 @@ class VariableCollection:
|
|
|
update["needs"] = other_var.needs.copy() if other_var.needs else []
|
|
|
|
|
|
# Special handling for value/default (allow explicit null to clear)
|
|
|
- if (
|
|
|
- "value" in other_var._explicit_fields
|
|
|
- or "default" in other_var._explicit_fields
|
|
|
- ):
|
|
|
+ if "value" in other_var._explicit_fields:
|
|
|
+ update["value"] = other_var.value
|
|
|
+ elif "default" in other_var._explicit_fields:
|
|
|
update["value"] = other_var.value
|
|
|
|
|
|
merged_section.variables[var_name] = self_var.clone(update=update)
|
|
|
@@ -1002,8 +919,8 @@ class VariableCollection:
|
|
|
return merged_section
|
|
|
|
|
|
def filter_to_used(
|
|
|
- self, used_variables: set[str], keep_sensitive: bool = True
|
|
|
- ) -> VariableCollection:
|
|
|
+ self, used_variables: Set[str], keep_sensitive: bool = True
|
|
|
+ ) -> "VariableCollection":
|
|
|
"""Filter collection to only variables that are used (or sensitive).
|
|
|
|
|
|
OPTIMIZED: Works directly on objects without dict conversions for better performance.
|
|
|
@@ -1061,7 +978,7 @@ class VariableCollection:
|
|
|
|
|
|
return filtered
|
|
|
|
|
|
- def get_all_variable_names(self) -> set[str]:
|
|
|
+ def get_all_variable_names(self) -> Set[str]:
|
|
|
"""Get set of all variable names across all sections.
|
|
|
|
|
|
Returns:
|