Просмотр исходного кода

Fixes: #11079 - Handle cables across multiple rear-port positions (#13337)

* Catch AssertionError's in signals.  Handle accordingly

* Alter cable logic to handle certain additional path types.

* Fix failures and add test

* More tests

* Remove not needed tests, add additional tests

* Finish tests, correct some behaviour

* Add check for mid-span device not allowed condition

* Remove excess import

* Remove logging import

* Remove logging import

* Minor tweaks based on Arthur's feedback

* Update netbox/dcim/tests/test_cablepaths.py

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>

* Update netbox/dcim/models/cables.py

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>

* Changes to account for required SVG rendering changes and based on feedback

* More tweaks for cable path checking

* Improve handling of links with multi-terminations

* Improved SVG rendering of multiple rear ports (with positions) per path trace.  Include asymmetric path detection

* Include missing assert to ensure links are same type.

* Clean up tests

* Remove unused objects from tests

* Changes requested to tests and update comments/doctstrings

* Fix parent reference

---------

Co-authored-by: Jeremy Stretch <jstretch@netboxlabs.com>
Daniel Sheppard 2 лет назад
Родитель
Сommit
f65744faee

+ 91 - 31
netbox/dcim/models/cables.py

@@ -20,7 +20,7 @@ from utilities.fields import ColorField
 from utilities.querysets import RestrictedQuerySet
 from utilities.utils import to_meters
 from wireless.models import WirelessLink
-from .device_components import FrontPort, RearPort
+from .device_components import FrontPort, RearPort, PathEndpoint
 
 __all__ = (
     'Cable',
@@ -518,9 +518,16 @@ class CablePath(models.Model):
             # Terminations must all be of the same type
             assert all(isinstance(t, type(terminations[0])) for t in terminations[1:])
 
+            # All mid-span terminations must all be attached to the same device
+            if not isinstance(terminations[0], PathEndpoint):
+                assert all(isinstance(t, type(terminations[0])) for t in terminations[1:])
+                assert all(t.parent_object == terminations[0].parent_object for t in terminations[1:])
+
             # Check for a split path (e.g. rear port fanning out to multiple front ports with
             # different cables attached)
-            if len(set(t.link for t in terminations)) > 1:
+            if len(set(t.link for t in terminations)) > 1 and (
+                    position_stack and len(terminations) != len(position_stack[-1])
+            ):
                 is_split = True
                 break
 
@@ -529,46 +536,68 @@ class CablePath(models.Model):
                 object_to_path_node(t) for t in terminations
             ])
 
-            # Step 2: Determine the attached link (Cable or WirelessLink), if any
-            link = terminations[0].link
-            if link is None and len(path) == 1:
-                # If this is the start of the path and no link exists, return None
-                return None
-            elif link is None:
+            # Step 2: Determine the attached links (Cable or WirelessLink), if any
+            links = [termination.link for termination in terminations if termination.link is not None]
+            if len(links) == 0:
+                if len(path) == 1:
+                    # If this is the start of the path and no link exists, return None
+                    return None
                 # Otherwise, halt the trace if no link exists
                 break
-            assert type(link) in (Cable, WirelessLink)
+            assert all(type(link) in (Cable, WirelessLink) for link in links)
+            assert all(isinstance(link, type(links[0])) for link in links)
+
+            # Step 3: Record asymmetric paths as split
+            not_connected_terminations = [termination.link for termination in terminations if termination.link is None]
+            if len(not_connected_terminations) > 0:
+                is_complete = False
+                is_split = True
 
-            # Step 3: Record the link and update path status if not "connected"
-            path.append([object_to_path_node(link)])
-            if hasattr(link, 'status') and link.status != LinkStatusChoices.STATUS_CONNECTED:
+            # Step 4: Record the links, keeping cables in order to allow for SVG rendering
+            cables = []
+            for link in links:
+                if object_to_path_node(link) not in cables:
+                    cables.append(object_to_path_node(link))
+            path.append(cables)
+
+            # Step 5: Update the path status if a link is not connected
+            links_status = [link.status for link in links if link.status != LinkStatusChoices.STATUS_CONNECTED]
+            if any([status != LinkStatusChoices.STATUS_CONNECTED for status in links_status]):
                 is_active = False
 
