|
@@ -1,18 +1,19 @@
|
|
|
|
|
+import uuid
|
|
|
from unittest.mock import PropertyMock, patch
|
|
from unittest.mock import PropertyMock, patch
|
|
|
|
|
|
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.contrib.contenttypes.models import ContentType
|
|
|
from django.test import tag
|
|
from django.test import tag
|
|
|
from django.urls import reverse
|
|
from django.urls import reverse
|
|
|
|
|
|
|
|
-from core.choices import ManagedFileRootPathChoices
|
|
|
|
|
|
|
+from core.choices import JobStatusChoices, ManagedFileRootPathChoices
|
|
|
from core.events import *
|
|
from core.events import *
|
|
|
-from core.models import ObjectType
|
|
|
|
|
|
|
+from core.models import Job, ObjectType
|
|
|
from dcim.models import DeviceType, Manufacturer, Site
|
|
from dcim.models import DeviceType, Manufacturer, Site
|
|
|
from extras.choices import *
|
|
from extras.choices import *
|
|
|
from extras.models import *
|
|
from extras.models import *
|
|
|
from extras.scripts import BooleanVar, IntegerVar
|
|
from extras.scripts import BooleanVar, IntegerVar
|
|
|
from extras.scripts import Script as PythonClass
|
|
from extras.scripts import Script as PythonClass
|
|
|
-from users.models import Group, User
|
|
|
|
|
|
|
+from users.models import Group, ObjectPermission, User
|
|
|
from utilities.testing import TestCase, ViewTestCases
|
|
from utilities.testing import TestCase, ViewTestCases
|
|
|
|
|
|
|
|
|
|
|
|
@@ -1165,3 +1166,88 @@ class ScriptDefaultValuesTestCase(TestCase):
|
|
|
self.assertEqual(call_kwargs['data']['bool_default_false'], False)
|
|
self.assertEqual(call_kwargs['data']['bool_default_false'], False)
|
|
|
self.assertEqual(call_kwargs['data']['int_with_default'], 0)
|
|
self.assertEqual(call_kwargs['data']['int_with_default'], 0)
|
|
|
self.assertIsNone(call_kwargs['data']['int_without_default'])
|
|
self.assertIsNone(call_kwargs['data']['int_without_default'])
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class ScriptResultViewTestCase(TestCase):
|
|
|
|
|
+ SECRET_OUTPUT = 'my secret script output'
|
|
|
|
|
+
|
|
|
|
|
+ @classmethod
|
|
|
|
|
+ def setUpTestData(cls):
|
|
|
|
|
+ with patch.object(ScriptModule, 'sync_classes'):
|
|
|
|
|
+ module = ScriptModule.objects.create(
|
|
|
|
|
+ file_root=ManagedFileRootPathChoices.SCRIPTS,
|
|
|
|
|
+ file_path='test_script.py',
|
|
|
|
|
+ )
|
|
|
|
|
+ cls.allowed_script = Script.objects.create(
|
|
|
|
|
+ module=module, name='Allowed script', is_executable=True
|
|
|
|
|
+ )
|
|
|
|
|
+ cls.secret_script = Script.objects.create(
|
|
|
|
|
+ module=module, name='Secret script', is_executable=True
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ script_type = ObjectType.objects.get_for_model(Script)
|
|
|
|
|
+ cls.allowed_job = Job.objects.create(
|
|
|
|
|
+ object_type=script_type,
|
|
|
|
|
+ object_id=cls.allowed_script.pk,
|
|
|
|
|
+ name='allowed-job',
|
|
|
|
|
+ job_id=uuid.uuid4(),
|
|
|
|
|
+ status=JobStatusChoices.STATUS_COMPLETED,
|
|
|
|
|
+ data={'log': [], 'output': 'benign output'},
|
|
|
|
|
+ )
|
|
|
|
|
+ cls.secret_job = Job.objects.create(
|
|
|
|
|
+ object_type=script_type,
|
|
|
|
|
+ object_id=cls.secret_script.pk,
|
|
|
|
|
+ name='secret-job',
|
|
|
|
|
+ job_id=uuid.uuid4(),
|
|
|
|
|
+ status=JobStatusChoices.STATUS_COMPLETED,
|
|
|
|
|
+ data={'log': [], 'output': cls.SECRET_OUTPUT},
|
|
|
|
|
+ )
|
|
|
|
|
+
|
|
|
|
|
+ def test_get_without_view_job_permission_returns_404(self):
|
|
|
|
|
+ """
|
|
|
|
|
+ A user with extras.view_script but no core.view_job must not retrieve any job result
|
|
|
|
|
+ via ScriptResultView, even for the script whose object-level permission they hold.
|
|
|
|
|
+ """
|
|
|
|
|
+ self.add_permissions('extras.view_script')
|
|
|
|
|
+
|
|
|
|
|
+ url = reverse('extras:script_result', kwargs={'job_pk': self.allowed_job.pk})
|
|
|
|
|
+ self.assertHttpStatus(self.client.get(url), 404)
|
|
|
|
|
+
|
|
|
|
|
+ url = reverse('extras:script_result', kwargs={'job_pk': self.secret_job.pk})
|
|
|
|
|
+ self.assertHttpStatus(self.client.get(url), 404)
|
|
|
|
|
+
|
|
|
|
|
+ def test_get_export_output_without_view_job_permission_returns_404(self):
|
|
|
|
|
+ """
|
|
|
|
|
+ Regression for the PoC: the ?export=output path must not leak job.data['output']
|
|
|
|
|
+ when the user lacks core.view_job.
|
|
|
|
|
+ """
|
|
|
|
|
+ self.add_permissions('extras.view_script')
|
|
|
|
|
+
|
|
|
|
|
+ url = reverse('extras:script_result', kwargs={'job_pk': self.secret_job.pk})
|
|
|
|
|
+ response = self.client.get(url, {'export': 'output'})
|
|
|
|
|
+
|
|
|
|
|
+ self.assertHttpStatus(response, 404)
|
|
|
|
|
+ self.assertNotIn(self.SECRET_OUTPUT.encode(), response.content)
|
|
|
|
|
+
|
|
|
|
|
+ def test_get_with_constrained_view_job_permission(self):
|
|
|
|
|
+ """
|
|
|
|
|
+ With core.view_job constrained to the allowed job only, the user can fetch the allowed
|
|
|
|
|
+ result but the secret result is hidden (404).
|
|
|
|
|
+ """
|
|
|
|
|
+ self.add_permissions('extras.view_script')
|
|
|
|
|
+ obj_perm = ObjectPermission(
|
|
|
|
|
+ name='View allowed job only',
|
|
|
|
|
+ constraints={'pk': self.allowed_job.pk},
|
|
|
|
|
+ actions=['view'],
|
|
|
|
|
+ )
|
|
|
|
|
+ obj_perm.save()
|
|
|
|
|
+ obj_perm.users.add(self.user)
|
|
|
|
|
+ obj_perm.object_types.add(ObjectType.objects.get_for_model(Job))
|
|
|
|
|
+
|
|
|
|
|
+ url = reverse('extras:script_result', kwargs={'job_pk': self.allowed_job.pk})
|
|
|
|
|
+ self.assertHttpStatus(self.client.get(url), 200)
|
|
|
|
|
+
|
|
|
|
|
+ url = reverse('extras:script_result', kwargs={'job_pk': self.secret_job.pk})
|
|
|
|
|
+ response = self.client.get(url, {'export': 'output'})
|
|
|
|
|
+ self.assertHttpStatus(response, 404)
|
|
|
|
|
+ self.assertNotIn(self.SECRET_OUTPUT.encode(), response.content)
|