2
0
Эх сурвалжийг харах

Merge pull request #22348 from netbox-community/22180-custom-script-data-source-bypass

Closes #22180: Validate scripts added via a data source
bctiemann 1 сар өмнө
parent
commit
5b5e821fbb

+ 9 - 0
netbox/extras/forms/scripts.py

@@ -90,6 +90,15 @@ class ScriptFileForm(ManagedFileForm):
                 raise forms.ValidationError(
                     _("Error loading script: {error}").format(error=e)
                 )
+        elif data_file := self.cleaned_data.get('data_file'):
+            # Validate scripts synced from a data source as well, to avoid creating a broken
+            # script module that cannot be loaded or corrected
+            try:
+                validate_script_content(data_file.data, data_file.path)
+            except Exception as e:
+                raise forms.ValidationError(
+                    _("Error loading script: {error}").format(error=e)
+                )
 
         return self.cleaned_data
 

+ 129 - 2
netbox/extras/tests/test_forms.py

@@ -1,12 +1,19 @@
+import tempfile
+from pathlib import Path
+
+from django.core.exceptions import NON_FIELD_ERRORS
+from django.core.files.uploadedfile import SimpleUploadedFile
 from django.test import TestCase
 
-from core.models import ObjectType
+from core.choices import ManagedFileRootPathChoices
+from core.models import DataSource, ObjectType
 from dcim.forms import SiteForm
 from dcim.models import Site
 from extras.choices import CustomFieldTypeChoices
 from extras.forms import SavedFilterForm
 from extras.forms.model_forms import CustomFieldChoiceSetForm
-from extras.models import CustomField, CustomFieldChoiceSet
+from extras.forms.scripts import ScriptFileForm
+from extras.models import CustomField, CustomFieldChoiceSet, ScriptModule
 
 
 class CustomFieldModelFormTestCase(TestCase):
@@ -161,3 +168,123 @@ class SavedFilterFormTestCase(TestCase):
         })
         self.assertTrue(form.is_valid())
         form.save()
+
+
+class ScriptFileFormTestCase(TestCase):
+    """
+    Scripts added via a Data Source must be validated the same way uploaded scripts are (see #22180).
+    """
+    BROKEN_SCRIPT = (
+        "from extras.scripts import Script\n"
+        "import imnotarealmoduleicreateerrors\n\n\n"
+        "class BrokenScript(Script):\n"
+        "    def run(self, data, commit):\n"
+        "        pass\n"
+    )
+    VALID_SCRIPT = (
+        "from extras.scripts import Script\n\n\n"
+        "class FirstScript(Script):\n"
+        "    def run(self, data, commit):\n"
+        "        pass\n\n\n"
+        "class SecondScript(Script):\n"
+        "    def run(self, data, commit):\n"
+        "        pass\n"
+    )
+
+    @staticmethod
+    def _write(scripts_dir, filename, content):
+        with open(scripts_dir / filename, 'w') as f:
+            f.write(content)
+
+    @staticmethod
+    def _new_module():
+        # Mirror ScriptModuleCreateView.alter_object(), which sets file_root before validation.
+        return ScriptModule(file_root=ManagedFileRootPathChoices.SCRIPTS)
+
+    def _sync_source(self, name, **files):
+        """
+        Create a local DataSource over a temp dir populated with the given {filename: content} files,
+        sync it, and return the DataSource.
+        """
+        temp_dir = tempfile.TemporaryDirectory()
+        self.addCleanup(temp_dir.cleanup)
+        scripts_dir = Path(temp_dir.name) / "scripts"
+        scripts_dir.mkdir(parents=True, exist_ok=True)
+        for filename, content in files.items():
+            self._write(scripts_dir, filename, content)
+
+        data_source = DataSource(name=name, type="local", source_url=str(scripts_dir))
+        data_source.save()
+        data_source.sync()
+        return data_source
+
+    def test_broken_script_via_data_file_is_rejected(self):
+        """A script that fails to import via a data_file must be rejected, and no ScriptModule created."""
+        data_source = self._sync_source("Broken", **{'broken.py': self.BROKEN_SCRIPT})
+        data_file = data_source.datafiles.get(path__endswith='broken.py')
+
+        form = ScriptFileForm(data={'data_file': data_file.pk}, instance=self._new_module())
+
+        self.assertFalse(form.is_valid())
+        self.assertIn(NON_FIELD_ERRORS, form.errors)
+        self.assertEqual(ScriptModule.objects.count(), 0)
+
+    def test_valid_script_via_data_file_is_accepted(self):
+        """A valid script via a data_file passes validation and its Script classes are discovered on save."""
+        data_source = self._sync_source("Valid", **{'valid.py': self.VALID_SCRIPT})
+        data_file = data_source.datafiles.get(path__endswith='valid.py')
+
+        form = ScriptFileForm(data={'data_file': data_file.pk}, instance=self._new_module())
+        self.assertTrue(form.is_valid())
+        module = form.save()
+
+        self.assertEqual(ScriptModule.objects.count(), 1)
+        self.assertEqual(
+            {script.name for script in module.scripts.all()},
+            {'FirstScript', 'SecondScript'},
+        )
+
+    def test_corrected_script_recovers(self):
+        """After a broken script is rejected, syncing a corrected version succeeds without a uniqueness deadlock."""
+        temp_dir = tempfile.TemporaryDirectory()
+        self.addCleanup(temp_dir.cleanup)
+        scripts_dir = Path(temp_dir.name) / "scripts"
+        scripts_dir.mkdir(parents=True, exist_ok=True)
+
+        data_source = DataSource(name="Recovery", type="local", source_url=str(scripts_dir))
+        data_source.save()
+
+        # First sync: broken script is rejected, nothing created
+        self._write(scripts_dir, 'myscript.py', self.BROKEN_SCRIPT)
+        data_source.sync()
+        data_file = data_source.datafiles.get(path__endswith='myscript.py')
+        form = ScriptFileForm(data={'data_file': data_file.pk}, instance=self._new_module())
+        self.assertFalse(form.is_valid())
+        self.assertEqual(ScriptModule.objects.count(), 0)
+
+        # Correct the script and re-sync: now it should be accepted
+        self._write(scripts_dir, 'myscript.py', self.VALID_SCRIPT)
+        data_source.sync()
+        data_file = data_source.datafiles.get(path__endswith='myscript.py')
+        form = ScriptFileForm(data={'data_file': data_file.pk}, instance=self._new_module())
+        self.assertTrue(form.is_valid())
+        module = form.save()
+        self.assertEqual(
+            {script.name for script in module.scripts.all()},
+            {'FirstScript', 'SecondScript'},
+        )
+
+    def test_broken_script_via_upload_is_rejected(self):
+        """Regression guard: the upload_file path still validates content."""
+        upload_file = SimpleUploadedFile(name='broken.py', content=self.BROKEN_SCRIPT.encode())
+        form = ScriptFileForm(files={'upload_file': upload_file}, instance=self._new_module())
+
+        self.assertFalse(form.is_valid())
+        self.assertIn(NON_FIELD_ERRORS, form.errors)
+
+    def test_valid_script_via_upload_is_accepted(self):
+        """Regression guard: a valid uploaded script still validates."""
+        upload_file = SimpleUploadedFile(name='valid.py', content=self.VALID_SCRIPT.encode())
+        form = ScriptFileForm(files={'upload_file': upload_file}, instance=self._new_module())
+
+        self.assertTrue(form.is_valid())