authentication.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. from django.conf import settings
  2. from django.utils import timezone
  3. from rest_framework import authentication, exceptions
  4. from rest_framework.permissions import BasePermission, DjangoObjectPermissions, SAFE_METHODS
  5. from users.models import Token
  6. class TokenAuthentication(authentication.TokenAuthentication):
  7. """
  8. A custom authentication scheme which enforces Token expiration times.
  9. """
  10. model = Token
  11. def authenticate_credentials(self, key):
  12. model = self.get_model()
  13. try:
  14. token = model.objects.prefetch_related('user').get(key=key)
  15. except model.DoesNotExist:
  16. raise exceptions.AuthenticationFailed("Invalid token")
  17. # Update last used, but only once a minute. This reduces the write load on the db
  18. last_used_diff = timezone.now() - token.last_used
  19. if last_used_diff.total_seconds() > 60:
  20. token.last_used = timezone.now()
  21. token.save()
  22. # Enforce the Token's expiration time, if one has been set.
  23. if token.is_expired:
  24. raise exceptions.AuthenticationFailed("Token expired")
  25. if not token.user.is_active:
  26. raise exceptions.AuthenticationFailed("User inactive")
  27. # When LDAP authentication is active try to load user data from LDAP directory
  28. if settings.REMOTE_AUTH_BACKEND == 'netbox.authentication.LDAPBackend':
  29. from netbox.authentication import LDAPBackend
  30. ldap_backend = LDAPBackend()
  31. # Load from LDAP if FIND_GROUP_PERMS is active
  32. if ldap_backend.settings.FIND_GROUP_PERMS:
  33. user = ldap_backend.populate_user(token.user.username)
  34. # If the user is found in the LDAP directory use it, if not fallback to the local user
  35. if user:
  36. return user, token
  37. return token.user, token
  38. class TokenPermissions(DjangoObjectPermissions):
  39. """
  40. Custom permissions handler which extends the built-in DjangoModelPermissions to validate a Token's write ability
  41. for unsafe requests (POST/PUT/PATCH/DELETE).
  42. """
  43. # Override the stock perm_map to enforce view permissions
  44. perms_map = {
  45. 'GET': ['%(app_label)s.view_%(model_name)s'],
  46. 'OPTIONS': [],
  47. 'HEAD': ['%(app_label)s.view_%(model_name)s'],
  48. 'POST': ['%(app_label)s.add_%(model_name)s'],
  49. 'PUT': ['%(app_label)s.change_%(model_name)s'],
  50. 'PATCH': ['%(app_label)s.change_%(model_name)s'],
  51. 'DELETE': ['%(app_label)s.delete_%(model_name)s'],
  52. }
  53. def __init__(self):
  54. # LOGIN_REQUIRED determines whether read-only access is provided to anonymous users.
  55. self.authenticated_users_only = settings.LOGIN_REQUIRED
  56. super().__init__()
  57. def _verify_write_permission(self, request):
  58. # If token authentication is in use, verify that the token allows write operations (for unsafe methods).
  59. if request.method in SAFE_METHODS or request.auth.write_enabled:
  60. return True
  61. def has_permission(self, request, view):
  62. # Enforce Token write ability
  63. if isinstance(request.auth, Token) and not self._verify_write_permission(request):
  64. return False
  65. return super().has_permission(request, view)
  66. def has_object_permission(self, request, view, obj):
  67. # Enforce Token write ability
  68. if isinstance(request.auth, Token) and not self._verify_write_permission(request):
  69. return False
  70. return super().has_object_permission(request, view, obj)
  71. class IsAuthenticatedOrLoginNotRequired(BasePermission):
  72. """
  73. Returns True if the user is authenticated or LOGIN_REQUIRED is False.
  74. """
  75. def has_permission(self, request, view):
  76. if not settings.LOGIN_REQUIRED:
  77. return True
  78. return request.user.is_authenticated