Просмотр исходного кода

Merge pull request #22193 from netbox-community/22124-add-tests-for-management-commands

Closes #22124: Add test coverage for custom management commands
bctiemann 1 неделя назад
Родитель
Сommit
55b2c6e0a8

+ 1 - 1
netbox/core/management/commands/nbshell.py

@@ -150,7 +150,7 @@ class Command(BaseCommand):
         try:
             import readline
             import rlcompleter
-        except ModuleNotFoundError:
+        except ModuleNotFoundError:  # pragma: no cover
             pass
         else:
             readline.set_completer(rlcompleter.Completer(namespace).complete)

+ 79 - 0
netbox/core/tests/test_management_command_coverage.py

@@ -0,0 +1,79 @@
+import ast
+from pathlib import Path
+
+from django.apps import apps
+from django.conf import settings
+from django.test import SimpleTestCase
+
+EXCLUDED_CUSTOM_COMMANDS = {
+    # Deprecated; excluded from management command test coverage by #22124.
+    'housekeeping',
+}
+
+
+class ManagementCommandCoverageTestCase(SimpleTestCase):
+    def test_all_custom_management_commands_have_tests(self):
+        custom_commands = self._get_custom_management_commands()
+        tested_commands = self._get_tested_management_commands()
+
+        self.assertTrue(
+            custom_commands,
+            'No custom management commands were discovered; check command discovery logic.',
+        )
+
+        missing_commands = sorted(custom_commands - tested_commands - EXCLUDED_CUSTOM_COMMANDS)
+
+        self.assertEqual(
+            missing_commands,
+            [],
+            msg=(f'Tests are missing for custom management commands: {", ".join(missing_commands)}'),
+        )
+
+    @staticmethod
+    def _get_custom_management_commands():
+        base_dir = Path(settings.BASE_DIR).resolve()
+        commands = set()
+
+        for app_config in apps.get_app_configs():
+            app_path = Path(app_config.path).resolve()
+            if not app_path.is_relative_to(base_dir):
+                continue
+
+            commands_path = app_path / 'management' / 'commands'
+            if not commands_path.exists():
+                continue
+
+            commands.update(path.stem for path in commands_path.glob('*.py') if not path.name.startswith('_'))
+
+        return commands
+
+    @staticmethod
+    def _get_tested_management_commands():
+        base_dir = Path(settings.BASE_DIR).resolve()
+        commands = set()
+
+        for test_file in base_dir.glob('*/tests/test_management_commands.py'):
+            tree = ast.parse(test_file.read_text(encoding='utf-8'))
+            for node in ast.walk(tree):
+                if not isinstance(node, ast.Call):
+                    continue
+                if not _is_call_command(node.func):
+                    continue
+                if not node.args:
+                    continue
+
+                command_name = node.args[0]
+                if isinstance(command_name, ast.Constant) and isinstance(command_name.value, str):
+                    commands.add(command_name.value)
+
+        return commands
+
+
+def _is_call_command(func):
+    if isinstance(func, ast.Name):
+        return func.id == 'call_command'
+
+    if isinstance(func, ast.Attribute):
+        return func.attr == 'call_command'
+
+    return False

+ 317 - 0
netbox/core/tests/test_management_commands.py

