| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- import uuid
- from django_rq import get_queue
- from django_rq.workers import get_worker
- from django.urls import reverse
- from django.utils import timezone
- from rq.job import Job as RQ_Job, JobStatus
- from rq.registry import FailedJobRegistry, StartedJobRegistry
- from rest_framework import status
- from users.models import Token, User
- from utilities.testing import APITestCase, APIViewTestCases, TestCase
- from utilities.testing.utils import disable_logging
- from ..models import *
- class AppTest(APITestCase):
- def test_root(self):
- url = reverse('core-api:api-root')
- response = self.client.get('{}?format=api'.format(url), **self.header)
- self.assertEqual(response.status_code, 200)
- class DataSourceTest(APIViewTestCases.APIViewTestCase):
- model = DataSource
- brief_fields = ['description', 'display', 'id', 'name', 'url']
- bulk_update_data = {
- 'enabled': False,
- 'description': 'foo bar baz',
- }
- @classmethod
- def setUpTestData(cls):
- data_sources = (
- DataSource(name='Data Source 1', type='local', source_url='file:///var/tmp/source1/'),
- DataSource(name='Data Source 2', type='local', source_url='file:///var/tmp/source2/'),
- DataSource(name='Data Source 3', type='local', source_url='file:///var/tmp/source3/'),
- )
- DataSource.objects.bulk_create(data_sources)
- cls.create_data = [
- {
- 'name': 'Data Source 4',
- 'type': 'git',
- 'source_url': 'https://example.com/git/source4'
- },
- {
- 'name': 'Data Source 5',
- 'type': 'git',
- 'source_url': 'https://example.com/git/source5'
- },
- {
- 'name': 'Data Source 6',
- 'type': 'git',
- 'source_url': 'https://example.com/git/source6'
- },
- ]
- class DataFileTest(
- APIViewTestCases.GetObjectViewTestCase,
- APIViewTestCases.ListObjectsViewTestCase,
- APIViewTestCases.GraphQLTestCase
- ):
- model = DataFile
- brief_fields = ['display', 'id', 'path', 'url']
- user_permissions = ('core.view_datasource', )
- @classmethod
- def setUpTestData(cls):
- datasource = DataSource.objects.create(
- name='Data Source 1',
- type='local',
- source_url='file:///var/tmp/source1/'
- )
- data_files = (
- DataFile(
- source=datasource,
- path='dir1/file1.txt',
- last_updated=timezone.now(),
- size=1000,
- hash='442da078f0111cbdf42f21903724f6597c692535f55bdfbbea758a1ae99ad9e1'
- ),
- DataFile(
- source=datasource,
- path='dir1/file2.txt',
- last_updated=timezone.now(),
- size=2000,
- hash='a78168c7c97115bafd96450ed03ea43acec495094c5caa28f0d02e20e3a76cc2'
- ),
- DataFile(
- source=datasource,
- path='dir1/file3.txt',
- last_updated=timezone.now(),
- size=3000,
- hash='12b8827a14c4d5a2f30b6c6e2b7983063988612391c6cbe8ee7493b59054827a'
- ),
- )
- DataFile.objects.bulk_create(data_files)
- class ObjectTypeTest(APITestCase):
- def test_list_objects(self):
- object_type_count = ObjectType.objects.count()
- response = self.client.get(reverse('core-api:objecttype-list'), **self.header)
- self.assertHttpStatus(response, status.HTTP_200_OK)
- self.assertEqual(response.data['count'], object_type_count)
- def test_get_object(self):
- object_type = ObjectType.objects.first()
- url = reverse('core-api:objecttype-detail', kwargs={'pk': object_type.pk})
- self.assertHttpStatus(self.client.get(url, **self.header), status.HTTP_200_OK)
- class BackgroundTaskTestCase(TestCase):
- user_permissions = ()
- @staticmethod
- def dummy_job_default():
- return "Job finished"
- @staticmethod
- def dummy_job_failing():
- raise Exception("Job failed")
- def setUp(self):
- """
- Create a user and token for API calls.
- """
- # Create the test user and assign permissions
- self.user = User.objects.create_user(username='testuser', is_active=True)
- self.token = Token.objects.create(user=self.user)
- self.header = {'HTTP_AUTHORIZATION': f'Token {self.token.key}'}
- # Clear all queues prior to running each test
- get_queue('default').connection.flushall()
- get_queue('high').connection.flushall()
- get_queue('low').connection.flushall()
- def test_background_queue_list(self):
- url = reverse('core-api:rqqueue-list')
- # Attempt to load view without permission
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Load view with permission
- self.user.is_superuser = True
- self.user.save()
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 200)
- self.assertIn('default', str(response.content))
- self.assertIn('high', str(response.content))
- self.assertIn('low', str(response.content))
- def test_background_queue(self):
- url = reverse('core-api:rqqueue-detail', args=['default'])
- # Attempt to load view without permission
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Load view with permission
- self.user.is_superuser = True
- self.user.save()
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 200)
- self.assertIn('default', str(response.content))
- self.assertIn('oldest_job_timestamp', str(response.content))
- self.assertIn('scheduled_jobs', str(response.content))
- def test_background_task_list(self):
- queue = get_queue('default')
- queue.enqueue(self.dummy_job_default)
- url = reverse('core-api:rqtask-list')
- # Attempt to load view without permission
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Load view with permission
- self.user.is_superuser = True
- self.user.save()
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 200)
- self.assertIn('origin', str(response.content))
- self.assertIn('core.tests.test_api.BackgroundTaskTestCase.dummy_job_default()', str(response.content))
- def test_background_task(self):
- queue = get_queue('default')
- job = queue.enqueue(self.dummy_job_default)
- url = reverse('core-api:rqtask-detail', args=[job.id])
- # Attempt to load view without permission
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Load view with permission
- self.user.is_superuser = True
- self.user.save()
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 200)
- self.assertIn(str(job.id), str(response.content))
- self.assertIn('origin', str(response.content))
- self.assertIn('meta', str(response.content))
- self.assertIn('kwargs', str(response.content))
- def test_background_task_delete(self):
- queue = get_queue('default')
- job = queue.enqueue(self.dummy_job_default)
- url = reverse('core-api:rqtask-delete', args=[job.id])
- # Attempt to load view without permission
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Load view with permission
- self.user.is_superuser = True
- self.user.save()
- response = self.client.post(url, **self.header)
- self.assertEqual(response.status_code, 200)
- self.assertFalse(RQ_Job.exists(job.id, connection=queue.connection))
- queue = get_queue('default')
- self.assertNotIn(job.id, queue.job_ids)
- def test_background_task_requeue(self):
- # Enqueue & run a job that will fail
- queue = get_queue('default')
- job = queue.enqueue(self.dummy_job_failing)
- worker = get_worker('default')
- with disable_logging():
- worker.work(burst=True)
- self.assertTrue(job.is_failed)
- url = reverse('core-api:rqtask-requeue', args=[job.id])
- # Attempt to requeue the job without permission
- response = self.client.post(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Re-enqueue the failed job and check that its status has been reset
- self.user.is_superuser = True
- self.user.save()
- response = self.client.post(url, **self.header)
- self.assertEqual(response.status_code, 200)
- job = RQ_Job.fetch(job.id, queue.connection)
- self.assertFalse(job.is_failed)
- def test_background_task_enqueue(self):
- # Enqueue some jobs that each depends on its predecessor
- queue = get_queue('default')
- job = previous_job = None
- for _ in range(0, 3):
- job = queue.enqueue(self.dummy_job_default, depends_on=previous_job)
- previous_job = job
- url = reverse('core-api:rqtask-enqueue', args=[job.id])
- # Check that the last job to be enqueued has a status of deferred
- self.assertIsNotNone(job)
- self.assertEqual(job.get_status(), JobStatus.DEFERRED)
- self.assertIsNone(job.enqueued_at)
- # Attempt to force-enqueue the job without permission
- response = self.client.post(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Force-enqueue the deferred job
- self.user.is_superuser = True
- self.user.save()
- response = self.client.post(url, **self.header)
- self.assertEqual(response.status_code, 200)
- # Check that job's status is updated correctly
- job = queue.fetch_job(job.id)
- self.assertEqual(job.get_status(), JobStatus.QUEUED)
- self.assertIsNotNone(job.enqueued_at)
- def test_background_task_stop(self):
- queue = get_queue('default')
- worker = get_worker('default')
- job = queue.enqueue(self.dummy_job_default)
- worker.prepare_job_execution(job)
- url = reverse('core-api:rqtask-stop', args=[job.id])
- self.assertEqual(job.get_status(), JobStatus.STARTED)
- # Attempt to stop the task without permission
- response = self.client.post(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Stop the task
- self.user.is_superuser = True
- self.user.save()
- response = self.client.post(url, **self.header)
- self.assertEqual(response.status_code, 200)
- with disable_logging():
- worker.monitor_work_horse(job, queue) # Sets the job as Failed and removes from Started
- started_job_registry = StartedJobRegistry(queue.name, connection=queue.connection)
- self.assertEqual(len(started_job_registry), 0)
- # Verify that the task was cancelled
- canceled_job_registry = FailedJobRegistry(queue.name, connection=queue.connection)
- self.assertEqual(len(canceled_job_registry), 1)
- self.assertIn(job.id, canceled_job_registry)
- def test_worker_list(self):
- worker1 = get_worker('default', name=uuid.uuid4().hex)
- worker1.register_birth()
- worker2 = get_worker('high')
- worker2.register_birth()
- url = reverse('core-api:rqworker-list')
- # Attempt to fetch the worker list without permission
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Fetch the worker list
- self.user.is_superuser = True
- self.user.save()
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 200)
- self.assertIn(str(worker1.name), str(response.content))
- def test_worker(self):
- worker1 = get_worker('default', name=uuid.uuid4().hex)
- worker1.register_birth()
- url = reverse('core-api:rqworker-detail', args=[worker1.name])
- # Attempt to fetch a worker without permission
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 403)
- # Fetch the worker
- self.user.is_superuser = True
- self.user.save()
- response = self.client.get(url, **self.header)
- self.assertEqual(response.status_code, 200)
- self.assertIn(str(worker1.name), str(response.content))
- self.assertIn('birth_date', str(response.content))
- self.assertIn('total_working_time', str(response.content))
|