소스 검색

Closes #19589: Background job for bulk operations (#19804)

* Initial work on #19589

* Add tooling for handling background requests

* UI notification should link to enqueued job

* Use an informative name for the job

* Disable background jobs for file uploads
Jeremy Stretch 7 달 전
부모
커밋
875a641687

+ 1 - 1
netbox/core/models/jobs.py

@@ -116,7 +116,7 @@ class Job(models.Model):
         verbose_name_plural = _('jobs')
 
     def __str__(self):
-        return str(self.job_id)
+        return self.name
 
     def get_absolute_url(self):
         # TODO: Employ dynamic registration

+ 36 - 0
netbox/netbox/jobs.py

@@ -8,11 +8,15 @@ from django_pglocks import advisory_lock
 from rq.timeouts import JobTimeoutException
 
 from core.choices import JobStatusChoices
+from core.events import JOB_COMPLETED, JOB_FAILED
 from core.models import Job, ObjectType
+from extras.models import Notification
 from netbox.constants import ADVISORY_LOCK_KEYS
 from netbox.registry import registry
+from utilities.request import apply_request_processors
 
 __all__ = (
+    'AsyncViewJob',
     'JobRunner',
     'system_job',
 )
@@ -154,3 +158,35 @@ class JobRunner(ABC):
             job.delete()
 
         return cls.enqueue(instance=instance, schedule_at=schedule_at, interval=interval, *args, **kwargs)
+
+
+class AsyncViewJob(JobRunner):
+    """
+    Execute a view as a background job.
+    """
+    class Meta:
+        name = 'Async View'
+
+    def run(self, view_cls, request, **kwargs):
+        view = view_cls.as_view()
+
+        # Apply all registered request processors (e.g. event_tracking)
+        with apply_request_processors(request):
+            data = view(request)
+
+        self.job.data = {
+            'log': data.log,
+            'errors': data.errors,
+        }
+
+        # Notify the user
+        notification = Notification(
+            user=request.user,
+            object=self.job,
+            event_type=JOB_COMPLETED if not data.errors else JOB_FAILED,
+        )
+        notification.save()
+
+        # TODO: Waiting on fix for bug #19806
+        # if errors:
+        #     raise JobFailed()

+ 2 - 10
netbox/netbox/middleware.py

@@ -1,8 +1,5 @@
-from contextlib import ExitStack
-
 import logging
 import uuid
-import warnings
 
 from django.conf import settings
 from django.contrib import auth, messages
@@ -13,10 +10,10 @@ from django.db.utils import InternalError
 from django.http import Http404, HttpResponseRedirect
 
 from netbox.config import clear_config, get_config
-from netbox.registry import registry
 from netbox.views import handler_500
 from utilities.api import is_api_request
 from utilities.error_handlers import handle_rest_api_exception
+from utilities.request import apply_request_processors
 
 __all__ = (
     'CoreMiddleware',
@@ -36,12 +33,7 @@ class CoreMiddleware:
         request.id = uuid.uuid4()
 
         # Apply all registered request processors
-        with ExitStack() as stack:
-            for request_processor in registry['request_processors']:
-                try:
-                    stack.enter_context(request_processor(request))
-                except Exception as e:
-                    warnings.warn(f'Failed to initialize request processor {request_processor}: {e}')
+        with apply_request_processors(request):
             response = self.get_response(request)
 
         # Check if language cookie should be renewed

+ 38 - 11
netbox/netbox/views/generic/bulk_views.py

@@ -28,6 +28,7 @@ from utilities.export import TableExport
 from utilities.forms import BulkRenameForm, ConfirmationForm, restrict_form_fields
 from utilities.forms.bulk_import import BulkImportForm
 from utilities.htmx import htmx_partial
+from utilities.jobs import AsyncJobData, is_background_request, process_request_as_job
 from utilities.permissions import get_permission_for_model
 from utilities.query import reapply_model_ordering
 from utilities.request import safe_for_redirect
@@ -503,25 +504,32 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
 
         if form.is_valid():
             logger.debug("Import form validation was successful")
+            redirect_url = reverse(get_viewname(model, action='list'))
+            new_objects = []
+
+            # If indicated, defer this request to a background job & redirect the user
+            if form.cleaned_data['background_job']:
+                job_name = _('Bulk import {count} {object_type}').format(
+                    count=len(form.cleaned_data['data']),
+                    object_type=model._meta.verbose_name_plural,
+                )
+                if job := process_request_as_job(self.__class__, request, name=job_name):
+                    msg = _('Created background job {job.pk}: <a href="{url}">{job.name}</a>').format(
+                        url=job.get_absolute_url(),
+                        job=job
+                    )
+                    messages.info(request, mark_safe(msg))
+                    return redirect(redirect_url)
 
             try:
                 # Iterate through data and bind each record to a new model form instance.
                 with transaction.atomic(using=router.db_for_write(model)):
-                    new_objs = self.create_and_update_objects(form, request)
+                    new_objects = self.create_and_update_objects(form, request)
 
                     # Enforce object-level permissions
-                    if self.queryset.filter(pk__in=[obj.pk for obj in new_objs]).count() != len(new_objs):
+                    if self.queryset.filter(pk__in=[obj.pk for obj in new_objects]).count() != len(new_objects):
                         raise PermissionsViolation
 
-                if new_objs:
-                    msg = f"Imported {len(new_objs)} {model._meta.verbose_name_plural}"
-                    logger.info(msg)
-                    messages.success(request, msg)
-
-                    view_name = get_viewname(model, action='list')
-                    results_url = f"{reverse(view_name)}?modified_by_request={request.id}"
-                    return redirect(results_url)
-
             except (AbortTransaction, ValidationError):
                 clear_events.send(sender=self)
 
@@ -530,6 +538,25 @@ class BulkImportView(GetReturnURLMixin, BaseMultiObjectView):
                 form.add_error(None, e.message)
                 clear_events.send(sender=self)
 
+            # If this request was executed via a background job, return the raw data for logging
+            if is_background_request(request):
+                return AsyncJobData(
+                    log=[
+                        _('Created {object}').format(object=str(obj))
+                        for obj in new_objects
+                    ],
+                    errors=form.errors
+                )
+
+            if new_objects:
+                msg = _("Imported {count} {object_type}").format(
+                    count=len(new_objects),
+                    object_type=model._meta.verbose_name_plural
+                )
+                logger.info(msg)
+                messages.success(request, msg)
+                return redirect(f"{redirect_url}?modified_by_request={request.id}")
+
         else:
             logger.debug("Form validation failed")
 

+ 2 - 0
netbox/templates/generic/bulk_import.html

@@ -50,6 +50,7 @@ Context:
           {% render_field form.data %}
           {% render_field form.format %}
           {% render_field form.csv_delimiter %}
+          {% render_field form.background_job %}
           <div class="form-group">
             <div class="col col-md-12 text-end">
               {% if return_url %}
@@ -94,6 +95,7 @@ Context:
         {% render_field form.data_file %}
         {% render_field form.format %}
         {% render_field form.csv_delimiter %}
+        {% render_field form.background_job %}
         <div class="form-group">
           <div class="col col-md-12 text-end">
             {% if return_url %}

+ 5 - 0
netbox/utilities/forms/bulk_import.py

@@ -37,6 +37,11 @@ class BulkImportForm(SyncedDataMixin, forms.Form):
         help_text=_("The character which delimits CSV fields. Applies only to CSV format."),
         required=False
     )
