Selaa lähdekoodia

test(commands): Add comprehensive tests for management commands

Add test coverage for Django management commands across core, dcim,
extras, ipam, and utilities apps.
Tests verify command argument handling, error cases, and integration
with mocked dependencies using patches and test doubles.

Fixes #22124
Martin Hauser 1 viikko sitten
vanhempi
commit
0a49618297

+ 1 - 1
netbox/core/management/commands/nbshell.py

@@ -150,7 +150,7 @@ class Command(BaseCommand):
         try:
         try:
             import readline
             import readline
             import rlcompleter
             import rlcompleter
-        except ModuleNotFoundError:
+        except ModuleNotFoundError:  # pragma: no cover
             pass
             pass
         else:
         else:
             readline.set_completer(rlcompleter.Completer(namespace).complete)
             readline.set_completer(rlcompleter.Completer(namespace).complete)

+ 79 - 0
netbox/core/tests/test_management_command_coverage.py

@@ -0,0 +1,79 @@
+import ast
+from pathlib import Path
+
+from django.apps import apps
+from django.conf import settings
+from django.test import SimpleTestCase
+
+EXCLUDED_CUSTOM_COMMANDS = {
+    # Deprecated; excluded from management command test coverage by #22124.
+    'housekeeping',
+}
+
+
+class ManagementCommandCoverageTestCase(SimpleTestCase):
+    def test_all_custom_management_commands_have_tests(self):
+        custom_commands = self._get_custom_management_commands()
+        tested_commands = self._get_tested_management_commands()
+
+        self.assertTrue(
+            custom_commands,
+            'No custom management commands were discovered; check command discovery logic.',
+        )
+
+        missing_commands = sorted(custom_commands - tested_commands - EXCLUDED_CUSTOM_COMMANDS)
+
+        self.assertEqual(
+            missing_commands,
+            [],
+            msg=(f'Tests are missing for custom management commands: {", ".join(missing_commands)}'),
+        )
+
+    @staticmethod
+    def _get_custom_management_commands():
+        base_dir = Path(settings.BASE_DIR).resolve()
+        commands = set()
+
+        for app_config in apps.get_app_configs():
+            app_path = Path(app_config.path).resolve()
+            if not app_path.is_relative_to(base_dir):
+                continue
+
+            commands_path = app_path / 'management' / 'commands'
+            if not commands_path.exists():
+                continue
+
+            commands.update(path.stem for path in commands_path.glob('*.py') if not path.name.startswith('_'))
+
+        return commands
+
+    @staticmethod
+    def _get_tested_management_commands():
+        base_dir = Path(settings.BASE_DIR).resolve()
+        commands = set()
+
+        for test_file in base_dir.glob('*/tests/test_management_commands.py'):
+            tree = ast.parse(test_file.read_text(encoding='utf-8'))
+            for node in ast.walk(tree):
+                if not isinstance(node, ast.Call):
+                    continue
+                if not _is_call_command(node.func):
+                    continue
+                if not node.args:
+                    continue
+
+                command_name = node.args[0]
+                if isinstance(command_name, ast.Constant) and isinstance(command_name.value, str):
+                    commands.add(command_name.value)
+
+        return commands
+
+
+def _is_call_command(func):
+    if isinstance(func, ast.Name):
+        return func.id == 'call_command'
+
+    if isinstance(func, ast.Attribute):
+        return func.attr == 'call_command'
+
+    return False

+ 317 - 0
netbox/core/tests/test_management_commands.py

