test_views.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347
  1. import urllib.parse
  2. import uuid
  3. from datetime import datetime
  4. from django.urls import reverse
  5. from django.utils import timezone
  6. from django_rq import get_queue
  7. from django_rq.settings import QUEUES_MAP
  8. from django_rq.workers import get_worker
  9. from rq.job import Job as RQ_Job, JobStatus
  10. from rq.registry import DeferredJobRegistry, FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry
  11. from core.choices import ObjectChangeActionChoices
  12. from core.models import *
  13. from dcim.models import Site
  14. from users.models import User
  15. from utilities.testing import TestCase, ViewTestCases, create_tags
  16. class DataSourceTestCase(ViewTestCases.PrimaryObjectViewTestCase):
  17. model = DataSource
  18. @classmethod
  19. def setUpTestData(cls):
  20. data_sources = (
  21. DataSource(name='Data Source 1', type='local', source_url='file:///var/tmp/source1/'),
  22. DataSource(name='Data Source 2', type='local', source_url='file:///var/tmp/source2/'),
  23. DataSource(name='Data Source 3', type='local', source_url='file:///var/tmp/source3/'),
  24. )
  25. DataSource.objects.bulk_create(data_sources)
  26. tags = create_tags('Alpha', 'Bravo', 'Charlie')
  27. cls.form_data = {
  28. 'name': 'Data Source X',
  29. 'type': 'git',
  30. 'source_url': 'http:///exmaple/com/foo/bar/',
  31. 'description': 'Something',
  32. 'comments': 'Foo bar baz',
  33. 'tags': [t.pk for t in tags],
  34. }
  35. cls.csv_data = (
  36. "name,type,source_url,enabled",
  37. "Data Source 4,local,file:///var/tmp/source4/,true",
  38. "Data Source 5,local,file:///var/tmp/source4/,true",
  39. "Data Source 6,git,http:///exmaple/com/foo/bar/,false",
  40. )
  41. cls.csv_update_data = (
  42. "id,name,description",
  43. f"{data_sources[0].pk},Data Source 7,New description7",
  44. f"{data_sources[1].pk},Data Source 8,New description8",
  45. f"{data_sources[2].pk},Data Source 9,New description9",
  46. )
  47. cls.bulk_edit_data = {
  48. 'enabled': False,
  49. 'description': 'New description',
  50. }
  51. class DataFileTestCase(
  52. ViewTestCases.GetObjectViewTestCase,
  53. ViewTestCases.DeleteObjectViewTestCase,
  54. ViewTestCases.ListObjectsViewTestCase,
  55. ViewTestCases.BulkDeleteObjectsViewTestCase,
  56. ):
  57. model = DataFile
  58. @classmethod
  59. def setUpTestData(cls):
  60. datasource = DataSource.objects.create(
  61. name='Data Source 1',
  62. type='local',
  63. source_url='file:///var/tmp/source1/'
  64. )
  65. data_files = (
  66. DataFile(
  67. source=datasource,
  68. path='dir1/file1.txt',
  69. last_updated=timezone.now(),
  70. size=1000,
  71. hash='442da078f0111cbdf42f21903724f6597c692535f55bdfbbea758a1ae99ad9e1'
  72. ),
  73. DataFile(
  74. source=datasource,
  75. path='dir1/file2.txt',
  76. last_updated=timezone.now(),
  77. size=2000,
  78. hash='a78168c7c97115bafd96450ed03ea43acec495094c5caa28f0d02e20e3a76cc2'
  79. ),
  80. DataFile(
  81. source=datasource,
  82. path='dir1/file3.txt',
  83. last_updated=timezone.now(),
  84. size=3000,
  85. hash='12b8827a14c4d5a2f30b6c6e2b7983063988612391c6cbe8ee7493b59054827a'
  86. ),
  87. )
  88. DataFile.objects.bulk_create(data_files)
  89. # TODO: Convert to StandardTestCases.Views
  90. class ObjectChangeTestCase(TestCase):
  91. user_permissions = (
  92. 'core.view_objectchange',
  93. )
  94. @classmethod
  95. def setUpTestData(cls):
  96. site = Site(name='Site 1', slug='site-1')
  97. site.save()
  98. # Create three ObjectChanges
  99. user = User.objects.create_user(username='testuser2')
  100. for i in range(1, 4):
  101. oc = site.to_objectchange(action=ObjectChangeActionChoices.ACTION_UPDATE)
  102. oc.user = user
  103. oc.request_id = uuid.uuid4()
  104. oc.save()
  105. def test_objectchange_list(self):
  106. url = reverse('core:objectchange_list')
  107. params = {
  108. "user": User.objects.first().pk,
  109. }
  110. response = self.client.get('{}?{}'.format(url, urllib.parse.urlencode(params)))
  111. self.assertHttpStatus(response, 200)
  112. def test_objectchange(self):
  113. objectchange = ObjectChange.objects.first()
  114. response = self.client.get(objectchange.get_absolute_url())
  115. self.assertHttpStatus(response, 200)
  116. class BackgroundTaskTestCase(TestCase):
  117. user_permissions = ()
  118. # Dummy worker functions
  119. @staticmethod
  120. def dummy_job_default():
  121. return "Job finished"
  122. @staticmethod
  123. def dummy_job_high():
  124. return "Job finished"
  125. @staticmethod
  126. def dummy_job_failing():
  127. raise Exception("Job failed")
  128. def setUp(self):
  129. super().setUp()
  130. self.user.is_staff = True
  131. self.user.is_active = True
  132. self.user.save()
  133. # Clear all queues prior to running each test
  134. get_queue('default').connection.flushall()
  135. get_queue('high').connection.flushall()
  136. get_queue('low').connection.flushall()
  137. def test_background_queue_list(self):
  138. url = reverse('core:background_queue_list')
  139. # Attempt to load view without permission
  140. self.user.is_staff = False
  141. self.user.save()
  142. response = self.client.get(url)
  143. self.assertEqual(response.status_code, 403)
  144. # Load view with permission
  145. self.user.is_staff = True
  146. self.user.save()
  147. response = self.client.get(url)
  148. self.assertEqual(response.status_code, 200)
  149. self.assertIn('default', str(response.content))
  150. self.assertIn('high', str(response.content))
  151. self.assertIn('low', str(response.content))
  152. def test_background_tasks_list_default(self):
  153. queue = get_queue('default')
  154. queue.enqueue(self.dummy_job_default)
  155. queue_index = QUEUES_MAP['default']
  156. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
  157. self.assertEqual(response.status_code, 200)
  158. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  159. def test_background_tasks_list_high(self):
  160. queue = get_queue('high')
  161. queue.enqueue(self.dummy_job_high)
  162. queue_index = QUEUES_MAP['high']
  163. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'queued']))
  164. self.assertEqual(response.status_code, 200)
  165. self.assertIn('BackgroundTaskTestCase.dummy_job_high', str(response.content))
  166. def test_background_tasks_list_finished(self):
  167. queue = get_queue('default')
  168. job = queue.enqueue(self.dummy_job_default)
  169. queue_index = QUEUES_MAP['default']
  170. registry = FinishedJobRegistry(queue.name, queue.connection)
  171. registry.add(job, 2)
  172. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'finished']))
  173. self.assertEqual(response.status_code, 200)
  174. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  175. def test_background_tasks_list_failed(self):
  176. queue = get_queue('default')
  177. job = queue.enqueue(self.dummy_job_default)
  178. queue_index = QUEUES_MAP['default']
  179. registry = FailedJobRegistry(queue.name, queue.connection)
  180. registry.add(job, 2)
  181. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'failed']))
  182. self.assertEqual(response.status_code, 200)
  183. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  184. def test_background_tasks_scheduled(self):
  185. queue = get_queue('default')
  186. queue.enqueue_at(datetime.now(), self.dummy_job_default)
  187. queue_index = QUEUES_MAP['default']
  188. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'scheduled']))
  189. self.assertEqual(response.status_code, 200)
  190. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  191. def test_background_tasks_list_deferred(self):
  192. queue = get_queue('default')
  193. job = queue.enqueue(self.dummy_job_default)
  194. queue_index = QUEUES_MAP['default']
  195. registry = DeferredJobRegistry(queue.name, queue.connection)
  196. registry.add(job, 2)
  197. response = self.client.get(reverse('core:background_task_list', args=[queue_index, 'deferred']))
  198. self.assertEqual(response.status_code, 200)
  199. self.assertIn('BackgroundTaskTestCase.dummy_job_default', str(response.content))
  200. def test_background_task(self):
  201. queue = get_queue('default')
  202. job = queue.enqueue(self.dummy_job_default)
  203. response = self.client.get(reverse('core:background_task', args=[job.id]))
  204. self.assertEqual(response.status_code, 200)
  205. self.assertIn('Background Tasks', str(response.content))
  206. self.assertIn(str(job.id), str(response.content))
  207. self.assertIn('Callable', str(response.content))
  208. self.assertIn('Meta', str(response.content))
  209. self.assertIn('Keyword Arguments', str(response.content))
  210. def test_background_task_delete(self):
  211. queue = get_queue('default')
  212. job = queue.enqueue(self.dummy_job_default)
  213. response = self.client.post(reverse('core:background_task_delete', args=[job.id]), {'confirm': True})
  214. self.assertEqual(response.status_code, 302)
  215. self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
  216. self.assertNotIn(job.id, queue.job_ids)
  217. def test_background_task_requeue(self):
  218. queue = get_queue('default')
  219. # Enqueue & run a job that will fail
  220. job = queue.enqueue(self.dummy_job_failing)
  221. worker = get_worker('default')
  222. worker.work(burst=True)
  223. self.assertTrue(job.is_failed)
  224. # Re-enqueue the failed job and check that its status has been reset
  225. response = self.client.get(reverse('core:background_task_requeue', args=[job.id]))
  226. self.assertEqual(response.status_code, 302)
  227. self.assertFalse(job.is_failed)
  228. def test_background_task_enqueue(self):
  229. queue = get_queue('default')
  230. # Enqueue some jobs that each depends on its predecessor
  231. job = previous_job = None
  232. for _ in range(0, 3):
  233. job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
  234. previous_job = job
  235. # Check that the last job to be enqueued has a status of deferred
  236. self.assertIsNotNone(job)
  237. self.assertEqual(job.get_status(), JobStatus.DEFERRED)
  238. self.assertIsNone(job.enqueued_at)
  239. # Force-enqueue the deferred job
  240. response = self.client.get(reverse('core:background_task_enqueue', args=[job.id]))
  241. self.assertEqual(response.status_code, 302)
  242. # Check that job's status is updated correctly
  243. job = queue.fetch_job(job.id)
  244. self.assertEqual(job.get_status(), JobStatus.QUEUED)
  245. self.assertIsNotNone(job.enqueued_at)
  246. def test_background_task_stop(self):
  247. queue = get_queue('default')
  248. worker = get_worker('default')
  249. job = queue.enqueue(self.dummy_job_default)
  250. worker.prepare_job_execution(job)
  251. self.assertEqual(job.get_status(), JobStatus.STARTED)
  252. # Stop those jobs using the view
  253. started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
  254. self.assertEqual(len(started_job_registry), 1)
  255. response = self.client.get(reverse('core:background_task_stop', args=[job.id]))
  256. self.assertEqual(response.status_code, 302)
  257. worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
  258. self.assertEqual(len(started_job_registry), 0)
  259. canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
  260. self.assertEqual(len(canceled_job_registry), 1)
  261. self.assertIn(job.id, canceled_job_registry)
  262. def test_worker_list(self):
  263. worker1 = get_worker('default', name=uuid.uuid4().hex)
  264. worker1.register_birth()
  265. worker2 = get_worker('high')
  266. worker2.register_birth()
  267. queue_index = QUEUES_MAP['default']
  268. response = self.client.get(reverse('core:worker_list', args=[queue_index]))
  269. self.assertEqual(response.status_code, 200)
  270. self.assertIn(str(worker1.name), str(response.content))
  271. self.assertNotIn(str(worker2.name), str(response.content))
  272. def test_worker(self):
  273. worker1 = get_worker('default', name=uuid.uuid4().hex)
  274. worker1.register_birth()
  275. response = self.client.get(reverse('core:worker', args=[worker1.name]))
  276. self.assertEqual(response.status_code, 200)
  277. self.assertIn(str(worker1.name), str(response.content))
  278. self.assertIn('Birth', str(response.content))
  279. self.assertIn('Total working time', str(response.content))