|
|
@@ -13,6 +13,7 @@ from netbox.api.serializers import BulkOperationSerializer
|
|
|
from netbox.api.serializers.bulk import get_bulk_update_serializer_class
|
|
|
from netbox.jobs import AsyncAPIJob
|
|
|
from utilities.exceptions import RQWorkerNotRunningException
|
|
|
+from utilities.request import copy_safe_request
|
|
|
from utilities.rqworker import any_workers_for_queue
|
|
|
|
|
|
__all__ = (
|
|
|
@@ -29,10 +30,10 @@ __all__ = (
|
|
|
class BackgroundOperationMixin:
|
|
|
"""
|
|
|
Enable optional background processing of REST API bulk write operations. When a write
|
|
|
- request to a list endpoint includes ``?background=true``, the bulk action validates the
|
|
|
- payload synchronously, enqueues an ``AsyncAPIJob`` to perform the work, and immediately
|
|
|
- returns ``202 Accepted`` with the job's ID and polling URL. The actual write runs in a
|
|
|
- worker via the same action method, so behavior is identical to the synchronous path.
|
|
|
+ request to a list endpoint includes ``?background=true``, the bulk action enqueues an
|
|
|
+ ``AsyncAPIJob`` to perform the work and immediately returns ``202 Accepted`` with the
|
|
|
+ job's ID and polling URL. The actual write (including validation) runs in a worker via
|
|
|
+ the same action method, so behavior is identical to the synchronous path.
|
|
|
|
|
|
This mixin overrides no framework methods; the bulk action methods call its helpers.
|
|
|
"""
|
|
|
@@ -43,22 +44,19 @@ class BackgroundOperationMixin:
|
|
|
return False
|
|
|
return request.query_params.get('background', '').lower() == 'true'
|
|
|
|
|
|
- def _maybe_background_bulk_create(self, request):
|
|
|
+ def _handle_background_request(self, request, action, action_kwargs=None):
|
|
|
"""
|
|
|
- Shared entry point for the create() overrides. If background processing was requested
|
|
|
- for a bulk (list) create, validate the payload synchronously and return a 202 Response;
|
|
|
- otherwise return None so the caller proceeds with synchronous creation.
|
|
|
+ Shared entry point for the bulk write actions. If background processing was requested
|
|
|
+ for a bulk (list) operation, enqueue an AsyncAPIJob and return a 202 Response; otherwise
|
|
|
+ return None so the caller proceeds synchronously.
|
|
|
+
|
|
|
+ Validation is intentionally deferred to the worker (which runs the same action method),
|
|
|
+ so it is not performed twice and the request returns promptly regardless of batch size.
|
|
|
"""
|
|
|
if not (isinstance(request.data, list) and self._background_requested(request)):
|
|
|
return None
|
|
|
|
|
|
- # Validate synchronously before enqueuing so a malformed payload is rejected with a
|
|
|
- # 400 now, rather than producing a 202 for work that can never succeed. (Constraints
|
|
|
- # that depend on other items in the batch are still evaluated when the job runs.)
|
|
|
- serializer = self.get_serializer(data=request.data, many=True)
|
|
|
- serializer.is_valid(raise_exception=True)
|
|
|
-
|
|
|
- return self._enqueue_bulk_job(request, 'create', payload=list(request.data))
|
|
|
+ return self._enqueue_bulk_job(request, action, payload=list(request.data), action_kwargs=action_kwargs)
|
|
|
|
|
|
def _enqueue_bulk_job(self, request, action, payload, action_kwargs=None):
|
|
|
"""
|
|
|
@@ -85,6 +83,16 @@ class BackgroundOperationMixin:
|
|
|
verb=verb,
|
|
|
object_type=model._meta.verbose_name_plural,
|
|
|
)
|
|
|
+ # Carry a serializable snapshot of the request so the worker can reconstruct it (method,
|
|
|
+ # request ID, and host metadata for absolute URLs in the captured result). The scheme is
|
|
|
+ # passed separately, as copy_safe_request() does not capture it. The worker re-fetches the
|
|
|
+ # user by PK and bypasses authentication entirely, so it reads neither the copied user nor
|
|
|
+ # cookies; drop both so no User instance or session data is pickled into the job payload
|
|
|
+ # for the lifetime of the job.
|
|
|
+ request_copy = copy_safe_request(request, include_files=False)
|
|
|
+ request_copy.user = None
|
|
|
+ request_copy.COOKIES = {}
|
|
|
+
|
|
|
job = AsyncAPIJob.enqueue(
|
|
|
name=job_name,
|
|
|
user=request.user,
|
|
|
@@ -92,13 +100,9 @@ class BackgroundOperationMixin:
|
|
|
action=action,
|
|
|
payload=payload,
|
|
|
user_pk=request.user.pk,
|
|
|
- request_id=str(getattr(request, 'id', '')),
|
|
|
- method=request.method,
|
|
|
action_kwargs=action_kwargs or {},
|
|
|
- # Carry the request's scheme/host so URLs in the captured result are absolute
|
|
|
- # and followable (the worker has no real request to derive them from).
|
|
|
+ request=request_copy,
|
|
|
scheme=request.scheme,
|
|
|
- host=request.get_host(),
|
|
|
)
|
|
|
|
|
|
job_url = reverse('core-api:job-detail', kwargs={'pk': job.pk}, request=request)
|
|
|
@@ -152,11 +156,12 @@ class SequentialBulkCreatesMixin:
|
|
|
appropriately.
|
|
|
"""
|
|
|
def create(self, request, *args, **kwargs):
|
|
|
- # If background processing was requested for a bulk (list) create, validate and enqueue.
|
|
|
- # _maybe_background_bulk_create() comes from BackgroundOperationMixin; fall back to "no
|
|
|
- # background" so this mixin remains usable on its own (e.g. in custom viewsets).
|
|
|
- maybe_background = getattr(self, '_maybe_background_bulk_create', lambda request: None)
|
|
|
- if (response := maybe_background(request)) is not None:
|
|
|
+ # If background processing was requested for a bulk (list) create, enqueue a job and
|
|
|
+ # return immediately. _handle_background_request() comes from BackgroundOperationMixin;
|
|
|
+ # fall back to "no background" so this mixin remains usable on its own (e.g. in custom
|
|
|
+ # viewsets).
|
|
|
+ handle_background = getattr(self, '_handle_background_request', lambda *a, **kw: None)
|
|
|
+ if (response := handle_background(request, 'create')) is not None:
|
|
|
return response
|
|
|
|
|
|
with transaction.atomic(using=router.db_for_write(self.queryset.model)):
|
|
|
@@ -199,17 +204,19 @@ class BulkUpdateModelMixin:
|
|
|
|
|
|
def bulk_update(self, request, *args, **kwargs):
|
|
|
partial = kwargs.pop('partial', False)
|
|
|
+
|
|
|
+ # If background processing was requested, enqueue a job and return immediately (before
|
|
|
+ # any validation, which is deferred to the worker). _handle_background_request() comes
|
|
|
+ # from BackgroundOperationMixin; fall back to "no background" so this mixin remains
|
|
|
+ # usable on its own (e.g. in custom viewsets).
|
|
|
+ handle_background = getattr(self, '_handle_background_request', lambda *a, **kw: None)
|
|
|
+ action = 'bulk_partial_update' if partial else 'bulk_update'
|
|
|
+ if (response := handle_background(request, action)) is not None:
|
|
|
+ return response
|
|
|
+
|
|
|
serializer = BulkOperationSerializer(data=request.data, many=True)
|
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
|
|
- # If background processing was requested, enqueue a job and return immediately.
|
|
|
- # The payload is captured here, before the request.data mutation below.
|
|
|
- # _background_requested() comes from BackgroundOperationMixin; fall back to "no
|
|
|
- # background" so this mixin remains usable on its own (e.g. in custom viewsets).
|
|
|
- if getattr(self, '_background_requested', lambda request: False)(request):
|
|
|
- action = 'bulk_partial_update' if partial else 'bulk_update'
|
|
|
- return self._enqueue_bulk_job(request, action, payload=list(request.data))
|
|
|
-
|
|
|
qs = self.get_bulk_update_queryset().filter(
|
|
|
pk__in=[o['id'] for o in serializer.data]
|
|
|
)
|
|
|
@@ -275,15 +282,17 @@ class BulkDestroyModelMixin:
|
|
|
return self.get_queryset()
|
|
|
|
|
|
def bulk_destroy(self, request, *args, **kwargs):
|
|
|
+ # If background processing was requested, enqueue a job and return immediately (before
|
|
|
+ # any validation, which is deferred to the worker). _handle_background_request() comes
|
|
|
+ # from BackgroundOperationMixin; fall back to "no background" so this mixin remains
|
|
|
+ # usable on its own (e.g. in custom viewsets).
|
|
|
+ handle_background = getattr(self, '_handle_background_request', lambda *a, **kw: None)
|
|
|
+ if (response := handle_background(request, 'bulk_destroy')) is not None:
|
|
|
+ return response
|
|
|
+
|
|
|
serializer = BulkOperationSerializer(data=request.data, many=True)
|
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
|
|
- # If background processing was requested, enqueue a job and return immediately.
|
|
|
- # _background_requested() comes from BackgroundOperationMixin; fall back to "no
|
|
|
- # background" so this mixin remains usable on its own (e.g. in custom viewsets).
|
|
|
- if getattr(self, '_background_requested', lambda request: False)(request):
|
|
|
- return self._enqueue_bulk_job(request, 'bulk_destroy', payload=list(request.data))
|
|
|
-
|
|
|
qs = self.get_bulk_destroy_queryset().filter(
|
|
|
pk__in=[o['id'] for o in serializer.validated_data]
|
|
|
)
|