-            # Step 4: Determine the far-end terminations
-            if isinstance(link, Cable):
+            # Step 6: Determine the far-end terminations
+            if isinstance(links[0], Cable):
                 termination_type = ContentType.objects.get_for_model(terminations[0])
                 local_cable_terminations = CableTermination.objects.filter(
                     termination_type=termination_type,
                     termination_id__in=[t.pk for t in terminations]
                 )
-                # Terminations must all belong to same end of Cable
-                local_cable_end = local_cable_terminations[0].cable_end
-                assert all(ct.cable_end == local_cable_end for ct in local_cable_terminations[1:])
-                remote_cable_terminations = CableTermination.objects.filter(
-                    cable=link,
-                    cable_end='A' if local_cable_end == 'B' else 'B'
-                )
+
+                q_filter = Q()
+                for lct in local_cable_terminations:
+                    cable_end = 'A' if lct.cable_end == 'B' else 'B'
+                    q_filter |= Q(cable=lct.cable, cable_end=cable_end)
+
+                remote_cable_terminations = CableTermination.objects.filter(q_filter)
                 remote_terminations = [ct.termination for ct in remote_cable_terminations]
             else:
                 # WirelessLink
-                remote_terminations = [link.interface_b] if link.interface_a is terminations[0] else [link.interface_a]
+                remote_terminations = [
+                    link.interface_b if link.interface_a is terminations[0] else link.interface_a for link in links
+                ]
+
+            # Remote Terminations must all be of the same type, otherwise return a split path
+            if not all(isinstance(t, type(remote_terminations[0])) for t in remote_terminations[1:]):
+                is_complete = False
+                is_split = True
+                break
 
-            # Step 5: Record the far-end termination object(s)
+            # Step 7: Record the far-end termination object(s)
             path.append([
                 object_to_path_node(t) for t in remote_terminations if t is not None
             ])
 
-            # Step 6: Determine the "next hop" terminations, if applicable
+            # Step 8: Determine the "next hop" terminations, if applicable
             if not remote_terminations:
                 break
 
@@ -577,20 +606,32 @@ class CablePath(models.Model):
                 rear_ports = RearPort.objects.filter(
                     pk__in=[t.rear_port_id for t in remote_terminations]
                 )
-                if len(rear_ports) > 1:
-                    assert all(rp.positions == 1 for rp in rear_ports)
-                elif rear_ports[0].positions > 1:
+                if len(rear_ports) > 1 or rear_ports[0].positions > 1:
                     position_stack.append([fp.rear_port_position for fp in remote_terminations])
 
                 terminations = rear_ports
 
             elif isinstance(remote_terminations[0], RearPort):
-
-                if len(remote_terminations) > 1 or remote_terminations[0].positions == 1:
+                if len(remote_terminations) == 1 and remote_terminations[0].positions == 1:
                     front_ports = FrontPort.objects.filter(
                         rear_port_id__in=[rp.pk for rp in remote_terminations],
                         rear_port_position=1
                     )
