views.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. import base64
  2. from Crypto.Cipher import XOR
  3. from Crypto.PublicKey import RSA
  4. import os
  5. from django.http import HttpResponseBadRequest
  6. from rest_framework.authentication import BasicAuthentication, SessionAuthentication
  7. from rest_framework.permissions import IsAuthenticated
  8. from rest_framework.renderers import JSONRenderer
  9. from rest_framework.response import Response
  10. from rest_framework.views import APIView
  11. from rest_framework.viewsets import ModelViewSet
  12. from extras.api.renderers import FormlessBrowsableAPIRenderer, FreeRADIUSClientsRenderer
  13. from secrets.filters import SecretFilter
  14. from secrets.models import Secret, SecretRole, UserKey
  15. from utilities.api import WritableSerializerMixin
  16. from . import serializers
  17. ERR_USERKEY_MISSING = "No UserKey found for the current user."
  18. ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
  19. ERR_PRIVKEY_MISSING = "Private key was not provided."
  20. ERR_PRIVKEY_INVALID = "Invalid private key."
  21. #
  22. # Secret Roles
  23. #
  24. class SecretRoleViewSet(ModelViewSet):
  25. queryset = SecretRole.objects.all()
  26. serializer_class = serializers.SecretRoleSerializer
  27. permission_classes = [IsAuthenticated]
  28. #
  29. # Secrets
  30. #
  31. # TODO: Need to implement custom create() and update() methods to handle secret encryption.
  32. class SecretViewSet(WritableSerializerMixin, ModelViewSet):
  33. queryset = Secret.objects.select_related(
  34. 'device__primary_ip4', 'device__primary_ip6', 'role',
  35. ).prefetch_related(
  36. 'role__users', 'role__groups',
  37. )
  38. serializer_class = serializers.SecretSerializer
  39. write_serializer_class = serializers.WritableSecretSerializer
  40. filter_class = SecretFilter
  41. # DRF's BrowsableAPIRenderer can't support passing the secret key as a header, so we disable it.
  42. renderer_classes = [FormlessBrowsableAPIRenderer, JSONRenderer, FreeRADIUSClientsRenderer]
  43. # Enabled BasicAuthentication for testing (until we have TokenAuthentication implemented)
  44. authentication_classes = [BasicAuthentication, SessionAuthentication]
  45. permission_classes = [IsAuthenticated]
  46. def _get_master_key(self, request):
  47. cached_key = request.session.get('cached_key', None)
  48. session_key = request.COOKIES.get('session_key', None)
  49. if cached_key is None or session_key is None:
  50. return None
  51. cached_key = base64.b64decode(cached_key)
  52. session_key = base64.b64decode(session_key)
  53. xor = XOR.new(session_key)
  54. master_key = xor.encrypt(cached_key)
  55. return master_key
  56. def retrieve(self, request, *args, **kwargs):
  57. master_key = self._get_master_key(request)
  58. secret = self.get_object()
  59. if master_key is not None:
  60. secret.decrypt(master_key)
  61. serializer = self.get_serializer(secret)
  62. return Response(serializer.data)
  63. def list(self, request, *args, **kwargs):
  64. master_key = self._get_master_key(request)
  65. queryset = self.filter_queryset(self.get_queryset())
  66. # Pagination
  67. page = self.paginate_queryset(queryset)
  68. if page is not None:
  69. secrets = []
  70. if master_key is not None:
  71. for secret in page:
  72. secret.decrypt(master_key)
  73. secrets.append(secret)
  74. serializer = self.get_serializer(secrets, many=True)
  75. else:
  76. serializer = self.get_serializer(page, many=True)
  77. return self.get_paginated_response(serializer.data)
  78. serializer = self.get_serializer(queryset, many=True)
  79. return Response(serializer.data)
  80. class RSAKeyGeneratorView(APIView):
  81. """
  82. Generate a new RSA key pair for a user. Authenticated because it's a ripe avenue for DoS.
  83. """
  84. permission_classes = [IsAuthenticated]
  85. def get(self, request):
  86. # Determine what size key to generate
  87. key_size = request.GET.get('key_size', 2048)
  88. if key_size not in range(2048, 4097, 256):
  89. key_size = 2048
  90. # Export RSA private and public keys in PEM format
  91. key = RSA.generate(key_size)
  92. private_key = key.exportKey('PEM')
  93. public_key = key.publickey().exportKey('PEM')
  94. return Response({
  95. 'private_key': private_key,
  96. 'public_key': public_key,
  97. })
  98. class GetSessionKey(APIView):
  99. """
  100. Cache an encrypted copy of the master key derived from the submitted private key.
  101. """
  102. permission_classes = [IsAuthenticated]
  103. def post(self, request):
  104. # Read private key
  105. private_key = request.POST.get('private_key', None)
  106. if private_key is None:
  107. return HttpResponseBadRequest(ERR_PRIVKEY_MISSING)
  108. # Validate user key
  109. try:
  110. user_key = UserKey.objects.get(user=request.user)
  111. except UserKey.DoesNotExist:
  112. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  113. if not user_key.is_active():
  114. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  115. # Validate private key
  116. master_key = user_key.get_master_key(private_key)
  117. if master_key is None:
  118. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  119. # Generate a random 256-bit encryption key
  120. session_key = os.urandom(32)
  121. xor = XOR.new(session_key)
  122. cached_key = xor.encrypt(master_key)
  123. # Save XORed copy of the master key
  124. request.session['cached_key'] = base64.b64encode(cached_key)
  125. response = Response({
  126. 'session_key': base64.b64encode(session_key),
  127. })
  128. response.set_cookie('session_key', base64.b64encode(session_key))
  129. return response