@@ -0,0 +1,317 @@
+from io import StringIO
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.core.management import call_command
+from django.core.management.base import CommandError
+from django.test import TestCase, override_settings
+
+from core.choices import DataSourceStatusChoices
+from core.management.commands import nbshell
+from core.management.commands.rqworker import DEFAULT_QUEUES
+
+
+class MakeMigrationsTestCase(TestCase):
+    @override_settings(DEVELOPER=False)
+    def test_blocked_in_non_developer_mode(self):
+        with self.assertRaisesMessage(CommandError, 'development purposes only'):
+            call_command('makemigrations', stdout=StringIO(), stderr=StringIO())
+
+    @override_settings(DEVELOPER=False)
+    def test_check_flag_allowed_in_non_developer_mode(self):
+        with patch('core.management.commands.makemigrations._Command.handle') as super_handle:
+            call_command(
+                'makemigrations',
+                check_changes=True,
+                stdout=StringIO(),
+                stderr=StringIO(),
+            )
+
+        super_handle.assert_called_once()
+        self.assertTrue(super_handle.call_args.kwargs['check_changes'])
+
+
+class NbShellTestCase(TestCase):
+    def test_color_helpers_wrap_text(self):
+        self.assertIn('message', nbshell.color('green', 'message'))
+        self.assertIn('message', nbshell.bright('message'))
+
+    def test_get_models_excludes_private_models(self):
+        public_model = type('PublicModel', (), {})
+        private_model = type('PrivateModel', (), {'_netbox_private': True})
+        app_config = SimpleNamespace(get_models=lambda: [public_model, private_model])
+
+        self.assertEqual(nbshell.get_models(app_config), [public_model])
+
+    def test_get_constants_returns_module_attributes(self):
+        constants = SimpleNamespace(FOO='bar', ANSWER=42)
+
+        with patch('core.management.commands.nbshell.import_string', return_value=constants):
+            self.assertEqual(
+                nbshell.get_constants(SimpleNamespace(name='testapp')),
+                {'FOO': 'bar', 'ANSWER': 42},
+            )
+
+    def test_get_constants_handles_missing_constants_module(self):
+        with patch('core.management.commands.nbshell.import_string', side_effect=ImportError):
+            self.assertEqual(nbshell.get_constants(SimpleNamespace(name='testapp')), {})
+
+    def test_executes_inline_command(self):
+        namespace = {}
+
+        with patch(
+            'core.management.commands.nbshell.Command.get_namespace',
+            return_value=namespace,
+        ):
+            call_command('nbshell', command='answer = 42')
+
+        self.assertEqual(namespace['answer'], 42)
+
+    def test_starts_interactive_shell_without_inline_command(self):
+        namespace = {'answer': 42}
+
+        with (
+            patch('core.management.commands.nbshell.Command.get_namespace', return_value=namespace),
+            patch('core.management.commands.nbshell.Command.get_banner_text', return_value='banner'),
+            patch('core.management.commands.nbshell.code.interact', return_value=None) as interact,
+        ):
+            call_command('nbshell', stdout=StringIO())
+
+        interact.assert_called_once_with(banner='banner', local=namespace)
+
+    def test_get_namespace_includes_models_constants_and_helpers(self):
+        class DummyModel:
+            pass
+
+        app_config = SimpleNamespace(
+            name='dummyapp',
+            get_models=lambda: [DummyModel],
+        )
+        command = nbshell.Command()
+        command.django_models = {}
+
+        with (
+            patch('core.management.commands.nbshell.CORE_APPS', ('dummyapp',)),
+            patch('core.management.commands.nbshell.get_installed_plugins', return_value={}),
+            patch('core.management.commands.nbshell.apps.get_app_config', return_value=app_config),
+            patch('core.management.commands.nbshell.get_constants', return_value={'CONSTANT': 'value'}),
+        ):
+            namespace = command.get_namespace()
+
+        self.assertIs(namespace['dummyapp'].DummyModel, DummyModel)
+        self.assertEqual(namespace['dummyapp'].CONSTANT, 'value')
+        self.assertEqual(command.django_models['dummyapp'], ['DummyModel'])
+        self.assertEqual(namespace['lsapps'], command._lsapps)
+        self.assertEqual(namespace['lsmodels'], command._lsmodels)
+
+    def test_list_apps_and_models_helpers(self):
+        command = nbshell.Command()
+        command.django_models = {'dcim': ['Device', 'Site']}
+        app_config = SimpleNamespace(verbose_name='DCIM')
+
+        with (
+            patch('core.management.commands.nbshell.apps.get_app_config', return_value=app_config),
+            patch('builtins.print') as print_,
+        ):
+            command._lsapps()
+            command._lsmodels('dcim')
+
+        self.assertIn(('dcim - DCIM',), [call.args for call in print_.call_args_list])
+        self.assertIn(('DCIM:',), [call.args for call in print_.call_args_list])
+        self.assertIn(('  dcim.Device',), [call.args for call in print_.call_args_list])
+        self.assertIn(('  dcim.Site',), [call.args for call in print_.call_args_list])
+
+    def test_list_models_reports_unknown_app(self):
+        command = nbshell.Command()
+        command.django_models = {}
+
+        with patch('builtins.print') as print_:
+            command._lsmodels('unknown')
+
+        print_.assert_called_once_with('No models listed for unknown')
+
+    def test_list_models_lists_all_apps_when_no_app_label_given(self):
+        command = nbshell.Command()
+        command.django_models = {'dcim': ['Device'], 'ipam': ['IPAddress']}
+        app_configs = {
+            'dcim': SimpleNamespace(verbose_name='DCIM'),
+            'ipam': SimpleNamespace(verbose_name='IPAM'),
+        }
+
+        with (
+            patch(
+                'core.management.commands.nbshell.apps.get_app_config',
+                side_effect=lambda label: app_configs[label],
+            ),
+            patch('builtins.print') as print_,
+        ):
+            command._lsmodels()
+
+        printed = [call.args for call in print_.call_args_list]
+        self.assertIn(('DCIM:',), printed)
+        self.assertIn(('IPAM:',), printed)
+        self.assertIn(('  dcim.Device',), printed)
+        self.assertIn(('  ipam.IPAddress',), printed)
+
+    def test_banner_includes_installed_plugins(self):
+        with (
+            patch('core.management.commands.nbshell.platform.node', return_value='netbox'),
+            patch('core.management.commands.nbshell.platform.python_version', return_value='3.12.0'),
+            patch('core.management.commands.nbshell.get_version', return_value='5.2.0'),
+            patch('core.management.commands.nbshell.get_installed_plugins', return_value={'plugin': '1.2.3'}),
+        ):
+            banner = nbshell.Command.get_banner_text()
+
+        self.assertIn('NetBox interactive shell', banner)
+        self.assertIn('Plugins:', banner)
+        self.assertIn('plugin', banner)
+
+
+class RQWorkerTestCase(TestCase):
+    def test_defaults_to_all_queues_and_enables_scheduler(self):
+        with (
+            patch('core.management.commands.rqworker.registry', {'system_jobs': {}}),
+            patch('core.management.commands.rqworker._Command.handle') as super_handle,
+            self.assertLogs('netbox.rqworker', level='WARNING') as logs,
+        ):
+            call_command('rqworker', stdout=StringIO(), stderr=StringIO())
+
+        super_handle.assert_called_once()
+        args, kwargs = super_handle.call_args
+        self.assertEqual(args, DEFAULT_QUEUES)
+        self.assertTrue(kwargs['with_scheduler'])
+        self.assertEqual(len(logs.output), 1)
+        self.assertIn('No queues have been specified', logs.output[0])
+
+    def test_schedules_registered_system_jobs(self):
+        job = MagicMock()
+        job.name = 'TestJob'
+
+        with (
+            patch('core.management.commands.rqworker.registry', {'system_jobs': {job: {'interval': 5}}}),
+            patch('core.management.commands.rqworker._Command.handle') as super_handle,
+        ):
+            call_command('rqworker', 'high', stdout=StringIO(), stderr=StringIO())
+
+        job.enqueue_once.assert_called_once_with(interval=5)
+        super_handle.assert_called_once()
+        args, kwargs = super_handle.call_args
+        self.assertEqual(args, ('high',))
+        self.assertTrue(kwargs['with_scheduler'])
+
+    def test_system_jobs_must_specify_interval(self):
+        job = MagicMock()
+        job.name = 'TestJob'
+
+        with patch('core.management.commands.rqworker.registry', {'system_jobs': {job: {}}}):
+            with self.assertRaisesMessage(TypeError, 'System job must specify an interval'):
+                call_command('rqworker', stdout=StringIO(), stderr=StringIO())
+
+
+class SyncDataSourceTestCase(TestCase):
+    class FakeDataSource:
+        def __init__(self, name):
+            self.name = name
+            self.pk = name
+            self.sync = MagicMock()
+
+        def __str__(self):
+            return self.name
+
+        def get_status_display(self):
+            return 'completed'
+
+    class FakeQuerySet(list):
+        def values(self, *fields):
+            return [{field: getattr(item, field) for field in fields} for item in self]
+
+    def test_requires_name_or_all(self):
+        with self.assertRaisesMessage(CommandError, 'Must specify at least one data source'):
+            call_command('syncdatasource', stdout=StringIO())
+
+    def test_invalid_name(self):
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.filter.return_value = self.FakeQuerySet()
+            with self.assertRaisesMessage(CommandError, 'Invalid data source names: nonexistent-source'):
+                call_command('syncdatasource', 'nonexistent-source', stdout=StringIO())
+
+        data_source_model.objects.filter.assert_called_once()
+        self.assertEqual(
+            set(data_source_model.objects.filter.call_args.kwargs['name__in']),
+            {'nonexistent-source'},
+        )
+
+    def test_all_syncs_datasource(self):
+        datasource = MagicMock()
+        datasource.__str__.return_value = 'Test Data Source'
+        datasource.get_status_display.return_value = 'completed'
+
+        out = StringIO()
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.all.return_value = [datasource]
+            call_command('syncdatasource', sync_all=True, stdout=out)
+
+        data_source_model.objects.all.assert_called_once_with()
+        datasource.sync.assert_called_once_with()
+        self.assertIn('Syncing Test Data Source', out.getvalue())
+        self.assertIn('completed', out.getvalue())
+
+    def test_named_datasource_syncs_matching_datasource(self):
+        datasource = self.FakeDataSource('source-a')
+        datasources = self.FakeQuerySet([datasource])
+        out = StringIO()
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.filter.return_value = datasources
+            call_command('syncdatasource', 'source-a', stdout=out)
+
+        data_source_model.objects.filter.assert_called_once()
+        self.assertEqual(
+            set(data_source_model.objects.filter.call_args.kwargs['name__in']),
+            {'source-a'},
+        )
+        datasource.sync.assert_called_once_with()
+        self.assertIn('[1] Syncing source-a', out.getvalue())
+        self.assertIn('completed', out.getvalue())
+        self.assertNotIn('Syncing 1 data sources.', out.getvalue())
+        self.assertNotIn('Finished.', out.getvalue())
+
+    def test_sync_failure_marks_datasource_failed_and_reraises(self):
+        datasource = MagicMock()
+        datasource.__str__.return_value = 'source-a'
+        datasource.pk = 1
+        datasource.sync.side_effect = RuntimeError('boom')
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.all.return_value = [datasource]
+
+            with self.assertRaisesMessage(RuntimeError, 'boom'):
+                call_command('syncdatasource', sync_all=True, stdout=StringIO())
+
+        data_source_model.objects.filter.assert_called_once_with(pk=1)
+        data_source_model.objects.filter.return_value.update.assert_called_once_with(
+            status=DataSourceStatusChoices.FAILED,
+        )
+
+    def test_multiple_names_prints_summary_and_syncs_datasources(self):
+        datasource_a = self.FakeDataSource('source-a')
+        datasource_b = self.FakeDataSource('source-b')
+        datasources = self.FakeQuerySet([datasource_a, datasource_b])
+        out = StringIO()
+
+        with patch('core.management.commands.syncdatasource.DataSource') as data_source_model:
+            data_source_model.objects.filter.return_value = datasources
+            call_command('syncdatasource', 'source-a', 'source-b', stdout=out)
+
+        data_source_model.objects.filter.assert_called_once()
+        self.assertEqual(
+            set(data_source_model.objects.filter.call_args.kwargs['name__in']),
+            {'source-a', 'source-b'},
+        )
+        datasource_a.sync.assert_called_once_with()
+        datasource_b.sync.assert_called_once_with()
+        self.assertIn('Syncing 2 data sources.', out.getvalue())
+        self.assertIn('[1] Syncing source-a', out.getvalue())
+        self.assertIn('[2] Syncing source-b', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())

