filtersets.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393
  1. import json
  2. import django_filters
  3. from copy import deepcopy
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.db import models
  6. from django.db.models import Q
  7. from django_filters.exceptions import FieldLookupError
  8. from django_filters.utils import get_model_field, resolve_field
  9. from django.utils.translation import gettext as _
  10. from core.choices import ObjectChangeActionChoices
  11. from core.models import ObjectChange
  12. from extras.choices import CustomFieldFilterLogicChoices
  13. from extras.filters import TagFilter, TagIDFilter
  14. from extras.models import CustomField, SavedFilter
  15. from users.filterset_mixins import OwnerFilterMixin
  16. from utilities.constants import (
  17. FILTER_CHAR_BASED_LOOKUP_MAP, FILTER_NEGATION_LOOKUP_MAP, FILTER_TREENODE_NEGATION_LOOKUP_MAP,
  18. FILTER_NUMERIC_BASED_LOOKUP_MAP
  19. )
  20. from utilities.forms.fields import MACAddressField
  21. from utilities import filters
  22. __all__ = (
  23. 'AttributeFiltersMixin',
  24. 'BaseFilterSet',
  25. 'ChangeLoggedModelFilterSet',
  26. 'NestedGroupModelFilterSet',
  27. 'NetBoxModelFilterSet',
  28. 'OrganizationalModelFilterSet',
  29. 'PrimaryModelFilterSet',
  30. )
  31. STANDARD_LOOKUPS = (
  32. 'exact',
  33. 'iexact',
  34. 'in',
  35. 'contains',
  36. )
  37. #
  38. # FilterSets
  39. #
  40. class BaseFilterSet(django_filters.FilterSet):
  41. """
  42. A base FilterSet which provides some enhanced functionality over django-filter2's FilterSet class.
  43. """
  44. FILTER_DEFAULTS = deepcopy(django_filters.filterset.FILTER_FOR_DBFIELD_DEFAULTS)
  45. FILTER_DEFAULTS.update({
  46. models.AutoField: {
  47. 'filter_class': filters.MultiValueNumberFilter
  48. },
  49. models.CharField: {
  50. 'filter_class': filters.MultiValueCharFilter
  51. },
  52. models.DateField: {
  53. 'filter_class': filters.MultiValueDateFilter
  54. },
  55. models.DateTimeField: {
  56. 'filter_class': filters.MultiValueDateTimeFilter
  57. },
  58. models.DecimalField: {
  59. 'filter_class': filters.MultiValueDecimalFilter
  60. },
  61. models.EmailField: {
  62. 'filter_class': filters.MultiValueCharFilter
  63. },
  64. models.FloatField: {
  65. 'filter_class': filters.MultiValueNumberFilter
  66. },
  67. models.IntegerField: {
  68. 'filter_class': filters.MultiValueNumberFilter
  69. },
  70. models.PositiveIntegerField: {
  71. 'filter_class': filters.MultiValueNumberFilter
  72. },
  73. models.PositiveSmallIntegerField: {
  74. 'filter_class': filters.MultiValueNumberFilter
  75. },
  76. models.SlugField: {
  77. 'filter_class': filters.MultiValueCharFilter
  78. },
  79. models.SmallIntegerField: {
  80. 'filter_class': filters.MultiValueNumberFilter
  81. },
  82. models.TimeField: {
  83. 'filter_class': filters.MultiValueTimeFilter
  84. },
  85. models.URLField: {
  86. 'filter_class': filters.MultiValueCharFilter
  87. },
  88. MACAddressField: {
  89. 'filter_class': filters.MultiValueMACAddressFilter
  90. },
  91. })
  92. def __init__(self, data=None, *args, **kwargs):
  93. # bit of a hack for #9231 - extras.lookup.Empty is registered in apps.ready
  94. # however FilterSet Factory is setup before this which creates the
  95. # initial filters. This recreates the filters so Empty is picked up correctly.
  96. self.base_filters = self.__class__.get_filters()
  97. # Apply any referenced SavedFilters
  98. if data and ('filter' in data or 'filter_id' in data):
  99. data = data.copy() # Get a mutable copy
  100. saved_filters = SavedFilter.objects.filter(
  101. Q(slug__in=data.pop('filter', [])) |
  102. Q(pk__in=data.pop('filter_id', []))
  103. )
  104. for sf in saved_filters:
  105. for key, value in sf.parameters.items():
  106. # QueryDicts are... fun
  107. if type(value) not in (list, tuple):
  108. value = [value]
  109. if key in data:
  110. for v in value:
  111. data.appendlist(key, v)
  112. else:
  113. data.setlist(key, value)
  114. super().__init__(data, *args, **kwargs)
  115. @staticmethod
  116. def _get_filter_lookup_dict(existing_filter):
  117. # Choose the lookup expression map based on the filter type
  118. if isinstance(existing_filter, (
  119. django_filters.NumberFilter,
  120. filters.MultiValueDateFilter,
  121. filters.MultiValueDateTimeFilter,
  122. filters.MultiValueNumberFilter,
  123. filters.MultiValueDecimalFilter,
  124. filters.MultiValueTimeFilter
  125. )):
  126. return FILTER_NUMERIC_BASED_LOOKUP_MAP
  127. elif isinstance(existing_filter, (
  128. filters.TreeNodeMultipleChoiceFilter,
  129. )):
  130. # TreeNodeMultipleChoiceFilter only support negation but must maintain the `in` lookup expression
  131. return FILTER_TREENODE_NEGATION_LOOKUP_MAP
  132. elif isinstance(existing_filter, (
  133. django_filters.ModelChoiceFilter,
  134. django_filters.ModelMultipleChoiceFilter,
  135. TagFilter
  136. )):
  137. # These filter types support only negation
  138. return FILTER_NEGATION_LOOKUP_MAP
  139. elif isinstance(existing_filter, (
  140. django_filters.filters.CharFilter,
  141. django_filters.ChoiceFilter,
  142. django_filters.MultipleChoiceFilter,
  143. filters.MultiValueCharFilter,
  144. filters.MultiValueMACAddressFilter
  145. )):
  146. return FILTER_CHAR_BASED_LOOKUP_MAP
  147. return None
  148. @classmethod
  149. def get_additional_lookups(cls, existing_filter_name, existing_filter):
  150. new_filters = {}
  151. # Skip on abstract models
  152. if not cls._meta.model:
  153. return {}
  154. # Skip nonstandard lookup expressions
  155. if existing_filter.method is not None or existing_filter.lookup_expr not in STANDARD_LOOKUPS:
  156. return {}
  157. # Choose the lookup expression map based on the filter type
  158. lookup_map = cls._get_filter_lookup_dict(existing_filter)
  159. if lookup_map is None:
  160. # Do not augment this filter type with more lookup expressions
  161. return {}
  162. # Get properties of the existing filter for later use
  163. field_name = existing_filter.field_name
  164. field = get_model_field(cls._meta.model, field_name)
  165. # Create new filters for each lookup expression in the map
  166. for lookup_name, lookup_expr in lookup_map.items():
  167. new_filter_name = f'{existing_filter_name}__{lookup_name}'
  168. existing_filter_extra = deepcopy(existing_filter.extra)
  169. try:
  170. if existing_filter_name in cls.declared_filters:
  171. # The filter field has been explicitly defined on the filterset class so we must manually
  172. # create the new filter with the same type because there is no guarantee the defined type
  173. # is the same as the default type for the field
  174. if field is None:
  175. raise ValueError('Invalid field name/lookup on {}: {}'.format(existing_filter_name, field_name))
  176. resolve_field(field, lookup_expr) # Will raise FieldLookupError if the lookup is invalid
  177. filter_cls = type(existing_filter)
  178. if lookup_expr == 'empty':
  179. filter_cls = django_filters.BooleanFilter
  180. for param_to_remove in ('choices', 'null_value'):
  181. existing_filter_extra.pop(param_to_remove, None)
  182. new_filter = filter_cls(
  183. field_name=field_name,
  184. lookup_expr=lookup_expr,
  185. label=existing_filter.label,
  186. exclude=existing_filter.exclude,
  187. distinct=existing_filter.distinct,
  188. **existing_filter_extra
  189. )
  190. elif hasattr(existing_filter, 'custom_field'):
  191. # Filter is for a custom field
  192. custom_field = existing_filter.custom_field
  193. new_filter = custom_field.to_filter(lookup_expr=lookup_expr)
  194. else:
  195. # The filter field is listed in Meta.fields so we can safely rely on default behaviour
  196. # Will raise FieldLookupError if the lookup is invalid
  197. new_filter = cls.filter_for_field(field, field_name, lookup_expr)
  198. except FieldLookupError:
  199. # The filter could not be created because the lookup expression is not supported on the field
  200. continue
  201. if lookup_name.startswith('n'):
  202. # This is a negation filter which requires a queryset.exclude() clause
  203. # Of course setting the negation of the existing filter's exclude attribute handles both cases
  204. new_filter.exclude = not existing_filter.exclude
  205. new_filters[new_filter_name] = new_filter
  206. return new_filters
  207. @classmethod
  208. def get_filters(cls):
  209. """
  210. Override filter generation to support dynamic lookup expressions for certain filter types.
  211. For specific filter types, new filters are created based on defined lookup expressions in
  212. the form `<field_name>__<lookup_expr>`
  213. """
  214. filters = super().get_filters()
  215. additional_filters = {}
  216. for existing_filter_name, existing_filter in filters.items():
  217. additional_filters.update(cls.get_additional_lookups(existing_filter_name, existing_filter))
  218. filters.update(additional_filters)
  219. return filters
  220. @classmethod
  221. def filter_for_lookup(cls, field, lookup_type):
  222. if lookup_type == 'empty':
  223. return django_filters.BooleanFilter, {}
  224. return super().filter_for_lookup(field, lookup_type)
  225. class ChangeLoggedModelFilterSet(BaseFilterSet):
  226. """
  227. Base FilterSet for ChangeLoggedModel classes.
  228. """
  229. created = filters.MultiValueDateTimeFilter()
  230. last_updated = filters.MultiValueDateTimeFilter()
  231. created_by_request = django_filters.UUIDFilter(
  232. method='filter_by_request'
  233. )
  234. updated_by_request = django_filters.UUIDFilter(
  235. method='filter_by_request'
  236. )
  237. modified_by_request = django_filters.UUIDFilter(
  238. method='filter_by_request'
  239. )
  240. def filter_by_request(self, queryset, name, value):
  241. content_type = ContentType.objects.get_for_model(self.Meta.model)
  242. action = {
  243. 'created_by_request': Q(action=ObjectChangeActionChoices.ACTION_CREATE),
  244. 'updated_by_request': Q(action=ObjectChangeActionChoices.ACTION_UPDATE),
  245. 'modified_by_request': Q(
  246. action__in=[ObjectChangeActionChoices.ACTION_CREATE, ObjectChangeActionChoices.ACTION_UPDATE]
  247. ),
  248. }.get(name)
  249. request_id = value
  250. pks = ObjectChange.objects.filter(
  251. action,
  252. changed_object_type=content_type,
  253. request_id=request_id,
  254. ).values_list('changed_object_id', flat=True)
  255. return queryset.filter(pk__in=pks)
  256. class NetBoxModelFilterSet(ChangeLoggedModelFilterSet):
  257. """
  258. Provides additional filtering functionality (e.g. tags, custom fields) for core NetBox models.
  259. """
  260. q = django_filters.CharFilter(
  261. method='search',
  262. label=_('Search'),
  263. )
  264. tag = TagFilter()
  265. tag_id = TagIDFilter()
  266. def __init__(self, *args, **kwargs):
  267. super().__init__(*args, **kwargs)
  268. custom_field_filters = {}
  269. for custom_field in CustomField.objects.get_for_model(self._meta.model):
  270. if custom_field.filter_logic == CustomFieldFilterLogicChoices.FILTER_DISABLED:
  271. # Skip disabled fields
  272. continue
  273. if filter_instance := custom_field.to_filter():
  274. filter_name = f'cf_{custom_field.name}'
  275. custom_field_filters[filter_name] = filter_instance
  276. # Add relevant additional lookups
  277. additional_lookups = self.get_additional_lookups(filter_name, filter_instance)
  278. custom_field_filters.update(additional_lookups)
  279. self.filters.update(custom_field_filters)
  280. def search(self, queryset, name, value):
  281. """
  282. Override this method to apply a general-purpose search logic.
  283. """
  284. return queryset
  285. class PrimaryModelFilterSet(OwnerFilterMixin, NetBoxModelFilterSet):
  286. """
  287. Base filterset for models inheriting from PrimaryModel.
  288. """
  289. pass
  290. class OrganizationalModelFilterSet(OwnerFilterMixin, NetBoxModelFilterSet):
  291. """
  292. Base filterset for models inheriting from OrganizationalModel.
  293. """
  294. def search(self, queryset, name, value):
  295. if not value.strip():
  296. return queryset
  297. return queryset.filter(
  298. models.Q(name__icontains=value) |
  299. models.Q(slug__icontains=value) |
  300. models.Q(description__icontains=value)
  301. )
  302. class NestedGroupModelFilterSet(OwnerFilterMixin, NetBoxModelFilterSet):
  303. """
  304. Base filterset for models inheriting from NestedGroupModel.
  305. """
  306. def search(self, queryset, name, value):
  307. if value.strip():
  308. queryset = queryset.filter(
  309. models.Q(name__icontains=value) |
  310. models.Q(slug__icontains=value) |
  311. models.Q(description__icontains=value) |
  312. models.Q(comments__icontains=value)
  313. )
  314. return queryset
  315. class AttributeFiltersMixin:
  316. attributes_field_name = 'attribute_data'
  317. attribute_filter_prefix = 'attr_'
  318. def __init__(self, data=None, queryset=None, *, request=None, prefix=None):
  319. self.attr_filters = {}
  320. # Extract JSONField-based filters from the incoming data
  321. if data is not None:
  322. for key, value in data.items():
  323. if field := self._get_field_lookup(key):
  324. # Attempt to cast the value to a native JSON type
  325. try:
  326. self.attr_filters[field] = json.loads(value)
  327. except (ValueError, json.JSONDecodeError):
  328. self.attr_filters[field] = value
  329. super().__init__(data=data, queryset=queryset, request=request, prefix=prefix)
  330. def _get_field_lookup(self, key):
  331. if not key.startswith(self.attribute_filter_prefix):
  332. return
  333. lookup = key.split(self.attribute_filter_prefix, 1)[1] # Strip prefix
  334. return f'{self.attributes_field_name}__{lookup}'
  335. def filter_queryset(self, queryset):
  336. return super().filter_queryset(queryset).filter(**self.attr_filters)