test_api.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. import uuid
  2. from django_rq import get_queue
  3. from django_rq.workers import get_worker
  4. from django.urls import reverse
  5. from django.utils import timezone
  6. from rq.job import Job as RQ_Job, JobStatus
  7. from rq.registry import FailedJobRegistry, StartedJobRegistry
  8. from rest_framework import status
  9. from users.models import Token, User
  10. from utilities.testing import APITestCase, APIViewTestCases, TestCase
  11. from utilities.testing.utils import disable_logging
  12. from ..models import *
  13. class AppTest(APITestCase):
  14. def test_root(self):
  15. url = reverse('core-api:api-root')
  16. response = self.client.get('{}?format=api'.format(url), **self.header)
  17. self.assertEqual(response.status_code, 200)
  18. class DataSourceTest(APIViewTestCases.APIViewTestCase):
  19. model = DataSource
  20. brief_fields = ['description', 'display', 'id', 'name', 'url']
  21. bulk_update_data = {
  22. 'enabled': False,
  23. 'description': 'foo bar baz',
  24. }
  25. @classmethod
  26. def setUpTestData(cls):
  27. data_sources = (
  28. DataSource(name='Data Source 1', type='local', source_url='file:///var/tmp/source1/'),
  29. DataSource(name='Data Source 2', type='local', source_url='file:///var/tmp/source2/'),
  30. DataSource(name='Data Source 3', type='local', source_url='file:///var/tmp/source3/'),
  31. )
  32. DataSource.objects.bulk_create(data_sources)
  33. cls.create_data = [
  34. {
  35. 'name': 'Data Source 4',
  36. 'type': 'git',
  37. 'source_url': 'https://example.com/git/source4'
  38. },
  39. {
  40. 'name': 'Data Source 5',
  41. 'type': 'git',
  42. 'source_url': 'https://example.com/git/source5'
  43. },
  44. {
  45. 'name': 'Data Source 6',
  46. 'type': 'git',
  47. 'source_url': 'https://example.com/git/source6'
  48. },
  49. ]
  50. class DataFileTest(
  51. APIViewTestCases.GetObjectViewTestCase,
  52. APIViewTestCases.ListObjectsViewTestCase,
  53. APIViewTestCases.GraphQLTestCase
  54. ):
  55. model = DataFile
  56. brief_fields = ['display', 'id', 'path', 'url']
  57. user_permissions = ('core.view_datasource', )
  58. @classmethod
  59. def setUpTestData(cls):
  60. datasource = DataSource.objects.create(
  61. name='Data Source 1',
  62. type='local',
  63. source_url='file:///var/tmp/source1/'
  64. )
  65. data_files = (
  66. DataFile(
  67. source=datasource,
  68. path='dir1/file1.txt',
  69. last_updated=timezone.now(),
  70. size=1000,
  71. hash='442da078f0111cbdf42f21903724f6597c692535f55bdfbbea758a1ae99ad9e1'
  72. ),
  73. DataFile(
  74. source=datasource,
  75. path='dir1/file2.txt',
  76. last_updated=timezone.now(),
  77. size=2000,
  78. hash='a78168c7c97115bafd96450ed03ea43acec495094c5caa28f0d02e20e3a76cc2'
  79. ),
  80. DataFile(
  81. source=datasource,
  82. path='dir1/file3.txt',
  83. last_updated=timezone.now(),
  84. size=3000,
  85. hash='12b8827a14c4d5a2f30b6c6e2b7983063988612391c6cbe8ee7493b59054827a'
  86. ),
  87. )
  88. DataFile.objects.bulk_create(data_files)
  89. class ObjectTypeTest(APITestCase):
  90. def test_list_objects(self):
  91. object_type_count = ObjectType.objects.count()
  92. response = self.client.get(reverse('core-api:objecttype-list'), **self.header)
  93. self.assertHttpStatus(response, status.HTTP_200_OK)
  94. self.assertEqual(response.data['count'], object_type_count)
  95. def test_get_object(self):
  96. object_type = ObjectType.objects.first()
  97. url = reverse('core-api:objecttype-detail', kwargs={'pk': object_type.pk})
  98. self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK)
  99. class BackgroundTaskTestCase(TestCase):
  100. user_permissions = ()
  101. @staticmethod
  102. def dummy_job_default():
  103. return "Job finished"
  104. @staticmethod
  105. def dummy_job_failing():
  106. raise Exception("Job failed")
  107. def setUp(self):
  108. """
  109. Create a user and token for API calls.
  110. """
  111. # Create the test user and assign permissions
  112. self.user = User.objects.create_user(username='testuser', is_active=True)
  113. self.token = Token.objects.create(user=self.user)
  114. self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
  115. # Clear all queues prior to running each test
  116. get_queue('default').connection.flushall()
  117. get_queue('high').connection.flushall()
  118. get_queue('low').connection.flushall()
  119. def test_background_queue_list(self):
  120. url = reverse('core-api:rqqueue-list')
  121. # Attempt to load view without permission
  122. response = self.client.get(url, **self.header)
  123. self.assertEqual(response.status_code, 403)
  124. # Load view with permission
  125. self.user.is_superuser = True
  126. self.user.save()
  127. response = self.client.get(url, **self.header)
  128. self.assertEqual(response.status_code, 200)
  129. self.assertIn('default', str(response.content))
  130. self.assertIn('high', str(response.content))
  131. self.assertIn('low', str(response.content))
  132. def test_background_queue(self):
  133. url = reverse('core-api:rqqueue-detail', args=['default'])
  134. # Attempt to load view without permission
  135. response = self.client.get(url, **self.header)
  136. self.assertEqual(response.status_code, 403)
  137. # Load view with permission
  138. self.user.is_superuser = True
  139. self.user.save()
  140. response = self.client.get(url, **self.header)
  141. self.assertEqual(response.status_code, 200)
  142. self.assertIn('default', str(response.content))
  143. self.assertIn('oldest_job_timestamp', str(response.content))
  144. self.assertIn('scheduled_jobs', str(response.content))
  145. def test_background_task_list(self):
  146. queue = get_queue('default')
  147. queue.enqueue(self.dummy_job_default)
  148. url = reverse('core-api:rqtask-list')
  149. # Attempt to load view without permission
  150. response = self.client.get(url, **self.header)
  151. self.assertEqual(response.status_code, 403)
  152. # Load view with permission
  153. self.user.is_superuser = True
  154. self.user.save()
  155. response = self.client.get(url, **self.header)
  156. self.assertEqual(response.status_code, 200)
  157. self.assertIn('origin', str(response.content))
  158. self.assertIn('core.tests.test_api.BackgroundTaskTestCase.dummy_job_default()', str(response.content))
  159. def test_background_task(self):
  160. queue = get_queue('default')
  161. job = queue.enqueue(self.dummy_job_default)
  162. url = reverse('core-api:rqtask-detail', args=[job.id])
  163. # Attempt to load view without permission
  164. response = self.client.get(url, **self.header)
  165. self.assertEqual(response.status_code, 403)
  166. # Load view with permission
  167. self.user.is_superuser = True
  168. self.user.save()
  169. response = self.client.get(url, **self.header)
  170. self.assertEqual(response.status_code, 200)
  171. self.assertIn(str(job.id), str(response.content))
  172. self.assertIn('origin', str(response.content))
  173. self.assertIn('meta', str(response.content))
  174. self.assertIn('kwargs', str(response.content))
  175. def test_background_task_delete(self):
  176. queue = get_queue('default')
  177. job = queue.enqueue(self.dummy_job_default)
  178. url = reverse('core-api:rqtask-delete', args=[job.id])
  179. # Attempt to load view without permission
  180. response = self.client.get(url, **self.header)
  181. self.assertEqual(response.status_code, 403)
  182. # Load view with permission
  183. self.user.is_superuser = True
  184. self.user.save()
  185. response = self.client.post(url, **self.header)
  186. self.assertEqual(response.status_code, 200)
  187. self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
  188. queue = get_queue('default')
  189. self.assertNotIn(job.id, queue.job_ids)
  190. def test_background_task_requeue(self):
  191. # Enqueue & run a job that will fail
  192. queue = get_queue('default')
  193. job = queue.enqueue(self.dummy_job_failing)
  194. worker = get_worker('default')
  195. with disable_logging():
  196. worker.work(burst=True)
  197. self.assertTrue(job.is_failed)
  198. url = reverse('core-api:rqtask-requeue', args=[job.id])
  199. # Attempt to requeue the job without permission
  200. response = self.client.post(url, **self.header)
  201. self.assertEqual(response.status_code, 403)
  202. # Re-enqueue the failed job and check that its status has been reset
  203. self.user.is_superuser = True
  204. self.user.save()
  205. response = self.client.post(url, **self.header)
  206. self.assertEqual(response.status_code, 200)
  207. job = RQ_Job.fetch(job.id, queue.connection)
  208. self.assertFalse(job.is_failed)
  209. def test_background_task_enqueue(self):
  210. # Enqueue some jobs that each depends on its predecessor
  211. queue = get_queue('default')
  212. job = previous_job = None
  213. for _ in range(0, 3):
  214. job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
  215. previous_job = job
  216. url = reverse('core-api:rqtask-enqueue', args=[job.id])
  217. # Check that the last job to be enqueued has a status of deferred
  218. self.assertIsNotNone(job)
  219. self.assertEqual(job.get_status(), JobStatus.DEFERRED)
  220. self.assertIsNone(job.enqueued_at)
  221. # Attempt to force-enqueue the job without permission
  222. response = self.client.post(url, **self.header)
  223. self.assertEqual(response.status_code, 403)
  224. # Force-enqueue the deferred job
  225. self.user.is_superuser = True
  226. self.user.save()
  227. response = self.client.post(url, **self.header)
  228. self.assertEqual(response.status_code, 200)
  229. # Check that job's status is updated correctly
  230. job = queue.fetch_job(job.id)
  231. self.assertEqual(job.get_status(), JobStatus.QUEUED)
  232. self.assertIsNotNone(job.enqueued_at)
  233. def test_background_task_stop(self):
  234. queue = get_queue('default')
  235. worker = get_worker('default')
  236. job = queue.enqueue(self.dummy_job_default)
  237. worker.prepare_job_execution(job)
  238. url = reverse('core-api:rqtask-stop', args=[job.id])
  239. self.assertEqual(job.get_status(), JobStatus.STARTED)
  240. # Attempt to stop the task without permission
  241. response = self.client.post(url, **self.header)
  242. self.assertEqual(response.status_code, 403)
  243. # Stop the task
  244. self.user.is_superuser = True
  245. self.user.save()
  246. response = self.client.post(url, **self.header)
  247. self.assertEqual(response.status_code, 200)
  248. with disable_logging():
  249. worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
  250. started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
  251. self.assertEqual(len(started_job_registry), 0)
  252. # Verify that the task was cancelled
  253. canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
  254. self.assertEqual(len(canceled_job_registry), 1)
  255. self.assertIn(job.id, canceled_job_registry)
  256. def test_worker_list(self):
  257. worker1 = get_worker('default', name=uuid.uuid4().hex)
  258. worker1.register_birth()
  259. worker2 = get_worker('high')
  260. worker2.register_birth()
  261. url = reverse('core-api:rqworker-list')
  262. # Attempt to fetch the worker list without permission
  263. response = self.client.get(url, **self.header)
  264. self.assertEqual(response.status_code, 403)
  265. # Fetch the worker list
  266. self.user.is_superuser = True
  267. self.user.save()
  268. response = self.client.get(url, **self.header)
  269. self.assertEqual(response.status_code, 200)
  270. self.assertIn(str(worker1.name), str(response.content))
  271. def test_worker(self):
  272. worker1 = get_worker('default', name=uuid.uuid4().hex)
  273. worker1.register_birth()
  274. url = reverse('core-api:rqworker-detail', args=[worker1.name])
  275. # Attempt to fetch a worker without permission
  276. response = self.client.get(url, **self.header)
  277. self.assertEqual(response.status_code, 403)
  278. # Fetch the worker
  279. self.user.is_superuser = True
  280. self.user.save()
  281. response = self.client.get(url, **self.header)
  282. self.assertEqual(response.status_code, 200)
  283. self.assertIn(str(worker1.name), str(response.content))
  284. self.assertIn('birth_date', str(response.content))
  285. self.assertIn('total_working_time', str(response.content))