| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978 |
- import itertools
- from django.contrib.contenttypes.fields import GenericForeignKey
- from django.contrib.contenttypes.models import ContentType
- from django.core.exceptions import ValidationError
- from django.core.validators import MaxValueValidator, MinValueValidator
- from django.db import models
- from django.dispatch import Signal
- from django.utils.translation import gettext_lazy as _
- from core.models import ObjectType
- from dcim.choices import *
- from dcim.constants import *
- from dcim.exceptions import UnsupportedCablePath
- from dcim.fields import PathField
- from dcim.utils import decompile_path_node, object_to_path_node
- from netbox.choices import ColorChoices
- from netbox.models import ChangeLoggedModel, PrimaryModel
- from utilities.conversion import to_meters
- from utilities.exceptions import AbortRequest
- from utilities.fields import ColorField, GenericArrayForeignKey
- from utilities.querysets import RestrictedQuerySet
- from utilities.serialization import deserialize_object, serialize_object
- from wireless.models import WirelessLink
- from .device_components import FrontPort, PathEndpoint, RearPort
- __all__ = (
- 'Cable',
- 'CablePath',
- 'CableTermination',
- )
- trace_paths = Signal()
- #
- # Cables
- #
- class Cable(PrimaryModel):
- """
- A physical connection between two endpoints.
- """
- type = models.CharField(
- verbose_name=_('type'),
- max_length=50,
- choices=CableTypeChoices,
- blank=True,
- null=True
- )
- status = models.CharField(
- verbose_name=_('status'),
- max_length=50,
- choices=LinkStatusChoices,
- default=LinkStatusChoices.STATUS_CONNECTED
- )
- profile = models.CharField(
- verbose_name=_('profile'),
- max_length=50,
- choices=CableProfileChoices,
- blank=True,
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='cables',
- blank=True,
- null=True
- )
- label = models.CharField(
- verbose_name=_('label'),
- max_length=100,
- blank=True
- )
- color = ColorField(
- verbose_name=_('color'),
- blank=True
- )
- length = models.DecimalField(
- verbose_name=_('length'),
- max_digits=8,
- decimal_places=2,
- blank=True,
- null=True
- )
- length_unit = models.CharField(
- verbose_name=_('length unit'),
- max_length=50,
- choices=CableLengthUnitChoices,
- blank=True,
- null=True
- )
- # Stores the normalized length (in meters) for database ordering
- _abs_length = models.DecimalField(
- max_digits=10,
- decimal_places=4,
- blank=True,
- null=True
- )
- clone_fields = ('tenant', 'type', 'profile')
- class Meta:
- ordering = ('pk',)
- verbose_name = _('cable')
- verbose_name_plural = _('cables')
- def __init__(self, *args, a_terminations=None, b_terminations=None, **kwargs):
- super().__init__(*args, **kwargs)
- # A copy of the PK to be used by __str__ in case the object is deleted
- self._pk = self.__dict__.get('id')
- # Cache the original status so we can check later if it's been changed
- self._orig_status = self.__dict__.get('status')
- self._terminations_modified = False
- # Assign or retrieve A/B terminations
- if a_terminations:
- self.a_terminations = a_terminations
- if b_terminations:
- self.b_terminations = b_terminations
- def __str__(self):
- pk = self.pk or self._pk
- return self.label or f'#{pk}'
- def get_status_color(self):
- return LinkStatusChoices.colors.get(self.status)
- @property
- def profile_class(self):
- from dcim import cable_profiles
- return {
- CableProfileChoices.STRAIGHT_SINGLE: cable_profiles.StraightSingleCableProfile,
- CableProfileChoices.STRAIGHT_MULTI: cable_profiles.StraightMultiCableProfile,
- CableProfileChoices.SHUFFLE_2X2_MPO8: cable_profiles.Shuffle2x2MPO8CableProfile,
- CableProfileChoices.SHUFFLE_4X4_MPO8: cable_profiles.Shuffle4x4MPO8CableProfile,
- }.get(self.profile)
- def _get_x_terminations(self, side):
- """
- Return the terminating objects for the given cable end (A or B).
- """
- if side not in (CableEndChoices.SIDE_A, CableEndChoices.SIDE_B):
- raise ValueError(f"Unknown cable side: {side}")
- attr = f'_{side.lower()}_terminations'
- if hasattr(self, attr):
- return getattr(self, attr)
- if not self.pk:
- return []
- return [
- # Query self.terminations.all() to leverage cached results
- ct.termination for ct in self.terminations.all() if ct.cable_end == side
- ]
- def _set_x_terminations(self, side, value):
- """
- Set the terminating objects for the given cable end (A or B).
- """
- if side not in (CableEndChoices.SIDE_A, CableEndChoices.SIDE_B):
- raise ValueError(f"Unknown cable side: {side}")
- _attr = f'_{side.lower()}_terminations'
- # If the provided value is a list of CableTermination IDs, resolve them
- # to their corresponding termination objects.
- if all(isinstance(item, int) for item in value):
- value = [
- ct.termination for ct in CableTermination.objects.filter(pk__in=value).prefetch_related('termination')
- ]
- if not self.pk or getattr(self, _attr, []) != list(value):
- self._terminations_modified = True
- setattr(self, _attr, value)
- @property
- def a_terminations(self):
- return self._get_x_terminations(CableEndChoices.SIDE_A)
- @a_terminations.setter
- def a_terminations(self, value):
- self._set_x_terminations(CableEndChoices.SIDE_A, value)
- @property
- def b_terminations(self):
- return self._get_x_terminations(CableEndChoices.SIDE_B)
- @b_terminations.setter
- def b_terminations(self, value):
- self._set_x_terminations(CableEndChoices.SIDE_B, value)
- @property
- def color_name(self):
- color_name = ""
- for hex_code, label in ColorChoices.CHOICES:
- if hex_code.lower() == self.color.lower():
- color_name = str(label)
- return color_name
- def clean(self):
- super().clean()
- # Validate length and length_unit
- if self.length is not None and not self.length_unit:
- raise ValidationError(_("Must specify a unit when setting a cable length"))
- if self._state.adding and self.pk is None and (not self.a_terminations or not self.b_terminations):
- raise ValidationError(_("Must define A and B terminations when creating a new cable."))
- # Validate terminations against the assigned cable profile (if any)
- if self.profile:
- self.profile_class().clean(self)
- if self._terminations_modified:
- # Check that all termination objects for either end are of the same type
- for terms in (self.a_terminations, self.b_terminations):
- if len(terms) > 1 and not all(isinstance(t, type(terms[0])) for t in terms[1:]):
- raise ValidationError(_("Cannot connect different termination types to same end of cable."))
- # Check that termination types are compatible
- if self.a_terminations and self.b_terminations:
- a_type = self.a_terminations[0]._meta.model_name
- b_type = self.b_terminations[0]._meta.model_name
- if b_type not in COMPATIBLE_TERMINATION_TYPES.get(a_type):
- raise ValidationError(
- _("Incompatible termination types: {type_a} and {type_b}").format(type_a=a_type, type_b=b_type)
- )
- if a_type == b_type:
- # can't directly use self.a_terminations here as possible they
- # don't have pk yet
- a_pks = set(obj.pk for obj in self.a_terminations if obj.pk)
- b_pks = set(obj.pk for obj in self.b_terminations if obj.pk)
- if (a_pks & b_pks):
- raise ValidationError(
- _("A and B terminations cannot connect to the same object.")
- )
- # Run clean() on any new CableTerminations
- for termination in self.a_terminations:
- CableTermination(cable=self, cable_end='A', termination=termination).clean()
- for termination in self.b_terminations:
- CableTermination(cable=self, cable_end='B', termination=termination).clean()
- def save(self, *args, force_insert=False, force_update=False, using=None, update_fields=None):
- _created = self.pk is None
- # Store the given length (if any) in meters for use in database ordering
- if self.length is not None and self.length_unit:
- self._abs_length = to_meters(self.length, self.length_unit)
- else:
- self._abs_length = None
- # Clear length_unit if no length is defined
- if self.length is None:
- self.length_unit = None
- # If this is a new Cable, save it before attempting to create its CableTerminations
- if self._state.adding:
- super().save(*args, force_insert=True, using=using, update_fields=update_fields)
- # Update the private PK used in __str__()
- self._pk = self.pk
- if self._terminations_modified:
- self.update_terminations()
- super().save(*args, force_update=True, using=using, update_fields=update_fields)
- try:
- trace_paths.send(Cable, instance=self, created=_created)
- except UnsupportedCablePath as e:
- raise AbortRequest(e)
- def serialize_object(self, exclude=None):
- data = serialize_object(self, exclude=exclude or [])
- # Add A & B terminations to the serialized data
- a_terminations, b_terminations = self.get_terminations()
- data['a_terminations'] = sorted([ct.pk for ct in a_terminations.values()])
- data['b_terminations'] = sorted([ct.pk for ct in b_terminations.values()])
- return data
- @classmethod
- def deserialize_object(cls, data, pk=None):
- a_terminations = data.pop('a_terminations', [])
- b_terminations = data.pop('b_terminations', [])
- instance = deserialize_object(cls, data, pk=pk)
- # Assign A & B termination objects to the Cable instance
- queryset = CableTermination.objects.prefetch_related('termination')
- instance.a_terminations = [
- ct.termination for ct in queryset.filter(pk__in=a_terminations)
- ]
- instance.b_terminations = [
- ct.termination for ct in queryset.filter(pk__in=b_terminations)
- ]
- return instance
- def get_terminations(self):
- """
- Return two dictionaries mapping A & B side terminating objects to their corresponding CableTerminations
- for this Cable.
- """
- a_terminations = {}
- b_terminations = {}
- for ct in CableTermination.objects.filter(cable=self).prefetch_related('termination'):
- if ct.cable_end == CableEndChoices.SIDE_A:
- a_terminations[ct.termination] = ct
- else:
- b_terminations[ct.termination] = ct
- return a_terminations, b_terminations
- def update_terminations(self):
- """
- Create/delete CableTerminations for this Cable to reflect its current state.
- """
- a_terminations, b_terminations = self.get_terminations()
- # Delete any stale CableTerminations
- for termination, ct in a_terminations.items():
- if termination.pk and termination not in self.a_terminations:
- ct.delete()
- for termination, ct in b_terminations.items():
- if termination.pk and termination not in self.b_terminations:
- ct.delete()
- # Save any new CableTerminations
- for i, termination in enumerate(self.a_terminations, start=1):
- if not termination.pk or termination not in a_terminations:
- position = i if self.profile and isinstance(termination, PathEndpoint) else None
- CableTermination(cable=self, cable_end='A', position=position, termination=termination).save()
- for i, termination in enumerate(self.b_terminations, start=1):
- if not termination.pk or termination not in b_terminations:
- position = i if self.profile and isinstance(termination, PathEndpoint) else None
- CableTermination(cable=self, cable_end='B', position=position, termination=termination).save()
- class CableTermination(ChangeLoggedModel):
- """
- A mapping between side A or B of a Cable and a terminating object (e.g. an Interface or CircuitTermination).
- """
- cable = models.ForeignKey(
- to='dcim.Cable',
- on_delete=models.CASCADE,
- related_name='terminations'
- )
- cable_end = models.CharField(
- max_length=1,
- choices=CableEndChoices,
- verbose_name=_('end')
- )
- termination_type = models.ForeignKey(
- to='contenttypes.ContentType',
- on_delete=models.PROTECT,
- related_name='+'
- )
- termination_id = models.PositiveBigIntegerField()
- termination = GenericForeignKey(
- ct_field='termination_type',
- fk_field='termination_id'
- )
- position = models.PositiveIntegerField(
- blank=True,
- null=True,
- validators=(
- MinValueValidator(CABLE_POSITION_MIN),
- MaxValueValidator(CABLE_POSITION_MAX)
- )
- )
- # Cached associations to enable efficient filtering
- _device = models.ForeignKey(
- to='dcim.Device',
- on_delete=models.CASCADE,
- blank=True,
- null=True
- )
- _rack = models.ForeignKey(
- to='dcim.Rack',
- on_delete=models.CASCADE,
- blank=True,
- null=True
- )
- _location = models.ForeignKey(
- to='dcim.Location',
- on_delete=models.CASCADE,
- blank=True,
- null=True
- )
- _site = models.ForeignKey(
- to='dcim.Site',
- on_delete=models.CASCADE,
- blank=True,
- null=True
- )
- objects = RestrictedQuerySet.as_manager()
- class Meta:
- ordering = ('cable', 'cable_end', 'position', 'pk')
- constraints = (
- models.UniqueConstraint(
- fields=('termination_type', 'termination_id'),
- name='%(app_label)s_%(class)s_unique_termination'
- ),
- models.UniqueConstraint(
- fields=('cable', 'cable_end', 'position'),
- name='%(app_label)s_%(class)s_unique_position'
- ),
- )
- verbose_name = _('cable termination')
- verbose_name_plural = _('cable terminations')
- def __str__(self):
- return f'Cable {self.cable} to {self.termination}'
- def clean(self):
- super().clean()
- # Disallow connecting a cable to any termination object that is
- # explicitly flagged as "mark connected".
- termination = getattr(self, 'termination', None)
- if termination is not None and getattr(termination, "mark_connected", False):
- raise ValidationError(
- _("Cannot connect a cable to {obj_parent} > {obj} because it is marked as connected.").format(
- obj_parent=termination.parent_object,
- obj=termination,
- )
- )
- # Check for existing termination
- qs = CableTermination.objects.filter(
- termination_type=self.termination_type,
- termination_id=self.termination_id
- )
- if self.cable.pk:
- qs = qs.exclude(cable=self.cable)
- existing_termination = qs.first()
- if existing_termination is not None:
- raise ValidationError(
- _("Duplicate termination found for {app_label}.{model} {termination_id}: cable {cable_pk}").format(
- app_label=self.termination_type.app_label,
- model=self.termination_type.model,
- termination_id=self.termination_id,
- cable_pk=existing_termination.cable.pk
- )
- )
- # Validate the interface type (if applicable)
- if self.termination_type.model == 'interface' and self.termination.type in NONCONNECTABLE_IFACE_TYPES:
- raise ValidationError(
- _("Cables cannot be terminated to {type_display} interfaces").format(
- type_display=self.termination.get_type_display()
- )
- )
- # A CircuitTermination attached to a ProviderNetwork cannot have a Cable
- if self.termination_type.model == 'circuittermination' and self.termination._provider_network is not None:
- raise ValidationError(_("Circuit terminations attached to a provider network may not be cabled."))
- def save(self, *args, **kwargs):
- # Cache objects associated with the terminating object (for filtering)
- self.cache_related_objects()
- super().save(*args, **kwargs)
- # Set the cable on the terminating object
- termination = self.termination._meta.model.objects.get(pk=self.termination_id)
- termination.snapshot()
- termination.cable = self.cable
- termination.cable_end = self.cable_end
- termination.cable_position = self.position
- termination.save()
- def delete(self, *args, **kwargs):
- # Delete the cable association on the terminating object
- termination = self.termination._meta.model.objects.get(pk=self.termination_id)
- termination.snapshot()
- termination.cable = None
- termination.cable_end = None
- termination.cable_position = None
- termination.save()
- super().delete(*args, **kwargs)
- def cache_related_objects(self):
- """
- Cache objects related to the termination (e.g. device, rack, site) directly on the object to
- enable efficient filtering.
- """
- assert self.termination is not None
- # Device components
- if getattr(self.termination, 'device', None):
- self._device = self.termination.device
- self._rack = self.termination.device.rack
- self._location = self.termination.device.location
- self._site = self.termination.device.site
- # Power feeds
- elif getattr(self.termination, 'rack', None):
- self._rack = self.termination.rack
- self._location = self.termination.rack.location
- self._site = self.termination.rack.site
- # Circuit terminations
- elif getattr(self.termination, 'site', None):
- self._site = self.termination.site
- cache_related_objects.alters_data = True
- def to_objectchange(self, action):
- objectchange = super().to_objectchange(action)
- objectchange.related_object = self.termination
- return objectchange
- class CablePath(models.Model):
- """
- A CablePath instance represents the physical path from a set of origin nodes to a set of destination nodes,
- including all intermediate elements.
- `path` contains the ordered set of nodes, arranged in lists of (type, ID) tuples. (Each cable in the path can
- terminate to one or more objects.) For example, consider the following
- topology:
- A B C
- Interface 1 --- Front Port 1 | Rear Port 1 --- Rear Port 2 | Front Port 3 --- Interface 2
- Front Port 2 Front Port 4
- This path would be expressed as:
- CablePath(
- path = [
- [Interface 1],
- [Cable A],
- [Front Port 1, Front Port 2],
- [Rear Port 1],
- [Cable B],
- [Rear Port 2],
- [Front Port 3, Front Port 4],
- [Cable C],
- [Interface 2],
- ]
- )
- `is_active` is set to True only if every Cable within the path has a status of "connected". `is_complete` is True
- if the instance represents a complete end-to-end path from origin(s) to destination(s). `is_split` is True if the
- path diverges across multiple cables.
- `_nodes` retains a flattened list of all nodes within the path to enable simple filtering.
- """
- path = models.JSONField(
- verbose_name=_('path'),
- default=list
- )
- is_active = models.BooleanField(
- verbose_name=_('is active'),
- default=False
- )
- is_complete = models.BooleanField(
- verbose_name=_('is complete'),
- default=False
- )
- is_split = models.BooleanField(
- verbose_name=_('is split'),
- default=False
- )
- _nodes = PathField()
- _netbox_private = True
- class Meta:
- verbose_name = _('cable path')
- verbose_name_plural = _('cable paths')
- def __str__(self):
- return f"Path #{self.pk}: {len(self.path)} hops"
- def save(self, *args, **kwargs):
- # Save the flattened nodes list
- self._nodes = list(itertools.chain(*self.path))
- super().save(*args, **kwargs)
- # Record a direct reference to this CablePath on its originating object(s)
- origin_model = self.origin_type.model_class()
- origin_ids = [decompile_path_node(node)[1] for node in self.path[0]]
- origin_model.objects.filter(pk__in=origin_ids).update(_path=self.pk)
- @property
- def origin_type(self):
- if self.path:
- ct_id, _ = decompile_path_node(self.path[0][0])
- return ContentType.objects.get_for_id(ct_id)
- @property
- def destination_type(self):
- if self.is_complete:
- ct_id, _ = decompile_path_node(self.path[-1][0])
- return ContentType.objects.get_for_id(ct_id)
- @property
- def _path_decompiled(self):
- res = []
- for step in self.path:
- nodes = []
- for node in step:
- nodes.append(decompile_path_node(node))
- res.append(nodes)
- return res
- path_objects = GenericArrayForeignKey("_path_decompiled")
- @property
- def origins(self):
- """
- Return the list of originating objects.
- """
- return self.path_objects[0]
- @property
- def destinations(self):
- """
- Return the list of destination objects, if the path is complete.
- """
- if not self.is_complete:
- return []
- return self.path_objects[-1]
- @property
- def segment_count(self):
- return int(len(self.path) / 3)
- @classmethod
- def from_origin(cls, terminations):
- """
- Create a new CablePath instance as traced from the given termination objects. These can be any object to which a
- Cable or WirelessLink connects (interfaces, console ports, circuit termination, etc.). All terminations must be
- of the same type and must belong to the same parent object.
- """
- from circuits.models import CircuitTermination, Circuit
- if not terminations:
- return None
- # Ensure all originating terminations are attached to the same link
- if len(terminations) > 1 and not all(t.link == terminations[0].link for t in terminations[1:]):
- raise UnsupportedCablePath(_("All originating terminations must be attached to the same link"))
- path = []
- position_stack = []
- is_complete = False
- is_active = True
- is_split = False
- while terminations:
- # Terminations must all be of the same type
- if not all(isinstance(t, type(terminations[0])) for t in terminations[1:]):
- raise UnsupportedCablePath(_("All mid-span terminations must have the same termination type"))
- # All mid-span terminations must all be attached to the same device
- if (
- not isinstance(terminations[0], PathEndpoint) and
- not isinstance(terminations[0].parent_object, Circuit) and
- not all(t.parent_object == terminations[0].parent_object for t in terminations[1:])
- ):
- raise UnsupportedCablePath(_("All mid-span terminations must have the same parent object"))
- # Check for a split path (e.g. rear port fanning out to multiple front ports with
- # different cables attached)
- if len(set(t.link for t in terminations)) > 1 and (
- position_stack and len(terminations) != len(position_stack[-1])
- ):
- is_split = True
- break
- # Step 1: Record the near-end termination object(s)
- path.append([
- object_to_path_node(t) for t in terminations
- ])
- # If not null, push cable_position onto the stack
- if terminations[0].cable_position is not None:
- position_stack.append([terminations[0].cable_position])
- # Step 2: Determine the attached links (Cable or WirelessLink), if any
- links = [termination.link for termination in terminations if termination.link is not None]
- if len(links) == 0:
- if len(path) == 1:
- # If this is the start of the path and no link exists, return None
- return None
- # Otherwise, halt the trace if no link exists
- break
- if not all(type(link) in (Cable, WirelessLink) for link in links):
- raise UnsupportedCablePath(_("All links must be cable or wireless"))
- if not all(isinstance(link, type(links[0])) for link in links):
- raise UnsupportedCablePath(_("All links must match first link type"))
- # Step 3: Record asymmetric paths as split
- not_connected_terminations = [termination.link for termination in terminations if termination.link is None]
- if len(not_connected_terminations) > 0:
- is_complete = False
- is_split = True
- # Step 4: Record the links, keeping cables in order to allow for SVG rendering
- cables = []
- for link in links:
- if object_to_path_node(link) not in cables:
- cables.append(object_to_path_node(link))
- path.append(cables)
- # Step 5: Update the path status if a link is not connected
- links_status = [link.status for link in links if link.status != LinkStatusChoices.STATUS_CONNECTED]
- if any([status != LinkStatusChoices.STATUS_CONNECTED for status in links_status]):
- is_active = False
- # Step 6: Determine the far-end terminations
- if isinstance(links[0], Cable):
- # Profile-based tracing
- if links[0].profile:
- cable_profile = links[0].profile_class()
- peer_cable_terminations = cable_profile.get_peer_terminations(terminations, position_stack)
- remote_terminations = [ct.termination for ct in peer_cable_terminations]
- # Legacy (positionless) behavior
- else:
- termination_type = ObjectType.objects.get_for_model(terminations[0])
- local_cable_terminations = CableTermination.objects.filter(
- termination_type=termination_type,
- termination_id__in=[t.pk for t in terminations]
- )
- q_filter = Q()
- for lct in local_cable_terminations:
- cable_end = 'A' if lct.cable_end == 'B' else 'B'
- q_filter |= Q(cable=lct.cable, cable_end=cable_end)
- # Make sure this filter has been populated; if not, we have probably been given invalid data
- if not q_filter:
- break
- remote_cable_terminations = CableTermination.objects.filter(q_filter)
- remote_terminations = [ct.termination for ct in remote_cable_terminations]
- else:
- # WirelessLink
- remote_terminations = [
- link.interface_b if link.interface_a is terminations[0] else link.interface_a for link in links
- ]
- # Remote Terminations must all be of the same type, otherwise return a split path
- if not all(isinstance(t, type(remote_terminations[0])) for t in remote_terminations[1:]):
- is_complete = False
- is_split = True
- break
- # Step 7: Record the far-end termination object(s)
- path.append([
- object_to_path_node(t) for t in remote_terminations if t is not None
- ])
- # Step 8: Determine the "next hop" terminations, if applicable
- if not remote_terminations:
- break
- if isinstance(remote_terminations[0], FrontPort):
- # Follow FrontPorts to their corresponding RearPorts
- rear_ports = RearPort.objects.filter(
- pk__in=[t.rear_port_id for t in remote_terminations]
- )
- if len(rear_ports) > 1 or rear_ports[0].positions > 1:
- position_stack.append([fp.rear_port_position for fp in remote_terminations])
- terminations = rear_ports
- elif isinstance(remote_terminations[0], RearPort):
- if len(remote_terminations) == 1 and remote_terminations[0].positions == 1:
- front_ports = FrontPort.objects.filter(
- rear_port_id__in=[rp.pk for rp in remote_terminations],
- rear_port_position=1
- )
- # Obtain the individual front ports based on the termination and all positions
- elif len(remote_terminations) > 1 and position_stack:
- positions = position_stack.pop()
- # Ensure we have a number of positions equal to the amount of remote terminations
- if len(remote_terminations) != len(positions):
- raise UnsupportedCablePath(
- _("All positions counts within the path on opposite ends of links must match")
- )
- # Get our front ports
- q_filter = Q()
- for rt in remote_terminations:
- position = positions.pop()
- q_filter |= Q(rear_port_id=rt.pk, rear_port_position=position)
- if q_filter is Q():
- raise UnsupportedCablePath(_("Remote termination position filter is missing"))
- front_ports = FrontPort.objects.filter(q_filter)
- # Obtain the individual front ports based on the termination and position
- elif position_stack:
- front_ports = FrontPort.objects.filter(
- rear_port_id=remote_terminations[0].pk,
- rear_port_position__in=position_stack.pop()
- )
- # If all rear ports have a single position, we can just get the front ports
- elif all([rp.positions == 1 for rp in remote_terminations]):
- front_ports = FrontPort.objects.filter(rear_port_id__in=[rp.pk for rp in remote_terminations])
- if len(front_ports) != len(remote_terminations):
- # Some rear ports does not have a front port
- is_split = True
- break
- else:
- # No position indicated: path has split, so we stop at the RearPorts
- is_split = True
- break
- terminations = front_ports
- elif isinstance(remote_terminations[0], CircuitTermination):
- # Follow a CircuitTermination to its corresponding CircuitTermination (A to Z or vice versa)
- qs = Q()
- for remote_termination in remote_terminations:
- qs |= Q(
- circuit=remote_termination.circuit,
- term_side='Z' if remote_termination.term_side == 'A' else 'A'
- )
- # Get all circuit terminations
- circuit_terminations = CircuitTermination.objects.filter(qs)
- if not circuit_terminations.exists():
- break
- elif all([ct._provider_network for ct in circuit_terminations]):
- # Circuit terminates to a ProviderNetwork
- path.extend([
- [object_to_path_node(ct) for ct in circuit_terminations],
- [object_to_path_node(ct._provider_network) for ct in circuit_terminations],
- ])
- is_complete = True
- break
- elif all([ct.termination and not ct.cable for ct in circuit_terminations]):
- # Circuit terminates to a Region/Site/etc.
- path.extend([
- [object_to_path_node(ct) for ct in circuit_terminations],
- [object_to_path_node(ct.termination) for ct in circuit_terminations],
- ])
- break
- elif any([ct.cable in links for ct in circuit_terminations]):
- # No valid path
- is_split = True
- break
- terminations = circuit_terminations
- else:
- # Check for non-symmetric path
- if all(isinstance(t, type(remote_terminations[0])) for t in remote_terminations[1:]):
- is_complete = True
- elif len(remote_terminations) == 0:
- is_complete = False
- else:
- # Unsupported topology, mark as split and exit
- is_complete = False
- is_split = True
- break
- return cls(
- path=path,
- is_complete=is_complete,
- is_active=is_active,
- is_split=is_split
- )
- def retrace(self):
- """
- Retrace the path from the currently-defined originating termination(s)
- """
- _new = self.from_origin(self.origins)
- if _new:
- self.path = _new.path
- self.is_complete = _new.is_complete
- self.is_active = _new.is_active
- self.is_split = _new.is_split
- self.save()
- else:
- self.delete()
- retrace.alters_data = True
- def get_cable_ids(self):
- """
- Return all Cable IDs within the path.
- """
- cable_ct = ObjectType.objects.get_for_model(Cable).pk
- cable_ids = []
- for node in self._nodes:
- ct, id = decompile_path_node(node)
- if ct == cable_ct:
- cable_ids.append(id)
- return cable_ids
- def get_total_length(self):
- """
- Return a tuple containing the sum of the length of each cable in the path
- and a flag indicating whether the length is definitive.
- """
- cable_ct = ObjectType.objects.get_for_model(Cable).pk
- # Pre-cache cable lengths by ID
- cable_ids = self.get_cable_ids()
- cables = {
- cable['pk']: cable['_abs_length']
- for cable in Cable.objects.filter(id__in=cable_ids, _abs_length__isnull=False).values('pk', '_abs_length')
- }
- # Iterate through each set of nodes in the path. For cables, add the length of the longest cable to the total
- # length of the path.
- total_length = 0
- for node_set in self.path:
- hop_length = 0
- for node in node_set:
- ct, pk = decompile_path_node(node)
- if ct != cable_ct:
- break # Not a cable
- if pk in cables and cables[pk] > hop_length:
- hop_length = cables[pk]
- total_length += hop_length
- is_definitive = len(cables) == len(cable_ids)
- return total_length, is_definitive
- def get_split_nodes(self):
- """
- Return all available next segments in a split cable path.
- """
- from circuits.models import CircuitTermination
- nodes = self.path_objects[-1]
- # RearPort splitting to multiple FrontPorts with no stack position
- if type(nodes[0]) is RearPort:
- return FrontPort.objects.filter(rear_port__in=nodes)
- # Cable terminating to multiple FrontPorts mapped to different
- # RearPorts connected to different cables
- elif type(nodes[0]) is FrontPort:
- return RearPort.objects.filter(pk__in=[fp.rear_port_id for fp in nodes])
- # Cable terminating to multiple CircuitTerminations
- elif type(nodes[0]) is CircuitTermination:
- return [
- ct.get_peer_termination() for ct in nodes
- ]
- def get_asymmetric_nodes(self):
- """
- Return all available next segments in a split cable path.
- """
- from circuits.models import CircuitTermination
- asymmetric_nodes = []
- for nodes in self.path_objects:
- if type(nodes[0]) in [RearPort, FrontPort, CircuitTermination]:
- asymmetric_nodes.extend([node for node in nodes if node.link is None])
- return asymmetric_nodes
|