views.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. import base64
  2. import logging
  3. from django.contrib import messages
  4. from django.db.models import Count
  5. from django.shortcuts import get_object_or_404, redirect, render
  6. from django.utils.html import escape
  7. from django.utils.safestring import mark_safe
  8. from utilities.views import (
  9. BulkDeleteView, BulkEditView, BulkImportView, ObjectView, ObjectDeleteView, ObjectEditView, ObjectListView,
  10. )
  11. from . import filters, forms, tables
  12. from .models import SecretRole, Secret, SessionKey, UserKey
  13. def get_session_key(request):
  14. """
  15. Extract and decode the session key sent with a request. Returns None if no session key was provided.
  16. """
  17. session_key = request.COOKIES.get('session_key', None)
  18. if session_key is not None:
  19. return base64.b64decode(session_key)
  20. return session_key
  21. #
  22. # Secret roles
  23. #
  24. class SecretRoleListView(ObjectListView):
  25. queryset = SecretRole.objects.annotate(secret_count=Count('secrets'))
  26. table = tables.SecretRoleTable
  27. class SecretRoleEditView(ObjectEditView):
  28. queryset = SecretRole.objects.all()
  29. model_form = forms.SecretRoleForm
  30. default_return_url = 'secrets:secretrole_list'
  31. class SecretRoleBulkImportView(BulkImportView):
  32. queryset = SecretRole.objects.all()
  33. model_form = forms.SecretRoleCSVForm
  34. table = tables.SecretRoleTable
  35. default_return_url = 'secrets:secretrole_list'
  36. class SecretRoleBulkDeleteView(BulkDeleteView):
  37. queryset = SecretRole.objects.annotate(secret_count=Count('secrets'))
  38. table = tables.SecretRoleTable
  39. default_return_url = 'secrets:secretrole_list'
  40. #
  41. # Secrets
  42. #
  43. class SecretListView(ObjectListView):
  44. queryset = Secret.objects.prefetch_related('role', 'device')
  45. filterset = filters.SecretFilterSet
  46. filterset_form = forms.SecretFilterForm
  47. table = tables.SecretTable
  48. action_buttons = ('import', 'export')
  49. class SecretView(ObjectView):
  50. queryset = Secret.objects.all()
  51. def get(self, request, pk):
  52. secret = get_object_or_404(self.queryset, pk=pk)
  53. return render(request, 'secrets/secret.html', {
  54. 'secret': secret,
  55. })
  56. class SecretEditView(ObjectEditView):
  57. queryset = Secret.objects.all()
  58. model_form = forms.SecretForm
  59. template_name = 'secrets/secret_edit.html'
  60. def dispatch(self, request, *args, **kwargs):
  61. # Check that the user has a valid UserKey
  62. try:
  63. uk = UserKey.objects.get(user=request.user)
  64. except UserKey.DoesNotExist:
  65. messages.warning(request, "This operation requires an active user key, but you don't have one.")
  66. return redirect('user:userkey')
  67. if not uk.is_active():
  68. messages.warning(request, "This operation is not available. Your user key has not been activated.")
  69. return redirect('user:userkey')
  70. return super().dispatch(request, *args, **kwargs)
  71. def post(self, request, *args, **kwargs):
  72. logger = logging.getLogger('netbox.views.ObjectEditView')
  73. session_key = get_session_key(request)
  74. secret = self.get_object(kwargs)
  75. form = self.model_form(request.POST, instance=secret)
  76. if form.is_valid():
  77. logger.debug("Form validation was successful")
  78. # We must have a session key in order to create a secret or update the plaintext of an existing secret
  79. if (form.cleaned_data['plaintext'] or secret.pk is None) and session_key is None:
  80. logger.debug("Unable to proceed: No session key was provided with the request")
  81. form.add_error(None, "No session key was provided with the request. Unable to encrypt secret data.")
  82. else:
  83. master_key = None
  84. try:
  85. sk = SessionKey.objects.get(userkey__user=request.user)
  86. master_key = sk.get_master_key(session_key)
  87. except SessionKey.DoesNotExist:
  88. logger.debug("Unable to proceed: User has no session key assigned")
  89. form.add_error(None, "No session key found for this user.")
  90. if master_key is not None:
  91. logger.debug("Successfully resolved master key for encryption")
  92. secret = form.save(commit=False)
  93. if form.cleaned_data['plaintext']:
  94. secret.plaintext = str(form.cleaned_data['plaintext'])
  95. secret.encrypt(master_key)
  96. secret.save()
  97. form.save_m2m()
  98. msg = '{} secret'.format('Created' if not form.instance.pk else 'Modified')
  99. logger.info(f"{msg} {secret} (PK: {secret.pk})")
  100. msg = '{} <a href="{}">{}</a>'.format(msg, secret.get_absolute_url(), escape(secret))
  101. messages.success(request, mark_safe(msg))
  102. return redirect(self.get_return_url(request, secret))
  103. else:
  104. logger.debug("Form validation failed")
  105. return render(request, self.template_name, {
  106. 'obj': secret,
  107. 'obj_type': self.queryset.model._meta.verbose_name,
  108. 'form': form,
  109. 'return_url': self.get_return_url(request, secret),
  110. })
  111. class SecretDeleteView(ObjectDeleteView):
  112. queryset = Secret.objects.all()
  113. default_return_url = 'secrets:secret_list'
  114. class SecretBulkImportView(BulkImportView):
  115. queryset = Secret.objects.all()
  116. model_form = forms.SecretCSVForm
  117. table = tables.SecretTable
  118. template_name = 'secrets/secret_import.html'
  119. default_return_url = 'secrets:secret_list'
  120. widget_attrs = {'class': 'requires-session-key'}
  121. master_key = None
  122. def _save_obj(self, obj_form, request):
  123. """
  124. Encrypt each object before saving it to the database.
  125. """
  126. obj = obj_form.save(commit=False)
  127. obj.encrypt(self.master_key)
  128. obj.save()
  129. return obj
  130. def post(self, request):
  131. # Grab the session key from cookies.
  132. session_key = request.COOKIES.get('session_key')
  133. if session_key:
  134. # Attempt to derive the master key using the provided session key.
  135. try:
  136. sk = SessionKey.objects.get(userkey__user=request.user)
  137. self.master_key = sk.get_master_key(base64.b64decode(session_key))
  138. except SessionKey.DoesNotExist:
  139. messages.error(request, "No session key found for this user.")
  140. if self.master_key is not None:
  141. return super().post(request)
  142. else:
  143. messages.error(request, "Invalid private key! Unable to encrypt secret data.")
  144. else:
  145. messages.error(request, "No session key was provided with the request. Unable to encrypt secret data.")
  146. return render(request, self.template_name, {
  147. 'form': self._import_form(request.POST),
  148. 'fields': self.model_form().fields,
  149. 'obj_type': self.model_form._meta.model._meta.verbose_name,
  150. 'return_url': self.get_return_url(request),
  151. })
  152. class SecretBulkEditView(BulkEditView):
  153. queryset = Secret.objects.prefetch_related('role', 'device')
  154. filterset = filters.SecretFilterSet
  155. table = tables.SecretTable
  156. form = forms.SecretBulkEditForm
  157. default_return_url = 'secrets:secret_list'
  158. class SecretBulkDeleteView(BulkDeleteView):
  159. queryset = Secret.objects.prefetch_related('role', 'device')
  160. filterset = filters.SecretFilterSet
  161. table = tables.SecretTable
  162. default_return_url = 'secrets:secret_list'