Просмотр исходного кода

Closes #21992: Enable background job support for REST API bulk requests (#22452)

Bulk write operations (create/update/delete a JSON list at a model's list
endpoint) can opt into background processing with the ?background=true query
parameter. The request is validated synchronously and, if accepted, an
AsyncAPIJob is enqueued and a 202 Accepted is returned with the job id and
poll URL; the write is performed later by a worker that re-invokes the same
viewset action, so behavior matches the synchronous path (including
all-or-nothing transaction semantics).

- AsyncAPIJob reconstructs the request in the worker, re-applies object
  permissions, runs within the request processors (change logging/events),
  and captures the action's response into job.data as {status_code, data}.
- Handled rejections are translated to match the synchronous API: APIException
  via handle_exception(), and AbortRequest/ProtectedError/RestrictedError via a
  new NetBoxModelViewSet.exception_to_response() helper. These terminate the
  job as "failed" (reserving "errored" for unexpected crashes).
- Background processing is refused with 503 when no worker is servicing the
  queue, and rejected with 400 when combined with an If-Match precondition
  (which cannot be honored once execution is deferred).
- Single-object writes, GET requests, and non-list payloads ignore the
  parameter and run synchronously.

exception_to_response() intentionally duplicates the translation logic in
dispatch() rather than dispatch() being refactored to call it; consolidating
the two is left as a follow-up to keep this change off the synchronous hot path.

