| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502 |
- from collections import defaultdict
- from django.contrib.contenttypes.fields import GenericForeignKey
- from django.contrib.contenttypes.models import ContentType
- from django.core.exceptions import ObjectDoesNotExist, ValidationError
- from django.db import models
- from django.db.models import Sum
- from django.urls import reverse
- from taggit.managers import TaggableManager
- from dcim.choices import *
- from dcim.constants import *
- from dcim.fields import PathField
- from dcim.utils import decompile_path_node, object_to_path_node, path_node_to_object
- from extras.models import ChangeLoggedModel, CustomFieldModel, TaggedItem
- from extras.utils import extras_features
- from utilities.fields import ColorField
- from utilities.querysets import RestrictedQuerySet
- from utilities.utils import to_meters
- from .devices import Device
- from .device_components import FrontPort, RearPort
- __all__ = (
- 'Cable',
- 'CablePath',
- )
- #
- # Cables
- #
- @extras_features('custom_fields', 'custom_links', 'export_templates', 'webhooks')
- class Cable(ChangeLoggedModel, CustomFieldModel):
- """
- A physical connection between two endpoints.
- """
- termination_a_type = models.ForeignKey(
- to=ContentType,
- limit_choices_to=CABLE_TERMINATION_MODELS,
- on_delete=models.PROTECT,
- related_name='+'
- )
- termination_a_id = models.PositiveIntegerField()
- termination_a = GenericForeignKey(
- ct_field='termination_a_type',
- fk_field='termination_a_id'
- )
- termination_b_type = models.ForeignKey(
- to=ContentType,
- limit_choices_to=CABLE_TERMINATION_MODELS,
- on_delete=models.PROTECT,
- related_name='+'
- )
- termination_b_id = models.PositiveIntegerField()
- termination_b = GenericForeignKey(
- ct_field='termination_b_type',
- fk_field='termination_b_id'
- )
- type = models.CharField(
- max_length=50,
- choices=CableTypeChoices,
- blank=True
- )
- status = models.CharField(
- max_length=50,
- choices=CableStatusChoices,
- default=CableStatusChoices.STATUS_CONNECTED
- )
- label = models.CharField(
- max_length=100,
- blank=True
- )
- color = ColorField(
- blank=True
- )
- length = models.PositiveSmallIntegerField(
- blank=True,
- null=True
- )
- length_unit = models.CharField(
- max_length=50,
- choices=CableLengthUnitChoices,
- blank=True,
- )
- # Stores the normalized length (in meters) for database ordering
- _abs_length = models.DecimalField(
- max_digits=10,
- decimal_places=4,
- blank=True,
- null=True
- )
- # Cache the associated device (where applicable) for the A and B terminations. This enables filtering of Cables by
- # their associated Devices.
- _termination_a_device = models.ForeignKey(
- to=Device,
- on_delete=models.CASCADE,
- related_name='+',
- blank=True,
- null=True
- )
- _termination_b_device = models.ForeignKey(
- to=Device,
- on_delete=models.CASCADE,
- related_name='+',
- blank=True,
- null=True
- )
- tags = TaggableManager(through=TaggedItem)
- objects = RestrictedQuerySet.as_manager()
- csv_headers = [
- 'termination_a_type', 'termination_a_id', 'termination_b_type', 'termination_b_id', 'type', 'status', 'label',
- 'color', 'length', 'length_unit',
- ]
- class Meta:
- ordering = ['pk']
- unique_together = (
- ('termination_a_type', 'termination_a_id'),
- ('termination_b_type', 'termination_b_id'),
- )
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # A copy of the PK to be used by __str__ in case the object is deleted
- self._pk = self.pk
- # Cache the original status so we can check later if it's been changed
- self._orig_status = self.status
- @classmethod
- def from_db(cls, db, field_names, values):
- """
- Cache the original A and B terminations of existing Cable instances for later reference inside clean().
- """
- instance = super().from_db(db, field_names, values)
- instance._orig_termination_a_type_id = instance.termination_a_type_id
- instance._orig_termination_a_id = instance.termination_a_id
- instance._orig_termination_b_type_id = instance.termination_b_type_id
- instance._orig_termination_b_id = instance.termination_b_id
- return instance
- def __str__(self):
- pk = self.pk or self._pk
- return self.label or f'#{pk}'
- def get_absolute_url(self):
- return reverse('dcim:cable', args=[self.pk])
- def clean(self):
- from circuits.models import CircuitTermination
- super().clean()
- # Validate that termination A exists
- if not hasattr(self, 'termination_a_type'):
- raise ValidationError('Termination A type has not been specified')
- try:
- self.termination_a_type.model_class().objects.get(pk=self.termination_a_id)
- except ObjectDoesNotExist:
- raise ValidationError({
- 'termination_a': 'Invalid ID for type {}'.format(self.termination_a_type)
- })
- # Validate that termination B exists
- if not hasattr(self, 'termination_b_type'):
- raise ValidationError('Termination B type has not been specified')
- try:
- self.termination_b_type.model_class().objects.get(pk=self.termination_b_id)
- except ObjectDoesNotExist:
- raise ValidationError({
- 'termination_b': 'Invalid ID for type {}'.format(self.termination_b_type)
- })
- # If editing an existing Cable instance, check that neither termination has been modified.
- if self.pk:
- err_msg = 'Cable termination points may not be modified. Delete and recreate the cable instead.'
- if (
- self.termination_a_type_id != self._orig_termination_a_type_id or
- self.termination_a_id != self._orig_termination_a_id
- ):
- raise ValidationError({
- 'termination_a': err_msg
- })
- if (
- self.termination_b_type_id != self._orig_termination_b_type_id or
- self.termination_b_id != self._orig_termination_b_id
- ):
- raise ValidationError({
- 'termination_b': err_msg
- })
- type_a = self.termination_a_type.model
- type_b = self.termination_b_type.model
- # Validate interface types
- if type_a == 'interface' and self.termination_a.type in NONCONNECTABLE_IFACE_TYPES:
- raise ValidationError({
- 'termination_a_id': 'Cables cannot be terminated to {} interfaces'.format(
- self.termination_a.get_type_display()
- )
- })
- if type_b == 'interface' and self.termination_b.type in NONCONNECTABLE_IFACE_TYPES:
- raise ValidationError({
- 'termination_b_id': 'Cables cannot be terminated to {} interfaces'.format(
- self.termination_b.get_type_display()
- )
- })
- # Check that termination types are compatible
- if type_b not in COMPATIBLE_TERMINATION_TYPES.get(type_a):
- raise ValidationError(
- f"Incompatible termination types: {self.termination_a_type} and {self.termination_b_type}"
- )
- # Check that two connected RearPorts have the same number of positions (if both are >1)
- if isinstance(self.termination_a, RearPort) and isinstance(self.termination_b, RearPort):
- if self.termination_a.positions > 1 and self.termination_b.positions > 1:
- if self.termination_a.positions != self.termination_b.positions:
- raise ValidationError(
- f"{self.termination_a} has {self.termination_a.positions} position(s) but "
- f"{self.termination_b} has {self.termination_b.positions}. "
- f"Both terminations must have the same number of positions (if greater than one)."
- )
- # A termination point cannot be connected to itself
- if self.termination_a == self.termination_b:
- raise ValidationError(f"Cannot connect {self.termination_a_type} to itself")
- # A front port cannot be connected to its corresponding rear port
- if (
- type_a in ['frontport', 'rearport'] and
- type_b in ['frontport', 'rearport'] and
- (
- getattr(self.termination_a, 'rear_port', None) == self.termination_b or
- getattr(self.termination_b, 'rear_port', None) == self.termination_a
- )
- ):
- raise ValidationError("A front port cannot be connected to it corresponding rear port")
- # Check for an existing Cable connected to either termination object
- if self.termination_a.cable not in (None, self):
- raise ValidationError("{} already has a cable attached (#{})".format(
- self.termination_a, self.termination_a.cable_id
- ))
- if self.termination_b.cable not in (None, self):
- raise ValidationError("{} already has a cable attached (#{})".format(
- self.termination_b, self.termination_b.cable_id
- ))
- # Validate length and length_unit
- if self.length is not None and not self.length_unit:
- raise ValidationError("Must specify a unit when setting a cable length")
- elif self.length is None:
- self.length_unit = ''
- def save(self, *args, **kwargs):
- # Store the given length (if any) in meters for use in database ordering
- if self.length and self.length_unit:
- self._abs_length = to_meters(self.length, self.length_unit)
- else:
- self._abs_length = None
- # Store the parent Device for the A and B terminations (if applicable) to enable filtering
- if hasattr(self.termination_a, 'device'):
- self._termination_a_device = self.termination_a.device
- if hasattr(self.termination_b, 'device'):
- self._termination_b_device = self.termination_b.device
- super().save(*args, **kwargs)
- # Update the private pk used in __str__ in case this is a new object (i.e. just got its pk)
- self._pk = self.pk
- def to_csv(self):
- return (
- '{}.{}'.format(self.termination_a_type.app_label, self.termination_a_type.model),
- self.termination_a_id,
- '{}.{}'.format(self.termination_b_type.app_label, self.termination_b_type.model),
- self.termination_b_id,
- self.get_type_display(),
- self.get_status_display(),
- self.label,
- self.color,
- self.length,
- self.length_unit,
- )
- def get_status_class(self):
- return CableStatusChoices.CSS_CLASSES.get(self.status)
- def get_compatible_types(self):
- """
- Return all termination types compatible with termination A.
- """
- if self.termination_a is None:
- return
- return COMPATIBLE_TERMINATION_TYPES[self.termination_a._meta.model_name]
- class CablePath(models.Model):
- """
- A CablePath instance represents the physical path from an origin to a destination, including all intermediate
- elements in the path. Every instance must specify an `origin`, whereas `destination` may be null (for paths which do
- not terminate on a PathEndpoint).
- `path` contains a list of nodes within the path, each represented by a tuple of (type, ID). The first element in the
- path must be a Cable instance, followed by a pair of pass-through ports. For example, consider the following
- topology:
- 1 2 3
- Interface A --- Front Port A | Rear Port A --- Rear Port B | Front Port B --- Interface B
- This path would be expressed as:
- CablePath(
- origin = Interface A
- destination = Interface B
- path = [Cable 1, Front Port A, Rear Port A, Cable 2, Rear Port B, Front Port B, Cable 3]
- )
- `is_active` is set to True only if 1) `destination` is not null, and 2) every Cable within the path has a status of
- "connected".
- """
- origin_type = models.ForeignKey(
- to=ContentType,
- on_delete=models.CASCADE,
- related_name='+'
- )
- origin_id = models.PositiveIntegerField()
- origin = GenericForeignKey(
- ct_field='origin_type',
- fk_field='origin_id'
- )
- destination_type = models.ForeignKey(
- to=ContentType,
- on_delete=models.CASCADE,
- related_name='+',
- blank=True,
- null=True
- )
- destination_id = models.PositiveIntegerField(
- blank=True,
- null=True
- )
- destination = GenericForeignKey(
- ct_field='destination_type',
- fk_field='destination_id'
- )
- path = PathField()
- is_active = models.BooleanField(
- default=False
- )
- is_split = models.BooleanField(
- default=False
- )
- class Meta:
- unique_together = ('origin_type', 'origin_id')
- def __str__(self):
- status = ' (active)' if self.is_active else ' (split)' if self.is_split else ''
- return f"Path #{self.pk}: {self.origin} to {self.destination} via {len(self.path)} nodes{status}"
- def save(self, *args, **kwargs):
- super().save(*args, **kwargs)
- # Record a direct reference to this CablePath on its originating object
- model = self.origin._meta.model
- model.objects.filter(pk=self.origin.pk).update(_path=self.pk)
- @property
- def segment_count(self):
- total_length = 1 + len(self.path) + (1 if self.destination else 0)
- return int(total_length / 3)
- @classmethod
- def from_origin(cls, origin):
- """
- Create a new CablePath instance as traced from the given path origin.
- """
- if origin is None or origin.cable is None:
- return None
- destination = None
- path = []
- position_stack = []
- is_active = True
- is_split = False
- node = origin
- while node.cable is not None:
- if node.cable.status != CableStatusChoices.STATUS_CONNECTED:
- is_active = False
- # Follow the cable to its far-end termination
- path.append(object_to_path_node(node.cable))
- peer_termination = node.get_cable_peer()
- # Follow a FrontPort to its corresponding RearPort
- if isinstance(peer_termination, FrontPort):
- path.append(object_to_path_node(peer_termination))
- node = peer_termination.rear_port
- if node.positions > 1:
- position_stack.append(peer_termination.rear_port_position)
- path.append(object_to_path_node(node))
- # Follow a RearPort to its corresponding FrontPort (if any)
- elif isinstance(peer_termination, RearPort):
- path.append(object_to_path_node(peer_termination))
- # Determine the peer FrontPort's position
- if peer_termination.positions == 1:
- position = 1
- elif position_stack:
- position = position_stack.pop()
- else:
- # No position indicated: path has split, so we stop at the RearPort
- is_split = True
- break
- try:
- node = FrontPort.objects.get(rear_port=peer_termination, rear_port_position=position)
- path.append(object_to_path_node(node))
- except ObjectDoesNotExist:
- # No corresponding FrontPort found for the RearPort
- break
- # Anything else marks the end of the path
- else:
- destination = peer_termination
- break
- if destination is None:
- is_active = False
- return cls(
- origin=origin,
- destination=destination,
- path=path,
- is_active=is_active,
- is_split=is_split
- )
- def get_path(self):
- """
- Return the path as a list of prefetched objects.
- """
- # Compile a list of IDs to prefetch for each type of model in the path
- to_prefetch = defaultdict(list)
- for node in self.path:
- ct_id, object_id = decompile_path_node(node)
- to_prefetch[ct_id].append(object_id)
- # Prefetch path objects using one query per model type. Prefetch related devices where appropriate.
- prefetched = {}
- for ct_id, object_ids in to_prefetch.items():
- model_class = ContentType.objects.get_for_id(ct_id).model_class()
- queryset = model_class.objects.filter(pk__in=object_ids)
- if hasattr(model_class, 'device'):
- queryset = queryset.prefetch_related('device')
- prefetched[ct_id] = {
- obj.id: obj for obj in queryset
- }
- # Replicate the path using the prefetched objects.
- path = []
- for node in self.path:
- ct_id, object_id = decompile_path_node(node)
- path.append(prefetched[ct_id][object_id])
- return path
- def get_total_length(self):
- """
- Return a tuple containing the sum of the length of each cable in the path
- and a flag indicating whether the length is definitive.
- """
- cable_ids = [
- # Starting from the first element, every third element in the path should be a Cable
- decompile_path_node(self.path[i])[1] for i in range(0, len(self.path), 3)
- ]
- cables = Cable.objects.filter(id__in=cable_ids, _abs_length__isnull=False)
- total_length = cables.aggregate(total=Sum('_abs_length'))['total']
- is_definitive = len(cables) == len(cable_ids)
- return total_length, is_definitive
- def get_split_nodes(self):
- """
- Return all available next segments in a split cable path.
- """
- rearport = path_node_to_object(self.path[-1])
- return FrontPort.objects.filter(rear_port=rearport)
|