| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087 |
- import netaddr
- from django.contrib.contenttypes.fields import GenericForeignKey
- from django.core.exceptions import ValidationError
- from django.db import models
- from django.db.models import F
- from django.db.models.functions import Cast
- from django.utils.functional import cached_property
- from django.utils.translation import gettext_lazy as _
- from netaddr.ip import IPNetwork
- from core.models import ObjectType
- from dcim.models.mixins import CachedScopeMixin
- from ipam.choices import *
- from ipam.constants import *
- from ipam.fields import IPNetworkField, IPAddressField
- from ipam.lookups import Host
- from ipam.managers import IPAddressManager
- from ipam.querysets import PrefixQuerySet
- from ipam.validators import DNSValidator
- from netbox.config import get_config
- from netbox.models import OrganizationalModel, PrimaryModel
- from netbox.models.features import ContactsMixin
- __all__ = (
- 'Aggregate',
- 'IPAddress',
- 'IPRange',
- 'Prefix',
- 'RIR',
- 'Role',
- )
- class GetAvailablePrefixesMixin:
- def get_available_prefixes(self):
- """
- Return all available prefixes within this Aggregate or Prefix as an IPSet.
- """
- params = {
- 'prefix__net_contained': str(self.prefix)
- }
- if hasattr(self, 'vrf'):
- params['vrf'] = self.vrf
- child_prefixes = Prefix.objects.filter(**params).values_list('prefix', flat=True)
- return netaddr.IPSet(self.prefix) - netaddr.IPSet(child_prefixes)
- def get_first_available_prefix(self):
- """
- Return the first available child prefix within the prefix (or None).
- """
- available_prefixes = self.get_available_prefixes()
- if not available_prefixes:
- return None
- return available_prefixes.iter_cidrs()[0]
- class RIR(OrganizationalModel):
- """
- A Regional Internet Registry (RIR) is responsible for the allocation of a large portion of the global IP address
- space. This can be an organization like ARIN or RIPE, or a governing standard such as RFC 1918.
- """
- is_private = models.BooleanField(
- default=False,
- verbose_name=_('private'),
- help_text=_('IP space managed by this RIR is considered private')
- )
- class Meta:
- ordering = ('name',)
- verbose_name = _('RIR')
- verbose_name_plural = _('RIRs')
- class Aggregate(ContactsMixin, GetAvailablePrefixesMixin, PrimaryModel):
- """
- An aggregate exists at the root level of the IP address space hierarchy in NetBox. Aggregates are used to organize
- the hierarchy and track the overall utilization of available address space. Each Aggregate is assigned to a RIR.
- """
- prefix = IPNetworkField(
- help_text=_("IPv4 or IPv6 network")
- )
- rir = models.ForeignKey(
- to='ipam.RIR',
- on_delete=models.PROTECT,
- related_name='aggregates',
- verbose_name=_('RIR'),
- help_text=_("Regional Internet Registry responsible for this IP space")
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='aggregates',
- blank=True,
- null=True
- )
- date_added = models.DateField(
- verbose_name=_('date added'),
- blank=True,
- null=True
- )
- clone_fields = (
- 'rir', 'tenant', 'date_added', 'description',
- )
- prerequisite_models = (
- 'ipam.RIR',
- )
- class Meta:
- ordering = ('prefix', 'pk') # prefix may be non-unique
- verbose_name = _('aggregate')
- verbose_name_plural = _('aggregates')
- def __str__(self):
- return str(self.prefix)
- def clean(self):
- super().clean()
- if self.prefix:
- # /0 masks are not acceptable
- if self.prefix.prefixlen == 0:
- raise ValidationError({
- 'prefix': _("Cannot create aggregate with /0 mask.")
- })
- # Ensure that the aggregate being added is not covered by an existing aggregate
- covering_aggregates = Aggregate.objects.filter(
- prefix__net_contains_or_equals=str(self.prefix)
- )
- if self.pk:
- covering_aggregates = covering_aggregates.exclude(pk=self.pk)
- if covering_aggregates:
- raise ValidationError({
- 'prefix': _(
- "Aggregates cannot overlap. {prefix} is already covered by an existing aggregate ({aggregate})."
- ).format(
- prefix=self.prefix,
- aggregate=covering_aggregates[0]
- )
- })
- # Ensure that the aggregate being added does not cover an existing aggregate
- covered_aggregates = Aggregate.objects.filter(prefix__net_contained=str(self.prefix))
- if self.pk:
- covered_aggregates = covered_aggregates.exclude(pk=self.pk)
- if covered_aggregates:
- raise ValidationError({
- 'prefix': _(
- "Prefixes cannot overlap aggregates. {prefix} covers an existing aggregate ({aggregate})."
- ).format(
- prefix=self.prefix,
- aggregate=covered_aggregates[0]
- )
- })
- @property
- def family(self):
- if self.prefix:
- return self.prefix.version
- return None
- def get_child_prefixes(self):
- """
- Return all Prefixes within this Aggregate
- """
- return Prefix.objects.filter(prefix__net_contained=str(self.prefix))
- def get_utilization(self):
- """
- Determine the prefix utilization of the aggregate and return it as a percentage.
- """
- queryset = Prefix.objects.filter(prefix__net_contained_or_equal=str(self.prefix))
- child_prefixes = netaddr.IPSet([p.prefix for p in queryset])
- utilization = float(child_prefixes.size) / self.prefix.size * 100
- return min(utilization, 100)
- class Role(OrganizationalModel):
- """
- A Role represents the functional role of a Prefix or VLAN; for example, "Customer," "Infrastructure," or
- "Management."
- """
- weight = models.PositiveSmallIntegerField(
- verbose_name=_('weight'),
- default=1000
- )
- class Meta:
- ordering = ('weight', 'name')
- verbose_name = _('role')
- verbose_name_plural = _('roles')
- def __str__(self):
- return self.name
- class Prefix(ContactsMixin, GetAvailablePrefixesMixin, CachedScopeMixin, PrimaryModel):
- """
- A Prefix represents an IPv4 or IPv6 network, including mask length. Prefixes can optionally be scoped to certain
- areas and/or assigned to VRFs. A Prefix must be assigned a status and may optionally be assigned a used-define Role.
- A Prefix can also be assigned to a VLAN where appropriate.
- """
- aggregate = models.ForeignKey(
- to='ipam.Aggregate',
- on_delete=models.SET_NULL,
- related_name='prefixes',
- blank=True,
- null=True,
- verbose_name=_('aggregate')
- )
- parent = models.ForeignKey(
- to='ipam.Prefix',
- on_delete=models.SET_NULL,
- related_name='children',
- blank=True,
- null=True,
- verbose_name=_('Prefix')
- )
- prefix = IPNetworkField(
- verbose_name=_('prefix'),
- help_text=_('IPv4 or IPv6 network with mask')
- )
- vrf = models.ForeignKey(
- to='ipam.VRF',
- on_delete=models.PROTECT,
- related_name='prefixes',
- blank=True,
- null=True,
- verbose_name=_('VRF')
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='prefixes',
- blank=True,
- null=True
- )
- vlan = models.ForeignKey(
- to='ipam.VLAN',
- on_delete=models.PROTECT,
- related_name='prefixes',
- blank=True,
- null=True
- )
- status = models.CharField(
- max_length=50,
- choices=PrefixStatusChoices,
- default=PrefixStatusChoices.STATUS_ACTIVE,
- verbose_name=_('status'),
- help_text=_('Operational status of this prefix')
- )
- role = models.ForeignKey(
- to='ipam.Role',
- on_delete=models.SET_NULL,
- related_name='prefixes',
- blank=True,
- null=True,
- help_text=_('The primary function of this prefix')
- )
- is_pool = models.BooleanField(
- verbose_name=_('is a pool'),
- default=False,
- help_text=_('All IP addresses within this prefix are considered usable')
- )
- mark_utilized = models.BooleanField(
- verbose_name=_('mark utilized'),
- default=False,
- help_text=_("Treat as fully utilized")
- )
- # Cached depth & child counts
- _depth = models.PositiveSmallIntegerField(
- default=0,
- editable=False
- )
- _children = models.PositiveBigIntegerField(
- default=0,
- editable=False
- )
- objects = PrefixQuerySet.as_manager()
- clone_fields = (
- 'scope_type', 'scope_id', 'vrf', 'tenant', 'vlan', 'status', 'role', 'is_pool', 'mark_utilized', 'description',
- )
- class Meta:
- ordering = (F('vrf').asc(nulls_first=True), 'prefix', 'pk') # (vrf, prefix) may be non-unique
- verbose_name = _('prefix')
- verbose_name_plural = _('prefixes')
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Cache the original prefix and VRF so we can check if they have changed on post_save
- self._prefix = self.__dict__.get('prefix')
- self._vrf_id = self.__dict__.get('vrf_id')
- def __str__(self):
- return str(self.prefix)
- def clean(self):
- super().clean()
- if self.prefix:
- if not isinstance(self.prefix, IPNetwork):
- self.prefix = IPNetwork(self.prefix)
- # /0 masks are not acceptable
- if self.prefix.prefixlen == 0:
- raise ValidationError({
- 'prefix': _("Cannot create prefix with /0 mask.")
- })
- if self.parent:
- if self.prefix not in self.parent.prefix:
- raise ValidationError({
- 'parent': _("Prefix must be part of parent prefix.")
- })
- if self.parent.status != PrefixStatusChoices.STATUS_CONTAINER and self.vrf != self.parent.vrf:
- raise ValidationError({
- 'vrf': _("VRF must match the parent VRF.")
- })
- # Enforce unique IP space (if applicable)
- if (self.vrf is None and get_config().ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
- duplicate_prefixes = self.get_duplicates()
- if duplicate_prefixes:
- table = _("VRF {vrf}").format(vrf=self.vrf) if self.vrf else _("global table")
- raise ValidationError({
- 'prefix': _("Duplicate prefix found in {table}: {prefix}").format(
- table=table,
- prefix=duplicate_prefixes.first(),
- )
- })
- def save(self, *args, **kwargs):
- vrf_id = self.vrf.pk if self.vrf else None
- if not self.pk and not self.parent:
- parent = self.find_parent_prefix(self)
- self.parent = parent
- elif self.parent and (self.prefix != self._prefix or vrf_id != self._vrf_id):
- parent = self.find_parent_prefix(self)
- self.parent = parent
- if isinstance(self.prefix, netaddr.IPNetwork):
- # Clear host bits from prefix
- self.prefix = self.prefix.cidr
- # Cache objects associated with the terminating object (for filtering)
- self.cache_related_objects()
- super().save(*args, **kwargs)
- @property
- def family(self):
- return self.prefix.version if self.prefix else None
- @property
- def mask_length(self):
- return self.prefix.prefixlen if self.prefix else None
- @property
- def depth_count(self):
- return self._depth
- @property
- def children_count(self):
- return self._children
- def _set_prefix_length(self, value):
- """
- Expose the IPNetwork object's prefixlen attribute on the parent model so that it can be manipulated directly,
- e.g. for bulk editing.
- """
- if self.prefix is not None:
- self.prefix.prefixlen = value
- prefix_length = property(fset=_set_prefix_length)
- def get_status_color(self):
- return PrefixStatusChoices.colors.get(self.status)
- def get_parents(self, include_self=False):
- """
- Return all containing Prefixes in the hierarchy.
- """
- lookup = 'net_contains_or_equals' if include_self else 'net_contains'
- return Prefix.objects.filter(**{
- 'vrf': self.vrf,
- f'prefix__{lookup}': self.prefix
- })
- def get_children(self, include_self=False):
- """
- Return all covered Prefixes in the hierarchy.
- """
- lookup = 'net_contained_or_equal' if include_self else 'net_contained'
- return Prefix.objects.filter(**{
- 'vrf': self.vrf,
- f'prefix__{lookup}': self.prefix
- })
- def get_duplicates(self):
- return Prefix.objects.filter(vrf=self.vrf, prefix=str(self.prefix)).exclude(pk=self.pk)
- def get_child_prefixes(self):
- """
- Return all Prefixes within this Prefix and VRF. If this Prefix is a container in the global table, return child
- Prefixes belonging to any VRF.
- """
- if self.vrf is None and self.status == PrefixStatusChoices.STATUS_CONTAINER:
- return Prefix.objects.filter(prefix__net_contained=str(self.prefix))
- else:
- return Prefix.objects.filter(prefix__net_contained=str(self.prefix), vrf=self.vrf)
- def get_child_ranges(self, **kwargs):
- """
- Return all IPRanges within this Prefix and VRF.
- """
- return IPRange.objects.filter(
- vrf=self.vrf,
- start_address__net_host_contained=str(self.prefix),
- end_address__net_host_contained=str(self.prefix),
- **kwargs
- )
- def get_child_ips(self):
- """
- Return all IPAddresses within this Prefix and VRF. If this Prefix is a container in the global table, return
- child IPAddresses belonging to any VRF.
- """
- if self.vrf is None and self.status == PrefixStatusChoices.STATUS_CONTAINER:
- return IPAddress.objects.filter(address__net_host_contained=str(self.prefix))
- else:
- return IPAddress.objects.filter(address__net_host_contained=str(self.prefix), vrf=self.vrf)
- def get_available_ips(self):
- """
- Return all available IPs within this prefix as an IPSet.
- """
- prefix = netaddr.IPSet(self.prefix)
- child_ips = netaddr.IPSet([
- ip.address.ip for ip in self.get_child_ips()
- ])
- child_ranges = netaddr.IPSet([
- iprange.range for iprange in self.get_child_ranges().filter(mark_populated=True)
- ])
- available_ips = prefix - child_ips - child_ranges
- # IPv6 /127's, pool, or IPv4 /31-/32 sets are fully usable
- if (self.family == 6 and self.prefix.prefixlen >= 127) or self.is_pool or (
- self.family == 4 and self.prefix.prefixlen >= 31
- ):
- return available_ips
- if self.family == 4:
- # For "normal" IPv4 prefixes, omit first and last addresses
- available_ips -= netaddr.IPSet([
- netaddr.IPAddress(self.prefix.first),
- netaddr.IPAddress(self.prefix.last),
- ])
- else:
- # For IPv6 prefixes, omit the Subnet-Router anycast address
- # per RFC 4291
- available_ips -= netaddr.IPSet([netaddr.IPAddress(self.prefix.first)])
- return available_ips
- def get_first_available_ip(self):
- """
- Return the first available IP within the prefix (or None).
- """
- available_ips = self.get_available_ips()
- if not available_ips:
- return None
- return '{}/{}'.format(next(available_ips.__iter__()), self.prefix.prefixlen)
- def get_utilization(self):
- """
- Determine the utilization of the prefix and return it as a percentage. For Prefixes with a status of
- "container", calculate utilization based on child prefixes. For all others, count child IP addresses.
- """
- if self.mark_utilized:
- return 100
- if self.status == PrefixStatusChoices.STATUS_CONTAINER:
- queryset = Prefix.objects.filter(
- prefix__net_contained=str(self.prefix),
- vrf=self.vrf
- )
- child_prefixes = netaddr.IPSet([p.prefix for p in queryset])
- utilization = float(child_prefixes.size) / self.prefix.size * 100
- else:
- # Compile an IPSet to avoid counting duplicate IPs
- child_ips = netaddr.IPSet()
- for iprange in self.get_child_ranges().filter(mark_utilized=True):
- child_ips.add(iprange.range)
- for ip in self.get_child_ips():
- child_ips.add(ip.address.ip)
- prefix_size = self.prefix.size
- if self.prefix.version == 4 and self.prefix.prefixlen < 31 and not self.is_pool:
- prefix_size -= 2
- utilization = float(child_ips.size) / prefix_size * 100
- return min(utilization, 100)
- @classmethod
- def find_parent_prefix(cls, network):
- prefixes = Prefix.objects.filter(
- models.Q(
- vrf=network.vrf,
- prefix__net_contains=str(network)
- ) | models.Q(
- vrf=None,
- status=PrefixStatusChoices.STATUS_CONTAINER,
- prefix__net_contains=str(network),
- )
- )
- return prefixes.last()
- class IPRange(ContactsMixin, PrimaryModel):
- """
- A range of IP addresses, defined by start and end addresses.
- """
- prefix = models.ForeignKey(
- to='ipam.Prefix',
- on_delete=models.SET_NULL,
- related_name='ip_ranges',
- null=True,
- blank=True,
- verbose_name=_('prefix'),
- )
- start_address = IPAddressField(
- verbose_name=_('start address'),
- help_text=_('IPv4 or IPv6 address (with mask)')
- )
- end_address = IPAddressField(
- verbose_name=_('end address'),
- help_text=_('IPv4 or IPv6 address (with mask)')
- )
- size = models.PositiveIntegerField(
- verbose_name=_('size'),
- editable=False
- )
- vrf = models.ForeignKey(
- to='ipam.VRF',
- on_delete=models.PROTECT,
- related_name='ip_ranges',
- blank=True,
- null=True,
- verbose_name=_('VRF')
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='ip_ranges',
- blank=True,
- null=True
- )
- status = models.CharField(
- verbose_name=_('status'),
- max_length=50,
- choices=IPRangeStatusChoices,
- default=IPRangeStatusChoices.STATUS_ACTIVE,
- help_text=_('Operational status of this range')
- )
- role = models.ForeignKey(
- to='ipam.Role',
- on_delete=models.SET_NULL,
- related_name='ip_ranges',
- blank=True,
- null=True,
- help_text=_('The primary function of this range')
- )
- mark_populated = models.BooleanField(
- verbose_name=_('mark populated'),
- default=False,
- help_text=_("Prevent the creation of IP addresses within this range")
- )
- mark_utilized = models.BooleanField(
- verbose_name=_('mark utilized'),
- default=False,
- help_text=_("Report space as 100% utilized")
- )
- clone_fields = (
- 'vrf', 'tenant', 'status', 'role', 'description', 'mark_populated', 'mark_utilized',
- )
- class Meta:
- ordering = (F('vrf').asc(nulls_first=True), 'start_address', 'pk') # (vrf, start_address) may be non-unique
- verbose_name = _('IP range')
- verbose_name_plural = _('IP ranges')
- def __str__(self):
- return self.name
- def clean(self):
- super().clean()
- if self.start_address and self.end_address:
- # If prefix is set, validate suitability
- if self.prefix:
- # Check that start address and end address are within the prefix range
- if self.start_address not in self.prefix.prefix and self.end_address not in self.prefix.prefix:
- raise ValidationError({
- 'start_address': _("Start address must be part of the selected prefix"),
- 'end_address': _("End address must be part of the selected prefix.")
- })
- elif self.start_address not in self.prefix.prefix:
- raise ValidationError({
- 'start_address': _("Start address must be part of the selected prefix")
- })
- elif self.end_address not in self.prefix.prefix:
- raise ValidationError({
- 'end_address': _("End address must be part of the selected prefix.")
- })
- # Check that VRF matches prefix VRF
- if self.vrf != self.prefix.vrf:
- raise ValidationError({
- 'vrf': _("VRF must match the prefix VRF.")
- })
- # Check that start & end IP versions match
- if self.start_address.version != self.end_address.version:
- raise ValidationError({
- 'end_address': _("Starting and ending IP address versions must match")
- })
- # Check that the start & end IP prefix lengths match
- if self.start_address.prefixlen != self.end_address.prefixlen:
- raise ValidationError({
- 'end_address': _("Starting and ending IP address masks must match")
- })
- # Check that the ending address is greater than the starting address
- if not self.end_address > self.start_address:
- raise ValidationError({
- 'end_address': _(
- "Ending address must be greater than the starting address ({start_address})"
- ).format(start_address=self.start_address)
- })
- # Check for overlapping ranges
- overlapping_ranges = (
- IPRange.objects.exclude(pk=self.pk)
- .filter(vrf=self.vrf)
- .filter(
- # Starts inside
- Q(
- start_address__host__inet__gte=self.start_address.ip,
- start_address__host__inet__lte=self.end_address.ip,
- ) |
- # Ends inside
- Q(
- end_address__host__inet__gte=self.start_address.ip,
- end_address__host__inet__lte=self.end_address.ip,
- ) |
- # Starts & ends outside
- Q(
- start_address__host__inet__lte=self.start_address.ip,
- end_address__host__inet__gte=self.end_address.ip,
- )
- )
- )
- if overlapping_ranges.exists():
- raise ValidationError(
- _("Defined addresses overlap with range {overlapping_range} in VRF {vrf}").format(
- overlapping_range=overlapping_ranges.first(),
- vrf=self.vrf
- ))
- # Validate maximum size
- MAX_SIZE = 2 ** 32 - 1
- if int(self.end_address.ip - self.start_address.ip) + 1 > MAX_SIZE:
- raise ValidationError(
- _("Defined range exceeds maximum supported size ({max_size})").format(max_size=MAX_SIZE)
- )
- def save(self, *args, **kwargs):
- # Record the range's size (number of IP addresses)
- self.size = int(self.end_address.ip - self.start_address.ip) + 1
- super().save(*args, **kwargs)
- @property
- def family(self):
- return self.start_address.version if self.start_address else None
- @property
- def range(self):
- return netaddr.IPRange(self.start_address.ip, self.end_address.ip)
- @property
- def mask_length(self):
- return self.start_address.prefixlen if self.start_address else None
- @cached_property
- def name(self):
- """
- Return an efficient string representation of the IP range.
- """
- separator = ':' if self.family == 6 else '.'
- start_chunks = str(self.start_address.ip).split(separator)
- end_chunks = str(self.end_address.ip).split(separator)
- base_chunks = []
- for a, b in zip(start_chunks, end_chunks):
- if a == b:
- base_chunks.append(a)
- base_str = separator.join(base_chunks)
- start_str = separator.join(start_chunks[len(base_chunks):])
- end_str = separator.join(end_chunks[len(base_chunks):])
- return f'{base_str}{separator}{start_str}-{end_str}/{self.start_address.prefixlen}'
- def _set_prefix_length(self, value):
- """
- Expose the IPRange object's prefixlen attribute on the parent model so that it can be manipulated directly,
- e.g. for bulk editing.
- """
- self.start_address.prefixlen = value
- self.end_address.prefixlen = value
- prefix_length = property(fset=_set_prefix_length)
- def get_status_color(self):
- return IPRangeStatusChoices.colors.get(self.status)
- def get_child_ips(self):
- """
- Return all IPAddresses within this IPRange and VRF.
- """
- return IPAddress.objects.filter(
- address__gte=self.start_address,
- address__lte=self.end_address,
- vrf=self.vrf
- )
- def get_available_ips(self):
- """
- Return all available IPs within this range as an IPSet.
- """
- if self.mark_populated:
- return netaddr.IPSet()
- range = netaddr.IPRange(self.start_address.ip, self.end_address.ip)
- child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
- return netaddr.IPSet(range) - child_ips
- @cached_property
- def first_available_ip(self):
- """
- Return the first available IP within the range (or None).
- """
- available_ips = self.get_available_ips()
- if not available_ips:
- return None
- return '{}/{}'.format(next(available_ips.__iter__()), self.start_address.prefixlen)
- @cached_property
- def utilization(self):
- """
- Determine the utilization of the range and return it as a percentage.
- """
- if self.mark_utilized:
- return 100
- # Compile an IPSet to avoid counting duplicate IPs
- child_count = netaddr.IPSet([
- ip.address.ip for ip in self.get_child_ips()
- ]).size
- return min(float(child_count) / self.size * 100, 100)
- class IPAddress(ContactsMixin, PrimaryModel):
- """
- An IPAddress represents an individual IPv4 or IPv6 address and its mask. The mask length should match what is
- configured in the real world. (Typically, only loopback interfaces are configured with /32 or /128 masks.) Like
- Prefixes, IPAddresses can optionally be assigned to a VRF. An IPAddress can optionally be assigned to an Interface.
- Interfaces can have zero or more IPAddresses assigned to them.
- An IPAddress can also optionally point to a NAT inside IP, designating itself as a NAT outside IP. This is useful,
- for example, when mapping public addresses to private addresses. When an Interface has been assigned an IPAddress
- which has a NAT outside IP, that Interface's Device can use either the inside or outside IP as its primary IP.
- """
- prefix = models.ForeignKey(
- to='ipam.Prefix',
- on_delete=models.SET_NULL,
- related_name='ip_addresses',
- blank=True,
- null=True,
- verbose_name=_('Prefix')
- )
- address = IPAddressField(
- verbose_name=_('address'),
- help_text=_('IPv4 or IPv6 address (with mask)')
- )
- vrf = models.ForeignKey(
- to='ipam.VRF',
- on_delete=models.PROTECT,
- related_name='ip_addresses',
- blank=True,
- null=True,
- verbose_name=_('VRF')
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='ip_addresses',
- blank=True,
- null=True
- )
- status = models.CharField(
- verbose_name=_('status'),
- max_length=50,
- choices=IPAddressStatusChoices,
- default=IPAddressStatusChoices.STATUS_ACTIVE,
- help_text=_('The operational status of this IP')
- )
- role = models.CharField(
- verbose_name=_('role'),
- max_length=50,
- choices=IPAddressRoleChoices,
- blank=True,
- null=True,
- help_text=_('The functional role of this IP')
- )
- assigned_object_type = models.ForeignKey(
- to='contenttypes.ContentType',
- on_delete=models.PROTECT,
- related_name='+',
- blank=True,
- null=True
- )
- assigned_object_id = models.PositiveBigIntegerField(
- blank=True,
- null=True
- )
- assigned_object = GenericForeignKey(
- ct_field='assigned_object_type',
- fk_field='assigned_object_id'
- )
- nat_inside = models.ForeignKey(
- to='self',
- on_delete=models.SET_NULL,
- related_name='nat_outside',
- blank=True,
- null=True,
- verbose_name=_('NAT (inside)'),
- help_text=_('The IP for which this address is the "outside" IP')
- )
- dns_name = models.CharField(
- max_length=255,
- blank=True,
- validators=[DNSValidator],
- verbose_name=_('DNS name'),
- help_text=_('Hostname or FQDN (not case-sensitive)')
- )
- objects = IPAddressManager()
- clone_fields = (
- 'vrf', 'tenant', 'status', 'role', 'dns_name', 'description',
- )
- class Meta:
- ordering = ('address', 'pk') # address may be non-unique
- indexes = (
- models.Index(Cast(Host('address'), output_field=IPAddressField()), name='ipam_ipaddress_host'),
- models.Index(fields=('assigned_object_type', 'assigned_object_id')),
- )
- verbose_name = _('IP address')
- verbose_name_plural = _('IP addresses')
- def __str__(self):
- return str(self.address)
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self._address = self.address
- # Denote the original assigned object (if any) for validation in clean()
- self._original_assigned_object_id = self.__dict__.get('assigned_object_id')
- self._original_assigned_object_type_id = self.__dict__.get('assigned_object_type_id')
- def get_duplicates(self):
- return IPAddress.objects.filter(
- vrf=self.vrf,
- address__net_host=str(self.address.ip)
- ).exclude(pk=self.pk)
- def get_next_available_ip(self):
- """
- Return the next available IP address within this IP's network (if any)
- """
- if self.address and self.address.broadcast:
- start_ip = self.address.ip + 1
- end_ip = self.address.broadcast - 1
- if start_ip <= end_ip:
- available_ips = netaddr.IPSet(netaddr.IPRange(start_ip, end_ip))
- available_ips -= netaddr.IPSet([
- address.ip for address in IPAddress.objects.filter(
- vrf=self.vrf,
- address__gt=self.address,
- address__net_contained_or_equal=self.address.cidr
- ).values_list('address', flat=True)
- ])
- if available_ips:
- return next(iter(available_ips))
- def get_related_ips(self):
- """
- Return all IPAddresses belonging to the same VRF.
- """
- return IPAddress.objects.exclude(address=str(self.address)).filter(
- vrf=self.vrf, address__net_contained_or_equal=str(self.address)
- )
- def clean(self):
- super().clean()
- if self.address:
- # If prefix is set, validate suitability
- if self.prefix:
- if self.address not in self.prefix.prefix:
- raise ValidationError({
- 'prefix': _("IP address must be part of the selected prefix.")
- })
- if self.vrf != self.prefix.vrf:
- raise ValidationError({
- 'vrf': _("IP address VRF must match the prefix VRF.")
- })
- # /0 masks are not acceptable
- if self.address.prefixlen == 0:
- raise ValidationError({
- 'address': _("Cannot create IP address with /0 mask.")
- })
- # Do not allow assigning a network ID or broadcast address to an interface.
- if self.assigned_object:
- if self.address.ip == self.address.network:
- msg = _("{ip} is a network ID, which may not be assigned to an interface.").format(
- ip=self.address.ip
- )
- if self.address.version == 4 and self.address.prefixlen not in (31, 32):
- raise ValidationError(msg)
- if self.address.version == 6 and self.address.prefixlen not in (127, 128):
- raise ValidationError(msg)
- if (
- self.address.version == 4 and self.address.ip == self.address.broadcast and
- self.address.prefixlen not in (31, 32)
- ):
- msg = _("{ip} is a broadcast address, which may not be assigned to an interface.").format(
- ip=self.address.ip
- )
- raise ValidationError(msg)
- # Enforce unique IP space (if applicable)
- if (self.vrf is None and get_config().ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
- duplicate_ips = self.get_duplicates()
- if duplicate_ips and (
- self.role not in IPADDRESS_ROLES_NONUNIQUE or
- any(dip.role not in IPADDRESS_ROLES_NONUNIQUE for dip in duplicate_ips)
- ):
- table = _("VRF {vrf}").format(vrf=self.vrf) if self.vrf else _("global table")
- raise ValidationError({
- 'address': _("Duplicate IP address found in {table}: {ipaddress}").format(
- table=table,
- ipaddress=duplicate_ips.first(),
- )
- })
- # Disallow the creation of IPAddresses within an IPRange with mark_populated=True
- parent_range = IPRange.objects.filter(
- start_address__lte=self.address,
- end_address__gte=self.address,
- vrf=self.vrf,
- mark_populated=True
- ).first()
- if parent_range:
- raise ValidationError({
- 'address': _(
- "Cannot create IP address {ip} inside range {range}."
- ).format(ip=self.address, range=parent_range)
- })
- if self._original_assigned_object_id and self._original_assigned_object_type_id:
- parent = getattr(self.assigned_object, 'parent_object', None)
- ct = ObjectType.objects.get_for_id(self._original_assigned_object_type_id)
- original_assigned_object = ct.get_object_for_this_type(pk=self._original_assigned_object_id)
- original_parent = getattr(original_assigned_object, 'parent_object', None)
- # can't use is_primary_ip as self.assigned_object might be changed
- is_primary = False
- if self.family == 4 and hasattr(original_parent, 'primary_ip4'):
- if original_parent.primary_ip4_id == self.pk:
- is_primary = True
- if self.family == 6 and hasattr(original_parent, 'primary_ip6'):
- if original_parent.primary_ip6_id == self.pk:
- is_primary = True
- if is_primary and (parent != original_parent):
- raise ValidationError(
- _("Cannot reassign IP address while it is designated as the primary IP for the parent object")
- )
- # Validate IP status selection
- if self.status == IPAddressStatusChoices.STATUS_SLAAC and self.family != 6:
- raise ValidationError({
- 'status': _("Only IPv6 addresses can be assigned SLAAC status")
- })
- def save(self, *args, **kwargs):
- # Force dns_name to lowercase
- self.dns_name = self.dns_name.lower()
- super().save(*args, **kwargs)
- def clone(self):
- attrs = super().clone()
- # Populate the address field with the next available IP (if any)
- if next_available_ip := self.get_next_available_ip():
- attrs['address'] = f'{next_available_ip}/{self.address.prefixlen}'
- return attrs
- def to_objectchange(self, action):
- objectchange = super().to_objectchange(action)
- objectchange.related_object = self.assigned_object
- return objectchange
- @property
- def family(self):
- if self.address:
- return self.address.version
- return None
- @property
- def is_oob_ip(self):
- if self.assigned_object:
- parent = getattr(self.assigned_object, 'parent_object', None)
- if hasattr(parent, 'oob_ip') and parent.oob_ip_id == self.pk:
- return True
- return False
- @property
- def is_primary_ip(self):
- if self.assigned_object:
- parent = getattr(self.assigned_object, 'parent_object', None)
- if self.family == 4 and hasattr(parent, 'primary_ip4') and parent.primary_ip4_id == self.pk:
- return True
- if self.family == 6 and hasattr(parent, 'primary_ip6') and parent.primary_ip6_id == self.pk:
- return True
- return False
- def _set_mask_length(self, value):
- """
- Expose the IPNetwork object's prefixlen attribute on the parent model so that it can be manipulated directly,
- e.g. for bulk editing.
- """
- if self.address is not None:
- self.address.prefixlen = value
- mask_length = property(fset=_set_mask_length)
- def get_status_color(self):
- return IPAddressStatusChoices.colors.get(self.status)
- def get_role_color(self):
- return IPAddressRoleChoices.colors.get(self.role)
|