+    background_job = forms.BooleanField(
+        label=_('Background job'),
+        help_text=_("Enqueue a background job to complete the bulk import/update."),
+        required=False,
+    )
 
     data_field = 'data'
 

+ 46 - 0
netbox/utilities/jobs.py

@@ -0,0 +1,46 @@
+from dataclasses import dataclass
+from typing import List
+
+from netbox.jobs import AsyncViewJob
+from utilities.request import copy_safe_request
+
+__all__ = (
+    'AsyncJobData',
+    'is_background_request',
+    'process_request_as_job',
+)
+
+
+@dataclass
+class AsyncJobData:
+    log: List[str]
+    errors: List[str]
+
+
+def is_background_request(request):
+    """
+    Return True if the request is being processed as a background job.
+    """
+    return getattr(request, '_background', False)
+
+
+def process_request_as_job(view, request, name=None):
+    """
+    Process a request using a view as a background job.
+    """
+
+    # Check that the request that is not already being processed as a background job (would be a loop)
+    if is_background_request(request):
+        return
+
+    # Create a serializable copy of the original request
+    request_copy = copy_safe_request(request)
+    request_copy._background = True
+
+    # Enqueue a job to perform the work in the background
+    return AsyncViewJob.enqueue(
+        name=name,
+        user=request.user,
+        view_cls=view,
+        request=request_copy,
+    )

+ 19 - 0
netbox/utilities/request.py

@@ -1,13 +1,17 @@
+import warnings
+from contextlib import ExitStack, contextmanager
 from urllib.parse import urlparse
 
 from django.utils.http import url_has_allowed_host_and_scheme
 from django.utils.translation import gettext_lazy as _
 from netaddr import AddrFormatError, IPAddress
 
+from netbox.registry import registry
 from .constants import HTTP_REQUEST_META_SAFE_COPY
 
 __all__ = (
     'NetBoxFakeRequest',
+    'apply_request_processors',
     'copy_safe_request',
     'get_client_ip',
     'safe_for_redirect',
@@ -48,6 +52,7 @@ def copy_safe_request(request):
         'GET': request.GET,
         'FILES': request.FILES,
         'user': request.user,
+        'method': request.method,
         'path': request.path,
         'id': getattr(request, 'id', None),  # UUID assigned by middleware
     })
@@ -87,3 +92,17 @@ def safe_for_redirect(url):
     Returns True if the given URL is safe to use as an HTTP redirect; otherwise returns False.
     """
     return url_has_allowed_host_and_scheme(url, allowed_hosts=None)
+
+
+@contextmanager
+def apply_request_processors(request):
+    """
+    A context manager with applies all registered request processors (such as event_tracking).
+    """
+    with ExitStack() as stack:
+        for request_processor in registry['request_processors']:
+            try:
+                stack.enter_context(request_processor(request))
+            except Exception as e:
+                warnings.warn(f'Failed to initialize request processor {request_processor.__name__}: {e}')
+        yield