test_views.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385
  1. import json
  2. import urllib.parse
  3. import uuid
  4. from datetime import datetime
  5. from django.urls import reverse
  6. from django.utils import timezone
  7. from django_rq import get_queue
  8. from django_rq.settings import QUEUES_MAP
  9. from django_rq.workers import get_worker
  10. from rq.job import Job as RQ_Job, JobStatus
  11. from rq.registry import DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry
  12. from core.choices import ObjectChangeActionChoices
  13. from core.models import *
  14. from dcim.models import Site
  15. from users.models import User
  16. from utilities.testing import TestCase, ViewTestCases, create_tags, disable_logging
  17. class DataSourceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
  18. model = DataSource
  19. @classmethod
  20. def setUpTestData(cls):
  21. data_sources = (
  22. DataSource(name='Data Source 1', type='local', source_url='file:///var/tmp/source1/'),
  23. DataSource(name='Data Source 2', type='local', source_url='file:///var/tmp/source2/'),
  24. DataSource(name='Data Source 3', type='local', source_url='file:///var/tmp/source3/'),
  25. )
  26. DataSource.objects.bulk_create(data_sources)
  27. tags = create_tags('Alpha', 'Bravo', 'Charlie')
  28. cls.form_data = {
  29. 'name': 'Data Source X',
  30. 'type': 'git',
  31. 'source_url': 'http:///exmaple/com/foo/bar/',
  32. 'description': 'Something',
  33. 'comments': 'Foo bar baz',
  34. 'tags': [t.pk for t in tags],
  35. }
  36. cls.csv_data = (
  37. "name,type,source_url,enabled",
  38. "Data Source 4,local,file:///var/tmp/source4/,true",
  39. "Data Source 5,local,file:///var/tmp/source4/,true",
  40. "Data Source 6,git,http:///exmaple/com/foo/bar/,false",
  41. )
  42. cls.csv_update_data = (
  43. "id,name,description",
  44. f"{data_sources[0].pk},Data Source 7,New description7",
  45. f"{data_sources[1].pk},Data Source 8,New description8",
  46. f"{data_sources[2].pk},Data Source 9,New description9",
  47. )
  48. cls.bulk_edit_data = {
  49. 'enabled': False,
  50. 'description': 'New description',
  51. }
  52. class DataFileTestCase(
  53. ViewTestCases.GetObjectViewTestCase,
  54. ViewTestCases.DeleteObjectViewTestCase,
  55. ViewTestCases.ListObjectsViewTestCase,
  56. ViewTestCases.BulkDeleteObjectsViewTestCase,
  57. ):
  58. model = DataFile
  59. @classmethod
  60. def setUpTestData(cls):
  61. datasource = DataSource.objects.create(
  62. name='Data Source 1',
  63. type='local',
  64. source_url='file:///var/tmp/source1/'
  65. )
  66. data_files = (
  67. DataFile(
  68. source=datasource,
  69. path='dir1/file1.txt',
  70. last_updated=timezone.now(),
  71. size=1000,
  72. hash='442da078f0111cbdf42f21903724f6597c692535f55bdfbbea758a1ae99ad9e1'
  73. ),
  74. DataFile(
  75. source=datasource,
  76. path='dir1/file2.txt',
  77. last_updated=timezone.now(),
  78. size=2000,
  79. hash='a78168c7c97115bafd96450ed03ea43acec495094c5caa28f0d02e20e3a76cc2'
  80. ),
  81. DataFile(
  82. source=datasource,
  83. path='dir1/file3.txt',
  84. last_updated=timezone.now(),
  85. size=3000,
  86. hash='12b8827a14c4d5a2f30b6c6e2b7983063988612391c6cbe8ee7493b59054827a'
  87. ),
  88. )
  89. DataFile.objects.bulk_create(data_files)
  90. # TODO: Convert to StandardTestCases.Views
  91. class ObjectChangeTestCase(TestCase):
  92. user_permissions = (
  93. 'core.view_objectchange',
  94. )
  95. @classmethod
  96. def setUpTestData(cls):
  97. site = Site(name='Site 1', slug='site-1')
  98. site.save()
  99. # Create three ObjectChanges
  100. user = User.objects.create_user(username='testuser2')
  101. for i in range(1, 4):
  102. oc = site.to_objectchange(action=ObjectChangeActionChoices.ACTION_UPDATE)
  103. oc.user = user
  104. oc.request_id = uuid.uuid4()
  105. oc.save()
  106. def test_objectchange_list(self):
  107. url = reverse('core:objectchange_list')
  108. params = {
  109. "user": User.objects.first().pk,
  110. }
  111. response = self.client.get('{}?{}'.format(url, urllib.parse.urlencode(params)))
  112. self.assertHttpStatus(response, 200)
  113. def test_objectchange(self):
  114. objectchange = ObjectChange.objects.first()
  115. response = self.client.get(objectchange.get_absolute_url())
  116. self.assertHttpStatus(response, 200)
  117. class BackgroundTaskTestCase(TestCase):
  118. user_permissions = ()
  119. # Dummy worker functions
  120. @staticmethod
  121. def dummy_job_default():
  122. return "Job finished"
  123. @staticmethod
  124. def dummy_job_high():
  125. return "Job finished"
  126. @staticmethod
  127. def dummy_job_failing():
  128. raise Exception("Job failed")
  129. def setUp(self):
  130. super().setUp()
  131. self.user.is_superuser = True
  132. self.user.is_active = True
  133. self.user.save()
  134. # Clear all queues prior to running each test
  135. get_queue('default').connection.flushall()
  136. get_queue('high').connection.flushall()
  137. get_queue('low').connection.flushall()
  138. def test_background_queue_list(self):
  139. url = reverse('core:background_queue_list')
  140. # Attempt to load view without permission
  141. self.user.is_superuser = False
  142. self.user.save()
  143. response = self.client.get(url)
  144. self.assertEqual(response.status_code, 403)
  145. # Load view with permission
  146. self.user.is_superuser = True
  147. self.user.save()
  148. response = self.client.get(url)
  149. self.assertEqual(response.status_code, 200)
  150. self.assertIn('default', str(response.content))
  151. self.assertIn('high', str(response.content))
  152. self.assertIn('low', str(response.content))
  153. def test_background_tasks_list_default(self):
  154. queue = get_queue('default')
  155. queue.enqueue(self.dummy_job_default)
  156. queue_index = QUEUES_MAP['default']
  157. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
  158. self.assertEqual(response.status_code, 200)
  159. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  160. def test_background_tasks_list_high(self):
  161. queue = get_queue('high')
  162. queue.enqueue(self.dummy_job_high)
  163. queue_index = QUEUES_MAP['high']
  164. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
  165. self.assertEqual(response.status_code, 200)
  166. self.assertIn('BackgroundTaskTestCase.dummy_job_high', str(response.content))
  167. def test_background_tasks_list_finished(self):
  168. queue = get_queue('default')
  169. job = queue.enqueue(self.dummy_job_default)
  170. queue_index = QUEUES_MAP['default']
  171. registry = FinishedJobRegistry(queue.name, queue.connection)
  172. registry.add(job, 2)
  173. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'finished']))
  174. self.assertEqual(response.status_code, 200)
  175. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  176. def test_background_tasks_list_failed(self):
  177. queue = get_queue('default')
  178. job = queue.enqueue(self.dummy_job_default)
  179. queue_index = QUEUES_MAP['default']
  180. registry = FailedJobRegistry(queue.name, queue.connection)
  181. registry.add(job, 2)
  182. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'failed']))
  183. self.assertEqual(response.status_code, 200)
  184. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  185. def test_background_tasks_scheduled(self):
  186. queue = get_queue('default')
  187. queue.enqueue_at(datetime.now(), self.dummy_job_default)
  188. queue_index = QUEUES_MAP['default']
  189. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'scheduled']))
  190. self.assertEqual(response.status_code, 200)
  191. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  192. def test_background_tasks_list_deferred(self):
  193. queue = get_queue('default')
  194. job = queue.enqueue(self.dummy_job_default)
  195. queue_index = QUEUES_MAP['default']
  196. registry = DeferredJobRegistry(queue.name, queue.connection)
  197. registry.add(job, 2)
  198. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'deferred']))
  199. self.assertEqual(response.status_code, 200)
  200. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  201. def test_background_task(self):
  202. queue = get_queue('default')
  203. job = queue.enqueue(self.dummy_job_default)
  204. response = self.client.get(reverse('core:background_task', args=[job.id]))
  205. self.assertEqual(response.status_code, 200)
  206. self.assertIn('Background Tasks', str(response.content))
  207. self.assertIn(str(job.id), str(response.content))
  208. self.assertIn('Callable', str(response.content))
  209. self.assertIn('Meta', str(response.content))
  210. self.assertIn('Keyword Arguments', str(response.content))
  211. def test_background_task_delete(self):
  212. queue = get_queue('default')
  213. job = queue.enqueue(self.dummy_job_default)
  214. response = self.client.post(reverse('core:background_task_delete', args=[job.id]), {'confirm': True})
  215. self.assertEqual(response.status_code, 302)
  216. self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
  217. self.assertNotIn(job.id, queue.job_ids)
  218. def test_background_task_requeue(self):
  219. queue = get_queue('default')
  220. # Enqueue & run a job that will fail
  221. job = queue.enqueue(self.dummy_job_failing)
  222. worker = get_worker('default')
  223. with disable_logging():
  224. worker.work(burst=True)
  225. self.assertTrue(job.is_failed)
  226. # Re-enqueue the failed job and check that its status has been reset
  227. response = self.client.get(reverse('core:background_task_requeue', args=[job.id]))
  228. self.assertEqual(response.status_code, 302)
  229. self.assertFalse(job.is_failed)
  230. def test_background_task_enqueue(self):
  231. queue = get_queue('default')
  232. # Enqueue some jobs that each depends on its predecessor
  233. job = previous_job = None
  234. for _ in range(0, 3):
  235. job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
  236. previous_job = job
  237. # Check that the last job to be enqueued has a status of deferred
  238. self.assertIsNotNone(job)
  239. self.assertEqual(job.get_status(), JobStatus.DEFERRED)
  240. self.assertIsNone(job.enqueued_at)
  241. # Force-enqueue the deferred job
  242. response = self.client.get(reverse('core:background_task_enqueue', args=[job.id]))
  243. self.assertEqual(response.status_code, 302)
  244. # Check that job's status is updated correctly
  245. job = queue.fetch_job(job.id)
  246. self.assertEqual(job.get_status(), JobStatus.QUEUED)
  247. self.assertIsNotNone(job.enqueued_at)
  248. def test_background_task_stop(self):
  249. queue = get_queue('default')
  250. worker = get_worker('default')
  251. job = queue.enqueue(self.dummy_job_default)
  252. worker.prepare_job_execution(job)
  253. worker.prepare_execution(job)
  254. self.assertEqual(job.get_status(), JobStatus.STARTED)
  255. # Stop those jobs using the view
  256. started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
  257. self.assertEqual(len(started_job_registry), 1)
  258. response = self.client.get(reverse('core:background_task_stop', args=[job.id]))
  259. self.assertEqual(response.status_code, 302)
  260. with disable_logging():
  261. worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
  262. self.assertEqual(len(started_job_registry), 0)
  263. canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
  264. self.assertEqual(len(canceled_job_registry), 1)
  265. self.assertIn(job.id, canceled_job_registry)
  266. def test_worker_list(self):
  267. worker1 = get_worker('default', name=uuid.uuid4().hex)
  268. worker1.register_birth()
  269. worker2 = get_worker('high')
  270. worker2.register_birth()
  271. queue_index = QUEUES_MAP['default']
  272. response = self.client.get(reverse('core:worker_list', args=[queue_index]))
  273. self.assertEqual(response.status_code, 200)
  274. self.assertIn(str(worker1.name), str(response.content))
  275. self.assertNotIn(str(worker2.name), str(response.content))
  276. def test_worker(self):
  277. worker1 = get_worker('default', name=uuid.uuid4().hex)
  278. worker1.register_birth()
  279. response = self.client.get(reverse('core:worker', args=[worker1.name]))
  280. self.assertEqual(response.status_code, 200)
  281. self.assertIn(str(worker1.name), str(response.content))
  282. self.assertIn('Birth', str(response.content))
  283. self.assertIn('Total working time', str(response.content))
  284. class SystemTestCase(TestCase):
  285. def setUp(self):
  286. super().setUp()
  287. self.user.is_superuser = True
  288. self.user.save()
  289. def test_system_view_default(self):
  290. # Test UI render
  291. response = self.client.get(reverse('core:system'))
  292. self.assertEqual(response.status_code, 200)
  293. # Test export
  294. response = self.client.get(f"{reverse('core:system')}?export=true")
  295. self.assertEqual(response.status_code, 200)
  296. data = json.loads(response.content)
  297. self.assertIn('netbox_release', data)
  298. self.assertIn('plugins', data)
  299. self.assertIn('config', data)
  300. self.assertIn('objects', data)
  301. def test_system_view_with_config_revision(self):
  302. ConfigRevision.objects.create()
  303. # Test UI render
  304. response = self.client.get(reverse('core:system'))
  305. self.assertEqual(response.status_code, 200)
  306. # Test export
  307. response = self.client.get(f"{reverse('core:system')}?export=true")
  308. self.assertEqual(response.status_code, 200)