2
0
Эх сурвалжийг харах

Fixes #21653: Fix multi-position tracing in `CablePath.from_origin()` (#21681)

* Add failing tests for multi-position cable path tracing

* Fix multi-position tracing in CablePath.from_origin()

* Add failing test for multi-connector trunk cable tracing through patch panel

* Fix multi-connector profiled cable tracing in CablePath.from_origin()
Jonathan Senecal 17 цаг өмнө
parent
commit
d57f230f37

+ 29 - 6
netbox/dcim/models/cables.py

@@ -820,9 +820,9 @@ class CablePath(models.Model):
             path.append([
                 object_to_path_node(t) for t in terminations
             ])
-            # If not null, push cable position onto the stack
+            # If not null, push cable positions onto the stack
             if isinstance(terminations[0], PathEndpoint) and terminations[0].cable_positions:
-                position_stack.append([terminations[0].cable_positions[0]])
+                position_stack.append(list(terminations[0].cable_positions))
 
             # Step 2: Determine the attached links (Cable or WirelessLink), if any
             links = list(dict.fromkeys(
@@ -863,10 +863,33 @@ class CablePath(models.Model):
                 # Profile-based tracing
                 if links[0].profile:
                     cable_profile = links[0].profile_class()
-                    position = position_stack.pop()[0] if position_stack else None
-                    term, position = cable_profile.get_peer_termination(terminations[0], position)
-                    remote_terminations = [term]
-                    position_stack.append([position])
+                    positions = position_stack.pop() if position_stack else [None]
+                    remote_terminations = []
+                    new_positions = []
+
+                    # Build (termination, position) pairs by matching stacked positions
+                    # to each termination's cable_positions. This correctly handles
+                    # multiple terminations on different connectors of the same cable.
+                    remaining = list(positions)
+                    term_position_pairs = []
+                    for term in terminations:
+                        if term.cable_positions:
+                            for cp in term.cable_positions:
+                                if cp in remaining:
+                                    term_position_pairs.append((term, cp))
+                                    remaining.remove(cp)
+
+                    # Fallback for when positions don't match cable_positions
+                    # (e.g., empty position stack yielding [None])
+                    if not term_position_pairs:
+                        term_position_pairs = [(terminations[0], pos) for pos in positions]
+
+                    for term, pos in term_position_pairs:
+                        peer, new_pos = cable_profile.get_peer_termination(term, pos)
+                        if peer not in remote_terminations:
+                            remote_terminations.append(peer)
+                        new_positions.append(new_pos)
+                    position_stack.append(new_positions)
 
                 # Legacy (positionless) behavior
                 else:

+ 426 - 0
netbox/dcim/tests/test_cablepaths2.py

@@ -797,6 +797,432 @@ class CablePathTests(CablePathTestCase):
         # Test SVG generation
         CableTraceSVG(interfaces[0]).render()
 
+    def test_107_duplex_interface_profiled_patch_through_trunk_with_splices(self):
+        """
+        Tests that a duplex interface (cable_positions=[1,2]) traces both positions through
+        profiled cables and splice pass-throughs, producing a single CablePath with both
+        strands visible.
+
+        [IF1] -C1(1C2P)- [FP1(p=2)][RP1(p=2)] -C2(1C2P)- [RP2(p=2)]
+        [FP2] -C3- [FP4][RP3(p=2)] -C4(1C2P)- [RP4(p=2)][FP6(p=2)]
+        -C5(1C2P)- [IF2]  /  [FP3] -C6- [FP5]
+
+        Cable profiles: C1=1C2P, C2=1C2P, C3/C6=unprofiled splices, C4=1C2P, C5=1C2P
+        """
+        interfaces = [
+            Interface.objects.create(device=self.device, name='Interface 1'),
+            Interface.objects.create(device=self.device, name='Interface 2'),
+        ]
+        rear_ports = [
+            RearPort.objects.create(device=self.device, name='Rear Port 1', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 2', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 3', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 4', positions=2),
+        ]
+        front_ports = [
+            FrontPort.objects.create(device=self.device, name='Front Port 1', positions=2),  # Panel A duplex
+            FrontPort.objects.create(device=self.device, name='Front Port 2'),               # Splice A strand 1
+            FrontPort.objects.create(device=self.device, name='Front Port 3'),               # Splice A strand 2
+            FrontPort.objects.create(device=self.device, name='Front Port 4'),               # Splice B strand 1
+            FrontPort.objects.create(device=self.device, name='Front Port 5'),               # Splice B strand 2
+            FrontPort.objects.create(device=self.device, name='Front Port 6', positions=2),  # Panel B duplex
+        ]
+        PortMapping.objects.bulk_create([
+            # Panel A: duplex FP1(pos=2) -> RP1(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[0], front_port_position=1,
+                rear_port=rear_ports[0], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[0], front_port_position=2,
+                rear_port=rear_ports[0], rear_port_position=2,
+            ),
+            # Splice A: FP2, FP3 -> RP2(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[1], front_port_position=1,
+                rear_port=rear_ports[1], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[2], front_port_position=1,
+                rear_port=rear_ports[1], rear_port_position=2,
+            ),
+            # Splice B: FP4, FP5 -> RP3(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[3], front_port_position=1,
+                rear_port=rear_ports[2], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[4], front_port_position=1,
+                rear_port=rear_ports[2], rear_port_position=2,
+            ),
+            # Panel B: duplex FP6(pos=2) -> RP4(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[5], front_port_position=1,
+                rear_port=rear_ports[3], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[5], front_port_position=2,
+                rear_port=rear_ports[3], rear_port_position=2,
+            ),
+        ])
+
+        # Create cables
+        cable1 = Cable(
+            profile=CableProfileChoices.SINGLE_1C2P,
+            a_terminations=[interfaces[0]],
+            b_terminations=[front_ports[0]],
+        )
+        cable1.clean()
+        cable1.save()
+        cable2 = Cable(
+            profile=CableProfileChoices.SINGLE_1C2P,
+            a_terminations=[rear_ports[0]],
+            b_terminations=[rear_ports[1]],
+        )
+        cable2.clean()
+        cable2.save()
+        cable3 = Cable(
+            a_terminations=[front_ports[1]],
+            b_terminations=[front_ports[3]],
+        )
+        cable3.clean()
+        cable3.save()
+        cable4 = Cable(
+            profile=CableProfileChoices.SINGLE_1C2P,
+            a_terminations=[rear_ports[2]],
+            b_terminations=[rear_ports[3]],
+        )
+        cable4.clean()
+        cable4.save()
+        cable5 = Cable(
+            profile=CableProfileChoices.SINGLE_1C2P,
+            a_terminations=[front_ports[5]],
+            b_terminations=[interfaces[1]],
+        )
+        cable5.clean()
+        cable5.save()
+        cable6 = Cable(
+            a_terminations=[front_ports[2]],
+            b_terminations=[front_ports[4]],
+        )
+        cable6.clean()
+        cable6.save()
+
+        # Verify forward path: IF1 -> IF2 (both strands through splice)
+        self.assertPathExists(
+            (
+                interfaces[0], cable1, front_ports[0],
+                rear_ports[0], cable2, rear_ports[1],
+                [front_ports[1], front_ports[2]], [cable3, cable6], [front_ports[3], front_ports[4]],
+                rear_ports[2], cable4, rear_ports[3],
+                front_ports[5], cable5, interfaces[1],
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        # Verify reverse path: IF2 -> IF1
+        self.assertPathExists(
+            (
+                interfaces[1], cable5, front_ports[5],
+                rear_ports[3], cable4, rear_ports[2],
+                [front_ports[3], front_ports[4]], [cable3, cable6], [front_ports[1], front_ports[2]],
+                rear_ports[1], cable2, rear_ports[0],
+                front_ports[0], cable1, interfaces[0],
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        # Verify cable positions on interfaces
+        for iface in interfaces:
+            iface.refresh_from_db()
+        self.assertEqual(interfaces[0].cable_connector, 1)
+        self.assertEqual(interfaces[0].cable_positions, [1, 2])
+        self.assertEqual(interfaces[1].cable_connector, 1)
+        self.assertEqual(interfaces[1].cable_positions, [1, 2])
+
+        # Test SVG generation
+        CableTraceSVG(interfaces[0]).render()
+
+    def test_108_single_interface_two_frontports_unprofiled_through_trunk_with_splices(self):
+        """
+        Tests that positions seeded by PortMapping (not cable_positions) are preserved
+        when crossing profiled cables.
+
+        [IF1] -C1- [FP1,FP2][RP1(p=2)] -C2(1C2P)- [RP2(p=2)]
+        [FP3] -C3- [FP5][RP3(p=2)] -C4(1C2P)- [RP4(p=2)]
+        [FP7,FP8] -C5- [IF2]  /  [FP4] -C6- [FP6]
+
+        PortMappings: FP1->RP1p1, FP2->RP1p2, FP3->RP2p1, FP4->RP2p2,
+                      FP5->RP3p1, FP6->RP3p2, FP7->RP4p1, FP8->RP4p2
+
+        C1 is unprofiled (1 IF -> 2 FPs), C2/C4 are 1C2P trunks,
+        C3/C6 are unprofiled splices, C5 is unprofiled (2 FPs -> 1 IF).
+        """
+        interfaces = [
+            Interface.objects.create(device=self.device, name='Interface 1'),
+            Interface.objects.create(device=self.device, name='Interface 2'),
+        ]
+        rear_ports = [
+            RearPort.objects.create(device=self.device, name='Rear Port 1', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 2', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 3', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 4', positions=2),
+        ]
+        front_ports = [
+            FrontPort.objects.create(device=self.device, name='Front Port 1'),  # Panel A strand 1
+            FrontPort.objects.create(device=self.device, name='Front Port 2'),  # Panel A strand 2
+            FrontPort.objects.create(device=self.device, name='Front Port 3'),  # Splice A strand 1
+            FrontPort.objects.create(device=self.device, name='Front Port 4'),  # Splice A strand 2
+            FrontPort.objects.create(device=self.device, name='Front Port 5'),  # Splice B strand 1
+            FrontPort.objects.create(device=self.device, name='Front Port 6'),  # Splice B strand 2
+            FrontPort.objects.create(device=self.device, name='Front Port 7'),  # Panel B strand 1
+            FrontPort.objects.create(device=self.device, name='Front Port 8'),  # Panel B strand 2
+        ]
+        PortMapping.objects.bulk_create([
+            # Panel A: FP1, FP2 -> RP1(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[0], front_port_position=1,
+                rear_port=rear_ports[0], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[1], front_port_position=1,
+                rear_port=rear_ports[0], rear_port_position=2,
+            ),
+            # Splice A: FP3, FP4 -> RP2(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[2], front_port_position=1,
+                rear_port=rear_ports[1], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[3], front_port_position=1,
+                rear_port=rear_ports[1], rear_port_position=2,
+            ),
+            # Splice B: FP5, FP6 -> RP3(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[4], front_port_position=1,
+                rear_port=rear_ports[2], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[5], front_port_position=1,
+                rear_port=rear_ports[2], rear_port_position=2,
+            ),
+            # Panel B: FP7, FP8 -> RP4(pos=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[6], front_port_position=1,
+                rear_port=rear_ports[3], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[7], front_port_position=1,
+                rear_port=rear_ports[3], rear_port_position=2,
+            ),
+        ])
+
+        # Create cables
+        cable1 = Cable(
+            a_terminations=[interfaces[0]],
+            b_terminations=[front_ports[0], front_ports[1]],
+        )
+        cable1.clean()
+        cable1.save()
+        cable2 = Cable(
+            profile=CableProfileChoices.SINGLE_1C2P,
+            a_terminations=[rear_ports[0]],
+            b_terminations=[rear_ports[1]],
+        )
+        cable2.clean()
+        cable2.save()
+        cable3 = Cable(
+            a_terminations=[front_ports[2]],
+            b_terminations=[front_ports[4]],
+        )
+        cable3.clean()
+        cable3.save()
+        cable4 = Cable(
+            profile=CableProfileChoices.SINGLE_1C2P,
+            a_terminations=[rear_ports[2]],
+            b_terminations=[rear_ports[3]],
+        )
+        cable4.clean()
+        cable4.save()
+        cable5 = Cable(
+            a_terminations=[front_ports[6], front_ports[7]],
+            b_terminations=[interfaces[1]],
+        )
+        cable5.clean()
+        cable5.save()
+        cable6 = Cable(
+            a_terminations=[front_ports[3]],
+            b_terminations=[front_ports[5]],
+        )
+        cable6.clean()
+        cable6.save()
+
+        # Verify forward path: IF1 -> IF2 (both strands through splice)
+        self.assertPathExists(
+            (
+                interfaces[0], cable1, [front_ports[0], front_ports[1]],
+                rear_ports[0], cable2, rear_ports[1],
+                [front_ports[2], front_ports[3]], [cable3, cable6], [front_ports[4], front_ports[5]],
+                rear_ports[2], cable4, rear_ports[3],
+                [front_ports[6], front_ports[7]], cable5, interfaces[1],
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        # Verify reverse path: IF2 -> IF1
+        self.assertPathExists(
+            (
+                interfaces[1], cable5, [front_ports[6], front_ports[7]],
+                rear_ports[3], cable4, rear_ports[2],
+                [front_ports[4], front_ports[5]], [cable3, cable6], [front_ports[2], front_ports[3]],
+                rear_ports[1], cable2, rear_ports[0],
+                [front_ports[0], front_ports[1]], cable1, interfaces[0],
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        # Verify cable positions are not set (unprofiled patch cables)
+        for iface in interfaces:
+            iface.refresh_from_db()
+        self.assertIsNone(interfaces[0].cable_connector)
+        self.assertIsNone(interfaces[0].cable_positions)
+        self.assertIsNone(interfaces[1].cable_connector)
+        self.assertIsNone(interfaces[1].cable_positions)
+
+    def test_109_multiconnector_trunk_through_patch_panel(self):
+        """
+        Tests that a 4-position interface traces correctly through a patch panel
+        that fans out to both connectors of a Trunk2C2P cable.
+
+        [IF1] --C1(1C4P)-- [FP1(p=4)][RP1(p=2)] --C3(Trunk2C2P)-- [RP3(p=2)][FP5(p=4)] --C5(1C4P)-- [IF2]
+                                      [RP2(p=2)]                    [RP4(p=2)]
+
+        PortMappings (Panel A): FP1p1->RP1p1, FP1p2->RP1p2, FP1p3->RP2p1, FP1p4->RP2p2
+        PortMappings (Panel B): FP5p1->RP3p1, FP5p2->RP3p2, FP5p3->RP4p1, FP5p4->RP4p2
+        """
+        interfaces = [
+            Interface.objects.create(device=self.device, name='Interface 1'),
+            Interface.objects.create(device=self.device, name='Interface 2'),
+        ]
+        rear_ports = [
+            RearPort.objects.create(device=self.device, name='Rear Port 1', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 2', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 3', positions=2),
+            RearPort.objects.create(device=self.device, name='Rear Port 4', positions=2),
+        ]
+        front_ports = [
+            FrontPort.objects.create(device=self.device, name='Front Port 1', positions=4),
+            FrontPort.objects.create(device=self.device, name='Front Port 5', positions=4),
+        ]
+        PortMapping.objects.bulk_create([
+            # Panel A: FP1(p=4) -> RP1(p=2) and RP2(p=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[0], front_port_position=1,
+                rear_port=rear_ports[0], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[0], front_port_position=2,
+                rear_port=rear_ports[0], rear_port_position=2,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[0], front_port_position=3,
+                rear_port=rear_ports[1], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[0], front_port_position=4,
+                rear_port=rear_ports[1], rear_port_position=2,
+            ),
+            # Panel B: FP5(p=4) -> RP3(p=2) and RP4(p=2)
+            PortMapping(
+                device=self.device, front_port=front_ports[1], front_port_position=1,
+                rear_port=rear_ports[2], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[1], front_port_position=2,
+                rear_port=rear_ports[2], rear_port_position=2,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[1], front_port_position=3,
+                rear_port=rear_ports[3], rear_port_position=1,
+            ),
+            PortMapping(
+                device=self.device, front_port=front_ports[1], front_port_position=4,
+                rear_port=rear_ports[3], rear_port_position=2,
+            ),
+        ])
+
+        # Create cables
+        cable1 = Cable(
+            profile=CableProfileChoices.SINGLE_1C4P,
+            a_terminations=[interfaces[0]],
+            b_terminations=[front_ports[0]],
+        )
+        cable1.clean()
+        cable1.save()
+        cable3 = Cable(
+            profile=CableProfileChoices.TRUNK_2C2P,
+            a_terminations=[rear_ports[0], rear_ports[1]],
+            b_terminations=[rear_ports[2], rear_ports[3]],
+        )
+        cable3.clean()
+        cable3.save()
+        cable5 = Cable(
+            profile=CableProfileChoices.SINGLE_1C4P,
+            a_terminations=[front_ports[1]],
+            b_terminations=[interfaces[1]],
+        )
+        cable5.clean()
+        cable5.save()
+
+        # Verify forward path: IF1 -> IF2 (all 4 positions through trunk)
+        self.assertPathExists(
+            (
+                interfaces[0], cable1, front_ports[0],
+                [rear_ports[0], rear_ports[1]], cable3, [rear_ports[2], rear_ports[3]],
+                front_ports[1], cable5, interfaces[1],
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        # Verify reverse path: IF2 -> IF1
+        self.assertPathExists(
+            (
+                interfaces[1], cable5, front_ports[1],
+                [rear_ports[2], rear_ports[3]], cable3, [rear_ports[0], rear_ports[1]],
+                front_ports[0], cable1, interfaces[0],
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        # Verify cable positions
+        for iface in interfaces:
+            iface.refresh_from_db()
+        self.assertEqual(interfaces[0].cable_connector, 1)
+        self.assertEqual(interfaces[0].cable_positions, [1, 2, 3, 4])
+        self.assertEqual(interfaces[1].cable_connector, 1)
+        self.assertEqual(interfaces[1].cable_positions, [1, 2, 3, 4])
+
+        # Verify rear port connector assignments
+        for rp in rear_ports:
+            rp.refresh_from_db()
+        self.assertEqual(rear_ports[0].cable_connector, 1)
+        self.assertEqual(rear_ports[0].cable_positions, [1, 2])
+        self.assertEqual(rear_ports[1].cable_connector, 2)
+        self.assertEqual(rear_ports[1].cable_positions, [1, 2])
+        self.assertEqual(rear_ports[2].cable_connector, 1)
+        self.assertEqual(rear_ports[2].cable_positions, [1, 2])
+        self.assertEqual(rear_ports[3].cable_connector, 2)
+        self.assertEqual(rear_ports[3].cable_positions, [1, 2])
+
+        # Test SVG generation
+        CableTraceSVG(interfaces[0]).render()
+
     def test_202_single_path_via_pass_through_with_breakouts(self):
         """
         [IF1] --C1-- [FP1] [RP1] --C2-- [IF3]