views.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from django.core.urlresolvers import reverse
  4. from django.http import HttpResponseBadRequest
  5. from rest_framework.authentication import BasicAuthentication, SessionAuthentication
  6. from rest_framework.permissions import IsAuthenticated
  7. from rest_framework.renderers import JSONRenderer
  8. from rest_framework.response import Response
  9. from rest_framework.viewsets import ViewSet, ModelViewSet
  10. from extras.api.renderers import FormlessBrowsableAPIRenderer, FreeRADIUSClientsRenderer
  11. from secrets.exceptions import InvalidSessionKey
  12. from secrets.filters import SecretFilter
  13. from secrets.models import Secret, SecretRole, SessionKey, UserKey
  14. from utilities.api import WritableSerializerMixin
  15. from . import serializers
  16. ERR_USERKEY_MISSING = "No UserKey found for the current user."
  17. ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
  18. ERR_PRIVKEY_MISSING = "Private key was not provided."
  19. ERR_PRIVKEY_INVALID = "Invalid private key."
  20. #
  21. # Secret Roles
  22. #
  23. class SecretRoleViewSet(ModelViewSet):
  24. queryset = SecretRole.objects.all()
  25. serializer_class = serializers.SecretRoleSerializer
  26. permission_classes = [IsAuthenticated]
  27. #
  28. # Secrets
  29. #
  30. # TODO: Need to implement custom create() and update() methods to handle secret encryption.
  31. class SecretViewSet(WritableSerializerMixin, ModelViewSet):
  32. queryset = Secret.objects.select_related(
  33. 'device__primary_ip4', 'device__primary_ip6', 'role',
  34. ).prefetch_related(
  35. 'role__users', 'role__groups',
  36. )
  37. serializer_class = serializers.SecretSerializer
  38. write_serializer_class = serializers.WritableSecretSerializer
  39. filter_class = SecretFilter
  40. # DRF's BrowsableAPIRenderer can't support passing the secret key as a header, so we disable it.
  41. renderer_classes = [FormlessBrowsableAPIRenderer, JSONRenderer, FreeRADIUSClientsRenderer]
  42. # Enabled BasicAuthentication for testing (until we have TokenAuthentication implemented)
  43. authentication_classes = [BasicAuthentication, SessionAuthentication]
  44. permission_classes = [IsAuthenticated]
  45. def _read_session_key(self, request):
  46. # Check for a session key provided as a cookie or header
  47. if 'session_key' in request.COOKIES:
  48. return base64.b64decode(request.COOKIES['session_key'])
  49. elif 'HTTP_X_SESSION_KEY' in request.META:
  50. return base64.b64decode(request.META['HTTP_X_SESSION_KEY'])
  51. return None
  52. def retrieve(self, request, *args, **kwargs):
  53. secret = self.get_object()
  54. session_key = self._read_session_key(request)
  55. # Retrieve session key cipher (if any) for the current user
  56. if session_key is not None:
  57. try:
  58. sk = SessionKey.objects.get(user=request.user)
  59. master_key = sk.get_master_key(session_key)
  60. secret.decrypt(master_key)
  61. except SessionKey.DoesNotExist:
  62. return HttpResponseBadRequest("No active session key for current user.")
  63. except InvalidSessionKey:
  64. return HttpResponseBadRequest("Invalid session key.")
  65. serializer = self.get_serializer(secret)
  66. return Response(serializer.data)
  67. def list(self, request, *args, **kwargs):
  68. queryset = self.filter_queryset(self.get_queryset())
  69. # Attempt to retrieve the master key for decryption
  70. session_key = self._read_session_key(request)
  71. master_key = None
  72. if session_key is not None:
  73. try:
  74. sk = SessionKey.objects.get(user=request.user)
  75. master_key = sk.get_master_key(session_key)
  76. except SessionKey.DoesNotExist:
  77. return HttpResponseBadRequest("No active session key for current user.")
  78. except InvalidSessionKey:
  79. return HttpResponseBadRequest("Invalid session key.")
  80. # Pagination
  81. page = self.paginate_queryset(queryset)
  82. if page is not None:
  83. secrets = []
  84. if master_key is not None:
  85. for secret in page:
  86. secret.decrypt(master_key)
  87. secrets.append(secret)
  88. serializer = self.get_serializer(secrets, many=True)
  89. else:
  90. serializer = self.get_serializer(page, many=True)
  91. return self.get_paginated_response(serializer.data)
  92. serializer = self.get_serializer(queryset, many=True)
  93. return Response(serializer.data)
  94. class GetSessionKeyViewSet(ViewSet):
  95. """
  96. Retrieve a temporary session key to use for encrypting and decrypting secrets via the API. The user's private RSA
  97. key is POSTed with the name `private_key`. An example:
  98. curl -v -X POST -H "Authorization: Token <token>" -H "Accept: application/json; indent=4" \\
  99. --data-urlencode "private_key@<filename>" https://netbox/api/secrets/get-session-key/
  100. This request will yield a base64-encoded session key to be included in an `X-Session-Key` header in future requests:
  101. {
  102. "session_key": "+8t4SI6XikgVmB5+/urhozx9O5qCQANyOk1MNe6taRf="
  103. }
  104. """
  105. permission_classes = [IsAuthenticated]
  106. def create(self, request):
  107. # Read private key
  108. private_key = request.POST.get('private_key', None)
  109. if private_key is None:
  110. return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
  111. # Validate user key
  112. try:
  113. user_key = UserKey.objects.get(user=request.user)
  114. except UserKey.DoesNotExist:
  115. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  116. if not user_key.is_active():
  117. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  118. # Validate private key
  119. master_key = user_key.get_master_key(private_key)
  120. if master_key is None:
  121. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  122. # Delete the existing SessionKey for this user if one exists
  123. SessionKey.objects.filter(user=request.user).delete()
  124. # Create a new SessionKey
  125. sk = SessionKey(user=request.user)
  126. sk.save(master_key=master_key)
  127. encoded_key = base64.b64encode(sk.key)
  128. # Craft the response
  129. response = Response({
  130. 'session_key': encoded_key,
  131. })
  132. # If token authentication is not in use, assign the session key as a cookie
  133. if request.auth is None:
  134. response.set_cookie('session_key', value=encoded_key, path=reverse('secrets-api:secret-list'))
  135. return response
  136. class GenerateRSAKeyPairViewSet(ViewSet):
  137. """
  138. This endpoint can be used to generate a new RSA key pair. The keys are returned in PEM format.
  139. {
  140. "public_key": "<public key>",
  141. "private_key": "<private key>"
  142. }
  143. """
  144. permission_classes = [IsAuthenticated]
  145. def list(self, request):
  146. # Determine what size key to generate
  147. key_size = request.GET.get('key_size', 2048)
  148. if key_size not in range(2048, 4097, 256):
  149. key_size = 2048
  150. # Export RSA private and public keys in PEM format
  151. key = RSA.generate(key_size)
  152. private_key = key.exportKey('PEM')
  153. public_key = key.publickey().exportKey('PEM')
  154. return Response({
  155. 'private_key': private_key,
  156. 'public_key': public_key,
  157. })