Explorar o código

Add test for updated paths on cable status change

Jeremy Stretch %!s(int64=5) %!d(string=hai) anos
pai
achega
3b0a75edf8

+ 3 - 2
netbox/dcim/models/devices.py

@@ -980,6 +980,9 @@ class Cable(ChangeLoggedModel, CustomFieldModel):
         # A copy of the PK to be used by __str__ in case the object is deleted
         self._pk = self.pk
 
+        # Cache the original status so we can check later if it's been changed
+        self._orig_status = self.status
+
     @classmethod
     def from_db(cls, db, field_names, values):
         """
@@ -992,8 +995,6 @@ class Cable(ChangeLoggedModel, CustomFieldModel):
         instance._orig_termination_b_type_id = instance.termination_b_type_id
         instance._orig_termination_b_id = instance.termination_b_id
 
-        instance._orig_status = instance.status
-
         return instance
 
     def __str__(self):

+ 1 - 1
netbox/dcim/signals.py

@@ -86,7 +86,7 @@ def update_connected_endpoints(instance, created, **kwargs):
         # may change in the future.) However, we do need to capture status changes and update
         # any CablePaths accordingly.
         if instance.status != CableStatusChoices.STATUS_CONNECTED:
-            CablePath.objects.filter(path__contains=object_to_path_node(instance)).update(is_connected=False)
+            CablePath.objects.filter(path__contains=[object_to_path_node(instance)]).update(is_connected=False)
         else:
             rebuild_paths(instance)
 

+ 102 - 26
netbox/dcim/tests/test_cablepaths.py

@@ -2,6 +2,7 @@ from django.contrib.contenttypes.models import ContentType
 from django.test import TestCase
 
 from circuits.models import *
+from dcim.choices import CableStatusChoices
 from dcim.models import *
 from dcim.utils import objects_to_path
 
@@ -70,13 +71,14 @@ class CablePathTestCase(TestCase):
         ]
         CircuitTermination.objects.bulk_create(cls.circuit_terminations)
 
-    def assertPathExists(self, origin, destination, path=None, msg=None):
+    def assertPathExists(self, origin, destination, path=None, is_connected=None, msg=None):
         """
         Assert that a CablePath from origin to destination with a specific intermediate path exists.
 
         :param origin: Originating endpoint
         :param destination: Terminating endpoint, or None
         :param path: Sequence of objects comprising the intermediate path (optional)
