test_webhooks.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. import json
  2. import uuid
  3. from unittest.mock import patch
  4. import django_rq
  5. from django.contrib.contenttypes.models import ContentType
  6. from django.http import HttpResponse
  7. from django.urls import reverse
  8. from requests import Session
  9. from rest_framework import status
  10. from dcim.choices import SiteStatusChoices
  11. from dcim.models import Site
  12. from extras.choices import ObjectChangeActionChoices
  13. from extras.models import Tag, Webhook
  14. from extras.webhooks import enqueue_object, flush_webhooks, generate_signature, serialize_for_webhook
  15. from extras.webhooks_worker import eval_conditions, process_webhook
  16. from utilities.testing import APITestCase
  17. class WebhookTest(APITestCase):
  18. def setUp(self):
  19. super().setUp()
  20. # Ensure the queue has been cleared for each test
  21. self.queue = django_rq.get_queue('default')
  22. self.queue.empty()
  23. @classmethod
  24. def setUpTestData(cls):
  25. site_ct = ContentType.objects.get_for_model(Site)
  26. DUMMY_URL = 'http://localhost:9000/'
  27. DUMMY_SECRET = 'LOOKATMEIMASECRETSTRING'
  28. webhooks = Webhook.objects.bulk_create((
  29. Webhook(name='Webhook 1', type_create=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
  30. Webhook(name='Webhook 2', type_update=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
  31. Webhook(name='Webhook 3', type_delete=True, payload_url=DUMMY_URL, secret=DUMMY_SECRET),
  32. ))
  33. for webhook in webhooks:
  34. webhook.content_types.set([site_ct])
  35. Tag.objects.bulk_create((
  36. Tag(name='Foo', slug='foo'),
  37. Tag(name='Bar', slug='bar'),
  38. Tag(name='Baz', slug='baz'),
  39. ))
  40. def test_enqueue_webhook_create(self):
  41. # Create an object via the REST API
  42. data = {
  43. 'name': 'Site 1',
  44. 'slug': 'site-1',
  45. 'tags': [
  46. {'name': 'Foo'},
  47. {'name': 'Bar'},
  48. ]
  49. }
  50. url = reverse('dcim-api:site-list')
  51. self.add_permissions('dcim.add_site')
  52. response = self.client.post(url, data, format='json', **self.header)
  53. self.assertHttpStatus(response, status.HTTP_201_CREATED)
  54. self.assertEqual(Site.objects.count(), 1)
  55. self.assertEqual(Site.objects.first().tags.count(), 2)
  56. # Verify that a job was queued for the object creation webhook
  57. self.assertEqual(self.queue.count, 1)
  58. job = self.queue.jobs[0]
  59. self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
  60. self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
  61. self.assertEqual(job.kwargs['model_name'], 'site')
  62. self.assertEqual(job.kwargs['data']['id'], response.data['id'])
  63. self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
  64. self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
  65. self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
  66. def test_enqueue_webhook_bulk_create(self):
  67. # Create multiple objects via the REST API
  68. data = [
  69. {
  70. 'name': 'Site 1',
  71. 'slug': 'site-1',
  72. 'tags': [
  73. {'name': 'Foo'},
  74. {'name': 'Bar'},
  75. ]
  76. },
  77. {
  78. 'name': 'Site 2',
  79. 'slug': 'site-2',
  80. 'tags': [
  81. {'name': 'Foo'},
  82. {'name': 'Bar'},
  83. ]
  84. },
  85. {
  86. 'name': 'Site 3',
  87. 'slug': 'site-3',
  88. 'tags': [
  89. {'name': 'Foo'},
  90. {'name': 'Bar'},
  91. ]
  92. },
  93. ]
  94. url = reverse('dcim-api:site-list')
  95. self.add_permissions('dcim.add_site')
  96. response = self.client.post(url, data, format='json', **self.header)
  97. self.assertHttpStatus(response, status.HTTP_201_CREATED)
  98. self.assertEqual(Site.objects.count(), 3)
  99. self.assertEqual(Site.objects.first().tags.count(), 2)
  100. # Verify that a webhook was queued for each object
  101. self.assertEqual(self.queue.count, 3)
  102. for i, job in enumerate(self.queue.jobs):
  103. self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_create=True))
  104. self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_CREATE)
  105. self.assertEqual(job.kwargs['model_name'], 'site')
  106. self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
  107. self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
  108. self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
  109. self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])
  110. def test_enqueue_webhook_update(self):
  111. site = Site.objects.create(name='Site 1', slug='site-1')
  112. site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
  113. # Update an object via the REST API
  114. data = {
  115. 'name': 'Site X',
  116. 'comments': 'Updated the site',
  117. 'tags': [
  118. {'name': 'Baz'}
  119. ]
  120. }
  121. url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
  122. self.add_permissions('dcim.change_site')
  123. response = self.client.patch(url, data, format='json', **self.header)
  124. self.assertHttpStatus(response, status.HTTP_200_OK)
  125. # Verify that a job was queued for the object update webhook
  126. self.assertEqual(self.queue.count, 1)
  127. job = self.queue.jobs[0]
  128. self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
  129. self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
  130. self.assertEqual(job.kwargs['model_name'], 'site')
  131. self.assertEqual(job.kwargs['data']['id'], site.pk)
  132. self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
  133. self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
  134. self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
  135. self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
  136. self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
  137. def test_enqueue_webhook_bulk_update(self):
  138. sites = (
  139. Site(name='Site 1', slug='site-1'),
  140. Site(name='Site 2', slug='site-2'),
  141. Site(name='Site 3', slug='site-3'),
  142. )
  143. Site.objects.bulk_create(sites)
  144. for site in sites:
  145. site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
  146. # Update three objects via the REST API
  147. data = [
  148. {
  149. 'id': sites[0].pk,
  150. 'name': 'Site X',
  151. 'tags': [
  152. {'name': 'Baz'}
  153. ]
  154. },
  155. {
  156. 'id': sites[1].pk,
  157. 'name': 'Site Y',
  158. 'tags': [
  159. {'name': 'Baz'}
  160. ]
  161. },
  162. {
  163. 'id': sites[2].pk,
  164. 'name': 'Site Z',
  165. 'tags': [
  166. {'name': 'Baz'}
  167. ]
  168. },
  169. ]
  170. url = reverse('dcim-api:site-list')
  171. self.add_permissions('dcim.change_site')
  172. response = self.client.patch(url, data, format='json', **self.header)
  173. self.assertHttpStatus(response, status.HTTP_200_OK)
  174. # Verify that a job was queued for the object update webhook
  175. self.assertEqual(self.queue.count, 3)
  176. for i, job in enumerate(self.queue.jobs):
  177. self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_update=True))
  178. self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_UPDATE)
  179. self.assertEqual(job.kwargs['model_name'], 'site')
  180. self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
  181. self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
  182. self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
  183. self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
  184. self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
  185. self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])
  186. def test_enqueue_webhook_delete(self):
  187. site = Site.objects.create(name='Site 1', slug='site-1')
  188. site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
  189. # Delete an object via the REST API
  190. url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
  191. self.add_permissions('dcim.delete_site')
  192. response = self.client.delete(url, **self.header)
  193. self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
  194. # Verify that a job was queued for the object update webhook
  195. self.assertEqual(self.queue.count, 1)
  196. job = self.queue.jobs[0]
  197. self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
  198. self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
  199. self.assertEqual(job.kwargs['model_name'], 'site')
  200. self.assertEqual(job.kwargs['data']['id'], site.pk)
  201. self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
  202. self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
  203. def test_enqueue_webhook_bulk_delete(self):
  204. sites = (
  205. Site(name='Site 1', slug='site-1'),
  206. Site(name='Site 2', slug='site-2'),
  207. Site(name='Site 3', slug='site-3'),
  208. )
  209. Site.objects.bulk_create(sites)
  210. for site in sites:
  211. site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))
  212. # Delete three objects via the REST API
  213. data = [
  214. {'id': site.pk} for site in sites
  215. ]
  216. url = reverse('dcim-api:site-list')
  217. self.add_permissions('dcim.delete_site')
  218. response = self.client.delete(url, data, format='json', **self.header)
  219. self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)
  220. # Verify that a job was queued for the object update webhook
  221. self.assertEqual(self.queue.count, 3)
  222. for i, job in enumerate(self.queue.jobs):
  223. self.assertEqual(job.kwargs['webhook'], Webhook.objects.get(type_delete=True))
  224. self.assertEqual(job.kwargs['event'], ObjectChangeActionChoices.ACTION_DELETE)
  225. self.assertEqual(job.kwargs['model_name'], 'site')
  226. self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
  227. self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
  228. self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
  229. def test_webhook_conditions(self):
  230. # Create a conditional Webhook
  231. webhook = Webhook(
  232. name='Conditional Webhook',
  233. type_create=True,
  234. type_update=True,
  235. payload_url='http://localhost:9000/',
  236. conditions={
  237. 'and': [
  238. {
  239. 'attr': 'status.value',
  240. 'value': 'active',
  241. }
  242. ]
  243. }
  244. )
  245. # Create a Site to evaluate
  246. site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING)
  247. data = serialize_for_webhook(site)
  248. # Evaluate the conditions (status='staging')
  249. self.assertFalse(eval_conditions(webhook, data))
  250. # Change the site's status
  251. site.status = SiteStatusChoices.STATUS_ACTIVE
  252. data = serialize_for_webhook(site)
  253. # Evaluate the conditions (status='active')
  254. self.assertTrue(eval_conditions(webhook, data))
  255. def test_webhooks_worker(self):
  256. request_id = uuid.uuid4()
  257. def dummy_send(_, request, **kwargs):
  258. """
  259. A dummy implementation of Session.send() to be used for testing.
  260. Always returns a 200 HTTP response.
  261. """
  262. webhook = Webhook.objects.get(type_create=True)
  263. signature = generate_signature(request.body, webhook.secret)
  264. # Validate the outgoing request headers
  265. self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
  266. self.assertEqual(request.headers['X-Hook-Signature'], signature)
  267. self.assertEqual(request.headers['X-Foo'], 'Bar')
  268. # Validate the outgoing request body
  269. body = json.loads(request.body)
  270. self.assertEqual(body['event'], 'created')
  271. self.assertEqual(body['timestamp'], job.kwargs['timestamp'])
  272. self.assertEqual(body['model'], 'site')
  273. self.assertEqual(body['username'], 'testuser')
  274. self.assertEqual(body['request_id'], str(request_id))
  275. self.assertEqual(body['data']['name'], 'Site 1')
  276. return HttpResponse()
  277. # Enqueue a webhook for processing
  278. webhooks_queue = []
  279. site = Site.objects.create(name='Site 1', slug='site-1')
  280. enqueue_object(
  281. webhooks_queue,
  282. instance=site,
  283. user=self.user,
  284. request_id=request_id,
  285. action=ObjectChangeActionChoices.ACTION_CREATE
  286. )
  287. flush_webhooks(webhooks_queue)
  288. # Retrieve the job from queue
  289. job = self.queue.jobs[0]
  290. # Patch the Session object with our dummy_send() method, then process the webhook for sending
  291. with patch.object(Session, 'send', dummy_send) as mock_send:
  292. process_webhook(**job.kwargs)