Browse Source

fix(extras): Ensure unique Image Attachment names on S3

Make image attachment filename generation use Django's base collision
handling so overwrite-style storage backends behave like local file
storage.

This preserves the original filename for the first upload, adds a
suffix only on collision, and avoids duplicate image paths in object
change records.

Add regression tests for path generation and collision handling.

Fixes #21801
Martin Hauser 8 hours ago
parent
commit
e864dc3ae0
3 changed files with 190 additions and 26 deletions
  1. 85 0
      netbox/extras/tests/test_models.py
  2. 82 17
      netbox/extras/tests/test_utils.py
  3. 23 9
      netbox/extras/utils.py

+ 85 - 0
netbox/extras/tests/test_models.py

@@ -1,10 +1,15 @@
+import io
 import tempfile
 import tempfile
 from pathlib import Path
 from pathlib import Path
+from unittest.mock import patch
 
 
 from django.contrib.contenttypes.models import ContentType
 from django.contrib.contenttypes.models import ContentType
+from django.core.files.base import ContentFile
+from django.core.files.storage import Storage
 from django.core.files.uploadedfile import SimpleUploadedFile
 from django.core.files.uploadedfile import SimpleUploadedFile
 from django.forms import ValidationError
 from django.forms import ValidationError
 from django.test import TestCase, tag
 from django.test import TestCase, tag
+from PIL import Image
 
 
 from core.models import AutoSyncRecord, DataSource, ObjectType
 from core.models import AutoSyncRecord, DataSource, ObjectType
 from dcim.models import Device, DeviceRole, DeviceType, Location, Manufacturer, Platform, Region, Site, SiteGroup
 from dcim.models import Device, DeviceRole, DeviceType, Location, Manufacturer, Platform, Region, Site, SiteGroup
@@ -14,10 +19,50 @@ from utilities.exceptions import AbortRequest
 from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
 from virtualization.models import Cluster, ClusterGroup, ClusterType, VirtualMachine
 
 
 
 
+class OverwriteStyleMemoryStorage(Storage):
+    """
+    In-memory storage that mimics overwrite-style backends by returning the
+    incoming name unchanged from get_available_name().
+    """
+
+    def __init__(self):
+        self.files = {}
+
+    def _open(self, name, mode='rb'):
+        return ContentFile(self.files[name], name=name)
+
+    def _save(self, name, content):
+        self.files[name] = content.read()
+        return name
+
+    def delete(self, name):
+        self.files.pop(name, None)
+
+    def exists(self, name):
+        return name in self.files
+
+    def get_available_name(self, name, max_length=None):
+        return name
+
+    def get_alternative_name(self, file_root, file_ext):
+        return f'{file_root}_sdmmer4{file_ext}'
+
+    def listdir(self, path):
+        return [], list(self.files)
+
+    def size(self, name):
+        return len(self.files[name])
+
+    def url(self, name):
+        return f'https://example.invalid/{name}'
+
+
 class ImageAttachmentTests(TestCase):
 class ImageAttachmentTests(TestCase):
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
         cls.ct_rack = ContentType.objects.get_by_natural_key('dcim', 'rack')
         cls.ct_rack = ContentType.objects.get_by_natural_key('dcim', 'rack')
+        cls.ct_site = ContentType.objects.get_by_natural_key('dcim', 'site')
+        cls.site = Site.objects.create(name='Site 1')
         cls.image_content = b''
         cls.image_content = b''
 
 
     def _stub_image_attachment(self, object_id, image_filename, name=None):
     def _stub_image_attachment(self, object_id, image_filename, name=None):
@@ -41,6 +86,15 @@ class ImageAttachmentTests(TestCase):
         )
         )
         return ia
         return ia
 
 
+    def _uploaded_png(self, filename):
+        image = io.BytesIO()
+        Image.new('RGB', (1, 1)).save(image, format='PNG')
+        return SimpleUploadedFile(
+            name=filename,
+            content=image.getvalue(),
+            content_type='image/png',
+        )
+
     def test_filename_strips_expected_prefix(self):
     def test_filename_strips_expected_prefix(self):
         """
         """
         Tests that the filename of the image attachment is stripped of the expected
         Tests that the filename of the image attachment is stripped of the expected
