Explorar o código

test(jobs): Add comprehensive test coverage for job runners

Add test suites for SystemHousekeepingJob, SyncDataSourceJob, and
ScriptJob covering housekeeping tasks, data source synchronization,
script rollback paths, and request processor integration.

Includes helpers to safely instantiate runners without accumulating
log handlers across tests, plus a DummyScript test double.

Fixes #22125
Martin Hauser hai 1 semana
pai
achega
7fb061c4d1
Modificáronse 2 ficheiros con 696 adicións e 0 borrados
  1. 367 0
      netbox/core/tests/test_jobs.py
  2. 329 0
      netbox/extras/tests/test_jobs.py

+ 367 - 0
netbox/core/tests/test_jobs.py

@@ -0,0 +1,367 @@
+import uuid
+from datetime import timedelta
+from unittest.mock import MagicMock, patch
+
+import requests
+from django.conf import settings
+from django.test import TestCase, override_settings
+from django.utils import timezone
+
+from core.choices import DataSourceStatusChoices
+from core.jobs import SyncDataSourceJob, SystemHousekeepingJob
+from core.models import DataFile, DataSource, Job
+
+
+def _make_runner(cls, **job_attrs):
+    """
+    Build a JobRunner without going through ``__init__``.
+
+    ``JobRunner.__init__`` attaches a ``JobLogHandler`` to a module-level
+    singleton logger, so calling it once per test would accumulate handlers
+    across the suite. Bypass it and stub the logger directly.
+    """
+    runner = cls.__new__(cls)
+    runner.job = MagicMock(**job_attrs)
+    runner.logger = MagicMock()
+    return runner
+
+
+class HousekeepingRunnerMixin:
+    """Provides a `_runner()` helper that builds a SystemHousekeepingJob with a mock job."""
+
+    @staticmethod
+    def _runner():
+        return _make_runner(SystemHousekeepingJob)
+
+
+class SyncDataSourceJobTestCase(TestCase):
+    @classmethod
+    def setUpTestData(cls):
+        cls.datasource = DataSource.objects.create(
+            name='Test Source',
+            type='local',
+            source_url='/tmp/test',
+        )
+
+    def test_enqueue_sets_datasource_status_to_queued(self):
+        job = MagicMock()
+        job.object = self.datasource
+
+        with patch('core.models.Job.enqueue', return_value=job):
+            result = SyncDataSourceJob.enqueue(instance=self.datasource)
+
+        self.assertIs(result, job)
+        # Verify both the in-memory assignment (`datasource.status = ...`) and the
+        # persisted update (`DataSource.objects.filter(pk=...).update(...)`).
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.QUEUED)
+        self.datasource.refresh_from_db()
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.QUEUED)
+
+    def test_enqueue_without_object_is_noop(self):
+        # Baseline: the datasource starts at NEW. (Captures setUpTestData drift.)
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.NEW)
+
+        job = MagicMock()
+        job.object = None
+
+        with (
+            patch('core.models.Job.enqueue', return_value=job),
+            patch(
+                'core.jobs.DataSource.objects.filter',
+                wraps=DataSource.objects.filter,
+            ) as filter_,
+        ):
+            result = SyncDataSourceJob.enqueue()
+
+        self.assertIs(result, job)
+        # Intent: the `if datasource := job.object` branch was skipped, so no
+        # filter().update() call was even attempted.
+        filter_.assert_not_called()
+        # Outcome: no side effect on any existing DataSource.
+        self.datasource.refresh_from_db()
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.NEW)
+
+    def test_run_syncs_datasource_and_updates_search_cache(self):
+        datafile = DataFile.objects.create(
+            source=self.datasource,
+            path='test.txt',
+            last_updated=timezone.now(),
+            size=4,
+            hash='0' * 64,
+            data=b'test',
+        )
+        runner = _make_runner(SyncDataSourceJob, object_id=self.datasource.pk)
+
+        with (
+            patch('core.models.DataSource.sync') as sync,
+            patch('core.jobs.search_backend.cache') as cache,
+        ):
+            runner.run()
+
+        sync.assert_called_once_with()
+        cache.assert_called_once()
+        # The cache argument should iterate over the datasource's data files.
+        cache_arg = cache.call_args.args[0]
+        self.assertEqual(list(cache_arg), [datafile])
+
+    def test_run_marks_datasource_failed_and_reraises_on_sync_error(self):
+        runner = _make_runner(SyncDataSourceJob, object_id=self.datasource.pk)
+
+        with (
+            patch('core.models.DataSource.sync', side_effect=RuntimeError('boom')),
+            patch('core.jobs.search_backend.cache') as cache,
+        ):
+            with self.assertRaisesMessage(RuntimeError, 'boom'):
+                runner.run()
+
+        self.datasource.refresh_from_db()
+        self.assertEqual(self.datasource.status, DataSourceStatusChoices.FAILED)
+        cache.assert_not_called()
+
+    def test_run_raises_when_datasource_no_longer_exists(self):
+        # If the DataSource was deleted between enqueue and run, the initial lookup
+        # raises DoesNotExist; the framework will surface it as a job error.
+        runner = _make_runner(SyncDataSourceJob, object_id=99_999_999)
+
+        with self.assertRaises(DataSource.DoesNotExist):
+            runner.run()
+
+
+class SystemHousekeepingRunTestCase(HousekeepingRunnerMixin, TestCase):
+    SUBMETHODS = (
+        'send_census_report',
+        'clear_expired_sessions',
+        'prune_changelog',
+        'delete_expired_jobs',
+        'check_for_new_releases',
+    )
+
+    @override_settings(DEBUG=True)
+    def test_run_skips_when_debug_is_enabled(self):
+        # DEBUG is checked before sys.argv; sys.argv is irrelevant here.
+        runner = self._runner()
+        patches = {name: patch.object(SystemHousekeepingJob, name) for name in self.SUBMETHODS}
+        mocks = {name: p.start() for name, p in patches.items()}
+        self.addCleanup(lambda: [p.stop() for p in patches.values()])
+
+        runner.run()
+
+        for mock in mocks.values():
+            mock.assert_not_called()
+
+    @override_settings(DEBUG=False)
+    def test_run_skips_during_test_invocation(self):
+        runner = self._runner()
+        patches = {name: patch.object(SystemHousekeepingJob, name) for name in self.SUBMETHODS}
+        mocks = {name: p.start() for name, p in patches.items()}
+        self.addCleanup(lambda: [p.stop() for p in patches.values()])
+
+        with patch('core.jobs.sys.argv', ['manage.py', 'test']):
+            runner.run()
+
+        for mock in mocks.values():
+            mock.assert_not_called()
+
+    @override_settings(DEBUG=False)
+    def test_run_executes_all_housekeeping_tasks(self):
+        runner = self._runner()
+        patches = {name: patch.object(SystemHousekeepingJob, name) for name in self.SUBMETHODS}
+        mocks = {name: p.start() for name, p in patches.items()}
+        self.addCleanup(lambda: [p.stop() for p in patches.values()])
+
+        with patch('core.jobs.sys.argv', ['netbox']):
+            runner.run()
+
+        for mock in mocks.values():
+            mock.assert_called_once_with()
+
+
+class SendCensusReportTestCase(HousekeepingRunnerMixin, TestCase):
+    @override_settings(ISOLATED_DEPLOYMENT=True)
+    def test_send_census_report_skips_when_isolated_deployment(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().send_census_report()
+        get.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, CENSUS_REPORTING_ENABLED=False)
+    def test_send_census_report_skips_when_reporting_disabled(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().send_census_report()
+        get.assert_not_called()
+
+    @override_settings(
+        ISOLATED_DEPLOYMENT=False,
+        CENSUS_REPORTING_ENABLED=True,
+        CENSUS_URL='https://census.example/',
+        DEPLOYMENT_ID='abc123',
+    )
+    def test_send_census_report_sends_expected_payload(self):
+        with (
+            patch('core.jobs.requests.get') as get,
+            patch('core.jobs.resolve_proxies', return_value={'https': 'proxy'}) as resolve,
+        ):
+            self._runner().send_census_report()
+
+        resolve.assert_called_once_with(url='https://census.example/')
+        get.assert_called_once()
+        kwargs = get.call_args.kwargs
+        self.assertEqual(kwargs['url'], 'https://census.example/')
+        self.assertEqual(kwargs['timeout'], 3)
+        self.assertEqual(kwargs['proxies'], {'https': 'proxy'})
+        self.assertEqual(kwargs['params']['deployment_id'], 'abc123')
+        self.assertIn('version', kwargs['params'])
+        self.assertIn('python_version', kwargs['params'])
+
+    @override_settings(
+        ISOLATED_DEPLOYMENT=False,
+        CENSUS_REPORTING_ENABLED=True,
+        CENSUS_URL='https://census.example/',
+    )
+    def test_send_census_report_swallows_request_exception(self):
+        with (
+            patch('core.jobs.requests.get', side_effect=requests.RequestException('down')),
+            patch('core.jobs.resolve_proxies', return_value={}),
+        ):
+            self._runner().send_census_report()  # must not raise
+
+
+class ClearExpiredSessionsTestCase(HousekeepingRunnerMixin, TestCase):
+    def test_clear_expired_sessions_calls_session_store(self):
+        engine = MagicMock()
+        with patch('core.jobs.import_module', return_value=engine) as import_module:
+            self._runner().clear_expired_sessions()
+
+        import_module.assert_called_once_with(settings.SESSION_ENGINE)
+        engine.SessionStore.clear_expired.assert_called_once_with()
+
+    def test_clear_expired_sessions_handles_not_implemented(self):
+        engine = MagicMock()
+        engine.SessionStore.clear_expired.side_effect = NotImplementedError
+        runner = self._runner()
+
+        with patch('core.jobs.import_module', return_value=engine):
+            runner.clear_expired_sessions()  # must not raise
+
+        runner.logger.warning.assert_called_once()
+        self.assertIn(
+            'does not support',
+            runner.logger.warning.call_args.args[0],
+        )
+
+
+class DeleteExpiredJobsTestCase(HousekeepingRunnerMixin, TestCase):
+    def test_delete_expired_jobs_skips_when_retention_unset(self):
+        old_job = Job.objects.create(name='old', job_id=uuid.uuid4())
+        Job.objects.filter(pk=old_job.pk).update(created=timezone.now() - timedelta(days=365))
+
+        with patch('core.jobs.Config') as config_cls:
+            config_cls.return_value.JOB_RETENTION = 0
+            self._runner().delete_expired_jobs()
+
+        self.assertTrue(Job.objects.filter(pk=old_job.pk).exists())
+
+    def test_delete_expired_jobs_deletes_only_jobs_older_than_retention(self):
+        old_job = Job.objects.create(name='old', job_id=uuid.uuid4())
+        recent_job = Job.objects.create(name='recent', job_id=uuid.uuid4())
+        Job.objects.filter(pk=old_job.pk).update(created=timezone.now() - timedelta(days=30))
+        Job.objects.filter(pk=recent_job.pk).update(created=timezone.now() - timedelta(hours=1))
+
+        with patch('core.jobs.Config') as config_cls:
+            config_cls.return_value.JOB_RETENTION = 7
+            self._runner().delete_expired_jobs()
+
+        self.assertFalse(Job.objects.filter(pk=old_job.pk).exists())
+        self.assertTrue(Job.objects.filter(pk=recent_job.pk).exists())
+
+
+class CheckForNewReleasesTestCase(HousekeepingRunnerMixin, TestCase):
+    @override_settings(ISOLATED_DEPLOYMENT=True)
+    def test_check_for_new_releases_skips_when_isolated(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().check_for_new_releases()
+        get.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, RELEASE_CHECK_URL=None)
+    def test_check_for_new_releases_skips_when_url_unset(self):
+        with patch('core.jobs.requests.get') as get:
+            self._runner().check_for_new_releases()
+        get.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, RELEASE_CHECK_URL='https://api.example/')
+    def test_check_for_new_releases_handles_request_exception(self):
+        with (
+            patch('core.jobs.requests.get', side_effect=requests.RequestException('down')),
+            patch('core.jobs.cache') as cache,
+            patch('core.jobs.resolve_proxies', return_value={}),
+        ):
+            self._runner().check_for_new_releases()
+
+        cache.set.assert_not_called()
+
+    @override_settings(ISOLATED_DEPLOYMENT=False, RELEASE_CHECK_URL='https://api.example/')
+    def test_check_for_new_releases_caches_latest_stable_release(self):
+        response = MagicMock()
+        response.json.return_value = [
+            {'tag_name': 'v4.5.0', 'html_url': 'https://example/4.5.0'},
+            {'tag_name': 'v4.6.0', 'html_url': 'https://example/4.6.0'},
+            {'tag_name': 'v4.7.0-rc1', 'html_url': 'https://example/rc', 'prerelease': True},
+            {'tag_name': 'v4.7.0-dev', 'html_url': 'https://example/dev', 'devrelease': True},
+            {'html_url': 'https://example/no-tag'},
+        ]
+
+        with (
+            patch('core.jobs.requests.get', return_value=response) as get,
+            patch('core.jobs.cache.set') as cache_set,
+            patch('core.jobs.resolve_proxies', return_value={'http': 'proxy'}) as resolve,
+        ):
+            self._runner().check_for_new_releases()
+
+        # HTTP request: URL, GitHub API Accept header, resolved proxies.
+        resolve.assert_called_once_with(url='https://api.example/')
+        get.assert_called_once_with(
+            url='https://api.example/',
+            headers={'Accept': 'application/vnd.github.v3+json'},
+            proxies={'http': 'proxy'},
+        )
+        response.raise_for_status.assert_called_once_with()
+
+        cache_set.assert_called_once()
+        # Accept either positional or keyword form for cache.set(key, value, ttl).
+        call = cache_set.call_args
+        bound = {**dict(zip(('key', 'value', 'timeout'), call.args)), **call.kwargs}
+        self.assertEqual(bound['key'], 'latest_release')
+        self.assertIsNone(bound['timeout'])
+        latest_version, latest_url = bound['value']
+        self.assertEqual(str(latest_version), '4.6.0')
+        self.assertEqual(latest_url, 'https://example/4.6.0')
+
+
+class PruneChangelogTestCase(HousekeepingRunnerMixin, TestCase):
+    def test_prune_changelog_skips_when_retention_unset(self):
+        with (
+            patch('core.jobs.Config') as config_cls,
+            patch('core.jobs.ObjectChange') as object_change,
+        ):
+            config_cls.return_value.CHANGELOG_RETENTION = None
+            self._runner().prune_changelog()
+
+        object_change.objects.filter.assert_not_called()
+
+    def test_prune_changelog_uses_strict_cutoff_filter(self):
+        # Implementation pin: prune_changelog must use time__lt (strict less-than) so a
+        # record exactly at the cutoff is retained. End-to-end behavior of the prune is
+        # covered by ChangelogPruneRetentionTestCase in core/tests/test_changelog.py.
+        with (
+            patch('core.jobs.Config') as config_cls,
+            patch('core.jobs.ObjectChange') as object_change,
+            patch('core.jobs.timezone') as tz,
+        ):
+            config_cls.return_value.CHANGELOG_RETENTION = 7
+            config_cls.return_value.CHANGELOG_RETAIN_CREATE_LAST_UPDATE = False
+            tz.now.return_value = timezone.datetime(2026, 1, 8, tzinfo=timezone.get_current_timezone())
+            object_change.objects.filter.return_value.delete.return_value = (0, {})
+
+            self._runner().prune_changelog()
+
+        expected_cutoff = timezone.datetime(2026, 1, 1, tzinfo=timezone.get_current_timezone())
+        object_change.objects.filter.assert_called_once_with(time__lt=expected_cutoff)

