| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910 |
- import netaddr
- from django.contrib.contenttypes.fields import GenericForeignKey
- from django.contrib.contenttypes.models import ContentType
- from django.core.exceptions import ValidationError
- from django.db import models
- from django.db.models import F
- from django.db.models.functions import Cast
- from django.urls import reverse
- from django.utils.functional import cached_property
- from django.utils.translation import gettext_lazy as _
- from ipam.choices import *
- from ipam.constants import *
- from ipam.fields import IPNetworkField, IPAddressField
- from ipam.lookups import Host
- from ipam.managers import IPAddressManager
- from ipam.querysets import PrefixQuerySet
- from ipam.validators import DNSValidator
- from netbox.config import get_config
- from netbox.models import OrganizationalModel, PrimaryModel
- __all__ = (
- 'Aggregate',
- 'IPAddress',
- 'IPRange',
- 'Prefix',
- 'RIR',
- 'Role',
- )
- class GetAvailablePrefixesMixin:
- def get_available_prefixes(self):
- """
- Return all available prefixes within this Aggregate or Prefix as an IPSet.
- """
- params = {
- 'prefix__net_contained': str(self.prefix)
- }
- if hasattr(self, 'vrf'):
- params['vrf'] = self.vrf
- child_prefixes = Prefix.objects.filter(**params).values_list('prefix', flat=True)
- return netaddr.IPSet(self.prefix) - netaddr.IPSet(child_prefixes)
- def get_first_available_prefix(self):
- """
- Return the first available child prefix within the prefix (or None).
- """
- available_prefixes = self.get_available_prefixes()
- if not available_prefixes:
- return None
- return available_prefixes.iter_cidrs()[0]
- class RIR(OrganizationalModel):
- """
- A Regional Internet Registry (RIR) is responsible for the allocation of a large portion of the global IP address
- space. This can be an organization like ARIN or RIPE, or a governing standard such as RFC 1918.
- """
- is_private = models.BooleanField(
- default=False,
- verbose_name=_('private'),
- help_text=_('IP space managed by this RIR is considered private')
- )
- class Meta:
- ordering = ('name',)
- verbose_name = _('RIR')
- verbose_name_plural = _('RIRs')
- def get_absolute_url(self):
- return reverse('ipam:rir', args=[self.pk])
- class Aggregate(GetAvailablePrefixesMixin, PrimaryModel):
- """
- An aggregate exists at the root level of the IP address space hierarchy in NetBox. Aggregates are used to organize
- the hierarchy and track the overall utilization of available address space. Each Aggregate is assigned to a RIR.
- """
- prefix = IPNetworkField(
- help_text=_("IPv4 or IPv6 network")
- )
- rir = models.ForeignKey(
- to='ipam.RIR',
- on_delete=models.PROTECT,
- related_name='aggregates',
- verbose_name=_('RIR'),
- help_text=_("Regional Internet Registry responsible for this IP space")
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='aggregates',
- blank=True,
- null=True
- )
- date_added = models.DateField(
- verbose_name=_('date added'),
- blank=True,
- null=True
- )
- clone_fields = (
- 'rir', 'tenant', 'date_added', 'description',
- )
- prerequisite_models = (
- 'ipam.RIR',
- )
- class Meta:
- ordering = ('prefix', 'pk') # prefix may be non-unique
- verbose_name = _('aggregate')
- verbose_name_plural = _('aggregates')
- def __str__(self):
- return str(self.prefix)
- def get_absolute_url(self):
- return reverse('ipam:aggregate', args=[self.pk])
- def clean(self):
- super().clean()
- if self.prefix:
- # /0 masks are not acceptable
- if self.prefix.prefixlen == 0:
- raise ValidationError({
- 'prefix': _("Cannot create aggregate with /0 mask.")
- })
- # Ensure that the aggregate being added is not covered by an existing aggregate
- covering_aggregates = Aggregate.objects.filter(
- prefix__net_contains_or_equals=str(self.prefix)
- )
- if self.pk:
- covering_aggregates = covering_aggregates.exclude(pk=self.pk)
- if covering_aggregates:
- raise ValidationError({
- 'prefix': _(
- "Aggregates cannot overlap. {} is already covered by an existing aggregate ({})."
- ).format(self.prefix, covering_aggregates[0])
- })
- # Ensure that the aggregate being added does not cover an existing aggregate
- covered_aggregates = Aggregate.objects.filter(prefix__net_contained=str(self.prefix))
- if self.pk:
- covered_aggregates = covered_aggregates.exclude(pk=self.pk)
- if covered_aggregates:
- raise ValidationError({
- 'prefix': _("Aggregates cannot overlap. {} covers an existing aggregate ({}).").format(
- self.prefix, covered_aggregates[0]
- )
- })
- @property
- def family(self):
- if self.prefix:
- return self.prefix.version
- return None
- def get_child_prefixes(self):
- """
- Return all Prefixes within this Aggregate
- """
- return Prefix.objects.filter(prefix__net_contained=str(self.prefix))
- def get_utilization(self):
- """
- Determine the prefix utilization of the aggregate and return it as a percentage.
- """
- queryset = Prefix.objects.filter(prefix__net_contained_or_equal=str(self.prefix))
- child_prefixes = netaddr.IPSet([p.prefix for p in queryset])
- utilization = float(child_prefixes.size) / self.prefix.size * 100
- return min(utilization, 100)
- class Role(OrganizationalModel):
- """
- A Role represents the functional role of a Prefix or VLAN; for example, "Customer," "Infrastructure," or
- "Management."
- """
- weight = models.PositiveSmallIntegerField(
- verbose_name=_('weight'),
- default=1000
- )
- class Meta:
- ordering = ('weight', 'name')
- verbose_name = _('role')
- verbose_name_plural = _('roles')
- def __str__(self):
- return self.name
- def get_absolute_url(self):
- return reverse('ipam:role', args=[self.pk])
- class Prefix(GetAvailablePrefixesMixin, PrimaryModel):
- """
- A Prefix represents an IPv4 or IPv6 network, including mask length. Prefixes can optionally be assigned to Sites and
- VRFs. A Prefix must be assigned a status and may optionally be assigned a used-define Role. A Prefix can also be
- assigned to a VLAN where appropriate.
- """
- prefix = IPNetworkField(
- verbose_name=_('prefix'),
- help_text=_('IPv4 or IPv6 network with mask')
- )
- site = models.ForeignKey(
- to='dcim.Site',
- on_delete=models.PROTECT,
- related_name='prefixes',
- blank=True,
- null=True
- )
- vrf = models.ForeignKey(
- to='ipam.VRF',
- on_delete=models.PROTECT,
- related_name='prefixes',
- blank=True,
- null=True,
- verbose_name=_('VRF')
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='prefixes',
- blank=True,
- null=True
- )
- vlan = models.ForeignKey(
- to='ipam.VLAN',
- on_delete=models.PROTECT,
- related_name='prefixes',
- blank=True,
- null=True
- )
- status = models.CharField(
- max_length=50,
- choices=PrefixStatusChoices,
- default=PrefixStatusChoices.STATUS_ACTIVE,
- verbose_name=_('status'),
- help_text=_('Operational status of this prefix')
- )
- role = models.ForeignKey(
- to='ipam.Role',
- on_delete=models.SET_NULL,
- related_name='prefixes',
- blank=True,
- null=True,
- help_text=_('The primary function of this prefix')
- )
- is_pool = models.BooleanField(
- verbose_name=_('is a pool'),
- default=False,
- help_text=_('All IP addresses within this prefix are considered usable')
- )
- mark_utilized = models.BooleanField(
- verbose_name=_('mark utilized'),
- default=False,
- help_text=_("Treat as 100% utilized")
- )
- # Cached depth & child counts
- _depth = models.PositiveSmallIntegerField(
- default=0,
- editable=False
- )
- _children = models.PositiveBigIntegerField(
- default=0,
- editable=False
- )
- objects = PrefixQuerySet.as_manager()
- clone_fields = (
- 'site', 'vrf', 'tenant', 'vlan', 'status', 'role', 'is_pool', 'mark_utilized', 'description',
- )
- class Meta:
- ordering = (F('vrf').asc(nulls_first=True), 'prefix', 'pk') # (vrf, prefix) may be non-unique
- verbose_name = _('prefix')
- verbose_name_plural = _('prefixes')
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Cache the original prefix and VRF so we can check if they have changed on post_save
- self._prefix = self.__dict__.get('prefix')
- self._vrf_id = self.__dict__.get('vrf_id')
- def __str__(self):
- return str(self.prefix)
- def get_absolute_url(self):
- return reverse('ipam:prefix', args=[self.pk])
- def clean(self):
- super().clean()
- if self.prefix:
- # /0 masks are not acceptable
- if self.prefix.prefixlen == 0:
- raise ValidationError({
- 'prefix': _("Cannot create prefix with /0 mask.")
- })
- # Enforce unique IP space (if applicable)
- if (self.vrf is None and get_config().ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
- duplicate_prefixes = self.get_duplicates()
- if duplicate_prefixes:
- raise ValidationError({
- 'prefix': _("Duplicate prefix found in {}: {}").format(
- _("VRF {}").format(self.vrf) if self.vrf else _("global table"),
- duplicate_prefixes.first(),
- )
- })
- def save(self, *args, **kwargs):
- if isinstance(self.prefix, netaddr.IPNetwork):
- # Clear host bits from prefix
- self.prefix = self.prefix.cidr
- super().save(*args, **kwargs)
- @property
- def family(self):
- return self.prefix.version if self.prefix else None
- @property
- def mask_length(self):
- return self.prefix.prefixlen if self.prefix else None
- @property
- def depth(self):
- return self._depth
- @property
- def children(self):
- return self._children
- def _set_prefix_length(self, value):
- """
- Expose the IPNetwork object's prefixlen attribute on the parent model so that it can be manipulated directly,
- e.g. for bulk editing.
- """
- if self.prefix is not None:
- self.prefix.prefixlen = value
- prefix_length = property(fset=_set_prefix_length)
- def get_status_color(self):
- return PrefixStatusChoices.colors.get(self.status)
- def get_parents(self, include_self=False):
- """
- Return all containing Prefixes in the hierarchy.
- """
- lookup = 'net_contains_or_equals' if include_self else 'net_contains'
- return Prefix.objects.filter(**{
- 'vrf': self.vrf,
- f'prefix__{lookup}': self.prefix
- })
- def get_children(self, include_self=False):
- """
- Return all covered Prefixes in the hierarchy.
- """
- lookup = 'net_contained_or_equal' if include_self else 'net_contained'
- return Prefix.objects.filter(**{
- 'vrf': self.vrf,
- f'prefix__{lookup}': self.prefix
- })
- def get_duplicates(self):
- return Prefix.objects.filter(vrf=self.vrf, prefix=str(self.prefix)).exclude(pk=self.pk)
- def get_child_prefixes(self):
- """
- Return all Prefixes within this Prefix and VRF. If this Prefix is a container in the global table, return child
- Prefixes belonging to any VRF.
- """
- if self.vrf is None and self.status == PrefixStatusChoices.STATUS_CONTAINER:
- return Prefix.objects.filter(prefix__net_contained=str(self.prefix))
- else:
- return Prefix.objects.filter(prefix__net_contained=str(self.prefix), vrf=self.vrf)
- def get_child_ranges(self):
- """
- Return all IPRanges within this Prefix and VRF.
- """
- return IPRange.objects.filter(
- vrf=self.vrf,
- start_address__net_host_contained=str(self.prefix),
- end_address__net_host_contained=str(self.prefix)
- )
- def get_child_ips(self):
- """
- Return all IPAddresses within this Prefix and VRF. If this Prefix is a container in the global table, return
- child IPAddresses belonging to any VRF.
- """
- if self.vrf is None and self.status == PrefixStatusChoices.STATUS_CONTAINER:
- return IPAddress.objects.filter(address__net_host_contained=str(self.prefix))
- else:
- return IPAddress.objects.filter(address__net_host_contained=str(self.prefix), vrf=self.vrf)
- def get_available_ips(self):
- """
- Return all available IPs within this prefix as an IPSet.
- """
- if self.mark_utilized:
- return netaddr.IPSet()
- prefix = netaddr.IPSet(self.prefix)
- child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
- child_ranges = netaddr.IPSet()
- for iprange in self.get_child_ranges():
- child_ranges.add(iprange.range)
- available_ips = prefix - child_ips - child_ranges
- # IPv6 /127's, pool, or IPv4 /31-/32 sets are fully usable
- if (self.family == 6 and self.prefix.prefixlen >= 127) or self.is_pool or (self.family == 4 and self.prefix.prefixlen >= 31):
- return available_ips
- if self.family == 4:
- # For "normal" IPv4 prefixes, omit first and last addresses
- available_ips -= netaddr.IPSet([
- netaddr.IPAddress(self.prefix.first),
- netaddr.IPAddress(self.prefix.last),
- ])
- else:
- # For IPv6 prefixes, omit the Subnet-Router anycast address
- # per RFC 4291
- available_ips -= netaddr.IPSet([netaddr.IPAddress(self.prefix.first)])
- return available_ips
- def get_first_available_ip(self):
- """
- Return the first available IP within the prefix (or None).
- """
- available_ips = self.get_available_ips()
- if not available_ips:
- return None
- return '{}/{}'.format(next(available_ips.__iter__()), self.prefix.prefixlen)
- def get_utilization(self):
- """
- Determine the utilization of the prefix and return it as a percentage. For Prefixes with a status of
- "container", calculate utilization based on child prefixes. For all others, count child IP addresses.
- """
- if self.mark_utilized:
- return 100
- if self.status == PrefixStatusChoices.STATUS_CONTAINER:
- queryset = Prefix.objects.filter(
- prefix__net_contained=str(self.prefix),
- vrf=self.vrf
- )
- child_prefixes = netaddr.IPSet([p.prefix for p in queryset])
- utilization = float(child_prefixes.size) / self.prefix.size * 100
- else:
- # Compile an IPSet to avoid counting duplicate IPs
- child_ips = netaddr.IPSet(
- [_.range for _ in self.get_child_ranges()] + [_.address.ip for _ in self.get_child_ips()]
- )
- prefix_size = self.prefix.size
- if self.prefix.version == 4 and self.prefix.prefixlen < 31 and not self.is_pool:
- prefix_size -= 2
- utilization = float(child_ips.size) / prefix_size * 100
- return min(utilization, 100)
- class IPRange(PrimaryModel):
- """
- A range of IP addresses, defined by start and end addresses.
- """
- start_address = IPAddressField(
- verbose_name=_('start address'),
- help_text=_('IPv4 or IPv6 address (with mask)')
- )
- end_address = IPAddressField(
- verbose_name=_('end address'),
- help_text=_('IPv4 or IPv6 address (with mask)')
- )
- size = models.PositiveIntegerField(
- verbose_name=_('size'),
- editable=False
- )
- vrf = models.ForeignKey(
- to='ipam.VRF',
- on_delete=models.PROTECT,
- related_name='ip_ranges',
- blank=True,
- null=True,
- verbose_name=_('VRF')
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='ip_ranges',
- blank=True,
- null=True
- )
- status = models.CharField(
- verbose_name=_('status'),
- max_length=50,
- choices=IPRangeStatusChoices,
- default=IPRangeStatusChoices.STATUS_ACTIVE,
- help_text=_('Operational status of this range')
- )
- role = models.ForeignKey(
- to='ipam.Role',
- on_delete=models.SET_NULL,
- related_name='ip_ranges',
- blank=True,
- null=True,
- help_text=_('The primary function of this range')
- )
- mark_utilized = models.BooleanField(
- verbose_name=_('mark utilized'),
- default=False,
- help_text=_("Treat as 100% utilized")
- )
- clone_fields = (
- 'vrf', 'tenant', 'status', 'role', 'description',
- )
- class Meta:
- ordering = (F('vrf').asc(nulls_first=True), 'start_address', 'pk') # (vrf, start_address) may be non-unique
- verbose_name = _('IP range')
- verbose_name_plural = _('IP ranges')
- def __str__(self):
- return self.name
- def get_absolute_url(self):
- return reverse('ipam:iprange', args=[self.pk])
- def clean(self):
- super().clean()
- if self.start_address and self.end_address:
- # Check that start & end IP versions match
- if self.start_address.version != self.end_address.version:
- raise ValidationError({
- 'end_address': _("Starting and ending IP address versions must match")
- })
- # Check that the start & end IP prefix lengths match
- if self.start_address.prefixlen != self.end_address.prefixlen:
- raise ValidationError({
- 'end_address': _("Starting and ending IP address masks must match")
- })
- # Check that the ending address is greater than the starting address
- if not self.end_address > self.start_address:
- raise ValidationError({
- 'end_address': _(
- "Ending address must be lower than the starting address ({start_address})"
- ).format(start_address=self.start_address)
- })
- # Check for overlapping ranges
- overlapping_range = IPRange.objects.exclude(pk=self.pk).filter(vrf=self.vrf).filter(
- Q(start_address__gte=self.start_address, start_address__lte=self.end_address) | # Starts inside
- Q(end_address__gte=self.start_address, end_address__lte=self.end_address) | # Ends inside
- Q(start_address__lte=self.start_address, end_address__gte=self.end_address) # Starts & ends outside
- ).first()
- if overlapping_range:
- raise ValidationError(
- _("Defined addresses overlap with range {overlapping_range} in VRF {vrf}").format(
- overlapping_range=overlapping_range,
- vrf=self.vrf
- ))
- # Validate maximum size
- MAX_SIZE = 2 ** 32 - 1
- if int(self.end_address.ip - self.start_address.ip) + 1 > MAX_SIZE:
- raise ValidationError(
- _("Defined range exceeds maximum supported size ({max_size})").format(max_size=MAX_SIZE)
- )
- def save(self, *args, **kwargs):
- # Record the range's size (number of IP addresses)
- self.size = int(self.end_address.ip - self.start_address.ip) + 1
- super().save(*args, **kwargs)
- @property
- def family(self):
- return self.start_address.version if self.start_address else None
- @property
- def range(self):
- return netaddr.IPRange(self.start_address.ip, self.end_address.ip)
- @property
- def mask_length(self):
- return self.start_address.prefixlen if self.start_address else None
- @cached_property
- def name(self):
- """
- Return an efficient string representation of the IP range.
- """
- separator = ':' if self.family == 6 else '.'
- start_chunks = str(self.start_address.ip).split(separator)
- end_chunks = str(self.end_address.ip).split(separator)
- base_chunks = []
- for a, b in zip(start_chunks, end_chunks):
- if a == b:
- base_chunks.append(a)
- base_str = separator.join(base_chunks)
- start_str = separator.join(start_chunks[len(base_chunks):])
- end_str = separator.join(end_chunks[len(base_chunks):])
- return f'{base_str}{separator}{start_str}-{end_str}/{self.start_address.prefixlen}'
- def _set_prefix_length(self, value):
- """
- Expose the IPRange object's prefixlen attribute on the parent model so that it can be manipulated directly,
- e.g. for bulk editing.
- """
- self.start_address.prefixlen = value
- self.end_address.prefixlen = value
- prefix_length = property(fset=_set_prefix_length)
- def get_status_color(self):
- return IPRangeStatusChoices.colors.get(self.status)
- def get_child_ips(self):
- """
- Return all IPAddresses within this IPRange and VRF.
- """
- return IPAddress.objects.filter(
- address__gte=self.start_address,
- address__lte=self.end_address,
- vrf=self.vrf
- )
- def get_available_ips(self):
- """
- Return all available IPs within this range as an IPSet.
- """
- range = netaddr.IPRange(self.start_address.ip, self.end_address.ip)
- child_ips = netaddr.IPSet([ip.address.ip for ip in self.get_child_ips()])
- return netaddr.IPSet(range) - child_ips
- @cached_property
- def first_available_ip(self):
- """
- Return the first available IP within the range (or None).
- """
- available_ips = self.get_available_ips()
- if not available_ips:
- return None
- return '{}/{}'.format(next(available_ips.__iter__()), self.start_address.prefixlen)
- @cached_property
- def utilization(self):
- """
- Determine the utilization of the range and return it as a percentage.
- """
- if self.mark_utilized:
- return 100
- # Compile an IPSet to avoid counting duplicate IPs
- child_count = netaddr.IPSet([
- ip.address.ip for ip in self.get_child_ips()
- ]).size
- return int(float(child_count) / self.size * 100)
- class IPAddress(PrimaryModel):
- """
- An IPAddress represents an individual IPv4 or IPv6 address and its mask. The mask length should match what is
- configured in the real world. (Typically, only loopback interfaces are configured with /32 or /128 masks.) Like
- Prefixes, IPAddresses can optionally be assigned to a VRF. An IPAddress can optionally be assigned to an Interface.
- Interfaces can have zero or more IPAddresses assigned to them.
- An IPAddress can also optionally point to a NAT inside IP, designating itself as a NAT outside IP. This is useful,
- for example, when mapping public addresses to private addresses. When an Interface has been assigned an IPAddress
- which has a NAT outside IP, that Interface's Device can use either the inside or outside IP as its primary IP.
- """
- address = IPAddressField(
- verbose_name=_('address'),
- help_text=_('IPv4 or IPv6 address (with mask)')
- )
- vrf = models.ForeignKey(
- to='ipam.VRF',
- on_delete=models.PROTECT,
- related_name='ip_addresses',
- blank=True,
- null=True,
- verbose_name=_('VRF')
- )
- tenant = models.ForeignKey(
- to='tenancy.Tenant',
- on_delete=models.PROTECT,
- related_name='ip_addresses',
- blank=True,
- null=True
- )
- status = models.CharField(
- verbose_name=_('status'),
- max_length=50,
- choices=IPAddressStatusChoices,
- default=IPAddressStatusChoices.STATUS_ACTIVE,
- help_text=_('The operational status of this IP')
- )
- role = models.CharField(
- verbose_name=_('role'),
- max_length=50,
- choices=IPAddressRoleChoices,
- blank=True,
- help_text=_('The functional role of this IP')
- )
- assigned_object_type = models.ForeignKey(
- to=ContentType,
- limit_choices_to=IPADDRESS_ASSIGNMENT_MODELS,
- on_delete=models.PROTECT,
- related_name='+',
- blank=True,
- null=True
- )
- assigned_object_id = models.PositiveBigIntegerField(
- blank=True,
- null=True
- )
- assigned_object = GenericForeignKey(
- ct_field='assigned_object_type',
- fk_field='assigned_object_id'
- )
- nat_inside = models.ForeignKey(
- to='self',
- on_delete=models.SET_NULL,
- related_name='nat_outside',
- blank=True,
- null=True,
- verbose_name=_('NAT (inside)'),
- help_text=_('The IP for which this address is the "outside" IP')
- )
- dns_name = models.CharField(
- max_length=255,
- blank=True,
- validators=[DNSValidator],
- verbose_name=_('DNS name'),
- help_text=_('Hostname or FQDN (not case-sensitive)')
- )
- objects = IPAddressManager()
- clone_fields = (
- 'vrf', 'tenant', 'status', 'role', 'dns_name', 'description',
- )
- class Meta:
- ordering = ('address', 'pk') # address may be non-unique
- indexes = [
- models.Index(Cast(Host('address'), output_field=IPAddressField()), name='ipam_ipaddress_host'),
- ]
- verbose_name = _('IP address')
- verbose_name_plural = _('IP addresses')
- def __str__(self):
- return str(self.address)
- def get_absolute_url(self):
- return reverse('ipam:ipaddress', args=[self.pk])
- def get_duplicates(self):
- return IPAddress.objects.filter(
- vrf=self.vrf,
- address__net_host=str(self.address.ip)
- ).exclude(pk=self.pk)
- def get_next_available_ip(self):
- """
- Return the next available IP address within this IP's network (if any)
- """
- if self.address and self.address.broadcast:
- start_ip = self.address.ip + 1
- end_ip = self.address.broadcast - 1
- if start_ip <= end_ip:
- available_ips = netaddr.IPSet(netaddr.IPRange(start_ip, end_ip))
- available_ips -= netaddr.IPSet([
- address.ip for address in IPAddress.objects.filter(
- vrf=self.vrf,
- address__gt=self.address,
- address__net_contained_or_equal=self.address.cidr
- ).values_list('address', flat=True)
- ])
- if available_ips:
- return next(iter(available_ips))
- def get_related_ips(self):
- """
- Return all IPAddresses belonging to the same VRF.
- """
- return IPAddress.objects.exclude(address=str(self.address)).filter(
- vrf=self.vrf, address__net_contained_or_equal=str(self.address)
- )
- def clean(self):
- super().clean()
- if self.address:
- # /0 masks are not acceptable
- if self.address.prefixlen == 0:
- raise ValidationError({
- 'address': _("Cannot create IP address with /0 mask.")
- })
- # Enforce unique IP space (if applicable)
- if (self.vrf is None and get_config().ENFORCE_GLOBAL_UNIQUE) or (self.vrf and self.vrf.enforce_unique):
- duplicate_ips = self.get_duplicates()
- if duplicate_ips and (
- self.role not in IPADDRESS_ROLES_NONUNIQUE or
- any(dip.role not in IPADDRESS_ROLES_NONUNIQUE for dip in duplicate_ips)
- ):
- raise ValidationError({
- 'address': _("Duplicate IP address found in {}: {}").format(
- _("VRF {}").format(self.vrf) if self.vrf else _("global table"),
- duplicate_ips.first(),
- )
- })
- # Validate IP status selection
- if self.status == IPAddressStatusChoices.STATUS_SLAAC and self.family != 6:
- raise ValidationError({
- 'status': _("Only IPv6 addresses can be assigned SLAAC status")
- })
- def save(self, *args, **kwargs):
- # Force dns_name to lowercase
- self.dns_name = self.dns_name.lower()
- super().save(*args, **kwargs)
- def clone(self):
- attrs = super().clone()
- # Populate the address field with the next available IP (if any)
- if next_available_ip := self.get_next_available_ip():
- attrs['address'] = f'{next_available_ip}/{self.address.prefixlen}'
- return attrs
- def to_objectchange(self, action):
- objectchange = super().to_objectchange(action)
- objectchange.related_object = self.assigned_object
- return objectchange
- @property
- def family(self):
- if self.address:
- return self.address.version
- return None
- @property
- def is_oob_ip(self):
- if self.assigned_object:
- parent = getattr(self.assigned_object, 'parent_object', None)
- if hasattr(parent, 'oob_ip') and parent.oob_ip_id == self.pk:
- return True
- return False
- @property
- def is_primary_ip(self):
- if self.assigned_object:
- parent = getattr(self.assigned_object, 'parent_object', None)
- if self.family == 4 and hasattr(parent, 'primary_ip4') and parent.primary_ip4_id == self.pk:
- return True
- if self.family == 6 and hasattr(parent, 'primary_ip6') and parent.primary_ip6_id == self.pk:
- return True
- return False
- def _set_mask_length(self, value):
- """
- Expose the IPNetwork object's prefixlen attribute on the parent model so that it can be manipulated directly,
- e.g. for bulk editing.
- """
- if self.address is not None:
- self.address.prefixlen = value
- mask_length = property(fset=_set_mask_length)
- def get_status_color(self):
- return IPAddressStatusChoices.colors.get(self.status)
- def get_role_color(self):
- return IPAddressRoleChoices.colors.get(self.role)
|