scripts.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. import inspect
  2. import logging
  3. from functools import cached_property
  4. from django.contrib.contenttypes.fields import GenericRelation
  5. from django.db import models
  6. from django.db.models import Q
  7. from django.urls import reverse
  8. from django.utils.translation import gettext_lazy as _
  9. from core.choices import ManagedFileRootPathChoices
  10. from core.models import ManagedFile
  11. from extras.utils import is_script
  12. from netbox.models.features import EventRulesMixin, JobsMixin
  13. from utilities.querysets import RestrictedQuerySet
  14. from .mixins import PythonModuleMixin
  15. __all__ = (
  16. 'Script',
  17. 'ScriptModule',
  18. )
  19. logger = logging.getLogger('netbox.data_backends')
  20. class Script(EventRulesMixin, JobsMixin):
  21. name = models.CharField(
  22. verbose_name=_('name'),
  23. max_length=79, # Maximum length for a Python class name
  24. editable=False,
  25. )
  26. module = models.ForeignKey(
  27. to='extras.ScriptModule',
  28. on_delete=models.CASCADE,
  29. related_name='scripts',
  30. editable=False
  31. )
  32. is_executable = models.BooleanField(
  33. default=True,
  34. verbose_name=_('is executable'),
  35. editable=False
  36. )
  37. events = GenericRelation(
  38. 'extras.EventRule',
  39. content_type_field='action_object_type',
  40. object_id_field='action_object_id'
  41. )
  42. def __str__(self):
  43. return self.name
  44. objects = RestrictedQuerySet.as_manager()
  45. class Meta:
  46. ordering = ('module', 'name')
  47. constraints = (
  48. models.UniqueConstraint(
  49. fields=('name', 'module'),
  50. name='extras_script_unique_name_module'
  51. ),
  52. )
  53. verbose_name = _('script')
  54. verbose_name_plural = _('scripts')
  55. def get_absolute_url(self):
  56. return reverse('extras:script', args=[self.pk])
  57. @property
  58. def result(self):
  59. return self.jobs.all().order_by('-created').first()
  60. @cached_property
  61. def python_class(self):
  62. return self.module.module_scripts.get(self.name)
  63. def delete(self, soft_delete=False, **kwargs):
  64. if soft_delete and self.jobs.exists():
  65. self.is_executable = False
  66. self.save()
  67. else:
  68. super().delete(**kwargs)
  69. self.id = None
  70. class ScriptModuleManager(models.Manager.from_queryset(RestrictedQuerySet)):
  71. def get_queryset(self):
  72. return super().get_queryset().filter(
  73. Q(file_root=ManagedFileRootPathChoices.SCRIPTS) | Q(file_root=ManagedFileRootPathChoices.REPORTS))
  74. class ScriptModule(PythonModuleMixin, JobsMixin, ManagedFile):
  75. """
  76. Proxy model for script module files.
  77. """
  78. objects = ScriptModuleManager()
  79. error = None
  80. event_rules = GenericRelation(
  81. to='extras.EventRule',
  82. content_type_field='action_object_type',
  83. object_id_field='action_object_id',
  84. for_concrete_model=False
  85. )
  86. class Meta:
  87. proxy = True
  88. ordering = ('file_root', 'file_path')
  89. verbose_name = _('script module')
  90. verbose_name_plural = _('script modules')
  91. def get_absolute_url(self):
  92. return reverse('extras:script_list')
  93. def __str__(self):
  94. return self.python_name
  95. @property
  96. def ordered_scripts(self):
  97. script_objects = {s.name: s for s in self.scripts.all()}
  98. ordered = [
  99. script_objects.pop(sc) for sc in self.module_scripts.keys() if sc in script_objects
  100. ]
  101. ordered.extend(script_objects.values())
  102. return ordered
  103. @cached_property
  104. def module_scripts(self):
  105. def _get_name(cls):
  106. # For child objects in submodules use the full import path w/o the root module as the name
  107. return cls.full_name.split(".", maxsplit=1)[1]
  108. try:
  109. module = self.get_module()
  110. except Exception as e:
  111. self.error = e
  112. logger.error(f"Failed to load script: {self.python_name} error: {e}")
  113. module = None
  114. scripts = {}
  115. ordered = getattr(module, 'script_order', [])
  116. for cls in ordered:
  117. scripts[_get_name(cls)] = cls
  118. for name, cls in inspect.getmembers(module, is_script):
  119. if cls not in ordered:
  120. scripts[_get_name(cls)] = cls
  121. return scripts
  122. def sync_classes(self):
  123. """
  124. Syncs the file-based module to the database, adding and removing individual Script objects
  125. in the database as needed.
  126. """
  127. if self.id:
  128. db_classes = {
  129. script.name: script for script in self.scripts.all()
  130. }
  131. else:
  132. db_classes = {}
  133. db_classes_set = set(db_classes.keys())
  134. module_classes_set = set(self.module_scripts.keys())
  135. # remove any existing db classes if they are no longer in the file
  136. removed = db_classes_set - module_classes_set
  137. for name in removed:
  138. db_classes[name].delete(soft_delete=True)
  139. added = module_classes_set - db_classes_set
  140. for name in added:
  141. Script.objects.create(
  142. module=self,
  143. name=name,
  144. is_executable=True,
  145. )
  146. sync_classes.alters_data = True
  147. def sync_data(self):
  148. super().sync_data()
  149. sync_data.alters_data = True
  150. def save(self, *args, **kwargs):
  151. self.file_root = ManagedFileRootPathChoices.SCRIPTS
  152. super().save(*args, **kwargs)
  153. # Sync script classes after the module has been saved. This is the
  154. # single intended synchronization path for ScriptModule saves.
  155. self.sync_classes()