+ 150 - 0
netbox/dcim/tests/test_management_commands.py

@@ -0,0 +1,150 @@
+import json
+import tempfile
+from io import StringIO
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.core.management import call_command
+from django.test import TestCase, override_settings
+
+
+class BuildSchemaTestCase(TestCase):
+    def test_output_is_valid_json(self):
+        out = StringIO()
+
+        call_command('buildschema', stdout=out)
+
+        self.assertIsInstance(json.loads(out.getvalue()), dict)
+
+    def test_write_flag_writes_schema_to_configured_base_dir(self):
+        with tempfile.TemporaryDirectory() as tmpdir:
+            base_dir = Path(tmpdir) / 'netbox'
+            output_dir = Path(tmpdir) / 'contrib'
+            output_dir.mkdir()
+
+            out = StringIO()
+            with override_settings(BASE_DIR=base_dir):
+                call_command('buildschema', write=True, stdout=out)
+
+            output_file = output_dir / 'generated_schema.json'
+            self.assertTrue(output_file.exists())
+            self.assertIsInstance(json.loads(output_file.read_text(encoding='utf-8')), dict)
+            self.assertIn(str(output_file), out.getvalue())
+
+
+class TracePathsTestCase(TestCase):
+    def test_no_cables(self):
+        out = StringIO()
+
+        call_command('trace_paths', no_input=True, stdout=out)
+
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_force_no_cable_paths(self):
+        out = StringIO()
+
+        call_command('trace_paths', force=True, no_input=True, stdout=out)
+
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_retraces_missing_cabled_endpoint_path(self):
+        endpoint = object()
+
+        class FakeQuerySet(list):
+            def filter(self, *args, **kwargs):
+                return self
+
+            def count(self):
+                return len(self)
+
+        class FakeObjects:
+            def filter(self, *args, **kwargs):
+                return FakeQuerySet([endpoint])
+
+        model = SimpleNamespace(
+            objects=FakeObjects(),
+            wireless_link=object(),
+            _meta=SimpleNamespace(verbose_name='interface', verbose_name_plural='interfaces'),
+        )
+        out = StringIO()
+
+        with (
+            patch('dcim.management.commands.trace_paths.ENDPOINT_MODELS', (model,)),
+            patch('dcim.management.commands.trace_paths.create_cablepaths') as create_cablepaths,
+        ):
+            call_command('trace_paths', no_input=True, stdout=out)
+
+        create_cablepaths.assert_called_once_with([endpoint])
+        self.assertIn('Retracing 1 cabled interfaces', out.getvalue())
+        self.assertIn('Retraced 1 interfaces', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_progress_bar_drawn_every_100_endpoints(self):
+        endpoints = [object() for _ in range(100)]
+
+        class FakeQuerySet(list):
+            def filter(self, *args, **kwargs):
+                return self
+
+            def count(self):
+                return len(self)
+
+        class FakeObjects:
+            def filter(self, *args, **kwargs):
+                return FakeQuerySet(endpoints)
+
+        model = SimpleNamespace(
+            objects=FakeObjects(),
+            wireless_link=object(),
+            _meta=SimpleNamespace(verbose_name='interface', verbose_name_plural='interfaces'),
+        )
+        out = StringIO()
+
+        with (
+            patch('dcim.management.commands.trace_paths.ENDPOINT_MODELS', (model,)),
+            patch('dcim.management.commands.trace_paths.create_cablepaths'),
+        ):
+            call_command('trace_paths', no_input=True, stdout=out)
+
+        self.assertIn('[####################] 100%', out.getvalue())
+        self.assertIn('Retraced 100 interfaces', out.getvalue())
+
+    def test_force_aborts_when_confirmation_is_not_yes(self):
+        out = StringIO()
+        cable_paths = MagicMock()
+        cable_paths.count.return_value = 1
+
+        with (
+            patch('dcim.management.commands.trace_paths.CablePath') as cable_path_model,
+            patch('builtins.input', return_value='no'),
+        ):
+            cable_path_model.objects.all.return_value = cable_paths
+            call_command('trace_paths', force=True, stdout=out)
+
+        cable_paths.delete.assert_not_called()
+        self.assertIn('WARNING: Forcing recalculation', out.getvalue())
+        self.assertIn('Aborting', out.getvalue())
+
+    def test_force_deletes_existing_paths_and_resets_sequence(self):
+        out = StringIO()
+        cable_paths = MagicMock()
+        cable_paths.count.return_value = 2
+        cable_paths.delete.return_value = (2, {})
+
+        with (
+            patch('dcim.management.commands.trace_paths.CablePath') as cable_path_model,
+            patch('dcim.management.commands.trace_paths.ENDPOINT_MODELS', ()),
+            patch('dcim.management.commands.trace_paths.connection') as connection,
+        ):
+            cable_path_model.objects.all.return_value = cable_paths
+            connection.ops.sequence_reset_sql.return_value = ['RESET SEQUENCE']
+            cursor = connection.cursor.return_value.__enter__.return_value
+
+            call_command('trace_paths', force=True, no_input=True, stdout=out)
+
+        cable_paths.delete.assert_called_once_with()
+        cursor.execute.assert_called_once_with('RESET SEQUENCE')
+        self.assertIn('Deleting 2 existing cable paths', out.getvalue())
+        self.assertIn('Deleted 2 paths', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())

+ 1 - 1
netbox/extras/management/commands/runscript.py

@@ -68,7 +68,7 @@ class Command(BaseCommand):
                 'info': logging.INFO,
                 'warning': logging.WARNING,
             }[loglevel])
