utils.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272
  1. from dataclasses import dataclass
  2. import netaddr
  3. from django.utils.translation import gettext_lazy as _
  4. from .constants import *
  5. from .models import VLAN, Prefix
  6. __all__ = (
  7. 'AvailableIPSpace',
  8. 'add_available_vlans',
  9. 'add_requested_prefixes',
  10. 'annotate_ip_space',
  11. 'get_next_available_prefix',
  12. 'rebuild_prefixes',
  13. )
  14. @dataclass
  15. class AvailableIPSpace:
  16. """
  17. A representation of available IP space between two IP addresses/ranges.
  18. """
  19. size: int
  20. first_ip: str
  21. @property
  22. def title(self):
  23. if self.size == 1:
  24. return _('1 IP available')
  25. if self.size <= 65536:
  26. return _('{count} IPs available').format(count=self.size)
  27. return _('Many IPs available')
  28. def add_requested_prefixes(parent, prefix_list, show_available=True, show_assigned=True):
  29. """
  30. Return a list of requested prefixes using show_available, show_assigned filters. If available prefixes are
  31. requested, create fake Prefix objects for all unallocated space within a prefix.
  32. :param parent: Parent Prefix instance
  33. :param prefix_list: Child prefixes list
  34. :param show_available: Include available prefixes.
  35. :param show_assigned: Show assigned prefixes.
  36. """
  37. child_prefixes = []
  38. # Add available prefixes to the table if requested
  39. if prefix_list and show_available:
  40. # Find all unallocated space, add fake Prefix objects to child_prefixes.
  41. # IMPORTANT: These are unsaved Prefix instances (pk=None). If this is ever changed to use
  42. # saved Prefix instances with real pks, bulk delete will fail for mixed-type selections
  43. # due to single-model form validation. See: https://github.com/netbox-community/netbox/issues/21176
  44. available_prefixes = netaddr.IPSet(parent) ^ netaddr.IPSet([p.prefix for p in prefix_list])
  45. available_prefixes = [Prefix(prefix=p, status=None) for p in available_prefixes.iter_cidrs()]
  46. child_prefixes = child_prefixes + available_prefixes
  47. # Add assigned prefixes to the table if requested
  48. if prefix_list and show_assigned:
  49. child_prefixes = child_prefixes + list(prefix_list)
  50. # Sort child prefixes after additions
  51. child_prefixes.sort(key=lambda p: p.prefix)
  52. return child_prefixes
  53. def annotate_ip_space(prefix):
  54. # Compile child objects
  55. records = []
  56. records.extend([
  57. (iprange.start_address.ip, iprange) for iprange in prefix.get_child_ranges(mark_populated=True)
  58. ])
  59. records.extend([
  60. (ip.address.ip, ip) for ip in prefix.get_child_ips()
  61. ])
  62. records = sorted(records, key=lambda x: x[0])
  63. # Determine the first & last valid IP addresses in the prefix
  64. if (
  65. prefix.is_pool
  66. or (prefix.family == 4 and prefix.mask_length >= 31)
  67. or (prefix.family == 6 and prefix.mask_length >= 127)
  68. ):
  69. # Pool, IPv4 /31-/32 or IPv6 /127-/128 sets are fully usable
  70. first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first)
  71. last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last)
  72. elif prefix.family == 4:
  73. # Ignore the network and broadcast addresses for non-pool IPv4 prefixes larger than /31
  74. first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first + 1)
  75. last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last - 1)
  76. else:
  77. # For IPv6 prefixes, omit the Subnet-Router anycast address (RFC 4291)
  78. first_ip_in_prefix = netaddr.IPAddress(prefix.prefix.first + 1)
  79. last_ip_in_prefix = netaddr.IPAddress(prefix.prefix.last)
  80. if not records:
  81. return [
  82. AvailableIPSpace(
  83. size=int(last_ip_in_prefix - first_ip_in_prefix + 1),
  84. first_ip=f'{first_ip_in_prefix}/{prefix.mask_length}'
  85. )
  86. ]
  87. output = []
  88. prev_ip = None
  89. # Account for any available IPs before the first real IP
  90. if records[0][0] > first_ip_in_prefix:
  91. output.append(AvailableIPSpace(
  92. size=int(records[0][0] - first_ip_in_prefix),
  93. first_ip=f'{first_ip_in_prefix}/{prefix.mask_length}'
  94. ))
  95. # Add IP ranges & addresses, annotating available space in between records
  96. for record in records:
  97. if prev_ip:
  98. # Annotate available space
  99. if (diff := int(record[0]) - int(prev_ip)) > 1:
  100. first_skipped = f'{prev_ip + 1}/{prefix.mask_length}'
  101. output.append(AvailableIPSpace(
  102. size=diff - 1,
  103. first_ip=first_skipped
  104. ))
  105. output.append(record[1])
  106. # Update the previous IP address
  107. if hasattr(record[1], 'end_address'):
  108. prev_ip = record[1].end_address.ip
  109. else:
  110. prev_ip = record[0]
  111. # Include any remaining available IPs
  112. if prev_ip < last_ip_in_prefix:
  113. output.append(AvailableIPSpace(
  114. size=int(last_ip_in_prefix - prev_ip),
  115. first_ip=f'{prev_ip + 1}/{prefix.mask_length}'
  116. ))
  117. return output
  118. def available_vlans_from_range(vlans, vlan_group, vid_range):
  119. """
  120. Create fake records for all gaps between used VLANs
  121. """
  122. min_vid = int(vid_range.lower) if vid_range else VLAN_VID_MIN
  123. max_vid = int(vid_range.upper) if vid_range else VLAN_VID_MAX
  124. if not vlans:
  125. return [{
  126. 'vid': min_vid,
  127. 'vlan_group': vlan_group,
  128. 'available': max_vid - min_vid
  129. }]
  130. prev_vid = min_vid - 1
  131. new_vlans = []
  132. for vlan in vlans:
  133. # Ignore VIDs outside the range
  134. if not min_vid <= vlan.vid < max_vid:
  135. continue
  136. # Annotate any available VIDs between the previous (or minimum) VID
  137. # and the current VID
  138. if vlan.vid - prev_vid > 1:
  139. new_vlans.append({
  140. 'vid': prev_vid + 1,
  141. 'vlan_group': vlan_group,
  142. 'available': vlan.vid - prev_vid - 1,
  143. })
  144. prev_vid = vlan.vid
  145. # Annotate any remaining available VLANs
  146. if prev_vid < max_vid - 1:
  147. new_vlans.append({
  148. 'vid': prev_vid + 1,
  149. 'vlan_group': vlan_group,
  150. 'available': max_vid - prev_vid - 1,
  151. })
  152. return new_vlans
  153. def add_available_vlans(vlans, vlan_group):
  154. """
  155. Create fake records for all gaps between used VLANs
  156. """
  157. new_vlans = []
  158. for vid_range in vlan_group.vid_ranges:
  159. new_vlans.extend(available_vlans_from_range(vlans, vlan_group, vid_range))
  160. vlans = list(vlans) + new_vlans
  161. vlans.sort(key=lambda v: v.vid if type(v) is VLAN else v['vid'])
  162. return vlans
  163. def rebuild_prefixes(vrf):
  164. """
  165. Rebuild the prefix hierarchy for all prefixes in the specified VRF (or global table).
  166. """
  167. def contains(parent, child):
  168. return child in parent and child != parent
  169. def push_to_stack(prefix):
  170. # Increment child count on parent nodes
  171. for n in stack:
  172. n['children'] += 1
  173. stack.append({
  174. 'pk': [prefix['pk']],
  175. 'prefix': prefix['prefix'],
  176. 'children': 0,
  177. })
  178. stack = []
  179. update_queue = []
  180. prefixes = Prefix.objects.filter(vrf=vrf).values('pk', 'prefix')
  181. # Iterate through all Prefixes in the VRF, growing and shrinking the stack as we go
  182. for i, p in enumerate(prefixes):
  183. # Grow the stack if this is a child of the most recent prefix
  184. if not stack or contains(stack[-1]['prefix'], p['prefix']):
  185. push_to_stack(p)
  186. # Handle duplicate prefixes
  187. elif stack[-1]['prefix'] == p['prefix']:
  188. stack[-1]['pk'].append(p['pk'])
  189. # If this is a sibling or parent of the most recent prefix, pop nodes from the
  190. # stack until we reach a parent prefix (or the root)
  191. else:
  192. while stack and not contains(stack[-1]['prefix'], p['prefix']):
  193. node = stack.pop()
  194. for pk in node['pk']:
  195. update_queue.append(
  196. Prefix(pk=pk, _depth=len(stack), _children=node['children'])
  197. )
  198. push_to_stack(p)
  199. # Flush the update queue once it reaches 100 Prefixes
  200. if len(update_queue) >= 100:
  201. Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])
  202. update_queue = []
  203. # Clear out any prefixes remaining in the stack
  204. while stack:
  205. node = stack.pop()
  206. for pk in node['pk']:
  207. update_queue.append(
  208. Prefix(pk=pk, _depth=len(stack), _children=node['children'])
  209. )
  210. # Final flush of any remaining Prefixes
  211. Prefix.objects.bulk_update(update_queue, ['_depth', '_children'])
  212. def get_next_available_prefix(ipset, prefix_size):
  213. """
  214. Given a prefix length, allocate the next available prefix from an IPSet.
  215. """
  216. for available_prefix in ipset.iter_cidrs():
  217. if prefix_size >= available_prefix.prefixlen:
  218. allocated_prefix = f"{available_prefix.network}/{prefix_size}"
  219. ipset.remove(allocated_prefix)
  220. return allocated_prefix
  221. return None