querysets.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import logging
  2. from django.db.models import Q, QuerySet
  3. from utilities.permissions import permission_is_exempt
  4. class DummyQuerySet:
  5. """
  6. A fake QuerySet that can be used to cache relationships to objects that have been deleted.
  7. """
  8. def __init__(self, queryset):
  9. self._cache = [obj for obj in queryset.all()]
  10. def all(self):
  11. return self._cache
  12. class RestrictedQuerySet(QuerySet):
  13. def __init__(self, *args, **kwargs):
  14. super().__init__(*args, **kwargs)
  15. # Initialize the allow_evaluation flag to False. This indicates that the QuerySet has not yet been restricted.
  16. self.allow_evaluation = False
  17. def _check_restriction(self):
  18. # Raise a warning if the QuerySet is evaluated without first calling restrict() or unrestricted().
  19. if not getattr(self, 'allow_evaluation', False):
  20. logger = logging.getLogger('netbox.RestrictedQuerySet')
  21. logger.warning(
  22. f'Evaluation of RestrictedQuerySet prior to calling restrict() or unrestricted(): {self.model}'
  23. )
  24. def _clone(self):
  25. # Persist the allow_evaluation flag when cloning the QuerySet.
  26. c = super()._clone()
  27. c.allow_evaluation = self.allow_evaluation
  28. return c
  29. def _fetch_all(self):
  30. self._check_restriction()
  31. return super()._fetch_all()
  32. def count(self):
  33. self._check_restriction()
  34. return super().count()
  35. def unrestricted(self):
  36. """
  37. Bypass restriction for the QuerySet. This is necessary in cases where we are not interacting with the objects
  38. directly (e.g. when filtering by related object).
  39. """
  40. self.allow_evaluation = True
  41. return self
  42. def restrict(self, user, action):
  43. """
  44. Filter the QuerySet to return only objects on which the specified user has been granted the specified
  45. permission.
  46. :param user: User instance
  47. :param action: The action which must be permitted (e.g. "view" for "dcim.view_site")
  48. """
  49. # Resolve the full name of the required permission
  50. app_label = self.model._meta.app_label
  51. model_name = self.model._meta.model_name
  52. permission_required = f'{app_label}.{action}_{model_name}'
  53. # Bypass restriction for superusers and exempt views
  54. if user.is_superuser or permission_is_exempt(permission_required):
  55. qs = self
  56. # User is anonymous or has not been granted the requisite permission
  57. elif not user.is_authenticated or permission_required not in user.get_all_permissions():
  58. qs = self.none()
  59. # Filter the queryset to include only objects with allowed attributes
  60. else:
  61. attrs = Q()
  62. for perm_attrs in user._object_perm_cache[permission_required]:
  63. if perm_attrs:
  64. attrs |= Q(**perm_attrs)
  65. qs = self.filter(attrs)
  66. # Allow QuerySet evaluation
  67. qs.allow_evaluation = True
  68. return qs