-        except KeyError:
+        except KeyError:  # pragma: no cover
             raise CommandError(f"Invalid log level: {loglevel}")
 
         # Initialize the script form

+ 504 - 0
netbox/extras/tests/test_management_commands.py

@@ -0,0 +1,504 @@
+from io import BytesIO, StringIO
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.contrib.contenttypes.models import ContentType
+from django.core.management import call_command
+from django.core.management.base import CommandError
+from django.test import TestCase
+
+from dcim.choices import InterfaceTypeChoices
+from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
+from extras.management.commands import renaturalize, webhook_receiver
+from extras.management.commands.webhook_receiver import WebhookHandler
+from users.models import User
+from utilities.fields import NaturalOrderingField
+
+
+class ReindexTestCase(TestCase):
+    def test_reindex_all_registered_indexers(self):
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iter(())
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        indexer = SimpleNamespace(model=DummyModel)
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {'extras.dummy': indexer}}),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+        ):
+            search_backend.clear.return_value = 0
+            search_backend.cache.return_value = 0
+            search_backend.size = 0
+
+            call_command('reindex', stdout=out)
+
+        search_backend.clear.assert_called_once_with(object_types=None)
+        search_backend.cache.assert_called_once()
+        self.assertIn('Completed.', out.getvalue())
+
+    def test_reindex_lazy_skips_models_with_existing_cache(self):
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iter(())
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        content_type = object()
+        indexer = SimpleNamespace(model=DummyModel)
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {'extras.dummy': indexer}}),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+            patch.object(ContentType.objects, 'get_for_model', return_value=content_type),
+        ):
+            search_backend.count.return_value = 1
+            search_backend.size = 1
+
+            call_command('reindex', lazy=True, stdout=out)
+
+        search_backend.clear.assert_not_called()
+        search_backend.count.assert_called_once_with(object_types=[content_type])
+        search_backend.cache.assert_not_called()
+        self.assertIn('Skipping', out.getvalue())
+
+    def test_reindex_specific_model_caches_objects_and_reports_total_count(self):
+        iterator = iter([object()])
+
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iterator
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        content_type = object()
+        indexer = SimpleNamespace(model=DummyModel)
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {'extras.dummy': indexer}}),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+            patch.object(ContentType.objects, 'get_for_model', return_value=content_type),
+        ):
+            search_backend.clear.return_value = 2
+            search_backend.cache.return_value = 1
+            search_backend.size = 1
+
+            call_command('reindex', 'extras.dummy', stdout=out)
+
+        search_backend.clear.assert_called_once_with(object_types=[content_type])
+        search_backend.cache.assert_called_once_with(iterator, remove_existing=False)
+        self.assertIn('1 entries cached.', out.getvalue())
+        self.assertIn('Total entries: 1', out.getvalue())
+
+    def test_reindex_app_label_uses_matching_indexers(self):
+        class DummyObjects:
+            @staticmethod
+            def iterator():
+                return iter(())
+
+        class DummyModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='extras', model_name='dummy')
+
+        class OtherModel:
+            objects = DummyObjects()
+            _meta = SimpleNamespace(app_label='dcim', model_name='device')
+
+        content_type = object()
+        indexer = SimpleNamespace(model=DummyModel)
+        other_indexer = SimpleNamespace(model=OtherModel)
+
+        with (
+            patch(
+                'extras.management.commands.reindex.registry',
+                {'search': {'extras.dummy': indexer, 'dcim.device': other_indexer}},
+            ),
+            patch('extras.management.commands.reindex.search_backend') as search_backend,
+            patch.object(ContentType.objects, 'get_for_model', return_value=content_type),
+        ):
+            search_backend.clear.return_value = 0
+            search_backend.cache.return_value = 0
+            search_backend.size = 0
+
+            call_command('reindex', 'extras', stdout=StringIO())
+
+        search_backend.clear.assert_called_once_with(object_types=[content_type])
+        search_backend.cache.assert_called_once()
+
+    def test_reindex_unknown_registered_model(self):
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {}}),
+            self.assertRaisesMessage(CommandError, 'No indexer registered for extras.dummy'),
+        ):
+            call_command('reindex', 'extras.dummy', stdout=StringIO())
+
+    def test_reindex_app_with_no_registered_indexers(self):
+        with (
+            patch('extras.management.commands.reindex.registry', {'search': {}}),
+            self.assertRaisesMessage(CommandError, 'No indexers found'),
+        ):
+            call_command('reindex', 'extras', stdout=StringIO())
+
+    def test_invalid_model_label(self):
+        with self.assertRaisesMessage(CommandError, 'Invalid model'):
+            call_command('reindex', 'dcim.rack.extra', stdout=StringIO())
+
+
+class RenaturalizeTestCase(TestCase):
+    @classmethod
+    def setUpTestData(cls):
+        site = Site.objects.create(name='Test Site', slug='test-site')
+        manufacturer = Manufacturer.objects.create(name='Test Manufacturer', slug='test-manufacturer')
+        device_type = DeviceType.objects.create(
+            manufacturer=manufacturer,
+            model='Test Device Type',
+            slug='test-device-type',
+        )
+        device_role = DeviceRole.objects.create(
+            name='Test Device Role',
+            slug='test-device-role',
+            color='ff0000',
+        )
+        cls.device = Device.objects.create(
+            device_type=device_type,
+            role=device_role,
+            name='Test Device',
+            site=site,
+        )
+
+    def test_recalculates_natural_ordering_fields(self):
+        interface = Interface.objects.create(
+            device=self.device,
+            name='Ethernet10',
+            type=InterfaceTypeChoices.TYPE_1GE_FIXED,
+        )
+        field = next(field for field in Interface._meta.concrete_fields if type(field) is NaturalOrderingField)
+        Interface.objects.filter(pk=interface.pk).update(**{field.name: 'incorrect'})
+
+        out = StringIO()
+        call_command('renaturalize', 'dcim.Interface', verbosity=2, stdout=out)
+
+        interface.refresh_from_db()
+        expected = field.naturalize_function(interface.name, max_length=field.max_length)
+        self.assertEqual(getattr(interface, field.name), expected)
+        self.assertIn('Ethernet10 ->', out.getvalue())
+        self.assertIn('updated', out.getvalue())
+
+    def test_recalculates_with_default_verbosity(self):
+        interface = Interface.objects.create(
+            device=self.device,
+            name='Ethernet11',
+            type=InterfaceTypeChoices.TYPE_1GE_FIXED,
+        )
+        field = next(field for field in Interface._meta.concrete_fields if type(field) is NaturalOrderingField)
+        Interface.objects.filter(pk=interface.pk).update(**{field.name: 'incorrect'})
+
+        out = StringIO()
+        call_command('renaturalize', 'dcim.Interface', verbosity=1, stdout=out)
+
+        interface.refresh_from_db()
+        expected = field.naturalize_function(interface.name, max_length=field.max_length)
+        self.assertEqual(getattr(interface, field.name), expected)
+        self.assertIn('Renaturalizing 1 models.', out.getvalue())
+        self.assertIn('Done.', out.getvalue())
+
+    def test_invalid_format(self):
+        with self.assertRaisesMessage(CommandError, 'Invalid format'):
+            call_command('renaturalize', 'dcim', stdout=StringIO())
+
+    def test_model_without_natural_ordering(self):
+        with self.assertRaisesMessage(CommandError, 'does not employ natural ordering'):
+            call_command('renaturalize', 'extras.Tag', stdout=StringIO())
+
+    def test_unknown_app_label(self):
+        with self.assertRaises(CommandError):
+            call_command('renaturalize', 'invalid.Interface', stdout=StringIO())
+
+    def test_unknown_model_name(self):
+        with self.assertRaisesMessage(CommandError, 'Unknown model: dcim.UnknownModel'):
+            call_command('renaturalize', 'dcim.UnknownModel', stdout=StringIO())
+
+    def test_get_models_discovers_all_models_with_natural_ordering_fields(self):
+        field = next(field for field in Interface._meta.concrete_fields if type(field) is NaturalOrderingField)
+        model = SimpleNamespace(_meta=SimpleNamespace(concrete_fields=[field]))
+        app_config = SimpleNamespace(models={'interface': model})
+
+        with patch('extras.management.commands.renaturalize.apps.get_app_configs', return_value=[app_config]):
+            models = renaturalize.Command()._get_models(())
+
+        self.assertEqual(models, [(model, [field])])
+
+
+class RunScriptTestCase(TestCase):
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_superuser(
+            username='admin',
+            email='admin@example.com',
+            password='password',
+        )
+
+    def test_enqueues_script_job(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = True
+                form.cleaned_data = {
+                    '_schedule_at': None,
+                    '_interval': None,
+                    '_commit': None,
+                    'name': data['name'],
+                }
+                form.errors.get_json_data.return_value = {}
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        job = SimpleNamespace(duration='0 seconds')
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ) as get_module_and_script,
+            patch(
+                'extras.management.commands.runscript.ScriptJob.enqueue',
+                return_value=job,
+            ) as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger'),
+        ):
+            call_command(
+                'runscript',
+                'test.Script',
+                user='admin',
+                data='{"name": "test"}',
+                stdout=StringIO(),
+            )
+
+        get_module_and_script.assert_called_once_with('test', 'Script')
+        enqueue.assert_called_once()
+        kwargs = enqueue.call_args.kwargs
+        self.assertEqual(kwargs['instance'], script_obj)
+        self.assertEqual(kwargs['user'], self.user)
+        self.assertTrue(kwargs['immediate'])
+        self.assertEqual(kwargs['data'], {'name': 'test'})
+        self.assertFalse(kwargs['commit'])
+
+    def test_invalid_script_data_raises_error_without_enqueueing_job(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = False
+                form.errors.get_json_data.return_value = {
+                    'name': [
+                        {'message': 'This field is required.'},
+                    ],
+                }
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        logger = MagicMock()
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ) as get_module_and_script,
+            patch('extras.management.commands.runscript.ScriptJob.enqueue') as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger', return_value=logger),
+        ):
+            with self.assertRaises(CommandError):
+                call_command(
+                    'runscript',
+                    'test.Script',
+                    user='admin',
+                    data='{}',
+                    stdout=StringIO(),
+                )
+
+        get_module_and_script.assert_called_once_with('test', 'Script')
+        enqueue.assert_not_called()
+        logger.error.assert_any_call('Data is not valid:')
+        logger.error.assert_any_call('\tname: This field is required.')
+
+    def test_missing_user_falls_back_to_superuser_and_empty_data(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = True
+                form.cleaned_data = {
+                    '_schedule_at': None,
+                    '_interval': None,
+                    '_commit': None,
+                }
+                form.errors.get_json_data.return_value = {}
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        job = SimpleNamespace(duration='0 seconds')
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ),
+            patch(
+                'extras.management.commands.runscript.ScriptJob.enqueue',
+                return_value=job,
+            ) as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger'),
+        ):
+            call_command(
+                'runscript',
+                'test.Script',
+                user='missing-user',
+                stdout=StringIO(),
+            )
+
+        kwargs = enqueue.call_args.kwargs
+        self.assertEqual(kwargs['user'], self.user)
+        self.assertEqual(kwargs['data'], {})
+
+    def test_no_user_argument_falls_back_to_first_superuser(self):
+        class TestScript:
+            full_name = 'test.Script'
+
+            def as_form(self, data, files):
+                form = MagicMock()
+                form.is_valid.return_value = True
+                form.cleaned_data = {
+                    '_schedule_at': None,
+                    '_interval': None,
+                    '_commit': None,
+                }
+                form.errors.get_json_data.return_value = {}
+                return form
+
+        script_obj = SimpleNamespace(python_class=TestScript)
+        job = SimpleNamespace(duration='0 seconds')
+
+        with (
+            patch(
+                'extras.management.commands.runscript.get_module_and_script',
+                return_value=(None, script_obj),
+            ),
+            patch(
+                'extras.management.commands.runscript.ScriptJob.enqueue',
+                return_value=job,
+            ) as enqueue,
+            patch('extras.management.commands.runscript.logging.getLogger'),
+        ):
+            call_command('runscript', 'test.Script', stdout=StringIO())
+
+        self.assertEqual(enqueue.call_args.kwargs['user'], self.user)
+
+
+class WebhookReceiverTestCase(TestCase):
+    def test_starts_http_server(self):
+        out = StringIO()
+
+        with (
+            patch('extras.management.commands.webhook_receiver.HTTPServer') as http_server,
+            patch.object(WebhookHandler, 'show_headers', True),
+        ):
+            server = http_server.return_value
+            server.serve_forever.side_effect = KeyboardInterrupt
+
+            call_command(
+                'webhook_receiver',
+                port=9999,
+                no_headers=True,
+                stdout=out,
+            )
+
+            self.assertFalse(WebhookHandler.show_headers)
+
+        http_server.assert_called_once_with(('localhost', 9999), WebhookHandler)
+        server.serve_forever.assert_called_once_with()
+        self.assertIn('Listening on port http://localhost:9999', out.getvalue())
+        self.assertIn('Exiting', out.getvalue())
+
+    def test_handler_routes_arbitrary_http_methods(self):
+        handler = object.__new__(WebhookHandler)
+
+        self.assertEqual(handler.__getattr__('do_PATCH').__func__, WebhookHandler.do_ANY)
+        with self.assertRaises(AttributeError):
+            handler.__getattr__('missing')
+
+    def test_handler_logs_request_message(self):
+        handler = object.__new__(WebhookHandler)
+        handler.date_time_string = MagicMock(return_value='now')
+        handler.address_string = MagicMock(return_value='127.0.0.1')
+
+        with (
+            patch('extras.management.commands.webhook_receiver.request_counter', 7),
+            patch('builtins.print') as print_,
+        ):
+            handler.log_message('%s', 'message')
+
+        print_.assert_called_once_with('[7] now 127.0.0.1 message')
+
+    def test_handler_accepts_json_request_body(self):
+        handler = object.__new__(WebhookHandler)
+        body = b'{"ok": true}'
+        handler.headers = {
+            'Content-Length': str(len(body)),
+            'Content-Type': 'application/json',
+            'X-Test': 'value',
+        }
+        handler.rfile = BytesIO(body)
+        handler.wfile = BytesIO()
+        handler.send_response = MagicMock()
+        handler.end_headers = MagicMock()
+        handler.show_headers = True
+
+        with (
+            patch('extras.management.commands.webhook_receiver.request_counter', 1),
+            patch('builtins.print') as print_,
+        ):
+            handler.do_ANY()
+            self.assertEqual(webhook_receiver.request_counter, 2)
+
+        handler.send_response.assert_called_once_with(200)
+        handler.end_headers.assert_called_once_with()
+        self.assertEqual(handler.wfile.getvalue(), b'Webhook received!\n')
+        print_.assert_any_call('X-Test: value')
+        print_.assert_any_call('{\n    "ok": true\n}')
+        print_.assert_any_call('Completed request #1')
+
+    def test_handler_prints_no_body_when_content_length_is_missing(self):
+        handler = object.__new__(WebhookHandler)
+        handler.headers = {}
+        handler.rfile = BytesIO()
+        handler.wfile = BytesIO()
+        handler.send_response = MagicMock()
+        handler.end_headers = MagicMock()
+        handler.show_headers = False
+
+        with (
+            patch('extras.management.commands.webhook_receiver.request_counter', 1),
+            patch('builtins.print') as print_,
+        ):
+            handler.do_ANY()
+
+        print_.assert_any_call('(No body)')
+        print_.assert_any_call('Completed request #1')

