utils.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import netaddr
  2. from .constants import *
  3. from .models import Prefix, VLAN
  4. def add_available_prefixes(parent, prefix_list):
  5. """
  6. Create fake Prefix objects for all unallocated space within a prefix.
  7. """
  8. # Find all unallocated space
  9. available_prefixes = netaddr.IPSet(parent) ^ netaddr.IPSet([p.prefix for p in prefix_list])
  10. available_prefixes = [Prefix(prefix=p) for p in available_prefixes.iter_cidrs()]
  11. # Concatenate and sort complete list of children
  12. prefix_list = list(prefix_list) + available_prefixes
  13. prefix_list.sort(key=lambda p: p.prefix)
  14. return prefix_list
  15. def add_available_ipaddresses(prefix, ipaddress_list, is_pool=False):
  16. """
  17. Annotate ranges of available IP addresses within a given prefix. If is_pool is True, the first and last IP will be
  18. considered usable (regardless of mask length).
  19. """
  20. output = []
  21. prev_ip = None
  22. # Ignore the network and broadcast addresses for non-pool IPv4 prefixes larger than /31.
  23. if prefix.version == 4 and prefix.prefixlen < 31 and not is_pool:
  24. first_ip_in_prefix = netaddr.IPAddress(prefix.first + 1)
  25. last_ip_in_prefix = netaddr.IPAddress(prefix.last - 1)
  26. else:
  27. first_ip_in_prefix = netaddr.IPAddress(prefix.first)
  28. last_ip_in_prefix = netaddr.IPAddress(prefix.last)
  29. if not ipaddress_list:
  30. return [(
  31. int(last_ip_in_prefix - first_ip_in_prefix + 1),
  32. '{}/{}'.format(first_ip_in_prefix, prefix.prefixlen)
  33. )]
  34. # Account for any available IPs before the first real IP
  35. if ipaddress_list[0].address.ip > first_ip_in_prefix:
  36. skipped_count = int(ipaddress_list[0].address.ip - first_ip_in_prefix)
  37. first_skipped = '{}/{}'.format(first_ip_in_prefix, prefix.prefixlen)
  38. output.append((skipped_count, first_skipped))
  39. # Iterate through existing IPs and annotate free ranges
  40. for ip in ipaddress_list:
  41. if prev_ip:
  42. diff = int(ip.address.ip - prev_ip.address.ip)
  43. if diff > 1:
  44. first_skipped = '{}/{}'.format(prev_ip.address.ip + 1, prefix.prefixlen)
  45. output.append((diff - 1, first_skipped))
  46. output.append(ip)
  47. prev_ip = ip
  48. # Include any remaining available IPs
  49. if prev_ip.address.ip < last_ip_in_prefix:
  50. skipped_count = int(last_ip_in_prefix - prev_ip.address.ip)
  51. first_skipped = '{}/{}'.format(prev_ip.address.ip + 1, prefix.prefixlen)
  52. output.append((skipped_count, first_skipped))
  53. return output
  54. def add_available_vlans(vlan_group, vlans):
  55. """
  56. Create fake records for all gaps between used VLANs
  57. """
  58. if not vlans:
  59. return [{'vid': VLAN_VID_MIN, 'available': VLAN_VID_MAX - VLAN_VID_MIN + 1}]
  60. prev_vid = VLAN_VID_MAX
  61. new_vlans = []
  62. for vlan in vlans:
  63. if vlan.vid - prev_vid > 1:
  64. new_vlans.append({'vid': prev_vid + 1, 'available': vlan.vid - prev_vid - 1})
  65. prev_vid = vlan.vid
  66. if vlans[0].vid > VLAN_VID_MIN:
  67. new_vlans.append({'vid': VLAN_VID_MIN, 'available': vlans[0].vid - VLAN_VID_MIN})
  68. if prev_vid < VLAN_VID_MAX:
  69. new_vlans.append({'vid': prev_vid + 1, 'available': VLAN_VID_MAX - prev_vid})
  70. vlans = list(vlans) + new_vlans
  71. vlans.sort(key=lambda v: v.vid if type(v) == VLAN else v['vid'])
  72. return vlans