|
|
@@ -1,3 +1,4 @@
|
|
|
+import netaddr
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
from django.core.exceptions import ValidationError
|
|
|
from django.db.backends.postgresql.psycopg_any import NumericRange
|
|
|
@@ -6,8 +7,11 @@ from netaddr import IPNetwork, IPSet
|
|
|
|
|
|
from dcim.models import Site, SiteGroup
|
|
|
from ipam.choices import *
|
|
|
+from ipam.constants import SERVICE_PORT_MAX, SERVICE_PORT_MIN
|
|
|
from ipam.models import *
|
|
|
+from ipam.utils import rebuild_prefixes
|
|
|
from utilities.data import string_to_ranges
|
|
|
+from virtualization.models import VirtualMachine
|
|
|
|
|
|
|
|
|
class AggregateTestCase(TestCase):
|
|
|
@@ -113,7 +117,7 @@ class IPRangeTestCase(TestCase):
|
|
|
|
|
|
self.assertEqual(iprange.size, 1)
|
|
|
self.assertEqual(str(iprange), '192.0.2.10-192.0.2.10/24')
|
|
|
- self.assertEqual(iprange.first_available_ip, '192.0.2.10/24')
|
|
|
+ self.assertEqual(iprange.get_first_available_ip(), '192.0.2.10/24')
|
|
|
|
|
|
def test_first_available_ip_consumed_single_address_range(self):
|
|
|
iprange = IPRange.objects.create(
|
|
|
@@ -123,7 +127,7 @@ class IPRangeTestCase(TestCase):
|
|
|
IPAddress.objects.create(address=IPNetwork('192.0.2.10/24'))
|
|
|
|
|
|
# The sole address in the range is now assigned, so no IPs remain available.
|
|
|
- self.assertIsNone(iprange.first_available_ip)
|
|
|
+ self.assertIsNone(iprange.get_first_available_ip())
|
|
|
|
|
|
def test_single_address_range_ipv6(self):
|
|
|
# IPRange.name has IPv4/IPv6-specific formatting; exercise the IPv6 branch
|
|
|
@@ -138,7 +142,7 @@ class IPRangeTestCase(TestCase):
|
|
|
|
|
|
self.assertEqual(iprange.size, 1)
|
|
|
self.assertEqual(str(iprange), '2001:db8::10-2001:db8::10/64')
|
|
|
- self.assertEqual(iprange.first_available_ip, '2001:db8::10/64')
|
|
|
+ self.assertEqual(iprange.get_first_available_ip(), '2001:db8::10/64')
|
|
|
|
|
|
def test_reversed_range(self):
|
|
|
iprange = IPRange(
|
|
|
@@ -165,9 +169,207 @@ class IPRangeTestCase(TestCase):
|
|
|
with self.assertRaisesMessage(ValidationError, 'Defined addresses overlap'):
|
|
|
iprange.clean()
|
|
|
|
|
|
+ def test_get_child_ips_host_portion(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('10.0.0.2/24'),
|
|
|
+ end_address=IPNetwork('10.0.0.254/24'),
|
|
|
+ )
|
|
|
+
|
|
|
+ ip1 = IPAddress.objects.create(address=IPNetwork('10.0.0.2/32'))
|
|
|
+ ip2 = IPAddress.objects.create(address=IPNetwork('10.0.0.3/24'))
|
|
|
+
|
|
|
+ self.assertEqual(set(iprange.get_child_ips()), {ip1, ip2})
|
|
|
+
|
|
|
+ def test_get_available_ips(self):
|
|
|
+ """
|
|
|
+ Tests that occupied hosts are deduplicated and excluded from the available set.
|
|
|
+ """
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.13/24'),
|
|
|
+ )
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/32')),
|
|
|
+ ))
|
|
|
+
|
|
|
+ self.assertEqual(iprange.get_available_ips(), IPSet(['192.0.2.11/32', '192.0.2.12/31']))
|
|
|
+
|
|
|
+ def test_get_available_ips_mark_populated(self):
|
|
|
+ """
|
|
|
+ Tests that a populated range reports no available IPs.
|
|
|
+ """
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.13/24'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(iprange.get_available_ips(), IPSet())
|
|
|
+
|
|
|
+ def test_get_available_ips_vrf(self):
|
|
|
+ """
|
|
|
+ Tests that IPs in other VRFs do not consume range space.
|
|
|
+ """
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+ vrf2 = VRF.objects.create(name='VRF 2')
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.11/24'),
|
|
|
+ vrf=vrf1,
|
|
|
+ )
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.10/24'), vrf=vrf2)
|
|
|
+
|
|
|
+ self.assertEqual(iprange.get_available_ips(), IPSet(['192.0.2.10/31']))
|
|
|
+
|
|
|
+ def test_iter_available_ips(self):
|
|
|
+ """
|
|
|
+ Tests that iter_available_ips() yields the same addresses as get_available_ips() in order.
|
|
|
+ """
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.13/24'),
|
|
|
+ )
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.11/24'))
|
|
|
+
|
|
|
+ self.assertEqual(list(iprange.iter_available_ips()), sorted(iprange.get_available_ips()))
|
|
|
+
|
|
|
+ def test_available_ip_count(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.12/24'))
|
|
|
+
|
|
|
+ self.assertEqual(iprange.get_available_ip_count(), 9)
|
|
|
+
|
|
|
+ def test_available_ip_count_distinct_hosts(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ )
|
|
|
+
|
|
|
+ # Two rows for .10 (different masks) must dedupe to a single occupied host.
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/32')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.11/24')),
|
|
|
+ ))
|
|
|
+
|
|
|
+ self.assertEqual(iprange.get_available_ip_count(), 8)
|
|
|
+
|
|
|
+ def test_available_ip_count_vrf(self):
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+ vrf2 = VRF.objects.create(name='VRF 2')
|
|
|
+
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ vrf=vrf1,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.12/24'), vrf=vrf1)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.13/24'), vrf=vrf2)
|
|
|
+
|
|
|
+ # Only the VRF 1 IP should count.
|
|
|
+ self.assertEqual(iprange.get_available_ip_count(), 9)
|
|
|
+
|
|
|
+ def test_available_ip_count_populated(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(iprange.get_available_ip_count(), 0)
|
|
|
+
|
|
|
+ def test_first_available_ip_full(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.11/24'),
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.10/24'))
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.11/24'))
|
|
|
+
|
|
|
+ self.assertIsNone(iprange.get_first_available_ip())
|
|
|
+
|
|
|
+ def test_first_available_ip_populated(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertIsNone(iprange.get_first_available_ip())
|
|
|
+
|
|
|
+ def test_first_available_ip_ipv6(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('::/126'),
|
|
|
+ end_address=IPNetwork('::3/126'),
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(iprange.get_first_available_ip(), '::/126')
|
|
|
+
|
|
|
+ def test_utilization_distinct_hosts(self):
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/32')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.11/24')),
|
|
|
+ ))
|
|
|
+
|
|
|
+ # Two distinct hosts in a 10-address range.
|
|
|
+ self.assertEqual(iprange.utilization, 2 / 10 * 100)
|
|
|
+
|
|
|
+ def test_utilization_vrf(self):
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+ vrf2 = VRF.objects.create(name='VRF 2')
|
|
|
+
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ vrf=vrf1,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.12/24'), vrf=vrf1)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.13/24'), vrf=vrf2)
|
|
|
+
|
|
|
+ # Only the VRF 1 IP counts toward utilization.
|
|
|
+ self.assertEqual(iprange.utilization, 1 / 10 * 100)
|
|
|
+
|
|
|
+ def test_utilization_duplicate_ips_vrf(self):
|
|
|
+ """
|
|
|
+ Tests that identical IPs in a non-unique VRF count once toward range utilization.
|
|
|
+ """
|
|
|
+ vrf = VRF.objects.create(name='VRF 1', enforce_unique=False)
|
|
|
+ iprange = IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ vrf=vrf,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.12/24'), vrf=vrf),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.12/24'), vrf=vrf),
|
|
|
+ ))
|
|
|
+
|
|
|
+ self.assertEqual(iprange.utilization, 1 / 10 * 100)
|
|
|
+
|
|
|
|
|
|
class PrefixTestCase(TestCase):
|
|
|
|
|
|
+ def assertAvailableIPCountMatchesIPSet(self, prefix):
|
|
|
+ """
|
|
|
+ Confirm that get_available_ip_count() matches get_available_ips().size for the supplied prefix.
|
|
|
+ """
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), prefix.get_available_ips().size)
|
|
|
+
|
|
|
def test_family_string(self):
|
|
|
# Test property when prefix is a string
|
|
|
prefix = Prefix(prefix='10.0.0.0/8')
|
|
|
@@ -251,6 +453,23 @@ class PrefixTestCase(TestCase):
|
|
|
self.assertEqual(child_ranges[0], ranges[2])
|
|
|
self.assertEqual(child_ranges[1], ranges[3])
|
|
|
|
|
|
+ def test_get_child_ranges_other_family(self):
|
|
|
+ """
|
|
|
+ Tests that ranges of a different address family are not returned.
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(prefix=IPNetwork('192.168.0.16/28'))
|
|
|
+ IPRange.objects.bulk_create((
|
|
|
+ IPRange(
|
|
|
+ start_address=IPNetwork('192.168.0.18/28'), end_address=IPNetwork('192.168.0.20/28'), size=3
|
|
|
+ ),
|
|
|
+ IPRange(start_address=IPNetwork('::1/64'), end_address=IPNetwork('::2/64'), size=2),
|
|
|
+ ))
|
|
|
+
|
|
|
+ child_ranges = prefix.get_child_ranges()
|
|
|
+
|
|
|
+ self.assertEqual(len(child_ranges), 1)
|
|
|
+ self.assertEqual(child_ranges[0].start_address, IPNetwork('192.168.0.18/28'))
|
|
|
+
|
|
|
def test_get_child_ips(self):
|
|
|
vrfs = VRF.objects.bulk_create((
|
|
|
VRF(name='VRF 1'),
|
|
|
@@ -330,6 +549,319 @@ class PrefixTestCase(TestCase):
|
|
|
|
|
|
self.assertEqual(available_ips, missing_ips)
|
|
|
|
|
|
+ def test_iter_available_ips(self):
|
|
|
+ """
|
|
|
+ Tests that iter_available_ips() yields the same addresses as get_available_ips() in order.
|
|
|
+ """
|
|
|
+ parent_prefix = Prefix.objects.create(prefix=IPNetwork('10.0.0.0/28'))
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('10.0.0.1/28')),
|
|
|
+ IPAddress(address=IPNetwork('10.0.0.5/28')),
|
|
|
+ ))
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('10.0.0.8/28'),
|
|
|
+ end_address=IPNetwork('10.0.0.9/28'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ available_ips = list(parent_prefix.iter_available_ips())
|
|
|
+
|
|
|
+ self.assertEqual(available_ips, sorted(parent_prefix.get_available_ips()))
|
|
|
+ self.assertEqual(available_ips[0], netaddr.IPAddress('10.0.0.2'))
|
|
|
+ self.assertEqual(available_ips[-1], netaddr.IPAddress('10.0.0.14'))
|
|
|
+
|
|
|
+ def test_get_available_ips_ipv6(self):
|
|
|
+ """
|
|
|
+ Tests that the subnet-router anycast address is excluded and the last address included.
|
|
|
+ """
|
|
|
+ parent_prefix = Prefix.objects.create(prefix=IPNetwork('2001:db8::/126'))
|
|
|
+ IPAddress.objects.create(address=IPNetwork('2001:db8::1/126'))
|
|
|
+
|
|
|
+ self.assertEqual(parent_prefix.get_available_ips(), IPSet(['2001:db8::2/127']))
|
|
|
+
|
|
|
+ def test_get_available_ips_pool(self):
|
|
|
+ """
|
|
|
+ Tests that pool prefixes include the network and broadcast addresses.
|
|
|
+ """
|
|
|
+ parent_prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), is_pool=True)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/30'))
|
|
|
+
|
|
|
+ self.assertEqual(parent_prefix.get_available_ips(), IPSet(['192.0.2.0/32', '192.0.2.2/31']))
|
|
|
+
|
|
|
+ def test_available_ip_count_distinct_hosts(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/29'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.1/29')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.1/32')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.3/29')),
|
|
|
+ ))
|
|
|
+
|
|
|
+ # Usable hosts in /29: 6. Two unique hosts occupy .1 and .3.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 4)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_populated_ranges(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/29'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.1/29')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.3/29')), # Inside the populated range; not double-counted.
|
|
|
+ ))
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.3/29'),
|
|
|
+ end_address=IPNetwork('192.0.2.4/29'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Usable 6, one IP outside the range at .1, populated range covers .3-.4.
|
|
|
+ # Available: .2, .5, .6.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 3)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_ipv4_pool(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/30'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ is_pool=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 4)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_ipv4_non_pool(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/30'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ is_pool=False,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 2)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_ipv4_non_pool_ignores_unusable_ips(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/30'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Network and broadcast addresses are unusable for non-pool IPv4 prefixes;
|
|
|
+ # an IP assigned to either must not reduce the available count.
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.0/30'))
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.3/30'))
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 2)
|
|
|
+ self.assertEqual(prefix.get_first_available_ip(), '192.0.2.1/30')
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_ipv6(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('2001:db8::/126'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # /126 has 4 addresses; normal IPv6 prefix excludes the first.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 3)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_ipv6_ignores_subnet_router_anycast(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('2001:db8::/126'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # The subnet-router anycast (::) address is unusable for normal IPv6 prefixes;
|
|
|
+ # an IP assigned there must not reduce the available count.
|
|
|
+ IPAddress.objects.create(address=IPNetwork('2001:db8::/126'))
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 3)
|
|
|
+ self.assertEqual(prefix.get_first_available_ip(), '2001:db8::1/126')
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_ipv6_127(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('2001:db8::/127'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 2)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_ipv6_populated_range(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('2001:db8::/126'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('2001:db8::1/126'),
|
|
|
+ end_address=IPNetwork('2001:db8::2/126'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Usable IPv6 hosts in /126: ::1, ::2, ::3. Populated: ::1-::2.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 1)
|
|
|
+ self.assertEqual(prefix.get_first_available_ip(), '2001:db8::3/126')
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_overlapping_ranges(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/29'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.1/29'),
|
|
|
+ end_address=IPNetwork('192.0.2.3/29'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.2/29'),
|
|
|
+ end_address=IPNetwork('192.0.2.4/29'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Usable hosts: .1-.6 => 6. Populated union: .1-.4 => 4. Available: .5-.6 => 2.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 2)
|
|
|
+ self.assertEqual(prefix.get_first_available_ip(), '192.0.2.5/29')
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_vrf(self):
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+ vrf2 = VRF.objects.create(name='VRF 2')
|
|
|
+
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/29'),
|
|
|
+ vrf=vrf1,
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/29'), vrf=vrf1)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.2/29'), vrf=vrf2)
|
|
|
+
|
|
|
+ # Usable .1-.6 => 6. Only the VRF 1 IP should count.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 5)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_vrf_ranges(self):
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+ vrf2 = VRF.objects.create(name='VRF 2')
|
|
|
+
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/29'),
|
|
|
+ vrf=vrf1,
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Covers every usable host, but in a different VRF.
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.1/29'),
|
|
|
+ end_address=IPNetwork('192.0.2.6/29'),
|
|
|
+ vrf=vrf2,
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 6)
|
|
|
+ self.assertEqual(prefix.get_first_available_ip(), '192.0.2.1/29')
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_fully_populated(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/30'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Populated range covers every usable address (.1-.2 in a non-pool /30).
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.1/30'),
|
|
|
+ end_address=IPNetwork('192.0.2.2/30'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Exercises the early-return paths that skip the child-IP count and
|
|
|
+ # the host-stream iterator entirely.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 0)
|
|
|
+ self.assertIsNone(prefix.get_first_available_ip())
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_query_count(self):
|
|
|
+ """
|
|
|
+ Tests that the count runs one interval query plus exactly one host scan.
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create(
|
|
|
+ IPAddress(address=IPNetwork(f'192.0.2.{i}/24')) for i in range(1, 11)
|
|
|
+ )
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.20/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.29/24'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ with self.assertNumQueries(2):
|
|
|
+ prefix.get_available_ip_count()
|
|
|
+
|
|
|
+ def test_available_ip_count_container(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_CONTAINER,
|
|
|
+ )
|
|
|
+
|
|
|
+ # A child prefix exists but does not reduce the available IP count.
|
|
|
+ Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/26'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 254)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_container_vrf_duplicate_hosts(self):
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+ vrf2 = VRF.objects.create(name='VRF 2')
|
|
|
+
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_CONTAINER,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), vrf=vrf1)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), vrf=vrf2)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.2/24'), vrf=vrf2)
|
|
|
+
|
|
|
+ # A global container counts child IPs from all VRFs; the duplicate host
|
|
|
+ # counts once. 254 usable - 2 distinct hosts.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 252)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ def test_available_ip_count_container_vrf_ip_in_populated_range(self):
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_CONTAINER,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.15/24'), vrf=vrf1)
|
|
|
+
|
|
|
+ # The range covers 10 hosts; the VRF 1 IP inside it is not counted again.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 244)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
def test_get_first_available_prefix(self):
|
|
|
|
|
|
prefixes = Prefix.objects.bulk_create((
|
|
|
@@ -368,6 +900,42 @@ class PrefixTestCase(TestCase):
|
|
|
parent_prefix = Prefix.objects.create(prefix=IPNetwork('2001:db8:500:5::/127'))
|
|
|
self.assertEqual(parent_prefix.get_first_available_ip(), '2001:db8:500:5::/127')
|
|
|
|
|
|
+ def test_get_first_available_ip_ipv6_zero_address(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('::/126'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Normal IPv6 prefixes exclude the subnet-router anycast address ::.
|
|
|
+ self.assertEqual(prefix.get_first_available_ip(), '::1/126')
|
|
|
+
|
|
|
+ def test_get_first_available_ip_populated_ranges(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/29'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/29'))
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.2/29'),
|
|
|
+ end_address=IPNetwork('192.0.2.3/29'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_first_available_ip(), '192.0.2.4/29')
|
|
|
+
|
|
|
+ def test_get_first_available_ip_full(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/30'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/30'))
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.2/30'))
|
|
|
+
|
|
|
+ self.assertIsNone(prefix.get_first_available_ip())
|
|
|
+
|
|
|
def test_get_utilization_container(self):
|
|
|
prefixes = (
|
|
|
Prefix(prefix=IPNetwork('10.0.0.0/24'), status=PrefixStatusChoices.STATUS_CONTAINER),
|
|
|
@@ -397,6 +965,276 @@ class PrefixTestCase(TestCase):
|
|
|
)
|
|
|
self.assertEqual(prefix.get_utilization(), 64 / 254 * 100) # ~25% utilization
|
|
|
|
|
|
+ def test_get_utilization_distinct_hosts(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/32')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.11/24')),
|
|
|
+ ))
|
|
|
+
|
|
|
+ # Two unique occupied hosts over 254 usable IPv4 addresses.
|
|
|
+ self.assertEqual(prefix.get_utilization(), 2 / 254 * 100)
|
|
|
+
|
|
|
+ @override_settings(ENFORCE_GLOBAL_UNIQUE=False)
|
|
|
+ def test_get_utilization_duplicate_ips_global(self):
|
|
|
+ """
|
|
|
+ Tests that identical global IPs permitted by disabled uniqueness count as one host.
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.10/24'))
|
|
|
+ duplicate_ip = IPAddress(address=IPNetwork('192.0.2.10/24'))
|
|
|
+ self.assertIsNone(duplicate_ip.clean())
|
|
|
+ duplicate_ip.save()
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_utilization(), 1 / 254 * 100)
|
|
|
+
|
|
|
+ def test_get_utilization_duplicate_ips_vrf(self):
|
|
|
+ """
|
|
|
+ Tests that identical IPs in a non-unique VRF count as one host.
|
|
|
+ """
|
|
|
+ vrf = VRF.objects.create(name='VRF 1', enforce_unique=False)
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ vrf=vrf,
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.10/24'), vrf=vrf)
|
|
|
+ duplicate_ip = IPAddress(address=IPNetwork('192.0.2.10/24'), vrf=vrf)
|
|
|
+ self.assertIsNone(duplicate_ip.clean())
|
|
|
+ duplicate_ip.save()
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_utilization(), 1 / 254 * 100)
|
|
|
+
|
|
|
+ def test_available_ip_count_duplicate_ips_vrf(self):
|
|
|
+ """
|
|
|
+ Tests that identical IPs in a non-unique VRF reduce availability once.
|
|
|
+ """
|
|
|
+ vrf = VRF.objects.create(name='VRF 1', enforce_unique=False)
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/29'),
|
|
|
+ vrf=vrf,
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.1/29'), vrf=vrf),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.1/29'), vrf=vrf),
|
|
|
+ ))
|
|
|
+
|
|
|
+ # Usable hosts in /29: 6. The duplicate occupies a single host.
|
|
|
+ self.assertEqual(prefix.get_available_ip_count(), 5)
|
|
|
+ self.assertAvailableIPCountMatchesIPSet(prefix)
|
|
|
+
|
|
|
+ @override_settings(ENFORCE_GLOBAL_UNIQUE=False)
|
|
|
+ def test_get_ip_usage_summary_duplicate_ips_global(self):
|
|
|
+ """
|
|
|
+ Tests that the usage summary deduplicates identical global IPs in both values.
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/24')),
|
|
|
+ ))
|
|
|
+
|
|
|
+ summary = prefix.get_ip_usage_summary()
|
|
|
+
|
|
|
+ self.assertEqual(summary['available_ip_count'], 253)
|
|
|
+ self.assertEqual(summary['utilization'], 1 / 254 * 100)
|
|
|
+
|
|
|
+ def test_get_utilization_utilized_ranges(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ mark_utilized=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.1/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.10/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.11/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.20/24')),
|
|
|
+ ))
|
|
|
+
|
|
|
+ # Utilized range contributes 10 hosts; IPs inside the range are not double-counted.
|
|
|
+ # Outside IPs: .1 and .20 => 2 more.
|
|
|
+ self.assertEqual(prefix.get_utilization(), 12 / 254 * 100)
|
|
|
+
|
|
|
+ def test_get_utilization_overlapping_utilized_ranges(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ mark_utilized=True,
|
|
|
+ )
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.15/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.24/24'),
|
|
|
+ mark_utilized=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Union is .10-.24 => 15 hosts, not 20.
|
|
|
+ self.assertEqual(prefix.get_utilization(), 15 / 254 * 100)
|
|
|
+
|
|
|
+ def test_get_utilization_fully_utilized_range(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Utilized range covers every usable host (.1-.254 in a non-pool /24).
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.1/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.254/24'),
|
|
|
+ mark_utilized=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Exercises the early-return path that skips the child-IP count entirely.
|
|
|
+ self.assertEqual(prefix.get_utilization(), 100)
|
|
|
+
|
|
|
+ def test_get_utilization_ipv6_utilized_range(self):
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('2001:db8::/126'),
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('2001:db8::1/126'),
|
|
|
+ end_address=IPNetwork('2001:db8::2/126'),
|
|
|
+ mark_utilized=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ self.assertEqual(prefix.get_utilization(), 2 / 4 * 100)
|
|
|
+
|
|
|
+ def test_get_utilization_vrf(self):
|
|
|
+ vrf1 = VRF.objects.create(name='VRF 1')
|
|
|
+ vrf2 = VRF.objects.create(name='VRF 2')
|
|
|
+
|
|
|
+ prefix = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ vrf=vrf1,
|
|
|
+ status=PrefixStatusChoices.STATUS_ACTIVE,
|
|
|
+ )
|
|
|
+
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.1/24'), vrf=vrf1)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.15/24'), vrf=vrf1)
|
|
|
+ IPAddress.objects.create(address=IPNetwork('192.0.2.2/24'), vrf=vrf2)
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ vrf=vrf2,
|
|
|
+ mark_utilized=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ # VRF 2 objects are ignored entirely; the VRF 1 IP at .15 still counts even
|
|
|
+ # though it falls inside the VRF 2 range's host span (exclusion intervals are
|
|
|
+ # built only from same-VRF ranges).
|
|
|
+ self.assertEqual(prefix.get_utilization(), 2 / 254 * 100)
|
|
|
+
|
|
|
+ def test_get_utilization_query_count(self):
|
|
|
+ """
|
|
|
+ Tests that utilization for a non-container prefix uses two queries.
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
|
|
|
+
|
|
|
+ with self.assertNumQueries(2):
|
|
|
+ prefix.get_utilization()
|
|
|
+
|
|
|
+ def test_get_ip_usage_summary(self):
|
|
|
+ """
|
|
|
+ Tests that the combined summary matches the independent methods.
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.1/24')),
|
|
|
+ IPAddress(address=IPNetwork('192.0.2.2/24')),
|
|
|
+ ))
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.10/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.19/24'),
|
|
|
+ mark_utilized=True,
|
|
|
+ )
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('192.0.2.30/24'),
|
|
|
+ end_address=IPNetwork('192.0.2.39/24'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ summary = prefix.get_ip_usage_summary()
|
|
|
+
|
|
|
+ self.assertEqual(summary['available_ip_count'], prefix.get_available_ip_count())
|
|
|
+ self.assertEqual(summary['utilization'], prefix.get_utilization())
|
|
|
+
|
|
|
+ def test_get_ip_usage_summary_query_count(self):
|
|
|
+ """
|
|
|
+ Tests that the combined summary uses a single distinct-host scan (three queries).
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
|
|
|
+
|
|
|
+ with self.assertNumQueries(3):
|
|
|
+ prefix.get_ip_usage_summary()
|
|
|
+
|
|
|
+ def test_get_ip_usage_summary_container(self):
|
|
|
+ """
|
|
|
+ Tests that the summary delegates to the independent methods for containers.
|
|
|
+ """
|
|
|
+ container = Prefix.objects.create(
|
|
|
+ prefix=IPNetwork('192.0.2.0/24'),
|
|
|
+ status=PrefixStatusChoices.STATUS_CONTAINER,
|
|
|
+ )
|
|
|
+
|
|
|
+ summary = container.get_ip_usage_summary()
|
|
|
+
|
|
|
+ self.assertEqual(summary['available_ip_count'], container.get_available_ip_count())
|
|
|
+ self.assertEqual(summary['utilization'], container.get_utilization())
|
|
|
+
|
|
|
+ def test_get_ip_usage_summary_mark_utilized(self):
|
|
|
+ """
|
|
|
+ Tests that a marked-utilized prefix reports 100% utilization in the summary.
|
|
|
+ """
|
|
|
+ prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'), mark_utilized=True)
|
|
|
+
|
|
|
+ summary = prefix.get_ip_usage_summary()
|
|
|
+
|
|
|
+ self.assertEqual(summary['utilization'], 100)
|
|
|
+ self.assertEqual(summary['available_ip_count'], prefix.get_available_ip_count())
|
|
|
+
|
|
|
+ def test_usable_size(self):
|
|
|
+ self.assertEqual(Prefix(prefix=IPNetwork('192.0.2.0/24')).usable_size, 254)
|
|
|
+ self.assertEqual(Prefix(prefix=IPNetwork('192.0.2.0/24'), is_pool=True).usable_size, 256)
|
|
|
+ self.assertEqual(Prefix(prefix=IPNetwork('2001:db8::/126')).usable_size, 3)
|
|
|
+
|
|
|
+ def test_usable_ip_bounds_string_prefix(self):
|
|
|
+ """
|
|
|
+ Tests that usable bounds are computed for a string-assigned prefix.
|
|
|
+ """
|
|
|
+ first_ip, last_ip = Prefix(prefix='192.0.2.0/24').usable_ip_bounds
|
|
|
+
|
|
|
+ self.assertEqual(first_ip, netaddr.IPAddress('192.0.2.1'))
|
|
|
+ self.assertEqual(last_ip, netaddr.IPAddress('192.0.2.254'))
|
|
|
+
|
|
|
#
|
|
|
# Uniqueness enforcement tests
|
|
|
#
|
|
|
@@ -622,6 +1460,35 @@ class PrefixHierarchyTestCase(TestCase):
|
|
|
self.assertEqual(prefixes[3]._depth, 2)
|
|
|
self.assertEqual(prefixes[3]._children, 0)
|
|
|
|
|
|
+ def test_rebuild_prefixes_accepts_vrf_identifier(self):
|
|
|
+ # None means "global table". Wipe the precomputed hierarchy so the rebuild is observable.
|
|
|
+ Prefix.objects.update(_depth=0, _children=0)
|
|
|
+
|
|
|
+ rebuild_prefixes(None)
|
|
|
+
|
|
|
+ top = Prefix.objects.get(prefix='10.0.0.0/8')
|
|
|
+ mid = Prefix.objects.get(prefix='10.0.0.0/16')
|
|
|
+ leaf = Prefix.objects.get(prefix='10.0.0.0/24')
|
|
|
+ self.assertEqual((top._depth, top._children), (0, 2))
|
|
|
+ self.assertEqual((mid._depth, mid._children), (1, 1))
|
|
|
+ self.assertEqual((leaf._depth, leaf._children), (2, 0))
|
|
|
+
|
|
|
+ def test_rebuild_prefixes_accepts_vrf_pk(self):
|
|
|
+ # A VRF pk filters to that VRF's prefixes.
|
|
|
+ vrf = VRF.objects.create(name='VRF 1')
|
|
|
+ Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'), vrf=vrf)
|
|
|
+ Prefix.objects.create(prefix=IPNetwork('192.0.2.0/25'), vrf=vrf)
|
|
|
+
|
|
|
+ # Reset depth/children so the rebuild has something to restore.
|
|
|
+ Prefix.objects.filter(vrf=vrf).update(_depth=0, _children=0)
|
|
|
+
|
|
|
+ rebuild_prefixes(vrf.pk)
|
|
|
+
|
|
|
+ parent = Prefix.objects.get(prefix='192.0.2.0/24', vrf=vrf)
|
|
|
+ child = Prefix.objects.get(prefix='192.0.2.0/25', vrf=vrf)
|
|
|
+ self.assertEqual((parent._depth, parent._children), (0, 1))
|
|
|
+ self.assertEqual((child._depth, child._children), (1, 0))
|
|
|
+
|
|
|
|
|
|
class IPAddressTestCase(TestCase):
|
|
|
|
|
|
@@ -717,6 +1584,20 @@ class IPAddressTestCase(TestCase):
|
|
|
with self.assertRaisesMessage(ValidationError, 'Cannot create IP address'):
|
|
|
ipaddress.clean()
|
|
|
|
|
|
+ def test_populated_range_blocks_ip_with_different_mask(self):
|
|
|
+ # The populated-range check compares by host portion, so a different mask
|
|
|
+ # must not let an IPAddress slip past validation.
|
|
|
+ IPRange.objects.create(
|
|
|
+ start_address=IPNetwork('10.0.0.2/24'),
|
|
|
+ end_address=IPNetwork('10.0.0.254/24'),
|
|
|
+ mark_populated=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ ip = IPAddress(address=IPNetwork('10.0.0.2/32'))
|
|
|
+
|
|
|
+ with self.assertRaises(ValidationError):
|
|
|
+ ip.full_clean()
|
|
|
+
|
|
|
|
|
|
class VLANGroupTestCase(TestCase):
|
|
|
|
|
|
@@ -926,3 +1807,119 @@ class VLANTestCase(TestCase):
|
|
|
vlan.group = vlangroups[2]
|
|
|
with self.assertRaises(ValidationError):
|
|
|
vlan.full_clean()
|
|
|
+
|
|
|
+ def test_vlan_group_vid_validation_with_null_vid(self):
|
|
|
+ """A missing VID on a grouped VLAN raises a ValidationError, not a TypeError."""
|
|
|
+ group = VLANGroup.objects.create(name='VLAN Group 1', slug='vlan-group-1')
|
|
|
+ vlan = VLAN(name='VLAN X', vid=None, group=group)
|
|
|
+ with self.assertRaises(ValidationError):
|
|
|
+ vlan.full_clean()
|
|
|
+
|
|
|
+
|
|
|
+class PrefixGetChildIPsTestCase(TestCase):
|
|
|
+ @classmethod
|
|
|
+ def setUpTestData(cls):
|
|
|
+ cls.prefix = Prefix.objects.create(prefix='10.0.0.0/24')
|
|
|
+ IPAddress.objects.bulk_create((
|
|
|
+ IPAddress(address='10.0.0.0/24'), # Network address (inside containment)
|
|
|
+ IPAddress(address='10.0.0.1/24'),
|
|
|
+ IPAddress(address='10.0.0.255/24'), # Broadcast address (inside containment)
|
|
|
+ IPAddress(address='10.0.1.1/24'), # Outside the prefix
|
|
|
+ ))
|
|
|
+
|
|
|
+ def test_get_child_ips_matches_net_host_contained(self):
|
|
|
+ """get_child_ips returns the same IPs as the net_host_contained containment lookup."""
|
|
|
+ expected = set(
|
|
|
+ IPAddress.objects.filter(
|
|
|
+ address__net_host_contained=str(self.prefix.prefix), vrf=None
|
|
|
+ ).values_list('pk', flat=True)
|
|
|
+ )
|
|
|
+ actual = set(self.prefix.get_child_ips().values_list('pk', flat=True))
|
|
|
+ self.assertEqual(actual, expected)
|
|
|
+ self.assertEqual(len(actual), 3)
|
|
|
+
|
|
|
+ def test_get_child_ips_sql_avoids_containment_recheck(self):
|
|
|
+ """get_child_ips filters on an inet host range, not the <<= containment operator."""
|
|
|
+ sql = str(self.prefix.get_child_ips().query)
|
|
|
+ self.assertNotIn('<<=', sql)
|
|
|
+
|
|
|
+ def test_get_child_ips_container_in_global_table_spans_vrfs(self):
|
|
|
+ """A container prefix in the global table returns child IPs from any VRF."""
|
|
|
+ vrf = VRF.objects.create(name='VRF 1')
|
|
|
+ container = Prefix.objects.create(
|
|
|
+ prefix='10.1.0.0/24', status=PrefixStatusChoices.STATUS_CONTAINER,
|
|
|
+ )
|
|
|
+ in_vrf = IPAddress.objects.create(address='10.1.0.5/24', vrf=vrf)
|
|
|
+ in_global = IPAddress.objects.create(address='10.1.0.6/24')
|
|
|
+ child_pks = set(container.get_child_ips().values_list('pk', flat=True))
|
|
|
+ self.assertEqual(child_pks, {in_vrf.pk, in_global.pk})
|
|
|
+
|
|
|
+
|
|
|
+class ServiceTemplateTestCase(TestCase):
|
|
|
+
|
|
|
+ def test_servicetemplate_lowest_port(self):
|
|
|
+ """
|
|
|
+ Test lowest port setting for servicetemplate
|
|
|
+ """
|
|
|
+ template = ServiceTemplate(
|
|
|
+ name='Template 1',
|
|
|
+ protocol=ServiceProtocolChoices.PROTOCOL_TCP,
|
|
|
+ ports=[80, 443, 22, 8080], # small test list
|
|
|
+ )
|
|
|
+ template.full_clean()
|
|
|
+ template.save()
|
|
|
+ self.assertEqual(template._ports_lowest, 22)
|
|
|
+
|
|
|
+ def test_servicetemplate_single_port(self):
|
|
|
+ """
|
|
|
+ Test with a single port
|
|
|
+ """
|
|
|
+ template = ServiceTemplate(
|
|
|
+ name='Template 2',
|
|
|
+ protocol=ServiceProtocolChoices.PROTOCOL_UDP,
|
|
|
+ ports=[53],
|
|
|
+ )
|
|
|
+ template.full_clean()
|
|
|
+ template.save()
|
|
|
+ self.assertEqual(template._ports_lowest, 53)
|
|
|
+
|
|
|
+ def test_servicetemplate_empty_ports(self):
|
|
|
+ """
|
|
|
+ Test with empty ports list
|
|
|
+ """
|
|
|
+ template = ServiceTemplate(
|
|
|
+ name='Template 3',
|
|
|
+ protocol=ServiceProtocolChoices.PROTOCOL_TCP,
|
|
|
+ ports=[],
|
|
|
+ )
|
|
|
+ self.assertRaises(ValidationError, template.full_clean)
|
|
|
+
|
|
|
+
|
|
|
+class ServiceTestCase(TestCase):
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def setUpTestData(cls):
|
|
|
+ site = Site.objects.create(
|
|
|
+ name='Site 1',
|
|
|
+ slug='site-1',
|
|
|
+ )
|
|
|
+ VirtualMachine.objects.create(
|
|
|
+ name='virtual machine 1',
|
|
|
+ site=site,
|
|
|
+ )
|
|
|
+
|
|
|
+ def test_large_service(self):
|
|
|
+ """
|
|
|
+ Test creation of service with large number of ports.
|
|
|
+ Related to issue #22273
|
|
|
+ """
|
|
|
+ service = Service(
|
|
|
+ name='Service 1',
|
|
|
+ protocol=ServiceProtocolChoices.PROTOCOL_TCP,
|
|
|
+ ports=list(range(SERVICE_PORT_MIN, SERVICE_PORT_MAX)),
|
|
|
+ parent=VirtualMachine.objects.first(),
|
|
|
+ )
|
|
|
+ service.full_clean()
|
|
|
+ # Testing .save() is the important part, to check for database problems
|
|
|
+ service.save()
|
|
|
+ self.assertEqual(service._ports_lowest, SERVICE_PORT_MIN)
|