views.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from django.db.models import Count
  4. from django.http import HttpResponseBadRequest
  5. from rest_framework.exceptions import ValidationError
  6. from rest_framework.permissions import IsAuthenticated
  7. from rest_framework.response import Response
  8. from rest_framework.viewsets import ViewSet
  9. from secrets import filters
  10. from secrets.exceptions import InvalidKey
  11. from secrets.models import Secret, SecretRole, SessionKey, UserKey
  12. from utilities.api import FieldChoicesViewSet, ModelViewSet
  13. from . import serializers
  14. ERR_USERKEY_MISSING = "No UserKey found for the current user."
  15. ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
  16. ERR_PRIVKEY_MISSING = "Private key was not provided."
  17. ERR_PRIVKEY_INVALID = "Invalid private key."
  18. #
  19. # Field choices
  20. #
  21. class SecretsFieldChoicesViewSet(FieldChoicesViewSet):
  22. fields = ()
  23. #
  24. # Secret Roles
  25. #
  26. class SecretRoleViewSet(ModelViewSet):
  27. queryset = SecretRole.objects.annotate(
  28. secret_count=Count('secrets')
  29. )
  30. serializer_class = serializers.SecretRoleSerializer
  31. permission_classes = [IsAuthenticated]
  32. filterset_class = filters.SecretRoleFilter
  33. #
  34. # Secrets
  35. #
  36. class SecretViewSet(ModelViewSet):
  37. queryset = Secret.objects.select_related(
  38. 'device__primary_ip4', 'device__primary_ip6', 'role',
  39. ).prefetch_related(
  40. 'role__users', 'role__groups', 'tags',
  41. )
  42. serializer_class = serializers.SecretSerializer
  43. filterset_class = filters.SecretFilter
  44. master_key = None
  45. def get_serializer_context(self):
  46. # Make the master key available to the serializer for encrypting plaintext values
  47. context = super().get_serializer_context()
  48. context['master_key'] = self.master_key
  49. return context
  50. def initial(self, request, *args, **kwargs):
  51. super().initial(request, *args, **kwargs)
  52. if request.user.is_authenticated:
  53. # Read session key from HTTP cookie or header if it has been provided. The session key must be provided in
  54. # order to encrypt/decrypt secrets.
  55. if 'session_key' in request.COOKIES:
  56. session_key = base64.b64decode(request.COOKIES['session_key'])
  57. elif 'HTTP_X_SESSION_KEY' in request.META:
  58. session_key = base64.b64decode(request.META['HTTP_X_SESSION_KEY'])
  59. else:
  60. session_key = None
  61. # We can't encrypt secret plaintext without a session key.
  62. if self.action in ['create', 'update'] and session_key is None:
  63. raise ValidationError("A session key must be provided when creating or updating secrets.")
  64. # Attempt to retrieve the master key for encryption/decryption if a session key has been provided.
  65. if session_key is not None:
  66. try:
  67. sk = SessionKey.objects.get(userkey__user=request.user)
  68. self.master_key = sk.get_master_key(session_key)
  69. except (SessionKey.DoesNotExist, InvalidKey):
  70. raise ValidationError("Invalid session key.")
  71. def retrieve(self, request, *args, **kwargs):
  72. secret = self.get_object()
  73. # Attempt to decrypt the secret if the master key is known
  74. if self.master_key is not None:
  75. secret.decrypt(self.master_key)
  76. serializer = self.get_serializer(secret)
  77. return Response(serializer.data)
  78. def list(self, request, *args, **kwargs):
  79. queryset = self.filter_queryset(self.get_queryset())
  80. page = self.paginate_queryset(queryset)
  81. if page is not None:
  82. # Attempt to decrypt all secrets if the master key is known
  83. if self.master_key is not None:
  84. secrets = []
  85. for secret in page:
  86. secret.decrypt(self.master_key)
  87. secrets.append(secret)
  88. serializer = self.get_serializer(secrets, many=True)
  89. else:
  90. serializer = self.get_serializer(page, many=True)
  91. return self.get_paginated_response(serializer.data)
  92. serializer = self.get_serializer(queryset, many=True)
  93. return Response(serializer.data)
  94. class GetSessionKeyViewSet(ViewSet):
  95. """
  96. Retrieve a temporary session key to use for encrypting and decrypting secrets via the API. The user's private RSA
  97. key is POSTed with the name `private_key`. An example:
  98. curl -v -X POST -H "Authorization: Token <token>" -H "Accept: application/json; indent=4" \\
  99. --data-urlencode "private_key@<filename>" https://netbox/api/secrets/get-session-key/
  100. This request will yield a base64-encoded session key to be included in an `X-Session-Key` header in future requests:
  101. {
  102. "session_key": "+8t4SI6XikgVmB5+/urhozx9O5qCQANyOk1MNe6taRf="
  103. }
  104. This endpoint accepts one optional parameter: `preserve_key`. If True and a session key exists, the existing session
  105. key will be returned instead of a new one.
  106. """
  107. permission_classes = [IsAuthenticated]
  108. def create(self, request):
  109. # Read private key
  110. private_key = request.POST.get('private_key', None)
  111. if private_key is None:
  112. return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
  113. # Validate user key
  114. try:
  115. user_key = UserKey.objects.get(user=request.user)
  116. except UserKey.DoesNotExist:
  117. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  118. if not user_key.is_active():
  119. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  120. # Validate private key
  121. master_key = user_key.get_master_key(private_key)
  122. if master_key is None:
  123. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  124. try:
  125. current_session_key = SessionKey.objects.get(userkey__user_id=request.user.pk)
  126. except SessionKey.DoesNotExist:
  127. current_session_key = None
  128. if current_session_key and request.GET.get('preserve_key', False):
  129. # Retrieve the existing session key
  130. key = current_session_key.get_session_key(master_key)
  131. else:
  132. # Create a new SessionKey
  133. SessionKey.objects.filter(userkey__user=request.user).delete()
  134. sk = SessionKey(userkey=user_key)
  135. sk.save(master_key=master_key)
  136. key = sk.key
  137. # Encode the key using base64. (b64decode() returns a bytestring under Python 3.)
  138. encoded_key = base64.b64encode(key).decode()
  139. # Craft the response
  140. response = Response({
  141. 'session_key': encoded_key,
  142. })
  143. # If token authentication is not in use, assign the session key as a cookie
  144. if request.auth is None:
  145. response.set_cookie('session_key', value=encoded_key)
  146. return response
  147. class GenerateRSAKeyPairViewSet(ViewSet):
  148. """
  149. This endpoint can be used to generate a new RSA key pair. The keys are returned in PEM format.
  150. {
  151. "public_key": "<public key>",
  152. "private_key": "<private key>"
  153. }
  154. """
  155. permission_classes = [IsAuthenticated]
  156. def list(self, request):
  157. # Determine what size key to generate
  158. key_size = request.GET.get('key_size', 2048)
  159. if key_size not in range(2048, 4097, 256):
  160. key_size = 2048
  161. # Export RSA private and public keys in PEM format
  162. key = RSA.generate(key_size)
  163. private_key = key.exportKey('PEM')
  164. public_key = key.publickey().exportKey('PEM')
  165. return Response({
  166. 'private_key': private_key,
  167. 'public_key': public_key,
  168. })