|
@@ -1,9 +1,12 @@
|
|
|
|
|
+import tempfile
|
|
|
|
|
+from pathlib import Path
|
|
|
from unittest import skipIf
|
|
from unittest import skipIf
|
|
|
from unittest.mock import patch
|
|
from unittest.mock import patch
|
|
|
|
|
|
|
|
from django.test import TestCase
|
|
from django.test import TestCase
|
|
|
|
|
|
|
|
-from core.data_backends import url_has_embedded_credentials
|
|
|
|
|
|
|
+from core.data_backends import S3Backend, url_has_embedded_credentials
|
|
|
|
|
+from core.exceptions import SyncError
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
import dulwich # noqa: F401
|
|
import dulwich # noqa: F401
|
|
@@ -114,3 +117,49 @@ class GitBackendCredentialIntegrationTestCase(TestCase):
|
|
|
|
|
|
|
|
self.assertEqual(kwargs.get('username'), None)
|
|
self.assertEqual(kwargs.get('username'), None)
|
|
|
self.assertEqual(kwargs.get('password'), None)
|
|
self.assertEqual(kwargs.get('password'), None)
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+class S3BackendKeyResolutionTestCase(TestCase):
|
|
|
|
|
+ """
|
|
|
|
|
+ Verify that S3Backend resolves object keys to local paths safely, rejecting any
|
|
|
|
|
+ key that would write outside of the local temporary directory.
|
|
|
|
|
+ """
|
|
|
|
|
+
|
|
|
|
|
+ def setUp(self):
|
|
|
|
|
+ self._tmp = tempfile.TemporaryDirectory()
|
|
|
|
|
+ self.addCleanup(self._tmp.cleanup)
|
|
|
|
|
+ self.local_root = Path(self._tmp.name).resolve()
|
|
|
|
|
+
|
|
|
|
|
+ def test_simple_key_resolves_under_root(self):
|
|
|
|
|
+ result = S3Backend._resolve_local_path(self.local_root, 'data/file.yaml')
|
|
|
|
|
+ self.assertEqual(result, self.local_root / 'data' / 'file.yaml')
|
|
|
|
|
+
|
|
|
|
|
+ def test_nested_key_resolves_under_root(self):
|
|
|
|
|
+ result = S3Backend._resolve_local_path(self.local_root, 'a/b/c/d.txt')
|
|
|
|
|
+ self.assertEqual(result, self.local_root / 'a' / 'b' / 'c' / 'd.txt')
|
|
|
|
|
+
|
|
|
|
|
+ def test_absolute_key_is_contained(self):
|
|
|
|
|
+ # A key beginning with '/' must not escape the local root
|
|
|
|
|
+ result = S3Backend._resolve_local_path(self.local_root, '/etc/passwd')
|
|
|
|
|
+ self.assertEqual(result, self.local_root / 'etc' / 'passwd')
|
|
|
|
|
+ self.assertTrue(result.is_relative_to(self.local_root))
|
|
|
|
|
+
|
|
|
|
|
+ def test_parent_traversal_is_rejected(self):
|
|
|
|
|
+ with self.assertRaises(SyncError):
|
|
|
|
|
+ S3Backend._resolve_local_path(self.local_root, '../escape.txt')
|
|
|
|
|
+
|
|
|
|
|
+ def test_nested_parent_traversal_is_rejected(self):
|
|
|
|
|
+ with self.assertRaises(SyncError):
|
|
|
|
|
+ S3Backend._resolve_local_path(self.local_root, 'foo/../../escape.txt')
|
|
|
|
|
+
|
|
|
|
|
+ def test_absolute_parent_traversal_is_rejected(self):
|
|
|
|
|
+ with self.assertRaises(SyncError):
|
|
|
|
|
+ S3Backend._resolve_local_path(self.local_root, '/../escape.txt')
|
|
|
|
|
+
|
|
|
|
|
+ def test_empty_key_is_rejected(self):
|
|
|
|
|
+ with self.assertRaises(SyncError):
|
|
|
|
|
+ S3Backend._resolve_local_path(self.local_root, '')
|
|
|
|
|
+
|
|
|
|
|
+ def test_root_key_is_rejected(self):
|
|
|
|
|
+ with self.assertRaises(SyncError):
|
|
|
|
|
+ S3Backend._resolve_local_path(self.local_root, '/')
|