+        :param is_connected: Boolean indicating whether the end-to-end path is complete and active (optional)
         :param msg: Custom failure message (optional)
         """
         kwargs = {
@@ -91,6 +93,8 @@ class CablePathTestCase(TestCase):
             kwargs['destination_id__isnull'] = True
         if path is not None:
             kwargs['path'] = objects_to_path(*path)
+        if is_connected is not None:
+            kwargs['is_connected'] = is_connected
         if msg is None:
             if destination is not None:
                 msg = f"Missing path from {origin} to {destination}"
@@ -108,12 +112,14 @@ class CablePathTestCase(TestCase):
         self.assertPathExists(
             origin=self.interfaces[0],
             destination=self.interfaces[1],
-            path=(cable1,)
+            path=(cable1,),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[1],
             destination=self.interfaces[0],
-            path=(cable1,)
+            path=(cable1,),
+            is_connected=True
         )
         self.assertEqual(CablePath.objects.count(), 2)
 
@@ -133,7 +139,8 @@ class CablePathTestCase(TestCase):
         self.assertPathExists(
             origin=self.interfaces[0],
             destination=None,
-            path=(cable1, self.front_ports[16], self.rear_ports[4])
+            path=(cable1, self.front_ports[16], self.rear_ports[4]),
+            is_connected=False
         )
         self.assertEqual(CablePath.objects.count(), 1)
 
@@ -143,12 +150,14 @@ class CablePathTestCase(TestCase):
         self.assertPathExists(
             origin=self.interfaces[0],
             destination=self.interfaces[1],
-            path=(cable1, self.front_ports[16], self.rear_ports[4], cable2)
+            path=(cable1, self.front_ports[16], self.rear_ports[4], cable2),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[1],
             destination=self.interfaces[0],
-            path=(cable2, self.rear_ports[4], self.front_ports[16], cable1)
+            path=(cable2, self.rear_ports[4], self.front_ports[16], cable1),
+            is_connected=True
         )
         self.assertEqual(CablePath.objects.count(), 2)
 
@@ -157,7 +166,8 @@ class CablePathTestCase(TestCase):
         self.assertPathExists(
             origin=self.interfaces[0],
             destination=None,
-            path=(cable1, self.front_ports[16], self.rear_ports[4])
+            path=(cable1, self.front_ports[16], self.rear_ports[4]),
+            is_connected=False
         )
         self.assertEqual(CablePath.objects.count(), 1)
 
@@ -174,12 +184,14 @@ class CablePathTestCase(TestCase):
         self.assertPathExists(
             origin=self.interfaces[0],
             destination=None,
-            path=(cable1, self.front_ports[0], self.rear_ports[0])
+            path=(cable1, self.front_ports[0], self.rear_ports[0]),
+            is_connected=False
         )
         self.assertPathExists(
             origin=self.interfaces[1],
             destination=None,
-            path=(cable2, self.front_ports[1], self.rear_ports[0])
+            path=(cable2, self.front_ports[1], self.rear_ports[0]),
+            is_connected=False
         )
         self.assertEqual(CablePath.objects.count(), 2)
 
@@ -189,12 +201,14 @@ class CablePathTestCase(TestCase):
         self.assertPathExists(
             origin=self.interfaces[0],
             destination=None,
-            path=(cable1, self.front_ports[0], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[4])
+            path=(cable1, self.front_ports[0], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[4]),
+            is_connected=False
         )
         self.assertPathExists(
             origin=self.interfaces[1],
             destination=None,
-            path=(cable2, self.front_ports[1], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[5])
+            path=(cable2, self.front_ports[1], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[5]),
+            is_connected=False
         )
         self.assertEqual(CablePath.objects.count(), 2)
 
@@ -209,7 +223,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable1, self.front_ports[0], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[4],
                 cable4,
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[1],
@@ -217,7 +232,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable2, self.front_ports[1], self.rear_ports[0], cable3, self.rear_ports[1], self.front_ports[5],
                 cable5,
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[2],
@@ -225,7 +241,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable4, self.front_ports[4], self.rear_ports[1], cable3, self.rear_ports[0], self.front_ports[0],
                 cable1
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[3],
@@ -233,7 +250,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable5, self.front_ports[5], self.rear_ports[1], cable3, self.rear_ports[0], self.front_ports[1],
                 cable2
-            )
+            ),
+            is_connected=True
         )
         self.assertEqual(CablePath.objects.count(), 4)
 
@@ -277,7 +295,8 @@ class CablePathTestCase(TestCase):
                 cable1, self.front_ports[0], self.rear_ports[0], cable3, self.front_ports[4], self.rear_ports[1],
                 cable4, self.rear_ports[2], self.front_ports[8], cable5, self.rear_ports[3], self.front_ports[12],
                 cable6
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[1],
@@ -286,7 +305,8 @@ class CablePathTestCase(TestCase):
                 cable2, self.front_ports[1], self.rear_ports[0], cable3, self.front_ports[4], self.rear_ports[1],
                 cable4, self.rear_ports[2], self.front_ports[8], cable5, self.rear_ports[3], self.front_ports[13],
                 cable7
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[2],
@@ -295,7 +315,8 @@ class CablePathTestCase(TestCase):
                 cable6, self.front_ports[12], self.rear_ports[3], cable5, self.front_ports[8], self.rear_ports[2],
                 cable4, self.rear_ports[1], self.front_ports[4], cable3, self.rear_ports[0], self.front_ports[0],
                 cable1
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[3],
@@ -304,7 +325,8 @@ class CablePathTestCase(TestCase):
                 cable7, self.front_ports[13], self.rear_ports[3], cable5, self.front_ports[8], self.rear_ports[2],
                 cable4, self.rear_ports[1], self.front_ports[4], cable3, self.rear_ports[0], self.front_ports[1],
                 cable2
-            )
+            ),
+            is_connected=True
         )
         self.assertEqual(CablePath.objects.count(), 4)
 
@@ -342,7 +364,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable1, self.front_ports[0], self.rear_ports[0], cable3, self.front_ports[16], self.rear_ports[4],
                 cable4, self.rear_ports[1], self.front_ports[4], cable5
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[1],
@@ -350,7 +373,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable2, self.front_ports[1], self.rear_ports[0], cable3, self.front_ports[16], self.rear_ports[4],
                 cable4, self.rear_ports[1], self.front_ports[5], cable6
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[2],
@@ -358,7 +382,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable5, self.front_ports[4], self.rear_ports[1], cable4, self.rear_ports[4], self.front_ports[16],
                 cable3, self.rear_ports[0], self.front_ports[0], cable1
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[3],
@@ -366,7 +391,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable6, self.front_ports[5], self.rear_ports[1], cable4, self.rear_ports[4], self.front_ports[16],
                 cable3, self.rear_ports[0], self.front_ports[1], cable2
-            )
+            ),
+            is_connected=True
         )
         self.assertEqual(CablePath.objects.count(), 4)
 
@@ -392,7 +418,8 @@ class CablePathTestCase(TestCase):
         self.assertPathExists(
             origin=self.interfaces[0],
             destination=None,
-            path=(cable1, self.front_ports[16], self.rear_ports[4], cable2, self.rear_ports[5], self.front_ports[17])
+            path=(cable1, self.front_ports[16], self.rear_ports[4], cable2, self.rear_ports[5], self.front_ports[17]),
+            is_connected=False
         )
         self.assertEqual(CablePath.objects.count(), 1)
 
@@ -405,7 +432,8 @@ class CablePathTestCase(TestCase):
             path=(
                 cable1, self.front_ports[16], self.rear_ports[4], cable2, self.rear_ports[5], self.front_ports[17],
                 cable3,
-            )
+            ),
+            is_connected=True
         )
         self.assertPathExists(
             origin=self.interfaces[1],
@@ -413,6 +441,54 @@ class CablePathTestCase(TestCase):
             path=(
                 cable3, self.front_ports[17], self.rear_ports[5], cable2, self.rear_ports[4], self.front_ports[16],
                 cable1,
-            )
+            ),
+            is_connected=True
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+    def test_07_change_cable_status(self):
+        """
+        [IF1] --C1-- [FP5] [RP5] --C2-- [IF2]
+        """
+        # Create cables 1 and 2
+        cable1 = Cable(termination_a=self.interfaces[0], termination_b=self.front_ports[16])
+        cable1.save()
+        cable2 = Cable(termination_a=self.rear_ports[4], termination_b=self.interfaces[1])
+        cable2.save()
+        self.assertEqual(CablePath.objects.filter(is_connected=True).count(), 2)
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        # Change cable 2's status to "planned"
+        cable2.status = CableStatusChoices.STATUS_PLANNED
+        cable2.save()
+        self.assertPathExists(
+            origin=self.interfaces[0],
+            destination=self.interfaces[1],
+            path=(cable1, self.front_ports[16], self.rear_ports[4], cable2),
+            is_connected=False
+        )
+        self.assertPathExists(
+            origin=self.interfaces[1],
+            destination=self.interfaces[0],
+            path=(cable2, self.rear_ports[4], self.front_ports[16], cable1),
+            is_connected=False
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        # Change cable 2's status to "connected"
+        cable2 = Cable.objects.get(pk=cable2.pk)
+        cable2.status = CableStatusChoices.STATUS_CONNECTED
+        cable2.save()
+        self.assertPathExists(
+            origin=self.interfaces[0],
+            destination=self.interfaces[1],
+            path=(cable1, self.front_ports[16], self.rear_ports[4], cable2),
+            is_connected=True
+        )
+        self.assertPathExists(
+            origin=self.interfaces[1],
+            destination=self.interfaces[0],
+            path=(cable2, self.rear_ports[4], self.front_ports[16], cable1),
+            is_connected=True
         )
         self.assertEqual(CablePath.objects.count(), 2)

+ 3 - 0
netbox/dcim/utils.py

@@ -63,4 +63,7 @@ def trace_path(node):
             destination = peer_termination
             break
 
+    if destination is None:
+        is_connected = False
+
     return path, destination, is_connected