Просмотр исходного кода

Fixes #22232: Avoid enqueuing duplicate housekeeping jobs on startup (#22258)

Jeremy Stretch 12 часов назад
Родитель
Коммит
adc5b79330
2 изменённых файла: 154 добавления и 1 удаление
  1. 23 1
      netbox/netbox/jobs.py
  2. 131 0
      netbox/netbox/tests/test_jobs.py

+ 23 - 1
netbox/netbox/jobs.py

@@ -136,7 +136,8 @@ class JobRunner(ABC):
                 )
                 if job.object and getattr(job.object, "python_class", None):
                     kwargs["job_timeout"] = job.object.python_class.job_timeout
-                cls.enqueue(
+
+                enqueue_kwargs = dict(
                     instance=job.object,
                     name=job.name,
                     user=job.user,
@@ -146,6 +147,27 @@ class JobRunner(ABC):
                     **kwargs,
                 )
 
+                if cls in registry['system_jobs']:
+                    # System jobs are also scheduled by `enqueue_once()` at worker startup,
+                    # which races with this finally block and can produce duplicate schedules
+                    # (see #22232). Acquire the same advisory lock used by `enqueue_once()`
+                    # and skip rescheduling if a successor is already enqueued.
+                    #
+                    # This branch is limited to system jobs because generic recurring jobs
+                    # (e.g. scheduled scripts) may have multiple legitimate schedules sharing
+                    # the same runner/object/interval but differing in their runtime kwargs.
+                    with advisory_lock(ADVISORY_LOCK_KEYS['job-schedules']):
+                        successor_exists = Job.objects.filter(
+                            name=cls.name,
+                            object_id__isnull=True,
+                            status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES,
+                            interval=job.interval,
+                        ).exclude(pk=job.pk).exists()
+                        if not successor_exists:
+                            cls.enqueue(**enqueue_kwargs)
+                else:
+                    cls.enqueue(**enqueue_kwargs)
+
     @classmethod
     def get_jobs(cls, instance=None):
         """

+ 131 - 0
netbox/netbox/tests/test_jobs.py

@@ -1,4 +1,6 @@
+import uuid
 from datetime import timedelta
+from unittest.mock import patch
 
 from django.test import TestCase
 from django.utils import timezone
@@ -24,6 +26,13 @@ class TestJobRunner(JobRunner):
         self.logger.error("Error message")
 
 
+@system_job(interval=60)
+class TestSystemJobRunner(JobRunner):
+
+    def run(self, *args, **kwargs):
+        pass
+
+
 class JobRunnerTestCase(TestCase):
     def tearDown(self):
         super().tearDown()
@@ -193,3 +202,125 @@ class SystemJobTestCase(JobRunnerTestCase):
 
         self.assertEqual(job1, job2)
         self.assertEqual(TestJobRunner.get_jobs().count(), 1)
+
+    def test_handle_skips_reschedule_when_successor_exists(self):
+        """
+        When `handle()` finishes a periodic system job, it must not create a duplicate
+        scheduled job if a successor is already enqueued (issue #22232). This guards
+        against the race where a worker starts up between `job.terminate()` and the
+        finally block's reschedule, calling `enqueue_once()` which would create a parallel
+        job.
+        """
+        interval = 60
+
+        # Simulate a successor that was already created by another worker.
+        successor = Job.objects.create(
+            name=TestSystemJobRunner.name,
+            status=JobStatusChoices.STATUS_SCHEDULED,
+            interval=interval,
+            scheduled=self.get_schedule_at(),
+            job_id=uuid.uuid4(),
+        )
+
+        # The just-finished job. `handle()` will run its finally block.
+        finished = Job.objects.create(
+            name=TestSystemJobRunner.name,
+            status=JobStatusChoices.STATUS_COMPLETED,
+            interval=interval,
+            started=timezone.now(),
+            completed=timezone.now(),
+            job_id=uuid.uuid4(),
+        )
+
+        TestSystemJobRunner.handle(finished)
+
+        # Only the original successor should remain enqueued — no duplicate should have
+        # been created.
+        enqueued = Job.objects.filter(
+            name=TestSystemJobRunner.name,
+            status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES,
+            interval=interval,
+        )
+        self.assertEqual(enqueued.count(), 1)
+        self.assertEqual(enqueued.first().pk, successor.pk)
+
+    def test_handle_reschedules_when_only_instance_bound_successor_exists(self):
+        """
+        For a system (object-less) job, an instance-bound job of the same JobRunner class
+        must not be treated as a successor. The system job should still reschedule itself.
+        """
+        interval = 60
+        instance = DataSource.objects.create(name='test-ds', type='local')
+
+        # An instance-bound enqueued job of the same class and interval — must NOT be
+        # treated as a successor of the object-less finished job.
+        Job.objects.create(
+            name=TestSystemJobRunner.name,
+            object=instance,
+            status=JobStatusChoices.STATUS_SCHEDULED,
+            interval=interval,
+            scheduled=self.get_schedule_at(),
+            job_id=uuid.uuid4(),
+        )
+
+        # Object-less finished system job.
+        finished = Job.objects.create(
+            name=TestSystemJobRunner.name,
+            status=JobStatusChoices.STATUS_COMPLETED,
+            interval=interval,
+            started=timezone.now(),
+            completed=timezone.now(),
+            job_id=uuid.uuid4(),
+        )
+
+        TestSystemJobRunner.handle(finished)
+
+        # A new object-less successor should have been scheduled.
+        enqueued = Job.objects.filter(
+            name=TestSystemJobRunner.name,
+            object_id__isnull=True,
+            status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES,
+            interval=interval,
+        )
+        self.assertEqual(enqueued.count(), 1)
+
+    def test_handle_reschedules_non_system_job_independently(self):
+        """
+        Two recurring non-system jobs (e.g. scheduled scripts) for the same runner and
+        object with the same interval but distinct runtime kwargs must each reschedule
+        themselves; one must not be treated as the successor of the other and skipped.
+        """
+        interval = 60
+        instance = DataSource.objects.create(name='test-ds-script', type='local')
+
+        # An unrelated recurring schedule for the same runner/object/interval. Stands in
+        # for a second scheduled-script entry with different `data`.
+        Job.objects.create(
+            name=TestJobRunner.name,
+            object=instance,
+            status=JobStatusChoices.STATUS_SCHEDULED,
+            interval=interval,
+            scheduled=self.get_schedule_at(),
+            job_id=uuid.uuid4(),
+        )
+
+        finished = Job.objects.create(
+            name=TestJobRunner.name,
+            object=instance,
+            status=JobStatusChoices.STATUS_COMPLETED,
+            interval=interval,
+            started=timezone.now(),
+            completed=timezone.now(),
+            job_id=uuid.uuid4(),
+        )
+
+        with patch.object(TestJobRunner, 'run'):
+            TestJobRunner.handle(finished)
+
+        # Both the unrelated schedule and the finished job's successor should be enqueued.
+        enqueued = Job.objects.filter(
+            name=TestJobRunner.name,
+            status__in=JobStatusChoices.ENQUEUED_STATE_CHOICES,
+            interval=interval,
+        )
+        self.assertEqual(enqueued.count(), 2)