Browse Source

test(signals): Add signal handler test coverage

Add comprehensive test coverage for signal handlers in circuits, core,
dcim, extras, ipam, users, virtualization, and wireless, including
cable path rebuilding, change logging, data source sync, scope
propagation, and notification dispatch logic.

Fixes #22098
Martin Hauser 1 week ago
parent
commit
e60ba2859d

+ 92 - 0
netbox/circuits/tests/test_signals.py

@@ -0,0 +1,92 @@
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.test import SimpleTestCase, TestCase
+
+from circuits import signals
+from circuits.models import Circuit, CircuitTermination, CircuitType, Provider
+from dcim.models import (
+    Cable,
+    CablePath,
+    Device,
+    DeviceRole,
+    DeviceType,
+    Interface,
+    Manufacturer,
+    Site,
+)
+
+
+class RebuildCablepathsSignalTestCase(TestCase):
+    """
+    Verify circuits.signals.rebuild_cablepaths retraces paths that cross the peer termination
+    when a CircuitTermination is saved or deleted.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site = Site.objects.create(name='Site', slug='site')
+        manufacturer = Manufacturer.objects.create(name='Manufacturer', slug='manufacturer')
+        device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type')
+        device_role = DeviceRole.objects.create(name='Device Role', slug='device-role')
+        cls.device = Device.objects.create(site=cls.site, device_type=device_type, role=device_role, name='Device 1')
+        provider = Provider.objects.create(name='Provider', slug='provider')
+        circuit_type = CircuitType.objects.create(name='Circuit Type', slug='circuit-type')
+        cls.circuit = Circuit.objects.create(provider=provider, type=circuit_type, cid='Circuit 1')
+
+    def test_saving_termination_rebuilds_peer_path(self):
+        interface = Interface.objects.create(device=self.device, name='Interface 1')
+        site_z = Site.objects.create(name='Site Z', slug='site-z')
+        termination_a = CircuitTermination.objects.create(circuit=self.circuit, termination=self.site, term_side='A')
+        termination_z = CircuitTermination.objects.create(circuit=self.circuit, termination=site_z, term_side='Z')
+        Cable(a_terminations=[interface], b_terminations=[termination_a]).save()
+        original_path = CablePath.objects.get()
+
+        # Saving the Z (peer) termination should cause rebuild_paths to run for the A peer.
+        with patch('circuits.signals.rebuild_paths') as rebuild_paths:
+            termination_z.save()
+
+        rebuild_paths.assert_called_once()
+        rebuilt_for = rebuild_paths.call_args.args[0]
+        self.assertEqual([t.pk for t in rebuilt_for], [termination_a.pk])
+
+        # Without patching, the real signal should retrace the path successfully.
+        termination_z.save()
+        self.assertEqual(CablePath.objects.count(), 1)
+        self.assertNotEqual(CablePath.objects.get().pk, original_path.pk)
+
+    def test_deleting_termination_rebuilds_peer_path(self):
+        site_z = Site.objects.create(name='Site Z', slug='site-z')
+        termination_a = CircuitTermination.objects.create(circuit=self.circuit, termination=self.site, term_side='A')
+        termination_z = CircuitTermination.objects.create(circuit=self.circuit, termination=site_z, term_side='Z')
+
+        with patch('circuits.signals.rebuild_paths') as rebuild_paths:
+            termination_z.delete()
+
+        rebuild_paths.assert_called_once()
+        rebuilt_for = rebuild_paths.call_args.args[0]
+        self.assertEqual([t.pk for t in rebuilt_for], [termination_a.pk])
+
+    def test_saving_termination_without_peer_does_not_rebuild(self):
+        termination = CircuitTermination.objects.create(circuit=self.circuit, termination=self.site, term_side='A')
+
+        with patch('circuits.signals.rebuild_paths') as rebuild_paths:
+            termination.save()
+
+        rebuild_paths.assert_not_called()
+
+
+class RebuildCablepathsDirectHandlerTestCase(SimpleTestCase):
+    """
+    Direct-call tests for rebuild_cablepaths branches that are not reachable through
+    normal model operations (e.g. raw=True is only set by Django's loaddata pathway).
+    """
+
+    def test_raw_import_skips_peer_lookup_and_rebuild(self):
+        instance = SimpleNamespace(get_peer_termination=MagicMock())
+
+        with patch.object(signals, 'rebuild_paths') as rebuild_paths:
+            signals.rebuild_cablepaths(instance=instance, raw=True)
+
+        instance.get_peer_termination.assert_not_called()
+        rebuild_paths.assert_not_called()

+ 505 - 0
netbox/core/tests/test_signals.py

@@ -0,0 +1,505 @@
+import uuid
+from types import SimpleNamespace
+from unittest.mock import MagicMock, Mock, patch
+
+from django.contrib.contenttypes.models import ContentType
+from django.core.exceptions import ValidationError
+from django.core.signals import request_finished
+from django.db import transaction
+from django.test import RequestFactory, SimpleTestCase, TestCase, override_settings
+
+from core import signals
+from core.choices import DataSourceStatusChoices, JobStatusChoices, ObjectChangeActionChoices
+from core.models import ConfigRevision, DataSource, ObjectChange, ObjectType
+from core.signals import _signals_received, clear_events, post_sync
+from dcim.models import Site, SiteGroup
+from extras.models import Tag
+from extras.validators import CustomValidator
+from netbox.context import events_queue
+from netbox.context_managers import event_tracking
+from users.models import User
+from utilities.exceptions import AbortRequest
+
+
+def _build_request(user):
+    request = RequestFactory().get('/')
+    request.id = uuid.uuid4()
+    request.user = user
+    return request
+
+
+class UpdateObjectTypesSignalTestCase(TestCase):
+    """
+    Verify core.signals.update_object_types has registered an ObjectType for known
+    models, with the expected public flag and feature set.
+    """
+
+    def test_public_model_object_type_is_registered(self):
+        ot = ObjectType.objects.get(app_label='dcim', model='site')
+        self.assertTrue(ot.public)
+        # Site supports several features — verify a couple representative ones.
+        self.assertIn('custom_fields', ot.features)
+        self.assertIn('tags', ot.features)
+
+    def test_private_model_object_type_is_registered_as_non_public(self):
+        ot = ObjectType.objects.get(app_label='dcim', model='cablepath')
+        self.assertFalse(ot.public)
+
+
+class HandleChangedObjectSignalTestCase(TestCase):
+    """
+    Verify core.signals.handle_changed_object writes an ObjectChange and increments
+    metric counters whenever a tracked object is created or updated within a request.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='alice', password='pw')
+
+    def test_create_records_an_objectchange(self):
+        request = _build_request(self.user)
+        with event_tracking(request):
+            site = Site.objects.create(name='Site 1', slug='site-1')
+
+        oc = ObjectChange.objects.get(
+            changed_object_type=ContentType.objects.get_for_model(Site),
+            changed_object_id=site.pk,
+        )
+        self.assertEqual(oc.action, ObjectChangeActionChoices.ACTION_CREATE)
+        self.assertEqual(oc.user, self.user)
+        self.assertEqual(oc.request_id, request.id)
+
+    def test_update_records_an_objectchange(self):
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        request = _build_request(self.user)
+
+        with event_tracking(request):
+            site.description = 'updated'
+            site.save()
+
+        ocs = ObjectChange.objects.filter(
+            changed_object_type=ContentType.objects.get_for_model(Site),
+            changed_object_id=site.pk,
+        ).order_by('-pk')
+        self.assertEqual(ocs.first().action, ObjectChangeActionChoices.ACTION_UPDATE)
+
+    def test_no_request_skips_objectchange(self):
+        # Saving outside a request context (no event_tracking) should not record any
+        # ObjectChange entries.
+        Site.objects.create(name='Site 1', slug='site-1')
+        self.assertEqual(ObjectChange.objects.count(), 0)
+
+    def test_m2m_tag_change_records_objectchange_with_postchange_tags(self):
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        tag = Tag.objects.create(name='Important', slug='important')
+        request = _build_request(self.user)
+
+        with event_tracking(request):
+            site.tags.add(tag)
+
+        oc = ObjectChange.objects.filter(
+            changed_object_type=ContentType.objects.get_for_model(Site),
+            changed_object_id=site.pk,
+        ).first()
+        self.assertEqual(oc.postchange_data['tags'], ['Important'])
+
+
+class HandleDeletedObjectSignalTestCase(TestCase):
+    """
+    Verify core.signals.handle_deleted_object writes a delete-type ObjectChange and
+    respects PROTECTION_RULES.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='alice', password='pw')
+
+    def setUp(self):
+        # Reset the in-memory pre_delete bookkeeping; the signal's de-dup set lives in a
+        # threading.local that is not rolled back between TestCase methods.
+        _signals_received.pre_delete = set()
+
+    def test_delete_records_an_objectchange(self):
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        site_pk = site.pk
+        site_type = ContentType.objects.get_for_model(Site)
+        request = _build_request(self.user)
+
+        with event_tracking(request):
+            site.delete()
+
+        oc = ObjectChange.objects.get(changed_object_type=site_type, changed_object_id=site_pk)
+        self.assertEqual(oc.action, ObjectChangeActionChoices.ACTION_DELETE)
+        self.assertIsNone(oc.postchange_data)
+
+    @override_settings(PROTECTION_RULES={'dcim.site': [CustomValidator({'name': {'neq': 'protected'}})]})
+    def test_protection_rule_violation_aborts_deletion(self):
+        site = Site.objects.create(name='protected', slug='protected')
+        request = _build_request(self.user)
+
+        with event_tracking(request):
+            # The signal raises AbortRequest from a pre_delete handler, which can poison
+            # the surrounding transaction; isolate the delete in its own atomic block.
+            with self.assertRaises((AbortRequest, ValidationError)):
+                with transaction.atomic():
+                    site.delete()
+
+        self.assertTrue(Site.objects.filter(pk=site.pk).exists())
+
+    def test_delete_records_single_objectchange(self):
+        # A delete should record exactly one ObjectChange for the deleted object.
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        site_pk = site.pk
+        site_type = ContentType.objects.get_for_model(Site)
+        request = _build_request(self.user)
+
+        with event_tracking(request):
+            site.delete()
+
+        ocs = ObjectChange.objects.filter(changed_object_type=site_type, changed_object_id=site_pk)
+        self.assertEqual(ocs.count(), 1)
+
+    def test_duplicate_pre_delete_for_same_instance_is_ignored(self):
+        # Exercise the dedup short-circuit by invoking the handler twice for the
+        # same instance, mirroring what happens when a parent and its child are
+        # deleted simultaneously and the same pre_delete fires more than once.
+        # Only the first invocation should record an ObjectChange.
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        site_pk = site.pk
+        site_type = ContentType.objects.get_for_model(Site)
+        request = _build_request(self.user)
+
+        with event_tracking(request):
+            signals.handle_deleted_object(sender=Site, instance=site)
+            signals.handle_deleted_object(sender=Site, instance=site)
+
+        ocs = ObjectChange.objects.filter(
+            changed_object_type=site_type,
+            changed_object_id=site_pk,
+            action=ObjectChangeActionChoices.ACTION_DELETE,
+        )
+        self.assertEqual(ocs.count(), 1)
+
+    def test_delete_records_change_for_objects_with_nulled_fk(self):
+        # When a parent is deleted, related objects with on_delete=SET_NULL have
+        # their FK cleared by the signal *and* receive a change-log entry via
+        # snapshot()+save(). Without the signal, Django's SET_NULL would clear
+        # the FK silently with no ObjectChange.
+        group = SiteGroup.objects.create(name='Group', slug='group')
+        group_pk = group.pk
+        site = Site.objects.create(name='Site', slug='site', group=group)
+        site_type = ContentType.objects.get_for_model(Site)
+        request = _build_request(self.user)
+
+        with event_tracking(request):
+            group.delete()
+
+        site.refresh_from_db()
+        self.assertIsNone(site.group)
+        oc = ObjectChange.objects.get(
+            changed_object_type=site_type,
+            changed_object_id=site.pk,
+            action=ObjectChangeActionChoices.ACTION_UPDATE,
+        )
+        self.assertEqual(oc.prechange_data['group'], group_pk)
+        self.assertIsNone(oc.postchange_data['group'])
+
+
+class ClearSignalHistorySignalTestCase(TestCase):
+    """
+    Verify core.signals.clear_signal_history resets the pre_delete bookkeeping at the
+    end of every request.
+    """
+
+    def test_request_finished_clears_history(self):
+        _signals_received.pre_delete = {('a', 1), ('b', 2)}
+
+        request_finished.send(sender=self.__class__)
+
+        self.assertEqual(_signals_received.pre_delete, set())
+
+
+class ClearEventsQueueSignalTestCase(TestCase):
+    """
+    Verify core.signals.clear_events_queue empties the in-flight events queue when the
+    clear_events signal fires (e.g. during a rolled-back bulk transaction).
+    """
+
+    def test_clear_events_signal_empties_the_queue(self):
+        events_queue.set({'event-1': object(), 'event-2': object()})
+
+        clear_events.send(sender='test-suite')
+
+        self.assertEqual(events_queue.get(), {})
+
+
+class EnqueueSyncJobSignalTestCase(TestCase):
+    """
+    Verify core.signals.enqueue_sync_job schedules a recurring sync job when a
+    DataSource is saved with a sync_interval, and removes any existing schedule
+    otherwise.
+    """
+
+    def test_saving_datasource_with_interval_enqueues_sync_job(self):
+        with patch('core.jobs.SyncDataSourceJob') as sync_job:
+            DataSource.objects.create(
+                name='DS 1',
+                type='local',
+                source_url='/tmp/ds1',
+                enabled=True,
+                sync_interval=60,
+            )
+
+        sync_job.enqueue_once.assert_called_once()
+        _, kwargs = sync_job.enqueue_once.call_args
+        self.assertEqual(kwargs.get('interval'), 60)
+
+    def test_disabled_datasource_clears_scheduled_jobs(self):
+        class ScheduledJobQueryset:
+            def __init__(self, jobs):
+                self.jobs = jobs
+                self.defer = Mock(return_value=self)
+                self.filter = Mock(return_value=self)
+
+            def __iter__(self):
+                return iter(self.jobs)
+
+        with patch('core.jobs.SyncDataSourceJob') as sync_job:
+            ds = DataSource.objects.create(
+                name='DS 1',
+                type='local',
+                source_url='/tmp/ds1',
+                enabled=True,
+                sync_interval=60,
+            )
+            sync_job.reset_mock()
+
+            scheduled_job = Mock()
+            scheduled_jobs = ScheduledJobQueryset([scheduled_job])
+            sync_job.get_jobs.return_value = scheduled_jobs
+            ds.enabled = False
+            ds.sync_interval = None
+            ds.save()
+
+        sync_job.get_jobs.assert_called_once_with(ds)
+        scheduled_jobs.defer.assert_called_once_with('data')
+        scheduled_jobs.filter.assert_called_once_with(interval__isnull=False, status=JobStatusChoices.STATUS_SCHEDULED)
+        scheduled_job.delete.assert_called_once_with()
+
+    def test_creating_disabled_datasource_does_not_enqueue(self):
+        with patch('core.jobs.SyncDataSourceJob') as sync_job:
+            DataSource.objects.create(
+                name='DS 1',
+                type='local',
+                source_url='/tmp/ds1',
+                enabled=False,
+                sync_interval=None,
+            )
+
+        sync_job.enqueue_once.assert_not_called()
+        sync_job.get_jobs.assert_not_called()
+
+
+class AutoSyncSignalTestCase(TestCase):
+    """
+    Verify core.signals.auto_sync re-syncs every AutoSyncRecord linked to the
+    DataSource when post_sync fires.
+    """
+
+    def test_post_sync_resyncs_dependent_records(self):
+        ds = DataSource.objects.create(
+            name='DS 1',
+            type='local',
+            source_url='/tmp/ds1',
+            status=DataSourceStatusChoices.COMPLETED,
+        )
+        record_a = SimpleNamespace(object=SimpleNamespace(synced=False))
+        record_a.object.sync = lambda save: setattr(record_a.object, 'synced', save)
+        record_b = SimpleNamespace(object=SimpleNamespace(synced=False))
+        record_b.object.sync = lambda save: setattr(record_b.object, 'synced', save)
+
+        with patch('core.models.AutoSyncRecord') as autosync_model:
+            autosync_model.objects.filter.return_value.prefetch_related.return_value = [
+                record_a,
+                record_b,
+            ]
+            post_sync.send(sender=ds.__class__, instance=ds)
+
+        self.assertTrue(record_a.object.synced)
+        self.assertTrue(record_b.object.synced)
+
+
+class UpdateConfigSignalTestCase(TestCase):
+    """
+    Verify core.signals.update_config invokes activate() on a newly-saved
+    ConfigRevision.
+    """
+
+    def test_saving_config_revision_activates_it(self):
+        with patch.object(ConfigRevision, 'activate') as activate:
+            ConfigRevision.objects.create(data={'foo': 1}, comment='test')
+
+        activate.assert_called_once()
+
+
+class HandleChangedObjectDirectHandlerTestCase(SimpleTestCase):
+    """
+    Direct-call tests for handle_changed_object branches that are not naturally
+    reachable through ORM operations (a real .save() always produces changes, and
+    Django collapses every m2m_changed action into a single dispatch the handler
+    cannot fully simulate via real m2m operations).
+    """
+
+    def _instance(self):
+        objectchange = MagicMock()
+        objectchange.has_changes = True
+        objectchange.postchange_data = {'name': 'Device 1'}
+        instance = SimpleNamespace(
+            pk=123,
+            _meta=SimpleNamespace(model_name='device'),
+            refresh_from_db=MagicMock(),
+        )
+        instance.to_objectchange = MagicMock(return_value=objectchange)
+        return instance, objectchange
+
+    def test_unhandled_m2m_action_returns_without_recording(self):
+        request = SimpleNamespace(id='request-id', user='alice')
+        instance, _ = self._instance()
+        current_request = MagicMock()
+        current_request.get.return_value = request
+
+        with patch.object(signals, 'current_request', current_request):
+            signals.handle_changed_object(
+                sender=None,
+                instance=instance,
+                action='pre_add',
+                pk_set={1},
+            )
+
+        instance.to_objectchange.assert_not_called()
+
+    def test_objectchange_without_changes_is_not_saved(self):
+        request = SimpleNamespace(id='request-id', user='alice')
+        instance, objectchange = self._instance()
+        objectchange.has_changes = False
+        current_request = MagicMock()
+        current_request.get.return_value = request
+        events_queue_mock = MagicMock()
+        events_queue_mock.get.return_value = {}
+        update_metric = MagicMock()
+
+        with (
+            patch.object(signals, 'current_request', current_request),
+            patch.object(signals, 'events_queue', events_queue_mock),
+            patch.object(signals, 'enqueue_event') as enqueue_event,
+            patch.object(signals.model_updates, 'labels', return_value=update_metric),
+        ):
+            signals.handle_changed_object(sender=None, instance=instance, created=False)
+
+        instance.to_objectchange.assert_called_once_with(ObjectChangeActionChoices.ACTION_UPDATE)
+        # has_changes is False, so the ObjectChange should never be saved …
+        objectchange.save.assert_not_called()
+        # … but metric counters and event enqueueing still run.
+        update_metric.inc.assert_called_once_with()
+        enqueue_event.assert_called_once()
+
+    def test_m2m_change_updates_existing_objectchange_in_same_request(self):
+        request = SimpleNamespace(id='request-id', user='alice')
+        instance, objectchange = self._instance()
+        previous_change = MagicMock()
+        current_request = MagicMock()
+        current_request.get.return_value = request
+        events_queue_mock = MagicMock()
+        events_queue_mock.get.return_value = {}
+        objectchange_model = MagicMock()
+        objectchange_model.objects.filter.return_value.first.return_value = previous_change
+        content_type_model = MagicMock()
+        content_type_model.objects.get_for_model.return_value = object()
+
+        with (
+            patch.object(signals, 'current_request', current_request),
+            patch.object(signals, 'events_queue', events_queue_mock),
+            patch.object(signals, 'ObjectChange', objectchange_model),
+            patch.object(signals, 'ContentType', content_type_model),
+            patch.object(signals, 'enqueue_event'),
+            patch.object(signals.model_updates, 'labels', return_value=MagicMock()),
+        ):
+            signals.handle_changed_object(
+                sender=None,
+                instance=instance,
+                action='post_add',
+                pk_set={1},
+            )
+
+        # The handler should update the existing ObjectChange instead of creating a new one.
+        self.assertEqual(previous_change.postchange_data, objectchange.postchange_data)
+        previous_change.save.assert_called_once_with()
+        objectchange.save.assert_not_called()
+        instance.refresh_from_db.assert_called_once_with()
+
+
+class HandleDeletedObjectDirectHandlerTestCase(SimpleTestCase):
+    """
+    Direct-call tests for handle_deleted_object branches that are hard to construct
+    via real model operations (notably _netbox_private skip, which requires a
+    private-model instance with reverse relations the test can introspect).
+    """
+
+    def setUp(self):
+        _signals_received.pre_delete = set()
+
+    def test_private_model_skips_reverse_relation_processing(self):
+        # Build a relation the handler would normally process — a ManyToOneRel-typed
+        # instance pointing at a ChangeLoggingMixin subclass. The handler narrows by
+        # exact type (`type(relation) is ManyToOneRel`), so do not use
+        # MagicMock(spec=ManyToOneRel): it would be skipped before reaching the
+        # _netbox_private branch. Patching ManyToOneRel to this fake class keeps the
+        # exact-type check meaningful, so related_model.objects.filter() would be
+        # called if the private-model skip failed.
+        class FakeManyToOneRel:
+            pass
+
+        class FakeChangeLoggingMixin:
+            pass
+
+        class FakeRelatedModel(FakeChangeLoggingMixin):
+            pass
+
+        FakeRelatedModel.objects = MagicMock()
+
+        fake_relation = FakeManyToOneRel()
+        fake_relation.related_model = FakeRelatedModel
+        fake_relation.remote_field = SimpleNamespace(name='parent')
+        fake_relation.null = True
+        fake_relation.on_delete = object()
+
+        sender = SimpleNamespace(_meta=SimpleNamespace(app_label='dcim', model_name='cablepath'))
+        request = SimpleNamespace(id='request-id', user='alice')
+        instance = SimpleNamespace(
+            pk=1,
+            _meta=SimpleNamespace(model_name='cablepath', related_objects=[fake_relation]),
+            _netbox_private=True,
+        )
+        # Private models typically don't have to_objectchange, so skip change-log too.
+        config = SimpleNamespace(PROTECTION_RULES={})
+        current_request = MagicMock()
+        current_request.get.return_value = request
+        events_queue_mock = MagicMock()
+        events_queue_mock.get.return_value = {}
+
+        with (
+            patch.object(signals, 'ManyToOneRel', FakeManyToOneRel),
+            patch.object(signals, 'get_config', return_value=config),
+            patch.object(signals, 'get_config_value_ci', return_value=[]),
+            patch.object(signals, 'run_validators'),
+            patch.object(signals, 'current_request', current_request),
+            patch.object(signals, 'events_queue', events_queue_mock),
+            patch.object(signals, 'ContentType') as content_type_model,
+            patch.object(signals, 'ChangeLoggingMixin', FakeChangeLoggingMixin),
+            patch.object(signals, 'enqueue_event'),
+            patch.object(signals.model_deletes, 'labels', return_value=MagicMock()),
+        ):
+            content_type_model.objects.get_for_model.return_value = object()
+            signals.handle_deleted_object(sender=sender, instance=instance)
+
+        FakeRelatedModel.objects.filter.assert_not_called()

+ 488 - 0
netbox/dcim/tests/test_signals.py

@@ -0,0 +1,488 @@
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.contrib.contenttypes.models import ContentType
+from django.test import SimpleTestCase, TestCase
+
+from dcim import signals
+from dcim.choices import CableEndChoices, LinkStatusChoices
+from dcim.models import (
+    Cable,
+    CablePath,
+    Device,
+    DeviceRole,
+    DeviceType,
+    FrontPort,
+    Interface,
+    Location,
+    MACAddress,
+    Manufacturer,
+    PortMapping,
+    PowerPanel,
+    Rack,
+    RearPort,
+    Site,
+    SiteGroup,
+    VirtualChassis,
+)
+from ipam.models import Prefix
+from virtualization.models import Cluster, ClusterType
+from wireless.models import WirelessLAN
+
+
+class LocationSiteChangeSignalTestCase(TestCase):
+    """
+    Verify dcim.signals.handle_location_site_change propagates a Location's new Site to
+    every descendant Location, Rack, Device, PowerPanel, and component when the parent
+    Location's site assignment changes.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site_a = Site.objects.create(name='Site A', slug='site-a')
+        cls.site_b = Site.objects.create(name='Site B', slug='site-b')
+        manufacturer = Manufacturer.objects.create(name='Manufacturer', slug='manufacturer')
+        cls.device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type')
+        cls.device_role = DeviceRole.objects.create(name='Device Role', slug='device-role')
+
+    def test_changing_location_site_propagates_to_children(self):
+        parent_location = Location.objects.create(name='Parent', slug='parent', site=self.site_a)
+        child_location = Location.objects.create(name='Child', slug='child', site=self.site_a, parent=parent_location)
+        rack = Rack.objects.create(name='Rack', site=self.site_a, location=parent_location)
+        device = Device.objects.create(
+            name='Device',
+            site=self.site_a,
+            location=parent_location,
+            device_type=self.device_type,
+            role=self.device_role,
+        )
+        interface = Interface.objects.create(device=device, name='Interface 1')
+        power_panel = PowerPanel.objects.create(name='Panel', site=self.site_a, location=parent_location)
+
+        parent_location.site = self.site_b
+        parent_location.save()
+
+        child_location.refresh_from_db()
+        rack.refresh_from_db()
+        device.refresh_from_db()
+        interface.refresh_from_db()
+        power_panel.refresh_from_db()
+        self.assertEqual(child_location.site, self.site_b)
+        self.assertEqual(rack.site, self.site_b)
+        self.assertEqual(device.site, self.site_b)
+        self.assertEqual(interface._site, self.site_b)
+        self.assertEqual(power_panel.site, self.site_b)
+
+    def test_creating_location_does_not_attempt_to_propagate(self):
+        # Should not raise — newly-created locations have no descendants.
+        Location.objects.create(name='New', slug='new', site=self.site_a)
+
+
+class RackSiteChangeSignalTestCase(TestCase):
+    """
+    Verify dcim.signals.handle_rack_site_change propagates a Rack's site/location to its
+    Devices and their components when the Rack is moved.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site_a = Site.objects.create(name='Site A', slug='site-a')
+        cls.site_b = Site.objects.create(name='Site B', slug='site-b')
+        cls.location_b = Location.objects.create(name='Loc B', slug='loc-b', site=cls.site_b)
+        manufacturer = Manufacturer.objects.create(name='Manufacturer', slug='manufacturer')
+        cls.device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type')
+        cls.device_role = DeviceRole.objects.create(name='Device Role', slug='device-role')
+
+    def test_changing_rack_site_propagates_to_devices_and_components(self):
+        rack = Rack.objects.create(name='Rack', site=self.site_a)
+        device = Device.objects.create(
+            name='Device',
+            site=self.site_a,
+            rack=rack,
+            device_type=self.device_type,
+            role=self.device_role,
+        )
+        interface = Interface.objects.create(device=device, name='Interface 1')
+
+        rack.site = self.site_b
+        rack.location = self.location_b
+        rack.save()
+
+        device.refresh_from_db()
+        interface.refresh_from_db()
+        self.assertEqual(device.site, self.site_b)
+        self.assertEqual(device.location, self.location_b)
+        self.assertEqual(interface._site, self.site_b)
+        self.assertEqual(interface._location, self.location_b)
+
+
+class DeviceSiteChangeSignalTestCase(TestCase):
+    """
+    Verify dcim.signals.handle_device_site_change propagates a Device's site/location/rack
+    to its components on save.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site_a = Site.objects.create(name='Site A', slug='site-a')
+        cls.site_b = Site.objects.create(name='Site B', slug='site-b')
+        manufacturer = Manufacturer.objects.create(name='Manufacturer', slug='manufacturer')
+        cls.device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type')
+        cls.device_role = DeviceRole.objects.create(name='Device Role', slug='device-role')
+
+    def test_moving_device_updates_components_cached_scope(self):
+        device = Device.objects.create(
+            name='Device',
+            site=self.site_a,
+            device_type=self.device_type,
+            role=self.device_role,
+        )
+        interface = Interface.objects.create(device=device, name='Interface 1')
+        self.assertEqual(interface._site, self.site_a)
+
+        device.site = self.site_b
+        device.save()
+
+        interface.refresh_from_db()
+        self.assertEqual(interface._site, self.site_b)
+
+
+class VirtualChassisMasterSignalTestCase(TestCase):
+    """
+    Verify dcim.signals.assign_virtualchassis_master links the master device back to a
+    newly-created VirtualChassis.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site = Site.objects.create(name='Site', slug='site')
+        manufacturer = Manufacturer.objects.create(name='Manufacturer', slug='manufacturer')
+        cls.device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type')
+        cls.device_role = DeviceRole.objects.create(name='Device Role', slug='device-role')
+
+    def test_master_is_assigned_to_new_virtual_chassis(self):
+        master = Device.objects.create(
+            name='Master',
+            site=self.site,
+            device_type=self.device_type,
+            role=self.device_role,
+        )
+        vc = VirtualChassis.objects.create(name='VC 1', master=master)
+
+        master.refresh_from_db()
+        self.assertEqual(master.virtual_chassis, vc)
+        self.assertEqual(master.vc_position, 1)
+
+    def test_updating_virtual_chassis_does_not_reassign_master(self):
+        master = Device.objects.create(
+            name='Master',
+            site=self.site,
+            device_type=self.device_type,
+            role=self.device_role,
+        )
+        vc = VirtualChassis.objects.create(name='VC 1', master=master)
+
+        # Detach the master, then save the VC again — the signal should not re-link.
+        master.virtual_chassis = None
+        master.vc_position = None
+        master.save()
+
+        vc.domain = 'updated'
+        vc.save()
+
+        master.refresh_from_db()
+        self.assertIsNone(master.virtual_chassis)
+
+
+class CableSignalTestCase(TestCase):
+    """
+    Verify dcim.signals.update_connected_endpoints, retrace_cable_paths, and
+    nullify_connected_endpoints maintain CablePaths in response to Cable lifecycle events.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site = Site.objects.create(name='Site', slug='site')
+        manufacturer = Manufacturer.objects.create(name='Manufacturer', slug='manufacturer')
+        device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type')
+        role = DeviceRole.objects.create(name='Device Role', slug='device-role')
+        cls.device = Device.objects.create(
+            name='Device',
+            site=cls.site,
+            device_type=device_type,
+            role=role,
+        )
+
+    def test_creating_cable_creates_endpoint_paths(self):
+        interface_a = Interface.objects.create(device=self.device, name='Interface A')
+        interface_b = Interface.objects.create(device=self.device, name='Interface B')
+        cable = Cable(a_terminations=[interface_a], b_terminations=[interface_b])
+        cable.save()
+
+        self.assertEqual(CablePath.objects.count(), 2)
+        interface_a.refresh_from_db()
+        self.assertIsNotNone(interface_a._path_id)
+
+    def test_changing_cable_status_marks_paths_inactive(self):
+        interface_a = Interface.objects.create(device=self.device, name='Interface A')
+        interface_b = Interface.objects.create(device=self.device, name='Interface B')
+        cable = Cable(a_terminations=[interface_a], b_terminations=[interface_b])
+        cable.save()
+        self.assertTrue(all(cp.is_active for cp in CablePath.objects.all()))
+
+        # Reload the cable so _orig_status reflects the persisted value and
+        # _terminations_modified resets to False.
+        cable = Cable.objects.get(pk=cable.pk)
+        cable.status = LinkStatusChoices.STATUS_PLANNED
+        cable.save()
+
+        self.assertFalse(any(cp.is_active for cp in CablePath.objects.all()))
+
+    def test_reconnecting_cable_marks_paths_active(self):
+        interface_a = Interface.objects.create(device=self.device, name='Interface A')
+        interface_b = Interface.objects.create(device=self.device, name='Interface B')
+        cable = Cable(
+            a_terminations=[interface_a],
+            b_terminations=[interface_b],
+            status=LinkStatusChoices.STATUS_PLANNED,
+        )
+        cable.save()
+        self.assertFalse(any(cp.is_active for cp in CablePath.objects.all()))
+
+        cable = Cable.objects.get(pk=cable.pk)
+        cable.status = LinkStatusChoices.STATUS_CONNECTED
+        cable.save()
+
+        self.assertTrue(all(cp.is_active for cp in CablePath.objects.all()))
+
+    def test_deleting_cable_retraces_paths(self):
+        interface_a = Interface.objects.create(device=self.device, name='Interface A')
+        interface_b = Interface.objects.create(device=self.device, name='Interface B')
+        cable = Cable(a_terminations=[interface_a], b_terminations=[interface_b])
+        cable.save()
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        cable.delete()
+        self.assertEqual(CablePath.objects.count(), 0)
+        interface_a.refresh_from_db()
+        interface_b.refresh_from_db()
+        # Cable deletion must fully detach both endpoints, even though the
+        # nullify_connected_endpoints signal short-circuits during Cable cascade.
+        self.assertIsNone(interface_a._path_id)
+        self.assertIsNone(interface_b._path_id)
+        self.assertIsNone(interface_a.cable_id)
+        self.assertIsNone(interface_b.cable_id)
+        self.assertEqual(interface_a.cable_end, '')
+        self.assertEqual(interface_b.cable_end, '')
+
+    def test_deleting_cable_skips_per_termination_retrace(self):
+        """
+        When a Cable is deleted, nullify_connected_endpoints (post_delete on each
+        cascaded CableTermination) must skip retracing — retrace_cable_paths
+        retraces each affected path once on Cable post_delete instead. See #22104.
+
+        Without the short-circuit, retrace would fire (n_terminations * n_paths)
+        times from the per-termination handler plus n_paths times from the Cable
+        handler — for this 2-termination, 2-path cable, 6 calls total. With the
+        short-circuit, only the n_paths calls from retrace_cable_paths remain.
+        """
+        interface_a = Interface.objects.create(device=self.device, name='Interface A')
+        interface_b = Interface.objects.create(device=self.device, name='Interface B')
+        cable = Cable(a_terminations=[interface_a], b_terminations=[interface_b])
+        cable.save()
+        self.assertEqual(CablePath.objects.count(), 2)
+        self.assertFalse(Cable._is_being_deleted(cable.pk))
+
+        with patch('dcim.models.cables.CablePath.retrace') as retrace:
+            cable.delete()
+
+        # Exactly one retrace per affected CablePath (from retrace_cable_paths),
+        # not the n*m calls the per-termination handler would have made.
+        self.assertEqual(retrace.call_count, 2)
+        # The deletion-tracking set must be cleaned up after delete() returns,
+        # even when the cascade runs to completion.
+        self.assertFalse(Cable._is_being_deleted(cable.pk))
+
+    def test_creating_portmapping_retraces_dependent_paths(self):
+        interface = Interface.objects.create(device=self.device, name='Interface A')
+        front_port = FrontPort.objects.create(device=self.device, name='Front Port 1')
+        rear_port = RearPort.objects.create(device=self.device, name='Rear Port 1')
+        Cable(a_terminations=[interface], b_terminations=[front_port]).save()
+
+        # Creating a PortMapping connecting the front and rear ports should retrace paths
+        # that traverse either port (i.e. the incomplete path through front_port).
+        PortMapping.objects.create(
+            device=self.device,
+            front_port=front_port,
+            front_port_position=1,
+            rear_port=rear_port,
+            rear_port_position=1,
+        )
+
+        path = CablePath.objects.filter(_nodes__contains=front_port).first()
+        self.assertIsNotNone(path)
+        # The retraced path should now extend through to the rear port. Path nodes are
+        # encoded as "<content_type_id>:<object_id>".
+        rear_port_node = f'{ContentType.objects.get_for_model(RearPort).pk}:{rear_port.pk}'
+        flat_nodes = [n for step in path.path for n in step]
+        self.assertIn(rear_port_node, flat_nodes)
+
+    def test_deleting_cabletermination_nullifies_endpoints(self):
+        interface_a = Interface.objects.create(device=self.device, name='Interface A')
+        interface_b = Interface.objects.create(device=self.device, name='Interface B')
+        cable = Cable(a_terminations=[interface_a], b_terminations=[interface_b])
+        cable.save()
+        termination = cable.terminations.get(cable_end=CableEndChoices.SIDE_A)
+
+        termination.delete()
+        interface_a.refresh_from_db()
+        self.assertIsNone(interface_a.cable_id)
+        self.assertEqual(interface_a.cable_end, '')
+
+
+class MACAddressInterfaceSignalTestCase(TestCase):
+    """
+    Verify dcim.signals.update_mac_address_interface assigns a designated primary MAC to
+    the newly-created Interface or VMInterface.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site = Site.objects.create(name='Site', slug='site')
+        manufacturer = Manufacturer.objects.create(name='Manufacturer', slug='manufacturer')
+        device_type = DeviceType.objects.create(manufacturer=manufacturer, model='Device Type')
+        role = DeviceRole.objects.create(name='Device Role', slug='device-role')
+        cls.device = Device.objects.create(
+            name='Device',
+            site=cls.site,
+            device_type=device_type,
+            role=role,
+        )
+
+    def test_primary_mac_is_assigned_to_new_interface(self):
+        mac = MACAddress.objects.create(mac_address='00:11:22:33:44:55')
+        interface = Interface(device=self.device, name='Interface 1', primary_mac_address=mac)
+        interface.save()
+
+        mac.refresh_from_db()
+        self.assertEqual(mac.assigned_object, interface)
+
+    def test_primary_mac_is_not_reassigned_on_interface_update(self):
+        mac = MACAddress.objects.create(mac_address='00:11:22:33:44:55')
+        interface = Interface.objects.create(device=self.device, name='Interface 1')
+        mac.assigned_object = interface
+        mac.save()
+        # Detach (simulate the MAC having been moved off the interface).
+        mac.assigned_object = None
+        mac.save()
+
+        interface.primary_mac_address = mac
+        interface.description = 'updated'
+        interface.save()
+
+        mac.refresh_from_db()
+        # Updating an existing interface should not re-assign the MAC.
+        self.assertIsNone(mac.assigned_object)
+
+
+class SyncCachedScopeFieldsSignalTestCase(TestCase):
+    """
+    Verify dcim.signals.sync_cached_scope_fields recomputes cached scope fields on
+    Prefix, Cluster, and WirelessLAN when a Site or Location is modified.
+    """
+
+    def test_site_group_change_updates_prefix_cached_scope(self):
+        group_a = SiteGroup.objects.create(name='Group A', slug='group-a')
+        group_b = SiteGroup.objects.create(name='Group B', slug='group-b')
+        site = Site.objects.create(name='Site', slug='site', group=group_a)
+        prefix = Prefix.objects.create(
+            prefix='10.0.0.0/24',
+            scope_type=ContentType.objects.get_for_model(Site),
+            scope_id=site.pk,
+        )
+        self.assertEqual(prefix._site_group, group_a)
+
+        site.group = group_b
+        site.save()
+
+        prefix.refresh_from_db()
+        self.assertEqual(prefix._site, site)
+        self.assertEqual(prefix._site_group, group_b)
+
+    def test_location_site_change_updates_prefix_cached_scope(self):
+        site_a = Site.objects.create(name='Site A', slug='site-a')
+        site_b = Site.objects.create(name='Site B', slug='site-b')
+        location = Location.objects.create(name='Loc', slug='loc', site=site_a)
+        prefix = Prefix.objects.create(
+            prefix='10.0.0.0/24',
+            scope_type=ContentType.objects.get_for_model(Location),
+            scope_id=location.pk,
+        )
+        self.assertEqual(prefix._site, site_a)
+        self.assertEqual(prefix._location, location)
+
+        location.site = site_b
+        location.save()
+
+        prefix.refresh_from_db()
+        self.assertEqual(prefix._location, location)
+        self.assertEqual(prefix._site, site_b)
+
+    def test_signal_updates_cluster_and_wirelesslan_cached_scope(self):
+        # Lock down the explicit (Prefix, Cluster, WirelessLAN) tuple in the
+        # signal by exercising Cluster and WirelessLAN alongside Prefix. If a
+        # future change drops Cluster or WirelessLAN from that tuple, this test
+        # will catch it.
+        group_a = SiteGroup.objects.create(name='Group A', slug='group-a')
+        group_b = SiteGroup.objects.create(name='Group B', slug='group-b')
+        site = Site.objects.create(name='Site', slug='site', group=group_a)
+        cluster_type = ClusterType.objects.create(name='CT', slug='ct')
+        cluster = Cluster.objects.create(name='Cluster', type=cluster_type, scope=site)
+        wireless_lan = WirelessLAN.objects.create(ssid='LAN', scope=site)
+
+        self.assertEqual(cluster._site_group, group_a)
+        self.assertEqual(wireless_lan._site_group, group_a)
+
+        site.group = group_b
+        site.save()
+
+        cluster.refresh_from_db()
+        wireless_lan.refresh_from_db()
+        self.assertEqual(cluster._site_group, group_b)
+        self.assertEqual(wireless_lan._site_group, group_b)
+
+    def test_create_site_does_not_attempt_to_resync(self):
+        # Should not raise — newly-created sites have nothing to sync.
+        Site.objects.create(name='New Site', slug='new-site')
+
+
+class CableSignalDirectHandlerTestCase(SimpleTestCase):
+    """
+    Direct-call tests for dcim signal branches that are not reachable through normal
+    model operations (raw=True is set only by Django's loaddata pathway).
+    """
+
+    def test_update_connected_endpoints_raw_import_is_a_no_op(self):
+        cable = SimpleNamespace(_terminations_modified=True)
+        logger = MagicMock()
+
+        with (
+            patch.object(signals.logging, 'getLogger', return_value=logger),
+            patch.object(signals, 'CableTermination') as cabletermination_model,
+            patch.object(signals, 'create_cablepaths') as create_cablepaths,
+            patch.object(signals, 'rebuild_paths') as rebuild_paths,
+        ):
+            signals.update_connected_endpoints(instance=cable, created=True, raw=True)
+
+        logger.debug.assert_called_once()
+        cabletermination_model.objects.filter.assert_not_called()
+        create_cablepaths.assert_not_called()
+        rebuild_paths.assert_not_called()
+
+    def test_update_mac_address_interface_raw_import_is_a_no_op(self):
+        primary_mac = SimpleNamespace(save=MagicMock())
+        interface = SimpleNamespace(primary_mac_address=primary_mac)
+
+        signals.update_mac_address_interface(instance=interface, created=True, raw=True)
+
+        primary_mac.save.assert_not_called()

+ 297 - 0
netbox/extras/tests/test_signals.py

@@ -0,0 +1,297 @@
+import uuid
+from unittest.mock import patch
+
+from django.contrib.contenttypes.models import ContentType
+from django.core.exceptions import ValidationError
+from django.test import RequestFactory, TestCase, override_settings
+
+from core.events import JOB_COMPLETED, JOB_STARTED, OBJECT_DELETED, OBJECT_UPDATED
+from core.models import Job, ObjectType
+from core.signals import job_end, job_start
+from dcim.choices import SiteStatusChoices
+from dcim.models import Region, Site
+from extras import signals
+from extras.choices import CustomFieldTypeChoices, EventRuleActionChoices
+from extras.models import CustomField, EventRule, Notification, Subscription, Tag, Webhook
+from extras.validators import CustomValidator
+from netbox.context_managers import event_tracking
+from users.models import User
+from utilities.exceptions import AbortRequest
+
+
+def _build_request(user=None):
+    request = RequestFactory().get('/')
+    request.id = uuid.uuid4()
+    request.user = user
+    return request
+
+
+class CustomFieldRenameSignalTestCase(TestCase):
+    """
+    Verify extras.signals.handle_cf_renamed migrates stored custom-field data when a
+    CustomField is renamed.
+    """
+
+    def test_renaming_custom_field_moves_object_data(self):
+        site_type = ContentType.objects.get_for_model(Site)
+        cf = CustomField.objects.create(
+            name='asset_tag',
+            type=CustomFieldTypeChoices.TYPE_TEXT,
+        )
+        cf.object_types.set([site_type])
+        site = Site.objects.create(name='Site 1', slug='site-1', custom_field_data={'asset_tag': 'A123'})
+
+        cf.name = 'inventory_id'
+        cf.save()
+
+        site.refresh_from_db()
+        self.assertNotIn('asset_tag', site.custom_field_data)
+        self.assertEqual(site.custom_field_data['inventory_id'], 'A123')
+
+    def test_creating_custom_field_does_not_attempt_rename(self):
+        # Should not raise — newly-created custom fields have no prior name to migrate.
+        site_type = ContentType.objects.get_for_model(Site)
+        cf = CustomField.objects.create(
+            name='cf1',
+            type=CustomFieldTypeChoices.TYPE_TEXT,
+        )
+        cf.object_types.set([site_type])
+
+
+class CustomFieldDeletedSignalTestCase(TestCase):
+    """
+    Verify extras.signals.handle_cf_deleted strips stale data from associated objects
+    when a CustomField is deleted.
+    """
+
+    def test_deleting_custom_field_clears_object_data(self):
+        site_type = ContentType.objects.get_for_model(Site)
+        cf = CustomField.objects.create(
+            name='asset_tag',
+            type=CustomFieldTypeChoices.TYPE_TEXT,
+        )
+        cf.object_types.set([site_type])
+        site = Site.objects.create(name='Site 1', slug='site-1', custom_field_data={'asset_tag': 'A123'})
+
+        cf.delete()
+
+        site.refresh_from_db()
+        self.assertNotIn('asset_tag', site.custom_field_data)
+
+
+class CustomFieldObjectTypeSignalTestCase(TestCase):
+    """
+    Verify extras.signals.handle_cf_added_obj_types and handle_cf_removed_obj_types
+    populate or strip default values when a CustomField's object_types m2m changes.
+    """
+
+    def test_adding_object_type_populates_default_value(self):
+        site_type = ContentType.objects.get_for_model(Site)
+        Site.objects.create(name='Site 1', slug='site-1')
+        cf = CustomField.objects.create(
+            name='asset_tag',
+            type=CustomFieldTypeChoices.TYPE_TEXT,
+            default='UNTAGGED',
+        )
+
+        cf.object_types.set([site_type])
+
+        site = Site.objects.get(slug='site-1')
+        self.assertEqual(site.custom_field_data.get('asset_tag'), 'UNTAGGED')
+
+    def test_removing_object_type_clears_stored_data(self):
+        site_type = ContentType.objects.get_for_model(Site)
+        cf = CustomField.objects.create(
+            name='asset_tag',
+            type=CustomFieldTypeChoices.TYPE_TEXT,
+        )
+        cf.object_types.set([site_type])
+        site = Site.objects.create(name='Site 1', slug='site-1', custom_field_data={'asset_tag': 'A123'})
+
+        cf.object_types.remove(site_type)
+
+        site.refresh_from_db()
+        self.assertNotIn('asset_tag', site.custom_field_data)
+
+
+class RunSaveValidatorsSignalTestCase(TestCase):
+    """
+    Verify extras.signals.run_save_validators invokes any configured CUSTOM_VALIDATORS
+    when a model emits the post_clean signal.
+    """
+
+    @override_settings(CUSTOM_VALIDATORS={'dcim.site': [CustomValidator({'name': {'eq': 'allowed'}})]})
+    def test_validation_failure_raises_validation_error(self):
+        with self.assertRaises(ValidationError):
+            Site(name='blocked', slug='blocked', status=SiteStatusChoices.STATUS_ACTIVE).clean()
+
+    @override_settings(CUSTOM_VALIDATORS={'dcim.site': [CustomValidator({'name': {'eq': 'allowed'}})]})
+    def test_validation_success_does_not_raise(self):
+        Site(name='allowed', slug='allowed', status=SiteStatusChoices.STATUS_ACTIVE).clean()
+
+
+class ValidateAssignedTagsSignalTestCase(TestCase):
+    """
+    Verify extras.signals.validate_assigned_tags rejects Tags that are restricted to
+    object types incompatible with the target object.
+    """
+
+    def test_restricted_tag_blocks_incompatible_object(self):
+        region_type = ContentType.objects.get_for_model(Region)
+        tag = Tag.objects.create(name='RegionOnly', slug='regiononly')
+        tag.object_types.set([region_type])
+        site = Site.objects.create(name='Site 1', slug='site-1')
+
+        with self.assertRaises(AbortRequest):
+            site.tags.add(tag)
+
+    def test_restricted_tag_allows_compatible_object(self):
+        site_type = ContentType.objects.get_for_model(Site)
+        tag = Tag.objects.create(name='SiteOnly', slug='siteonly')
+        tag.object_types.set([site_type])
+        site = Site.objects.create(name='Site 1', slug='site-1')
+
+        site.tags.add(tag)
+        self.assertEqual(list(site.tags.all()), [tag])
+
+    def test_unrestricted_tag_is_always_permitted(self):
+        tag = Tag.objects.create(name='Anywhere', slug='anywhere')
+        site = Site.objects.create(name='Site 1', slug='site-1')
+
+        site.tags.add(tag)
+        self.assertEqual(list(site.tags.all()), [tag])
+
+
+class JobEventRulesSignalTestCase(TestCase):
+    """
+    Verify extras.signals.process_job_start_event_rules and process_job_end_event_rules
+    invoke any EventRule registered for JOB_STARTED / JOB_COMPLETED on the sender's
+    object_type.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site_type = ObjectType.objects.get_for_model(Site)
+        webhook = Webhook.objects.create(
+            name='Webhook',
+            payload_url='http://localhost/',
+            secret='secret',
+        )
+        webhook_type = ObjectType.objects.get_for_model(Webhook)
+        cls.start_rule = EventRule.objects.create(
+            name='Job Start Rule',
+            event_types=[JOB_STARTED],
+            action_type=EventRuleActionChoices.WEBHOOK,
+            action_object_type=webhook_type,
+            action_object_id=webhook.pk,
+        )
+        cls.start_rule.object_types.set([cls.site_type])
+        cls.end_rule = EventRule.objects.create(
+            name='Job End Rule',
+            event_types=[JOB_COMPLETED],
+            action_type=EventRuleActionChoices.WEBHOOK,
+            action_object_type=webhook_type,
+            action_object_id=webhook.pk,
+        )
+        cls.end_rule.object_types.set([cls.site_type])
+
+    def _create_job(self):
+        return Job.objects.create(
+            object_type=self.site_type,
+            name='test-job',
+            job_id=uuid.uuid4(),
+            data={'foo': 1},
+        )
+
+    def test_job_start_filters_to_matching_event_rules(self):
+        sender = self._create_job()
+
+        with patch('extras.signals.process_event_rules') as process_event_rules:
+            job_start.send(sender=sender)
+
+        process_event_rules.assert_called_once()
+        rules_qs = process_event_rules.call_args.args[0]
+        self.assertEqual(list(rules_qs.values_list('pk', flat=True)), [self.start_rule.pk])
+
+    def test_job_end_filters_to_matching_event_rules(self):
+        sender = self._create_job()
+
+        with patch('extras.signals.process_event_rules') as process_event_rules:
+            job_end.send(sender=sender)
+
+        process_event_rules.assert_called_once()
+        rules_qs = process_event_rules.call_args.args[0]
+        self.assertEqual(list(rules_qs.values_list('pk', flat=True)), [self.end_rule.pk])
+
+
+class NotifyObjectChangedSignalTestCase(TestCase):
+    """
+    Verify extras.signals.notify_object_changed creates Notifications for subscribed
+    users on object update and deletion, and skips creation otherwise.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='alice', password='pw')
+
+    def test_creating_object_does_not_create_notifications(self):
+        # Subscribe a user to the object BEFORE invoking the handler so the
+        # discriminating branch (created=True early return) is genuinely
+        # exercised. Without the subscription this assertion passes trivially.
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        site_type = ContentType.objects.get_for_model(Site)
+        Subscription.objects.create(user=self.user, object_type=site_type, object_id=site.pk)
+        Notification.objects.all().delete()
+
+        signals.notify_object_changed(sender=Site, instance=site, created=True)
+
+        self.assertFalse(Notification.objects.exists())
+
+    def test_updating_subscribed_object_creates_update_notification(self):
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        site_type = ContentType.objects.get_for_model(Site)
+        Subscription.objects.create(user=self.user, object_type=site_type, object_id=site.pk)
+
+        site.description = 'updated'
+        site.save()
+
+        notification = Notification.objects.get(user=self.user)
+        self.assertEqual(notification.object_type, site_type)
+        self.assertEqual(notification.object_id, site.pk)
+        self.assertEqual(notification.event_type, OBJECT_UPDATED)
+
+    def test_deleting_subscribed_object_creates_delete_notification(self):
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        site_type = ContentType.objects.get_for_model(Site)
+        Subscription.objects.create(user=self.user, object_type=site_type, object_id=site.pk)
+
+        site.delete()
+
+        notification = Notification.objects.get(user=self.user)
+        self.assertEqual(notification.event_type, OBJECT_DELETED)
+
+    def test_updating_unsubscribed_object_creates_no_notification(self):
+        site = Site.objects.create(name='Site 1', slug='site-1')
+
+        site.description = 'updated'
+        site.save()
+
+        self.assertEqual(Notification.objects.count(), 0)
+
+    def test_updating_object_replaces_existing_notification(self):
+        site = Site.objects.create(name='Site 1', slug='site-1')
+        site_type = ContentType.objects.get_for_model(Site)
+        Subscription.objects.create(user=self.user, object_type=site_type, object_id=site.pk)
+
+        # Trigger two updates within a single request; only one Notification should remain.
+        request = _build_request(user=self.user)
+        with event_tracking(request):
+            site.description = 'first'
+            site.save()
+            site.description = 'second'
+            site.save()
+
+        self.assertEqual(
+            Notification.objects.filter(user=self.user, object_type=site_type, object_id=site.pk).count(),
+            1,
+        )

+ 231 - 0
netbox/ipam/tests/test_signals.py

@@ -0,0 +1,231 @@
+import uuid
+
+from django.contrib.contenttypes.models import ContentType
+from django.test import RequestFactory, TestCase
+
+from core.choices import ObjectChangeActionChoices
+from core.models import ObjectChange
+from ipam.models import IPAddress, Prefix
+from netbox.context_managers import event_tracking
+from users.models import User
+from utilities.testing.utils import create_test_device, create_test_virtualmachine
+
+
+def _build_request(user):
+    request = RequestFactory().get('/')
+    request.id = uuid.uuid4()
+    request.user = user
+    return request
+
+
+class PrefixHierarchySignalTestCase(TestCase):
+    """
+    Verify ipam.signals.handle_prefix_saved / handle_prefix_deleted keep the cached
+    `_children` and `_depth` counters up to date as prefixes are added, modified, and
+    removed.
+    """
+
+    def _refresh_counters(self, *prefixes):
+        for prefix in prefixes:
+            prefix.refresh_from_db()
+        return prefixes
+
+    def test_creating_prefix_initializes_hierarchy_counters(self):
+        parent = Prefix.objects.create(prefix='10.0.0.0/16')
+        child = Prefix.objects.create(prefix='10.0.1.0/24')
+
+        self._refresh_counters(parent, child)
+        self.assertEqual(parent._children, 1)
+        self.assertEqual(child._depth, 1)
+        self.assertEqual(child._children, 0)
+
+    def test_modifying_prefix_recomputes_old_and_new_position(self):
+        parent_a = Prefix.objects.create(prefix='10.0.0.0/16')
+        parent_b = Prefix.objects.create(prefix='192.168.0.0/16')
+        child = Prefix.objects.create(prefix='10.0.1.0/24')
+
+        self._refresh_counters(parent_a, parent_b, child)
+        self.assertEqual(parent_a._children, 1)
+        self.assertEqual(parent_b._children, 0)
+
+        # Move the child under parent_b.
+        child.prefix = '192.168.1.0/24'
+        child.save()
+
+        self._refresh_counters(parent_a, parent_b, child)
+        self.assertEqual(parent_a._children, 0)
+        self.assertEqual(parent_b._children, 1)
+        self.assertEqual(child._depth, 1)
+
+    def test_unchanged_save_does_not_disturb_counters(self):
+        parent = Prefix.objects.create(prefix='10.0.0.0/16')
+        child = Prefix.objects.create(prefix='10.0.1.0/24')
+
+        self._refresh_counters(parent, child)
+        original_children = parent._children
+        original_depth = child._depth
+
+        # Save with no field changes.
+        parent.description = ''
+        parent.save()
+
+        self._refresh_counters(parent, child)
+        self.assertEqual(parent._children, original_children)
+        self.assertEqual(child._depth, original_depth)
+
+    def test_deleting_prefix_recomputes_neighbor_counters(self):
+        parent = Prefix.objects.create(prefix='10.0.0.0/16')
+        child = Prefix.objects.create(prefix='10.0.1.0/24')
+
+        self._refresh_counters(parent)
+        self.assertEqual(parent._children, 1)
+
+        child.delete()
+
+        self._refresh_counters(parent)
+        self.assertEqual(parent._children, 0)
+
+
+class ClearPrimaryIPSignalTestCase(TestCase):
+    """
+    Verify ipam.signals.clear_primary_ip detaches deleted IPAddresses from the Device or
+    VirtualMachine they were assigned as primary. The behavior under test is the
+    signal-driven snapshot+save (and resulting change-log entry), not the FK's
+    on_delete=SET_NULL fallback.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='alice', password='pw')
+
+    def test_device_primary_ip4_delete_records_device_update(self):
+        device = create_test_device('Device 1')
+        ip = IPAddress.objects.create(address='192.0.2.1/24')
+        device.primary_ip4 = ip
+        device.save()
+
+        request = _build_request(self.user)
+        with event_tracking(request):
+            ip.delete()
+
+        device.refresh_from_db()
+        self.assertIsNone(device.primary_ip4)
+
+        oc = ObjectChange.objects.get(
+            changed_object_type=ContentType.objects.get_for_model(device),
+            changed_object_id=device.pk,
+            action=ObjectChangeActionChoices.ACTION_UPDATE,
+        )
+        self.assertIsNone(oc.postchange_data['primary_ip4'])
+
+    def test_device_primary_ip6_delete_clears_field_and_saves(self):
+        device = create_test_device('Device 1')
+        ip = IPAddress.objects.create(address='2001:db8::1/64')
+        device.primary_ip6 = ip
+        device.save()
+
+        request = _build_request(self.user)
+        with event_tracking(request):
+            ip.delete()
+
+        device.refresh_from_db()
+        self.assertIsNone(device.primary_ip6)
+        oc = ObjectChange.objects.get(
+            changed_object_type=ContentType.objects.get_for_model(device),
+            changed_object_id=device.pk,
+            action=ObjectChangeActionChoices.ACTION_UPDATE,
+        )
+        self.assertIsNone(oc.postchange_data['primary_ip6'])
+
+    def test_vm_primary_ip4_delete_records_vm_update(self):
+        vm = create_test_virtualmachine('VM 1')
+        ip = IPAddress.objects.create(address='192.0.2.10/24')
+        vm.primary_ip4 = ip
+        vm.save()
+
+        request = _build_request(self.user)
+        with event_tracking(request):
+            ip.delete()
+
+        vm.refresh_from_db()
+        self.assertIsNone(vm.primary_ip4)
+        oc = ObjectChange.objects.get(
+            changed_object_type=ContentType.objects.get_for_model(vm),
+            changed_object_id=vm.pk,
+            action=ObjectChangeActionChoices.ACTION_UPDATE,
+        )
+        self.assertIsNone(oc.postchange_data['primary_ip4'])
+
+    def test_unrelated_ip_delete_records_no_device_change(self):
+        device = create_test_device('Device 1')
+        device_type = ContentType.objects.get_for_model(device)
+        assigned = IPAddress.objects.create(address='192.0.2.1/24')
+        unrelated = IPAddress.objects.create(address='192.0.2.2/24')
+        device.primary_ip4 = assigned
+        device.save()
+
+        request = _build_request(self.user)
+        with event_tracking(request):
+            unrelated.delete()
+
+        device.refresh_from_db()
+        self.assertEqual(device.primary_ip4, assigned)
+        self.assertFalse(
+            ObjectChange.objects.filter(
+                changed_object_type=device_type,
+                changed_object_id=device.pk,
+                action=ObjectChangeActionChoices.ACTION_UPDATE,
+            ).exists()
+        )
+
+
+class ClearOOBIPSignalTestCase(TestCase):
+    """
+    Verify ipam.signals.clear_oob_ip detaches a deleted IPAddress from any Device on
+    which it was set as the OOB IP, and records a Device update change-log entry.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='alice', password='pw')
+
+    def test_device_oob_ip_delete_records_device_update(self):
+        device = create_test_device('Device 1')
+        ip = IPAddress.objects.create(address='192.0.2.1/24')
+        device.oob_ip = ip
+        device.save()
+
+        request = _build_request(self.user)
+        with event_tracking(request):
+            ip.delete()
+
+        device.refresh_from_db()
+        self.assertIsNone(device.oob_ip)
+        oc = ObjectChange.objects.get(
+            changed_object_type=ContentType.objects.get_for_model(device),
+            changed_object_id=device.pk,
+            action=ObjectChangeActionChoices.ACTION_UPDATE,
+        )
+        self.assertIsNone(oc.postchange_data['oob_ip'])
+
+    def test_unrelated_ip_delete_records_no_device_change(self):
+        device = create_test_device('Device 1')
+        device_type = ContentType.objects.get_for_model(device)
+        oob = IPAddress.objects.create(address='192.0.2.1/24')
+        unrelated = IPAddress.objects.create(address='192.0.2.2/24')
+        device.oob_ip = oob
+        device.save()
+
+        request = _build_request(self.user)
+        with event_tracking(request):
+            unrelated.delete()
+
+        device.refresh_from_db()
+        self.assertEqual(device.oob_ip, oob)
+        self.assertFalse(
+            ObjectChange.objects.filter(
+                changed_object_type=device_type,
+                changed_object_id=device.pk,
+                action=ObjectChangeActionChoices.ACTION_UPDATE,
+            ).exists()
+        )

+ 128 - 0
netbox/users/tests/test_signals.py

@@ -0,0 +1,128 @@
+from django.contrib.auth.signals import user_logged_in, user_login_failed
+from django.db.models.signals import post_save
+from django.test import RequestFactory, TestCase, override_settings
+
+from users.models import User, UserConfig
+from users.signals import create_userconfig
+
+
+class LogUserLoginFailedSignalTestCase(TestCase):
+    """
+    Verify users.signals.log_user_login_failed emits the expected log records when the
+    user_login_failed signal fires.
+    """
+
+    def setUp(self):
+        self.factory = RequestFactory()
+
+    def test_log_includes_client_ip_when_available(self):
+        request = self.factory.post('/login/', REMOTE_ADDR='192.0.2.100')
+
+        with self.assertLogs('netbox.auth.login', level='INFO') as cm:
+            user_login_failed.send(
+                sender=self.__class__,
+                credentials={'username': 'alice'},
+                request=request,
+            )
+
+        self.assertEqual(len(cm.records), 1)
+        self.assertEqual(cm.records[0].levelname, 'INFO')
+        self.assertIn('alice', cm.records[0].getMessage())
+        self.assertIn('192.0.2.100', cm.records[0].getMessage())
+
+    def test_log_warns_when_client_ip_missing(self):
+        request = self.factory.post('/login/')
+        # RequestFactory sets REMOTE_ADDR by default; strip it to simulate missing IP.
+        request.META.pop('REMOTE_ADDR', None)
+
+        with self.assertLogs('netbox.auth.login', level='INFO') as cm:
+            user_login_failed.send(
+                sender=self.__class__,
+                credentials={'username': 'alice'},
+                request=request,
+            )
+
+        levels = [record.levelname for record in cm.records]
+        self.assertIn('WARNING', levels)
+        self.assertIn('INFO', levels)
+        info_message = next(r.getMessage() for r in cm.records if r.levelname == 'INFO')
+        self.assertEqual(info_message, 'Failed login attempt for username: alice')
+
+
+class SetLanguageOnLoginSignalTestCase(TestCase):
+    """
+    Verify users.signals.set_language_on_login stores the user's preferred language on the
+    request when the user logs in.
+    """
+
+    def setUp(self):
+        self.factory = RequestFactory()
+        self.user = User.objects.create_user(username='alice', password='pw')
+
+    def test_language_cookie_is_set_from_user_config(self):
+        # Assign a fresh dict to avoid mutating the shared DEFAULT_USER_PREFERENCES reference.
+        self.user.config.data = {'locale': {'language': 'de'}}
+        self.user.config.save()
+        request = self.factory.post('/login/')
+
+        user_logged_in.send(sender=self.__class__, user=self.user, request=request)
+
+        self.assertEqual(request._language_cookie, 'de')
+
+    def test_missing_language_preference_leaves_request_untouched(self):
+        self.user.config.data = {}
+        self.user.config.save()
+        request = self.factory.post('/login/')
+
+        user_logged_in.send(sender=self.__class__, user=self.user, request=request)
+
+        self.assertFalse(hasattr(request, '_language_cookie'))
+
+    def test_user_without_config_leaves_request_untouched(self):
+        request = self.factory.post('/login/')
+        # Drop the UserConfig that was auto-created so `hasattr(user, 'config')` returns False.
+        self.user.config.delete()
+        # Reload the user instance from the DB so the cached 'config' related attribute is gone.
+        user = User.objects.get(pk=self.user.pk)
+
+        user_logged_in.send(sender=self.__class__, user=user, request=request)
+
+        self.assertFalse(hasattr(request, '_language_cookie'))
+
+
+class CreateUserConfigSignalTestCase(TestCase):
+    """
+    Verify users.signals.create_userconfig creates a default UserConfig for new users only.
+    """
+
+    @override_settings(DEFAULT_USER_PREFERENCES={'pagination.per_page': 42})
+    def test_userconfig_is_created_with_default_preferences(self):
+        user = User.objects.create_user(username='alice', password='pw')
+
+        config = UserConfig.objects.get(user=user)
+        self.assertEqual(config.data, {'pagination.per_page': 42})
+
+    def test_userconfig_is_not_created_for_existing_user(self):
+        user = User.objects.create_user(username='alice', password='pw')
+        UserConfig.objects.filter(user=user).delete()
+
+        user.email = 'alice@example.com'
+        user.save()
+
+        self.assertFalse(UserConfig.objects.filter(user=user).exists())
+
+    def test_userconfig_is_not_created_for_raw_imports(self):
+        """
+        When loading a fixture (`raw=True`), the signal must skip UserConfig creation.
+        """
+        user = User(username='bob')
+        # Simulate the post_save signal Django emits during loaddata with raw=True.
+        post_save.disconnect(create_userconfig, sender=User)
+        try:
+            user.save()
+            UserConfig.objects.filter(user=user).delete()
+            create_userconfig(instance=user, created=True, raw=True)
+        finally:
+            post_save.connect(create_userconfig, sender=User)
+
+        self.assertFalse(UserConfig.objects.filter(user=user).exists())

+ 85 - 0
netbox/virtualization/tests/test_signals.py

@@ -0,0 +1,85 @@
+from django.contrib.contenttypes.models import ContentType
+from django.test import TestCase
+
+from dcim.models import Site
+from virtualization.models import Cluster, ClusterType, VirtualDisk, VirtualMachine
+
+
+class UpdateVirtualMachineDiskSignalTestCase(TestCase):
+    """
+    Verify virtualization.signals.update_virtualmachine_disk keeps VirtualMachine.disk in sync
+    with the aggregate size of its VirtualDisks.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cluster_type = ClusterType.objects.create(name='Cluster Type', slug='cluster-type')
+        cls.cluster = Cluster.objects.create(name='Cluster', type=cluster_type)
+
+    def test_disk_size_is_aggregated_on_save(self):
+        vm = VirtualMachine.objects.create(name='VM 1', cluster=self.cluster)
+        VirtualDisk.objects.create(virtual_machine=vm, name='disk0', size=50)
+        VirtualDisk.objects.create(virtual_machine=vm, name='disk1', size=75)
+
+        vm.refresh_from_db()
+        self.assertEqual(vm.disk, 125)
+
+    def test_disk_size_is_recalculated_on_delete(self):
+        vm = VirtualMachine.objects.create(name='VM 1', cluster=self.cluster)
+        disk = VirtualDisk.objects.create(virtual_machine=vm, name='disk0', size=50)
+        VirtualDisk.objects.create(virtual_machine=vm, name='disk1', size=75)
+        vm.refresh_from_db()
+        self.assertEqual(vm.disk, 125)
+
+        disk.delete()
+        vm.refresh_from_db()
+        self.assertEqual(vm.disk, 75)
+
+    def test_disk_size_is_none_when_all_disks_removed(self):
+        vm = VirtualMachine.objects.create(name='VM 1', cluster=self.cluster)
+        disk = VirtualDisk.objects.create(virtual_machine=vm, name='disk0', size=50)
+
+        disk.delete()
+        vm.refresh_from_db()
+        self.assertIsNone(vm.disk)
+
+
+class UpdateVirtualMachineSiteSignalTestCase(TestCase):
+    """
+    Verify virtualization.signals.update_virtualmachine_site propagates a Cluster's cached
+    site to all of its VirtualMachines on save.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.site_a = Site.objects.create(name='Site A', slug='site-a')
+        cls.site_b = Site.objects.create(name='Site B', slug='site-b')
+        cls.cluster_type = ClusterType.objects.create(name='Cluster Type', slug='cluster-type')
+
+    def test_cluster_site_change_propagates_to_vms(self):
+        cluster = Cluster.objects.create(name='Cluster', type=self.cluster_type, scope=self.site_a)
+        vm1 = VirtualMachine.objects.create(name='VM 1', cluster=cluster)
+        vm2 = VirtualMachine.objects.create(name='VM 2', cluster=cluster)
+        self.assertEqual(vm1.site, self.site_a)
+        self.assertEqual(vm2.site, self.site_a)
+
+        # Re-scope the cluster to site B; both VMs should follow.
+        cluster.scope_type = ContentType.objects.get_for_model(Site)
+        cluster.scope_id = self.site_b.pk
+        cluster.save()
+
+        vm1.refresh_from_db()
+        vm2.refresh_from_db()
+        self.assertEqual(vm1.site, self.site_b)
+        self.assertEqual(vm2.site, self.site_b)
+
+    def test_cluster_without_site_does_not_overwrite_vm_site(self):
+        cluster = Cluster.objects.create(name='Cluster', type=self.cluster_type)
+        vm = VirtualMachine.objects.create(name='VM 1', cluster=cluster, site=self.site_a)
+        self.assertEqual(vm.site, self.site_a)
+
+        cluster.description = 'updated'
+        cluster.save()
+
+        vm.refresh_from_db()
+        self.assertEqual(vm.site, self.site_a)

+ 97 - 0
netbox/wireless/tests/test_signals.py

@@ -0,0 +1,97 @@
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from django.test import SimpleTestCase, TestCase
+
+from dcim.choices import InterfaceTypeChoices
+from dcim.models import CablePath, Interface
+from utilities.testing.utils import create_test_device
+from wireless import signals
+from wireless.choices import WirelessChannelChoices
+from wireless.models import WirelessLink
+
+
+class WirelessLinkSignalTestCase(TestCase):
+    """
+    Verify wireless.signals.update_connected_interfaces and nullify_connected_interfaces
+    keep the connected interfaces and CablePaths consistent with the WirelessLink lifecycle.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.device = create_test_device('Device 1')
+        # Eight interfaces — one distinct pair per test method so no test sees stale
+        # in-memory state mutated by a previous test.
+        cls.interfaces = [
+            Interface.objects.create(
+                device=cls.device,
+                name=f'radio{i}',
+                type=InterfaceTypeChoices.TYPE_80211AC,
+                rf_channel=WirelessChannelChoices.CHANNEL_5G_32,
+                rf_channel_frequency=5160,
+                rf_channel_width=20,
+            )
+            for i in range(8)
+        ]
+
+    def test_creating_link_assigns_wireless_link_to_both_interfaces(self):
+        interface_a, interface_b = self.interfaces[0], self.interfaces[1]
+        link = WirelessLink(interface_a=interface_a, interface_b=interface_b, ssid='LINK1')
+        link.save()
+
+        interface_a.refresh_from_db()
+        interface_b.refresh_from_db()
+        self.assertEqual(interface_a.wireless_link, link)
+        self.assertEqual(interface_b.wireless_link, link)
+
+    def test_creating_link_creates_cablepaths(self):
+        interface_a, interface_b = self.interfaces[2], self.interfaces[3]
+        link = WirelessLink(interface_a=interface_a, interface_b=interface_b, ssid='LINK1')
+        link.save()
+
+        self.assertEqual(CablePath.objects.filter(_nodes__contains=link).count(), 2)
+
+    def test_saving_existing_link_does_not_create_extra_paths(self):
+        interface_a, interface_b = self.interfaces[4], self.interfaces[5]
+        link = WirelessLink(interface_a=interface_a, interface_b=interface_b, ssid='LINK1')
+        link.save()
+        path_count = CablePath.objects.count()
+
+        link.description = 'updated'
+        link.save()
+
+        self.assertEqual(CablePath.objects.count(), path_count)
+
+    def test_deleting_link_clears_interfaces_and_paths(self):
+        interface_a, interface_b = self.interfaces[6], self.interfaces[7]
+        link = WirelessLink(interface_a=interface_a, interface_b=interface_b, ssid='LINK1')
+        link.save()
+        self.assertEqual(CablePath.objects.filter(_nodes__contains=link).count(), 2)
+
+        link.delete()
+
+        interface_a.refresh_from_db()
+        interface_b.refresh_from_db()
+        self.assertIsNone(interface_a.wireless_link)
+        self.assertIsNone(interface_b.wireless_link)
+        # All wireless cable paths should be gone.
+        self.assertEqual(CablePath.objects.count(), 0)
+
+
+class UpdateConnectedInterfacesDirectHandlerTestCase(SimpleTestCase):
+    """
+    Direct-call tests for update_connected_interfaces branches not reachable through
+    normal model operations (raw=True is only set during Django's loaddata pathway).
+    """
+
+    def test_raw_import_skips_interface_assignment_and_path_creation(self):
+        interface_a = SimpleNamespace(wireless_link=None, save=MagicMock())
+        interface_b = SimpleNamespace(wireless_link=None, cable=None, save=MagicMock())
+        instance = SimpleNamespace(interface_a=interface_a, interface_b=interface_b)
+
+        with patch.object(signals, 'create_cablepaths') as create_cablepaths:
+            signals.update_connected_interfaces(instance=instance, created=True, raw=True)
+
+        interface_a.save.assert_not_called()
+        interface_b.save.assert_not_called()
+        create_cablepaths.assert_not_called()