@@ -0,0 +1,317 @@
+from io import StringIO
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.core.management import call_command
+from django.core.management.base import CommandError
+from django.test import TestCase, override_settings
+
+from core.choices import DataSourceStatusChoices
+from core.management.commands import nbshell
+from core.management.commands.rqworker import DEFAULT_QUEUES
+
+
+class MakeMigrationsTestCase(TestCase):
+    @override_settings(DEVELOPER=False)
+    def test_blocked_in_non_developer_mode(self):
+        with self.assertRaisesMessage(CommandError, 'development purposes only'):
+            call_command('makemigrations', stdout=StringIO(), stderr=StringIO())
+
+    @override_settings(DEVELOPER=False)
+    def test_check_flag_allowed_in_non_developer_mode(self):
+        with patch('core.management.commands.makemigrations._Command.handle') as super_handle:
+            call_command(
+                'makemigrations',
+                check_changes=True,
+                stdout=StringIO(),
+                stderr=StringIO(),
+            )
+
+        super_handle.assert_called_once()
+        self.assertTrue(super_handle.call_args.kwargs['check_changes'])
+
+
+class NbShellTestCase(TestCase):
+    def test_color_helpers_wrap_text(self):
+        self.assertIn('message', nbshell.color('green', 'message'))
+        self.assertIn('message', nbshell.bright('message'))
+
+    def test_get_models_excludes_private_models(self):
+        public_model = type('PublicModel', (), {})
+        private_model = type('PrivateModel', (), {'_netbox_private': True})
+        app_config = SimpleNamespace(get_models=lambda: [public_model, private_model])
+
+        self.assertEqual(nbshell.get_models(app_config), [public_model])
+
+    def test_get_constants_returns_module_attributes(self):
+        constants = SimpleNamespace(FOO='bar', ANSWER=42)
+
+        with patch('core.management.commands.nbshell.import_string', return_value=constants):
+            self.assertEqual(
+                nbshell.get_constants(SimpleNamespace(name='testapp')),
+                {'FOO': 'bar', 'ANSWER': 42},
+            )
+
+    def test_get_constants_handles_missing_constants_module(self):
+        with patch('core.management.commands.nbshell.import_string', side_effect=ImportError):
+            self.assertEqual(nbshell.get_constants(SimpleNamespace(name='testapp')), {})
+
+    def test_executes_inline_command(self):
+        namespace = {}
+
+        with patch(
+            'core.management.commands.nbshell.Command.get_namespace',
+            return_value=namespace,
+        ):
+            call_command('nbshell', command='answer = 42')
+
+        self.assertEqual(namespace['answer'], 42)
+
+    def test_starts_interactive_shell_without_inline_command(self):
+        namespace = {'answer': 42}
+
+        with (
+            patch('core.management.commands.nbshell.Command.get_namespace', return_value=namespace),
+            patch('core.management.commands.nbshell.Command.get_banner_text', return_value='banner'),
+            patch('core.management.commands.nbshell.code.interact', return_value=None) as interact,
+        ):
+            call_command('nbshell', stdout=StringIO())
+
+        interact.assert_called_once_with(banner='banner', local=namespace)
+
+    def test_get_namespace_includes_models_constants_and_helpers(self):
+        class DummyModel:
+            pass
+
+        app_config = SimpleNamespace(
+            name='dummyapp',
+            get_models=lambda: [DummyModel],
+        )
+        command = nbshell.Command()
+        command.django_models = {}
+
+        with (
+            patch('core.management.commands.nbshell.CORE_APPS', ('dummyapp',)),
+            patch('core.management.commands.nbshell.get_installed_plugins', return_value={}),
+            patch('core.management.commands.nbshell.apps.get_app_config', return_value=app_config),
+            patch('core.management.commands.nbshell.get_constants', return_value={'CONSTANT': 'value'}),
+        ):
+            namespace = command.get_namespace()
+
+        self.assertIs(namespace['dummyapp'].DummyModel, DummyModel)
+        self.assertEqual(namespace['dummyapp'].CONSTANT, 'value')
+        self.assertEqual(command.django_models['dummyapp'], ['DummyModel'])
+        self.assertEqual(namespace['lsapps'], command._lsapps)
+        self.assertEqual(namespace['lsmodels'], command._lsmodels)
+
+    def test_list_apps_and_models_helpers(self):
+        command = nbshell.Command()
+        command.django_models = {'dcim': ['Device', 'Site']}
+        app_config = SimpleNamespace(verbose_name='DCIM')
+
+        with (
+            patch('core.management.commands.nbshell.apps.get_app_config', return_value=app_config),
+            patch('builtins.print') as print_,
+        ):
+            command._lsapps()
+            command._lsmodels('dcim')
+
+        self.assertIn(('dcim - DCIM',), [call.args for call in print_.call_args_list])
+        self.assertIn(('DCIM:',), [call.args for call in print_.call_args_list])
+        self.assertIn(('  dcim.Device',), [call.args for call in print_.call_args_list])
+        self.assertIn(('  dcim.Site',), [call.args for call in print_.call_args_list])
+
+    def test_list_models_reports_unknown_app(self):
+        command = nbshell.Command()
+        command.django_models = {}
+
+        with patch('builtins.print') as print_:
+            command._lsmodels('unknown')
+
+        print_.assert_called_once_with('No models listed for unknown')
+
+    def test_list_models_lists_all_apps_when_no_app_label_given(self):
+        command = nbshell.Command()
+        command.django_models = {'dcim': ['Device'], 'ipam': ['IPAddress']}
+        app_configs = {
+            'dcim': SimpleNamespace(verbose_name='DCIM'),
+            'ipam': SimpleNamespace(verbose_name='IPAM'),
+        }
+
+        with (
+            patch(
+                'core.management.commands.nbshell.apps.get_app_config',
+                side_effect=lambda label: app_configs[label],
+            ),
+            patch('builtins.print') as print_,
+        ):
+            command._lsmodels()
+
+        printed = [call.args for call in print_.call_args_list]
+        self.assertIn(('DCIM:',), printed)
+        self.assertIn(('IPAM:',), printed)
+        self.assertIn(('  dcim.Device',), printed)
+        self.assertIn(('  ipam.IPAddress',), printed)
+
+    def test_banner_includes_installed_plugins(self):
+        with (
+            patch('core.management.commands.nbshell.platform.node', return_value='netbox'),
+            patch('core.management.commands.nbshell.platform.python_version', return_value='3.12.0'),
+            patch('core.management.commands.nbshell.get_version', return_value='5.2.0'),
+            patch('core.management.commands.nbshell.get_installed_plugins', return_value={'plugin': '1.2.3'}),
+        ):
+            banner = nbshell.Command.get_banner_text()
+
+        self.assertIn('NetBox interactive shell', banner)
+        self.assertIn('Plugins:', banner)
+        self.assertIn('plugin', banner)
+
+
+class RQWorkerTestCase(TestCase):
+    def test_defaults_to_all_queues_and_enables_scheduler(self):
+        with (
+            patch('core.management.commands.rqworker.registry', {'system_jobs': {}}),
+            patch('core.management.commands.rqworker._Command.handle') as super_handle,
+            self.assertLogs('netbox.rqworker', level='WARNING') as logs,
+        ):
+            call_command('rqworker', stdout=StringIO(), stderr=StringIO())
+
+        super_handle.assert_called_once()
+        args, kwargs = super_handle.call_args
+        self.assertEqual(args, DEFAULT_QUEUES)
+        self.assertTrue(kwargs['with_scheduler'])
+        self.assertEqual(len(logs.output), 1)
+        self.assertIn('No queues have been specified', logs.output[0])
+
+    def test_schedules_registered_system_jobs(self):
+        job = MagicMock()
+        job.name = 'TestJob'
+
+        with (
+            patch('core.management.commands.rqworker.registry', {'system_jobs': {job: {'interval': 5}}}),
+            patch('core.management.commands.rqworker._Command.handle') as super_handle,
+        ):
+            call_command('rqworker', 'high', stdout=StringIO(), stderr=StringIO())
+
+        job.enqueue_once.assert_called_once_with(interval=5)
+        super_handle.assert_called_once()
+        args, kwargs = super_handle.call_args
+        self.assertEqual(args, ('high',))
+        self.assertTrue(kwargs['with_scheduler'])
+
+    def test_system_jobs_must_specify_interval(self):
+        job = MagicMock()
+        job.name = 'TestJob'
+
+        with patch('core.management.commands.rqworker.registry', {'system_jobs': {job: {}}}):
+            with self.assertRaisesMessage(TypeError, 'System job must specify an interval'):
+                call_command('rqworker', stdout=StringIO(), stderr=StringIO())
+
+
+class SyncDataSourceTestCase(TestCase):
+    class FakeDataSource:
+        def __init__(self, name):
+            self.name = name
+            self.pk = name
+            self.sync = MagicMock()
+
+        def __str__(self):
+            return self.name
+
+        def get_status_display(self):
+            return 'completed'
+
+    class FakeQuerySet(list):
+        def values(self, *fields):
+            return [{field: getattr(item, field) for field in fields} for item in self]
+
+    def test_requires_name_or_all(self):
+        with self.assertRaisesMessage(CommandError, 'Must specify at least one data source'):
+            call_command('syncdatasource', stdout=StringIO())
+
+    def test_invalid_name(self):
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.filter.return_value = self.FakeQuerySet()
+            with self.assertRaisesMessage(CommandError, 'Invalid data source names: nonexistent-source'):
+                call_command('syncdatasource', 'nonexistent-source', stdout=StringIO())
+
+        data_source_model.objects.filter.assert_called_once()
+        self.assertEqual(
+            set(data_source_model.objects.filter.call_args.kwargs['name__in']),
+            {'nonexistent-source'},
+        )
+
+    def test_all_syncs_datasource(self):
+        datasource = MagicMock()
+        datasource.__str__.return_value = 'Test Data Source'
+        datasource.get_status_display.return_value = 'completed'
+
+        out = StringIO()
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.all.return_value = [datasource]
+            call_command('syncdatasource', sync_all=True, stdout=out)
+
+        data_source_model.objects.all.assert_called_once_with()
+        datasource.sync.assert_called_once_with()
+        self.assertIn('Syncing Test Data Source', out.getvalue())
+        self.assertIn('completed', out.getvalue())
+
+    def test_named_datasource_syncs_matching_datasource(self):
+        datasource = self.FakeDataSource('source-a')
+        datasources = self.FakeQuerySet([datasource])
+        out = StringIO()
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.filter.return_value = datasources
+            call_command('syncdatasource', 'source-a', stdout=out)
+
+        data_source_model.objects.filter.assert_called_once()
+        self.assertEqual(
+            set(data_source_model.objects.filter.call_args.kwargs['name__in']),
+            {'source-a'},
+        )
+        datasource.sync.assert_called_once_with()
+        self.assertIn('[1] Syncing source-a', out.getvalue())
+        self.assertIn('completed', out.getvalue())
+        self.assertNotIn('Syncing 1 data sources.', out.getvalue())
+        self.assertNotIn('Finished.', out.getvalue())
+
+    def test_sync_failure_marks_datasource_failed_and_reraises(self):
+        datasource = MagicMock()
+        datasource.__str__.return_value = 'source-a'
+        datasource.pk = 1
+        datasource.sync.side_effect = RuntimeError('boom')
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.all.return_value = [datasource]
+
+            with self.assertRaisesMessage(RuntimeError, 'boom'):
+                call_command('syncdatasource', sync_all=True, stdout=StringIO())
+
+        data_source_model.objects.filter.assert_called_once_with(pk=1)
+        data_source_model.objects.filter.return_value.update.assert_called_once_with(
+            status=DataSourceStatusChoices.FAILED,
+        )
+
+    def test_multiple_names_prints_summary_and_syncs_datasources(self):
+        datasource_a = self.FakeDataSource('source-a')
+        datasource_b = self.FakeDataSource('source-b')
+        datasources = self.FakeQuerySet([datasource_a, datasource_b])
+        out = StringIO()
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.filter.return_value = datasources
+            call_command('syncdatasource', 'source-a', 'source-b', stdout=out)
+
+        data_source_model.objects.filter.assert_called_once()
+        self.assertEqual(
+            set(data_source_model.objects.filter.call_args.kwargs['name__in']),
+            {'source-a', 'source-b'},
+        )
+        datasource_a.sync.assert_called_once_with()
+        datasource_b.sync.assert_called_once_with()
+        self.assertIn('Syncing 2 data sources.', out.getvalue())
+        self.assertIn('[1] Syncing source-a', out.getvalue())
+        self.assertIn('[2] Syncing source-b', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())