@@ -89,6 +143,37 @@ class ImageAttachmentTests(TestCase):
         ia = self._stub_image_attachment(12, 'image-attachments/rack_12_file.png', name='')
         ia = self._stub_image_attachment(12, 'image-attachments/rack_12_file.png', name='')
         self.assertEqual('file.png', str(ia))
         self.assertEqual('file.png', str(ia))
 
 
+    def test_duplicate_uploaded_names_get_suffixed_with_overwrite_style_storage(self):
+        storage = OverwriteStyleMemoryStorage()
+        field = ImageAttachment._meta.get_field('image')
+
+        with patch.object(field, 'storage', storage):
+            first = ImageAttachment(
+                object_type=self.ct_site,
+                object_id=self.site.pk,
+                image=self._uploaded_png('action-buttons.png'),
+            )
+            first.save()
+
+            second = ImageAttachment(
+                object_type=self.ct_site,
+                object_id=self.site.pk,
+                image=self._uploaded_png('action-buttons.png'),
+            )
+            second.save()
+
+        base_name = f'image-attachments/site_{self.site.pk}_action-buttons.png'
+        suffixed_name = f'image-attachments/site_{self.site.pk}_action-buttons_sdmmer4.png'
+
+        self.assertEqual(first.image.name, base_name)
+        self.assertEqual(second.image.name, suffixed_name)
+        self.assertNotEqual(first.image.name, second.image.name)
+
+        self.assertEqual(first.filename, 'action-buttons.png')
+        self.assertEqual(second.filename, 'action-buttons_sdmmer4.png')
+
+        self.assertCountEqual(storage.files.keys(), {base_name, suffixed_name})
+
 
 
 class TagTest(TestCase):
 class TagTest(TestCase):
 
 

+ 82 - 17
netbox/extras/tests/test_utils.py

@@ -1,10 +1,12 @@
 from types import SimpleNamespace
 from types import SimpleNamespace
+from unittest.mock import patch
 
 
 from django.contrib.contenttypes.models import ContentType
 from django.contrib.contenttypes.models import ContentType
+from django.core.files.storage import Storage
 from django.test import TestCase
 from django.test import TestCase
 
 
-from extras.models import ExportTemplate
-from extras.utils import filename_from_model, image_upload
+from extras.models import ExportTemplate, ImageAttachment
+from extras.utils import _build_image_attachment_path, filename_from_model, image_upload
 from tenancy.models import ContactGroup, TenantGroup
 from tenancy.models import ContactGroup, TenantGroup
 from wireless.models import WirelessLANGroup
 from wireless.models import WirelessLANGroup
 
 
@@ -22,6 +24,25 @@ class FilenameFromModelTests(TestCase):
             self.assertEqual(filename_from_model(model), expected)
             self.assertEqual(filename_from_model(model), expected)
 
 
 
 
+class OverwriteStyleStorage(Storage):
+    """
+    Mimic an overwrite-style backend (for example, S3 with file_overwrite=True),
+    where get_available_name() returns the incoming name unchanged.
+    """
+
+    def __init__(self, existing_names=None):
+        self.existing_names = set(existing_names or [])
+
+    def exists(self, name):
+        return name in self.existing_names
+
+    def get_available_name(self, name, max_length=None):
+        return name
+
+    def get_alternative_name(self, file_root, file_ext):
+        return f'{file_root}_sdmmer4{file_ext}'
+
+
 class ImageUploadTests(TestCase):
 class ImageUploadTests(TestCase):
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -31,16 +52,18 @@ class ImageUploadTests(TestCase):
 
 
     def _stub_instance(self, object_id=12, name=None):
     def _stub_instance(self, object_id=12, name=None):
         """
         """
