webhooks.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import hashlib
  2. import hmac
  3. from collections import defaultdict
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.utils import timezone
  6. from django_rq import get_queue
  7. from utilities.api import get_serializer_for_model
  8. from utilities.utils import serialize_object
  9. from .choices import *
  10. from .models import Webhook
  11. from .registry import registry
  12. def serialize_for_webhook(instance):
  13. """
  14. Return a serialized representation of the given instance suitable for use in a webhook.
  15. """
  16. serializer_class = get_serializer_for_model(instance.__class__)
  17. serializer_context = {
  18. 'request': None,
  19. }
  20. serializer = serializer_class(instance, context=serializer_context)
  21. return serializer.data
  22. def get_snapshots(instance, action):
  23. return {
  24. 'prechange': getattr(instance, '_prechange_snapshot', None),
  25. 'postchange': serialize_object(instance) if action != ObjectChangeActionChoices.ACTION_DELETE else None,
  26. }
  27. def generate_signature(request_body, secret):
  28. """
  29. Return a cryptographic signature that can be used to verify the authenticity of webhook data.
  30. """
  31. hmac_prep = hmac.new(
  32. key=secret.encode('utf8'),
  33. msg=request_body,
  34. digestmod=hashlib.sha512
  35. )
  36. return hmac_prep.hexdigest()
  37. def enqueue_object(queue, instance, user, request_id, action):
  38. """
  39. Enqueue a serialized representation of a created/updated/deleted object for the processing of
  40. webhooks once the request has completed.
  41. """
  42. # Determine whether this type of object supports webhooks
  43. app_label = instance._meta.app_label
  44. model_name = instance._meta.model_name
  45. if model_name not in registry['model_features']['webhooks'].get(app_label, []):
  46. return
  47. queue.append({
  48. 'content_type': ContentType.objects.get_for_model(instance),
  49. 'object_id': instance.pk,
  50. 'event': action,
  51. 'data': serialize_for_webhook(instance),
  52. 'snapshots': get_snapshots(instance, action),
  53. 'username': user.username,
  54. 'request_id': request_id
  55. })
  56. def flush_webhooks(queue):
  57. """
  58. Flush a list of object representation to RQ for webhook processing.
  59. """
  60. rq_queue = get_queue('default')
  61. webhooks_cache = {
  62. 'type_create': {},
  63. 'type_update': {},
  64. 'type_delete': {},
  65. }
  66. for data in queue:
  67. action_flag = {
  68. ObjectChangeActionChoices.ACTION_CREATE: 'type_create',
  69. ObjectChangeActionChoices.ACTION_UPDATE: 'type_update',
  70. ObjectChangeActionChoices.ACTION_DELETE: 'type_delete',
  71. }[data['event']]
  72. content_type = data['content_type']
  73. # Cache applicable Webhooks
  74. if content_type not in webhooks_cache[action_flag]:
  75. webhooks_cache[action_flag][content_type] = Webhook.objects.filter(
  76. **{action_flag: True},
  77. content_types=content_type,
  78. enabled=True
  79. )
  80. webhooks = webhooks_cache[action_flag][content_type]
  81. for webhook in webhooks:
  82. rq_queue.enqueue(
  83. "extras.webhooks_worker.process_webhook",
  84. webhook=webhook,
  85. model_name=content_type.model,
  86. event=data['event'],
  87. data=data['data'],
  88. snapshots=data['snapshots'],
  89. timestamp=str(timezone.now()),
  90. username=data['username'],
  91. request_id=data['request_id']
  92. )