Răsfoiți Sursa

Fixes #20432: Allow cablepaths with CircuitTerminations that have different parent Circuit's (#20770)

Daniel Sheppard 3 luni în urmă
părinte
comite
9b89af75e4
2 a modificat fișierele cu 133 adăugiri și 20 ștergeri
  1. 29 20
      netbox/dcim/models/cables.py
  2. 104 0
      netbox/dcim/tests/test_cablepaths.py

+ 29 - 20
netbox/dcim/models/cables.py

@@ -10,6 +10,7 @@ from django.utils.translation import gettext_lazy as _
 from core.models import ObjectType
 from core.models import ObjectType
 from dcim.choices import *
 from dcim.choices import *
 from dcim.constants import *
 from dcim.constants import *
+from dcim.exceptions import UnsupportedCablePath
 from dcim.fields import PathField
 from dcim.fields import PathField
 from dcim.utils import decompile_path_node, object_to_path_node
 from dcim.utils import decompile_path_node, object_to_path_node
 from netbox.choices import ColorChoices
 from netbox.choices import ColorChoices
@@ -28,8 +29,6 @@ __all__ = (
     'CableTermination',
     'CableTermination',
 )
 )
 
 
-from ..exceptions import UnsupportedCablePath
-
 trace_paths = Signal()
 trace_paths = Signal()
 
 
 
 
@@ -615,7 +614,7 @@ class CablePath(models.Model):
         Cable or WirelessLink connects (interfaces, console ports, circuit termination, etc.). All terminations must be
         Cable or WirelessLink connects (interfaces, console ports, circuit termination, etc.). All terminations must be
         of the same type and must belong to the same parent object.
         of the same type and must belong to the same parent object.
         """
         """
-        from circuits.models import CircuitTermination
+        from circuits.models import CircuitTermination, Circuit
 
 
         if not terminations:
         if not terminations:
             return None
             return None
@@ -637,8 +636,11 @@ class CablePath(models.Model):
                 raise UnsupportedCablePath(_("All mid-span terminations must have the same termination type"))
                 raise UnsupportedCablePath(_("All mid-span terminations must have the same termination type"))
 
 
             # All mid-span terminations must all be attached to the same device
             # All mid-span terminations must all be attached to the same device
-            if (not isinstance(terminations[0], PathEndpoint) and not
-                    all(t.parent_object == terminations[0].parent_object for t in terminations[1:])):
+            if (
+                not isinstance(terminations[0], PathEndpoint) and
+                not isinstance(terminations[0].parent_object, Circuit) and
+                not all(t.parent_object == terminations[0].parent_object for t in terminations[1:])
+            ):
                 raise UnsupportedCablePath(_("All mid-span terminations must have the same parent object"))
                 raise UnsupportedCablePath(_("All mid-span terminations must have the same parent object"))
 
 
             # Check for a split path (e.g. rear port fanning out to multiple front ports with
             # Check for a split path (e.g. rear port fanning out to multiple front ports with
@@ -782,32 +784,39 @@ class CablePath(models.Model):
 
 
             elif isinstance(remote_terminations[0], CircuitTermination):
             elif isinstance(remote_terminations[0], CircuitTermination):
                 # Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
                 # Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
-                if len(remote_terminations) > 1:
-                    is_split = True
-                    break
-                circuit_termination = CircuitTermination.objects.filter(
-                    circuit=remote_terminations[0].circuit,
-                    term_side='Z' if remote_terminations[0].term_side == 'A' else 'A'
-                ).first()
-                if circuit_termination is None:
+                qs = Q()
+                for remote_termination in remote_terminations:
+                    qs |= Q(
+                        circuit=remote_termination.circuit,
+                        term_side='Z' if remote_termination.term_side == 'A' else 'A'
+                    )
+
+                # Get all circuit terminations
+                circuit_terminations = CircuitTermination.objects.filter(qs)
+
+                if not circuit_terminations.exists():
                     break
                     break
-                elif circuit_termination._provider_network:
+                elif all([ct._provider_network for ct in circuit_terminations]):
                     # Circuit terminates to a ProviderNetwork
                     # Circuit terminates to a ProviderNetwork
                     path.extend([
                     path.extend([
-                        [object_to_path_node(circuit_termination)],
-                        [object_to_path_node(circuit_termination._provider_network)],
+                        [object_to_path_node(ct) for ct in circuit_terminations],
+                        [object_to_path_node(ct._provider_network) for ct in circuit_terminations],
                     ])
                     ])
                     is_complete = True
                     is_complete = True
                     break
                     break
