querysets.py 8.0 KB


  1. from django.contrib.contenttypes.models import ContentType
  2. from django.db.models import Count, F, OuterRef, Q, Subquery, Value
  3. from django.db.models.expressions import RawSQL
  4. from django.db.models.functions import Round
  5. from utilities.query import count_related
  6. from utilities.querysets import RestrictedQuerySet
  7. __all__ = (
  8. 'ASNRangeQuerySet',
  9. 'PrefixQuerySet',
  10. 'VLANGroupQuerySet',
  11. 'VLANQuerySet',
  12. )
  13. class ASNRangeQuerySet(RestrictedQuerySet):
  14. def annotate_asn_counts(self):
  15. """
  16. Annotate the number of ASNs which appear within each range.
  17. """
  18. from .models import ASN
  19. # Because ASN does not have a foreign key to ASNRange, we create a fake column "_" with a consistent value
  20. # that we can use to count ASNs and return a single value per ASNRange.
  21. asns = ASN.objects.filter(
  22. asn__gte=OuterRef('start'),
  23. asn__lte=OuterRef('end')
  24. ).order_by().annotate(_=Value(1)).values('_').annotate(c=Count('*')).values('c')
  25. return self.annotate(asn_count=Subquery(asns))
  26. class PrefixQuerySet(RestrictedQuerySet):
  27. def annotate_hierarchy(self):
  28. """
  29. Annotate the depth and number of child prefixes for each Prefix. Cast null VRF values to zero for
  30. comparison. (NULL != NULL).
  31. """
  32. return self.annotate(
  33. hierarchy_depth=RawSQL(
  34. 'SELECT COUNT(DISTINCT U0."prefix") AS "c" '
  35. 'FROM "ipam_prefix" U0 '
  36. 'WHERE (U0."prefix" >> "ipam_prefix"."prefix" '
  37. 'AND COALESCE(U0."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
  38. ()
  39. ),
  40. hierarchy_children=RawSQL(
  41. 'SELECT COUNT(U1."prefix") AS "c" '
  42. 'FROM "ipam_prefix" U1 '
  43. 'WHERE (U1."prefix" << "ipam_prefix"."prefix" '
  44. 'AND COALESCE(U1."vrf_id", 0) = COALESCE("ipam_prefix"."vrf_id", 0))',
  45. ()
  46. )
  47. )
  48. class VLANGroupQuerySet(RestrictedQuerySet):
  49. def annotate_utilization(self):
  50. from .models import VLAN
  51. return self.annotate(
  52. vlan_count=count_related(VLAN, 'group'),
  53. utilization=Round(F('vlan_count') * 100.0 / F('_total_vlan_ids'), 2)
  54. )
  55. class VLANQuerySet(RestrictedQuerySet):
  56. def get_for_site(self, site):
  57. """
  58. Return all VLANs in the specified site
  59. """
  60. from .models import VLANGroup
  61. q = Q()
  62. q |= Q(
  63. scope_type=ContentType.objects.get_by_natural_key('dcim', 'site'),
  64. scope_id=site.pk
  65. )
  66. if site.region:
  67. q |= Q(
  68. scope_type=ContentType.objects.get_by_natural_key('dcim', 'region'),
  69. scope_id__in=site.region.get_ancestors(include_self=True)
  70. )
  71. if site.group:
  72. q |= Q(
  73. scope_type=ContentType.objects.get_by_natural_key('dcim', 'sitegroup'),
  74. scope_id__in=site.group.get_ancestors(include_self=True)
  75. )
  76. return self.filter(
  77. Q(group__in=VLANGroup.objects.filter(q)) |
  78. Q(site=site) |
  79. Q(group__scope_id__isnull=True, site__isnull=True) | # Global group VLANs
  80. Q(group__isnull=True, site__isnull=True) # Global VLANs
  81. )
  82. def get_for_device(self, device):
  83. """
  84. Return all VLANs available to the specified Device.
  85. """
  86. from .models import VLANGroup
  87. # Find all relevant VLANGroups
  88. q = Q()
  89. if device.site.region:
  90. q |= Q(
  91. scope_type=ContentType.objects.get_by_natural_key('dcim', 'region'),
  92. scope_id__in=device.site.region.get_ancestors(include_self=True)
  93. )
  94. if device.site.group:
  95. q |= Q(
  96. scope_type=ContentType.objects.get_by_natural_key('dcim', 'sitegroup'),
  97. scope_id__in=device.site.group.get_ancestors(include_self=True)
  98. )
  99. q |= Q(
  100. scope_type=ContentType.objects.get_by_natural_key('dcim', 'site'),
  101. scope_id=device.site_id
  102. )
  103. if device.location:
  104. q |= Q(
  105. scope_type=ContentType.objects.get_by_natural_key('dcim', 'location'),
  106. scope_id__in=device.location.get_ancestors(include_self=True)
  107. )
  108. if device.rack:
  109. q |= Q(
  110. scope_type=ContentType.objects.get_by_natural_key('dcim', 'rack'),
  111. scope_id=device.rack_id
  112. )
  113. # Return all applicable VLANs
  114. return self.filter(
  115. Q(group__in=VLANGroup.objects.filter(q)) |
  116. Q(site=device.site) |
  117. Q(group__scope_id__isnull=True, site__isnull=True) | # Global group VLANs
  118. Q(group__isnull=True, site__isnull=True) # Global VLANs
  119. )
  120. def get_for_virtualmachine(self, vm):
  121. """
  122. Return all VLANs available to the specified VirtualMachine.
  123. """
  124. from .models import VLANGroup
  125. # Find all relevant VLANGroups
  126. q = Q()
  127. site = vm.site
  128. if vm.cluster:
  129. # Add VLANGroups scoped to the assigned cluster (or its group)
  130. q |= Q(
  131. scope_type=ContentType.objects.get_by_natural_key('virtualization', 'cluster'),
  132. scope_id=vm.cluster_id
  133. )
  134. if vm.cluster.group:
  135. q |= Q(
  136. scope_type=ContentType.objects.get_by_natural_key('virtualization', 'clustergroup'),
  137. scope_id=vm.cluster.group_id
  138. )
  139. # Looking all possible cluster scopes
  140. if vm.cluster.scope_type == ContentType.objects.get_by_natural_key('dcim', 'location'):
  141. site = site or vm.cluster.scope.site
  142. q |= Q(
  143. scope_type=ContentType.objects.get_by_natural_key('dcim', 'location'),
  144. scope_id__in=vm.cluster.scope.get_ancestors(include_self=True)
  145. )
  146. elif vm.cluster.scope_type == ContentType.objects.get_by_natural_key('dcim', 'site'):
  147. site = site or vm.cluster.scope
  148. q |= Q(
  149. scope_type=ContentType.objects.get_by_natural_key('dcim', 'site'),
  150. scope_id=vm.cluster.scope.pk
  151. )
  152. elif vm.cluster.scope_type == ContentType.objects.get_by_natural_key('dcim', 'sitegroup'):
  153. q |= Q(
  154. scope_type=ContentType.objects.get_by_natural_key('dcim', 'sitegroup'),
  155. scope_id__in=vm.cluster.scope.get_ancestors(include_self=True)
  156. )
  157. elif vm.cluster.scope_type == ContentType.objects.get_by_natural_key('dcim', 'region'):
  158. q |= Q(
  159. scope_type=ContentType.objects.get_by_natural_key('dcim', 'region'),
  160. scope_id__in=vm.cluster.scope.get_ancestors(include_self=True)
  161. )
  162. # VM can be assigned to a site without a cluster so checking assigned site independently
  163. if site:
  164. # Add VLANGroups scoped to the assigned site (or its group or region)
  165. q |= Q(
  166. scope_type=ContentType.objects.get_by_natural_key('dcim', 'site'),
  167. scope_id=site.pk
  168. )
  169. if site.region:
  170. q |= Q(
  171. scope_type=ContentType.objects.get_by_natural_key('dcim', 'region'),
  172. scope_id__in=site.region.get_ancestors(include_self=True)
  173. )
  174. if site.group:
  175. q |= Q(
  176. scope_type=ContentType.objects.get_by_natural_key('dcim', 'sitegroup'),
  177. scope_id__in=site.group.get_ancestors(include_self=True)
  178. )
  179. vlan_groups = VLANGroup.objects.filter(q)
  180. # Return all applicable VLANs
  181. q = (
  182. Q(group__in=vlan_groups) |
  183. Q(group__scope_id__isnull=True, site__isnull=True) | # Global group VLANs
  184. Q(group__isnull=True, site__isnull=True) # Global VLANs
  185. )
  186. if site:
  187. q |= Q(site=site)
  188. return self.filter(q)