+                # Obtain the individual front ports based on the termination and all positions
+                elif len(remote_terminations) > 1 and position_stack:
+                    positions = position_stack.pop()
+
+                    # Ensure we have a number of positions equal to the amount of remote terminations
+                    assert len(remote_terminations) == len(positions)
+
+                    # Get our front ports
+                    q_filter = Q()
+                    for rt in remote_terminations:
+                        position = positions.pop()
+                        q_filter |= Q(rear_port_id=rt.pk, rear_port_position=position)
+                    assert q_filter is not Q()
+                    front_ports = FrontPort.objects.filter(q_filter)
+                # Obtain the individual front ports based on the termination and position
                 elif position_stack:
                     front_ports = FrontPort.objects.filter(
                         rear_port_id=remote_terminations[0].pk,
@@ -632,9 +673,16 @@ class CablePath(models.Model):
 
                 terminations = [circuit_termination]
 
-            # Anything else marks the end of the path
             else:
-                is_complete = True
+                # Check for non-symmetric path
+                if all(isinstance(t, type(remote_terminations[0])) for t in remote_terminations[1:]):
+                    is_complete = True
+                elif len(remote_terminations) == 0:
+                    is_complete = False
+                else:
+                    # Unsupported topology, mark as split and exit
+                    is_complete = False
+                    is_split = True
                 break
 
         return cls(
@@ -740,3 +788,15 @@ class CablePath(models.Model):
             return [
                 ct.get_peer_termination() for ct in nodes
             ]
+
+    def get_asymmetric_nodes(self):
+        """
+        Return all available next segments in a split cable path.
+        """
+        from circuits.models import CircuitTermination
+        asymmetric_nodes = []
+        for nodes in self.path_objects:
+            if type(nodes[0]) in [RearPort, FrontPort, CircuitTermination]:
+                asymmetric_nodes.extend([node for node in nodes if node.link is None])
+
+        return asymmetric_nodes

+ 109 - 38
netbox/dcim/svg/cables.py

@@ -32,11 +32,18 @@ class Node(Hyperlink):
         color: Box fill color (RRGGBB format)
         labels: An iterable of text strings. Each label will render on a new line within the box.
         radius: Box corner radius, for rounded corners (default: 10)
+        object: A copy of the object to allow reference when drawing cables to determine which cables are connected to
+                which terminations.
     """
 
-    def __init__(self, position, width, url, color, labels, radius=10, **extra):
+    object = None
+
+    def __init__(self, position, width, url, color, labels, radius=10, object=object, **extra):
         super(Node, self).__init__(href=url, target='_parent', **extra)
 
+        # Save object for reference by cable systems
+        self.object = object
+
         x, y = position
 
         # Add the box
@@ -77,7 +84,7 @@ class Connector(Group):
         labels: Iterable of text labels
     """
 
-    def __init__(self, start, url, color, labels=[], **extra):
+    def __init__(self, start, url, color, labels=[], description=[], **extra):
         super().__init__(class_='connector', **extra)
 
         self.start = start
@@ -104,6 +111,8 @@ class Connector(Group):
             text_coords = (start[0] + PADDING * 2, cursor - LINE_HEIGHT / 2)
             text = Text(label, insert=text_coords, class_='bold' if not i else [])
             link.add(text)
+        if len(description) > 0:
+            link.set_desc("\n".join(description))
 
         self.add(link)
 
@@ -206,7 +215,8 @@ class CableTraceSVG:
                 url=f'{self.base_url}{term.get_absolute_url()}',
                 color=self._get_color(term),
                 labels=self._get_labels(term),
-                radius=5
+                radius=5,
+                object=term
             )
             nodes_height = max(nodes_height, node.box['height'])
             nodes.append(node)
@@ -238,22 +248,65 @@ class CableTraceSVG:
             Polyline(points=points, style=f'stroke: #{connector.color}'),
         ))
 
-    def draw_cable(self, cable):
-        labels = [
-            f'Cable {cable}',
-            cable.get_status_display()
-        ]
-        if cable.type:
-            labels.append(cable.get_type_display())
-        if cable.length and cable.length_unit:
-            labels.append(f'{cable.length} {cable.get_length_unit_display()}')
+    def draw_cable(self, cable, terminations, cable_count=0):
+        """
+        Draw a single cable.  Terminations and cable count are passed for determining position and padding
+
+        :param cable: The cable to draw
+        :param terminations: List of terminations to build positioning data off of
+        :param cable_count: Count of all cables on this layer for determining whether to collapse description into a
+                            tooltip.
+        """
+
+        # If the cable count is higher than 2, collapse the description into a tooltip
+        if cable_count > 2:
+            # Use the cable __str__ function to denote the cable
+            labels = [f'{cable}']
+
+            # Include the label and the status description in the tooltip
+            description = [
+                f'Cable {cable}',
+                cable.get_status_display()
+            ]
+
+            if cable.type:
+                # Include the cable type in the tooltip
+                description.append(cable.get_type_display())
+            if cable.length and cable.length_unit:
+                # Include the cable length in the tooltip
+                description.append(f'{cable.length} {cable.get_length_unit_display()}')
+        else:
+            labels = [
+                f'Cable {cable}',
+                cable.get_status_display()
+            ]
+            description = []
+            if cable.type:
+                labels.append(cable.get_type_display())
+            if cable.length and cable.length_unit:
+                # Include the cable length in the tooltip
+                labels.append(f'{cable.length} {cable.get_length_unit_display()}')
+
+        # If there is only one termination, center on that termination
+        # Otherwise average the center across the terminations
+        if len(terminations) == 1:
+            center = terminations[0].bottom_center[0]
+        else:
+            # Get a list of termination centers
+            termination_centers = [term.bottom_center[0] for term in terminations]
+            # Average the centers
+            center = sum(termination_centers) / len(termination_centers)
+
+        # Create the connector
         connector = Connector(
-            start=(self.center + OFFSET, self.cursor),
+            start=(center, self.cursor),
             color=cable.color or '000000',
             url=f'{self.base_url}{cable.get_absolute_url()}',
-            labels=labels
+            labels=labels,
+            description=description
         )
 
