test_webhooks.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. import json
  2. import uuid
  3. from unittest.mock import patch
  4. import django_rq
  5. from django.contrib.contenttypes.models import ContentType
  6. from django.http import HttpResponse
  7. from django.urls import reverse
  8. from requests import Session
  9. from rest_framework import status
  10. from dcim.models import Site
  11. from extras.choices import ObjectChangeActionChoices
  12. from extras.models import Webhook
  13. from extras.webhooks import enqueue_webhooks, generate_signature
  14. from extras.webhooks_worker import process_webhook
  15. from utilities.testing import APITestCase
  16. class WebhookTest(APITestCase):
  17. def setUp(self):
  18. super().setUp()
  19. self.queue = django_rq.get_queue('default')
  20. self.queue.empty() # Begin each test with an empty queue
  21. @classmethod
  22. def setUpTestData(cls):
  23. site_ct = ContentType.objects.get_for_model(Site)
  24. DUMMY_URL = "http://localhost/"
  25. DUMMY_SECRET = "LOOKATMEIMASECRETSTRING"
  26. webhooks = Webhook.objects.bulk_create((
  27. Webhook(name='Site Create Webhook', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers={'X-Foo': 'Bar'}),
  28. Webhook(name='Site Update Webhook', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
  29. Webhook(name='Site Delete Webhook', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
  30. ))
  31. for webhook in webhooks:
  32. webhook.obj_type.set([site_ct])
  33. def test_enqueue_webhook_create(self):
  34. # Create an object via the REST API
  35. data = {
  36. 'name': 'Test Site',
  37. 'slug': 'test-site',
  38. }
  39. url = reverse('dcim-api:site-list')
  40. response = self.client.post(url, data, format='json', **self.header)
  41. self.assertHttpStatus(response, status.HTTP_201_CREATED)
  42. self.assertEqual(Site.objects.count(), 1)
  43. # Verify that a job was queued for the object creation webhook
  44. self.assertEqual(self.queue.count, 1)
  45. job = self.queue.jobs[0]
  46. self.assertEqual(job.args[0], Webhook.objects.get(type_create=True))
  47. self.assertEqual(job.args[1]['id'], response.data['id'])
  48. self.assertEqual(job.args[2], 'site')
  49. self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_CREATE)
  50. def test_enqueue_webhook_update(self):
  51. site = Site.objects.create(name='Site 1', slug='site-1')
  52. # Update an object via the REST API
  53. data = {
  54. 'comments': 'Updated the site',
  55. }
  56. url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
  57. response = self.client.patch(url, data, format='json', **self.header)
  58. self.assertHttpStatus(response, status.HTTP_200_OK)
  59. # Verify that a job was queued for the object update webhook
  60. self.assertEqual(self.queue.count, 1)
  61. job = self.queue.jobs[0]
  62. self.assertEqual(job.args[0], Webhook.objects.get(type_update=True))
  63. self.assertEqual(job.args[1]['id'], site.pk)
  64. self.assertEqual(job.args[2], 'site')
  65. self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_UPDATE)
  66. def test_enqueue_webhook_delete(self):
  67. site = Site.objects.create(name='Site 1', slug='site-1')
  68. # Delete an object via the REST API
  69. url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
  70. response = self.client.delete(url, **self.header)
  71. self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
  72. # Verify that a job was queued for the object update webhook
  73. self.assertEqual(self.queue.count, 1)
  74. job = self.queue.jobs[0]
  75. self.assertEqual(job.args[0], Webhook.objects.get(type_delete=True))
  76. self.assertEqual(job.args[1]['id'], site.pk)
  77. self.assertEqual(job.args[2], 'site')
  78. self.assertEqual(job.args[3], ObjectChangeActionChoices.ACTION_DELETE)
  79. def test_webhooks_worker(self):
  80. request_id = uuid.uuid4()
  81. def dummy_send(_, request):
  82. """
  83. A dummy implementation of Session.send() to be used for testing.
  84. Always returns a 200 HTTP response.
  85. """
  86. webhook = Webhook.objects.get(type_create=True)
  87. signature = generate_signature(request.body, webhook.secret)
  88. # Validate the outgoing request headers
  89. self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
  90. self.assertEqual(request.headers['X-Hook-Signature'], signature)
  91. self.assertEqual(request.headers['X-Foo'], 'Bar')
  92. # Validate the outgoing request body
  93. body = json.loads(request.body)
  94. self.assertEqual(body['event'], 'created')
  95. self.assertEqual(body['timestamp'], job.args[4])
  96. self.assertEqual(body['model'], 'site')
  97. self.assertEqual(body['username'], 'testuser')
  98. self.assertEqual(body['request_id'], str(request_id))
  99. self.assertEqual(body['data']['name'], 'Site 1')
  100. return HttpResponse()
  101. # Enqueue a webhook for processing
  102. site = Site.objects.create(name='Site 1', slug='site-1')
  103. enqueue_webhooks(
  104. instance=site,
  105. user=self.user,
  106. request_id=request_id,
  107. action=ObjectChangeActionChoices.ACTION_CREATE
  108. )
  109. # Retrieve the job from queue
  110. job = self.queue.jobs[0]
  111. # Patch the Session object with our dummy_send() method, then process the webhook for sending
  112. with patch.object(Session, 'send', dummy_send) as mock_send:
  113. process_webhook(*job.args)