+ 150 - 0
netbox/dcim/tests/test_management_commands.py

@@ -0,0 +1,150 @@
+import json
+import tempfile
+from io import StringIO
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.core.management import call_command
+from django.test import TestCase, override_settings
+
+
+class BuildSchemaTestCase(TestCase):
+    def test_output_is_valid_json(self):
+        out = StringIO()
+
+        call_command('buildschema', stdout=out)
+
+        self.assertIsInstance(json.loads(out.getvalue()), dict)
+
+    def test_write_flag_writes_schema_to_configured_base_dir(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            base_dir = Path(tmpdir) / 'netbox'
+            output_dir = Path(tmpdir) / 'contrib'
+            output_dir.mkdir()
+
+            out = StringIO()
+            with override_settings(BASE_DIR=base_dir):
+                call_command('buildschema', write=True, stdout=out)
+
+            output_file = output_dir / 'generated_schema.json'
+            self.assertTrue(output_file.exists())
+            self.assertIsInstance(json.loads(output_file.read_text(encoding='utf-8')), dict)
+            self.assertIn(str(output_file), out.getvalue())
+
+
+class TracePathsTestCase(TestCase):
+    def test_no_cables(self):
+        out = StringIO()
+
+        call_command('trace_paths', no_input=True, stdout=out)
+
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_force_no_cable_paths(self):
+        out = StringIO()
+
+        call_command('trace_paths', force=True, no_input=True, stdout=out)
+
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_retraces_missing_cabled_endpoint_path(self):
+        endpoint = object()
+
+        class FakeQuerySet(list):
+            def filter(self, *args, **kwargs):
+                return self
+
+            def count(self):
+                return len(self)
+
+        class FakeObjects:
+            def filter(self, *args, **kwargs):
+                return FakeQuerySet([endpoint])
+
+        model = SimpleNamespace(
+            objects=FakeObjects(),
+            wireless_link=object(),
+            _meta=SimpleNamespace(verbose_name='interface', verbose_name_plural='interfaces'),
+        )
+        out = StringIO()
+
+        with (
+            patch('dcim.management.commands.trace_paths.ENDPOINT_MODELS', (model,)),
+            patch('dcim.management.commands.trace_paths.create_cablepaths') as create_cablepaths,
+        ):
+            call_command('trace_paths', no_input=True, stdout=out)
+
+        create_cablepaths.assert_called_once_with([endpoint])
+        self.assertIn('Retracing 1 cabled interfaces', out.getvalue())
+        self.assertIn('Retraced 1 interfaces', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_progress_bar_drawn_every_100_endpoints(self):
+        endpoints = [object() for _ in range(100)]
+
+        class FakeQuerySet(list):
+            def filter(self, *args, **kwargs):
+                return self
+
+            def count(self):
+                return len(self)
+
+        class FakeObjects:
+            def filter(self, *args, **kwargs):
+                return FakeQuerySet(endpoints)
+
+        model = SimpleNamespace(
+            objects=FakeObjects(),
+            wireless_link=object(),
+            _meta=SimpleNamespace(verbose_name='interface', verbose_name_plural='interfaces'),
+        )
+        out = StringIO()
+
+        with (
+            patch('dcim.management.commands.trace_paths.ENDPOINT_MODELS', (model,)),
+            patch('dcim.management.commands.trace_paths.create_cablepaths'),
+        ):
+            call_command('trace_paths', no_input=True, stdout=out)
+
+        self.assertIn('[####################] 100%', out.getvalue())
+        self.assertIn('Retraced 100 interfaces', out.getvalue())
+
+    def test_force_aborts_when_confirmation_is_not_yes(self):
+        out = StringIO()
+        cable_paths = MagicMock()
+        cable_paths.count.return_value = 1
+
+        with (
+            patch('dcim.management.commands.trace_paths.CablePath') as cable_path_model,
+            patch('builtins.input', return_value='no'),
+        ):
+            cable_path_model.objects.all.return_value = cable_paths
+            call_command('trace_paths', force=True, stdout=out)
+
+        cable_paths.delete.assert_not_called()
+        self.assertIn('WARNING: Forcing recalculation', out.getvalue())
+        self.assertIn('Aborting', out.getvalue())
+
+    def test_force_deletes_existing_paths_and_resets_sequence(self):
+        out = StringIO()
+        cable_paths = MagicMock()
+        cable_paths.count.return_value = 2
+        cable_paths.delete.return_value = (2, {})
+
+        with (
+            patch('dcim.management.commands.trace_paths.CablePath') as cable_path_model,
+            patch('dcim.management.commands.trace_paths.ENDPOINT_MODELS', ()),
+            patch('dcim.management.commands.trace_paths.connection') as connection,
+        ):
+            cable_path_model.objects.all.return_value = cable_paths
+            connection.ops.sequence_reset_sql.return_value = ['RESET SEQUENCE']
+            cursor = connection.cursor.return_value.__enter__.return_value
+
+            call_command('trace_paths', force=True, no_input=True, stdout=out)
+
+        cable_paths.delete.assert_called_once_with()
+        cursor.execute.assert_called_once_with('RESET SEQUENCE')
+        self.assertIn('Deleting 2 existing cable paths', out.getvalue())
+        self.assertIn('Deleted 2 paths', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())

+ 1 - 1
netbox/extras/management/commands/runscript.py

@@ -68,7 +68,7 @@ class Command(BaseCommand):
                 'info': logging.INFO,
                 'info': logging.INFO,
                 'warning': logging.WARNING,
                 'warning': logging.WARNING,
             }[loglevel])
             }[loglevel])