-        Creates a minimal stub for use with the `image_upload()` function.
-
-        This method generates an instance of `SimpleNamespace` containing a set
-        of attributes required to simulate the expected input for the
-        `image_upload()` method.
-        It is designed to simplify testing or processing by providing a
-        lightweight representation of an object.
+        Creates a minimal stub for use with image attachment path generation.
         """
         """
         return SimpleNamespace(object_type=self.ct_rack, object_id=object_id, name=name)
         return SimpleNamespace(object_type=self.ct_rack, object_id=object_id, name=name)
 
 
+    def _bound_instance(self, *, storage, object_id=12, name=None, max_length=100):
+        return SimpleNamespace(
+            object_type=self.ct_rack,
+            object_id=object_id,
+            name=name,
+            image=SimpleNamespace(field=SimpleNamespace(storage=storage, max_length=max_length)),
+        )
+
     def _second_segment(self, path: str):
     def _second_segment(self, path: str):
         """
         """
         Extracts and returns the portion of the input string after the
         Extracts and returns the portion of the input string after the
@@ -53,7 +76,7 @@ class ImageUploadTests(TestCase):
         Tests handling of a Windows file path with a fake directory and extension.
         Tests handling of a Windows file path with a fake directory and extension.
         """
         """
         inst = self._stub_instance(name=None)
         inst = self._stub_instance(name=None)
-        path = image_upload(inst, r'C:\fake_path\MyPhoto.JPG')
+        path = _build_image_attachment_path(inst, r'C:\fake_path\MyPhoto.JPG')
         # Base directory and single-level path
         # Base directory and single-level path
         seg2 = self._second_segment(path)
         seg2 = self._second_segment(path)
         self.assertTrue(path.startswith('image-attachments/rack_12_'))
         self.assertTrue(path.startswith('image-attachments/rack_12_'))
@@ -67,7 +90,7 @@ class ImageUploadTests(TestCase):
         create subdirectories.
         create subdirectories.
         """
         """
         inst = self._stub_instance(name='5/31/23')
         inst = self._stub_instance(name='5/31/23')
-        path = image_upload(inst, 'image.png')
+        path = _build_image_attachment_path(inst, 'image.png')
         seg2 = self._second_segment(path)
         seg2 = self._second_segment(path)
         self.assertTrue(seg2.startswith('rack_12_'))
         self.assertTrue(seg2.startswith('rack_12_'))
         self.assertNotIn('/', seg2)
         self.assertNotIn('/', seg2)
@@ -80,7 +103,7 @@ class ImageUploadTests(TestCase):
         into a single directory name without creating subdirectories.
         into a single directory name without creating subdirectories.
         """
         """
         inst = self._stub_instance(name=r'5\31\23')
         inst = self._stub_instance(name=r'5\31\23')
-        path = image_upload(inst, 'image_name.png')
+        path = _build_image_attachment_path(inst, 'image_name.png')
 
 
         seg2 = self._second_segment(path)
         seg2 = self._second_segment(path)
         self.assertTrue(seg2.startswith('rack_12_'))
         self.assertTrue(seg2.startswith('rack_12_'))
@@ -93,7 +116,7 @@ class ImageUploadTests(TestCase):
         Tests the output path format generated by the `image_upload` function.
         Tests the output path format generated by the `image_upload` function.
         """
         """
         inst = self._stub_instance(object_id=99, name='label')
         inst = self._stub_instance(object_id=99, name='label')
-        path = image_upload(inst, 'a.webp')
+        path = _build_image_attachment_path(inst, 'a.webp')
         # The second segment must begin with "rack_99_"
         # The second segment must begin with "rack_99_"
         seg2 = self._second_segment(path)
         seg2 = self._second_segment(path)
         self.assertTrue(seg2.startswith('rack_99_'))
         self.assertTrue(seg2.startswith('rack_99_'))
@@ -105,7 +128,7 @@ class ImageUploadTests(TestCase):
         is omitted.
         is omitted.
         """
         """
         inst = self._stub_instance(name='test')
         inst = self._stub_instance(name='test')
