Explorar el Código

fix(extras): Reject unknown custom fields

Add validation to reject unknown custom field names during API updates.
Ensure model.clean() normalization is preserved in serializers to remove
stale custom field data from both the database and change logs.
Filter stale keys during serialization to prevent lingering references.

Fixes #21529
Martin Hauser hace 7 horas
padre
commit
8bb6b9088c

+ 9 - 0
netbox/extras/api/customfields.py

@@ -85,6 +85,15 @@ class CustomFieldsDataField(Field):
                 "values."
             )
 
+        # Reject any unknown custom field names
+        valid_field_names = {cf.name for cf in self._get_custom_fields()}
+        invalid_fields = set(data.keys()) - valid_field_names
+        if invalid_fields:
+            raise ValidationError({
+                field: _("Custom field '{name}' does not exist for this object type.").format(name=field)
+                for field in sorted(invalid_fields)
+            })
+
         # Serialize object and multi-object values
         for cf in self._get_custom_fields():
             if cf.name in data and data[cf.name] not in CUSTOMFIELD_EMPTY_VALUES and cf.type in (

+ 79 - 1
netbox/extras/tests/test_customfields.py

@@ -7,7 +7,7 @@ from django.test import tag
 from django.urls import reverse
 from rest_framework import status
 
-from core.models import ObjectType
+from core.models import ObjectChange, ObjectType
 from dcim.filtersets import SiteFilterSet
 from dcim.forms import SiteImportForm
 from dcim.models import Manufacturer, Rack, Site
@@ -1194,6 +1194,84 @@ class CustomFieldAPITest(APITestCase):
             list(original_cfvs['multiobject_field'])
         )
 
+    def test_update_single_object_rejects_unknown_custom_fields(self):
+        site2 = Site.objects.get(name='Site 2')
+        original_cf_data = {**site2.custom_field_data}
+        url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
+        self.add_permissions('dcim.change_site')
+
+        data = {
+            'custom_fields': {
+                'text_field': 'valid',
+                'thisfieldshouldntexist': 'random text here',
+            },
+        }
+
+        response = self.client.patch(url, data, format='json', **self.header)
+        self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+        self.assertIn('thisfieldshouldntexist', response.data['custom_fields'])
+
+        # Ensure the object was not modified
+        site2.refresh_from_db()
+        self.assertEqual(site2.custom_field_data, original_cf_data)
+
+    @tag('regression')
+    def test_update_single_object_prunes_stale_custom_field_data_from_database_and_changelog(self):
+        stale_key = 'thisfieldshouldntexist'
+        stale_value = 'random text here'
+        updated_text_value = 'ABCD'
+
+        site2 = Site.objects.get(name='Site 2')
+        object_type = ObjectType.objects.get_for_model(Site)
+        original_text_value = site2.custom_field_data['text_field']
+
+        # Seed stale custom field data directly in the database to mimic a polluted row.
+        Site.objects.filter(pk=site2.pk).update(
+            custom_field_data={
+                **site2.custom_field_data,
+                stale_key: stale_value,
+            }
+        )
+        site2.refresh_from_db()
+        self.assertIn(stale_key, site2.custom_field_data)
+
+        existing_change_ids = set(
+            ObjectChange.objects.filter(
+                changed_object_type=object_type,
+                changed_object_id=site2.pk,
+            ).values_list('pk', flat=True)
+        )
+
+        url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
+        self.add_permissions('dcim.change_site')
+        data = {
+            'custom_fields': {
+                'text_field': updated_text_value,
+            },
+        }
+
+        response = self.client.patch(url, data, format='json', **self.header)
+        self.assertHttpStatus(response, status.HTTP_200_OK)
+
+        site2.refresh_from_db()
+        self.assertEqual(site2.cf['text_field'], updated_text_value)
+        self.assertNotIn(stale_key, site2.custom_field_data)
+
+        objectchanges = ObjectChange.objects.filter(
+            changed_object_type=object_type,
+            changed_object_id=site2.pk,
+        ).exclude(pk__in=existing_change_ids)
+        self.assertEqual(objectchanges.count(), 1)
+
+        objectchange = objectchanges.get()
+        prechange_cf = objectchange.prechange_data['custom_fields']
+        postchange_cf = objectchange.postchange_data['custom_fields']
+
+        self.assertEqual(prechange_cf['text_field'], original_text_value)
+        self.assertEqual(postchange_cf['text_field'], updated_text_value)
+        self.assertNotIn(stale_key, prechange_cf)
+        self.assertNotIn(stale_key, postchange_cf)
+
     def test_specify_related_object_by_attr(self):
         site1 = Site.objects.get(name='Site 1')
         vlans = VLAN.objects.all()[:3]

+ 4 - 3
netbox/netbox/api/serializers/base.py

@@ -95,9 +95,6 @@ class ValidatedModelSerializer(BaseModelSerializer):
 
         attrs = data.copy()
 
-        # Remove custom field data (if any) prior to model validation
-        attrs.pop('custom_fields', None)
-
         # Skip ManyToManyFields
         opts = self.Meta.model._meta
         m2m_values = {}
@@ -116,4 +113,8 @@ class ValidatedModelSerializer(BaseModelSerializer):
         # Skip uniqueness validation of individual fields inside `full_clean()` (this is handled by the serializer)
         instance.full_clean(validate_unique=False)
 
+        # Preserve any normalization performed by model.clean() (e.g. stale custom field pruning)
+        if 'custom_field_data' in attrs:
+            data['custom_field_data'] = instance.custom_field_data
+
         return data

+ 6 - 1
netbox/utilities/serialization.py

@@ -31,7 +31,12 @@ def serialize_object(obj, resolve_tags=True, extra=None, exclude=None):
 
     # Include custom_field_data as "custom_fields"
     if 'custom_field_data' in data:
-        data['custom_fields'] = data.pop('custom_field_data')
+        custom_field_data = data.pop('custom_field_data')
+        custom_fields = getattr(obj, 'custom_fields', None)
+        if custom_fields is not None:
+            valid_names = {cf.name for cf in custom_fields}
+            custom_field_data = {key: value for key, value in custom_field_data.items() if key in valid_names}
+        data['custom_fields'] = custom_field_data
 
     # Resolve any assigned tags to their names. Check for tags cached on the instance;
     # fall back to using the manager.