test_api.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. import uuid
  2. from django_rq import get_queue
  3. from django_rq.workers import get_worker
  4. from django.urls import reverse
  5. from django.utils import timezone
  6. from rq.job import Job as RQ_Job, JobStatus
  7. from rq.registry import FailedJobRegistry, StartedJobRegistry
  8. from rest_framework import status
  9. from users.models import Token, User
  10. from utilities.testing import APITestCase, APIViewTestCases, TestCase
  11. from utilities.testing.utils import disable_logging
  12. from ..models import *
  13. class AppTest(APITestCase):
  14. def test_root(self):
  15. url = reverse('core-api:api-root')
  16. response = self.client.get('{}?format=api'.format(url), **self.header)
  17. self.assertEqual(response.status_code, 200)
  18. class DataSourceTest(APIViewTestCases.APIViewTestCase):
  19. model = DataSource
  20. brief_fields = ['description', 'display', 'id', 'name', 'url']
  21. bulk_update_data = {
  22. 'enabled': False,
  23. 'description': 'foo bar baz',
  24. }
  25. @classmethod
  26. def setUpTestData(cls):
  27. data_sources = (
  28. DataSource(name='Data Source 1', type='local', source_url='file:///var/tmp/source1/'),
  29. DataSource(name='Data Source 2', type='local', source_url='file:///var/tmp/source2/'),
  30. DataSource(name='Data Source 3', type='local', source_url='file:///var/tmp/source3/'),
  31. )
  32. DataSource.objects.bulk_create(data_sources)
  33. cls.create_data = [
  34. {
  35. 'name': 'Data Source 4',
  36. 'type': 'git',
  37. 'source_url': 'https://example.com/git/source4'
  38. },
  39. {
  40. 'name': 'Data Source 5',
  41. 'type': 'git',
  42. 'source_url': 'https://example.com/git/source5'
  43. },
  44. {
  45. 'name': 'Data Source 6',
  46. 'type': 'git',
  47. 'source_url': 'https://example.com/git/source6'
  48. },
  49. ]
  50. class DataFileTest(
  51. APIViewTestCases.GetObjectViewTestCase,
  52. APIViewTestCases.ListObjectsViewTestCase,
  53. APIViewTestCases.GraphQLTestCase
  54. ):
  55. model = DataFile
  56. brief_fields = ['display', 'id', 'path', 'url']
  57. user_permissions = ('core.view_datasource', )
  58. @classmethod
  59. def setUpTestData(cls):
  60. datasource = DataSource.objects.create(
  61. name='Data Source 1',
  62. type='local',
  63. source_url='file:///var/tmp/source1/'
  64. )
  65. data_files = (
  66. DataFile(
  67. source=datasource,
  68. path='dir1/file1.txt',
  69. last_updated=timezone.now(),
  70. size=1000,
  71. hash='442da078f0111cbdf42f21903724f6597c692535f55bdfbbea758a1ae99ad9e1'
  72. ),
  73. DataFile(
  74. source=datasource,
  75. path='dir1/file2.txt',
  76. last_updated=timezone.now(),
  77. size=2000,
  78. hash='a78168c7c97115bafd96450ed03ea43acec495094c5caa28f0d02e20e3a76cc2'
  79. ),
  80. DataFile(
  81. source=datasource,
  82. path='dir1/file3.txt',
  83. last_updated=timezone.now(),
  84. size=3000,
  85. hash='12b8827a14c4d5a2f30b6c6e2b7983063988612391c6cbe8ee7493b59054827a'
  86. ),
  87. )
  88. DataFile.objects.bulk_create(data_files)
  89. class ObjectTypeTest(APITestCase):
  90. def test_list_objects(self):
  91. object_type_count = ObjectType.objects.count()
  92. response = self.client.get(reverse('extras-api:objecttype-list'), **self.header)
  93. self.assertHttpStatus(response, status.HTTP_200_OK)
  94. self.assertEqual(response.data['count'], object_type_count)
  95. def test_get_object(self):
  96. object_type = ObjectType.objects.first()
  97. url = reverse('extras-api:objecttype-detail', kwargs={'pk': object_type.pk})
  98. self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK)
  99. class BackgroundTaskTestCase(TestCase):
  100. user_permissions = ()
  101. @staticmethod
  102. def dummy_job_default():
  103. return "Job finished"
  104. @staticmethod
  105. def dummy_job_failing():
  106. raise Exception("Job failed")
  107. def setUp(self):
  108. """
  109. Create a user and token for API calls.
  110. """
  111. # Create the test user and assign permissions
  112. self.user = User.objects.create_user(username='testuser')
  113. self.user.is_staff = True
  114. self.user.is_active = True
  115. self.user.save()
  116. self.token = Token.objects.create(user=self.user)
  117. self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
  118. # Clear all queues prior to running each test
  119. get_queue('default').connection.flushall()
  120. get_queue('high').connection.flushall()
  121. get_queue('low').connection.flushall()
  122. def test_background_queue_list(self):
  123. url = reverse('core-api:rqqueue-list')
  124. # Attempt to load view without permission
  125. self.user.is_staff = False
  126. self.user.save()
  127. response = self.client.get(url, **self.header)
  128. self.assertEqual(response.status_code, 403)
  129. # Load view with permission
  130. self.user.is_staff = True
  131. self.user.save()
  132. response = self.client.get(url, **self.header)
  133. self.assertEqual(response.status_code, 200)
  134. self.assertIn('default', str(response.content))
  135. self.assertIn('high', str(response.content))
  136. self.assertIn('low', str(response.content))
  137. def test_background_queue(self):
  138. response = self.client.get(reverse('core-api:rqqueue-detail', args=['default']), **self.header)
  139. self.assertEqual(response.status_code, 200)
  140. self.assertIn('default', str(response.content))
  141. self.assertIn('oldest_job_timestamp', str(response.content))
  142. self.assertIn('scheduled_jobs', str(response.content))
  143. def test_background_task_list(self):
  144. queue = get_queue('default')
  145. queue.enqueue(self.dummy_job_default)
  146. response = self.client.get(reverse('core-api:rqtask-list'), **self.header)
  147. self.assertEqual(response.status_code, 200)
  148. self.assertIn('origin', str(response.content))
  149. self.assertIn('core.tests.test_api.BackgroundTaskTestCase.dummy_job_default()', str(response.content))
  150. def test_background_task(self):
  151. queue = get_queue('default')
  152. job = queue.enqueue(self.dummy_job_default)
  153. response = self.client.get(reverse('core-api:rqtask-detail', args=[job.id]), **self.header)
  154. self.assertEqual(response.status_code, 200)
  155. self.assertIn(str(job.id), str(response.content))
  156. self.assertIn('origin', str(response.content))
  157. self.assertIn('meta', str(response.content))
  158. self.assertIn('kwargs', str(response.content))
  159. def test_background_task_delete(self):
  160. queue = get_queue('default')
  161. job = queue.enqueue(self.dummy_job_default)
  162. response = self.client.post(reverse('core-api:rqtask-delete', args=[job.id]), **self.header)
  163. self.assertEqual(response.status_code, 200)
  164. self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
  165. queue = get_queue('default')
  166. self.assertNotIn(job.id, queue.job_ids)
  167. def test_background_task_requeue(self):
  168. queue = get_queue('default')
  169. # Enqueue & run a job that will fail
  170. job = queue.enqueue(self.dummy_job_failing)
  171. worker = get_worker('default')
  172. with disable_logging():
  173. worker.work(burst=True)
  174. self.assertTrue(job.is_failed)
  175. # Re-enqueue the failed job and check that its status has been reset
  176. response = self.client.post(reverse('core-api:rqtask-requeue', args=[job.id]), **self.header)
  177. self.assertEqual(response.status_code, 200)
  178. job = RQ_Job.fetch(job.id, queue.connection)
  179. self.assertFalse(job.is_failed)
  180. def test_background_task_enqueue(self):
  181. queue = get_queue('default')
  182. # Enqueue some jobs that each depends on its predecessor
  183. job = previous_job = None
  184. for _ in range(0, 3):
  185. job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
  186. previous_job = job
  187. # Check that the last job to be enqueued has a status of deferred
  188. self.assertIsNotNone(job)
  189. self.assertEqual(job.get_status(), JobStatus.DEFERRED)
  190. self.assertIsNone(job.enqueued_at)
  191. # Force-enqueue the deferred job
  192. response = self.client.post(reverse('core-api:rqtask-enqueue', args=[job.id]), **self.header)
  193. self.assertEqual(response.status_code, 200)
  194. # Check that job's status is updated correctly
  195. job = queue.fetch_job(job.id)
  196. self.assertEqual(job.get_status(), JobStatus.QUEUED)
  197. self.assertIsNotNone(job.enqueued_at)
  198. def test_background_task_stop(self):
  199. queue = get_queue('default')
  200. worker = get_worker('default')
  201. job = queue.enqueue(self.dummy_job_default)
  202. worker.prepare_job_execution(job)
  203. self.assertEqual(job.get_status(), JobStatus.STARTED)
  204. response = self.client.post(reverse('core-api:rqtask-stop', args=[job.id]), **self.header)
  205. self.assertEqual(response.status_code, 200)
  206. with disable_logging():
  207. worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
  208. started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
  209. self.assertEqual(len(started_job_registry), 0)
  210. canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
  211. self.assertEqual(len(canceled_job_registry), 1)
  212. self.assertIn(job.id, canceled_job_registry)
  213. def test_worker_list(self):
  214. worker1 = get_worker('default', name=uuid.uuid4().hex)
  215. worker1.register_birth()
  216. worker2 = get_worker('high')
  217. worker2.register_birth()
  218. response = self.client.get(reverse('core-api:rqworker-list'), **self.header)
  219. self.assertEqual(response.status_code, 200)
  220. self.assertIn(str(worker1.name), str(response.content))
  221. def test_worker(self):
  222. worker1 = get_worker('default', name=uuid.uuid4().hex)
  223. worker1.register_birth()
  224. response = self.client.get(reverse('core-api:rqworker-detail', args=[worker1.name]), **self.header)
  225. self.assertEqual(response.status_code, 200)
  226. self.assertIn(str(worker1.name), str(response.content))
  227. self.assertIn('birth_date', str(response.content))
  228. self.assertIn('total_working_time', str(response.content))