views.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from django.core.urlresolvers import reverse
  4. from django.http import HttpResponseBadRequest
  5. from rest_framework.exceptions import ValidationError
  6. from rest_framework.permissions import IsAuthenticated
  7. from rest_framework.response import Response
  8. from rest_framework.viewsets import ModelViewSet, ViewSet
  9. from secrets.exceptions import InvalidSessionKey
  10. from secrets.filters import SecretFilter
  11. from secrets.models import Secret, SecretRole, SessionKey, UserKey
  12. from utilities.api import WritableSerializerMixin
  13. from . import serializers
  14. ERR_USERKEY_MISSING = "No UserKey found for the current user."
  15. ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
  16. ERR_PRIVKEY_MISSING = "Private key was not provided."
  17. ERR_PRIVKEY_INVALID = "Invalid private key."
  18. #
  19. # Secret Roles
  20. #
  21. class SecretRoleViewSet(ModelViewSet):
  22. queryset = SecretRole.objects.all()
  23. serializer_class = serializers.SecretRoleSerializer
  24. permission_classes = [IsAuthenticated]
  25. #
  26. # Secrets
  27. #
  28. class SecretViewSet(WritableSerializerMixin, ModelViewSet):
  29. queryset = Secret.objects.select_related(
  30. 'device__primary_ip4', 'device__primary_ip6', 'role',
  31. ).prefetch_related(
  32. 'role__users', 'role__groups',
  33. )
  34. serializer_class = serializers.SecretSerializer
  35. write_serializer_class = serializers.WritableSecretSerializer
  36. filter_class = SecretFilter
  37. master_key = None
  38. def _get_encrypted_fields(self, serializer):
  39. """
  40. Since we can't call encrypt() on the serializer like we can on the Secret model, we need to calculate the
  41. ciphertext and hash values by encrypting a dummy copy. These can be passed to the serializer's save() method.
  42. """
  43. s = Secret(plaintext=serializer.validated_data['plaintext'])
  44. s.encrypt(self.master_key)
  45. return ({
  46. 'ciphertext': s.ciphertext,
  47. 'hash': s.hash,
  48. })
  49. def initial(self, request, *args, **kwargs):
  50. super(SecretViewSet, self).initial(request, *args, **kwargs)
  51. # Read session key from HTTP cookie or header if it has been provided. The session key must be provided in order
  52. # to encrypt/decrypt secrets.
  53. if 'session_key' in request.COOKIES:
  54. session_key = base64.b64decode(request.COOKIES['session_key'])
  55. elif 'HTTP_X_SESSION_KEY' in request.META:
  56. session_key = base64.b64decode(request.META['HTTP_X_SESSION_KEY'])
  57. else:
  58. session_key = None
  59. # We can't encrypt secret plaintext without a session key.
  60. # assert False, self.action
  61. if self.action in ['create', 'update'] and session_key is None:
  62. raise ValidationError("A session key must be provided when creating or updating secrets.")
  63. # Attempt to retrieve the master key for encryption/decryption if a session key has been provided.
  64. if session_key is not None:
  65. try:
  66. sk = SessionKey.objects.get(userkey__user=request.user)
  67. self.master_key = sk.get_master_key(session_key)
  68. except (SessionKey.DoesNotExist, InvalidSessionKey):
  69. raise ValidationError("Invalid session key.")
  70. def retrieve(self, request, *args, **kwargs):
  71. secret = self.get_object()
  72. # Attempt to decrypt the secret if the master key is known
  73. if self.master_key is not None:
  74. secret.decrypt(self.master_key)
  75. serializer = self.get_serializer(secret)
  76. return Response(serializer.data)
  77. def list(self, request, *args, **kwargs):
  78. queryset = self.filter_queryset(self.get_queryset())
  79. page = self.paginate_queryset(queryset)
  80. if page is not None:
  81. # Attempt to decrypt all secrets if the master key is known
  82. if self.master_key is not None:
  83. secrets = []
  84. for secret in page:
  85. secret.decrypt(self.master_key)
  86. secrets.append(secret)
  87. serializer = self.get_serializer(secrets, many=True)
  88. else:
  89. serializer = self.get_serializer(page, many=True)
  90. return self.get_paginated_response(serializer.data)
  91. serializer = self.get_serializer(queryset, many=True)
  92. return Response(serializer.data)
  93. def perform_create(self, serializer):
  94. serializer.save(**self._get_encrypted_fields(serializer))
  95. def perform_update(self, serializer):
  96. serializer.save(**self._get_encrypted_fields(serializer))
  97. class GetSessionKeyViewSet(ViewSet):
  98. """
  99. Retrieve a temporary session key to use for encrypting and decrypting secrets via the API. The user's private RSA
  100. key is POSTed with the name `private_key`. An example:
  101. curl -v -X POST -H "Authorization: Token <token>" -H "Accept: application/json; indent=4" \\
  102. --data-urlencode "private_key@<filename>" https://netbox/api/secrets/get-session-key/
  103. This request will yield a base64-encoded session key to be included in an `X-Session-Key` header in future requests:
  104. {
  105. "session_key": "+8t4SI6XikgVmB5+/urhozx9O5qCQANyOk1MNe6taRf="
  106. }
  107. """
  108. permission_classes = [IsAuthenticated]
  109. def create(self, request):
  110. # Read private key
  111. private_key = request.POST.get('private_key', None)
  112. if private_key is None:
  113. return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
  114. # Validate user key
  115. try:
  116. user_key = UserKey.objects.get(user=request.user)
  117. except UserKey.DoesNotExist:
  118. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  119. if not user_key.is_active():
  120. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  121. # Validate private key
  122. master_key = user_key.get_master_key(private_key)
  123. if master_key is None:
  124. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  125. # Delete the existing SessionKey for this user if one exists
  126. SessionKey.objects.filter(userkey__user=request.user).delete()
  127. # Create a new SessionKey
  128. sk = SessionKey(userkey=user_key)
  129. sk.save(master_key=master_key)
  130. encoded_key = base64.b64encode(sk.key)
  131. # b64decode() returns a bytestring under Python 3
  132. if not isinstance(encoded_key, str):
  133. encoded_key = encoded_key.decode()
  134. # Craft the response
  135. response = Response({
  136. 'session_key': encoded_key,
  137. })
  138. # If token authentication is not in use, assign the session key as a cookie
  139. if request.auth is None:
  140. response.set_cookie('session_key', value=encoded_key)
  141. return response
  142. class GenerateRSAKeyPairViewSet(ViewSet):
  143. """
  144. This endpoint can be used to generate a new RSA key pair. The keys are returned in PEM format.
  145. {
  146. "public_key": "<public key>",
  147. "private_key": "<private key>"
  148. }
  149. """
  150. permission_classes = [IsAuthenticated]
  151. def list(self, request):
  152. # Determine what size key to generate
  153. key_size = request.GET.get('key_size', 2048)
  154. if key_size not in range(2048, 4097, 256):
  155. key_size = 2048
  156. # Export RSA private and public keys in PEM format
  157. key = RSA.generate(key_size)
  158. private_key = key.exportKey('PEM')
  159. public_key = key.publickey().exportKey('PEM')
  160. return Response({
  161. 'private_key': private_key,
  162. 'public_key': public_key,
  163. })