views.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. from django.contrib.auth import authenticate
  2. from django.contrib.auth import get_user_model
  3. from django.contrib.auth.models import Group
  4. from django.db.models import Count
  5. from drf_spectacular.utils import extend_schema
  6. from drf_spectacular.types import OpenApiTypes
  7. from rest_framework.exceptions import AuthenticationFailed
  8. from rest_framework.permissions import IsAuthenticated
  9. from rest_framework.response import Response
  10. from rest_framework.routers import APIRootView
  11. from rest_framework.status import HTTP_201_CREATED
  12. from rest_framework.views import APIView
  13. from rest_framework.viewsets import ViewSet
  14. from netbox.api.viewsets import NetBoxModelViewSet
  15. from users import filtersets
  16. from users.models import ObjectPermission, Token, UserConfig
  17. from utilities.querysets import RestrictedQuerySet
  18. from utilities.utils import deepmerge
  19. from . import serializers
  20. class UsersRootView(APIRootView):
  21. """
  22. Users API root view
  23. """
  24. def get_view_name(self):
  25. return 'Users'
#
# Users and groups
#
  29. class UserViewSet(NetBoxModelViewSet):
  30. queryset = RestrictedQuerySet(model=get_user_model()).prefetch_related('groups').order_by('username')
  31. serializer_class = serializers.UserSerializer
  32. filterset_class = filtersets.UserFilterSet
  33. class GroupViewSet(NetBoxModelViewSet):
  34. queryset = RestrictedQuerySet(model=Group).annotate(user_count=Count('user')).order_by('name')
  35. serializer_class = serializers.GroupSerializer
  36. filterset_class = filtersets.GroupFilterSet
#
# REST API tokens
#
  40. class TokenViewSet(NetBoxModelViewSet):
  41. queryset = Token.objects.prefetch_related('user')
  42. serializer_class = serializers.TokenSerializer
  43. filterset_class = filtersets.TokenFilterSet
  44. class TokenProvisionView(APIView):
  45. """
  46. Non-authenticated REST API endpoint via which a user may create a Token.
  47. """
  48. permission_classes = []
  49. @extend_schema(
  50. request=serializers.TokenProvisionSerializer,
  51. responses={
  52. 201: serializers.TokenSerializer,
  53. 401: OpenApiTypes.OBJECT,
  54. }
  55. )
  56. def post(self, request):
  57. serializer = serializers.TokenProvisionSerializer(data=request.data)
  58. serializer.is_valid()
  59. # Authenticate the user account based on the provided credentials
  60. username = serializer.data.get('username')
  61. password = serializer.data.get('password')
  62. if not username or not password:
  63. raise AuthenticationFailed("Username and password must be provided to provision a token.")
  64. user = authenticate(request=request, username=username, password=password)
  65. if user is None:
  66. raise AuthenticationFailed("Invalid username/password")
  67. # Create a new Token for the User
  68. token = Token(user=user)
  69. token.save()
  70. data = serializers.TokenSerializer(token, context={'request': request}).data
  71. # Manually append the token key, which is normally write-only
  72. data['key'] = token.key
  73. return Response(data, status=HTTP_201_CREATED)
  74. def get_serializer_class(self):
  75. return serializers.TokenSerializer
#
# ObjectPermissions
#
  79. class ObjectPermissionViewSet(NetBoxModelViewSet):
  80. queryset = ObjectPermission.objects.prefetch_related('object_types', 'groups', 'users')
  81. serializer_class = serializers.ObjectPermissionSerializer
  82. filterset_class = filtersets.ObjectPermissionFilterSet
#
# User preferences
#
  86. class UserConfigViewSet(ViewSet):
  87. """
  88. An API endpoint via which a user can update his or her own UserConfig data (but no one else's).
  89. """
  90. permission_classes = [IsAuthenticated]
  91. def get_queryset(self):
  92. return UserConfig.objects.filter(user=self.request.user)
  93. @extend_schema(responses={200: OpenApiTypes.OBJECT})
  94. def list(self, request):
  95. """
  96. Return the UserConfig for the currently authenticated User.
  97. """
  98. userconfig = self.get_queryset().first()
  99. return Response(userconfig.data)
  100. @extend_schema(methods=["patch"], responses={201: OpenApiTypes.OBJECT})
  101. def patch(self, request):
  102. """
  103. Update the UserConfig for the currently authenticated User.
  104. """
  105. # TODO: How can we validate this data?
  106. userconfig = self.get_queryset().first()
  107. userconfig.data = deepmerge(userconfig.data, request.data)
  108. userconfig.save()
  109. return Response(userconfig.data)