Просмотр исходного кода

7848 Add RQ API (#17938)

* 7848 Add Background Tasks (RQ) to API

* 7848 Tasks

* 7848 cleanup

* 7848 add worker support

* 7848 switch to APIView

* 7848 Task detail view

* 7848 Task enqueue, requeue, stop

* 7848 Task enqueue, requeue, stop

* 7848 Task enqueue, requeue, stop

* 7848 tests

* 7848 tests

* 7848 OpenAPI doc generation

* 7848 OpenAPI doc generation

* 7848 review changes

* 7848 viewset

* 7848 viewset

* 7848 fix tests

* 7848 more viewsets

* 7848 fix docstring

* 7848 review comments

* 7848 review comments - get all tasks

* 7848 queue detail view

* 7848 cleanup

* 7848 cleanup

* 7848 cleanup

* 7848 cleanup

* Rename viewsets for consistency w/serializers

* Misc cleanup

* 7848 review changes

* 7848 review changes

* 7848 add test

* 7848 queue detail view

* 7848 fix tests

* 7848 fix the spectacular test failure

* 7848 fix the spectacular test failure

* Misc cleanup

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
Arthur Hanson 1 год назад
Родитель
Сommit
a24576f126

+ 3 - 0
netbox/core/api/schema.py

@@ -158,6 +158,9 @@ class NetBoxAutoSchema(AutoSchema):
         fields = {} if hasattr(serializer, 'child') else serializer.fields
         remove_fields = []
 
+        # If you get a failure here for "AttributeError: 'cached_property' object has no attribute 'items'"
+        # it is probably because you are using a viewsets.ViewSet for the API View and are defining a
+        # serializer_class. You will also need to define a get_serializer() method like for GenericAPIView.
         for child_name, child in fields.items():
             # read_only fields don't need to be in writable (write only) serializers
             if 'read_only' in dir(child) and child.read_only:

+ 1 - 0
netbox/core/api/serializers.py

@@ -1,3 +1,4 @@
 from .serializers_.change_logging import *
 from .serializers_.data import *
 from .serializers_.jobs import *
+from .serializers_.tasks import *

+ 87 - 0
netbox/core/api/serializers_/tasks.py

@@ -0,0 +1,87 @@
+from rest_framework import serializers
+from rest_framework.reverse import reverse
+
+__all__ = (
+    'BackgroundTaskSerializer',
+    'BackgroundQueueSerializer',
+    'BackgroundWorkerSerializer',
+)
+
+
+class BackgroundTaskSerializer(serializers.Serializer):
+    id = serializers.CharField()
+    url = serializers.HyperlinkedIdentityField(
+        view_name='core-api:rqtask-detail',
+        lookup_field='id',
+        lookup_url_kwarg='pk'
+    )
+    description = serializers.CharField()
+    origin = serializers.CharField()
+    func_name = serializers.CharField()
+    args = serializers.ListField(child=serializers.CharField())
+    kwargs = serializers.DictField()
+    result = serializers.CharField()
+    timeout = serializers.IntegerField()
+    result_ttl = serializers.IntegerField()
+    created_at = serializers.DateTimeField()
+    enqueued_at = serializers.DateTimeField()
+    started_at = serializers.DateTimeField()
+    ended_at = serializers.DateTimeField()
+    worker_name = serializers.CharField()
+    position = serializers.SerializerMethodField()
+    status = serializers.SerializerMethodField()
+    meta = serializers.DictField()
+    last_heartbeat = serializers.CharField()
+
+    is_finished = serializers.BooleanField()
+    is_queued = serializers.BooleanField()
+    is_failed = serializers.BooleanField()
+    is_started = serializers.BooleanField()
+    is_deferred = serializers.BooleanField()
+    is_canceled = serializers.BooleanField()
+    is_scheduled = serializers.BooleanField()
+    is_stopped = serializers.BooleanField()
+
+    def get_position(self, obj) -> int:
+        return obj.get_position()
+
+    def get_status(self, obj) -> str:
+        return obj.get_status()
+
+
+class BackgroundQueueSerializer(serializers.Serializer):
+    name = serializers.CharField()
+    url = serializers.SerializerMethodField()
+    jobs = serializers.IntegerField()
+    oldest_job_timestamp = serializers.CharField()
+    index = serializers.IntegerField()
+    scheduler_pid = serializers.CharField()
+    workers = serializers.IntegerField()
+    finished_jobs = serializers.IntegerField()
+    started_jobs = serializers.IntegerField()
+    deferred_jobs = serializers.IntegerField()
+    failed_jobs = serializers.IntegerField()
+    scheduled_jobs = serializers.IntegerField()
+
+    def get_url(self, obj):
+        return reverse('core-api:rqqueue-detail', args=[obj['name']], request=self.context.get("request"))
+
+
+class BackgroundWorkerSerializer(serializers.Serializer):
+    name = serializers.CharField()
+    url = serializers.HyperlinkedIdentityField(
+        view_name='core-api:rqworker-detail',
+        lookup_field='name'
+    )
+    state = serializers.SerializerMethodField()
+    birth_date = serializers.DateTimeField()
+    queue_names = serializers.ListField(
+        child=serializers.CharField()
+    )
+    pid = serializers.CharField()
+    successful_job_count = serializers.IntegerField()
+    failed_job_count = serializers.IntegerField()
+    total_working_time = serializers.IntegerField()
+
+    def get_state(self, obj):
+        return obj.get_state()

+ 4 - 1
netbox/core/api/urls.py

@@ -1,6 +1,7 @@
 from netbox.api.routers import NetBoxRouter
 from . import views
 
+app_name = 'core-api'
 
 router = NetBoxRouter()
 router.APIRootView = views.CoreRootView
@@ -9,6 +10,8 @@ router.register('data-sources', views.DataSourceViewSet)
 router.register('data-files', views.DataFileViewSet)
 router.register('jobs', views.JobViewSet)
 router.register('object-changes', views.ObjectChangeViewSet)
+router.register('background-queues', views.BackgroundQueueViewSet, basename='rqqueue')
+router.register('background-workers', views.BackgroundWorkerViewSet, basename='rqworker')
+router.register('background-tasks', views.BackgroundTaskViewSet, basename='rqtask')
 
-app_name = 'core-api'
 urlpatterns = router.urls

+ 161 - 0
netbox/core/api/views.py

@@ -1,5 +1,8 @@
+from django.http import Http404, HttpResponse
 from django.shortcuts import get_object_or_404
 from django.utils.translation import gettext_lazy as _
+from drf_spectacular.types import OpenApiTypes
+from drf_spectacular.utils import extend_schema
 from rest_framework.decorators import action
 from rest_framework.exceptions import PermissionDenied
 from rest_framework.response import Response
@@ -10,8 +13,17 @@ from core import filtersets
 from core.choices import DataSourceStatusChoices
 from core.jobs import SyncDataSourceJob
 from core.models import *
+from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs, requeue_rq_job, stop_rq_job
+from django_rq.queues import get_redis_connection
+from django_rq.utils import get_statistics
+from django_rq.settings import QUEUES_LIST
 from netbox.api.metadata import ContentTypeMetadata
+from netbox.api.pagination import LimitOffsetListPagination
 from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
+from rest_framework import viewsets
+from rest_framework.permissions import IsAdminUser
+from rq.job import Job as RQ_Job
+from rq.worker import Worker
 from . import serializers
 
 
@@ -71,3 +83,152 @@ class ObjectChangeViewSet(ReadOnlyModelViewSet):
     queryset = ObjectChange.objects.valid_models()
     serializer_class = serializers.ObjectChangeSerializer
     filterset_class = filtersets.ObjectChangeFilterSet
+
+
+class BaseRQViewSet(viewsets.ViewSet):
+    """
+    Base class for RQ view sets. Provides a list() method. Subclasses must implement get_data().
+    """
+    permission_classes = [IsAdminUser]
+    serializer_class = None
+
+    def get_data(self):
+        raise NotImplementedError()
+
+    @extend_schema(responses={200: OpenApiTypes.OBJECT})
+    def list(self, request):
+        data = self.get_data()
+        paginator = LimitOffsetListPagination()
+        data = paginator.paginate_list(data, request)
+
+        serializer = self.serializer_class(data, many=True, context={'request': request})
+        return paginator.get_paginated_response(serializer.data)
+
+    def get_serializer(self, *args, **kwargs):
+        """
+        Return the serializer instance that should be used for validating and
+        deserializing input, and for serializing output.
+        """
+        serializer_class = self.get_serializer_class()
+        kwargs['context'] = self.get_serializer_context()
+        return serializer_class(*args, **kwargs)
+
+
+class BackgroundQueueViewSet(BaseRQViewSet):
+    """
+    Retrieve a list of RQ Queues.
+    Note: Queue names are not URL safe so not returning a detail view.
+    """
+    serializer_class = serializers.BackgroundQueueSerializer
+    lookup_field = 'name'
+    lookup_value_regex = r'[\w.@+-]+'
+
+    def get_view_name(self):
+        return "Background Queues"
+
+    def get_data(self):
+        return get_statistics(run_maintenance_tasks=True)["queues"]
+
+    @extend_schema(responses={200: OpenApiTypes.OBJECT})
+    def retrieve(self, request, name):
+        data = self.get_data()
+        if not data:
+            raise Http404
+
+        for queue in data:
+            if queue['name'] == name:
+                serializer = self.serializer_class(queue, context={'request': request})
+                return Response(serializer.data)
+
+        raise Http404
+
+
+class BackgroundWorkerViewSet(BaseRQViewSet):
+    """
+    Retrieve a list of RQ Workers.
+    """
+    serializer_class = serializers.BackgroundWorkerSerializer
+    lookup_field = 'name'
+
+    def get_view_name(self):
+        return "Background Workers"
+
+    def get_data(self):
+        config = QUEUES_LIST[0]
+        return Worker.all(get_redis_connection(config['connection_config']))
+
+    def retrieve(self, request, name):
+        # all the RQ queues should use the same connection
+        config = QUEUES_LIST[0]
+        workers = Worker.all(get_redis_connection(config['connection_config']))
+        worker = next((item for item in workers if item.name == name), None)
+        if not worker:
+            raise Http404
+
+        serializer = serializers.BackgroundWorkerSerializer(worker, context={'request': request})
+        return Response(serializer.data)
+
+
+class BackgroundTaskViewSet(BaseRQViewSet):
+    """
+    Retrieve a list of RQ Tasks.
+    """
+    serializer_class = serializers.BackgroundTaskSerializer
+
+    def get_view_name(self):
+        return "Background Tasks"
+
+    def get_data(self):
+        return get_rq_jobs()
+
+    def get_task_from_id(self, task_id):
+        config = QUEUES_LIST[0]
+        task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config']))
+        if not task:
+            raise Http404
+
+        return task
+
+    @extend_schema(responses={200: OpenApiTypes.OBJECT})
+    def retrieve(self, request, pk):
+        """
+        Retrieve the details of the specified RQ Task.
+        """
+        task = self.get_task_from_id(pk)
+        serializer = self.serializer_class(task, context={'request': request})
+        return Response(serializer.data)
+
+    @action(methods=["POST"], detail=True)
+    def delete(self, request, pk):
+        """
+        Delete the specified RQ Task.
+        """
+        delete_rq_job(pk)
+        return HttpResponse(status=200)
+
+    @action(methods=["POST"], detail=True)
+    def requeue(self, request, pk):
+        """
+        Requeues the specified RQ Task.
+        """
+        requeue_rq_job(pk)
+        return HttpResponse(status=200)
+
+    @action(methods=["POST"], detail=True)
+    def enqueue(self, request, pk):
+        """
+        Enqueues the specified RQ Task.
+        """
+        enqueue_rq_job(pk)
+        return HttpResponse(status=200)
+
+    @action(methods=["POST"], detail=True)
+    def stop(self, request, pk):
+        """
+        Stops the specified RQ Task.
+        """
+        stopped_jobs = stop_rq_job(pk)
+        if len(stopped_jobs) == 1:
+            return HttpResponse(status=200)
+        else:
+            return HttpResponse(status=204)

+ 169 - 1
netbox/core/tests/test_api.py

@@ -1,7 +1,14 @@
+import uuid
+
+from django_rq import get_queue
+from django_rq.workers import get_worker
 from django.urls import reverse
 from django.utils import timezone
+from rq.job import Job as RQ_Job, JobStatus
+from rq.registry import FailedJobRegistry, StartedJobRegistry
 
-from utilities.testing import APITestCase, APIViewTestCases
+from users.models import Token, User
+from utilities.testing import APITestCase, APIViewTestCases, TestCase
 from ..models import *
 
 
@@ -91,3 +98,164 @@ class DataFileTest(
             ),
         )
         DataFile.objects.bulk_create(data_files)
