| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- import hashlib
- import hmac
- from django.contrib.contenttypes.models import ContentType
- from django.utils import timezone
- from django_rq import get_queue
- from extras.models import Webhook
- from utilities.api import get_serializer_for_model
- from .choices import *
- from .utils import FeatureQuery
def generate_signature(request_body, secret):
    """
    Return a cryptographic signature that can be used to verify the authenticity of webhook data.

    The signature is the hexadecimal HMAC-SHA512 digest of ``request_body``, keyed with the
    UTF-8 encoding of ``secret``.

    :param request_body: The payload to sign. Bytes-like objects are signed as-is; a ``str``
        is encoded as UTF-8 first (``hmac.new`` itself rejects text messages).
    :param secret: The shared secret, as a ``str``.
    :return: The hex-encoded digest as a ``str``.
    """
    # Mirror the handling of `secret`: text is normalized to UTF-8 bytes before hashing.
    if isinstance(request_body, str):
        request_body = request_body.encode('utf8')

    hmac_prep = hmac.new(
        key=secret.encode('utf8'),
        msg=request_body,
        digestmod=hashlib.sha512,
    )
    return hmac_prep.hexdigest()
def enqueue_webhooks(instance, user, request_id, action):
    """
    Queue every enabled Webhook that matches the given instance's content type and action.

    Returns without doing anything when the instance's content type is not among those
    selected by ``FeatureQuery('webhooks')``, or when no enabled Webhook matches.
    Otherwise the instance is serialized with its API serializer and one
    ``extras.webhooks_worker.process_webhook`` job per matching Webhook is placed on the
    ``default`` queue.

    Raises ``KeyError`` if ``action`` is not one of the create/update/delete values of
    ``ObjectChangeActionChoices``.
    """
    model = instance.__class__
    content_type = ContentType.objects.get_for_model(model)

    # Only content types selected by the 'webhooks' feature query are eligible.
    eligible_types = ContentType.objects.filter(FeatureQuery('webhooks').get_query())
    if content_type not in eligible_types:
        return

    # Map the change action onto the Webhook boolean field that opts in to it.
    flag_by_action = {
        ObjectChangeActionChoices.ACTION_CREATE: 'type_create',
        ObjectChangeActionChoices.ACTION_UPDATE: 'type_update',
        ObjectChangeActionChoices.ACTION_DELETE: 'type_delete',
    }
    flag_field = flag_by_action[action]

    # Retrieve any applicable Webhooks
    criteria = {
        'obj_type': content_type,
        'enabled': True,
        flag_field: True,
    }
    matching = Webhook.objects.filter(**criteria)
    if not matching.exists():
        return

    # Serialize the object with its model's API serializer; no request is supplied in the context.
    serializer = get_serializer_for_model(model)(instance, context={'request': None})

    # Enqueue one job per webhook; the timestamp is taken separately for each job.
    queue = get_queue('default')
    for hook in matching:
        queue.enqueue(
            "extras.webhooks_worker.process_webhook",
            hook,
            serializer.data,
            instance._meta.model_name,
            action,
            str(timezone.now()),
            user.username,
            request_id,
        )
|