|
|
@@ -3,13 +3,28 @@ from django.conf import settings
|
|
|
from django.contrib import messages
|
|
|
from django.contrib.auth.mixins import UserPassesTestMixin
|
|
|
from django.core.cache import cache
|
|
|
-from django.http import HttpResponseForbidden
|
|
|
+from django.http import HttpResponseForbidden, Http404
|
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
|
+from django.urls import reverse
|
|
|
+from django.utils.translation import gettext_lazy as _
|
|
|
from django.views.generic import View
|
|
|
+from django_rq.queues import get_queue_by_index, get_redis_connection
|
|
|
+from django_rq.settings import QUEUES_MAP, QUEUES_LIST
|
|
|
+from django_rq.utils import get_jobs, get_statistics, stop_jobs
|
|
|
+from rq import requeue_job
|
|
|
+from rq.exceptions import NoSuchJobError
|
|
|
+from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
|
|
|
+from rq.registry import (
|
|
|
+ DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, ScheduledJobRegistry, StartedJobRegistry,
|
|
|
+)
|
|
|
+from rq.worker import Worker
|
|
|
+from rq.worker_registration import clean_worker_registry
|
|
|
|
|
|
from netbox.config import get_config, PARAMS
|
|
|
from netbox.views import generic
|
|
|
from netbox.views.generic.base import BaseObjectView
|
|
|
+from netbox.views.generic.mixins import TableMixin
|
|
|
+from utilities.forms import ConfirmationForm
|
|
|
from utilities.utils import count_related
|
|
|
from utilities.views import ContentTypePermissionRequiredMixin, register_model_view
|
|
|
from . import filtersets, forms, tables
|
|
|
@@ -237,6 +252,276 @@ class ConfigRevisionRestoreView(ContentTypePermissionRequiredMixin, View):
|
|
|
return redirect(candidate_config.get_absolute_url())
|
|
|
|
|
|
|
|
|
+#
|
|
|
+# Background Tasks (RQ)
|
|
|
+#
|
|
|
+
|
|
|
+class BaseRQView(UserPassesTestMixin, View):
|
|
|
+
|
|
|
+ def test_func(self):
|
|
|
+ return self.request.user.is_staff
|
|
|
+
|
|
|
+
|
|
|
+class BackgroundQueueListView(TableMixin, BaseRQView):
|
|
|
+ table = tables.BackgroundQueueTable
|
|
|
+
|
|
|
+ def get(self, request):
|
|
|
+ data = get_statistics(run_maintenance_tasks=True)["queues"]
|
|
|
+ table = self.get_table(data, request, bulk_actions=False)
|
|
|
+
|
|
|
+ return render(request, 'core/rq_queue_list.html', {
|
|
|
+ 'table': table,
|
|
|
+ })
|
|
|
+
|
|
|
+
|
|
|
+class BackgroundTaskListView(TableMixin, BaseRQView):
|
|
|
+ table = tables.BackgroundTaskTable
|
|
|
+
|
|
|
+ def get_table_data(self, request, queue, status):
|
|
|
+ jobs = []
|
|
|
+
|
|
|
+ # Call get_jobs() to returned queued tasks
|
|
|
+ if status == RQJobStatus.QUEUED:
|
|
|
+ return queue.get_jobs()
|
|
|
+
|
|
|
+ # For other statuses, determine the registry to list (or raise a 404 for invalid statuses)
|
|
|
+ try:
|
|
|
+ registry_cls = {
|
|
|
+ RQJobStatus.STARTED: StartedJobRegistry,
|
|
|
+ RQJobStatus.DEFERRED: DeferredJobRegistry,
|
|
|
+ RQJobStatus.FINISHED: FinishedJobRegistry,
|
|
|
+ RQJobStatus.FAILED: FailedJobRegistry,
|
|
|
+ RQJobStatus.SCHEDULED: ScheduledJobRegistry,
|
|
|
+ }[status]
|
|
|
+ except KeyError:
|
|
|
+ raise Http404
|
|
|
+ registry = registry_cls(queue.name, queue.connection)
|
|
|
+
|
|
|
+ job_ids = registry.get_job_ids()
|
|
|
+ if status != RQJobStatus.DEFERRED:
|
|
|
+ jobs = get_jobs(queue, job_ids, registry)
|
|
|
+ else:
|
|
|
+ # Deferred jobs require special handling
|
|
|
+ for job_id in job_ids:
|
|
|
+ try:
|
|
|
+ jobs.append(RQ_Job.fetch(job_id, connection=queue.connection, serializer=queue.serializer))
|
|
|
+ except NoSuchJobError:
|
|
|
+ pass
|
|
|
+
|
|
|
+ if jobs and status == RQJobStatus.SCHEDULED:
|
|
|
+ for job in jobs:
|
|
|
+ job.scheduled_at = registry.get_scheduled_time(job)
|
|
|
+
|
|
|
+ return jobs
|
|
|
+
|
|
|
+ def get(self, request, queue_index, status):
|
|
|
+ queue = get_queue_by_index(queue_index)
|
|
|
+ data = self.get_table_data(request, queue, status)
|
|
|
+ table = self.get_table(data, request, False)
|
|
|
+
|
|
|
+ # If this is an HTMX request, return only the rendered table HTML
|
|
|
+ if request.htmx:
|
|
|
+ return render(request, 'htmx/table.html', {
|
|
|
+ 'table': table,
|
|
|
+ })
|
|
|
+
|
|
|
+ return render(request, 'core/rq_task_list.html', {
|
|
|
+ 'table': table,
|
|
|
+ 'queue': queue,
|
|
|
+ 'status': status,
|
|
|
+ })
|
|
|
+
|
|
|
+
|
|
|
+class BackgroundTaskView(BaseRQView):
|
|
|
+
|
|
|
+ def get(self, request, job_id):
|
|
|
+ # all the RQ queues should use the same connection
|
|
|
+ config = QUEUES_LIST[0]
|
|
|
+ try:
|
|
|
+ job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
|
|
+ except NoSuchJobError:
|
|
|
+ raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
|
|
+
|
|
|
+ queue_index = QUEUES_MAP[job.origin]
|
|
|
+ queue = get_queue_by_index(queue_index)
|
|
|
+
|
|
|
+ try:
|
|
|
+ exc_info = job._exc_info
|
|
|
+ except AttributeError:
|
|
|
+ exc_info = None
|
|
|
+
|
|
|
+ return render(request, 'core/rq_task.html', {
|
|
|
+ 'queue': queue,
|
|
|
+ 'job': job,
|
|
|
+ 'queue_index': queue_index,
|
|
|
+ 'dependency_id': job._dependency_id,
|
|
|
+ 'exc_info': exc_info,
|
|
|
+ })
|
|
|
+
|
|
|
+
|
|
|
+class BackgroundTaskDeleteView(BaseRQView):
|
|
|
+
|
|
|
+ def get(self, request, job_id):
|
|
|
+ if not request.htmx:
|
|
|
+ return redirect(reverse('core:background_queue_list'))
|
|
|
+
|
|
|
+ form = ConfirmationForm(initial=request.GET)
|
|
|
+
|
|
|
+ return render(request, 'htmx/delete_form.html', {
|
|
|
+ 'object_type': 'background task',
|
|
|
+ 'object': job_id,
|
|
|
+ 'form': form,
|
|
|
+ 'form_url': reverse('core:background_task_delete', kwargs={'job_id': job_id})
|
|
|
+ })
|
|
|
+
|
|
|
+ def post(self, request, job_id):
|
|
|
+ form = ConfirmationForm(request.POST)
|
|
|
+
|
|
|
+ if form.is_valid():
|
|
|
+ # all the RQ queues should use the same connection
|
|
|
+ config = QUEUES_LIST[0]
|
|
|
+ try:
|
|
|
+ job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
|
|
+ except NoSuchJobError:
|
|
|
+ raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
|
|
+
|
|
|
+ queue_index = QUEUES_MAP[job.origin]
|
|
|
+ queue = get_queue_by_index(queue_index)
|
|
|
+
|
|
|
+ # Remove job id from queue and delete the actual job
|
|
|
+ queue.connection.lrem(queue.key, 0, job.id)
|
|
|
+ job.delete()
|
|
|
+ messages.success(request, f'Deleted job {job_id}')
|
|
|
+ else:
|
|
|
+ messages.error(request, f'Error deleting job: {form.errors[0]}')
|
|
|
+
|
|
|
+ return redirect(reverse('core:background_queue_list'))
|
|
|
+
|
|
|
+
|
|
|
+class BackgroundTaskRequeueView(BaseRQView):
|
|
|
+
|
|
|
+ def get(self, request, job_id):
|
|
|
+ # all the RQ queues should use the same connection
|
|
|
+ config = QUEUES_LIST[0]
|
|
|
+ try:
|
|
|
+ job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
|
|
+ except NoSuchJobError:
|
|
|
+ raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
|
|
+
|
|
|
+ queue_index = QUEUES_MAP[job.origin]
|
|
|
+ queue = get_queue_by_index(queue_index)
|
|
|
+
|
|
|
+ requeue_job(job_id, connection=queue.connection, serializer=queue.serializer)
|
|
|
+ messages.success(request, f'You have successfully requeued: {job_id}')
|
|
|
+ return redirect(reverse('core:background_task', args=[job_id]))
|
|
|
+
|
|
|
+
|
|
|
+class BackgroundTaskEnqueueView(BaseRQView):
|
|
|
+
|
|
|
+ def get(self, request, job_id):
|
|
|
+ # all the RQ queues should use the same connection
|
|
|
+ config = QUEUES_LIST[0]
|
|
|
+ try:
|
|
|
+ job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
|
|
+ except NoSuchJobError:
|
|
|
+ raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
|
|
+
|
|
|
+ queue_index = QUEUES_MAP[job.origin]
|
|
|
+ queue = get_queue_by_index(queue_index)
|
|
|
+
|
|
|
+ try:
|
|
|
+ # _enqueue_job is new in RQ 1.14, this is used to enqueue
|
|
|
+ # job regardless of its dependencies
|
|
|
+ queue._enqueue_job(job)
|
|
|
+ except AttributeError:
|
|
|
+ queue.enqueue_job(job)
|
|
|
+
|
|
|
+ # Remove job from correct registry if needed
|
|
|
+ if job.get_status() == RQJobStatus.DEFERRED:
|
|
|
+ registry = DeferredJobRegistry(queue.name, queue.connection)
|
|
|
+ registry.remove(job)
|
|
|
+ elif job.get_status() == RQJobStatus.FINISHED:
|
|
|
+ registry = FinishedJobRegistry(queue.name, queue.connection)
|
|
|
+ registry.remove(job)
|
|
|
+ elif job.get_status() == RQJobStatus.SCHEDULED:
|
|
|
+ registry = ScheduledJobRegistry(queue.name, queue.connection)
|
|
|
+ registry.remove(job)
|
|
|
+
|
|
|
+ messages.success(request, f'You have successfully enqueued: {job_id}')
|
|
|
+ return redirect(reverse('core:background_task', args=[job_id]))
|
|
|
+
|
|
|
+
|
|
|
+class BackgroundTaskStopView(BaseRQView):
|
|
|
+
|
|
|
+ def get(self, request, job_id):
|
|
|
+ # all the RQ queues should use the same connection
|
|
|
+ config = QUEUES_LIST[0]
|
|
|
+ try:
|
|
|
+ job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
|
|
|
+ except NoSuchJobError:
|
|
|
+ raise Http404(_("Job {job_id} not found").format(job_id=job_id))
|
|
|
+
|
|
|
+ queue_index = QUEUES_MAP[job.origin]
|
|
|
+ queue = get_queue_by_index(queue_index)
|
|
|
+
|
|
|
+ stopped, _ = stop_jobs(queue, job_id)
|
|
|
+ if len(stopped) == 1:
|
|
|
+ messages.success(request, f'You have successfully stopped {job_id}')
|
|
|
+ else:
|
|
|
+ messages.error(request, f'Failed to stop {job_id}')
|
|
|
+
|
|
|
+ return redirect(reverse('core:background_task', args=[job_id]))
|
|
|
+
|
|
|
+
|
|
|
+class WorkerListView(TableMixin, BaseRQView):
|
|
|
+ table = tables.WorkerTable
|
|
|
+
|
|
|
+ def get_table_data(self, request, queue):
|
|
|
+ clean_worker_registry(queue)
|
|
|
+ all_workers = Worker.all(queue.connection)
|
|
|
+ workers = [worker for worker in all_workers if queue.name in worker.queue_names()]
|
|
|
+ return workers
|
|
|
+
|
|
|
+ def get(self, request, queue_index):
|
|
|
+ queue = get_queue_by_index(queue_index)
|
|
|
+ data = self.get_table_data(request, queue)
|
|
|
+
|
|
|
+ table = self.get_table(data, request, False)
|
|
|
+
|
|
|
+ # If this is an HTMX request, return only the rendered table HTML
|
|
|
+ if request.htmx:
|
|
|
+ if request.htmx.target != 'object_list':
|
|
|
+ table.embedded = True
|
|
|
+ # Hide selection checkboxes
|
|
|
+ if 'pk' in table.base_columns:
|
|
|
+ table.columns.hide('pk')
|
|
|
+ return render(request, 'htmx/table.html', {
|
|
|
+ 'table': table,
|
|
|
+ 'queue': queue,
|
|
|
+ })
|
|
|
+
|
|
|
+ return render(request, 'core/rq_worker_list.html', {
|
|
|
+ 'table': table,
|
|
|
+ 'queue': queue,
|
|
|
+ })
|
|
|
+
|
|
|
+
|
|
|
+class WorkerView(BaseRQView):
|
|
|
+
|
|
|
+ def get(self, request, key):
|
|
|
+ # all the RQ queues should use the same connection
|
|
|
+ config = QUEUES_LIST[0]
|
|
|
+ worker = Worker.find_by_key('rq:worker:' + key, connection=get_redis_connection(config['connection_config']))
|
|
|
+ # Convert microseconds to milliseconds
|
|
|
+ worker.total_working_time = worker.total_working_time / 1000
|
|
|
+
|
|
|
+ return render(request, 'core/rq_worker.html', {
|
|
|
+ 'worker': worker,
|
|
|
+ 'job': worker.get_current_job(),
|
|
|
+ 'total_working_time': worker.total_working_time * 1000,
|
|
|
+ })
|
|
|
+
|
|
|
+
|
|
|
#
|
|
|
# Plugins
|
|
|
#
|