views.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. import base64
  2. from Crypto.PublicKey import RSA
  3. from django.http import HttpResponseBadRequest
  4. from rest_framework.authentication import BasicAuthentication, SessionAuthentication
  5. from rest_framework.permissions import IsAuthenticated
  6. from rest_framework.renderers import JSONRenderer
  7. from rest_framework.response import Response
  8. from rest_framework.views import APIView
  9. from rest_framework.viewsets import ModelViewSet
  10. from extras.api.renderers import FormlessBrowsableAPIRenderer, FreeRADIUSClientsRenderer
  11. from secrets.filters import SecretFilter
  12. from secrets.models import Secret, SecretRole, UserKey
  13. from utilities.api import WritableSerializerMixin
  14. from . import serializers
  15. ERR_USERKEY_MISSING = "No UserKey found for the current user."
  16. ERR_USERKEY_INACTIVE = "UserKey has not been activated for decryption."
  17. ERR_PRIVKEY_INVALID = "Invalid private key."
  18. #
  19. # Secret Roles
  20. #
  21. class SecretRoleViewSet(ModelViewSet):
  22. queryset = SecretRole.objects.all()
  23. serializer_class = serializers.SecretRoleSerializer
  24. permission_classes = [IsAuthenticated]
  25. #
  26. # Secrets
  27. #
  28. # TODO: Need to implement custom create() and update() methods to handle secret encryption.
  29. # TODO: Figure out a better way of transmitting the private key
  30. class SecretViewSet(WritableSerializerMixin, ModelViewSet):
  31. queryset = Secret.objects.select_related(
  32. 'device__primary_ip4', 'device__primary_ip6', 'role',
  33. ).prefetch_related(
  34. 'role__users', 'role__groups',
  35. )
  36. serializer_class = serializers.SecretSerializer
  37. write_serializer_class = serializers.WritableSecretSerializer
  38. filter_class = SecretFilter
  39. # DRF's BrowsableAPIRenderer can't support passing the secret key as a header, so we disable it.
  40. renderer_classes = [FormlessBrowsableAPIRenderer, JSONRenderer, FreeRADIUSClientsRenderer]
  41. # Enabled BasicAuthentication for testing (until we have TokenAuthentication implemented)
  42. authentication_classes = [BasicAuthentication, SessionAuthentication]
  43. permission_classes = [IsAuthenticated]
  44. # The user's private key must be sent as a header (X-Private-Key). To overcome limitations around sending line
  45. # breaks in a header field, the key must be base64-encoded and stripped of all whitespace. This is a temporary
  46. # kludge until a more elegant approach can be devised.
  47. def _get_private_key(self, request):
  48. private_key_b64 = request.META.get('HTTP_X_PRIVATE_KEY', None)
  49. if private_key_b64 is None:
  50. return None
  51. # TODO: Handle invalid encoding
  52. return base64.b64decode(private_key_b64)
  53. def retrieve(self, request, *args, **kwargs):
  54. private_key = self._get_private_key(request)
  55. secret = self.get_object()
  56. # Attempt to unlock the Secret if a private key was provided
  57. if private_key is not None:
  58. try:
  59. user_key = UserKey.objects.get(user=request.user)
  60. except UserKey.DoesNotExist:
  61. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  62. if not user_key.is_active():
  63. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  64. master_key = user_key.get_master_key(private_key)
  65. if master_key is None:
  66. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  67. secret.decrypt(master_key)
  68. serializer = self.get_serializer(secret)
  69. return Response(serializer.data)
  70. def list(self, request, *args, **kwargs):
  71. private_key = self._get_private_key(request)
  72. queryset = self.filter_queryset(self.get_queryset())
  73. # Attempt to unlock the Secrets if a private key was provided
  74. master_key = None
  75. if private_key is not None:
  76. try:
  77. user_key = UserKey.objects.get(user=request.user)
  78. except UserKey.DoesNotExist:
  79. return HttpResponseBadRequest(ERR_USERKEY_MISSING)
  80. if not user_key.is_active():
  81. return HttpResponseBadRequest(ERR_USERKEY_INACTIVE)
  82. master_key = user_key.get_master_key(private_key)
  83. if master_key is None:
  84. return HttpResponseBadRequest(ERR_PRIVKEY_INVALID)
  85. # Pagination
  86. page = self.paginate_queryset(queryset)
  87. if page is not None:
  88. secrets = []
  89. if master_key is not None:
  90. for secret in page:
  91. secret.decrypt(master_key)
  92. secrets.append(secret)
  93. serializer = self.get_serializer(secrets, many=True)
  94. else:
  95. serializer = self.get_serializer(page, many=True)
  96. return self.get_paginated_response(serializer.data)
  97. serializer = self.get_serializer(queryset, many=True)
  98. return Response(serializer.data)
  99. class RSAKeyGeneratorView(APIView):
  100. """
  101. Generate a new RSA key pair for a user. Authenticated because it's a ripe avenue for DoS.
  102. """
  103. permission_classes = [IsAuthenticated]
  104. def get(self, request):
  105. # Determine what size key to generate
  106. key_size = request.GET.get('key_size', 2048)
  107. if key_size not in range(2048, 4097, 256):
  108. key_size = 2048
  109. # Export RSA private and public keys in PEM format
  110. key = RSA.generate(key_size)
  111. private_key = key.exportKey('PEM')
  112. public_key = key.publickey().exportKey('PEM')
  113. return Response({
  114. 'private_key': private_key,
  115. 'public_key': public_key,
  116. })