scripts.py 19 KB


  1. import inspect
  2. import json
  3. import logging
  4. import os
  5. import re
  6. import yaml
  7. from django import forms
  8. from django.conf import settings
  9. from django.core.files.storage import storages
  10. from django.core.validators import RegexValidator
  11. from django.utils import timezone
  12. from django.utils.functional import classproperty
  13. from django.utils.translation import gettext as _
  14. from extras.choices import LogLevelChoices
  15. from extras.models import ScriptModule
  16. from ipam.formfields import IPAddressFormField, IPNetworkFormField
  17. from ipam.validators import MaxPrefixLengthValidator, MinPrefixLengthValidator, prefix_validator
  18. from utilities.forms import add_blank_choice
  19. from utilities.forms.fields import DynamicModelChoiceField, DynamicModelMultipleChoiceField
  20. from utilities.forms.widgets import DatePicker, DateTimePicker
  21. from .forms import ScriptForm
# Public names exported by this module (used by wildcard imports)
__all__ = (
    'BaseScript',
    'BooleanVar',
    'ChoiceVar',
    'DateVar',
    'DateTimeVar',
    'FileVar',
    'IntegerVar',
    'DecimalVar',
    'IPAddressVar',
    'IPAddressWithMaskVar',
    'IPNetworkVar',
    'MultiChoiceVar',
    'MultiObjectVar',
    'ObjectVar',
    'Script',
    'StringVar',
    'TextVar',
    'get_module_and_script',
)
  42. #
  43. # Script variables
  44. #
  45. class ScriptVariable:
  46. """
  47. Base model for script variables
  48. """
  49. form_field = forms.CharField
  50. def __init__(self, label='', description='', default=None, required=True, widget=None):
  51. # Initialize field attributes
  52. if not hasattr(self, 'field_attrs'):
  53. self.field_attrs = {}
  54. if label:
  55. self.field_attrs['label'] = label
  56. if description:
  57. self.field_attrs['help_text'] = description
  58. if default:
  59. self.field_attrs['initial'] = default
  60. if widget:
  61. self.field_attrs['widget'] = widget
  62. self.field_attrs['required'] = required
  63. def as_field(self):
  64. """
  65. Render the variable as a Django form field.
  66. """
  67. form_field = self.form_field(**self.field_attrs)
  68. if not isinstance(form_field.widget, forms.CheckboxInput):
  69. if form_field.widget.attrs and 'class' in form_field.widget.attrs.keys():
  70. form_field.widget.attrs['class'] += ' form-control'
  71. else:
  72. form_field.widget.attrs['class'] = 'form-control'
  73. return form_field
  74. class StringVar(ScriptVariable):
  75. """
  76. Character string representation. Can enforce minimum/maximum length and/or regex validation.
  77. """
  78. def __init__(self, min_length=None, max_length=None, regex=None, *args, **kwargs):
  79. super().__init__(*args, **kwargs)
  80. # Optional minimum/maximum lengths
  81. if min_length:
  82. self.field_attrs['min_length'] = min_length
  83. if max_length:
  84. self.field_attrs['max_length'] = max_length
  85. # Optional regular expression validation
  86. if regex:
  87. self.field_attrs['validators'] = [
  88. RegexValidator(
  89. regex=regex,
  90. message='Invalid value. Must match regex: {}'.format(regex),
  91. code='invalid'
  92. )
  93. ]
  94. class TextVar(ScriptVariable):
  95. """
  96. Free-form text data. Renders as a <textarea>.
  97. """
  98. form_field = forms.CharField
  99. def __init__(self, *args, **kwargs):
  100. super().__init__(*args, **kwargs)
  101. self.field_attrs['widget'] = forms.Textarea
  102. class IntegerVar(ScriptVariable):
  103. """
  104. Integer representation. Can enforce minimum/maximum values.
  105. """
  106. form_field = forms.IntegerField
  107. def __init__(self, min_value=None, max_value=None, *args, **kwargs):
  108. super().__init__(*args, **kwargs)
  109. # Optional minimum/maximum values
  110. if min_value:
  111. self.field_attrs['min_value'] = min_value
  112. if max_value:
  113. self.field_attrs['max_value'] = max_value
  114. class DecimalVar(ScriptVariable):
  115. """
  116. Decimal representation. Can enforce minimum/maximum values, maximum digits and decimal places.
  117. """
  118. form_field = forms.DecimalField
  119. def __init__(self, min_value=None, max_value=None, max_digits=None, decimal_places=None, *args, **kwargs,):
  120. super().__init__(*args, **kwargs)
  121. # Optional constraints
  122. if min_value:
  123. self.field_attrs["min_value"] = min_value
  124. if max_value:
  125. self.field_attrs["max_value"] = max_value
  126. if max_digits:
  127. self.field_attrs["max_digits"] = max_digits
  128. if decimal_places:
  129. self.field_attrs["decimal_places"] = decimal_places
  130. class BooleanVar(ScriptVariable):
  131. """
  132. Boolean representation (true/false). Renders as a checkbox.
  133. """
  134. form_field = forms.BooleanField
  135. def __init__(self, *args, **kwargs):
  136. super().__init__(*args, **kwargs)
  137. # Boolean fields cannot be required
  138. self.field_attrs['required'] = False
  139. class ChoiceVar(ScriptVariable):
  140. """
  141. Select one of several predefined static choices, passed as a list of two-tuples. Example:
  142. color = ChoiceVar(
  143. choices=(
  144. ('#ff0000', 'Red'),
  145. ('#00ff00', 'Green'),
  146. ('#0000ff', 'Blue')
  147. )
  148. )
  149. """
  150. form_field = forms.ChoiceField
  151. def __init__(self, choices, *args, **kwargs):
  152. super().__init__(*args, **kwargs)
  153. # Set field choices, adding a blank choice to avoid forced selections
  154. self.field_attrs['choices'] = add_blank_choice(choices)
  155. class DateVar(ScriptVariable):
  156. """
  157. A date.
  158. """
  159. form_field = forms.DateField
  160. def __init__(self, *args, **kwargs):
  161. super().__init__(*args, **kwargs)
  162. self.form_field.widget = DatePicker()
  163. class DateTimeVar(ScriptVariable):
  164. """
  165. A date and a time.
  166. """
  167. form_field = forms.DateTimeField
  168. def __init__(self, *args, **kwargs):
  169. super().__init__(*args, **kwargs)
  170. self.form_field.widget = DateTimePicker()
  171. class MultiChoiceVar(ScriptVariable):
  172. """
  173. Like ChoiceVar, but allows for the selection of multiple choices.
  174. """
  175. form_field = forms.MultipleChoiceField
  176. def __init__(self, choices, *args, **kwargs):
  177. super().__init__(*args, **kwargs)
  178. # Set field choices
  179. self.field_attrs['choices'] = choices
  180. class ObjectVar(ScriptVariable):
  181. """
  182. A single object within NetBox.
  183. :param model: The NetBox model being referenced
  184. :param query_params: A dictionary of additional query parameters to attach when making REST API requests (optional)
  185. :param context: A custom dictionary mapping template context variables to fields, used when rendering <option>
  186. elements within the dropdown menu (optional)
  187. :param null_option: The label to use as a "null" selection option (optional)
  188. :param selector: Include an advanced object selection widget to assist the user in identifying the desired
  189. object (optional)
  190. """
  191. form_field = DynamicModelChoiceField
  192. def __init__(self, model, query_params=None, context=None, null_option=None, selector=False, *args, **kwargs):
  193. super().__init__(*args, **kwargs)
  194. self.field_attrs.update({
  195. 'queryset': model.objects.all(),
  196. 'query_params': query_params,
  197. 'context': context,
  198. 'null_option': null_option,
  199. 'selector': selector,
  200. })
  201. class MultiObjectVar(ObjectVar):
  202. """
  203. Like ObjectVar, but can represent one or more objects.
  204. """
  205. form_field = DynamicModelMultipleChoiceField
  206. class FileVar(ScriptVariable):
  207. """
  208. An uploaded file.
  209. """
  210. form_field = forms.FileField
  211. class IPAddressVar(ScriptVariable):
  212. """
  213. An IPv4 or IPv6 address without a mask.
  214. """
  215. form_field = IPAddressFormField
  216. class IPAddressWithMaskVar(ScriptVariable):
  217. """
  218. An IPv4 or IPv6 address with a mask.
  219. """
  220. form_field = IPNetworkFormField
  221. class IPNetworkVar(ScriptVariable):
  222. """
  223. An IPv4 or IPv6 prefix.
  224. """
  225. form_field = IPNetworkFormField
  226. def __init__(self, min_prefix_length=None, max_prefix_length=None, *args, **kwargs):
  227. super().__init__(*args, **kwargs)
  228. # Set prefix validator and optional minimum/maximum prefix lengths
  229. self.field_attrs['validators'] = [prefix_validator]
  230. if min_prefix_length is not None:
  231. self.field_attrs['validators'].append(
  232. MinPrefixLengthValidator(min_prefix_length)
  233. )
  234. if max_prefix_length is not None:
  235. self.field_attrs['validators'].append(
  236. MaxPrefixLengthValidator(max_prefix_length)
  237. )
  238. #
  239. # Scripts
  240. #
  241. class BaseScript:
  242. """
  243. Base model for custom scripts. User classes should inherit from this model if they want to extend Script
  244. functionality for use in other subclasses.
  245. """
  246. # Prevent django from instantiating the class on all accesses
  247. do_not_call_in_templates = True
  248. class Meta:
  249. pass
  250. def __init__(self):
  251. self.messages = [] # Primary script log
  252. self.tests = {} # Mapping of logs for test methods
  253. self.output = ''
  254. self.failed = False
  255. self._current_test = None # Tracks the current test method being run (if any)
  256. # Initiate the log
  257. self.logger = logging.getLogger(f"netbox.scripts.{self.__module__}.{self.__class__.__name__}")
  258. # Declare the placeholder for the current request
  259. self.request = None
  260. # Compile test methods and initialize results skeleton
  261. for method in dir(self):
  262. if method.startswith('test_') and callable(getattr(self, method)):
  263. self.tests[method] = {
  264. LogLevelChoices.LOG_SUCCESS: 0,
  265. LogLevelChoices.LOG_INFO: 0,
  266. LogLevelChoices.LOG_WARNING: 0,
  267. LogLevelChoices.LOG_FAILURE: 0,
  268. 'log': [],
  269. }
  270. def __str__(self):
  271. return self.name
  272. @classproperty
  273. def module(self):
  274. return self.__module__
  275. @classproperty
  276. def class_name(self):
  277. return self.__name__
  278. @classproperty
  279. def full_name(self):
  280. return f'{self.module}.{self.class_name}'
  281. @classmethod
  282. def root_module(cls):
  283. return cls.__module__.split(".")[0]
  284. # Author-defined attributes
  285. @classproperty
  286. def name(self):
  287. return getattr(self.Meta, 'name', self.__name__)
  288. @classproperty
  289. def description(self):
  290. return getattr(self.Meta, 'description', '')
  291. @classproperty
  292. def field_order(self):
  293. return getattr(self.Meta, 'field_order', None)
  294. @classproperty
  295. def fieldsets(self):
  296. return getattr(self.Meta, 'fieldsets', None)
  297. @classproperty
  298. def commit_default(self):
  299. return getattr(self.Meta, 'commit_default', True)
  300. @classproperty
  301. def job_timeout(self):
  302. return getattr(self.Meta, 'job_timeout', None)
  303. @classproperty
  304. def scheduling_enabled(self):
  305. return getattr(self.Meta, 'scheduling_enabled', True)
  306. @property
  307. def filename(self):
  308. return inspect.getfile(self.__class__)
  309. def findsource(self, object):
  310. storage = storages.create_storage(storages.backends["scripts"])
  311. with storage.open(os.path.basename(self.filename), 'r') as f:
  312. data = f.read()
  313. # Break the source code into lines
  314. lines = [line + '\n' for line in data.splitlines()]
  315. # Find the class definition
  316. name = object.__name__
  317. pat = re.compile(r'^(\s*)class\s*' + name + r'\b')
  318. # use the class definition with the least indentation
  319. candidates = []
  320. for i in range(len(lines)):
  321. match = pat.match(lines[i])
  322. if match:
  323. if lines[i][0] == 'c':
  324. return lines, i
  325. candidates.append((match.group(1), i))
  326. if not candidates:
  327. raise OSError('could not find class definition')
  328. # Sort the candidates by whitespace, and by line number
  329. candidates.sort()
  330. return lines, candidates[0][1]
  331. @property
  332. def source(self):
  333. # Can't use inspect.getsource() as it uses os to get the file
  334. # inspect uses ast, but that is overkill for this as we only do
  335. # classes.
  336. object = self.__class__
  337. try:
  338. lines, lnum = self.findsource(object)
  339. lines = inspect.getblock(lines[lnum:])
  340. return ''.join(lines)
  341. except OSError:
  342. return ''
  343. @classmethod
  344. def _get_vars(cls):
  345. vars = {}
  346. # Iterate all base classes looking for ScriptVariables
  347. for base_class in inspect.getmro(cls):
  348. # When object is reached there's no reason to continue
  349. if base_class is object:
  350. break
  351. for name, attr in base_class.__dict__.items():
  352. if name not in vars and issubclass(attr.__class__, ScriptVariable):
  353. vars[name] = attr
  354. # Order variables according to field_order
  355. if not cls.field_order:
  356. return vars
  357. ordered_vars = {
  358. field: vars.pop(field) for field in cls.field_order if field in vars
  359. }
  360. ordered_vars.update(vars)
  361. return ordered_vars
  362. def run(self, data, commit):
  363. """
  364. Override this method with custom script logic.
  365. """
  366. # Backward compatibility for legacy Reports
  367. self.pre_run()
  368. self.run_tests()
  369. self.post_run()
  370. def get_job_data(self):
  371. """
  372. Return a dictionary of data to attach to the script's Job.
  373. """
  374. return {
  375. 'log': self.messages,
  376. 'output': self.output,
  377. 'tests': self.tests,
  378. }
  379. #
  380. # Form rendering
  381. #
  382. def get_fieldsets(self):
  383. fieldsets = []
  384. if self.fieldsets:
  385. fieldsets.extend(self.fieldsets)
  386. else:
  387. fields = list(name for name, _ in self._get_vars().items())
  388. fieldsets.append((_('Script Data'), fields))
  389. # Append the default fieldset if defined in the Meta class
  390. exec_parameters = ('_schedule_at', '_interval', '_commit') if self.scheduling_enabled else ('_commit',)
  391. fieldsets.append((_('Script Execution Parameters'), exec_parameters))
  392. return fieldsets
  393. def as_form(self, data=None, files=None, initial=None):
  394. """
  395. Return a Django form suitable for populating the context data required to run this Script.
  396. """
  397. # Create a dynamic ScriptForm subclass from script variables
  398. fields = {
  399. name: var.as_field() for name, var in self._get_vars().items()
  400. }
  401. FormClass = type('ScriptForm', (ScriptForm,), fields)
  402. form = FormClass(data, files, initial=initial)
  403. # Set initial "commit" checkbox state based on the script's Meta parameter
  404. form.fields['_commit'].initial = self.commit_default
  405. # Hide fields if scheduling has been disabled
  406. if not self.scheduling_enabled:
  407. form.fields['_schedule_at'].widget = forms.HiddenInput()
  408. form.fields['_interval'].widget = forms.HiddenInput()
  409. return form
  410. #
  411. # Logging
  412. #
  413. def _log(self, message, obj=None, level=LogLevelChoices.LOG_INFO):
  414. """
  415. Log a message. Do not call this method directly; use one of the log_* wrappers below.
  416. """
  417. if level not in LogLevelChoices.values():
  418. raise ValueError(f"Invalid logging level: {level}")
  419. # A test method is currently active, so log the message using legacy Report logging
  420. if self._current_test:
  421. # Increment the event counter for this level
  422. if level in self.tests[self._current_test]:
  423. self.tests[self._current_test][level] += 1
  424. # Record message (if any) to the report log
  425. if message:
  426. # TODO: Use a dataclass for test method logs
  427. self.tests[self._current_test]['log'].append((
  428. timezone.now().isoformat(),
  429. level,
  430. str(obj) if obj else None,
  431. obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None,
  432. str(message),
  433. ))
  434. elif message:
  435. # Record to the script's log
  436. self.messages.append({
  437. 'time': timezone.now().isoformat(),
  438. 'status': level,
  439. 'message': str(message),
  440. 'obj': str(obj) if obj else None,
  441. 'url': obj.get_absolute_url() if hasattr(obj, 'get_absolute_url') else None,
  442. })
  443. # Record to the system log
  444. if obj:
  445. message = f"{obj}: {message}"
  446. self.logger.log(LogLevelChoices.SYSTEM_LEVELS[level], message)
  447. def log_debug(self, message=None, obj=None):
  448. self._log(message, obj, level=LogLevelChoices.LOG_DEBUG)
  449. def log_success(self, message=None, obj=None):
  450. self._log(message, obj, level=LogLevelChoices.LOG_SUCCESS)
  451. def log_info(self, message=None, obj=None):
  452. self._log(message, obj, level=LogLevelChoices.LOG_INFO)
  453. def log_warning(self, message=None, obj=None):
  454. self._log(message, obj, level=LogLevelChoices.LOG_WARNING)
  455. def log_failure(self, message=None, obj=None):
  456. self._log(message, obj, level=LogLevelChoices.LOG_FAILURE)
  457. self.failed = True
  458. #
  459. # Convenience functions
  460. #
  461. def load_yaml(self, filename):
  462. """
  463. Return data from a YAML file
  464. """
  465. # TODO: DEPRECATED: Remove this method in v4.4
  466. self._log(
  467. _("load_yaml is deprecated and will be removed in v4.4"),
  468. level=LogLevelChoices.LOG_WARNING
  469. )
  470. file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
  471. with open(file_path, 'r') as datafile:
  472. data = yaml.load(datafile, Loader=yaml.SafeLoader)
  473. return data
  474. def load_json(self, filename):
  475. """
  476. Return data from a JSON file
  477. """
  478. # TODO: DEPRECATED: Remove this method in v4.4
  479. self._log(
  480. _("load_json is deprecated and will be removed in v4.4"),
  481. level=LogLevelChoices.LOG_WARNING
  482. )
  483. file_path = os.path.join(settings.SCRIPTS_ROOT, filename)
  484. with open(file_path, 'r') as datafile:
  485. data = json.load(datafile)
  486. return data
  487. #
  488. # Legacy Report functionality
  489. #
  490. def run_tests(self):
  491. """
  492. Run the report and save its results. Each test method will be executed in order.
  493. """
  494. self.logger.info("Running report")
  495. try:
  496. for test_name in self.tests:
  497. self._current_test = test_name
  498. test_method = getattr(self, test_name)
  499. test_method()
  500. self._current_test = None
  501. except Exception as e:
  502. self._current_test = None
  503. self.post_run()
  504. raise e
  505. def pre_run(self):
  506. """
  507. Legacy method for operations performed immediately prior to running a Report.
  508. """
  509. pass
  510. def post_run(self):
  511. """
  512. Legacy method for operations performed immediately after running a Report.
  513. """
  514. pass
  515. class Script(BaseScript):
  516. """
  517. Classes which inherit this model will appear in the list of available scripts.
  518. """
  519. pass
  520. #
  521. # Functions
  522. #
  523. def is_variable(obj):
  524. """
  525. Returns True if the object is a ScriptVariable.
  526. """
  527. return isinstance(obj, ScriptVariable)
  528. def get_module_and_script(module_name, script_name):
  529. module = ScriptModule.objects.get(file_path=f'{module_name}.py')
  530. script = module.scripts.get(name=script_name)
  531. return module, script