Bläddra i källkod

Fixes #22283: Improve resolution of file path for imported S3 objects

Jeremy Stretch 14 timmar sedan
förälder
incheckning
f4196bc47c
2 ändrade filer med 69 tillägg och 6 borttagningar
  1. 19 5
      netbox/core/data_backends.py
  2. 50 1
      netbox/core/tests/test_data_backends.py

+ 19 - 5
netbox/core/data_backends.py

@@ -1,9 +1,8 @@
 import logging
-import os
 import re
 import tempfile
 from contextlib import contextmanager
-from pathlib import Path
+from pathlib import Path, PurePosixPath
 from urllib.parse import urlparse
 
 from django import forms
@@ -171,6 +170,7 @@ class S3Backend(DataBackend):
         import boto3
 
         local_path = tempfile.TemporaryDirectory()
+        local_root = Path(local_path.name).resolve()
 
         # Initialize the S3 resource and bucket
         aws_access_key_id = self.params.get('aws_access_key_id')
@@ -187,15 +187,29 @@ class S3Backend(DataBackend):
 
         # Download all files within the specified path
         for obj in bucket.objects.filter(Prefix=self._remote_path):
-            local_filename = os.path.join(local_path.name, obj.key)
+            local_filename = self._resolve_local_path(local_root, obj.key)
             # Build local path
-            Path(os.path.dirname(local_filename)).mkdir(parents=True, exist_ok=True)
-            bucket.download_file(obj.key, local_filename)
+            local_filename.parent.mkdir(parents=True, exist_ok=True)
+            bucket.download_file(obj.key, str(local_filename))
 
         yield local_path.name
 
         local_path.cleanup()
 
+    @staticmethod
+    def _resolve_local_path(local_root, key):
+        # S3 object keys are POSIX-style paths. Strip any leading separator so the key
+        # joins onto the temp directory rather than replacing it, then ensure the
+        # resolved destination remains within the temp directory to prevent path
+        # traversal via crafted object keys.
+        key_parts = PurePosixPath(key.lstrip('/')).parts
+        local_filename = local_root.joinpath(*key_parts).resolve()
+        if not local_filename.is_relative_to(local_root) or local_filename == local_root:
+            raise SyncError(
+                _("Invalid S3 object key '{key}': resolves outside of the local data directory").format(key=key)
+            )
+        return local_filename
+
     @property
     def _region_name(self):
         domain = urlparse(self.url).netloc

+ 50 - 1
netbox/core/tests/test_data_backends.py

@@ -1,9 +1,12 @@
+import tempfile
+from pathlib import Path
 from unittest import skipIf
 from unittest.mock import patch
 
 from django.test import TestCase
 
-from core.data_backends import url_has_embedded_credentials
+from core.data_backends import S3Backend, url_has_embedded_credentials
+from core.exceptions import SyncError
 
 try:
     import dulwich  # noqa: F401
@@ -114,3 +117,49 @@ class GitBackendCredentialIntegrationTestCase(TestCase):
 
         self.assertEqual(kwargs.get('username'), None)
         self.assertEqual(kwargs.get('password'), None)
+
+
+class S3BackendKeyResolutionTestCase(TestCase):
+    """
+    Verify that S3Backend resolves object keys to local paths safely, rejecting any
+    key that would write outside of the local temporary directory.
+    """
+
+    def setUp(self):
+        self._tmp = tempfile.TemporaryDirectory()
+        self.addCleanup(self._tmp.cleanup)
+        self.local_root = Path(self._tmp.name).resolve()
+
+    def test_simple_key_resolves_under_root(self):
+        result = S3Backend._resolve_local_path(self.local_root, 'data/file.yaml')
+        self.assertEqual(result, self.local_root / 'data' / 'file.yaml')
+
+    def test_nested_key_resolves_under_root(self):
+        result = S3Backend._resolve_local_path(self.local_root, 'a/b/c/d.txt')
+        self.assertEqual(result, self.local_root / 'a' / 'b' / 'c' / 'd.txt')
+
+    def test_absolute_key_is_contained(self):
+        # A key beginning with '/' must not escape the local root
+        result = S3Backend._resolve_local_path(self.local_root, '/etc/passwd')
+        self.assertEqual(result, self.local_root / 'etc' / 'passwd')
+        self.assertTrue(result.is_relative_to(self.local_root))
+
+    def test_parent_traversal_is_rejected(self):
+        with self.assertRaises(SyncError):
+            S3Backend._resolve_local_path(self.local_root, '../escape.txt')
+
+    def test_nested_parent_traversal_is_rejected(self):
+        with self.assertRaises(SyncError):
+            S3Backend._resolve_local_path(self.local_root, 'foo/../../escape.txt')
+
+    def test_absolute_parent_traversal_is_rejected(self):
+        with self.assertRaises(SyncError):
+            S3Backend._resolve_local_path(self.local_root, '/../escape.txt')
+
+    def test_empty_key_is_rejected(self):
+        with self.assertRaises(SyncError):
+            S3Backend._resolve_local_path(self.local_root, '')
+
+    def test_root_key_is_rejected(self):
+        with self.assertRaises(SyncError):
+            S3Backend._resolve_local_path(self.local_root, '/')