+
+
+class BackgroundTaskTestCase(TestCase):
+    user_permissions = ()
+
+    @staticmethod
+    def dummy_job_default():
+        return "Job finished"
+
+    @staticmethod
+    def dummy_job_failing():
+        raise Exception("Job failed")
+
+    def setUp(self):
+        """
+        Create a user and token for API calls.
+        """
+        # Create the test user and assign permissions
+        self.user = User.objects.create_user(username='testuser')
+        self.user.is_staff = True
+        self.user.is_active = True
+        self.user.save()
+        self.token = Token.objects.create(user=self.user)
+        self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
+
+        # Clear all queues prior to running each test
+        get_queue('default').connection.flushall()
+        get_queue('high').connection.flushall()
+        get_queue('low').connection.flushall()
+
+    def test_background_queue_list(self):
+        url = reverse('core-api:rqqueue-list')
+
+        # Attempt to load view without permission
+        self.user.is_staff = False
+        self.user.save()
+        response = self.client.get(url, **self.header)
+        self.assertEqual(response.status_code, 403)
+
+        # Load view with permission
+        self.user.is_staff = True
+        self.user.save()
+        response = self.client.get(url, **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertIn('default', str(response.content))
+        self.assertIn('high', str(response.content))
+        self.assertIn('low', str(response.content))
+
+    def test_background_queue(self):
+        response = self.client.get(reverse('core-api:rqqueue-detail', args=['default']), **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertIn('default', str(response.content))
+        self.assertIn('oldest_job_timestamp', str(response.content))
+        self.assertIn('scheduled_jobs', str(response.content))
+
+    def test_background_task_list(self):
+        queue = get_queue('default')
+        queue.enqueue(self.dummy_job_default)
+
+        response = self.client.get(reverse('core-api:rqtask-list'), **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertIn('origin', str(response.content))
+        self.assertIn('core.tests.test_api.BackgroundTaskTestCase.dummy_job_default()', str(response.content))
+
+    def test_background_task(self):
+        queue = get_queue('default')
+        job = queue.enqueue(self.dummy_job_default)
+
+        response = self.client.get(reverse('core-api:rqtask-detail', args=[job.id]), **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertIn(str(job.id), str(response.content))
+        self.assertIn('origin', str(response.content))
+        self.assertIn('meta', str(response.content))
+        self.assertIn('kwargs', str(response.content))
+
+    def test_background_task_delete(self):
+        queue = get_queue('default')
+        job = queue.enqueue(self.dummy_job_default)
+
+        response = self.client.post(reverse('core-api:rqtask-delete', args=[job.id]), **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
+        queue = get_queue('default')
+        self.assertNotIn(job.id, queue.job_ids)
+
+    def test_background_task_requeue(self):
+        queue = get_queue('default')
+
+        # Enqueue & run a job that will fail
+        job = queue.enqueue(self.dummy_job_failing)
+        worker = get_worker('default')
+        worker.work(burst=True)
+        self.assertTrue(job.is_failed)
+
+        # Re-enqueue the failed job and check that its status has been reset
+        response = self.client.post(reverse('core-api:rqtask-requeue', args=[job.id]), **self.header)
+        self.assertEqual(response.status_code, 200)
+        job = RQ_Job.fetch(job.id, queue.connection)
+        self.assertFalse(job.is_failed)
+
+    def test_background_task_enqueue(self):
+        queue = get_queue('default')
+
+        # Enqueue some jobs that each depends on its predecessor
+        job = previous_job = None
+        for _ in range(0, 3):
+            job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
+            previous_job = job
+
+        # Check that the last job to be enqueued has a status of deferred
+        self.assertIsNotNone(job)
+        self.assertEqual(job.get_status(), JobStatus.DEFERRED)
+        self.assertIsNone(job.enqueued_at)
+
+        # Force-enqueue the deferred job
+        response = self.client.post(reverse('core-api:rqtask-enqueue', args=[job.id]), **self.header)
+        self.assertEqual(response.status_code, 200)
+
+        # Check that job's status is updated correctly
+        job = queue.fetch_job(job.id)
+        self.assertEqual(job.get_status(), JobStatus.QUEUED)
+        self.assertIsNotNone(job.enqueued_at)
+
+    def test_background_task_stop(self):
+        queue = get_queue('default')
+
+        worker = get_worker('default')
+        job = queue.enqueue(self.dummy_job_default)
+        worker.prepare_job_execution(job)
+
+        self.assertEqual(job.get_status(), JobStatus.STARTED)
+        response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header)
+        self.assertEqual(response.status_code, 200)
+        worker.monitor_work_horse(job, queue)  # Sets the job as Failed and removes from Started
+        started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
+        self.assertEqual(len(started_job_registry), 0)
+
+        canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
+        self.assertEqual(len(canceled_job_registry), 1)
+        self.assertIn(job.id, canceled_job_registry)
+
+    def test_worker_list(self):
+        worker1 = get_worker('default', name=uuid.uuid4().hex)
+        worker1.register_birth()
+
+        worker2 = get_worker('high')
+        worker2.register_birth()
+
+        response = self.client.get(reverse('core-api:rqworker-list'), **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertIn(str(worker1.name), str(response.content))
+
+    def test_worker(self):
+        worker1 = get_worker('default', name=uuid.uuid4().hex)
+        worker1.register_birth()
+
+        response = self.client.get(reverse('core-api:rqworker-detail', args=[worker1.name]), **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertIn(str(worker1.name), str(response.content))
+        self.assertIn('birth_date', str(response.content))
+        self.assertIn('total_working_time', str(response.content))

+ 155 - 0
netbox/core/utils.py

@@ -0,0 +1,155 @@
+from django.http import Http404
+from django.utils.translation import gettext_lazy as _
+from django_rq.queues import get_queue, get_queue_by_index, get_redis_connection
+from django_rq.settings import QUEUES_MAP, QUEUES_LIST
+from django_rq.utils import get_jobs, stop_jobs
+from rq import requeue_job
+from rq.exceptions import NoSuchJobError
+from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
+from rq.registry import (
+    DeferredJobRegistry,
+    FailedJobRegistry,
+    FinishedJobRegistry,
+    ScheduledJobRegistry,
+    StartedJobRegistry,
+)
+
+__all__ = (
+    'delete_rq_job',
+    'enqueue_rq_job',
+    'get_rq_jobs',
+    'get_rq_jobs_from_status',
+    'requeue_rq_job',
+    'stop_rq_job',
+)
+
+
+def get_rq_jobs():
+    """
+    Return a list of all RQ jobs.
+    """
+    jobs = set()
+
+    for queue in QUEUES_LIST:
+        queue = get_queue(queue['name'])
+        jobs.update(queue.get_jobs())
+
+    return list(jobs)
+
+
+def get_rq_jobs_from_status(queue, status):
+    """
+    Return the RQ jobs with the given status.
+    """
+    jobs = []
+
+    try:
+        registry_cls = {
+            RQJobStatus.STARTED: StartedJobRegistry,
+            RQJobStatus.DEFERRED: DeferredJobRegistry,
+            RQJobStatus.FINISHED: FinishedJobRegistry,
+            RQJobStatus.FAILED: FailedJobRegistry,
+            RQJobStatus.SCHEDULED: ScheduledJobRegistry,
+        }[status]
+    except KeyError:
+        raise Http404
+    registry = registry_cls(queue.name, queue.connection)
+
+    job_ids = registry.get_job_ids()
+    if status != RQJobStatus.DEFERRED:
+        jobs = get_jobs(queue, job_ids, registry)
+    else:
+        # Deferred jobs require special handling
+        for job_id in job_ids:
+            try:
+                jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
+            except NoSuchJobError:
+                pass
+
+    if jobs and status == RQJobStatus.SCHEDULED:
+        for job in jobs:
+            job.scheduled_at = registry.get_scheduled_time(job)
+
+    return jobs
+
+
+def delete_rq_job(job_id):
+    """
+    Delete the specified RQ job.
+    """
+    config = QUEUES_LIST[0]
+    try:
+        job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
+    except NoSuchJobError:
+        raise Http404(_("Job {job_id} not found").format(job_id=job_id))
+
+    queue_index = QUEUES_MAP[job.origin]
+    queue = get_queue_by_index(queue_index)
+
+    # Remove job id from queue and delete the actual job
+    queue.connection.lrem(queue.key, 0, job.id)
+    job.delete()
+
+
+def requeue_rq_job(job_id):
+    """
+    Requeue the specified RQ job.
+    """
+    config = QUEUES_LIST[0]
+    try:
+        job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
+    except NoSuchJobError:
+        raise Http404(_("Job {id} not found.").format(id=job_id))
+
+    queue_index = QUEUES_MAP[job.origin]
+    queue = get_queue_by_index(queue_index)
+
+    requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
+
+
+def enqueue_rq_job(job_id):
+    """
+    Enqueue the specified RQ job.
+    """
+    config = QUEUES_LIST[0]
+    try:
+        job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
+    except NoSuchJobError:
+        raise Http404(_("Job {id} not found.").format(id=job_id))
+
+    queue_index = QUEUES_MAP[job.origin]
+    queue = get_queue_by_index(queue_index)
+
+    try:
+        # _enqueue_job is new in RQ 1.14, this is used to enqueue
+        # job regardless of its dependencies
+        queue._enqueue_job(job)
+    except AttributeError:
+        queue.enqueue_job(job)
+
+    # Remove job from correct registry if needed
+    if job.get_status() == RQJobStatus.DEFERRED:
+        registry = DeferredJobRegistry(queue.name, queue.connection)
+        registry.remove(job)
+    elif job.get_status() == RQJobStatus.FINISHED:
+        registry = FinishedJobRegistry(queue.name, queue.connection)
+        registry.remove(job)
+    elif job.get_status() == RQJobStatus.SCHEDULED:
+        registry = ScheduledJobRegistry(queue.name, queue.connection)
+        registry.remove(job)
+
+
+def stop_rq_job(job_id):
+    """
+    Stop the specified RQ job.
+    """
+    config = QUEUES_LIST[0]
+    try:
+        job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
+    except NoSuchJobError:
+        raise Http404(_("Job {job_id} not found").format(job_id=job_id))
+
+    queue_index = QUEUES_MAP[job.origin]
+    queue = get_queue_by_index(queue_index)
+
+    return stop_jobs(queue, job_id)[0]

+ 7 - 97
netbox/core/views.py

@@ -14,16 +14,13 @@ from django.utils.translation import gettext_lazy as _
 from django.views.generic import View
 from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection
 from django_rq.settings import QUEUES_MAP, QUEUES_LIST
-from django_rq.utils import get_jobs, get_statistics, stop_jobs
-from rq import requeue_job
+from django_rq.utils import get_statistics
 from rq.exceptions import NoSuchJobError
 from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
-from rq.registry import (
-    DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, StartedJobRegistry,
-)
 from rq.worker import Worker
 from rq.worker_registration import clean_worker_registry
 
+from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job
 from netbox.config import get_config, PARAMS
 from netbox.views import generic
 from netbox.views.generic.base import BaseObjectView
@@ -363,41 +360,12 @@ class BackgroundTaskListView(TableMixin, BaseRQView):
     table = tables.BackgroundTaskTable
 
     def get_table_data(self, request, queue, status):
-        jobs = []
 
         # Call get_jobs() to returned queued tasks
         if status == RQJobStatus.QUEUED:
             return queue.get_jobs()
 
-        # For other statuses, determine the registry to list (or raise a 404 for invalid statuses)
-        try:
-            registry_cls = {
-                RQJobStatus.STARTED: StartedJobRegistry,
-                RQJobStatus.DEFERRED: DeferredJobRegistry,
-                RQJobStatus.FINISHED: FinishedJobRegistry,
-                RQJobStatus.FAILED: FailedJobRegistry,
-                RQJobStatus.SCHEDULED: ScheduledJobRegistry,
-            }[status]
-        except KeyError:
-            raise Http404
-        registry = registry_cls(queue.name, queue.connection)
-
-        job_ids = registry.get_job_ids()
-        if status != RQJobStatus.DEFERRED:
-            jobs = get_jobs(queue, job_ids, registry)
-        else:
-            # Deferred jobs require special handling
-            for job_id in job_ids:
-                try:
-                    jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
-                except NoSuchJobError:
-                    pass
-
-        if jobs and status == RQJobStatus.SCHEDULED:
-            for job in jobs:
-                job.scheduled_at = registry.get_scheduled_time(job)
-
-        return jobs
+        return get_rq_jobs_from_status(queue, status)
 
     def get(self, request, queue_index, status):
         queue = get_queue_by_index(queue_index)
@@ -463,19 +431,7 @@ class BackgroundTaskDeleteView(BaseRQView):
         form = ConfirmationForm(request.POST)
 
         if form.is_valid():
-            # all the RQ queues should use the same connection
-            config = QUEUES_LIST[0]
-            try:
-                job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
-            except NoSuchJobError:
-                raise Http404(_("Job {job_id} not found").format(job_id=job_id))
-
-            queue_index = QUEUES_MAP[job.origin]
-            queue = get_queue_by_index(queue_index)
-
-            # Remove job id from queue and delete the actual job
-            queue.connection.lrem(queue.key, 0, job.id)
-            job.delete()
+            delete_rq_job(job_id)
             messages.success(request, _('Job {id} has been deleted.').format(id=job_id))
         else:
             messages.error(request, _('Error deleting job {id}: {error}').format(id=job_id, error=form.errors[0]))
@@ -486,17 +442,7 @@ class BackgroundTaskDeleteView(BaseRQView):
 class BackgroundTaskRequeueView(BaseRQView):
 
     def get(self, request, job_id):
-        # all the RQ queues should use the same connection
-        config = QUEUES_LIST[0]
-        try:
-            job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
-        except NoSuchJobError:
-            raise Http404(_("Job {id} not found.").format(id=job_id))
-
-        queue_index = QUEUES_MAP[job.origin]
-        queue = get_queue_by_index(queue_index)
-
-        requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
+        requeue_rq_job(job_id)
         messages.success(request, _('Job {id} has been re-enqueued.').format(id=job_id))
         return redirect(reverse('core:background_task', args=[job_id]))
 
@@ -505,33 +451,7 @@ class BackgroundTaskEnqueueView(BaseRQView):
 
     def get(self, request, job_id):
         # all the RQ queues should use the same connection
-        config = QUEUES_LIST[0]
-        try:
-            job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
-        except NoSuchJobError:
-            raise Http404(_("Job {id} not found.").format(id=job_id))
-
-        queue_index = QUEUES_MAP[job.origin]
-        queue = get_queue_by_index(queue_index)
-
-        try:
-            # _enqueue_job is new in RQ 1.14, this is used to enqueue
-            # job regardless of its dependencies
-            queue._enqueue_job(job)
-        except AttributeError:
-            queue.enqueue_job(job)
-
-        # Remove job from correct registry if needed
-        if job.get_status() == RQJobStatus.DEFERRED:
-            registry = DeferredJobRegistry(queue.name, queue.connection)
-            registry.remove(job)
-        elif job.get_status() == RQJobStatus.FINISHED:
-            registry = FinishedJobRegistry(queue.name, queue.connection)
-            registry.remove(job)
-        elif job.get_status() == RQJobStatus.SCHEDULED:
-            registry = ScheduledJobRegistry(queue.name, queue.connection)
-            registry.remove(job)
-
+        enqueue_rq_job(job_id)
         messages.success(request, _('Job {id} has been enqueued.').format(id=job_id))
         return redirect(reverse('core:background_task', args=[job_id]))
 
@@ -539,17 +459,7 @@ class BackgroundTaskEnqueueView(BaseRQView):
 class BackgroundTaskStopView(BaseRQView):
 
     def get(self, request, job_id):
-        # all the RQ queues should use the same connection
-        config = QUEUES_LIST[0]
-        try:
-            job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
-        except NoSuchJobError:
-            raise Http404(_("Job {job_id} not found").format(job_id=job_id))
-
-        queue_index = QUEUES_MAP[job.origin]
-        queue = get_queue_by_index(queue_index)
-
-        stopped_jobs = stop_jobs(queue, job_id)[0]
+        stopped_jobs = stop_rq_job(job_id)
         if len(stopped_jobs) == 1:
             messages.success(request, _('Job {id} has been stopped.').format(id=job_id))
         else:

+ 25 - 0
netbox/netbox/api/pagination.py

@@ -83,3 +83,28 @@ class StripCountAnnotationsPaginator(OptionalLimitOffsetPagination):
         cloned_queryset.query.annotations.clear()
 
         return cloned_queryset.count()
+
+
+class LimitOffsetListPagination(LimitOffsetPagination):
+    """
+    DRF LimitOffset Paginator but for list instead of queryset
+    """
+    count = 0
+    offset = 0
+
+    def paginate_list(self, data, request, view=None):
+        self.request = request
+        self.limit = self.get_limit(request)
+        self.count = len(data)
+        self.offset = self.get_offset(request)
+
+        if self.limit is None:
+            self.limit = self.count
+
+        if self.count == 0 or self.offset > self.count:
+            return []
+
+        if self.count > self.limit and self.template is not None:
+            self.display_page_controls = True
+
+        return data[self.offset:self.offset + self.limit]