* Address code review feedback (#21992)

- Carry the request's scheme and host into the background worker so absolute
  URLs in the captured job result point at the real server instead of a
  hardcoded http://localhost/.
- Emit the same protected-delete warning log in exception_to_response() that
  dispatch() produces, restoring application-log parity for background failures.
- Drop the inert `_authenticator = None` assignment: setting request.user
  already prevents lazy re-authentication via the public API, and nothing on
  the worker's action path reads the authenticator.
- Remove the redundant success-path job.save() (JobRunner.handle() ->
  terminate() persists job.data) and hoist the AsyncAPIJob import in mixins.py
  to module level (no real import cycle through it).
- Add a test asserting result URLs reflect the request host.

* Fix IPv6 host parsing in background API request reconstruction

Parse the carried host with urlsplit (and pass it verbatim as HTTP_HOST)
instead of host.partition(':'), which split bracketed IPv6 hosts like
[::1]:8443 on their inner colons. Extract request construction into
AsyncAPIJob._build_request and add a test asserting the IPv6 host round-trips.

* Address review feedback (#21992)

- Make the bulk mixins safe to use without BackgroundOperationMixin: guard the
  _background_requested / _maybe_background_bulk_create calls with a getattr
  fallback so BulkUpdateModelMixin/BulkDestroyModelMixin/SequentialBulkCreatesMixin
  retain their standalone behavior in custom viewset composition.
- Add a test covering the background ProtectedError/RestrictedError path: a bulk
  delete of a protected object records the same 409 the synchronous API returns
  (job failed, status_code 409, object preserved), via exception_to_response().
Jason Novinger 2 недель назад
Родитель
Сommit
89504b2502

+ 47 - 0
docs/integrations/rest-api.md

@@ -738,6 +738,53 @@ http://netbox/api/dcim/sites/ \
 !!! note
     The bulk deletion of objects is an all-or-none operation, meaning that if NetBox fails to delete any of the specified objects (e.g. due a dependency by a related object), the entire operation will be aborted and none of the objects will be deleted.
 
+## Background Processing
+
+!!! info "This feature was introduced in NetBox v4.7."
+
+Bulk write operations (creating, updating, or deleting multiple objects via a model's list endpoint) can optionally be processed as a [background job](../features/background-jobs.md) rather than synchronously. This is useful for large batches that would otherwise hold the connection open long enough to risk a proxy or gateway timeout.
+
+To request background processing, append the `background=true` query parameter to a bulk write request. NetBox validates the request immediately and, if it is well-formed and authorized, enqueues a job and returns an `HTTP 202 Accepted` response containing the job's ID and URL. The actual write is performed later by a worker, running the same logic (and preserving the same all-or-none transaction semantics) as the synchronous path.
+
+```no-highlight
+curl -s -X PATCH \
+-H "Authorization: Token $TOKEN" \
+-H "Content-Type: application/json" \
+http://netbox/api/dcim/sites/?background=true \
+--data '[{"id": 10, "status": "active"}, {"id": 11, "status": "active"}]'
+```
+
+The response identifies the enqueued job:
+
+```json
+{
+    "job": {
+        "id": 42,
+        "url": "http://netbox/api/core/jobs/42/",
+        "status": "pending"
+    }
+}
+```
+
+Poll the job's URL to track its progress. When the job reaches a terminal status, its `data` field holds the result and its `error` field describes any failure. The `data` field mirrors the response the synchronous request would have returned, as an object with the HTTP `status_code` and the response `data`. For example, a completed bulk update records:
+
+```json
+{
+    "status_code": 200,
+    "data": [
+        {"id": 10, "url": "http://netbox/api/dcim/sites/10/", "status": {"value": "active"}, "...": "..."}
+    ]
+}
+```
+
+A failed job records the equivalent error response, for instance `{"status_code": 400, "data": {"slug": ["This field may not be blank."]}}`, with a short summary also placed in the job's `error` field.
+
+A `202` response indicates that the request was accepted and queued, not that it succeeded: object-level validation and the database write occur when the job runs. Always inspect the job's final status to confirm the outcome. Because the result is stored on the job, any user permitted to view jobs (`core.view_job`, subject to object permissions) can read the serialized objects it contains.
+
+Background processing applies only to bulk operations (a JSON list) on a model's list endpoint. For a single-object write the `background` parameter is ignored and the request is processed synchronously. It cannot be combined with an [`If-Match`](#if-match) precondition (which cannot be evaluated reliably once execution is deferred); such a request is rejected with an `HTTP 400` response. If no background worker is running to service the queue, the request is rejected with an `HTTP 503` response rather than enqueuing a job that would never run.
+
+Two behaviors differ from a synchronous request and may change in a future release: field selection via [`fields`/`omit`](#specifying-fields) (and brief mode) is not applied to the stored result, and the authorization captured when the request is accepted is not re-checked if the token is later disabled or expires before the job runs.
+
 ## Changelog Messages
 
 Most objects in NetBox support [change logging](../features/change-logging.md), which generates a detailed record each time an object is created, modified, or deleted. Additionally, users can attach a message to the change record as well. This is accomplished via the REST API by including a `changelog_message` field in the object representation.

+ 32 - 0
netbox/netbox/api/viewsets/__init__.py

@@ -151,6 +151,7 @@ class NetBoxReadOnlyModelViewSet(
 
 class NetBoxModelViewSet(
     ETagMixin,
+    mixins.BackgroundOperationMixin,
     mixins.BulkUpdateModelMixin,
     mixins.BulkDestroyModelMixin,
     mixins.ObjectValidationMixin,
@@ -215,9 +216,40 @@ class NetBoxModelViewSet(
                 **kwargs
             )
 
+    def exception_to_response(self, exc):
+        """
+        Translate a NetBox/Django exception that is not a DRF APIException into the same
+        Response that dispatch() would return for it. Returns None if the exception is not
+        one this method handles (the caller should then re-raise or defer to DRF).
+
+        This mirrors the except clauses in dispatch(); it is also called by the background
+        job runner (netbox.jobs.AsyncAPIJob), which executes action methods directly and so
+        bypasses dispatch(). NOTE: dispatch() does not yet call this helper itself — see the
+        PR description for the proposed consolidation.
+        """
+        logger = logging.getLogger(f'netbox.api.views.{self.__class__.__name__}')
+        if isinstance(exc, (ProtectedError, RestrictedError)):
+            if type(exc) is ProtectedError:
+                protected_objects = list(exc.protected_objects)
+            else:
+                protected_objects = list(exc.restricted_objects)
+            msg = f'Unable to delete object. {len(protected_objects)} dependent objects were found: '
+            msg += ', '.join([f'{obj} ({obj.pk})' for obj in protected_objects])
+            logger.warning(msg)
+            return Response({'detail': msg}, status=409)
+        if isinstance(exc, AbortRequest):
+            logger.debug(exc.message)
+            return Response({'detail': exc.message}, status=400)
+        return None
+
     # Creates
 
     def create(self, request, *args, **kwargs):
+        # If background processing was requested for a bulk (list) create, validate and enqueue.
+        # Single-object creates always run synchronously.
+        if (response := self._maybe_background_bulk_create(request)) is not None:
+            return response
+
         serializer = self.get_serializer(data=request.data)
         serializer.is_valid(raise_exception=True)
         bulk_create = getattr(serializer, 'many', False)

+ 114 - 0
netbox/netbox/api/viewsets/mixins.py

@@ -1,14 +1,21 @@
 from django.core.exceptions import ObjectDoesNotExist
 from django.db import router, transaction
 from django.http import Http404
+from django.utils.translation import gettext_lazy as _
 from rest_framework import status
+from rest_framework.exceptions import ValidationError
 from rest_framework.response import Response
+from rest_framework.reverse import reverse
 
 from core.models import ObjectType
 from extras.models import ExportTemplate
 from netbox.api.serializers import BulkOperationSerializer
+from netbox.jobs import AsyncAPIJob
+from utilities.exceptions import RQWorkerNotRunningException
+from utilities.rqworker import any_workers_for_queue
 
 __all__ = (
+    'BackgroundOperationMixin',
     'BulkDestroyModelMixin',
     'BulkUpdateModelMixin',
     'CustomFieldsMixin',
@@ -18,6 +25,90 @@ __all__ = (
 )
 
 
+class BackgroundOperationMixin:
+    """
+    Enable optional background processing of REST API bulk write operations. When a write
+    request to a list endpoint includes ``?background=true``, the bulk action validates the
+    payload synchronously, enqueues an ``AsyncAPIJob`` to perform the work, and immediately
+    returns ``202 Accepted`` with the job's ID and polling URL. The actual write runs in a
+    worker via the same action method, so behavior is identical to the synchronous path.
+
+    This mixin overrides no framework methods; the bulk action methods call its helpers.
+    """
+
+    def _background_requested(self, request):
+        """Return True if background processing was requested for this write."""
+        if request.method not in ('POST', 'PUT', 'PATCH', 'DELETE'):
+            return False
+        return request.query_params.get('background', '').lower() == 'true'
+
+    def _maybe_background_bulk_create(self, request):
+        """
+        Shared entry point for the create() overrides. If background processing was requested
+        for a bulk (list) create, validate the payload synchronously and return a 202 Response;
+        otherwise return None so the caller proceeds with synchronous creation.
+        """
+        if not (isinstance(request.data, list) and self._background_requested(request)):
+            return None
+
+        # Validate synchronously before enqueuing so a malformed payload is rejected with a
+        # 400 now, rather than producing a 202 for work that can never succeed. (Constraints
+        # that depend on other items in the batch are still evaluated when the job runs.)
+        serializer = self.get_serializer(data=request.data, many=True)
+        serializer.is_valid(raise_exception=True)
+
+        return self._enqueue_bulk_job(request, 'create', payload=list(request.data))
+
+    def _enqueue_bulk_job(self, request, action, payload, action_kwargs=None):
+        """
+        Enqueue an AsyncAPIJob to perform the given bulk action in the background and return
+        a 202 response containing the job ID and polling URL.
+        """
+        # Reject conditional requests: an If-Match precondition cannot be meaningfully
+        # honored when the write is deferred to a worker (the TOCTOU window is unbounded).
+        if request.META.get('HTTP_IF_MATCH'):
+            raise ValidationError(
+                _("The If-Match header is not supported with background processing.")
+            )
+
+        # Don't accept work that no worker can perform (mirrors the scripts API; AsyncAPIJob
+        # is enqueued without an instance, so it always lands on the default queue).
+        if not any_workers_for_queue('default'):
+            raise RQWorkerNotRunningException()
+
+        model = self.queryset.model
+        verb = _("delete") if action == 'bulk_destroy' else (
+            _("create") if action == 'create' else _("update")
+        )
+        job_name = _("Bulk {verb} {object_type}").format(
+            verb=verb,
+            object_type=model._meta.verbose_name_plural,
+        )
+        job = AsyncAPIJob.enqueue(
+            name=job_name,
+            user=request.user,
+            viewset_class=f'{type(self).__module__}.{type(self).__qualname__}',
+            action=action,
+            payload=payload,
+            user_pk=request.user.pk,
+            request_id=str(getattr(request, 'id', '')),
+            method=request.method,
+            action_kwargs=action_kwargs or {},
+            # Carry the request's scheme/host so URLs in the captured result are absolute
+            # and followable (the worker has no real request to derive them from).
+            scheme=request.scheme,
+            host=request.get_host(),
+        )
+
+        job_url = reverse('core-api:job-detail', kwargs={'pk': job.pk}, request=request)
+        response = Response(
+            {'job': {'id': job.pk, 'url': job_url, 'status': job.status}},
+            status=status.HTTP_202_ACCEPTED,
+        )
+        response['Location'] = job_url
+        return response
+
+
 class CustomFieldsMixin:
     """
     For models which support custom fields, populate the `custom_fields` context.
@@ -60,6 +151,13 @@ class SequentialBulkCreatesMixin:
     appropriately.
     """
     def create(self, request, *args, **kwargs):
+        # If background processing was requested for a bulk (list) create, validate and enqueue.
+        # _maybe_background_bulk_create() comes from BackgroundOperationMixin; fall back to "no
+        # background" so this mixin remains usable on its own (e.g. in custom viewsets).
+        maybe_background = getattr(self, '_maybe_background_bulk_create', lambda request: None)
+        if (response := maybe_background(request)) is not None:
+            return response
+
         with transaction.atomic(using=router.db_for_write(self.queryset.model)):
             if not isinstance(request.data, list):
                 # Creating a single object
@@ -102,6 +200,15 @@ class BulkUpdateModelMixin:
         partial = kwargs.pop('partial', False)
         serializer = BulkOperationSerializer(data=request.data, many=True)
         serializer.is_valid(raise_exception=True)
+
+        # If background processing was requested, enqueue a job and return immediately.
+        # The payload is captured here, before the request.data mutation below.
+        # _background_requested() comes from BackgroundOperationMixin; fall back to "no
+        # background" so this mixin remains usable on its own (e.g. in custom viewsets).
+        if getattr(self, '_background_requested', lambda request: False)(request):
+            action = 'bulk_partial_update' if partial else 'bulk_update'
+            return self._enqueue_bulk_job(request, action, payload=list(request.data))
+
         qs = self.get_bulk_update_queryset().filter(
             pk__in=[o['id'] for o in serializer.data]
         )
@@ -155,6 +262,13 @@ class BulkDestroyModelMixin:
     def bulk_destroy(self, request, *args, **kwargs):
         serializer = BulkOperationSerializer(data=request.data, many=True)
         serializer.is_valid(raise_exception=True)
+
+        # If background processing was requested, enqueue a job and return immediately.
+        # _background_requested() comes from BackgroundOperationMixin; fall back to "no
+        # background" so this mixin remains usable on its own (e.g. in custom viewsets).
+        if getattr(self, '_background_requested', lambda request: False)(request):
+            return self._enqueue_bulk_job(request, 'bulk_destroy', payload=list(request.data))
+
         qs = self.get_bulk_destroy_queryset().filter(
             pk__in=[o['id'] for o in serializer.validated_data]
         )

+ 136 - 1
netbox/netbox/jobs.py

@@ -1,14 +1,23 @@
+import json
 import logging
 import os
 import traceback
 from abc import ABC, abstractmethod
 from datetime import timedelta
+from io import BytesIO
 from pathlib import Path
+from urllib.parse import urlsplit
 
-from django.core.exceptions import ImproperlyConfigured
+from django.contrib.auth import get_user_model
+from django.core.exceptions import ImproperlyConfigured, PermissionDenied
+from django.core.handlers.wsgi import WSGIRequest
+from django.db.models import ProtectedError, RestrictedError
+from django.http import Http404
 from django.utils import timezone
 from django.utils.functional import classproperty
+from django.utils.module_loading import import_string
 from django_pglocks import advisory_lock
+from rest_framework.exceptions import APIException
 from rq.timeouts import JobTimeoutException
 
 from core.choices import JobStatusChoices
@@ -16,9 +25,11 @@ from core.exceptions import JobFailed
 from core.models import Job, ObjectType
 from netbox.constants import ADVISORY_LOCK_KEYS
 from netbox.registry import registry
+from utilities.exceptions import AbortRequest
 from utilities.request import apply_request_processors
 
 __all__ = (
+    'AsyncAPIJob',
     'AsyncViewJob',
     'JobRunner',
     'system_job',
@@ -243,3 +254,127 @@ class AsyncViewJob(JobRunner):
 
         if self.job.error:
             raise JobFailed()
+
+
+class AsyncAPIJob(JobRunner):
+    """
+    Execute a REST API bulk write (create/update/delete) as a background job.
+
+    The viewset's action method is re-invoked inside the worker against a reconstructed
+    request, so the synchronous and background code paths are identical (validation,
+    transaction semantics, object permissions, and change logging all behave the same).
+    The action's serialized Response is captured into the job's data.
+    """
+    class Meta:
+        name = 'Async API Request'
+
+    @staticmethod
+    def _build_request(payload, method, request_id, scheme, host):
+        """
+        Reconstruct a minimal WSGIRequest carrying the original JSON payload. DRF's Request
+        wrapper requires a real HttpRequest, and the original scheme/host are applied so that
+        absolute URLs in the captured result (serializer hyperlink fields) point at the real
+        server. The host is passed via HTTP_HOST verbatim (request.get_host() already formats
+        it correctly, including bracketed IPv6, and Django's get_host() prefers it);
+        SERVER_NAME/SERVER_PORT are populated only to satisfy WSGI, parsed via urlsplit so
+        host:port and [::1]:port split correctly.
+        """
+        parsed_host = urlsplit(f'//{host}')
+        body = json.dumps(payload).encode('utf-8')
+        request = WSGIRequest({
+            'REQUEST_METHOD': method.upper(),
+            'PATH_INFO': '/',
+            'CONTENT_TYPE': 'application/json',
+            'CONTENT_LENGTH': str(len(body)),
+            'wsgi.input': BytesIO(body),
+            'wsgi.url_scheme': scheme,
+            'HTTP_HOST': host,
+            'SERVER_NAME': parsed_host.hostname or 'localhost',
+            'SERVER_PORT': str(parsed_host.port) if parsed_host.port else ('443' if scheme == 'https' else '80'),
+            'SERVER_PROTOCOL': 'HTTP/1.1',
+        })
+        request.id = request_id
+        return request
+
+    def run(
+        self, viewset_class, action, payload, user_pk, request_id, method,
+        action_kwargs=None, scheme='http', host='localhost', **kwargs
+    ):
+        # Imported here to avoid a circular import (netbox.api.viewsets imports from this module).
+        from netbox.api.viewsets import HTTP_ACTIONS
+
+        action_kwargs = action_kwargs or {}
+        viewset_class = import_string(viewset_class)
+
+        # Re-fetch the requesting user. If the user no longer exists or is inactive, fail
+        # the job rather than running with stale identity (execution-time identity check).
+        User = get_user_model()
+        try:
+            user = User.objects.get(pk=user_pk)
+        except User.DoesNotExist:
+            self.job.error = "The requesting user no longer exists."
+            self.job.save()
+            raise JobFailed()
+        if not user.is_active:
+            self.job.error = "The requesting user is no longer active."
+            self.job.save()
+            raise JobFailed()
+
+        django_request = self._build_request(payload, method, request_id, scheme, host)
+
+        # Instantiate the viewset and apply the minimal scaffolding that DRF's as_view()
+        # normally sets, so initialize_request() can wire up parsers, etc.
+        viewset = viewset_class()
+        viewset.action_map = {method.lower(): action}
+        viewset.kwargs = {}
+        viewset.args = ()
+        viewset.action = action
+        viewset.format_kwarg = None
+
+        drf_request = viewset.initialize_request(django_request)
+        # Carry the authenticated user forward; we do not re-authenticate in the worker.
+        # Setting .user populates the request's user cache, so DRF never lazily invokes
+        # authentication (and nothing on the action path reads the authenticator/auth).
+        drf_request.user = user
+        drf_request.id = request_id
+        viewset.request = drf_request
+
+        # Re-apply object-level permission restriction exactly as BaseViewSet.initial() does.
+        if perm_action := HTTP_ACTIONS[method.upper()]:
+            viewset.queryset = viewset.queryset.restrict(user, perm_action)
+
+        # Execute the action method within the registered request processors so change
+        # logging and event rules fire (and are attributed to the original request_id).
+        #
+        # The synchronous path relies on NetBoxModelViewSet.dispatch() and DRF's
+        # handle_exception() to translate exceptions into HTTP responses. Because we invoke
+        # the action method directly (bypassing dispatch), we reproduce that translation here
+        # so the captured result matches what the synchronous API would have returned:
+        #   - APIException (incl. ValidationError, PermissionDenied, Http404) -> handle_exception()
+        #   - AbortRequest / ProtectedError / RestrictedError -> exception_to_response()
+        with apply_request_processors(drf_request):
+            try:
+                response = getattr(viewset, action)(drf_request, **action_kwargs)
+            except (APIException, Http404, PermissionDenied) as e:
+                response = viewset.handle_exception(e)
+            except (AbortRequest, ProtectedError, RestrictedError) as e:
+                response = viewset.exception_to_response(e)
+                if response is None:
+                    raise
+
+        # Capture the action's result for the polling client, in the same shape for both
+        # success and failure.
+        self.job.data = {
+            'status_code': response.status_code,
+            'data': response.data,
+        }
+
+        if response.status_code >= 400:
+            # A handled rejection (4xx), not a worker crash: record a concise summary and
+            # mark the job failed (JobRunner.handle reserves "errored" for unhandled crashes).
+            detail = response.data.get('detail') if isinstance(response.data, dict) else None
+            self.job.error = str(detail) if detail else f"Request failed with status {response.status_code}."
+            self.job.save()
+            raise JobFailed()
+
+        # On success, job.data is persisted by JobRunner.handle() -> job.terminate().

+ 341 - 0
netbox/netbox/tests/test_api_background.py

@@ -0,0 +1,341 @@
+"""
+Tests for optional background processing of REST API bulk write operations (#21992).
+
+These exercise the ?background=true path: a bulk create/update/delete returns 202 with a
+job reference, the AsyncAPIJob performs the work, and the result is recorded on the job.
+
+To run without a live RQ worker, setUp() (a) patches AsyncAPIJob.enqueue to run the job
+inline (immediate=True) and (b) reports a worker as available for the worker-liveness guard.
+Individual tests that exercise those guards override this locally.
+"""
+import uuid
+from unittest.mock import patch
+
+from django.contrib.contenttypes.models import ContentType
+from rest_framework import status
+
+from core.choices import JobStatusChoices
+from core.exceptions import JobFailed
+from core.models import Job, ObjectChange
+from dcim.models import DeviceType, Manufacturer, Region
+from users.models import ObjectPermission
+from utilities.testing.api import APITestCase
+from utilities.testing.mixins import RQQueueTestMixin
+
+
+class BackgroundBulkWriteTests(RQQueueTestMixin, APITestCase):
+    @classmethod
+    def setUpTestData(cls):
+        cls.regions = [
+            Region.objects.create(name=f'Region {i}', slug=f'region-{i}')
+            for i in range(1, 4)
+        ]
+
+    def setUp(self):
+        super().setUp()
+
+        # Run enqueued jobs inline (immediate=True) so they execute within the test.
+        import netbox.jobs as jobs_module
+        self._orig_enqueue = jobs_module.AsyncAPIJob.enqueue.__func__
+
+        def _immediate_enqueue(cls, *args, **kwargs):
+            kwargs.setdefault('immediate', True)
+            return self._orig_enqueue(cls, *args, **kwargs)
+
+        jobs_module.AsyncAPIJob.enqueue = classmethod(_immediate_enqueue)
+        self.addCleanup(
+            setattr, jobs_module.AsyncAPIJob, 'enqueue', classmethod(self._orig_enqueue)
+        )
+
+        # Report a worker as available so _enqueue_bulk_job's liveness guard passes.
+        worker_patcher = patch(
+            'netbox.api.viewsets.mixins.any_workers_for_queue', return_value=True
+        )
+        worker_patcher.start()
+        self.addCleanup(worker_patcher.stop)
+
+    def grant(self, *actions, constraints=None):
+        perm = ObjectPermission.objects.create(
+            name='Test permission', actions=list(actions), constraints=constraints
+        )
+        perm.users.add(self.user)
+        perm.object_types.add(ContentType.objects.get_for_model(Region))
+
+    def _ct(self):
+        return ContentType.objects.get_for_model(Region)
+
+    def _assert_job_link(self, response):
+        """Assert the 202 body + Location header reference the same job detail URL."""
+        job_block = response.data['job']
+        self.assertEqual(response['Location'], job_block['url'])
+        self.assertTrue(
+            job_block['url'].endswith(f"/api/core/jobs/{job_block['id']}/"),
+            job_block['url'],
+        )
+
+    # ------------------------------------------------------------------ create
+
+    def test_background_bulk_create(self):
+        self.grant('add', 'view')
+        payload = [
+            {'name': 'Region A', 'slug': 'region-a'},
+            {'name': 'Region B', 'slug': 'region-b'},
+        ]
+        response = self.client.post(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        self._assert_job_link(response)
+
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
+        self.assertTrue(Region.objects.filter(slug='region-a').exists())
+        self.assertTrue(Region.objects.filter(slug='region-b').exists())
+        # Result captured on the job.
+        self.assertEqual(job.data['status_code'], status.HTTP_201_CREATED)
+        self.assertEqual(len(job.data['data']), 2)
+        # URLs in the captured result are absolute and derived from the request host
+        # (the worker carries the request's scheme/host forward), not hardcoded localhost.
+        for obj in job.data['data']:
+            self.assertTrue(obj['url'].startswith('http://testserver/'), obj['url'])
+
+    # ------------------------------------------------------------------ update
+
+    def test_background_bulk_update_patch(self):
+        self.grant('change', 'view')
+        payload = [{'id': r.pk, 'description': 'bg'} for r in self.regions]
+        response = self.client.patch(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        self._assert_job_link(response)
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
+        self.assertEqual(job.data['status_code'], status.HTTP_200_OK)
+        for r in Region.objects.filter(pk__in=[x.pk for x in self.regions]):
+            self.assertEqual(r.description, 'bg')
+
+    def test_background_bulk_update_put(self):
+        # PUT (full, partial=False) bulk update must enqueue and persist like PATCH. Region
+        # only requires name + slug, so a full representation is straightforward.
+        self.grant('change', 'view')
+        payload = [
+            {'id': r.pk, 'name': r.name, 'slug': r.slug, 'description': 'put-bg'}
+            for r in self.regions
+        ]
+        response = self.client.put(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
+        for r in Region.objects.filter(pk__in=[x.pk for x in self.regions]):
+            self.assertEqual(r.description, 'put-bg')
+
+    def test_background_bulk_update_object_permission_subset(self):
+        # Constrained to Region 1 only; bulk update of all three should update only the
+        # permitted subset and SUCCEED (matching synchronous behavior; no rollback).
+        self.grant('change', 'view', constraints={'name': 'Region 1'})
+        payload = [{'id': r.pk, 'description': 'subset'} for r in self.regions]
+        response = self.client.patch(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
+        # DB side effect AND captured result reflect the permitted subset only.
+        self.assertEqual(Region.objects.filter(description='subset').count(), 1)
+        self.assertEqual(job.data['status_code'], status.HTTP_200_OK)
+        self.assertEqual(len(job.data['data']), 1)
+
+    def test_background_bulk_update_all_or_nothing(self):
+        self.grant('change', 'view')
+        payload = [
+            {'id': self.regions[0].pk, 'description': 'ok'},
+            {'id': self.regions[1].pk, 'slug': ''},  # invalid: blank slug
+        ]
+        response = self.client.patch(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_FAILED)
+        # The failure detail (a 400 validation error) is captured in job.data + job.error.
+        self.assertEqual(job.data['status_code'], status.HTTP_400_BAD_REQUEST)
+        self.assertTrue(job.error)
+        # Neither item persisted (whole batch rolled back).
+        self.regions[0].refresh_from_db()
+        self.assertNotEqual(self.regions[0].description, 'ok')
+
+    # ------------------------------------------------------------------ delete
+
+    def test_background_bulk_delete(self):
+        self.grant('delete', 'view')
+        payload = [{'id': r.pk} for r in self.regions]
+        response = self.client.delete(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_COMPLETED)
+        self.assertEqual(job.data['status_code'], status.HTTP_204_NO_CONTENT)
+        self.assertFalse(Region.objects.filter(pk__in=[r.pk for r in self.regions]).exists())
+
+    def test_background_bulk_delete_protected_dependent(self):
+        # A protected dependency makes the delete fail. The worker must capture this as the
+        # same 409 the synchronous API returns (via exception_to_response), terminating the
+        # job as "failed" with the dependent listed, NOT as an unexpected "errored" job.
+        manufacturer = Manufacturer.objects.create(name='Manufacturer 1', slug='manufacturer-1')
+        DeviceType.objects.create(manufacturer=manufacturer, model='Model 1', slug='model-1')
+
+        perm = ObjectPermission.objects.create(name='Delete manufacturers', actions=['delete'])
+        perm.users.add(self.user)
+        perm.object_types.add(ContentType.objects.get_for_model(Manufacturer))
+
+        response = self.client.delete(
+            '/api/dcim/manufacturers/?background=true',
+            [{'id': manufacturer.pk}], format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_FAILED)
+        self.assertEqual(job.data['status_code'], status.HTTP_409_CONFLICT)
+        # The protected object was not deleted.
+        self.assertTrue(Manufacturer.objects.filter(pk=manufacturer.pk).exists())
+
+    # ------------------------------------------------------------------ contract guards
+
+    def test_synchronous_rejection_no_job(self):
+        # Malformed payload (missing required id) must 400 synchronously, with no Job created.
+        self.grant('change', 'view')
+        response = self.client.patch(
+            '/api/dcim/regions/?background=true',
+            [{'description': 'no id'}],
+            format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+        self.assertEqual(Job.objects.count(), 0)
+
+    def test_if_match_with_background_rejected(self):
+        self.grant('change', 'view')
+        payload = [{'id': self.regions[0].pk, 'description': 'x'}]
+        response = self.client.patch(
+            '/api/dcim/regions/?background=true', payload, format='json',
+            HTTP_IF_MATCH='W/"whatever"', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
+        self.assertEqual(Job.objects.count(), 0)
+
+    def test_no_worker_returns_503_no_job(self):
+        # When no worker is servicing the queue, the request is refused (no silent 202).
+        self.grant('change', 'view')
+        payload = [{'id': self.regions[0].pk, 'description': 'x'}]
+        with patch('netbox.api.viewsets.mixins.any_workers_for_queue', return_value=False):
+            response = self.client.patch(
+                '/api/dcim/regions/?background=true', payload, format='json', **self.header
+            )
+        self.assertEqual(response.status_code, status.HTTP_503_SERVICE_UNAVAILABLE)
+        self.assertEqual(Job.objects.count(), 0)
+
+    def test_get_with_background_is_ignored(self):
+        self.grant('view')
+        response = self.client.get('/api/dcim/regions/?background=true', **self.header)
+        self.assertEqual(response.status_code, status.HTTP_200_OK)
+        self.assertEqual(Job.objects.count(), 0)
+
+    def test_single_object_create_not_backgrounded(self):
+        # A single-object (non-list) create with ?background=true runs synchronously (201).
+        self.grant('add', 'view')
+        response = self.client.post(
+            '/api/dcim/regions/?background=true',
+            {'name': 'Solo', 'slug': 'solo'},
+            format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_201_CREATED)
+        self.assertEqual(Job.objects.count(), 0)
+
+    # ------------------------------------------------------------------ enqueue scheduling
+
+    def test_non_immediate_enqueue_creates_pending_job(self):
+        # Without the immediate=True shortcut, enqueue must schedule a pending job (and not
+        # execute inline). This confirms the real transaction.on_commit scheduling path.
+        self.grant('change', 'view')
+        import netbox.jobs as jobs_module
+        # Restore the un-patched enqueue for this test only.
+        jobs_module.AsyncAPIJob.enqueue = classmethod(self._orig_enqueue)
+
+        payload = [{'id': self.regions[0].pk, 'description': 'queued'}]
+        response = self.client.patch(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        job = Job.objects.get(pk=response.data['job']['id'])
+        self.assertEqual(job.status, JobStatusChoices.STATUS_PENDING)
+        # The write has NOT happened yet (job is only queued).
+        self.regions[0].refresh_from_db()
+        self.assertNotEqual(self.regions[0].description, 'queued')
+
+    # ------------------------------------------------------------------ error terminal states
+
+    def test_inactive_user_fails_job(self):
+        # Models the real timing the worker guards against: the request was accepted (the
+        # user was active at enqueue), but the user is deactivated before the worker runs.
+        # We drive AsyncAPIJob directly because the HTTP layer would otherwise reject an
+        # inactive user's token at authentication time, never reaching the worker.
+        from netbox.jobs import AsyncAPIJob
+
+        self.grant('change', 'view')
+        self.user.is_active = False
+        self.user.save()
+
+        job = Job.objects.create(name='Bulk update regions', user=self.user, job_id=uuid.uuid4())
+        with self.assertRaises(JobFailed):
+            AsyncAPIJob(job).run(
+                viewset_class='dcim.api.views.RegionViewSet',
+                action='bulk_update',
+                payload=[{'id': self.regions[0].pk, 'description': 'x'}],
+                user_pk=self.user.pk,
+                request_id='',
+                method='PATCH',
+                action_kwargs={'partial': True},
+            )
+        job.refresh_from_db()
+        self.assertIn('active', job.error.lower())
+        self.regions[0].refresh_from_db()
+        self.assertNotEqual(self.regions[0].description, 'x')
+
+    # ------------------------------------------------------------------ host parsing
+
+    def test_ipv6_host_builds_correct_request(self):
+        # A bracketed IPv6 host:port must not be split on its inner colons. Assert directly on
+        # the request the worker reconstructs: get_host() must round-trip the bracketed host,
+        # and SERVER_NAME/SERVER_PORT must be the IPv6 literal and port (a host.partition(':')
+        # implementation would yield SERVER_NAME='[' here).
+        from netbox.jobs import AsyncAPIJob
+
+        request = AsyncAPIJob._build_request(
+            payload=[], method='PATCH', request_id=str(uuid.uuid4()),
+            scheme='https', host='[::1]:8443',
+        )
+        self.assertEqual(request.get_host(), '[::1]:8443')
+        self.assertEqual(request.META['SERVER_NAME'], '::1')
+        self.assertEqual(request.META['SERVER_PORT'], '8443')
+        self.assertEqual(request.scheme, 'https')
+
+    # ------------------------------------------------------------------ change logging
+
+    def test_background_update_changelog_fidelity(self):
+        self.grant('change', 'view')
+        payload = [{'id': r.pk, 'description': 'logged'} for r in self.regions]
+        response = self.client.patch(
+            '/api/dcim/regions/?background=true', payload, format='json', **self.header
+        )
+        self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED)
+        request_id = response['X-Request-ID']
+
+        changes = ObjectChange.objects.filter(changed_object_type=self._ct())
+        self.assertEqual(changes.count(), 3)
+        self.assertTrue(all(c.user_id == self.user.pk for c in changes))
+        # The original request UUID is propagated to the worker so change records group correctly.
+        self.assertTrue(all(str(c.request_id) == request_id for c in changes))