-                elif circuit_termination.termination and not circuit_termination.cable:
+                elif all([ct.termination and not ct.cable for ct in circuit_terminations]):
                     # Circuit terminates to a Region/Site/etc.
                     # Circuit terminates to a Region/Site/etc.
                     path.extend([
                     path.extend([
-                        [object_to_path_node(circuit_termination)],
-                        [object_to_path_node(circuit_termination.termination)],
+                        [object_to_path_node(ct) for ct in circuit_terminations],
+                        [object_to_path_node(ct.termination) for ct in circuit_terminations],
                     ])
                     ])
                     break
                     break
+                elif any([ct.cable in links for ct in circuit_terminations]):
+                    # No valid path
+                    is_split = True
+                    break
 
 
-                terminations = [circuit_termination]
+                terminations = circuit_terminations
 
 
             else:
             else:
                 # Check for non-symmetric path
                 # Check for non-symmetric path

+ 104 - 0
netbox/dcim/tests/test_cablepaths.py

@@ -2270,6 +2270,80 @@ class CablePathTestCase(TestCase):
         CableTraceSVG(interface1).render()
         CableTraceSVG(interface1).render()
         CableTraceSVG(interface2).render()
         CableTraceSVG(interface2).render()
 
 
+    def test_223_interface_to_interface_via_multiple_circuit_terminations(self):
+        provider = Provider.objects.first()
+        circuit_type = CircuitType.objects.first()
+        circuit1 = self.circuit
+        circuit2 = Circuit.objects.create(provider=provider, type=circuit_type, cid='Circuit 2')
+        interface1 = Interface.objects.create(device=self.device, name='Interface 1')
+        interface2 = Interface.objects.create(device=self.device, name='Interface 2')
+        circuittermination1_A = CircuitTermination.objects.create(
+            circuit=circuit1,
+            termination=self.site,
+            term_side='A'
+        )
+        circuittermination1_Z = CircuitTermination.objects.create(
+            circuit=circuit1,
+            termination=self.site,
+            term_side='Z'
+        )
+        circuittermination2_A = CircuitTermination.objects.create(
+            circuit=circuit2,
+            termination=self.site,
+            term_side='A'
+        )
+        circuittermination2_Z = CircuitTermination.objects.create(
+            circuit=circuit2,
+            termination=self.site,
+            term_side='Z'
+        )
+
+        # Create cables
+        cable1 = Cable(
+            a_terminations=[interface1],
+            b_terminations=[circuittermination1_A, circuittermination2_A]
+        )
+        cable2 = Cable(
+            a_terminations=[interface2],
+            b_terminations=[circuittermination1_Z, circuittermination2_Z]
+        )
+        cable1.save()
+        cable2.save()
+
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        path1 = self.assertPathExists(
+            (
+                interface1,
+                cable1,
+                (circuittermination1_A, circuittermination2_A),
+                (circuittermination1_Z, circuittermination2_Z),
+                cable2,
+                interface2
+
+            ),
+            is_active=True,
+            is_complete=True,
+        )
+        interface1.refresh_from_db()
+        self.assertPathIsSet(interface1, path1)
+
+        path2 = self.assertPathExists(
+            (
+                interface2,
+                cable2,
+                (circuittermination1_Z, circuittermination2_Z),
+                (circuittermination1_A, circuittermination2_A),
+                cable1,
+                interface1
+
+            ),
+            is_active=True,
+            is_complete=True,
+        )
+        interface2.refresh_from_db()
+        self.assertPathIsSet(interface2, path2)
+
     def test_301_create_path_via_existing_cable(self):
     def test_301_create_path_via_existing_cable(self):
         """
         """
         [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]
         [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]
@@ -2510,3 +2584,33 @@ class CablePathTestCase(TestCase):
             is_active=True
             is_active=True
         )
         )
         self.assertEqual(CablePath.objects.count(), 0)
         self.assertEqual(CablePath.objects.count(), 0)
+
+    def test_402_exclude_circuit_loopback(self):
+        interface = Interface.objects.create(device=self.device, name='Interface 1')
+        circuittermination1 = CircuitTermination.objects.create(
+            circuit=self.circuit,
+            termination=self.site,
+            term_side='A'
+        )
+        circuittermination2 = CircuitTermination.objects.create(
+            circuit=self.circuit,
+            termination=self.site,
+            term_side='Z'
+        )
+
+        # Create cables
+        cable = Cable(
+            a_terminations=[interface],
+            b_terminations=[circuittermination1, circuittermination2]
+        )
+        cable.save()
+
+        path = self.assertPathExists(
+            (interface, cable, (circuittermination1, circuittermination2)),
+            is_active=True,
+            is_complete=False,
+            is_split=True
+        )
+        self.assertEqual(CablePath.objects.count(), 1)
+        interface.refresh_from_db()
+        self.assertPathIsSet(interface, path)