+ 84 - 0
netbox/ipam/tests/test_management_commands.py

@@ -0,0 +1,84 @@
+from io import StringIO
+from unittest.mock import patch
+
+from django.core.management import call_command
+from django.test import TestCase
+from netaddr import IPNetwork
+
+from ipam.models import Prefix
+
+
+class RebuildPrefixesTestCase(TestCase):
+    def test_rebuilds_global_prefix_tree(self):
+        out = StringIO()
+
+        with (
+            patch('ipam.management.commands.rebuild_prefixes.Prefix') as prefix_model,
+            patch('ipam.management.commands.rebuild_prefixes.VRF') as vrf_model,
+            patch('ipam.management.commands.rebuild_prefixes.rebuild_prefixes') as rebuild_prefixes,
+        ):
+            prefix_model.objects.count.return_value = 0
+            prefix_model.objects.filter.return_value.count.return_value = 0
+            vrf_model.objects.all.return_value = []
+            call_command('rebuild_prefixes', stdout=out)
+
+        rebuild_prefixes.assert_called_once_with(None)
+        prefix_model.objects.update.assert_called_once_with(_depth=0, _children=0)
+        self.assertIn('Rebuilding 0 prefixes', out.getvalue())
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_hierarchy_is_correct_after_rebuild(self):
+        Prefix.objects.bulk_create(
+            [
+                Prefix(prefix=IPNetwork('10.0.0.0/8')),
+                Prefix(prefix=IPNetwork('10.0.0.0/16')),
+                Prefix(prefix=IPNetwork('10.0.0.0/24')),
+            ]
+        )
+
+        out = StringIO()
+        call_command('rebuild_prefixes', stdout=out)
+
+        self.assertIn('Finished.', out.getvalue())
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/8'))._depth, 0)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/8'))._children, 2)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/16'))._depth, 1)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/16'))._children, 1)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/24'))._depth, 2)
+        self.assertEqual(Prefix.objects.get(prefix=IPNetwork('10.0.0.0/24'))._children, 0)
+
+    def test_rebuilds_prefix_tree_for_each_vrf(self):
+        class FakeVRF:
+            pk = 123
+
+            def __str__(self):
+                return 'Tenant VRF'
+
+        class FakeQuerySet:
+            def __init__(self, count):
+                self._count = count
+
+            def count(self):
+                return self._count
+
+        out = StringIO()
+        vrf = FakeVRF()
+
+        with (
+            patch('ipam.management.commands.rebuild_prefixes.Prefix') as prefix_model,
+            patch('ipam.management.commands.rebuild_prefixes.VRF') as vrf_model,
+            patch('ipam.management.commands.rebuild_prefixes.rebuild_prefixes') as rebuild_prefixes,
+        ):
+            prefix_model.objects.count.return_value = 3
+            prefix_model.objects.filter.side_effect = [
+                FakeQuerySet(1),
+                FakeQuerySet(2),
+            ]
+            vrf_model.objects.all.return_value = [vrf]
+
+            call_command('rebuild_prefixes', stdout=out)
+
+        rebuild_prefixes.assert_any_call(None)
+        rebuild_prefixes.assert_any_call(vrf.pk)
+        self.assertIn('Global: 1 prefixes', out.getvalue())
+        self.assertIn('VRF Tenant VRF: 2 prefixes', out.getvalue())