+        # Set the cursor position
         self.cursor += connector.height
 
         return connector
@@ -334,34 +387,52 @@ class CableTraceSVG:
 
             # Connector (a Cable or WirelessLink)
             if links:
-                link = links[0]  # Remove Cable from list
+                link_cables = {}
+                fanin = False
+                fanout = False
 
-                # Cable
-                if type(link) is Cable:
-
-                    # Account for fan-ins height
-                    if len(near_ends) > 1:
-                        self.cursor += FANOUT_HEIGHT
-
-                    cable = self.draw_cable(link)
-                    self.connectors.append(cable)
-
-                    # Draw fan-ins
-                    if len(near_ends) > 1:
-                        for term in terminations:
-                            self.draw_fanin(term, cable)
-
-                # WirelessLink
-                elif type(link) is WirelessLink:
-                    wirelesslink = self.draw_wirelesslink(link)
-                    self.connectors.append(wirelesslink)
+                # Determine if we have fanins or fanouts
+                if len(near_ends) > len(set(links)):
+                    self.cursor += FANOUT_HEIGHT
+                    fanin = True
+                if len(far_ends) > len(set(links)):
+                    fanout = True
+                cursor = self.cursor
+                for link in links:
+                    # Cable
+                    if type(link) is Cable and not link_cables.get(link.pk):
+                        # Reset cursor
+                        self.cursor = cursor
+                        # Generate a list of terminations connected to this cable
+                        near_end_link_terminations = [term for term in terminations if term.object.cable == link]
+                        # Draw the cable
+                        cable = self.draw_cable(link, near_end_link_terminations, cable_count=len(links))
+                        # Add cable to the list of cables
+                        link_cables.update({link.pk: cable})
+                        # Add cable to drawing
+                        self.connectors.append(cable)
+
+                        # Draw fan-ins
+                        if len(near_ends) > 1 and fanin:
+                            for term in terminations:
+                                if term.object.cable == link:
+                                    self.draw_fanin(term, cable)
+
+                    # WirelessLink
+                    elif type(link) is WirelessLink:
+                        wirelesslink = self.draw_wirelesslink(link)
+                        self.connectors.append(wirelesslink)
 
                 # Far end termination(s)
                 if len(far_ends) > 1:
-                    self.cursor += FANOUT_HEIGHT
-                    terminations = self.draw_terminations(far_ends)
-                    for term in terminations:
-                        self.draw_fanout(term, cable)
+                    if fanout:
+                        self.cursor += FANOUT_HEIGHT
+                        terminations = self.draw_terminations(far_ends)
+                        for term in terminations:
+                            if hasattr(term.object, 'cable') and link_cables.get(term.object.cable.pk):
+                                self.draw_fanout(term, link_cables.get(term.object.cable.pk))
+                    else:
+                        self.draw_terminations(far_ends)
                 elif far_ends:
                     self.draw_terminations(far_ends)
                 else:

+ 396 - 4
netbox/dcim/tests/test_cablepaths.py

@@ -15,6 +15,7 @@ class CablePathTestCase(TestCase):
         1XX: Test direct connections between different endpoint types
         2XX: Test different cable topologies
         3XX: Test responses to changes in existing objects
+        4XX: Test to exclude specific cable topologies
     """
     @classmethod
     def setUpTestData(cls):
@@ -33,12 +34,11 @@ class CablePathTestCase(TestCase):
         circuit_type = CircuitType.objects.create(name='Circuit Type', slug='circuit-type')
         cls.circuit = Circuit.objects.create(provider=provider, type=circuit_type, cid='Circuit 1')
 
-    def assertPathExists(self, nodes, **kwargs):
+    def _get_cablepath(self, nodes, **kwargs):
         """
