scripts.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. import inspect
  2. import logging
  3. from functools import cached_property
  4. from django.contrib.contenttypes.fields import GenericRelation
  5. from django.db import models
  6. from django.db.models import Q
  7. from django.db.models.signals import post_save
  8. from django.dispatch import receiver
  9. from django.urls import reverse
  10. from django.utils.translation import gettext_lazy as _
  11. from core.choices import ManagedFileRootPathChoices
  12. from core.models import ManagedFile
  13. from extras.utils import is_script
  14. from netbox.models.features import JobsMixin, EventRulesMixin
  15. from utilities.querysets import RestrictedQuerySet
  16. from .mixins import PythonModuleMixin
  17. __all__ = (
  18. 'Script',
  19. 'ScriptModule',
  20. )
  21. logger = logging.getLogger('netbox.data_backends')
  22. class Script(EventRulesMixin, JobsMixin):
  23. name = models.CharField(
  24. verbose_name=_('name'),
  25. max_length=79, # Maximum length for a Python class name
  26. editable=False,
  27. )
  28. module = models.ForeignKey(
  29. to='extras.ScriptModule',
  30. on_delete=models.CASCADE,
  31. related_name='scripts',
  32. editable=False
  33. )
  34. is_executable = models.BooleanField(
  35. default=True,
  36. verbose_name=_('is executable'),
  37. editable=False
  38. )
  39. events = GenericRelation(
  40. 'extras.EventRule',
  41. content_type_field='action_object_type',
  42. object_id_field='action_object_id'
  43. )
  44. def __str__(self):
  45. return self.name
  46. objects = RestrictedQuerySet.as_manager()
  47. class Meta:
  48. ordering = ('module', 'name')
  49. constraints = (
  50. models.UniqueConstraint(
  51. fields=('name', 'module'),
  52. name='extras_script_unique_name_module'
  53. ),
  54. )
  55. verbose_name = _('script')
  56. verbose_name_plural = _('scripts')
  57. def get_absolute_url(self):
  58. return reverse('extras:script', args=[self.pk])
  59. @property
  60. def result(self):
  61. return self.jobs.all().order_by('-created').first()
  62. @cached_property
  63. def python_class(self):
  64. return self.module.module_scripts.get(self.name)
  65. def delete(self, soft_delete=False, **kwargs):
  66. if soft_delete and self.jobs.exists():
  67. self.is_executable = False
  68. self.save()
  69. else:
  70. super().delete(**kwargs)
  71. self.id = None
  72. class ScriptModuleManager(models.Manager.from_queryset(RestrictedQuerySet)):
  73. def get_queryset(self):
  74. return super().get_queryset().filter(
  75. Q(file_root=ManagedFileRootPathChoices.SCRIPTS) | Q(file_root=ManagedFileRootPathChoices.REPORTS))
  76. class ScriptModule(PythonModuleMixin, JobsMixin, ManagedFile):
  77. """
  78. Proxy model for script module files.
  79. """
  80. objects = ScriptModuleManager()
  81. error = None
  82. event_rules = GenericRelation(
  83. to='extras.EventRule',
  84. content_type_field='action_object_type',
  85. object_id_field='action_object_id',
  86. for_concrete_model=False
  87. )
  88. class Meta:
  89. proxy = True
  90. ordering = ('file_root', 'file_path')
  91. verbose_name = _('script module')
  92. verbose_name_plural = _('script modules')
  93. def get_absolute_url(self):
  94. return reverse('extras:script_list')
  95. def __str__(self):
  96. return self.python_name
  97. @property
  98. def module_scripts(self):
  99. def _get_name(cls):
  100. # For child objects in submodules use the full import path w/o the root module as the name
  101. return cls.full_name.split(".", maxsplit=1)[1]
  102. try:
  103. module = self.get_module()
  104. except Exception as e:
  105. self.error = e
  106. logger.debug(f"Failed to load script: {self.python_name} error: {e}")
  107. module = None
  108. scripts = {}
  109. ordered = getattr(module, 'script_order', [])
  110. for cls in ordered:
  111. scripts[_get_name(cls)] = cls
  112. for name, cls in inspect.getmembers(module, is_script):
  113. if cls not in ordered:
  114. scripts[_get_name(cls)] = cls
  115. return scripts
  116. def sync_classes(self):
  117. """
  118. Syncs the file-based module to the database, adding and removing individual Script objects
  119. in the database as needed.
  120. """
  121. if self.id:
  122. db_classes = {
  123. script.name: script for script in self.scripts.all()
  124. }
  125. else:
  126. db_classes = {}
  127. db_classes_set = set(db_classes.keys())
  128. module_classes_set = set(self.module_scripts.keys())
  129. # remove any existing db classes if they are no longer in the file
  130. removed = db_classes_set - module_classes_set
  131. for name in removed:
  132. db_classes[name].delete(soft_delete=True)
  133. added = module_classes_set - db_classes_set
  134. for name in added:
  135. Script.objects.create(
  136. module=self,
  137. name=name,
  138. is_executable=True,
  139. )
  140. def sync_data(self):
  141. super().sync_data()
  142. def save(self, *args, **kwargs):
  143. self.file_root = ManagedFileRootPathChoices.SCRIPTS
  144. super().save(*args, **kwargs)
  145. self.sync_classes()
  146. @receiver(post_save, sender=ScriptModule)
  147. def script_module_post_save_handler(instance, created, **kwargs):
  148. instance.sync_classes()