jobs.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. import uuid
  2. import django_rq
  3. from django.conf import settings
  4. from django.contrib.contenttypes.fields import GenericForeignKey
  5. from django.core.exceptions import ValidationError
  6. from django.core.validators import MinValueValidator
  7. from django.db import models
  8. from django.urls import reverse
  9. from django.utils import timezone
  10. from django.utils.translation import gettext as _
  11. from core.choices import JobStatusChoices
  12. from core.models import ObjectType
  13. from core.signals import job_end, job_start
  14. from extras.constants import EVENT_JOB_END, EVENT_JOB_START
  15. from netbox.config import get_config
  16. from netbox.constants import RQ_QUEUE_DEFAULT
  17. from utilities.querysets import RestrictedQuerySet
  18. from utilities.rqworker import get_queue_for_model
  19. __all__ = (
  20. 'Job',
  21. )
  22. class Job(models.Model):
  23. """
  24. Tracks the lifecycle of a job which represents a background task (e.g. the execution of a custom script).
  25. """
  26. object_type = models.ForeignKey(
  27. to='contenttypes.ContentType',
  28. related_name='jobs',
  29. on_delete=models.CASCADE,
  30. )
  31. object_id = models.PositiveBigIntegerField(
  32. blank=True,
  33. null=True
  34. )
  35. object = GenericForeignKey(
  36. ct_field='object_type',
  37. fk_field='object_id',
  38. for_concrete_model=False
  39. )
  40. name = models.CharField(
  41. verbose_name=_('name'),
  42. max_length=200
  43. )
  44. created = models.DateTimeField(
  45. verbose_name=_('created'),
  46. auto_now_add=True
  47. )
  48. scheduled = models.DateTimeField(
  49. verbose_name=_('scheduled'),
  50. null=True,
  51. blank=True
  52. )
  53. interval = models.PositiveIntegerField(
  54. verbose_name=_('interval'),
  55. blank=True,
  56. null=True,
  57. validators=(
  58. MinValueValidator(1),
  59. ),
  60. help_text=_('Recurrence interval (in minutes)')
  61. )
  62. started = models.DateTimeField(
  63. verbose_name=_('started'),
  64. null=True,
  65. blank=True
  66. )
  67. completed = models.DateTimeField(
  68. verbose_name=_('completed'),
  69. null=True,
  70. blank=True
  71. )
  72. user = models.ForeignKey(
  73. to=settings.AUTH_USER_MODEL,
  74. on_delete=models.SET_NULL,
  75. related_name='+',
  76. blank=True,
  77. null=True
  78. )
  79. status = models.CharField(
  80. verbose_name=_('status'),
  81. max_length=30,
  82. choices=JobStatusChoices,
  83. default=JobStatusChoices.STATUS_PENDING
  84. )
  85. data = models.JSONField(
  86. verbose_name=_('data'),
  87. null=True,
  88. blank=True
  89. )
  90. error = models.TextField(
  91. verbose_name=_('error'),
  92. editable=False,
  93. blank=True
  94. )
  95. job_id = models.UUIDField(
  96. verbose_name=_('job ID'),
  97. unique=True
  98. )
  99. objects = RestrictedQuerySet.as_manager()
  100. class Meta:
  101. ordering = ['-created']
  102. indexes = (
  103. models.Index(fields=('object_type', 'object_id')),
  104. )
  105. verbose_name = _('job')
  106. verbose_name_plural = _('jobs')
  107. def __str__(self):
  108. return str(self.job_id)
  109. def get_absolute_url(self):
  110. # TODO: Employ dynamic registration
  111. if self.object_type.model == 'reportmodule':
  112. return reverse(f'extras:report_result', kwargs={'job_pk': self.pk})
  113. if self.object_type.model == 'scriptmodule':
  114. return reverse(f'extras:script_result', kwargs={'job_pk': self.pk})
  115. return reverse('core:job', args=[self.pk])
  116. def get_status_color(self):
  117. return JobStatusChoices.colors.get(self.status)
  118. def clean(self):
  119. super().clean()
  120. # Validate the assigned object type
  121. if self.object_type not in ObjectType.objects.with_feature('jobs'):
  122. raise ValidationError(
  123. _("Jobs cannot be assigned to this object type ({type}).").format(type=self.object_type)
  124. )
  125. @property
  126. def duration(self):
  127. if not self.completed:
  128. return None
  129. start_time = self.started or self.created
  130. if not start_time:
  131. return None
  132. duration = self.completed - start_time
  133. minutes, seconds = divmod(duration.total_seconds(), 60)
  134. return f"{int(minutes)} minutes, {seconds:.2f} seconds"
  135. def delete(self, *args, **kwargs):
  136. super().delete(*args, **kwargs)
  137. rq_queue_name = get_config().QUEUE_MAPPINGS.get(self.object_type.model, RQ_QUEUE_DEFAULT)
  138. queue = django_rq.get_queue(rq_queue_name)
  139. job = queue.fetch_job(str(self.job_id))
  140. if job:
  141. job.cancel()
  142. def start(self):
  143. """
  144. Record the job's start time and update its status to "running."
  145. """
  146. if self.started is not None:
  147. return
  148. # Start the job
  149. self.started = timezone.now()
  150. self.status = JobStatusChoices.STATUS_RUNNING
  151. self.save()
  152. # Send signal
  153. job_start.send(self)
  154. def terminate(self, status=JobStatusChoices.STATUS_COMPLETED, error=None):
  155. """
  156. Mark the job as completed, optionally specifying a particular termination status.
  157. """
  158. valid_statuses = JobStatusChoices.TERMINAL_STATE_CHOICES
  159. if status not in valid_statuses:
  160. raise ValueError(
  161. _("Invalid status for job termination. Choices are: {choices}").format(
  162. choices=', '.join(valid_statuses)
  163. )
  164. )
  165. # Mark the job as completed
  166. self.status = status
  167. if error:
  168. self.error = error
  169. self.completed = timezone.now()
  170. self.save()
  171. # Send signal
  172. job_end.send(self)
  173. @classmethod
  174. def enqueue(cls, func, instance, name='', user=None, schedule_at=None, interval=None, **kwargs):
  175. """
  176. Create a Job instance and enqueue a job using the given callable
  177. Args:
  178. func: The callable object to be enqueued for execution
  179. instance: The NetBox object to which this job pertains
  180. name: Name for the job (optional)
  181. user: The user responsible for running the job
  182. schedule_at: Schedule the job to be executed at the passed date and time
  183. interval: Recurrence interval (in minutes)
  184. """
  185. object_type = ObjectType.objects.get_for_model(instance, for_concrete_model=False)
  186. rq_queue_name = get_queue_for_model(object_type.model)
  187. queue = django_rq.get_queue(rq_queue_name)
  188. status = JobStatusChoices.STATUS_SCHEDULED if schedule_at else JobStatusChoices.STATUS_PENDING
  189. job = Job.objects.create(
  190. object_type=object_type,
  191. object_id=instance.pk,
  192. name=name,
  193. status=status,
  194. scheduled=schedule_at,
  195. interval=interval,
  196. user=user,
  197. job_id=uuid.uuid4()
  198. )
  199. if schedule_at:
  200. queue.enqueue_at(schedule_at, func, job_id=str(job.job_id), job=job, **kwargs)
  201. else:
  202. queue.enqueue(func, job_id=str(job.job_id), job=job, **kwargs)
  203. return job