Procházet zdrojové kódy

fix(extras): Prevent Script uploads from overwriting files (#22554)

Reject duplicate ScriptModule uploads before writing to storage to
prevent failed uploads from corrupting existing files. Add existence
check in cleanup path to avoid deleting files referenced by concurrent
uploads that won the race.

Fixes #22543
Martin Hauser před 1 týdnem
rodič
revize
9f140e6442

+ 12 - 1
netbox/extras/api/serializers_/scripts.py

@@ -37,6 +37,13 @@ class ScriptModuleSerializer(ValidatedModelSerializer):
         # Pop 'file' before model instantiation — ScriptModule has no such field.
         file = data.pop('file', None)
         data['file_root'] = ManagedFileRootPathChoices.SCRIPTS
+
+        # Reject duplicates before writing to storage so a failed upload can't touch the existing file
+        if file is not None and ScriptModule.objects.filter(
+            file_root=ManagedFileRootPathChoices.SCRIPTS, file_path=file.name
+        ).exists():
+            raise serializers.ValidationError(_("A script module with this file name already exists."))
+
         data = super().validate(data)
         data.pop('file_root', None)
         if file is not None:
@@ -68,7 +75,11 @@ class ScriptModuleSerializer(ValidatedModelSerializer):
                 )
             raise
         finally:
-            if not created and (file_path := validated_data.get('file_path')):
+            # Don't delete a path another ScriptModule still references (e.g. a concurrent upload won the race)
+            file_path = validated_data.get('file_path')
+            if not created and file_path and not ScriptModule.objects.filter(
+                file_root=ManagedFileRootPathChoices.SCRIPTS, file_path=file_path
+            ).exists():
                 try:
                     storage.delete(file_path)
                 except Exception:

+ 67 - 0
netbox/extras/tests/test_api.py

@@ -1581,6 +1581,27 @@ class NotificationTestCase(APIViewTestCases.APIViewTestCase):
         ]
 
 
+class _InMemoryScriptStorage:
+    """Stateful stand-in for the scripts storage backend; mimics allow_overwrite=True."""
+
+    def __init__(self):
+        self.files = {}
+
+    def save(self, name, content):
+        content.seek(0)
+        self.files[name] = content.read()
+        return name
+
+    def open(self, name, mode='rb'):
+        return io.BytesIO(self.files[name])
+
+    def delete(self, name):
+        self.files.pop(name, None)
+
+    def exists(self, name):
+        return name in self.files
+
+
 class ScriptModuleTestCase(APITestCase):
     """
     Tests for the POST /api/extras/scripts/upload/ endpoint.
@@ -1653,6 +1674,52 @@ class ScriptModuleTestCase(APITestCase):
         self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
         self.assertFalse(ScriptModule.objects.filter(file_path='test_faulty.py').exists())
 
+    def test_upload_duplicate_script_module_preserves_existing_file(self):
+        """A duplicate-filename upload returns 400 and leaves the existing file unchanged."""
+        self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
+        original_content = (
+            b"from extras.scripts import Script\n\n\n"
+            b"class ProbeScript(Script):\n    def run(self, data, commit):\n        return 'v1'\n"
+        )
+        updated_content = original_content.replace(b"'v1'", b"'v2'")
+
+        fake_storage = _InMemoryScriptStorage()
+
+        with (
+            patch('extras.api.serializers_.scripts.storages') as mock_serializer_storages,
+            patch('extras.models.mixins.storages') as mock_module_storages,
+        ):
+            mock_serializer_storages.create_storage.return_value = fake_storage
+            mock_serializer_storages.backends = {'scripts': {}}
+            mock_module_storages.__getitem__.return_value = fake_storage
+
+            # First upload succeeds and writes the file
+            response = self.client.post(
+                self.url,
+                {'file': SimpleUploadedFile('zz_probe.py', original_content, content_type='text/plain')},
+                format='multipart',
+                **self.header,
+            )
+            self.assertHttpStatus(response, status.HTTP_201_CREATED)
+            self.assertEqual(fake_storage.files['zz_probe.py'], original_content)
+
+            # Re-uploading the same filename with different content must be rejected
+            response = self.client.post(
+                self.url,
+                {'file': SimpleUploadedFile('zz_probe.py', updated_content, content_type='text/plain')},
+                format='multipart',
+                **self.header,
+            )
+            self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+            self.assertIn('already exists', str(response.data))
+
+            # Existing file must survive intact: neither deleted nor overwritten with v2
+            self.assertTrue(fake_storage.exists('zz_probe.py'))
+            self.assertEqual(fake_storage.files['zz_probe.py'], original_content)
+
+        # Exactly one ScriptModule remains, still pointing at the original file
+        self.assertEqual(ScriptModule.objects.filter(file_path='zz_probe.py').count(), 1)
+
     def test_upload_script_module_without_file_fails(self):
         self.add_permissions('extras.add_scriptmodule', 'core.add_managedfile')
         response = self.client.post(self.url, {}, format='json', **self.header)