test_event_rules.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440
  1. import json
  2. import uuid
  3. from unittest.mock import patch
  4. import django_rq
  5. from django.http import HttpResponse
  6. from django.test import RequestFactory
  7. from django.urls import reverse
  8. from requests import Session
  9. from rest_framework import status
  10. from core.events import *
  11. from core.models import ObjectType
  12. from dcim.choices import SiteStatusChoices
  13. from dcim.models import Site
  14. from extras.choices import EventRuleActionChoices
  15. from extras.events import enqueue_event, flush_events, serialize_for_event
  16. from extras.models import EventRule, Tag, Webhook
  17. from extras.webhooks import generate_signature, send_webhook
  18. from netbox.context_managers import event_tracking
  19. from utilities.testing import APITestCase
  class EventRuleTest(APITestCase):
      """
      Tests covering EventRule condition evaluation, the queuing of background jobs when Sites are
      created/updated/deleted via the REST API, webhook delivery, and suppression of duplicate events.
      """

      def setUp(self):
          """
          Run the parent setup, then empty the 'default' RQ queue so each test starts with no queued jobs.
          """
          super().setUp()

          # Ensure the queue has been cleared for each test
          self.queue = django_rq.get_queue('default')
          self.queue.empty()

      @classmethod
      def setUpTestData(cls):
          """
          Create the shared fixtures: three webhooks, one webhook-type EventRule per event type
          (created, updated, deleted) assigned to the Site object type, and three tags.
          """
          site_type = ObjectType.objects.get_for_model(Site)
          DUMMY_URL = 'http://localhost:9000/'
          DUMMY_SECRET = 'LOOKATMEIMASECRETSTRING'

          webhooks = Webhook.objects.bulk_create((
              Webhook(name='Webhook 1', payload_url=DUMMY_URL, secret=DUMMY_SECRET, additional_headers='X-Foo: Bar'),
              Webhook(name='Webhook 2', payload_url=DUMMY_URL, secret=DUMMY_SECRET),
              Webhook(name='Webhook 3', payload_url=DUMMY_URL, secret=DUMMY_SECRET),
          ))

          # All three rules point at the first webhook; they differ only in event type and action_data
          webhook_type = ObjectType.objects.get(app_label='extras', model='webhook')
          event_rules = EventRule.objects.bulk_create((
              EventRule(
                  name='Event Rule 1',
                  event_types=[OBJECT_CREATED],
                  action_type=EventRuleActionChoices.WEBHOOK,
                  action_object_type=webhook_type,
                  action_object_id=webhooks[0].id,
                  action_data={"foo": 1},
              ),
              EventRule(
                  name='Event Rule 2',
                  event_types=[OBJECT_UPDATED],
                  action_type=EventRuleActionChoices.WEBHOOK,
                  action_object_type=webhook_type,
                  action_object_id=webhooks[0].id,
                  action_data={"foo": 2},
              ),
              EventRule(
                  name='Event Rule 3',
                  event_types=[OBJECT_DELETED],
                  action_type=EventRuleActionChoices.WEBHOOK,
                  action_object_type=webhook_type,
                  action_object_id=webhooks[0].id,
                  action_data={"foo": 3},
              ),
          ))
          for event_rule in event_rules:
              event_rule.object_types.set([site_type])

          Tag.objects.bulk_create((
              Tag(name='Foo', slug='foo'),
              Tag(name='Bar', slug='bar'),
              Tag(name='Baz', slug='baz'),
          ))

      def _assert_single_queued_event(self, event_type):
          """
          Assert that exactly one job is queued and that it carries the given event type, then empty the queue.
          """
          self.assertEqual(self.queue.count, 1, msg="Duplicate jobs found in queue")
          job = self.queue.get_jobs()[0]
          self.assertEqual(job.kwargs['event_type'], event_type)
          self.queue.empty()

      def test_eventrule_conditions(self):
          """
          Test evaluation of EventRule conditions.
          """
          event_rule = EventRule(
              name='Event Rule 1',
              event_types=[OBJECT_CREATED, OBJECT_UPDATED],
              conditions={
                  'and': [
                      {
                          'attr': 'status.value',
                          'value': 'active',
                      }
                  ]
              }
          )

          # Create a Site to evaluate
          site = Site.objects.create(name='Site 1', slug='site-1', status=SiteStatusChoices.STATUS_STAGING)
          data = serialize_for_event(site)

          # Evaluate the conditions (status='staging')
          self.assertFalse(event_rule.eval_conditions(data))

          # Change the site's status (in memory only; the serialized data is what gets evaluated)
          site.status = SiteStatusChoices.STATUS_ACTIVE
          data = serialize_for_event(site)

          # Evaluate the conditions (status='active')
          self.assertTrue(event_rule.eval_conditions(data))

      def test_single_create_process_eventrule(self):
          """
          Check that creating an object with an applicable EventRule queues a background task for the rule's action.
          """
          # Create an object via the REST API
          data = {
              'name': 'Site 1',
              'slug': 'site-1',
              'tags': [
                  {'name': 'Foo'},
                  {'name': 'Bar'},
              ]
          }
          url = reverse('dcim-api:site-list')
          self.add_permissions('dcim.add_site')
          response = self.client.post(url, data, format='json', **self.header)
          self.assertHttpStatus(response, status.HTTP_201_CREATED)
          self.assertEqual(Site.objects.count(), 1)
          self.assertEqual(Site.objects.first().tags.count(), 2)

          # Verify that a background task was queued for the new object
          self.assertEqual(self.queue.count, 1)
          job = self.queue.jobs[0]
          self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(name='Event Rule 1'))
          self.assertEqual(job.kwargs['event_type'], OBJECT_CREATED)
          self.assertEqual(job.kwargs['object_type'], ObjectType.objects.get_for_model(Site))
          self.assertEqual(job.kwargs['data']['id'], response.data['id'])
          self.assertEqual(job.kwargs['data']['foo'], 1)
          self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
          self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site 1')
          self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])

      def test_bulk_create_process_eventrule(self):
          """
          Check that bulk creating multiple objects with an applicable EventRule queues a background task for each
          new object.
          """
          # Create multiple objects via the REST API
          data = [
              {
                  'name': 'Site 1',
                  'slug': 'site-1',
                  'tags': [
                      {'name': 'Foo'},
                      {'name': 'Bar'},
                  ]
              },
              {
                  'name': 'Site 2',
                  'slug': 'site-2',
                  'tags': [
                      {'name': 'Foo'},
                      {'name': 'Bar'},
                  ]
              },
              {
                  'name': 'Site 3',
                  'slug': 'site-3',
                  'tags': [
                      {'name': 'Foo'},
                      {'name': 'Bar'},
                  ]
              },
          ]
          url = reverse('dcim-api:site-list')
          self.add_permissions('dcim.add_site')
          response = self.client.post(url, data, format='json', **self.header)
          self.assertHttpStatus(response, status.HTTP_201_CREATED)
          self.assertEqual(Site.objects.count(), 3)
          self.assertEqual(Site.objects.first().tags.count(), 2)

          # Verify that a background task was queued for each new object
          self.assertEqual(self.queue.count, 3)

          # Look up the expected rule and object type once rather than once per job
          expected_rule = EventRule.objects.get(name='Event Rule 1')
          site_type = ObjectType.objects.get_for_model(Site)
          for i, job in enumerate(self.queue.jobs):
              self.assertEqual(job.kwargs['event_rule'], expected_rule)
              self.assertEqual(job.kwargs['event_type'], OBJECT_CREATED)
              self.assertEqual(job.kwargs['object_type'], site_type)
              self.assertEqual(job.kwargs['data']['id'], response.data[i]['id'])
              self.assertEqual(job.kwargs['data']['foo'], 1)
              self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
              self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
              self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Bar', 'Foo'])

      def test_single_update_process_eventrule(self):
          """
          Check that updating an object with an applicable EventRule queues a background task for the rule's action.
          """
          site = Site.objects.create(name='Site 1', slug='site-1')
          site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))

          # Update an object via the REST API
          data = {
              'name': 'Site X',
              'comments': 'Updated the site',
              'tags': [
                  {'name': 'Baz'}
              ]
          }
          url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
          self.add_permissions('dcim.change_site')
          response = self.client.patch(url, data, format='json', **self.header)
          self.assertHttpStatus(response, status.HTTP_200_OK)

          # Verify that a background task was queued for the updated object
          self.assertEqual(self.queue.count, 1)
          job = self.queue.jobs[0]
          self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(name='Event Rule 2'))
          self.assertEqual(job.kwargs['event_type'], OBJECT_UPDATED)
          self.assertEqual(job.kwargs['object_type'], ObjectType.objects.get_for_model(Site))
          self.assertEqual(job.kwargs['data']['id'], site.pk)
          self.assertEqual(job.kwargs['data']['foo'], 2)
          self.assertEqual(len(job.kwargs['data']['tags']), len(response.data['tags']))
          self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
          self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
          self.assertEqual(job.kwargs['snapshots']['postchange']['name'], 'Site X')
          self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])

      def test_bulk_update_process_eventrule(self):
          """
          Check that bulk updating multiple objects with an applicable EventRule queues a background task for each
          updated object.
          """
          sites = (
              Site(name='Site 1', slug='site-1'),
              Site(name='Site 2', slug='site-2'),
              Site(name='Site 3', slug='site-3'),
          )
          Site.objects.bulk_create(sites)
          for site in sites:
              site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))

          # Update three objects via the REST API
          data = [
              {
                  'id': sites[0].pk,
                  'name': 'Site X',
                  'tags': [
                      {'name': 'Baz'}
                  ]
              },
              {
                  'id': sites[1].pk,
                  'name': 'Site Y',
                  'tags': [
                      {'name': 'Baz'}
                  ]
              },
              {
                  'id': sites[2].pk,
                  'name': 'Site Z',
                  'tags': [
                      {'name': 'Baz'}
                  ]
              },
          ]
          url = reverse('dcim-api:site-list')
          self.add_permissions('dcim.change_site')
          response = self.client.patch(url, data, format='json', **self.header)
          self.assertHttpStatus(response, status.HTTP_200_OK)

          # Verify that a background task was queued for each updated object
          self.assertEqual(self.queue.count, 3)

          # Look up the expected rule and object type once rather than once per job
          expected_rule = EventRule.objects.get(name='Event Rule 2')
          site_type = ObjectType.objects.get_for_model(Site)
          for i, job in enumerate(self.queue.jobs):
              self.assertEqual(job.kwargs['event_rule'], expected_rule)
              self.assertEqual(job.kwargs['event_type'], OBJECT_UPDATED)
              self.assertEqual(job.kwargs['object_type'], site_type)
              self.assertEqual(job.kwargs['data']['id'], data[i]['id'])
              self.assertEqual(job.kwargs['data']['foo'], 2)
              self.assertEqual(len(job.kwargs['data']['tags']), len(response.data[i]['tags']))
              # The in-memory Site instances were never modified, so they still hold the pre-change names
              self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
              self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])
              self.assertEqual(job.kwargs['snapshots']['postchange']['name'], response.data[i]['name'])
              self.assertEqual(job.kwargs['snapshots']['postchange']['tags'], ['Baz'])

      def test_single_delete_process_eventrule(self):
          """
          Check that deleting an object with an applicable EventRule queues a background task for the rule's action.
          """
          site = Site.objects.create(name='Site 1', slug='site-1')
          site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))

          # Delete an object via the REST API
          url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
          self.add_permissions('dcim.delete_site')
          response = self.client.delete(url, **self.header)
          self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)

          # Verify that a task was queued for the deleted object
          self.assertEqual(self.queue.count, 1)
          job = self.queue.jobs[0]
          self.assertEqual(job.kwargs['event_rule'], EventRule.objects.get(name='Event Rule 3'))
          self.assertEqual(job.kwargs['event_type'], OBJECT_DELETED)
          self.assertEqual(job.kwargs['object_type'], ObjectType.objects.get_for_model(Site))
          self.assertEqual(job.kwargs['data']['id'], site.pk)
          self.assertEqual(job.kwargs['data']['foo'], 3)
          self.assertEqual(job.kwargs['snapshots']['prechange']['name'], 'Site 1')
          self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])

      def test_bulk_delete_process_eventrule(self):
          """
          Check that bulk deleting multiple objects with an applicable EventRule queues a background task for each
          deleted object.
          """
          sites = (
              Site(name='Site 1', slug='site-1'),
              Site(name='Site 2', slug='site-2'),
              Site(name='Site 3', slug='site-3'),
          )
          Site.objects.bulk_create(sites)
          for site in sites:
              site.tags.set(Tag.objects.filter(name__in=['Foo', 'Bar']))

          # Delete three objects via the REST API
          data = [
              {'id': site.pk} for site in sites
          ]
          url = reverse('dcim-api:site-list')
          self.add_permissions('dcim.delete_site')
          response = self.client.delete(url, data, format='json', **self.header)
          self.assertHttpStatus(response, status.HTTP_204_NO_CONTENT)

          # Verify that a background task was queued for each deleted object
          self.assertEqual(self.queue.count, 3)

          # Look up the expected rule and object type once rather than once per job
          expected_rule = EventRule.objects.get(name='Event Rule 3')
          site_type = ObjectType.objects.get_for_model(Site)
          for i, job in enumerate(self.queue.jobs):
              self.assertEqual(job.kwargs['event_rule'], expected_rule)
              self.assertEqual(job.kwargs['event_type'], OBJECT_DELETED)
              self.assertEqual(job.kwargs['object_type'], site_type)
              self.assertEqual(job.kwargs['data']['id'], sites[i].pk)
              self.assertEqual(job.kwargs['data']['foo'], 3)
              self.assertEqual(job.kwargs['snapshots']['prechange']['name'], sites[i].name)
              self.assertEqual(job.kwargs['snapshots']['prechange']['tags'], ['Bar', 'Foo'])

      def test_send_webhook(self):
          """
          Check that processing a queued webhook job sends a request with the expected headers and body.
          """
          request_id = uuid.uuid4()

          # Every request handed to dummy_send() is recorded here, so the test can fail loudly if
          # send_webhook() never reaches Session.send() (which would otherwise skip all assertions below).
          sent_requests = []

          def dummy_send(_, request, **kwargs):
              """
              A dummy implementation of Session.send() to be used for testing.
              Always returns a 200 HTTP response.
              """
              sent_requests.append(request)

              event = EventRule.objects.get(name='Event Rule 1')
              webhook = event.action_object
              signature = generate_signature(request.body, webhook.secret)

              # Validate the outgoing request headers
              self.assertEqual(request.headers['Content-Type'], webhook.http_content_type)
              self.assertEqual(request.headers['X-Hook-Signature'], signature)
              self.assertEqual(request.headers['X-Foo'], 'Bar')

              # Validate the outgoing request body
              # (`job` is bound in the enclosing scope before this function is ever called)
              body = json.loads(request.body)
              self.assertEqual(body['event'], 'created')
              self.assertEqual(body['timestamp'], job.kwargs['timestamp'])
              self.assertEqual(body['model'], 'site')
              self.assertEqual(body['username'], 'testuser')
              self.assertEqual(body['request_id'], str(request_id))
              self.assertEqual(body['data']['name'], 'Site 1')
              self.assertEqual(body['data']['foo'], 1)
              self.assertEqual(body['context']['foo'], 123)  # From netbox.tests.dummy_plugin

              return HttpResponse()

          # Create a dummy request
          request = RequestFactory().get(reverse('dcim:site_add'))
          request.id = request_id
          request.user = self.user

          # Enqueue a webhook for processing
          webhooks_queue = {}
          site = Site.objects.create(name='Site 1', slug='site-1')
          enqueue_event(
              webhooks_queue,
              instance=site,
              request=request,
              event_type=OBJECT_CREATED,
          )
          flush_events(list(webhooks_queue.values()))

          # Retrieve the job from queue
          job = self.queue.jobs[0]

          # Patch the Session object with our dummy_send() method, then process the webhook for sending
          with patch.object(Session, 'send', dummy_send):
              send_webhook(**job.kwargs)

          # Guard against a vacuous pass: the assertions above only run if the request was actually sent
          self.assertTrue(sent_requests, msg="send_webhook() did not call Session.send()")

      def test_duplicate_triggers(self):
          """
          Test for erroneous duplicate event triggers resulting from saving an object multiple times
          within the span of a single request.
          """
          url = reverse('dcim:site_add')
          request = RequestFactory().get(url)
          request.id = uuid.uuid4()
          request.user = self.user

          # Test create & update
          with event_tracking(request):
              site = Site(name='Site 1', slug='site-1')
              site.save()
              site.description = 'foo'
              site.save()
          self._assert_single_queued_event(OBJECT_CREATED)

          # Test multiple updates
          site = Site.objects.create(name='Site 2', slug='site-2')
          with event_tracking(request):
              site.description = 'foo'
              site.save()
              site.description = 'bar'
              site.save()
          self._assert_single_queued_event(OBJECT_UPDATED)

          # Test update & delete
          site = Site.objects.create(name='Site 3', slug='site-3')
          with event_tracking(request):
              site.description = 'foo'
              site.save()
              site.delete()
          self._assert_single_queued_event(OBJECT_DELETED)