-        except KeyError:
+        except KeyError:  # pragma: no cover
             raise CommandError(f"Invalid log level: {loglevel}")
             raise CommandError(f"Invalid log level: {loglevel}")
 
 
         # Initialize the script form
         # Initialize the script form

+ 504 - 0
netbox/extras/tests/test_management_commands.py

@@ -0,0 +1,504 @@
+from io import BytesIO, StringIO
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.contrib.contenttypes.models import ContentType
+from django.core.management import call_command
+from django.core.management.base import CommandError
+from django.test import TestCase
+
+from dcim.choices import InterfaceTypeChoices
+from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
+from extras.management.commands import renaturalize, webhook_receiver
+from extras.management.commands.webhook_receiver import WebhookHandler
+from users.models import User
+from utilities.fields import NaturalOrderingField
+
+
+class ReindexTestCase(TestCase):
+    def test_reindex_all_registered_indexers(self):
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iter(())
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        indexer = SimpleNamespace(model=DummyModel)
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {'extras.dummy': indexer}}),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+        ):
+            search_backend.clear.return_value = 0
+            search_backend.cache.return_value = 0
+            search_backend.size = 0
+
+            call_command('reindex', stdout=out)
+
+        search_backend.clear.assert_called_once_with(object_types=None)
+        search_backend.cache.assert_called_once()
+        self.assertIn('Completed.', out.getvalue())
+
+    def test_reindex_lazy_skips_models_with_existing_cache(self):
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iter(())
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        content_type = object()
+        indexer = SimpleNamespace(model=DummyModel)
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {'extras.dummy': indexer}}),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+            patch.object(ContentType.objects, 'get_for_model', return_value=content_type),
+        ):
+            search_backend.count.return_value = 1
+            search_backend.size = 1
+
+            call_command('reindex', lazy=True, stdout=out)
+
+        search_backend.clear.assert_not_called()
+        search_backend.count.assert_called_once_with(object_types=[content_type])
+        search_backend.cache.assert_not_called()
+        self.assertIn('Skipping', out.getvalue())
+
+    def test_reindex_specific_model_caches_objects_and_reports_total_count(self):
+        iterator = iter([object()])
+
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iterator
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        content_type = object()
+        indexer = SimpleNamespace(model=DummyModel)
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {'extras.dummy': indexer}}),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+            patch.object(ContentType.objects, 'get_for_model', return_value=content_type),
+        ):
+            search_backend.clear.return_value = 2
+            search_backend.cache.return_value = 1
+            search_backend.size = 1
+
+            call_command('reindex', 'extras.dummy', stdout=out)
+
+        search_backend.clear.assert_called_once_with(object_types=[content_type])
+        search_backend.cache.assert_called_once_with(iterator, remove_existing=False)
+        self.assertIn('1 entries cached.', out.getvalue())
+        self.assertIn('Total entries: 1', out.getvalue())
+
+    def test_reindex_app_label_uses_matching_indexers(self):
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iter(())
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        class OtherModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='dcim', model_name='device')
+
+        content_type = object()
+        indexer = SimpleNamespace(model=DummyModel)
+        other_indexer = SimpleNamespace(model=OtherModel)
+
+        with (
+            patch(
+                'extras.management.commands.reindex.registry',
+                {'search': {'extras.dummy': indexer, 'dcim.device': other_indexer}},
+            ),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+            patch.object(ContentType.objects, 'get_for_model', return_value=content_type),
+        ):
+            search_backend.clear.return_value = 0
+            search_backend.cache.return_value = 0
+            search_backend.size = 0
+
+            call_command('reindex', 'extras', stdout=StringIO())
+
+        search_backend.clear.assert_called_once_with(object_types=[content_type])
+        search_backend.cache.assert_called_once()
+
+    def test_reindex_unknown_registered_model(self):
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {}}),
+            self.assertRaisesMessage(CommandError, 'No indexer registered for extras.dummy'),
+        ):
+            call_command('reindex', 'extras.dummy', stdout=StringIO())
+
+    def test_reindex_app_with_no_registered_indexers(self):
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {}}),
+            self.assertRaisesMessage(CommandError, 'No indexers found'),
+        ):
+            call_command('reindex', 'extras', stdout=StringIO())
+
+    def test_invalid_model_label(self):
+        with self.assertRaisesMessage(CommandError, 'Invalid model'):
+            call_command('reindex', 'dcim.rack.extra', stdout=StringIO())
+
+
+class RenaturalizeTestCase(TestCase):
+    @classmethod
+    def setUpTestData(cls):
+        site = Site.objects.create(name='Test Site', slug='test-site')
+        manufacturer = Manufacturer.objects.create(name='Test Manufacturer', slug='test-manufacturer')
+        device_type = DeviceType.objects.create(
+            manufacturer=manufacturer,
+            model='Test Device Type',
+            slug='test-device-type',
+        )
+        device_role = DeviceRole.objects.create(
+            name='Test Device Role',
+            slug='test-device-role',
+            color='ff0000',
+        )
+        cls.device = Device.objects.create(
+            device_type=device_type,
+            role=device_role,
+            name='Test Device',
+            site=site,
+        )
+
+    def test_recalculates_natural_ordering_fields(self):
+        interface = Interface.objects.create(
+            device=self.device,
+            name='Ethernet10',
+            type=InterfaceTypeChoices.TYPE_1GE_FIXED,
+        )
+        field = next(field for field in Interface._meta.concrete_fields if type(field) is NaturalOrderingField)
+        Interface.objects.filter(pk=interface.pk).update(**{field.name: 'incorrect'})
+
+        out = StringIO()
+        call_command('renaturalize', 'dcim.Interface', verbosity=2, stdout=out)
+
+        interface.refresh_from_db()
+        expected = field.naturalize_function(interface.name, max_length=field.max_length)
+        self.assertEqual(getattr(interface, field.name), expected)
+        self.assertIn('Ethernet10 ->', out.getvalue())
+        self.assertIn('updated', out.getvalue())
+
+    def test_recalculates_with_default_verbosity(self):
+        interface = Interface.objects.create(
+            device=self.device,
+            name='Ethernet11',
+            type=InterfaceTypeChoices.TYPE_1GE_FIXED,
+        )
+        field = next(field for field in Interface._meta.concrete_fields if type(field) is NaturalOrderingField)
+        Interface.objects.filter(pk=interface.pk).update(**{field.name: 'incorrect'})
+
+        out = StringIO()
+        call_command('renaturalize', 'dcim.Interface', verbosity=1, stdout=out)
+
+        interface.refresh_from_db()
+        expected = field.naturalize_function(interface.name, max_length=field.max_length)
+        self.assertEqual(getattr(interface, field.name), expected)
+        self.assertIn('Renaturalizing 1 models.', out.getvalue())
+        self.assertIn('Done.', out.getvalue())
+
+    def test_invalid_format(self):
+        with self.assertRaisesMessage(CommandError, 'Invalid format'):
+            call_command('renaturalize', 'dcim', stdout=StringIO())
+
+    def test_model_without_natural_ordering(self):
+        with self.assertRaisesMessage(CommandError, 'does not employ natural ordering'):
+            call_command('renaturalize', 'extras.Tag', stdout=StringIO())
+
+    def test_unknown_app_label(self):
+        with self.assertRaises(CommandError):
+            call_command('renaturalize', 'invalid.Interface', stdout=StringIO())
+
+    def test_unknown_model_name(self):
+        with self.assertRaisesMessage(CommandError, 'Unknown model: dcim.UnknownModel'):
+            call_command('renaturalize', 'dcim.UnknownModel', stdout=StringIO())
+
+    def test_get_models_discovers_all_models_with_natural_ordering_fields(self):
+        field = next(field for field in Interface._meta.concrete_fields if type(field) is NaturalOrderingField)
+        model = SimpleNamespace(_meta=SimpleNamespace(concrete_fields=[field]))
+        app_config = SimpleNamespace(models={'interface': model})
+
+        with patch('extras.management.commands.renaturalize.apps.get_app_configs', return_value=[app_config]):
+            models = renaturalize.Command()._get_models(())
+
+        self.assertEqual(models, [(model, [field])])
+
+
+class RunScriptTestCase(TestCase):
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_superuser(
+            username='admin',
+            email='admin@example.com',
+            password='password',
+        )
+
+    def test_enqueues_script_job(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = True
+                form.cleaned_data = {
+                    '_schedule_at': None,
+                    '_interval': None,
+                    '_commit': None,
+                    'name': data['name'],
+                }
+                form.errors.get_json_data.return_value = {}
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        job = SimpleNamespace(duration='0 seconds')
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ) as get_module_and_script,
+            patch(
+                'extras.management.commands.runscript.ScriptJob.enqueue',
+                return_value=job,
+            ) as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger'),
+        ):
+            call_command(
+                'runscript',
+                'test.Script',
+                user='admin',
+                data='{"name": "test"}',
+                stdout=StringIO(),
+            )
+
+        get_module_and_script.assert_called_once_with('test', 'Script')
+        enqueue.assert_called_once()
+        kwargs = enqueue.call_args.kwargs
+        self.assertEqual(kwargs['instance'], script_obj)
+        self.assertEqual(kwargs['user'], self.user)
+        self.assertTrue(kwargs['immediate'])
+        self.assertEqual(kwargs['data'], {'name': 'test'})
+        self.assertFalse(kwargs['commit'])
+
+    def test_invalid_script_data_raises_error_without_enqueueing_job(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = False
+                form.errors.get_json_data.return_value = {
+                    'name': [
+                        {'message': 'This field is required.'},
+                    ],
+                }
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        logger = MagicMock()
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ) as get_module_and_script,
+            patch('extras.management.commands.runscript.ScriptJob.enqueue') as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger', return_value=logger),
+        ):
+            with self.assertRaises(CommandError):
+                call_command(
+                    'runscript',
+                    'test.Script',
+                    user='admin',
+                    data='{}',
+                    stdout=StringIO(),
+                )
+
+        get_module_and_script.assert_called_once_with('test', 'Script')
+        enqueue.assert_not_called()
+        logger.error.assert_any_call('Data is not valid:')
+        logger.error.assert_any_call('\tname: This field is required.')
+
+    def test_missing_user_falls_back_to_superuser_and_empty_data(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = True
+                form.cleaned_data = {
+                    '_schedule_at': None,
+                    '_interval': None,
+                    '_commit': None,
+                }
+                form.errors.get_json_data.return_value = {}
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        job = SimpleNamespace(duration='0 seconds')
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ),
+            patch(
+                'extras.management.commands.runscript.ScriptJob.enqueue',
+                return_value=job,
+            ) as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger'),
+        ):
+            call_command(
+                'runscript',
+                'test.Script',
+                user='missing-user',
+                stdout=StringIO(),
+            )
+
+        kwargs = enqueue.call_args.kwargs
+        self.assertEqual(kwargs['user'], self.user)
+        self.assertEqual(kwargs['data'], {})
+
+    def test_no_user_argument_falls_back_to_first_superuser(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = True
+                form.cleaned_data = {
+                    '_schedule_at': None,
+                    '_interval': None,
+                    '_commit': None,
+                }
+                form.errors.get_json_data.return_value = {}
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        job = SimpleNamespace(duration='0 seconds')
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ),
+            patch(
+                'extras.management.commands.runscript.ScriptJob.enqueue',
+                return_value=job,
+            ) as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger'),
+        ):
+            call_command('runscript', 'test.Script', stdout=StringIO())
+
+        self.assertEqual(enqueue.call_args.kwargs['user'], self.user)
+
+
+class WebhookReceiverTestCase(TestCase):
+    def test_starts_http_server(self):
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.webhook_receiver.HTTPServer') as http_server,
+            patch.object(WebhookHandler, 'show_headers', True),
+        ):
+            server = http_server.return_value
+            server.serve_forever.side_effect = KeyboardInterrupt
+
+            call_command(
+                'webhook_receiver',
+                port=9999,
+                no_headers=True,
+                stdout=out,
+            )
+
+            self.assertFalse(WebhookHandler.show_headers)
+
+        http_server.assert_called_once_with(('localhost', 9999), WebhookHandler)
+        server.serve_forever.assert_called_once_with()
+        self.assertIn('Listening on port http://localhost:9999', out.getvalue())
+        self.assertIn('Exiting', out.getvalue())
+
+    def test_handler_routes_arbitrary_http_methods(self):
+        handler = object.__new__(WebhookHandler)
+
+        self.assertEqual(handler.__getattr__('do_PATCH').__func__, WebhookHandler.do_ANY)
+        with self.assertRaises(AttributeError):
+            handler.__getattr__('missing')
+
+    def test_handler_logs_request_message(self):
+        handler = object.__new__(WebhookHandler)
+        handler.date_time_string = MagicMock(return_value='now')
+        handler.address_string = MagicMock(return_value='127.0.0.1')
+
+        with (
+            patch('extras.management.commands.webhook_receiver.request_counter', 7),
+            patch('builtins.print') as print_,
+        ):
+            handler.log_message('%s', 'message')
+
+        print_.assert_called_once_with('[7] now 127.0.0.1 message')
+
+    def test_handler_accepts_json_request_body(self):
+        handler = object.__new__(WebhookHandler)
+        body = b'{"ok": true}'
+        handler.headers = {
+            'Content-Length': str(len(body)),
+            'Content-Type': 'application/json',
+            'X-Test': 'value',
+        }
+        handler.rfile = BytesIO(body)
+        handler.wfile = BytesIO()
+        handler.send_response = MagicMock()
+        handler.end_headers = MagicMock()
+        handler.show_headers = True
+
+        with (
+            patch('extras.management.commands.webhook_receiver.request_counter', 1),
+            patch('builtins.print') as print_,
+        ):
+            handler.do_ANY()
+            self.assertEqual(webhook_receiver.request_counter, 2)
+
+        handler.send_response.assert_called_once_with(200)
+        handler.end_headers.assert_called_once_with()
+        self.assertEqual(handler.wfile.getvalue(), b'Webhook received!\n')
+        print_.assert_any_call('X-Test: value')
+        print_.assert_any_call('{\n    "ok": true\n}')
+        print_.assert_any_call('Completed request #1')
+
+    def test_handler_prints_no_body_when_content_length_is_missing(self):
+        handler = object.__new__(WebhookHandler)
+        handler.headers = {}
+        handler.rfile = BytesIO()
+        handler.wfile = BytesIO()
+        handler.send_response = MagicMock()
+        handler.end_headers = MagicMock()
+        handler.show_headers = False
+
+        with (
+            patch('extras.management.commands.webhook_receiver.request_counter', 1),
+            patch('builtins.print') as print_,
+        ):
+            handler.do_ANY()
+
+        print_.assert_any_call('(No body)')
+        print_.assert_any_call('Completed request #1')

