| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672 |
- import inspect
- import json
- import logging
- import os
- import re
- import yaml
- from django import forms
- from django.conf import settings
- from django.core.files.storage import storages
- from django.core.validators import RegexValidator
- from django.utils import timezone
- from django.utils.functional import classproperty
- from django.utils.translation import gettext as _
- from extras.choices import LogLevelChoices
- from extras.models import ScriptModule
- from ipam.formfields import IPAddressFormField, IPNetworkFormField
- from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator
- from utilities.forms import add_blank_choice
- from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField
- from utilities.forms.widgets import DatePicker, DateTimePicker
- from .forms import ScriptForm
- __all__ = (
- 'BaseScript',
- 'BooleanVar',
- 'ChoiceVar',
- 'DateVar',
- 'DateTimeVar',
- 'FileVar',
- 'IntegerVar',
- 'DecimalVar',
- 'IPAddressVar',
- 'IPAddressWithMaskVar',
- 'IPNetworkVar',
- 'MultiChoiceVar',
- 'MultiObjectVar',
- 'ObjectVar',
- 'Script',
- 'StringVar',
- 'TextVar',
- 'get_module_and_script',
- )
- #
- # Script variables
- #
- class ScriptVariable:
- """
- Base model for script variables
- """
- form_field = forms.CharField
- def __init__(self, label='', description='', default=None, required=True, widget=None):
- # Initialize field attributes
- if not hasattr(self, 'field_attrs'):
- self.field_attrs = {}
- if label:
- self.field_attrs['label'] = label
- if description:
- self.field_attrs['help_text'] = description
- if default:
- self.field_attrs['initial'] = default
- if widget:
- self.field_attrs['widget'] = widget
- self.field_attrs['required'] = required
- def as_field(self):
- """
- Render the variable as a Django form field.
- """
- form_field = self.form_field(**self.field_attrs)
- if not isinstance(form_field.widget, forms.CheckboxInput):
- if form_field.widget.attrs and 'class' in form_field.widget.attrs.keys():
- form_field.widget.attrs['class'] += ' form-control'
- else:
- form_field.widget.attrs['class'] = 'form-control'
- return form_field
- class StringVar(ScriptVariable):
- """
- Character string representation. Can enforce minimum/maximum length and/or regex validation.
- """
- def __init__(self, min_length=None, max_length=None, regex=None, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Optional minimum/maximum lengths
- if min_length:
- self.field_attrs['min_length'] = min_length
- if max_length:
- self.field_attrs['max_length'] = max_length
- # Optional regular expression validation
- if regex:
- self.field_attrs['validators'] = [
- RegexValidator(
- regex=regex,
- message='Invalid value. Must match regex: {}'.format(regex),
- code='invalid'
- )
- ]
- class TextVar(ScriptVariable):
- """
- Free-form text data. Renders as a <textarea>.
- """
- form_field = forms.CharField
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.field_attrs['widget'] = forms.Textarea
- class IntegerVar(ScriptVariable):
- """
- Integer representation. Can enforce minimum/maximum values.
- """
- form_field = forms.IntegerField
- def __init__(self, min_value=None, max_value=None, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Optional minimum/maximum values
- if min_value:
- self.field_attrs['min_value'] = min_value
- if max_value:
- self.field_attrs['max_value'] = max_value
- class DecimalVar(ScriptVariable):
- """
- Decimal representation. Can enforce minimum/maximum values, maximum digits and decimal places.
- """
- form_field = forms.DecimalField
- def __init__(self, min_value=None, max_value=None, max_digits=None, decimal_places=None, *args, **kwargs,):
- super().__init__(*args, **kwargs)
- # Optional constraints
- if min_value:
- self.field_attrs["min_value"] = min_value
- if max_value:
- self.field_attrs["max_value"] = max_value
- if max_digits:
- self.field_attrs["max_digits"] = max_digits
- if decimal_places:
- self.field_attrs["decimal_places"] = decimal_places
- class BooleanVar(ScriptVariable):
- """
- Boolean representation (true/false). Renders as a checkbox.
- """
- form_field = forms.BooleanField
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Boolean fields cannot be required
- self.field_attrs['required'] = False
- class ChoiceVar(ScriptVariable):
- """
- Select one of several predefined static choices, passed as a list of two-tuples. Example:
- color = ChoiceVar(
- choices=(
- ('#ff0000', 'Red'),
- ('#00ff00', 'Green'),
- ('#0000ff', 'Blue')
- )
- )
- """
- form_field = forms.ChoiceField
- def __init__(self, choices, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Set field choices, adding a blank choice to avoid forced selections
- self.field_attrs['choices'] = add_blank_choice(choices)
- class DateVar(ScriptVariable):
- """
- A date.
- """
- form_field = forms.DateField
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.form_field.widget = DatePicker()
- class DateTimeVar(ScriptVariable):
- """
- A date and a time.
- """
- form_field = forms.DateTimeField
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.form_field.widget = DateTimePicker()
- class MultiChoiceVar(ScriptVariable):
- """
- Like ChoiceVar, but allows for the selection of multiple choices.
- """
- form_field = forms.MultipleChoiceField
- def __init__(self, choices, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Set field choices
- self.field_attrs['choices'] = choices
- class ObjectVar(ScriptVariable):
- """
- A single object within NetBox.
- :param model: The NetBox model being referenced
- :param query_params: A dictionary of additional query parameters to attach when making REST API requests (optional)
- :param context: A custom dictionary mapping template context variables to fields, used when rendering <option>
- elements within the dropdown menu (optional)
- :param null_option: The label to use as a "null" selection option (optional)
- :param selector: Include an advanced object selection widget to assist the user in identifying the desired
- object (optional)
- """
- form_field = DynamicModelChoiceField
- def __init__(self, model, query_params=None, context=None, null_option=None, selector=False, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.field_attrs.update({
- 'queryset': model.objects.all(),
- 'query_params': query_params,
- 'context': context,
- 'null_option': null_option,
- 'selector': selector,
- })
- class MultiObjectVar(ObjectVar):
- """
- Like ObjectVar, but can represent one or more objects.
- """
- form_field = DynamicModelMultipleChoiceField
- class FileVar(ScriptVariable):
- """
- An uploaded file.
- """
- form_field = forms.FileField
- class IPAddressVar(ScriptVariable):
- """
- An IPv4 or IPv6 address without a mask.
- """
- form_field = IPAddressFormField
- class IPAddressWithMaskVar(ScriptVariable):
- """
- An IPv4 or IPv6 address with a mask.
- """
- form_field = IPNetworkFormField
- class IPNetworkVar(ScriptVariable):
- """
- An IPv4 or IPv6 prefix.
- """
- form_field = IPNetworkFormField
- def __init__(self, min_prefix_length=None, max_prefix_length=None, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Set prefix validator and optional minimum/maximum prefix lengths
- self.field_attrs['validators'] = [prefix_validator]
- if min_prefix_length is not None:
- self.field_attrs['validators'].append(
- MinPrefixLengthValidator(min_prefix_length)
- )
- if max_prefix_length is not None:
- self.field_attrs['validators'].append(
- MaxPrefixLengthValidator(max_prefix_length)
- )
- #
- # Scripts
- #
- class BaseScript:
- """
- Base model for custom scripts. User classes should inherit from this model if they want to extend Script
- functionality for use in other subclasses.
- """
- # Prevent django from instantiating the class on all accesses
- do_not_call_in_templates = True
- class Meta:
- pass
- def __init__(self):
- self.messages = [] # Primary script log
- self.tests = {} # Mapping of logs for test methods
- self.output = ''
- self.failed = False
- self._current_test = None # Tracks the current test method being run (if any)
- # Initiate the log
- self.logger = logging.getLogger(f"netbox.scripts.{self.__module__}.{self.__class__.__name__}")
- # Declare the placeholder for the current request
- self.request = None
- # Compile test methods and initialize results skeleton
- for method in dir(self):
- if method.startswith('test_') and callable(getattr(self, method)):
- self.tests[method] = {
- LogLevelChoices.LOG_SUCCESS: 0,
- LogLevelChoices.LOG_INFO: 0,
- LogLevelChoices.LOG_WARNING: 0,
- LogLevelChoices.LOG_FAILURE: 0,
- 'log': [],
- }
- def __str__(self):
- return self.name
- @classproperty
- def module(self):
- return self.__module__
- @classproperty
- def class_name(self):
- return self.__name__
- @classproperty
- def full_name(self):
- return f'{self.module}.{self.class_name}'
- @classmethod
- def root_module(cls):
- return cls.__module__.split(".")[0]
- # Author-defined attributes
- @classproperty
- def name(self):
- return getattr(self.Meta, 'name', self.__name__)
- @classproperty
- def description(self):
- return getattr(self.Meta, 'description', '')
- @classproperty
- def field_order(self):
- return getattr(self.Meta, 'field_order', None)
- @classproperty
- def fieldsets(self):
- return getattr(self.Meta, 'fieldsets', None)
- @classproperty
- def commit_default(self):
- return getattr(self.Meta, 'commit_default', True)
- @classproperty
- def job_timeout(self):
- return getattr(self.Meta, 'job_timeout', None)
- @classproperty
- def scheduling_enabled(self):
- return getattr(self.Meta, 'scheduling_enabled', True)
- @property
- def filename(self):
- return inspect.getfile(self.__class__)
- def findsource(self, object):
- storage = storages.create_storage(storages.backends["scripts"])
- with storage.open(os.path.basename(self.filename), 'r') as f:
- data = f.read()
- # Break the source code into lines
- lines = [line + '\n' for line in data.splitlines()]
- # Find the class definition
- name = object.__name__
- pat = re.compile(r'^(\s*)class\s*' + name + r'\b')
- # use the class definition with the least indentation
- candidates = []
- for i in range(len(lines)):
- match = pat.match(lines[i])
- if match:
- if lines[i][0] == 'c':
- return lines, i
- candidates.append((match.group(1), i))
- if not candidates:
- raise OSError('could not find class definition')
- # Sort the candidates by whitespace, and by line number
- candidates.sort()
- return lines, candidates[0][1]
- @property
- def source(self):
- # Can't use inspect.getsource() as it uses os to get the file
- # inspect uses ast, but that is overkill for this as we only do
- # classes.
- object = self.__class__
- try:
- lines, lnum = self.findsource(object)
- lines = inspect.getblock(lines[lnum:])
- return ''.join(lines)
- except OSError:
- return ''
- @classmethod
- def _get_vars(cls):
- vars = {}
- # Iterate all base classes looking for ScriptVariables
- for base_class in inspect.getmro(cls):
- # When object is reached there's no reason to continue
- if base_class is object:
- break
- for name, attr in base_class.__dict__.items():
- if name not in vars and issubclass(attr.__class__, ScriptVariable):
- vars[name] = attr
- # Order variables according to field_order
- if not cls.field_order:
- return vars
- ordered_vars = {
- field: vars.pop(field) for field in cls.field_order if field in vars
- }
- ordered_vars.update(vars)
- return ordered_vars
- def run(self, data, commit):
- """
- Override this method with custom script logic.
- """
- # Backward compatibility for legacy Reports
- self.pre_run()
- self.run_tests()
- self.post_run()
- def get_job_data(self):
- """
- Return a dictionary of data to attach to the script's Job.
- """
- return {
- 'log': self.messages,
- 'output': self.output,
- 'tests': self.tests,
- }
- #
- # Form rendering
- #
- def get_fieldsets(self):
- fieldsets = []
- if self.fieldsets:
- fieldsets.extend(self.fieldsets)
- else:
- fields = list(name for name, _ in self._get_vars().items())
- fieldsets.append((_('Script Data'), fields))
- # Append the default fieldset if defined in the Meta class
- exec_parameters = ('_schedule_at', '_interval', '_commit') if self.scheduling_enabled else ('_commit',)
- fieldsets.append((_('Script Execution Parameters'), exec_parameters))
- return fieldsets
- def as_form(self, data=None, files=None, initial=None):
- """
- Return a Django form suitable for populating the context data required to run this Script.
- """
- # Create a dynamic ScriptForm subclass from script variables
- fields = {
- name: var.as_field() for name, var in self._get_vars().items()
- }
- FormClass = type('ScriptForm', (ScriptForm,), fields)
- form = FormClass(data, files, initial=initial)
- # Set initial "commit" checkbox state based on the script's Meta parameter
- form.fields['_commit'].initial = self.commit_default
- # Hide fields if scheduling has been disabled
- if not self.scheduling_enabled:
- form.fields['_schedule_at'].widget = forms.HiddenInput()
- form.fields['_interval'].widget = forms.HiddenInput()
- return form
- #
- # Logging
- #
- def _log(self, message, obj=None, level=LogLevelChoices.LOG_INFO):
- """
- Log a message. Do not call this method directly; use one of the log_* wrappers below.
- """
- if level not in LogLevelChoices.values():
- raise ValueError(f"Invalid logging level: {level}")
- # A test method is currently active, so log the message using legacy Report logging
- if self._current_test:
- # Increment the event counter for this level
- if level in self.tests[self._current_test]:
- self.tests[self._current_test][level] += 1
- # Record message (if any) to the report log
- if message:
- # TODO: Use a dataclass for test method logs
- self.tests[self._current_test]['log'].append((
- timezone.now().isoformat(),
- level,
- str(obj) if obj else None,
- obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None,
- str(message),
- ))
- elif message:
- # Record to the script's log
- self.messages.append({
- 'time': timezone.now().isoformat(),
- 'status': level,
- 'message': str(message),
- 'obj': str(obj) if obj else None,
- 'url': obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None,
- })
- # Record to the system log
- if obj:
- message = f"{obj}: {message}"
- self.logger.log(LogLevelChoices.SYSTEM_LEVELS[level], message)
- def log_debug(self, message=None, obj=None):
- self._log(message, obj, level=LogLevelChoices.LOG_DEBUG)
- def log_success(self, message=None, obj=None):
- self._log(message, obj, level=LogLevelChoices.LOG_SUCCESS)
- def log_info(self, message=None, obj=None):
- self._log(message, obj, level=LogLevelChoices.LOG_INFO)
- def log_warning(self, message=None, obj=None):
- self._log(message, obj, level=LogLevelChoices.LOG_WARNING)
- def log_failure(self, message=None, obj=None):
- self._log(message, obj, level=LogLevelChoices.LOG_FAILURE)
- self.failed = True
- #
- # Convenience functions
- #
- def load_yaml(self, filename):
- """
- Return data from a YAML file
- """
- # TODO: DEPRECATED: Remove this method in v4.4
- self._log(
- _("load_yaml is deprecated and will be removed in v4.4"),
- level=LogLevelChoices.LOG_WARNING
- )
- file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
- with open(file_path, 'r') as datafile:
- data = yaml.load(datafile, Loader=yaml.SafeLoader)
- return data
- def load_json(self, filename):
- """
- Return data from a JSON file
- """
- # TODO: DEPRECATED: Remove this method in v4.4
- self._log(
- _("load_json is deprecated and will be removed in v4.4"),
- level=LogLevelChoices.LOG_WARNING
- )
- file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
- with open(file_path, 'r') as datafile:
- data = json.load(datafile)
- return data
- #
- # Legacy Report functionality
- #
- def run_tests(self):
- """
- Run the report and save its results. Each test method will be executed in order.
- """
- self.logger.info("Running report")
- try:
- for test_name in self.tests:
- self._current_test = test_name
- test_method = getattr(self, test_name)
- test_method()
- self._current_test = None
- except Exception as e:
- self._current_test = None
- self.post_run()
- raise e
- def pre_run(self):
- """
- Legacy method for operations performed immediately prior to running a Report.
- """
- pass
- def post_run(self):
- """
- Legacy method for operations performed immediately after running a Report.
- """
- pass
- class Script(BaseScript):
- """
- Classes which inherit this model will appear in the list of available scripts.
- """
- pass
- #
- # Functions
- #
- def is_variable(obj):
- """
- Returns True if the object is a ScriptVariable.
- """
- return isinstance(obj, ScriptVariable)
- def get_module_and_script(module_name, script_name):
- module = ScriptModule.objects.get(file_path=f'{module_name}.py')
- script = module.scripts.get(name=script_name)
- return module, script
|