-        path = image_upload(inst, 'document.txt')
+        path = _build_image_attachment_path(inst, 'document.txt')
 
 
         seg2 = self._second_segment(path)
         seg2 = self._second_segment(path)
         self.assertTrue(seg2.startswith('rack_12_test'))
         self.assertTrue(seg2.startswith('rack_12_test'))
@@ -121,7 +144,7 @@ class ImageUploadTests(TestCase):
         # Suppose the instance name has surrounding whitespace and
         # Suppose the instance name has surrounding whitespace and
         # extra slashes.
         # extra slashes.
         inst = self._stub_instance(name='  my/complex\\name  ')
         inst = self._stub_instance(name='  my/complex\\name  ')
-        path = image_upload(inst, 'irrelevant.png')
+        path = _build_image_attachment_path(inst, 'irrelevant.png')
 
 
         # The output should be flattened and sanitized.
         # The output should be flattened and sanitized.
         # We expect the name to be transformed into a valid filename without
         # We expect the name to be transformed into a valid filename without
@@ -141,7 +164,7 @@ class ImageUploadTests(TestCase):
         for name in ['2025/09/12', r'2025\09\12']:
         for name in ['2025/09/12', r'2025\09\12']:
             with self.subTest(name=name):
             with self.subTest(name=name):
                 inst = self._stub_instance(name=name)
                 inst = self._stub_instance(name=name)
-                path = image_upload(inst, 'x.jpeg')
+                path = _build_image_attachment_path(inst, 'x.jpeg')
                 seg2 = self._second_segment(path)
                 seg2 = self._second_segment(path)
                 self.assertTrue(seg2.startswith('rack_12_'))
                 self.assertTrue(seg2.startswith('rack_12_'))
                 self.assertNotIn('/', seg2)
                 self.assertNotIn('/', seg2)
@@ -154,7 +177,49 @@ class ImageUploadTests(TestCase):
         SuspiciousFileOperation, the fallback default is used.
         SuspiciousFileOperation, the fallback default is used.
         """
         """
         inst = self._stub_instance(name=' ')
         inst = self._stub_instance(name=' ')
-        path = image_upload(inst, 'sample.png')
+        path = _build_image_attachment_path(inst, 'sample.png')
         # Expect the fallback name 'unnamed' to be used.
         # Expect the fallback name 'unnamed' to be used.
         self.assertIn('unnamed', path)
         self.assertIn('unnamed', path)
         self.assertTrue(path.startswith('image-attachments/rack_12_'))
         self.assertTrue(path.startswith('image-attachments/rack_12_'))
+
+    def test_image_upload_preserves_original_name_when_available(self):
+        inst = self._bound_instance(
+            storage=OverwriteStyleStorage(),
+            name='action-buttons',
+        )
+
+        path = image_upload(inst, 'action-buttons.png')
+
+        self.assertEqual(path, 'image-attachments/rack_12_action-buttons.png')
+
+    def test_image_upload_uses_base_collision_handling_with_overwrite_style_storage(self):
+        inst = self._bound_instance(
+            storage=OverwriteStyleStorage(existing_names={'image-attachments/rack_12_action-buttons.png'}),
+            name='action-buttons',
+        )
+
+        path = image_upload(inst, 'action-buttons.png')
+
+        self.assertEqual(
+            path,
+            'image-attachments/rack_12_action-buttons_sdmmer4.png',
+        )
+
+    def test_image_field_generate_filename_uses_image_upload_collision_handling(self):
+        field = ImageAttachment._meta.get_field('image')
+        instance = ImageAttachment(
+            object_type=self.ct_rack,
+            object_id=12,
+        )
+
+        with patch.object(
+            field,
+            'storage',
+            OverwriteStyleStorage(existing_names={'image-attachments/rack_12_action-buttons.png'}),
+        ):
+            path = field.generate_filename(instance, 'action-buttons.png')
+
+        self.assertEqual(
+            path,
+            'image-attachments/rack_12_action-buttons_sdmmer4.png',
+        )

+ 23 - 9
netbox/extras/utils.py