+ 84 - 0
netbox/ipam/tests/test_management_commands.py

@@ -0,0 +1,84 @@
+from io import StringIO
+from unittest.mock import patch
+
+from django.core.management import call_command
+from django.test import TestCase
+from netaddr import IPNetwork
+
+from ipam.models import Prefix
+
+
+class RebuildPrefixesTestCase(TestCase):
+    def test_rebuilds_global_prefix_tree(self):
+        out = StringIO()
+
+        with (
+            patch('ipam.management.commands.rebuild_prefixes.Prefix') as prefix_model,
+            patch('ipam.management.commands.rebuild_prefixes.VRF') as vrf_model,
+            patch('ipam.management.commands.rebuild_prefixes.rebuild_prefixes') as rebuild_prefixes,
+        ):
+            prefix_model.objects.count.return_value = 0
+            prefix_model.objects.filter.return_value.count.return_value = 0
+            vrf_model.objects.all.return_value = []
+            call_command('rebuild_prefixes', stdout=out)
+
+        rebuild_prefixes.assert_called_once_with(None)
+        prefix_model.objects.update.assert_called_once_with(_depth=0, _children=0)
+        self.assertIn('Rebuilding 0 prefixes', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_hierarchy_is_correct_after_rebuild(self):
+        Prefix.objects.bulk_create(
+            [
+                Prefix(prefix=IPNetwork('10.0.0.0/8')),
+                Prefix(prefix=IPNetwork('10.0.0.0/16')),
+                Prefix(prefix=IPNetwork('10.0.0.0/24')),
+            ]
+        )
+
+        out = StringIO()
+        call_command('rebuild_prefixes', stdout=out)
+
+        self.assertIn('Finished.', out.getvalue())
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/8'))._depth, 0)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/8'))._children, 2)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/16'))._depth, 1)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/16'))._children, 1)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/24'))._depth, 2)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/24'))._children, 0)
+
+    def test_rebuilds_prefix_tree_for_each_vrf(self):
+        class FakeVRF:
+            pk = 123
+
+            def __str__(self):
+                return 'Tenant VRF'
+
+        class FakeQuerySet:
+            def __init__(self, count):
+                self._count = count
+
+            def count(self):
+                return self._count
+
+        out = StringIO()
+        vrf = FakeVRF()
+
+        with (
+            patch('ipam.management.commands.rebuild_prefixes.Prefix') as prefix_model,
+            patch('ipam.management.commands.rebuild_prefixes.VRF') as vrf_model,
+            patch('ipam.management.commands.rebuild_prefixes.rebuild_prefixes') as rebuild_prefixes,
+        ):
+            prefix_model.objects.count.return_value = 3
+            prefix_model.objects.filter.side_effect = [
+                FakeQuerySet(1),
+                FakeQuerySet(2),
+            ]
+            vrf_model.objects.all.return_value = [vrf]
+
+            call_command('rebuild_prefixes', stdout=out)
+
+        rebuild_prefixes.assert_any_call(None)
+        rebuild_prefixes.assert_any_call(vrf.pk)
+        self.assertIn('Global: 1 prefixes', out.getvalue())
+        self.assertIn('VRF Tenant VRF: 2 prefixes', out.getvalue())

