Bladeren bron

Merge pull request #22201 from netbox-community/22125-extend-test-coverage-of-background-jobs

Closes #22125: Add test coverage for system housekeeping, data source sync, and script jobs
bctiemann 1 week geleden
bovenliggende
commit
a65afe7eaf
2 gewijzigde bestanden met toevoegingen van 696 en 0 verwijderingen
  1. 367 0
      netbox/core/tests/test_jobs.py
  2. 329 0
      netbox/extras/tests/test_jobs.py

+ 367 - 0
netbox/core/tests/test_jobs.py

@@ -0,0 +1,367 @@
+import uuid
+from datetime import timedelta
+from unittest.mock import MagicMock, patch
+
+import requests
+from django.conf import settings
+from django.test import TestCase, override_settings
+from django.utils import timezone
+
+from core.choices import DataSourceStatusChoices
+from core.jobs import SyncDataSourceJob, SystemHousekeepingJob
+from core.models import DataFile, DataSource, Job
+
+
+def _make_runner(cls, **job_attrs):
+    """
+    Build a JobRunner without going through ``__init__``.
+
+    ``JobRunner.__init__`` attaches a ``JobLogHandler`` to a module-level
+    singleton logger, so calling it once per test would accumulate handlers
+    across the suite. Bypass it and stub the logger directly.
+    """
+    runner = cls.__new__(cls)
+    runner.job = MagicMock(**job_attrs)
+    runner.logger = MagicMock()
+    return runner
+
+
+class HousekeepingRunnerMixin:
+    """Provides a `_runner()` helper that builds a SystemHousekeepingJob with a mock job."""
+
+    @staticmethod
+    def _runner():
+        return _make_runner(SystemHousekeepingJob)
+
+
+class SyncDataSourceJobTestCase(TestCase):
+    @classmethod
+    def setUpTestData(cls):
+        cls.datasource = DataSource.objects.create(
+            name='Test Source',
+            type='local',
+            source_url='/tmp/test',
+        )
+
+    def test_enqueue_sets_datasource_status_to_queued(self):
+        job = MagicMock()
+        job.object = self.datasource
+
+        with patch('core.models.Job.enqueue', return_value=job):
+            result = SyncDataSourceJob.enqueue(instance=self.datasource)
+
+        self.assertIs(result, job)
+        # Verify both the in-memory assignment (`datasource.status = ...`) and the
+        # persisted update (`DataSource.objects.filter(pk=...).update(...)`).
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.QUEUED)
+        self.datasource.refresh_from_db()
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.QUEUED)
+
+    def test_enqueue_without_object_is_noop(self):
+        # Baseline: the datasource starts at NEW. (Captures setUpTestData drift.)
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.NEW)
+
+        job = MagicMock()
+        job.object = None
+
+        with (
+            patch('core.models.Job.enqueue', return_value=job),
+            patch(
+                'core.jobs.DataSource.objects.filter',
+                wraps=DataSource.objects.filter,
+            ) as filter_,
+        ):
+            result = SyncDataSourceJob.enqueue()
+
+        self.assertIs(result, job)
+        # Intent: the `if datasource := job.object` branch was skipped, so no
+        # filter().update() call was even attempted.
+        filter_.assert_not_called()
+        # Outcome: no side effect on any existing DataSource.
+        self.datasource.refresh_from_db()
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.NEW)
+
+    def test_run_syncs_datasource_and_updates_search_cache(self):
+        datafile = DataFile.objects.create(
+            source=self.datasource,
+            path='test.txt',
+            last_updated=timezone.now(),
+            size=4,
+            hash='0' * 64,
+            data=b'test',
+        )
+        runner = _make_runner(SyncDataSourceJob, object_id=self.datasource.pk)
+
+        with (
+            patch('core.models.DataSource.sync') as sync,
+            patch('core.jobs.search_backend.cache') as cache,
+        ):
+            runner.run()
+
+        sync.assert_called_once_with()
+        cache.assert_called_once()
+        # The cache argument should iterate over the datasource's data files.
+        cache_arg = cache.call_args.args[0]
+        self.assertEqual(list(cache_arg), [datafile])
+
+    def test_run_marks_datasource_failed_and_reraises_on_sync_error(self):
+        runner = _make_runner(SyncDataSourceJob, object_id=self.datasource.pk)
+
+        with (
+            patch('core.models.DataSource.sync', side_effect=RuntimeError('boom')),
+            patch('core.jobs.search_backend.cache') as cache,
+        ):
+            with self.assertRaisesMessage(RuntimeError, 'boom'):
+                runner.run()
+
+        self.datasource.refresh_from_db()
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.FAILED)
+        cache.assert_not_called()
+
+    def test_run_raises_when_datasource_no_longer_exists(self):
+        # If the DataSource was deleted between enqueue and run, the initial lookup
+        # raises DoesNotExist; the framework will surface it as a job error.
+        runner = _make_runner(SyncDataSourceJob, object_id=99_999_999)
+
+        with self.assertRaises(DataSource.DoesNotExist):
+            runner.run()
+
+
+class SystemHousekeepingRunTestCase(HousekeepingRunnerMixin, TestCase):
+    SUBMETHODS = (
+        'send_census_report',
+        'clear_expired_sessions',
+        'prune_changelog',
+        'delete_expired_jobs',
+        'check_for_new_releases',
+    )
+
+    @override_settings(DEBUG=True)
+    def test_run_skips_when_debug_is_enabled(self):
+        # DEBUG is checked before sys.argv; sys.argv is irrelevant here.
+        runner = self._runner()
+        patches = {name: patch.object(SystemHousekeepingJob, name) for name in self.SUBMETHODS}
+        mocks = {name: p.start() for name, p in patches.items()}
+        self.addCleanup(lambda: [p.stop() for p in patches.values()])
+
+        runner.run()
+
+        for mock in mocks.values():
+            mock.assert_not_called()
+
+    @override_settings(DEBUG=False)
+    def test_run_skips_during_test_invocation(self):
+        runner = self._runner()
+        patches = {name: patch.object(SystemHousekeepingJob, name) for name in self.SUBMETHODS}
+        mocks = {name: p.start() for name, p in patches.items()}
+        self.addCleanup(lambda: [p.stop() for p in patches.values()])
+
+        with patch('core.jobs.sys.argv', ['manage.py', 'test']):
+            runner.run()
+
+        for mock in mocks.values():
+            mock.assert_not_called()
+
+    @override_settings(DEBUG=False)
+    def test_run_executes_all_housekeeping_tasks(self):
+        runner = self._runner()
+        patches = {name: patch.object(SystemHousekeepingJob, name) for name in self.SUBMETHODS}
+        mocks = {name: p.start() for name, p in patches.items()}
+        self.addCleanup(lambda: [p.stop() for p in patches.values()])
+
+        with patch('core.jobs.sys.argv', ['netbox']):
+            runner.run()
+
+        for mock in mocks.values():
+            mock.assert_called_once_with()
+
+
+class SendCensusReportTestCase(HousekeepingRunnerMixin, TestCase):
+    @override_settings(ISOLATED_DEPLOYMENT=True)
+    def test_send_census_report_skips_when_isolated_deployment(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().send_census_report()
+        get.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, CENSUS_REPORTING_ENABLED=False)
+    def test_send_census_report_skips_when_reporting_disabled(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().send_census_report()
+        get.assert_not_called()
+
+    @override_settings(
+        ISOLATED_DEPLOYMENT=False,
+        CENSUS_REPORTING_ENABLED=True,
+        CENSUS_URL='https://census.example/',
+        DEPLOYMENT_ID='abc123',
+    )
+    def test_send_census_report_sends_expected_payload(self):
+        with (
+            patch('core.jobs.requests.get') as get,
+            patch('core.jobs.resolve_proxies', return_value={'https': 'proxy'}) as resolve,
+        ):
+            self._runner().send_census_report()
+
+        resolve.assert_called_once_with(url='https://census.example/')
+        get.assert_called_once()
+        kwargs = get.call_args.kwargs
+        self.assertEqual(kwargs['url'], 'https://census.example/')
+        self.assertEqual(kwargs['timeout'], 3)
+        self.assertEqual(kwargs['proxies'], {'https': 'proxy'})
+        self.assertEqual(kwargs['params']['deployment_id'], 'abc123')
+        self.assertIn('version', kwargs['params'])
+        self.assertIn('python_version', kwargs['params'])
+
+    @override_settings(
+        ISOLATED_DEPLOYMENT=False,
+        CENSUS_REPORTING_ENABLED=True,
+        CENSUS_URL='https://census.example/',
+    )
+    def test_send_census_report_swallows_request_exception(self):
+        with (
+            patch('core.jobs.requests.get', side_effect=requests.RequestException('down')),
+            patch('core.jobs.resolve_proxies', return_value={}),
+        ):
+            self._runner().send_census_report()  # must not raise
+
+
+class ClearExpiredSessionsTestCase(HousekeepingRunnerMixin, TestCase):
+    def test_clear_expired_sessions_calls_session_store(self):
+        engine = MagicMock()
+        with patch('core.jobs.import_module', return_value=engine) as import_module:
+            self._runner().clear_expired_sessions()
+
+        import_module.assert_called_once_with(settings.SESSION_ENGINE)
+        engine.SessionStore.clear_expired.assert_called_once_with()
+
+    def test_clear_expired_sessions_handles_not_implemented(self):
+        engine = MagicMock()
+        engine.SessionStore.clear_expired.side_effect = NotImplementedError
+        runner = self._runner()
+
+        with patch('core.jobs.import_module', return_value=engine):
+            runner.clear_expired_sessions()  # must not raise
+
+        runner.logger.warning.assert_called_once()
+        self.assertIn(
+            'does not support',
+            runner.logger.warning.call_args.args[0],
+        )
+
+
+class DeleteExpiredJobsTestCase(HousekeepingRunnerMixin, TestCase):
+    def test_delete_expired_jobs_skips_when_retention_unset(self):
+        old_job = Job.objects.create(name='old', job_id=uuid.uuid4())
+        Job.objects.filter(pk=old_job.pk).update(created=timezone.now() - timedelta(days=365))
+
+        with patch('core.jobs.Config') as config_cls:
+            config_cls.return_value.JOB_RETENTION = 0
+            self._runner().delete_expired_jobs()
+
+        self.assertTrue(Job.objects.filter(pk=old_job.pk).exists())
+
+    def test_delete_expired_jobs_deletes_only_jobs_older_than_retention(self):
+        old_job = Job.objects.create(name='old', job_id=uuid.uuid4())
+        recent_job = Job.objects.create(name='recent', job_id=uuid.uuid4())
+        Job.objects.filter(pk=old_job.pk).update(created=timezone.now() - timedelta(days=30))
+        Job.objects.filter(pk=recent_job.pk).update(created=timezone.now() - timedelta(hours=1))
+
+        with patch('core.jobs.Config') as config_cls:
+            config_cls.return_value.JOB_RETENTION = 7
+            self._runner().delete_expired_jobs()
+
+        self.assertFalse(Job.objects.filter(pk=old_job.pk).exists())
+        self.assertTrue(Job.objects.filter(pk=recent_job.pk).exists())
+
+
+class CheckForNewReleasesTestCase(HousekeepingRunnerMixin, TestCase):
+    @override_settings(ISOLATED_DEPLOYMENT=True)
+    def test_check_for_new_releases_skips_when_isolated(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().check_for_new_releases()
+        get.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, RELEASE_CHECK_URL=None)
+    def test_check_for_new_releases_skips_when_url_unset(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().check_for_new_releases()
+        get.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, RELEASE_CHECK_URL='https://api.example/')
+    def test_check_for_new_releases_handles_request_exception(self):
+        with (
+            patch('core.jobs.requests.get', side_effect=requests.RequestException('down')),
+            patch('core.jobs.cache') as cache,
+            patch('core.jobs.resolve_proxies', return_value={}),
+        ):
+            self._runner().check_for_new_releases()
+
+        cache.set.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, RELEASE_CHECK_URL='https://api.example/')
+    def test_check_for_new_releases_caches_latest_stable_release(self):
+        response = MagicMock()
+        response.json.return_value = [
+            {'tag_name': 'v4.5.0', 'html_url': 'https://example/4.5.0'},
+            {'tag_name': 'v4.6.0', 'html_url': 'https://example/4.6.0'},
+            {'tag_name': 'v4.7.0-rc1', 'html_url': 'https://example/rc', 'prerelease': True},
+            {'tag_name': 'v4.7.0-dev', 'html_url': 'https://example/dev', 'devrelease': True},
+            {'html_url': 'https://example/no-tag'},
+        ]
+
+        with (
+            patch('core.jobs.requests.get', return_value=response) as get,
+            patch('core.jobs.cache.set') as cache_set,
+            patch('core.jobs.resolve_proxies', return_value={'http': 'proxy'}) as resolve,
+        ):
+            self._runner().check_for_new_releases()
+
+        # HTTP request: URL, GitHub API Accept header, resolved proxies.
+        resolve.assert_called_once_with(url='https://api.example/')
+        get.assert_called_once_with(
+            url='https://api.example/',
+            headers={'Accept': 'application/vnd.github.v3+json'},
+            proxies={'http': 'proxy'},
+        )
+        response.raise_for_status.assert_called_once_with()
+
+        cache_set.assert_called_once()
+        # Accept either positional or keyword form for cache.set(key, value, ttl).
+        call = cache_set.call_args
+        bound = {**dict(zip(('key', 'value', 'timeout'), call.args)), **call.kwargs}
+        self.assertEqual(bound['key'], 'latest_release')
+        self.assertIsNone(bound['timeout'])
+        latest_version, latest_url = bound['value']
+        self.assertEqual(str(latest_version), '4.6.0')
+        self.assertEqual(latest_url, 'https://example/4.6.0')
+
+
+class PruneChangelogTestCase(HousekeepingRunnerMixin, TestCase):
+    def test_prune_changelog_skips_when_retention_unset(self):
+        with (
+            patch('core.jobs.Config') as config_cls,
+            patch('core.jobs.ObjectChange') as object_change,
+        ):
+            config_cls.return_value.CHANGELOG_RETENTION = None
+            self._runner().prune_changelog()
+
+        object_change.objects.filter.assert_not_called()
+
+    def test_prune_changelog_uses_strict_cutoff_filter(self):
+        # Implementation pin: prune_changelog must use time__lt (strict less-than) so a
+        # record exactly at the cutoff is retained. End-to-end behavior of the prune is
+        # covered by ChangelogPruneRetentionTestCase in core/tests/test_changelog.py.
+        with (
+            patch('core.jobs.Config') as config_cls,
+            patch('core.jobs.ObjectChange') as object_change,
+            patch('core.jobs.timezone') as tz,
+        ):
+            config_cls.return_value.CHANGELOG_RETENTION = 7
+            config_cls.return_value.CHANGELOG_RETAIN_CREATE_LAST_UPDATE = False
+            tz.now.return_value = timezone.datetime(2026, 1, 8, tzinfo=timezone.get_current_timezone())
+            object_change.objects.filter.return_value.delete.return_value = (0, {})
+
+            self._runner().prune_changelog()
+
+        expected_cutoff = timezone.datetime(2026, 1, 1, tzinfo=timezone.get_current_timezone())
+        object_change.objects.filter.assert_called_once_with(time__lt=expected_cutoff)

