Ver Fonte

Fixes 22512: Restrict queryset when fetching custom field objects

Jeremy Stretch há 1 semana atrás
pai
commit
5359ad8ee4
2 ficheiros alterados com 92 adições e 14 exclusões
  1. 32 7
      netbox/extras/api/customfields.py
  2. 60 7
      netbox/extras/tests/test_customfields.py

+ 32 - 7
netbox/extras/api/customfields.py

@@ -14,6 +14,18 @@ from utilities.api import get_serializer_for_model
 #
 
 
+def restrict_queryset(model, request):
+    """
+    Return a queryset for the given model, restricted to the objects the requesting user is
+    permitted to view. If no request is available (e.g. during internal serialization) or the
+    model's manager does not support restriction, the unrestricted queryset is returned.
+    """
+    manager = model.objects
+    if request is not None and hasattr(manager, 'restrict'):
+        return manager.restrict(request.user, 'view')
+    return manager.all()
+
+
 class CustomFieldDefaultValues:
     """
     Return a dictionary of all CustomFields assigned to the parent model and their default values.
@@ -50,21 +62,32 @@ class CustomFieldsDataField(Field):
         from utilities.api import get_serializer_for_model
         data = {}
         cache = self.parent.context.get('cf_object_cache')
+        request = self.parent.context.get('request')
 
         for cf in self._get_custom_fields():
-            if cache is not None and cf.type in (
+            if cf.type in (
                 CustomFieldTypeChoices.TYPE_OBJECT,
                 CustomFieldTypeChoices.TYPE_MULTIOBJECT,
             ):
                 raw = obj.get(cf.name)
+                model = cf.related_object_type.model_class()
                 if raw is None:
                     value = None
-                elif cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
-                    model = cf.related_object_type.model_class()
-                    value = cache.get((model, raw))
+                elif cache is not None:
+                    # Use the pre-fetched (and permission-restricted) bulk cache populated by
+                    # CustomFieldListSerializer
+                    if cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
+                        value = cache.get((model, raw))
+                    else:
+                        value = [cache[(model, pk)] for pk in raw if (model, pk) in cache]
                 else:
-                    model = cf.related_object_type.model_class()
-                    value = [cache[(model, pk)] for pk in raw if (model, pk) in cache] or None
+                    # No bulk cache available (single-object serialization); query directly,
+                    # restricting to objects the requesting user is permitted to view
+                    queryset = restrict_queryset(model, request)
+                    if cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
+                        value = queryset.filter(pk=raw).first()
+                    else:
+                        value = list(queryset.filter(pk__in=raw))
             else:
                 value = cf.deserialize(obj.get(cf.name))
 
@@ -124,6 +147,7 @@ class CustomFieldListSerializer(ListSerializer):
     def to_representation(self, data):
         cf_field = self.child.fields.get('custom_fields')
         if isinstance(cf_field, CustomFieldsDataField):
+            request = self.context.get('request')
             object_type_cfs = [
                 cf for cf in cf_field._get_custom_fields()
                 if cf.type in (CustomFieldTypeChoices.TYPE_OBJECT, CustomFieldTypeChoices.TYPE_MULTIOBJECT)
@@ -139,7 +163,8 @@ class CustomFieldListSerializer(ListSerializer):
                             pks.update(raw)
                         else:
                             pks.add(raw)
-                for obj in model.objects.filter(pk__in=pks):
+                # Restrict to objects the requesting user is permitted to view
+                for obj in restrict_queryset(model, request).filter(pk__in=pks):
                     cache[(model, obj.pk)] = obj
             self.child.context['cf_object_cache'] = cache
         return super().to_representation(data)

+ 60 - 7
netbox/extras/tests/test_customfields.py

@@ -15,6 +15,7 @@ from extras.choices import *
 from extras.models import CustomField, CustomFieldChoiceSet
 from ipam.models import VLAN
 from netbox.choices import CSVDelimiterChoices, ImportFormatChoices
+from users.models import ObjectPermission
 from utilities.testing import APITestCase, TestCase
 from virtualization.models import VirtualMachine
 
@@ -968,7 +969,7 @@ class CustomFieldAPITestCase(APITestCase):
         site2 = Site.objects.get(name='Site 2')
         site2_cfvs = site2.cf
         url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
-        self.add_permissions('dcim.view_site')
+        self.add_permissions('dcim.view_site', 'ipam.view_vlan')
 
         response = self.client.get(url, **self.header)
         self.assertEqual(response.data['name'], site2.name)
@@ -989,6 +990,58 @@ class CustomFieldAPITestCase(APITestCase):
             [obj.pk for obj in site2_cfvs['multiobject_field']]
         )
 
+    def test_object_field_omits_unviewable_objects(self):
+        """
+        Object/multi-object custom field values referencing objects the user cannot view must be
+        omitted from the response (see #22512).
+        """
+        site2 = Site.objects.get(name='Site 2')
+
+        # Single-object retrieval (no bulk cache)
+        url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
+        self.add_permissions('dcim.view_site')
+        response = self.client.get(url, **self.header)
+        self.assertHttpStatus(response, status.HTTP_200_OK)
+        self.assertIsNone(response.data['custom_fields']['object_field'])
+        self.assertListEqual(response.data['custom_fields']['multiobject_field'], [])
+
+        # List retrieval (bulk cache via CustomFieldListSerializer)
+        list_url = reverse('dcim-api:site-list')
+        response = self.client.get(f'{list_url}?id={site2.pk}', **self.header)
+        self.assertHttpStatus(response, status.HTTP_200_OK)
+        self.assertIsNone(response.data['results'][0]['custom_fields']['object_field'])
+        self.assertListEqual(response.data['results'][0]['custom_fields']['multiobject_field'], [])
+
+    def test_object_field_respects_object_level_permissions(self):
+        """
+        Object/multi-object custom field values must honor object-level (constrained) view
+        permissions on the related model (see #22512).
+        """
+        site2 = Site.objects.get(name='Site 2')
+        cfvs = site2.cf
+        viewable_vlan = cfvs['object_field']
+
+        self.add_permissions('dcim.view_site')
+
+        # Grant view permission for only the VLAN assigned to object_field
+        obj_perm = ObjectPermission(
+            name='Restricted VLAN access',
+            constraints={'pk': viewable_vlan.pk},
+            actions=['view']
+        )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(VLAN))
+
+        url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
+        response = self.client.get(url, **self.header)
+        self.assertHttpStatus(response, status.HTTP_200_OK)
+
+        # object_field references a permitted VLAN and is returned
+        self.assertEqual(response.data['custom_fields']['object_field']['id'], viewable_vlan.pk)
+        # multiobject_field references VLANs outside the constraint and is omitted
+        self.assertListEqual(response.data['custom_fields']['multiobject_field'], [])
+
     def test_create_single_object_with_defaults(self):
         """
         Create a new site with no specified custom field values and check that it received the default values.
@@ -1001,7 +1054,7 @@ class CustomFieldAPITestCase(APITestCase):
             'slug': 'site-3',
         }
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1065,7 +1118,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
         }
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1129,7 +1182,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
         )
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1209,7 +1262,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
         )
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1266,7 +1319,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
         }
         url = reverse('dcim-api:site-detail', kwargs={'pk': site2.pk})
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'ipam.view_vlan')
 
         response = self.client.patch(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_200_OK)
@@ -1389,7 +1442,7 @@ class CustomFieldAPITestCase(APITestCase):
         site1 = Site.objects.get(name='Site 1')
         vlans = VLAN.objects.all()[:3]
         url = reverse('dcim-api:site-detail', kwargs={'pk': site1.pk})
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'ipam.view_vlan')
 
         # Set related objects by PK
         data = {