views.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from django.db.models.functions import Coalesce
  4. from django.http import HttpResponseBadRequest
  5. from rest_framework.exceptions import ValidationError
  6. from rest_framework.permissions import IsAuthenticated
  7. from rest_framework.response import Response
  8. from rest_framework.routers import APIRootView
  9. from rest_framework.viewsets import ViewSet
  10. from netbox.api.views import ModelViewSet
  11. from secrets import filters
  12. from secrets.exceptions import InvalidKey
  13. from secrets.models import Secret, SecretRole, SessionKey, UserKey
  14. from utilities.utils import get_subquery
  15. from . import serializers
# Plain-text error messages returned (as HTTP 400 bodies) by GetSessionKeyViewSet.create()
# when a session key cannot be issued.
ERR_USERKEY_MISSING = "No UserKey found for the current user."
ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
ERR_PRIVKEY_MISSING = "Private key was not provided."
ERR_PRIVKEY_INVALID = "Invalid private key."
class SecretsRootView(APIRootView):
    """
    Secrets API root view
    """
    def get_view_name(self):
        # Fixed display name for this API root.
        return 'Secrets'
  26. #
  27. # Secret Roles
  28. #
class SecretRoleViewSet(ModelViewSet):
    # Model view set for SecretRole objects.
    # Each role is annotated with `secret_count`, built from a subquery over Secret keyed on its
    # 'role' field; Coalesce substitutes 0 when the subquery yields NULL.
    queryset = SecretRole.objects.annotate(
        secret_count=Coalesce(get_subquery(Secret, 'role'), 0)
    )
    serializer_class = serializers.SecretRoleSerializer
    filterset_class = filters.SecretRoleFilterSet
  35. #
  36. # Secrets
  37. #
  38. class SecretViewSet(ModelViewSet):
  39. queryset = Secret.objects.prefetch_related('role', 'tags')
  40. serializer_class = serializers.SecretSerializer
  41. filterset_class = filters.SecretFilterSet
  42. master_key = None
  43. def get_serializer_context(self):
  44. # Make the master key available to the serializer for encrypting plaintext values
  45. context = super().get_serializer_context()
  46. context['master_key'] = self.master_key
  47. return context
  48. def initial(self, request, *args, **kwargs):
  49. super().initial(request, *args, **kwargs)
  50. if request.user.is_authenticated:
  51. # Read session key from HTTP cookie or header if it has been provided. The session key must be provided in
  52. # order to encrypt/decrypt secrets.
  53. if 'session_key' in request.COOKIES:
  54. session_key = base64.b64decode(request.COOKIES['session_key'])
  55. elif 'HTTP_X_SESSION_KEY' in request.META:
  56. session_key = base64.b64decode(request.META['HTTP_X_SESSION_KEY'])
  57. else:
  58. session_key = None
  59. # We can't encrypt secret plaintext without a session key.
  60. if self.action in ['create', 'update'] and session_key is None:
  61. raise ValidationError("A session key must be provided when creating or updating secrets.")
  62. # Attempt to retrieve the master key for encryption/decryption if a session key has been provided.
  63. if session_key is not None:
  64. try:
  65. sk = SessionKey.objects.get(userkey__user=request.user)
  66. self.master_key = sk.get_master_key(session_key)
  67. except (SessionKey.DoesNotExist, InvalidKey):
  68. raise ValidationError("Invalid session key.")
  69. def retrieve(self, request, *args, **kwargs):
  70. secret = self.get_object()
  71. # Attempt to decrypt the secret if the master key is known
  72. if self.master_key is not None:
  73. secret.decrypt(self.master_key)
  74. serializer = self.get_serializer(secret)
  75. return Response(serializer.data)
  76. def list(self, request, *args, **kwargs):
  77. queryset = self.filter_queryset(self.get_queryset())
  78. page = self.paginate_queryset(queryset)
  79. if page is not None:
  80. # Attempt to decrypt all secrets if the master key is known
  81. if self.master_key is not None:
  82. secrets = []
  83. for secret in page:
  84. secret.decrypt(self.master_key)
  85. secrets.append(secret)
  86. serializer = self.get_serializer(secrets, many=True)
  87. else:
  88. serializer = self.get_serializer(page, many=True)
  89. return self.get_paginated_response(serializer.data)
  90. serializer = self.get_serializer(queryset, many=True)
  91. return Response(serializer.data)
  92. class GetSessionKeyViewSet(ViewSet):
  93. """
  94. Retrieve a temporary session key to use for encrypting and decrypting secrets via the API. The user's private RSA
  95. key is POSTed with the name `private_key`. An example:
  96. curl -v -X POST -H "Authorization: Token <token>" -H "Accept: application/json; indent=4" \\
  97. --data-urlencode "private_key@<filename>" https://netbox/api/secrets/get-session-key/
  98. This request will yield a base64-encoded session key to be included in an `X-Session-Key` header in future requests:
  99. {
  100. "session_key": "+8t4SI6XikgVmB5+/urhozx9O5qCQANyOk1MNe6taRf="
  101. }
  102. This endpoint accepts one optional parameter: `preserve_key`. If True and a session key exists, the existing session
  103. key will be returned instead of a new one.
  104. """
  105. permission_classes = [IsAuthenticated]
  106. def create(self, request):
  107. # Read private key
  108. private_key = request.POST.get('private_key', None)
  109. if private_key is None:
  110. return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
  111. # Validate user key
  112. try:
  113. user_key = UserKey.objects.get(user=request.user)
  114. except UserKey.DoesNotExist:
  115. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  116. if not user_key.is_active():
  117. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  118. # Validate private key
  119. master_key = user_key.get_master_key(private_key)
  120. if master_key is None:
  121. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  122. try:
  123. current_session_key = SessionKey.objects.get(userkey__user_id=request.user.pk)
  124. except SessionKey.DoesNotExist:
  125. current_session_key = None
  126. if current_session_key and request.GET.get('preserve_key', False):
  127. # Retrieve the existing session key
  128. key = current_session_key.get_session_key(master_key)
  129. else:
  130. # Create a new SessionKey
  131. SessionKey.objects.filter(userkey__user=request.user).delete()
  132. sk = SessionKey(userkey=user_key)
  133. sk.save(master_key=master_key)
  134. key = sk.key
  135. # Encode the key using base64. (b64decode() returns a bytestring under Python 3.)
  136. encoded_key = base64.b64encode(key).decode()
  137. # Craft the response
  138. response = Response({
  139. 'session_key': encoded_key,
  140. })
  141. # If token authentication is not in use, assign the session key as a cookie
  142. if request.auth is None:
  143. response.set_cookie('session_key', value=encoded_key)
  144. return response
  145. class GenerateRSAKeyPairViewSet(ViewSet):
  146. """
  147. This endpoint can be used to generate a new RSA key pair. The keys are returned in PEM format.
  148. {
  149. "public_key": "<public key>",
  150. "private_key": "<private key>"
  151. }
  152. """
  153. permission_classes = [IsAuthenticated]
  154. def list(self, request):
  155. # Determine what size key to generate
  156. key_size = request.GET.get('key_size', 2048)
  157. if key_size not in range(2048, 4097, 256):
  158. key_size = 2048
  159. # Export RSA private and public keys in PEM format
  160. key = RSA.generate(key_size)
  161. private_key = key.exportKey('PEM')
  162. public_key = key.publickey().exportKey('PEM')
  163. return Response({
  164. 'private_key': private_key,
  165. 'public_key': public_key,
  166. })