+ 51 - 0
netbox/utilities/tests/test_management_commands.py

@@ -0,0 +1,51 @@
+from io import StringIO
+from unittest.mock import MagicMock, patch
+
+from django.core.management import call_command
+from django.test import TestCase
+
+from utilities.management.commands.calculate_cached_counts import Command
+
+
+class CalculateCachedCountsTestCase(TestCase):
+    def test_updates_registered_counter_fields(self):
+        class ParentModel:
+            pass
+
+        out = StringIO()
+
+        with (
+            patch.object(
+                Command,
+                'collect_models',
+                return_value={ParentModel: {'interface_count': 'interfaces'}},
+            ),
+            patch('utilities.management.commands.calculate_cached_counts.update_counts') as update_counts,
+        ):
+            call_command('calculate_cached_counts', stdout=out)
+
+        update_counts.assert_called_once_with(ParentModel, 'interface_count', 'interfaces')
+        self.assertIn('Finished.', out.getvalue())
+
+    def test_collect_models_returns_counter_field_mappings_by_parent_model(self):
+        class ParentModel:
+            pass
+
+        class ChildModel:
+            pass
+
+        fk_field = MagicMock()
+        fk_field.related_model = ParentModel
+        fk_field.related_query_name.return_value = 'children'
+        ChildModel._meta = MagicMock()
+        ChildModel._meta.get_field.return_value = fk_field
+
+        with patch(
+            'utilities.management.commands.calculate_cached_counts.registry',
+            {'counter_fields': {ChildModel: {'parent': 'child_count'}}},
+        ):
+            models = Command.collect_models()
+
+        ChildModel._meta.get_field.assert_called_once_with('parent')
+        fk_field.related_query_name.assert_called_once_with()
+        self.assertEqual(dict(models), {ParentModel: {'child_count': 'children'}})