-        Assert that a CablePath from origin to destination with a specific intermediate path exists.
+        Return a given cable path
 
         :param nodes: Iterable of steps, with each step being either a single node or a list of nodes
-        :param is_active: Boolean indicating whether the end-to-end path is complete and active (optional)
 
         :return: The matching CablePath (if any)
         """
@@ -48,12 +48,29 @@ class CablePathTestCase(TestCase):
                 path.append([object_to_path_node(node) for node in step])
             else:
                 path.append([object_to_path_node(step)])
+        return CablePath.objects.filter(path=path, **kwargs).first()
 
-        cablepath = CablePath.objects.filter(path=path, **kwargs).first()
+    def assertPathExists(self, nodes, **kwargs):
+        """
+        Assert that a CablePath from origin to destination with a specific intermediate path exists. Returns the
+        first matching CablePath, if found.
+
+        :param nodes: Iterable of steps, with each step being either a single node or a list of nodes
+        """
+        cablepath = self._get_cablepath(nodes, **kwargs)
         self.assertIsNotNone(cablepath, msg='CablePath not found')
 
         return cablepath
 
+    def assertPathDoesNotExist(self, nodes, **kwargs):
+        """
+        Assert that a specific CablePath does *not* exist.
+
+        :param nodes: Iterable of steps, with each step being either a single node or a list of nodes
+        """
+        cablepath = self._get_cablepath(nodes, **kwargs)
+        self.assertIsNone(cablepath, msg='Unexpected CablePath found')
+
     def assertPathIsSet(self, origin, cablepath, msg=None):
         """
         Assert that a specific CablePath instance is set as the path on the origin.
@@ -1695,6 +1712,291 @@ class CablePathTestCase(TestCase):
         self.assertPathIsSet(interface3, path3)
         self.assertPathIsSet(interface4, path4)
 
