|
|
@@ -14,6 +14,7 @@ from dcim.utils import decompile_path_node, flatten_path, object_to_path_node, p
|
|
|
from netbox.models import NetBoxModel
|
|
|
from utilities.fields import ColorField
|
|
|
from utilities.utils import to_meters
|
|
|
+from wireless.models import WirelessLink
|
|
|
from .devices import Device
|
|
|
from .device_components import FrontPort, RearPort
|
|
|
|
|
|
@@ -329,124 +330,129 @@ class CablePath(models.Model):
|
|
|
@classmethod
|
|
|
def from_origin(cls, terminations):
|
|
|
"""
|
|
|
- Create a new CablePath instance as traced from the given path origin.
|
|
|
-
|
|
|
- :param terminations: An iterable of one or more CableTermination objects.
|
|
|
+ Create a new CablePath instance as traced from the given termination objects. These can be any object to which a
|
|
|
+ Cable or WirelessLink connects (interfaces, console ports, circuit termination, etc.). All terminations must be
|
|
|
+ of the same type and must belong to the same parent object.
|
|
|
"""
|
|
|
from circuits.models import CircuitTermination
|
|
|
|
|
|
- if not terminations or terminations[0].termination.link is None:
|
|
|
- return None
|
|
|
-
|
|
|
path = []
|
|
|
position_stack = []
|
|
|
is_complete = False
|
|
|
is_active = True
|
|
|
is_split = False
|
|
|
|
|
|
- # Start building the path from its originating CableTerminations
|
|
|
- path.append([
|
|
|
- object_to_path_node(t.termination) for t in terminations
|
|
|
- ])
|
|
|
+ while terminations:
|
|
|
|
|
|
- node = terminations[0].termination
|
|
|
- while terminations and node.link is not None:
|
|
|
- if hasattr(node.link, 'status') and node.link.status != LinkStatusChoices.STATUS_CONNECTED:
|
|
|
- is_active = False
|
|
|
+ # Terminations must all be of the same type and belong to the same parent
|
|
|
+ assert all(isinstance(t, type(terminations[0])) for t in terminations[1:])
|
|
|
+ assert all(t.parent is terminations[0].parent for t in terminations[1:])
|
|
|
|
|
|
- # Append the cable
|
|
|
- path.append([object_to_path_node(node.link)])
|
|
|
+ # Step 1: Record the near-end termination object(s)
|
|
|
+ path.append([
|
|
|
+ object_to_path_node(t) for t in terminations
|
|
|
+ ])
|
|
|
|
|
|
- # Follow the link to its far-end termination
|
|
|
- if terminations[0].cable_end == 'A':
|
|
|
- peer_terminations = CableTermination.objects.filter(cable=terminations[0].cable, cable_end='B')
|
|
|
+ # Step 2: Determine the attached link (Cable or WirelessLink), if any
|
|
|
+ link = terminations[0].link
|
|
|
+ assert all(t.link is link for t in terminations[1:])
|
|
|
+ if link is None:
|
|
|
+ # No attached link; abort
|
|
|
+ break
|
|
|
+ assert type(link) in (Cable, WirelessLink)
|
|
|
+
|
|
|
+ # Step 3: Record the link and update path status if not "connected"
|
|
|
+ path.append([object_to_path_node(link)])
|
|
|
+ if hasattr(link, 'status') and link.status != LinkStatusChoices.STATUS_CONNECTED:
|
|
|
+ is_active = False
|
|
|
+
|
|
|
+ # Step 4: Determine the far-end terminations
|
|
|
+ if isinstance(link, Cable):
|
|
|
+ termination_type = ContentType.objects.get_for_model(terminations[0])
|
|
|
+ local_cable_terminations = CableTermination.objects.filter(
|
|
|
+ termination_type=termination_type,
|
|
|
+ termination_id__in=[t.pk for t in terminations]
|
|
|
+ )
|
|
|
+ # Terminations must all belong to same end of Cable
|
|
|
+ local_cable_end = local_cable_terminations[0].cable_end
|
|
|
+ assert all(ct.cable_end == local_cable_end for ct in local_cable_terminations[1:])
|
|
|
+ remote_cable_terminations = CableTermination.objects.filter(
|
|
|
+ cable=link,
|
|
|
+ cable_end='A' if local_cable_end == 'B' else 'B'
|
|
|
+ )
|
|
|
+ remote_terminations = [ct.termination for ct in remote_cable_terminations]
|
|
|
else:
|
|
|
- peer_terminations = CableTermination.objects.filter(cable=terminations[0].cable, cable_end='A')
|
|
|
-
|
|
|
- # Follow FrontPorts to their corresponding RearPorts
|
|
|
- if isinstance(peer_terminations[0].termination, FrontPort):
|
|
|
- path.append([
|
|
|
- object_to_path_node(t.termination) for t in peer_terminations
|
|
|
- ])
|
|
|
- terminations = CableTermination.objects.filter(
|
|
|
- termination_type=ContentType.objects.get_for_model(RearPort),
|
|
|
- termination_id__in=[t.termination_id for t in peer_terminations]
|
|
|
+ # WirelessLink
|
|
|
+ remote_terminations = [link.interface_b] if link.interface_a is terminations[0] else [link.interface_a]
|
|
|
+
|
|
|
+ # Step 5: Record the far-end termination object(s)
|
|
|
+ path.append([
|
|
|
+ object_to_path_node(t) for t in remote_terminations
|
|
|
+ ])
|
|
|
+
|
|
|
+ # Step 6: Determine the "next hop" terminations, if applicable
|
|
|
+ if isinstance(remote_terminations[0], FrontPort):
|
|
|
+ # Follow FrontPorts to their corresponding RearPorts
|
|
|
+ rear_ports = RearPort.objects.filter(
|
|
|
+ pk__in=[t.rear_port_id for t in remote_terminations]
|
|
|
)
|
|
|
- rear_ports = RearPort.objects.filter(pk__in=[t.termination.rear_port_id for t in peer_terminations])
|
|
|
- # TODO: We're assuming that each of the front-to-rear mapping use equivalent positions.
|
|
|
- node = rear_ports[0]
|
|
|
- if rear_ports[0].positions > 1:
|
|
|
- position_stack.append(peer_terminations[0].termination.rear_port_position)
|
|
|
- path.append([
|
|
|
- object_to_path_node(rp) for rp in rear_ports
|
|
|
- ])
|
|
|
-
|
|
|
- # Follow RearPorts to their corresponding FrontPorts (if any)
|
|
|
- elif isinstance(peer_terminations[0], RearPort):
|
|
|
- path.append([
|
|
|
- object_to_path_node(t.termination) for t in peer_terminations
|
|
|
- ])
|
|
|
-
|
|
|
- # Determine the peer FrontPort's position
|
|
|
- if peer_terminations[0].termination.positions == 1:
|
|
|
+ # RearPorts must have the same number of positions
|
|
|
+ rp_position_count = rear_ports[0].positions
|
|
|
+ assert all(rp.positions == rp_position_count for rp in terminations[1:])
|
|
|
+ # Push position to stack if >1
|
|
|
+ if rp_position_count > 1:
|
|
|
+ position_stack.append(remote_terminations[0].rear_port_position)
|
|
|
+
|
|
|
+ terminations = rear_ports
|
|
|
+
|
|
|
+ elif isinstance(remote_terminations[0], RearPort):
|
|
|
+ # If the RearPort has multiple positions, pop the current position from the stack
|
|
|
+ rp_position_count = remote_terminations[0].positions
|
|
|
+ assert all(rp.positions == rp_position_count for rp in remote_terminations[1:])
|
|
|
+ if rp_position_count == 1:
|
|
|
position = 1
|
|
|
elif position_stack:
|
|
|
position = position_stack.pop()
|
|
|
else:
|
|
|
- # No position indicated: path has split, so we stop at the RearPort
|
|
|
+ # No position indicated: path has split, so we stop at the RearPorts
|
|
|
is_split = True
|
|
|
break
|
|
|
|
|
|
- # Map FrontPorts to their corresponding RearPorts
|
|
|
+ # Follow RearPorts to their corresponding FrontPorts (if any)
|
|
|
front_ports = FrontPort.objects.filter(
|
|
|
- rear_port_id__in=[t.rear_port_id for t in peer_terminations],
|
|
|
+ rear_port_id__in=[t.pk for t in remote_terminations],
|
|
|
rear_port_position=position
|
|
|
)
|
|
|
- terminations = CableTermination.objects.filter(
|
|
|
- termination_type=ContentType.objects.get_for_model(FrontPort),
|
|
|
- termination_id__in=[fp.pk for fp in front_ports]
|
|
|
- )
|
|
|
- if terminations:
|
|
|
+
|
|
|
+ terminations = front_ports
|
|
|
+
|
|
|
+ elif isinstance(remote_terminations[0], CircuitTermination):
|
|
|
+ # Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
|
|
|
+ term_side = remote_terminations[0].term_side
|
|
|
+ assert all(ct.term_side == term_side for ct in remote_terminations[1:])
|
|
|
+ circuit_termination = CircuitTermination.objects.filter(
|
|
|
+ circuit=remote_terminations[0].circuit,
|
|
|
+ term_side='Z' if term_side == 'A' else 'Z'
|
|
|
+ ).first()
|
|
|
+ if circuit_termination is None:
|
|
|
+ break
|
|
|
+ elif circuit_termination.provider_network:
|
|
|
+ # Circuit terminates to a ProviderNetwork
|
|
|
path.append([
|
|
|
- object_to_path_node(t.termination) for t in terminations
|
|
|
+ object_to_path_node(circuit_termination.provider_network)
|
|
|
])
|
|
|
-
|
|
|
- # Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
|
|
|
- elif isinstance(peer_terminations[0], CircuitTermination):
|
|
|
- path.append([
|
|
|
- object_to_path_node(t.termination) for t in peer_terminations
|
|
|
- ])
|
|
|
-
|
|
|
- # Get peer CircuitTerminations
|
|
|
- term_side = 'Z' if peer_terminations[0].termination == 'A' else 'Z'
|
|
|
- terminations = CircuitTermination.objects.filter(
|
|
|
- circuit=peer_terminations[0].circuit,
|
|
|
- term_side=term_side
|
|
|
- )
|
|
|
- # Tracing across multiple circuits not currently supported
|
|
|
- if len(terminations) > 1:
|
|
|
- is_split = True
|
|
|
break
|
|
|
- elif terminations:
|
|
|
+ elif circuit_termination.site and not circuit_termination.cable:
|
|
|
+ # Circuit terminates to a Site
|
|
|
path.append([
|
|
|
- object_to_path_node(t.termination) for t in terminations
|
|
|
+ object_to_path_node(circuit_termination.site)
|
|
|
])
|
|
|
- # TODO
|
|
|
- # if node.provider_network:
|
|
|
- # destination = node.provider_network
|
|
|
- # break
|
|
|
- # elif node.site and not node.cable:
|
|
|
- # destination = node.site
|
|
|
- # break
|
|
|
- else:
|
|
|
- # No peer CircuitTermination exists; halt the trace
|
|
|
break
|
|
|
|
|
|
+ terminations = [circuit_termination]
|
|
|
+
|
|
|
# Anything else marks the end of the path
|
|
|
else:
|
|
|
- path.append([
|
|
|
- object_to_path_node(t.termination) for t in peer_terminations
|
|
|
- ])
|
|
|
is_complete = True
|
|
|
break
|
|
|
|