+ 51 - 0
netbox/utilities/tests/test_management_commands.py

@@ -0,0 +1,51 @@
+from io import StringIO
+from unittest.mock import MagicMock, patch
+
+from django.core.management import call_command
+from django.test import TestCase
+
+from utilities.management.commands.calculate_cached_counts import Command
+
+
+class CalculateCachedCountsTestCase(TestCase):
+    def test_updates_registered_counter_fields(self):
+        class ParentModel:
+            pass
+
+        out = StringIO()
+
+        with (
+            patch.object(
+                Command,
+                'collect_models',
+                return_value={ParentModel: {'interface_count': 'interfaces'}},
+            ),
+            patch('utilities.management.commands.calculate_cached_counts.update_counts') as update_counts,
+        ):
+            call_command('calculate_cached_counts', stdout=out)
+
+        update_counts.assert_called_once_with(ParentModel, 'interface_count', 'interfaces')
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_collect_models_returns_counter_field_mappings_by_parent_model(self):
+        class ParentModel:
+            pass
+
+        class ChildModel:
+            pass
+
+        fk_field = MagicMock()
+        fk_field.related_model = ParentModel
+        fk_field.related_query_name.return_value = 'children'
+        ChildModel._meta = MagicMock()
+        ChildModel._meta.get_field.return_value = fk_field
+
+        with patch(
+            'utilities.management.commands.calculate_cached_counts.registry',
+            {'counter_fields': {ChildModel: {'parent': 'child_count'}}},
+        ):
+            models = Command.collect_models()
+
+        ChildModel._meta.get_field.assert_called_once_with('parent')
+        fk_field.related_query_name.assert_called_once_with()
+        self.assertEqual(dict(models), {ParentModel: {'child_count': 'children'}})