+ 329 - 0
netbox/extras/tests/test_jobs.py

@@ -0,0 +1,329 @@
+from contextlib import contextmanager
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.db import DEFAULT_DB_ALIAS
+from django.test import TestCase
+
+from extras.jobs import ScriptJob
+from utilities.exceptions import AbortScript
+
+
+def _make_runner(**job_attrs):
+    """
+    Build a ScriptJob without going through ``__init__``.
+
+    ``JobRunner.__init__`` attaches a ``JobLogHandler`` to a module-level
+    singleton logger; instantiating one per test would accumulate handlers.
+    """
+    runner = ScriptJob.__new__(ScriptJob)
+    runner.job = MagicMock(**job_attrs)
+    runner.logger = MagicMock()
+    return runner
+
+
+class DummyScript:
+    """A minimal stand-in for a Script implementation."""
+
+    full_name = 'tests.DummyScript'
+
+    def __init__(self, run_result=None, run_exception=None, failed=False):
+        self._run_result = run_result
+        self._run_exception = run_exception
+        self.failed = failed
+        self.output = None
+        self.request = None
+        self.run = MagicMock(side_effect=self._run_impl)
+        self.log_info = MagicMock()
+        self.log_failure = MagicMock()
+        self.get_job_data = MagicMock(return_value={'status': 'done'})
+
+    def _run_impl(self, data, commit):
+        if self._run_exception is not None:
+            raise self._run_exception
+        return self._run_result
+
+
+class RunScriptTestCase(TestCase):
+    def test_run_script_success_commit_true_sets_output_and_job_data(self):
+        runner = _make_runner()
+        script = DummyScript(run_result='hello')
+
+        runner.run_script(script, request=None, data={'k': 'v'}, commit=True)
+
+        self.assertEqual(script.output, 'hello')
+        script.run.assert_called_once_with({'k': 'v'}, True)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_commit_false_rolls_back_without_raising(self):
+        runner = _make_runner()
+        script = DummyScript(run_result='hello')
+
+        runner.run_script(script, request=None, data={}, commit=False)
+
+        script.run.assert_called_once_with({}, False)
+        # Rollback path: log_info() is called (with the translated revert message);
+        # job.data is still populated. Don't assert exact wording.
+        script.log_info.assert_called_once()
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_commit_false_logs_warning_when_script_failed(self):
+        runner = _make_runner()
+        script = DummyScript(run_result='hello', failed=True)
+
+        with patch('extras.jobs.logging.getLogger') as get_logger:
+            logger = get_logger.return_value
+            runner.run_script(script, request=None, data={}, commit=False)
+
+        logger.warning.assert_any_call('Script failed')
+
+    def test_run_script_abort_script_logs_failure_and_reraises(self):
+        # Non-report scripts call `script.log_failure(msg)` positionally; report-style
+        # scripts use `log_failure(message=msg)`. See `test_run_script_abort_script_uses_report_log_failure_signature`.
+        runner = _make_runner()
+        script = DummyScript(run_exception=AbortScript('nope'))
+
+        with self.assertRaises(AbortScript):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        # Outcome assertions: AbortScript re-raised, failure logged on the script,
+        # rollback path traversed (log_info called), job.data populated.
+        script.log_failure.assert_called_once()
+        # Non-report path uses the positional signature; verify both that there is
+        # a positional arg and no `message=` kwarg.
+        self.assertTrue(script.log_failure.call_args.args)
+        self.assertFalse(script.log_failure.call_args.kwargs)
+        script.log_info.assert_called_once()
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_abort_script_uses_report_log_failure_signature(self):
+        runner = _make_runner()
+        script = DummyScript(run_exception=AbortScript('reportfail'))
+
+        with (
+            patch('extras.jobs.is_report', return_value=True),
+            self.assertRaises(AbortScript),
+        ):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        # For reports, log_failure is called with the keyword-form signature.
+        script.log_failure.assert_called_once()
+        self.assertIn('message', script.log_failure.call_args.kwargs)
+
+    def test_run_script_general_exception_logs_traceback_and_reraises(self):
+        runner = _make_runner()
+        script = DummyScript(run_exception=RuntimeError('boom'))
+
+        with self.assertRaisesMessage(RuntimeError, 'boom'):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        script.log_failure.assert_called_once()
+        # The failure message wraps a traceback; assert on the structural marker
+        # (Traceback header) rather than translated copy.
+        message = script.log_failure.call_args.kwargs['message']
+        self.assertIn('Traceback', message)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_sends_clear_events_when_request_is_present_and_error_occurs(self):
+        runner = _make_runner()
+        script = DummyScript(run_exception=RuntimeError('boom'))
+        request = MagicMock()
+
+        with patch('extras.jobs.clear_events.send') as send:
+            with self.assertRaises(RuntimeError):
+                runner.run_script(script, request=request, data={}, commit=True)
+
+        send.assert_called_once_with(request)
+
+    def test_run_script_default_db_uses_single_atomic(self):
+        # Symmetric with test_run_script_enters_secondary_atomic_when_changelog_db_differs:
+        # when the changelog DB is the default, only a single atomic() block is opened.
+        runner = _make_runner()
+        script = DummyScript(run_result='ok')
+        atomic_calls = []
+
+        @contextmanager
+        def fake_atomic(using):
+            atomic_calls.append(using)
+            yield
+
+        with (
+            patch('extras.jobs.router.db_for_write', return_value=DEFAULT_DB_ALIAS),
+            patch('extras.jobs.transaction.atomic', side_effect=fake_atomic),
+        ):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        self.assertEqual(atomic_calls, [DEFAULT_DB_ALIAS])
+        script.run.assert_called_once_with({}, True)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_enters_secondary_atomic_when_changelog_db_differs(self):
+        # Verifies that the code enters two nested atomic() context managers when
+        # the changelog DB differs from the default. Does not exercise real rollback
+        # semantics (transaction.atomic is patched).
+        runner = _make_runner()
+        script = DummyScript(run_result='ok')
+
+        atomic_calls = []
+
+        @contextmanager
+        def fake_atomic(using):
+            atomic_calls.append(using)
+            yield
+
+        with (
+            patch('extras.jobs.router.db_for_write', return_value='changelog_db'),
+            patch('extras.jobs.transaction.atomic', side_effect=fake_atomic),
+        ):
+            runner.run_script(script, request=None, data={}, commit=True)
+
+        self.assertEqual(atomic_calls, [DEFAULT_DB_ALIAS, 'changelog_db'])
+        script.run.assert_called_once_with({}, True)
+        self.assertEqual(runner.job.data, {'status': 'done'})
+
+    def test_run_script_enters_secondary_atomic_when_commit_false(self):
+        # Mirror of the previous test for the commit=False branch on the secondary
+        # DB path. Verifies both atomic blocks are entered; does not exercise true
+        # rollback semantics (transaction.atomic is patched).
+        runner = _make_runner()
+        script = DummyScript(run_result='ok')
+        atomic_calls = []
+
+        @contextmanager
+        def fake_atomic(using):
+            atomic_calls.append(using)
+            yield
+
+        with (
+            patch('extras.jobs.router.db_for_write', return_value='changelog_db'),
+            patch('extras.jobs.transaction.atomic', side_effect=fake_atomic),
+        ):
+            runner.run_script(script, request=None, data={}, commit=False)
+
+        self.assertEqual(atomic_calls, [DEFAULT_DB_ALIAS, 'changelog_db'])
+        script.run.assert_called_once_with({}, False)
+        script.log_info.assert_called_once()
+
+
+class ScriptJobRunTestCase(TestCase):
+    @staticmethod
+    def _runner():
+        return _make_runner(object_id=1)
+
+    @staticmethod
+    def _script_model(script_instance):
+        return SimpleNamespace(pk=1, python_class=MagicMock(return_value=script_instance))
+
+    def test_run_loads_script_model_and_instantiates_python_class(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model) as get_script_model,
+            patch.object(ScriptJob, 'run_script') as run_script,
+            patch.dict('netbox.registry.registry', {'request_processors': []}, clear=False),
+        ):
+            runner.run(data={'k': 'v'}, commit=True)
+
+        get_script_model.assert_called_once_with(pk=1)
+        script_model.python_class.assert_called_once_with()
+        run_script.assert_called_once()
+        args, _ = run_script.call_args
+        self.assertIs(args[0], script_instance)
+        self.assertEqual(args[2], {'k': 'v'})  # data
+        self.assertIs(args[3], True)  # commit
+
+    def test_run_merges_request_files_into_data(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        request = MagicMock(FILES={'upload': 'fileobj'}, id='req-1')
+        data = {'k': 'v'}
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script') as run_script,
+            patch.dict('netbox.registry.registry', {'request_processors': []}, clear=False),
+        ):
+            runner.run(data=data, request=request, commit=True)
+
+        passed_data = run_script.call_args.args[2]
+        self.assertEqual(passed_data['k'], 'v')
+        self.assertEqual(passed_data['upload'], 'fileobj')
+
+    def test_run_sets_script_request_when_request_is_present(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        request = MagicMock(FILES={}, id='req-1')
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script'),
+            patch.dict('netbox.registry.registry', {'request_processors': []}, clear=False),
+        ):
+            runner.run(data={}, request=request, commit=True)
+
+        self.assertIs(script_instance.request, request)
+
+    def _processor_factories(self, entered):
+        def ctx_a(request):
+            @contextmanager
+            def _ctx():
+                entered.append('proc_a')
+                yield
+
+            return _ctx()
+
+        def ctx_event(request):
+            @contextmanager
+            def _ctx():
+                entered.append('event_tracking')
+                yield
+
+            return _ctx()
+
+        return ctx_a, ctx_event
+
+    def test_run_uses_request_processors_when_commit_true(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        entered = []
+        ctx_a, ctx_event = self._processor_factories(entered)
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script'),
+            patch('extras.jobs.event_tracking', new=ctx_event),
+            patch.dict(
+                'netbox.registry.registry',
+                {'request_processors': [ctx_a, ctx_event]},
+                clear=False,
+            ),
+        ):
+            runner.run(data={}, commit=True)
+
+        self.assertEqual(entered, ['proc_a', 'event_tracking'])
+
+    def test_run_skips_event_tracking_when_commit_false(self):
+        runner = self._runner()
+        script_instance = DummyScript()
+        script_model = self._script_model(script_instance)
+        entered = []
+        ctx_a, ctx_event = self._processor_factories(entered)
+
+        with (
+            patch('extras.jobs.ScriptModel.objects.get', return_value=script_model),
+            patch.object(ScriptJob, 'run_script'),
+            patch('extras.jobs.event_tracking', new=ctx_event),
+            patch.dict(
+                'netbox.registry.registry',
+                {'request_processors': [ctx_a, ctx_event]},
+                clear=False,
+            ),
+        ):
+            runner.run(data={}, commit=False)
+
+        self.assertEqual(entered, ['proc_a'])