@@ -2,7 +2,7 @@ import importlib
 from pathlib import Path
 from pathlib import Path
 
 
 from django.core.exceptions import ImproperlyConfigured, SuspiciousFileOperation
 from django.core.exceptions import ImproperlyConfigured, SuspiciousFileOperation
-from django.core.files.storage import default_storage
+from django.core.files.storage import Storage, default_storage
 from django.core.files.utils import validate_file_name
 from django.core.files.utils import validate_file_name
 from django.db import models
 from django.db import models
 from django.db.models import Q
 from django.db.models import Q
@@ -67,15 +67,13 @@ def is_taggable(obj):
     return False
     return False
 
 
 
 
-def image_upload(instance, filename):
+def _build_image_attachment_path(instance, filename, *, storage=default_storage):
     """
     """
-    Return a path for uploading image attachments.
+    Build a deterministic relative path for an image attachment.
 
 
     - Normalizes browser paths (e.g., C:\\fake_path\\photo.jpg)
     - Normalizes browser paths (e.g., C:\\fake_path\\photo.jpg)
     - Uses the instance.name if provided (sanitized to a *basename*, no ext)
     - Uses the instance.name if provided (sanitized to a *basename*, no ext)
     - Prefixes with a machine-friendly identifier
     - Prefixes with a machine-friendly identifier
-
-    Note: Relies on Django's default_storage utility.
     """
     """
     upload_dir = 'image-attachments'
     upload_dir = 'image-attachments'
     default_filename = 'unnamed'
     default_filename = 'unnamed'
@@ -92,22 +90,38 @@ def image_upload(instance, filename):
     # Rely on Django's get_valid_filename to perform sanitization.
     # Rely on Django's get_valid_filename to perform sanitization.
     stem = (instance.name or file_path.stem).strip()
     stem = (instance.name or file_path.stem).strip()
     try:
     try:
-        safe_stem = default_storage.get_valid_name(stem)
+        safe_stem = storage.get_valid_name(stem)
     except SuspiciousFileOperation:
     except SuspiciousFileOperation:
         safe_stem = default_filename
         safe_stem = default_filename
 
 
     # Append the uploaded extension only if it's an allowed image type
     # Append the uploaded extension only if it's an allowed image type
-    final_name = f"{safe_stem}.{ext}" if ext in allowed_img_extensions else safe_stem
+    final_name = f'{safe_stem}.{ext}' if ext in allowed_img_extensions else safe_stem
 
 
     # Create a machine-friendly prefix from the instance
     # Create a machine-friendly prefix from the instance
-    prefix = f"{instance.object_type.model}_{instance.object_id}"
-    name_with_path = f"{upload_dir}/{prefix}_{final_name}"
+    prefix = f'{instance.object_type.model}_{instance.object_id}'
+    name_with_path = f'{upload_dir}/{prefix}_{final_name}'
 
 
     # Validate the generated relative path (blocks absolute/traversal)
     # Validate the generated relative path (blocks absolute/traversal)
     validate_file_name(name_with_path, allow_relative_path=True)
     validate_file_name(name_with_path, allow_relative_path=True)
     return name_with_path
     return name_with_path
 
 
 
 
+def image_upload(instance, filename):
+    """
+    Return a relative upload path for an image attachment, applying Django's
+    usual suffix-on-collision behavior regardless of storage backend.
+    """
+    field = instance.image.field
+    name_with_path = _build_image_attachment_path(instance, filename, storage=field.storage)
+
+    # Intentionally call Django's base Storage implementation here. Some
+    # backends override get_available_name() to reuse the incoming name
+    # unchanged, but we want Django's normal suffix-on-collision behavior
+    # while still dispatching exists() / get_alternative_name() to the
+    # configured storage instance.
+    return Storage.get_available_name(field.storage, name_with_path, max_length=field.max_length)
+
+
 def is_script(obj):
 def is_script(obj):
     """
     """
     Returns True if the object is a Script or Report.
     Returns True if the object is a Script or Report.