+ 329 - 0
netbox/extras/tests/test_jobs.py

@@ -0,0 +1,329 @@
+from contextlib import contextmanager
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.db import DEFAULT_DB_ALIAS
+from django.test import TestCase
+
+from extras.jobs import ScriptJob
+from utilities.exceptions import AbortScript
+
+
+def _make_runner(**job_attrs):
+    """
+    Build a ScriptJob without going through ``__init__``.
+
+    ``JobRunner.__init__`` attaches a ``JobLogHandler`` to a module-level
+    singleton logger; instantiating one per test would accumulate handlers.
+    """
+    runner = ScriptJob.__new__(ScriptJob)
+    runner.job = MagicMock(**job_attrs)
+    runner.logger = MagicMock()
+    return runner
+
+
+class DummyScript:
+    """A minimal stand-in for a Script implementation."""
+
+    full_name = 'tests.DummyScript'
+
+    def __init__(self, run_result=None, run_exception=None, failed=False):
+        self._run_result = run_result
+        self._run_exception = run_exception
+        self.failed = failed
+        self.output = None
+        self.request = None
+        self.run = MagicMock(side_effect=self._run_impl)
+        self.log_info = MagicMock()
+        self.log_failure = MagicMock()
+        self.get_job_data = MagicMock(return_value={'status': 'done'})
+
+    def _run_impl(self, data, commit):
+        if self._run_exception is not None:
+            raise self._run_exception
+        return self._run_result
+
+
+class RunScriptTestCase(TestCase):
+    def test_run_script_success_commit_true_sets_output_and_job_data(self):
+        runner = _make_runner()
+        script = DummyScript(run_result='hello')
+
+        runner.run_script(script, request=None, data={'k': 'v'}, commit=True)
+
+        self.assertEqual(script.output, 'hello')
+        script.run.assert_called_once_with({'k': 'v'}, True)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_commit_false_rolls_back_without_raising(self):
+        runner = _make_runner()
+        script = DummyScript(run_result='hello')
+
+        runner.run_script(script, request=None, data={}, commit=False)
+
+        script.run.assert_called_once_with({}, False)
+        # Rollback path: log_info() is called (with the translated revert message);
+        # job.data is still populated. Don't assert exact wording.
+        script.log_info.assert_called_once()
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_commit_false_logs_warning_when_script_failed(self):
+        runner = _make_runner()
+        script = DummyScript(run_result='hello', failed=True)
+
+        with patch('extras.jobs.logging.getLogger') as get_logger:
+            logger = get_logger.return_value
+            runner.run_script(script, request=None, data={}, commit=False)
+
+        logger.warning.assert_any_call('Script failed')
+
+    def test_run_script_abort_script_logs_failure_and_reraises(self):
+        # Non-report scripts call `script.log_failure(msg)` positionally; report-style
+        # scripts use `log_failure(message=msg)`. See `test_run_script_abort_script_uses_report_log_failure_signature`.
+        runner = _make_runner()
+        script = DummyScript(run_exception=AbortScript('nope'))
+
+        with self.assertRaises(AbortScript):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        # Outcome assertions: AbortScript re-raised, failure logged on the script,
+        # rollback path traversed (log_info called), job.data populated.
+        script.log_failure.assert_called_once()
+        # Non-report path uses the positional signature; verify both that there is
+        # a positional arg and no `message=` kwarg.
+        self.assertTrue(script.log_failure.call_args.args)
+        self.assertFalse(script.log_failure.call_args.kwargs)
+        script.log_info.assert_called_once()
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_abort_script_uses_report_log_failure_signature(self):
+        runner = _make_runner()
+        script = DummyScript(run_exception=AbortScript('reportfail'))
+
+        with (
+            patch('extras.jobs.is_report', return_value=True),
+            self.assertRaises(AbortScript),
+        ):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        # For reports, log_failure is called with the keyword-form signature.
+        script.log_failure.assert_called_once()
+        self.assertIn('message', script.log_failure.call_args.kwargs)
+
+    def test_run_script_general_exception_logs_traceback_and_reraises(self):
+        runner = _make_runner()
+        script = DummyScript(run_exception=RuntimeError('boom'))
+
+        with self.assertRaisesMessage(RuntimeError, 'boom'):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        script.log_failure.assert_called_once()
+        # The failure message wraps a traceback; assert on the structural marker
+        # (Traceback header) rather than translated copy.
+        message = script.log_failure.call_args.kwargs['message']
+        self.assertIn('Traceback', message)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_sends_clear_events_when_request_is_present_and_error_occurs(self):
+        runner = _make_runner()
+        script = DummyScript(run_exception=RuntimeError('boom'))
+        request = MagicMock()
+
+        with patch('extras.jobs.clear_events.send') as send:
+            with self.assertRaises(RuntimeError):
+                runner.run_script(script, request=request, data={}, commit=True)
+
+        send.assert_called_once_with(request)
+
+    def test_run_script_default_db_uses_single_atomic(self):
+        # Symmetric with test_run_script_enters_secondary_atomic_when_changelog_db_differs:
+        # when the changelog DB is the default, only a single atomic() block is opened.
+        runner = _make_runner()
+        script = DummyScript(run_result='ok')
+        atomic_calls = []
+
+        @contextmanager
+        def fake_atomic(using):
+            atomic_calls.append(using)
+            yield
+
+        with (
+            patch('extras.jobs.router.db_for_write', return_value=DEFAULT_DB_ALIAS),
+            patch('extras.jobs.transaction.atomic', side_effect=fake_atomic),
+        ):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        self.assertEqual(atomic_calls, [DEFAULT_DB_ALIAS])
+        script.run.assert_called_once_with({}, True)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_enters_secondary_atomic_when_changelog_db_differs(self):
+        # Verifies that the code enters two nested atomic() context managers when
+        # the changelog DB differs from the default. Does not exercise real rollback
+        # semantics (transaction.atomic is patched).
+        runner = _make_runner()
+        script = DummyScript(run_result='ok')
+
+        atomic_calls = []
+
+        @contextmanager
+        def fake_atomic(using):
+            atomic_calls.append(using)
+            yield
+
+        with (
+            patch('extras.jobs.router.db_for_write', return_value='changelog_db'),
+            patch('extras.jobs.transaction.atomic', side_effect=fake_atomic),
+        ):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        self.assertEqual(atomic_calls, [DEFAULT_DB_ALIAS, 'changelog_db'])
+        script.run.assert_called_once_with({}, True)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_enters_secondary_atomic_when_commit_false(self):
+        # Mirror of the previous test for the commit=False branch on the secondary
+        # DB path. Verifies both atomic blocks are entered; does not exercise true
+        # rollback semantics (transaction.atomic is patched).
+        runner = _make_runner()
+        script = DummyScript(run_result='ok')
+        atomic_calls = []
+
+        @contextmanager
+        def fake_atomic(using):
+            atomic_calls.append(using)
+            yield
+
+        with (
+            patch('extras.jobs.router.db_for_write', return_value='changelog_db'),
+            patch('extras.jobs.transaction.atomic', side_effect=fake_atomic),
+        ):
+            runner.run_script(script, request=None, data={}, commit=False)
+
+        self.assertEqual(atomic_calls, [DEFAULT_DB_ALIAS, 'changelog_db'])
+        script.run.assert_called_once_with({}, False)
+        script.log_info.assert_called_once()
+
+
+class ScriptJobRunTestCase(TestCase):
+    @staticmethod
+    def _runner():
+        return _make_runner(object_id=1)
+
+    @staticmethod
+    def _script_model(script_instance):
+        return SimpleNamespace(pk=1, python_class=MagicMock(return_value=script_instance))
+
+    def test_run_loads_script_model_and_instantiates_python_class(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model) as get_script_model,
+            patch.object(ScriptJob, 'run_script') as run_script,
+            patch.dict('netbox.registry.registry', {'request_processors': []}, clear=False),
+        ):
+            runner.run(data={'k': 'v'}, commit=True)
+
+        get_script_model.assert_called_once_with(pk=1)
+        script_model.python_class.assert_called_once_with()
+        run_script.assert_called_once()
+        args, _ = run_script.call_args
+        self.assertIs(args[0], script_instance)
+        self.assertEqual(args[2], {'k': 'v'})  # data
+        self.assertIs(args[3], True)  # commit
+
+    def test_run_merges_request_files_into_data(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        request = MagicMock(FILES={'upload': 'fileobj'}, id='req-1')
+        data = {'k': 'v'}
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script') as run_script,
+            patch.dict('netbox.registry.registry', {'request_processors': []}, clear=False),
+        ):
+            runner.run(data=data, request=request, commit=True)
+
+        passed_data = run_script.call_args.args[2]
+        self.assertEqual(passed_data['k'], 'v')
+        self.assertEqual(passed_data['upload'], 'fileobj')
+
+    def test_run_sets_script_request_when_request_is_present(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        request = MagicMock(FILES={}, id='req-1')
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script'),
+            patch.dict('netbox.registry.registry', {'request_processors': []}, clear=False),
+        ):
+            runner.run(data={}, request=request, commit=True)
+
+        self.assertIs(script_instance.request, request)
+
+    def _processor_factories(self, entered):
+        def ctx_a(request):
+            @contextmanager
+            def _ctx():
+                entered.append('proc_a')
+                yield
+
+            return _ctx()
+
+        def ctx_event(request):
+            @contextmanager
+            def _ctx():
+                entered.append('event_tracking')
+                yield
+
+            return _ctx()
+
+        return ctx_a, ctx_event
+
+    def test_run_uses_request_processors_when_commit_true(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        entered = []
+        ctx_a, ctx_event = self._processor_factories(entered)
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script'),
+            patch('extras.jobs.event_tracking', new=ctx_event),
+            patch.dict(
+                'netbox.registry.registry',
+                {'request_processors': [ctx_a, ctx_event]},
+                clear=False,
+            ),
+        ):
+            runner.run(data={}, commit=True)
+
+        self.assertEqual(entered, ['proc_a', 'event_tracking'])
+
+    def test_run_skips_event_tracking_when_commit_false(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        entered = []
+        ctx_a, ctx_event = self._processor_factories(entered)
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script'),
+            patch('extras.jobs.event_tracking', new=ctx_event),
+            patch.dict(
+                'netbox.registry.registry',
+                {'request_processors': [ctx_a, ctx_event]},
+                clear=False,
+            ),
+        ):
+            runner.run(data={}, commit=False)
+
+        self.assertEqual(entered, ['proc_a'])