+    def test_219_interface_to_interface_duplex_via_multiple_rearports(self):
+        """
+        [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]
+                     [FP3] [RP3] --C4-- [RP4] [FP4]
+        """
+        interface1 = Interface.objects.create(device=self.device, name='Interface 1')
+        interface2 = Interface.objects.create(device=self.device, name='Interface 2')
+        rearport1 = RearPort.objects.create(device=self.device, name='Rear Port 1', positions=1)
+        rearport2 = RearPort.objects.create(device=self.device, name='Rear Port 2', positions=1)
+        rearport3 = RearPort.objects.create(device=self.device, name='Rear Port 3', positions=1)
+        rearport4 = RearPort.objects.create(device=self.device, name='Rear Port 4', positions=1)
+        frontport1 = FrontPort.objects.create(
+            device=self.device, name='Front Port 1', rear_port=rearport1, rear_port_position=1
+        )
+        frontport2 = FrontPort.objects.create(
+            device=self.device, name='Front Port 2', rear_port=rearport2, rear_port_position=1
+        )
+        frontport3 = FrontPort.objects.create(
+            device=self.device, name='Front Port 3', rear_port=rearport3, rear_port_position=1
+        )
+        frontport4 = FrontPort.objects.create(
+            device=self.device, name='Front Port 4', rear_port=rearport4, rear_port_position=1
+        )
+
+        cable2 = Cable(
+            a_terminations=[rearport1],
+            b_terminations=[rearport2]
+        )
+        cable2.save()
+        cable4 = Cable(
+            a_terminations=[rearport3],
+            b_terminations=[rearport4]
+        )
+        cable4.save()
+        self.assertEqual(CablePath.objects.count(), 0)
+
+        # Create cable1
+        cable1 = Cable(
+            a_terminations=[interface1],
+            b_terminations=[frontport1, frontport3]
+        )
+        cable1.save()
+        self.assertPathExists(
+            (interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4), (rearport2, rearport4), (frontport2, frontport4)),
+            is_complete=False
+        )
+        self.assertEqual(CablePath.objects.count(), 1)
+
+        # Create cable 3
+        cable3 = Cable(
+            a_terminations=[frontport2, frontport4],
+            b_terminations=[interface2]
+        )
+        cable3.save()
+        self.assertPathExists(
+            (
+                interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4),
+                (rearport2, rearport4), (frontport2, frontport4), cable3, interface2
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertPathExists(
+            (
+                interface2, cable3, (frontport2, frontport4), (rearport2, rearport4), (cable2, cable4),
+                (rearport1, rearport3), (frontport1, frontport3), cable1, interface1
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+    def test_220_interface_to_interface_duplex_via_multiple_front_and_rear_ports(self):
+        """
+        [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]
+        [IF2] --C5-- [FP3] [RP3] --C4-- [RP4] [FP4]
+        """
+        interface1 = Interface.objects.create(device=self.device, name='Interface 1')
+        interface2 = Interface.objects.create(device=self.device, name='Interface 2')
+        interface3 = Interface.objects.create(device=self.device, name='Interface 3')
+        rearport1 = RearPort.objects.create(device=self.device, name='Rear Port 1', positions=1)
+        rearport2 = RearPort.objects.create(device=self.device, name='Rear Port 2', positions=1)
+        rearport3 = RearPort.objects.create(device=self.device, name='Rear Port 3', positions=1)
+        rearport4 = RearPort.objects.create(device=self.device, name='Rear Port 4', positions=1)
+        frontport1 = FrontPort.objects.create(
+            device=self.device, name='Front Port 1', rear_port=rearport1, rear_port_position=1
+        )
+        frontport2 = FrontPort.objects.create(
+            device=self.device, name='Front Port 2', rear_port=rearport2, rear_port_position=1
+        )
+        frontport3 = FrontPort.objects.create(
+            device=self.device, name='Front Port 3', rear_port=rearport3, rear_port_position=1
+        )
+        frontport4 = FrontPort.objects.create(
+            device=self.device, name='Front Port 4', rear_port=rearport4, rear_port_position=1
+        )
+
+        cable2 = Cable(
+            a_terminations=[rearport1],
+            b_terminations=[rearport2]
+        )
+        cable2.save()
+        cable4 = Cable(
+            a_terminations=[rearport3],
+            b_terminations=[rearport4]
+        )
+        cable4.save()
+        self.assertEqual(CablePath.objects.count(), 0)
+
+        # Create cable1
+        cable1 = Cable(
+            a_terminations=[interface1],
+            b_terminations=[frontport1]
+        )
+        cable1.save()
+        self.assertPathExists(
+            (
+                interface1, cable1, frontport1, rearport1, cable2, rearport2, frontport2
+            ),
+            is_complete=False
+        )
+        # Create cable1
+        cable5 = Cable(
+            a_terminations=[interface3],
+            b_terminations=[frontport3]
+        )
+        cable5.save()
+        self.assertPathExists(
+            (
+                interface3, cable5, frontport3, rearport3, cable4, rearport4, frontport4
+            ),
+            is_complete=False
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        # Create cable 3
+        cable3 = Cable(
+            a_terminations=[frontport2, frontport4],
+            b_terminations=[interface2]
+        )
+        cable3.save()
+        self.assertPathExists(
+            (
+                interface2, cable3, (frontport2, frontport4), (rearport2, rearport4), (cable2, cable4),
+                (rearport1, rearport3), (frontport1, frontport3), (cable1, cable5), (interface1, interface3)
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertPathExists(
+            (
+                interface1, cable1, frontport1, rearport1, cable2, rearport2, frontport2, cable3, interface2
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertPathExists(
+            (
+                interface3, cable5, frontport3, rearport3, cable4, rearport4, frontport4, cable3, interface2
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertEqual(CablePath.objects.count(), 3)
+
+    def test_221_non_symmetric_paths(self):
+        """
+        [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- -------------------------------------- [IF2]
+        [IF2] --C5-- [FP3] [RP3] --C4-- [RP4] [FP4] --C6-- [FP5] [RP5] --C7-- [RP6] [FP6] --C3---/
+        """
+        interface1 = Interface.objects.create(device=self.device, name='Interface 1')
+        interface2 = Interface.objects.create(device=self.device, name='Interface 2')
+        interface3 = Interface.objects.create(device=self.device, name='Interface 3')
+        rearport1 = RearPort.objects.create(device=self.device, name='Rear Port 1', positions=1)
+        rearport2 = RearPort.objects.create(device=self.device, name='Rear Port 2', positions=1)
+        rearport3 = RearPort.objects.create(device=self.device, name='Rear Port 3', positions=1)
+        rearport4 = RearPort.objects.create(device=self.device, name='Rear Port 4', positions=1)
+        rearport5 = RearPort.objects.create(device=self.device, name='Rear Port 5', positions=1)
+        rearport6 = RearPort.objects.create(device=self.device, name='Rear Port 6', positions=1)
+        frontport1 = FrontPort.objects.create(
+            device=self.device, name='Front Port 1', rear_port=rearport1, rear_port_position=1
+        )
+        frontport2 = FrontPort.objects.create(
+            device=self.device, name='Front Port 2', rear_port=rearport2, rear_port_position=1
+        )
+        frontport3 = FrontPort.objects.create(
+            device=self.device, name='Front Port 3', rear_port=rearport3, rear_port_position=1
+        )
+        frontport4 = FrontPort.objects.create(
+            device=self.device, name='Front Port 4', rear_port=rearport4, rear_port_position=1
+        )
+        frontport5 = FrontPort.objects.create(
+            device=self.device, name='Front Port 5', rear_port=rearport5, rear_port_position=1
+        )
+        frontport6 = FrontPort.objects.create(
+            device=self.device, name='Front Port 6', rear_port=rearport6, rear_port_position=1
+        )
+
+        cable2 = Cable(
+            a_terminations=[rearport1],
+            b_terminations=[rearport2],
+            label='C2'
+        )
+        cable2.save()
+        cable4 = Cable(
+            a_terminations=[rearport3],
+            b_terminations=[rearport4],
+            label='C4'
+        )
+        cable4.save()
+        cable6 = Cable(
+            a_terminations=[frontport4],
+            b_terminations=[frontport5],
+            label='C6'
+        )
+        cable6.save()
+        cable7 = Cable(
+            a_terminations=[rearport5],
+            b_terminations=[rearport6],
+            label='C7'
+        )
+        cable7.save()
+        self.assertEqual(CablePath.objects.count(), 0)
+
+        # Create cable1
+        cable1 = Cable(
+            a_terminations=[interface1],
+            b_terminations=[frontport1],
+            label='C1'
+        )
+        cable1.save()
+        self.assertPathExists(
+            (
+                interface1, cable1, frontport1, rearport1, cable2, rearport2, frontport2
+            ),
+            is_complete=False
+        )
+        # Create cable1
+        cable5 = Cable(
+            a_terminations=[interface3],
+            b_terminations=[frontport3],
+            label='C5'
+        )
+        cable5.save()
+        self.assertPathExists(
+            (
+                interface3, cable5, frontport3, rearport3, cable4, rearport4, frontport4, cable6, frontport5, rearport5,
+                cable7, rearport6, frontport6
+            ),
+            is_complete=False
+        )
+        self.assertEqual(CablePath.objects.count(), 2)
+
+        # Create cable 3
+        cable3 = Cable(
+            a_terminations=[frontport2, frontport6],
+            b_terminations=[interface2],
+            label='C3'
+        )
+        cable3.save()
+        self.assertPathExists(
+            (
+                interface2, cable3, (frontport2, frontport6), (rearport2, rearport6), (cable2, cable7),
+                (rearport1, rearport5), (frontport1, frontport5), (cable1, cable6)
+            ),
+            is_complete=False,
+            is_split=True
+        )
+        self.assertPathExists(
+            (
+                interface1, cable1, frontport1, rearport1, cable2, rearport2, frontport2, cable3, interface2
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertPathExists(
+            (
+                interface3, cable5, frontport3, rearport3, cable4, rearport4, frontport4, cable6, frontport5, rearport5,
+                cable7, rearport6, frontport6, cable3, interface2
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertEqual(CablePath.objects.count(), 3)
+
     def test_301_create_path_via_existing_cable(self):
         """
         [IF1] --C1-- [FP1] [RP1] --C2-- [RP2] [FP2] --C3-- [IF2]
@@ -1845,3 +2147,93 @@ class CablePathTestCase(TestCase):
             is_complete=True,
             is_active=True
         )
+
+    def test_401_exclude_midspan_devices(self):
+        """
+        [IF1] --C1-- [FP1][Test Device][RP1] --C2-- [RP2][Test Device][FP2] --C3-- [IF2]
+                     [FP3][Test mid-span Device][RP3] --C4-- [RP4][Test mid-span Device][FP4] /
+        """
+        device = Device.objects.create(
+            site=self.site,
+            device_type=self.device.device_type,
+            device_role=self.device.device_role,
+            name='Test mid-span Device'
+        )
+        interface1 = Interface.objects.create(device=self.device, name='Interface 1')
+        interface2 = Interface.objects.create(device=self.device, name='Interface 2')
+        rearport1 = RearPort.objects.create(device=self.device, name='Rear Port 1', positions=1)
+        rearport2 = RearPort.objects.create(device=self.device, name='Rear Port 2', positions=1)
+        rearport3 = RearPort.objects.create(device=device, name='Rear Port 3', positions=1)
+        rearport4 = RearPort.objects.create(device=device, name='Rear Port 4', positions=1)
+        frontport1 = FrontPort.objects.create(
+            device=self.device, name='Front Port 1', rear_port=rearport1, rear_port_position=1
+        )
+        frontport2 = FrontPort.objects.create(
+            device=self.device, name='Front Port 2', rear_port=rearport2, rear_port_position=1
+        )
+        frontport3 = FrontPort.objects.create(
+            device=device, name='Front Port 3', rear_port=rearport3, rear_port_position=1
+        )
+        frontport4 = FrontPort.objects.create(
+            device=device, name='Front Port 4', rear_port=rearport4, rear_port_position=1
+        )
+
+        cable2 = Cable(
+            a_terminations=[rearport1],
+            b_terminations=[rearport2],
+            label='C2'
+        )
+        cable2.save()
+        cable4 = Cable(
+            a_terminations=[rearport3],
+            b_terminations=[rearport4],
+            label='C4'
+        )
+        cable4.save()
+        self.assertEqual(CablePath.objects.count(), 0)
+
+        # Create cable1
+        cable1 = Cable(
+            a_terminations=[interface1],
+            b_terminations=[frontport1, frontport3],
+            label='C1'
+        )
+        with self.assertRaises(AssertionError):
+            cable1.save()
+
+        self.assertPathDoesNotExist(
+            (
+                interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4),
+                (rearport2, rearport4), (frontport2, frontport4)
+            ),
+            is_complete=False
+        )
+        self.assertEqual(CablePath.objects.count(), 0)
+
+        # Create cable 3
+        cable3 = Cable(
+            a_terminations=[frontport2, frontport4],
+            b_terminations=[interface2],
+            label='C3'
+        )
+
+        with self.assertRaises(AssertionError):
+            cable3.save()
+
+        self.assertPathDoesNotExist(
+            (
+                interface2, cable3, (frontport2, frontport4), (rearport2, rearport4), (cable2, cable4),
+                (rearport1, rearport3), (frontport1, frontport2), cable1, interface1
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertPathDoesNotExist(
+            (
+                interface1, cable1, (frontport1, frontport3), (rearport1, rearport3), (cable2, cable4),
+                (rearport2, rearport4), (frontport2, frontport4), cable3, interface2
+            ),
+            is_complete=True,
+            is_active=True
+        )
+        self.assertEqual(CablePath.objects.count(), 0)

+ 9 - 1
netbox/templates/dcim/cable_trace.html

@@ -23,7 +23,15 @@
               </div>
             </div>
             <div class="trace-end">
-                {% if path.is_split %}
+                {% if path.is_split and path.get_asymmetric_nodes %}
+                    <h3 class="text-danger">{% trans "Asymmetric Path" %}!</h3>
+                    <p>{% trans "The nodes below have no links and result in an asymmetric path" %}:</p>
+                    <ul class="text-start">
+                      {% for next_node in path.get_asymmetric_nodes %}
+                        <li class="text-muted">{{ next_node|linkify }}</li>
+                      {% endfor %}
+                    </ul>
+                {% elif path.is_split %}
                     <h3 class="text-danger">{% trans "Path split" %}!</h3>
                     <p>{% trans "Select a node below to continue" %}:</p>
                     <ul class="text-start">