Procházet zdrojové kódy

fix(api): Enforce Object Permissions for Nested Serializer input

Apply object-level view restrictions when nested Serializers resolve
related objects from REST API write input. Nested create and update
operations now resolve related objects from a permission-restricted
queryset, causing hidden related objects to fail validation the same as
nonexistent objects.

Fixes #21988
Martin Hauser před 2 týdny
rodič
revize
e597107b01

+ 19 - 4
netbox/core/tests/test_changelog.py

@@ -149,7 +149,14 @@ class ChangeLogViewTestCase(ModelViewTestCase):
             'path': self._get_url('delete', instance=site),
             'data': post_data({'confirm': True}),
         }
-        self.add_permissions('dcim.delete_site')
+        self.add_permissions(
+            'dcim.add_module',
+            'dcim.delete_site',
+            'dcim.view_device',
+            'dcim.view_modulebay',
+            'dcim.view_moduletype',
+            'extras.view_tag',
+        )
         response = self.client.post(**request)
         self.assertHttpStatus(response, 302)
 
@@ -449,7 +456,7 @@ class ChangeLogAPITestCase(APITestCase):
         }
         self.assertEqual(ObjectChange.objects.count(), 0)
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'extras.view_tag')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -481,7 +488,7 @@ class ChangeLogAPITestCase(APITestCase):
             ]
         }
         self.assertEqual(ObjectChange.objects.count(), 0)
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'extras.view_tag')
         url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
 
         response = self.client.put(url, data, format='json', **self.header)
@@ -648,7 +655,15 @@ class ChangeLogAPITestCase(APITestCase):
         device = create_test_device('device1')
         module_bay = ModuleBay.objects.create(device=device, name='Module Bay 1')
         module_type = ModuleType.objects.create(manufacturer=Manufacturer.objects.first(), model='Module Type 1')
-        self.add_permissions('dcim.add_module', 'dcim.add_interface', 'dcim.delete_module')
+        self.add_permissions(
+            'dcim.add_module',
+            'dcim.add_interface',
+            'dcim.delete_module',
+            'dcim.view_device',
+            'dcim.view_module',
+            'dcim.view_modulebay',
+            'dcim.view_moduletype',
+        )
         self.assertEqual(ObjectChange.objects.count(), 0)  # Sanity check
 
         # Create a new Module

+ 4 - 4
netbox/dcim/api/serializers_/device_components.py

@@ -21,7 +21,7 @@ from dcim.models import (
 from ipam.api.serializers_.vlans import VLANSerializer, VLANTranslationPolicySerializer
 from ipam.api.serializers_.vrfs import VRFSerializer
 from ipam.models import VLAN
-from netbox.api.fields import ChoiceField, ContentTypeField, SerializedPKRelatedField
+from netbox.api.fields import ChoiceField, ContentTypeField, RestrictedPrimaryKeyRelatedField, SerializedPKRelatedField
 from netbox.api.gfk_fields import GFKSerializerField
 from netbox.api.serializers import NetBoxModelSerializer
 from users.api.serializers_.mixins import OwnerMixin
@@ -336,7 +336,7 @@ class RearPortMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
         source='rear_port_position'
     )
-    front_port = serializers.PrimaryKeyRelatedField(
+    front_port = RestrictedPrimaryKeyRelatedField(
         queryset=FrontPort.objects.all(),
     )
 
@@ -374,7 +374,7 @@ class FrontPortMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
         source='front_port_position'
     )
-    rear_port = serializers.PrimaryKeyRelatedField(
+    rear_port = RestrictedPrimaryKeyRelatedField(
         queryset=RearPort.objects.all(),
     )
 
@@ -450,7 +450,7 @@ class DeviceBaySerializer(OwnerMixin, NetBoxModelSerializer):
 
 class InventoryItemSerializer(OwnerMixin, NetBoxModelSerializer):
     device = DeviceSerializer(nested=True)
-    parent = serializers.PrimaryKeyRelatedField(queryset=InventoryItem.objects.all(), allow_null=True, default=None)
+    parent = RestrictedPrimaryKeyRelatedField(queryset=InventoryItem.objects.all(), allow_null=True, default=None)
     role = InventoryItemRoleSerializer(nested=True, required=False, allow_null=True)
     manufacturer = ManufacturerSerializer(nested=True, required=False, allow_null=True, default=None)
     component_type = ContentTypeField(

+ 4 - 4
netbox/dcim/api/serializers_/devicetype_components.py

@@ -16,7 +16,7 @@ from dcim.models import (
     PowerPortTemplate,
     RearPortTemplate,
 )
-from netbox.api.fields import ChoiceField, ContentTypeField
+from netbox.api.fields import ChoiceField, ContentTypeField, RestrictedPrimaryKeyRelatedField
 from netbox.api.gfk_fields import GFKSerializerField
 from netbox.api.serializers import ChangeLogMessageSerializer, ValidatedModelSerializer
 from wireless.choices import *
@@ -220,7 +220,7 @@ class RearPortTemplateMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
         source='rear_port_position'
     )
-    front_port = serializers.PrimaryKeyRelatedField(
+    front_port = RestrictedPrimaryKeyRelatedField(
         queryset=FrontPortTemplate.objects.all(),
     )
 
@@ -262,7 +262,7 @@ class FrontPortTemplateMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
         source='front_port_position'
     )
-    rear_port = serializers.PrimaryKeyRelatedField(
+    rear_port = RestrictedPrimaryKeyRelatedField(
         queryset=RearPortTemplate.objects.all(),
     )
 
@@ -341,7 +341,7 @@ class InventoryItemTemplateSerializer(ComponentTemplateSerializer):
     device_type = DeviceTypeSerializer(
         nested=True
     )
-    parent = serializers.PrimaryKeyRelatedField(
+    parent = RestrictedPrimaryKeyRelatedField(
         queryset=InventoryItemTemplate.objects.all(),
         allow_null=True,
         default=None

+ 28 - 3
netbox/dcim/tests/test_api.py

@@ -66,7 +66,10 @@ class Mixins:
             cable = Cable(a_terminations=[obj], b_terminations=[peer_obj], label='Cable 1')
             cable.save()
 
-            self.add_permissions(f'dcim.view_{self.model._meta.model_name}')
+            self.add_permissions(
+                f'dcim.view_{self.model._meta.model_name}',
+                f'dcim.view_{self.peer_termination_type._meta.model_name}',
+            )
             url = reverse(f'dcim-api:{self.model._meta.model_name}-trace', kwargs={'pk': obj.pk})
             response = self.client.get(url, **self.header)
 
@@ -1444,6 +1447,7 @@ class RearPortTemplateTestCase(APIViewTestCases.APIViewTestCase):
     bulk_update_data = {
         'description': 'New description',
     }
+    user_permissions = ('dcim.view_frontporttemplate', )
 
     @classmethod
     def setUpTestData(cls):
@@ -2624,7 +2628,7 @@ class InterfaceTestCase(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTest
         'description': 'New description',
     }
     peer_termination_type = Interface
-    user_permissions = ('dcim.view_device', )
+    user_permissions = ('dcim.view_device', 'ipam.view_vlan')
 
     @classmethod
     def setUpTestData(cls):
@@ -2944,6 +2948,25 @@ class FrontPortTestCase(APIViewTestCases.APIViewTestCase):
 
         self.assertHttpStatus(response, status.HTTP_200_OK)
 
+    def test_create_front_port_with_unviewable_rear_port_fails(self):
+        """A front port cannot be created against a rear port the user cannot view."""
+        self.remove_permissions('dcim.view_rearport')
+        self.add_permissions('dcim.add_frontport')
+
+        device = Device.objects.first()
+        rear_port = RearPort.objects.first()
+        data = {
+            'device': device.pk,
+            'name': 'Front Port Hidden',
+            'type': PortTypeChoices.TYPE_8P8C,
+            'rear_ports': [
+                {'position': 1, 'rear_port': rear_port.pk, 'rear_port_position': 1},
+            ],
+        }
+        response = self.client.post(self._get_list_url(), data, format='json', **self.header)
+        self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+        self.assertIn('rear_ports', response.data)
+
 
 class RearPortTestCase(APIViewTestCases.APIViewTestCase):
     model = RearPort
@@ -2952,7 +2975,7 @@ class RearPortTestCase(APIViewTestCases.APIViewTestCase):
         'description': 'New description',
     }
     peer_termination_type = Interface
-    user_permissions = ('dcim.view_device', )
+    user_permissions = ('dcim.view_device', 'dcim.view_frontport')
 
     @classmethod
     def setUpTestData(cls):
@@ -3326,6 +3349,8 @@ class CableBundleTestCase(APIViewTestCases.APIViewTestCase):
 class CableTestCase(APIViewTestCases.APIViewTestCase):
     model = Cable
     brief_fields = ['description', 'display', 'id', 'label', 'url']
+    # Cable terminations are generic-FK references; view permission cannot be auto-derived
+    user_permissions = ('dcim.view_interface',)
     bulk_update_data = {
         'length': 100,
         'length_unit': 'm',

+ 3 - 0
netbox/extras/tests/test_api.py

@@ -596,6 +596,7 @@ class BookmarkTestCase(
 ):
     model = Bookmark
     brief_fields = ['display', 'id', 'object_id', 'object_type', 'url']
+    user_permissions = ('users.view_user',)
 
     @classmethod
     def setUpTestData(cls):
@@ -1417,6 +1418,7 @@ class SubscriptionTestCase(APIViewTestCases.APIViewTestCase):
     graphql_filter = {
         'id': {'lookup': 'gt', 'value': '0'},
     }
+    user_permissions = ('users.view_user',)
 
     @classmethod
     def setUpTestData(cls):
@@ -1553,6 +1555,7 @@ class NotificationGroupTestCase(APIViewTestCases.APIViewTestCase):
 class NotificationTestCase(APIViewTestCases.APIViewTestCase):
     model = Notification
     brief_fields = ['display', 'event_type', 'id', 'object_id', 'object_type', 'read', 'url', 'user']
+    user_permissions = ('users.view_user',)
     bulk_update_data = {
         'read': now(),
     }

+ 3 - 3
netbox/extras/tests/test_customfields.py

@@ -1065,7 +1065,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
         }
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1209,7 +1209,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
         )
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1389,7 +1389,7 @@ class CustomFieldAPITestCase(APITestCase):
         site1 = Site.objects.get(name='Site 1')
         vlans = VLAN.objects.all()[:3]
         url = reverse('dcim-api:site-detail', kwargs={'pk': site1.pk})
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'ipam.view_vlan')
 
         # Set related objects by PK
         data = {

+ 6 - 1
netbox/extras/tests/test_event_rules.py

@@ -31,6 +31,11 @@ from utilities.testing.mixins import RQQueueTestMixin
 
 
 class EventRuleTestCase(RQQueueTestMixin, APITestCase):
+    user_permissions = (
+        'dcim.add_site',
+        'dcim.change_site',
+        'extras.view_tag',
+    )
 
     def setUp(self):
         super().setUp()
@@ -573,7 +578,7 @@ class EventRuleTestCase(RQQueueTestMixin, APITestCase):
             'b_terminations': [{'object_type': 'dcim.interface', 'object_id': interface_b.pk}],
         }
         url = reverse('dcim-api:cable-list')
-        self.add_permissions('dcim.add_cable')
+        self.add_permissions('dcim.add_cable', 'dcim.view_interface')
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
 

+ 2 - 2
netbox/extras/tests/test_tags.py

@@ -22,7 +22,7 @@ class TaggedItemTestCase(APITestCase):
             'tags': [t.pk for t in tags]
         }
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'extras.view_tag')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -50,7 +50,7 @@ class TaggedItemTestCase(APITestCase):
                 {"name": "New Tag"},
             ]
         }
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'extras.view_tag')
         url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
 
         response = self.client.patch(url, data, format='json', **self.header)

+ 49 - 7
netbox/ipam/tests/test_api.py

@@ -6,10 +6,12 @@ from django.urls import reverse
 from netaddr import IPNetwork
 from rest_framework import status
 
+from core.models import ObjectType
 from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
 from ipam.choices import *
 from ipam.models import *
 from tenancy.models import Tenant
+from users.models import ObjectPermission
 from utilities.data import string_to_ranges
 from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging
 
@@ -99,7 +101,7 @@ class ASNRangeTestCase(APIViewTestCases.APIViewTestCase):
         rir = RIR.objects.first()
         asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
         url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
-        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn')
+        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn', 'ipam.view_rir')
 
         data = {
             'description': 'New ASN'
@@ -116,7 +118,7 @@ class ASNRangeTestCase(APIViewTestCases.APIViewTestCase):
         rir = RIR.objects.first()
         asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
         url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
-        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn')
+        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn', 'ipam.view_rir')
 
         # Try to create eleven ASNs (only ten are available)
         data = [
@@ -488,7 +490,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix', 'ipam.view_vrf')
 
         # Create four available prefixes with individual requests
         prefixes_to_be_created = [
@@ -525,7 +527,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix', 'ipam.view_vrf')
 
         # Try to create five /30s (only four are available)
         data = [
@@ -576,7 +578,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress', 'ipam.view_vrf')
 
         # Create all four available IPs with individual requests
         for i in range(1, 5):
@@ -600,7 +602,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress', 'ipam.view_vrf')
 
         # Try to create nine IPs (only eight are available)
         data = [{'description': f'Test IP {i}'} for i in range(1, 10)]  # 9 IPs
@@ -727,7 +729,7 @@ class IPRangeTestCase(APIViewTestCases.APIViewTestCase):
             vrf=vrf
         )
         url = reverse('ipam-api:iprange-available-ips', kwargs={'pk': iprange.pk})
-        self.add_permissions('ipam.view_iprange', 'ipam.add_ipaddress')
+        self.add_permissions('ipam.view_iprange', 'ipam.add_ipaddress', 'ipam.view_vrf')
 
         # Create all three available IPs with individual requests
         for i in range(1, 4):
@@ -1501,3 +1503,43 @@ class ServiceTestCase(APIViewTestCases.APIViewTestCase):
                 'ports': [6],
             },
         ]
+
+
+class NestedObjectPermissionAPITest(APITestCase):
+    """
+    End-to-end: a constrained view permission on a related model is enforced when that object is
+    referenced via a nested serializer on write, so an object outside the user's constraint is
+    rejected as though it did not exist (the #21988 vector).
+    """
+    model = Prefix
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.tenants = Tenant.objects.bulk_create((
+            Tenant(name='Tenant 1', slug='tenant-1'),
+            Tenant(name='Tenant 2', slug='tenant-2'),
+        ))
+
+    def test_create_rejects_related_object_outside_constraint(self):
+        self.add_permissions('ipam.add_prefix')
+        obj_perm = ObjectPermission(
+            name='View Tenant 1 only', actions=['view'], constraints={'pk': self.tenants[0].pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(Tenant))
+
+        url = self._get_list_url()
+
+        # The viewable tenant resolves and the prefix is created
+        response = self.client.post(
+            url, {'prefix': '198.51.100.0/24', 'tenant': self.tenants[0].pk}, format='json', **self.header
+        )
+        self.assertHttpStatus(response, status.HTTP_201_CREATED)
+
+        # The tenant outside the constraint fails resolution as a nonexistent object would
+        response = self.client.post(
+            url, {'prefix': '198.51.101.0/24', 'tenant': self.tenants[1].pk}, format='json', **self.header
+        )
+        self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+        self.assertIn('tenant', response.data)

+ 16 - 3
netbox/netbox/api/fields.py

@@ -10,6 +10,7 @@ from rest_framework.exceptions import ValidationError
 from rest_framework.relations import PrimaryKeyRelatedField, RelatedField
 
 from utilities.data import get_inclusive_integer_range_bounds
+from utilities.permissions import restrict_queryset
 
 __all__ = (
     'AttributesField',
@@ -18,6 +19,7 @@ __all__ = (
     'IPNetworkSerializer',
     'IntegerRangeSerializer',
     'RelatedObjectCountField',
+    'RestrictedPrimaryKeyRelatedField',
     'SerializedPKRelatedField',
 )
 
@@ -133,10 +135,21 @@ class IPNetworkSerializer(serializers.Serializer):
         return IPNetwork(value)
 
 
-class SerializedPKRelatedField(PrimaryKeyRelatedField):
+class RestrictedPrimaryKeyRelatedField(PrimaryKeyRelatedField):
     """
-    Extends PrimaryKeyRelatedField to return a serialized object on read. This is useful for representing related
-    objects in a ManyToManyField while still allowing a set of primary keys to be written.
+    PrimaryKeyRelatedField that resolves write input from a permission-restricted queryset, so a
+    referenced object the user cannot view fails validation as though it did not exist. It represents the
+    object by its bare PK on read; SerializedPKRelatedField extends it to return a serialized object instead.
+    """
+    def get_queryset(self):
+        return restrict_queryset(super().get_queryset(), self.context.get('request'))
+
+
+class SerializedPKRelatedField(RestrictedPrimaryKeyRelatedField):
+    """
+    Extends RestrictedPrimaryKeyRelatedField to return a serialized object on read, inheriting its
+    permission-restricted resolution of write input. This is useful for representing related objects in a
+    ManyToManyField while still allowing a set of primary keys to be written.
     """
     def __init__(self, serializer, nested=False, **kwargs):
         self.serializer = serializer

+ 12 - 3
netbox/netbox/api/serializers/base.py

@@ -5,6 +5,7 @@ from drf_spectacular.utils import extend_schema_field
 from rest_framework import serializers
 
 from utilities.api import get_related_object_by_attrs
+from utilities.permissions import restrict_queryset
 
 from .fields import NetBoxAPIHyperlinkedIdentityField, NetBoxURLHyperlinkedIdentityField
 
@@ -43,13 +44,21 @@ class BaseModelSerializer(serializers.ModelSerializer):
 
         super().__init__(*args, **kwargs)
 
-    def to_internal_value(self, data):
+    def get_related_object_queryset(self):
+        """
+        Return the queryset used to resolve a related object supplied via nested
+        serializer input.
+        """
+        return restrict_queryset(self.Meta.model.objects.all(), self.context.get('request'))
 
+    def to_internal_value(self, data):
+        """
+        Override to_internal_value() to handle nested serializer input.
+        """
         # If initialized as a nested serializer, we should expect to receive the attrs or PK
         # identifying a related object.
         if self.nested:
-            queryset = self.Meta.model.objects.all()
-            return get_related_object_by_attrs(queryset, data)
+            return get_related_object_by_attrs(self.get_related_object_queryset(), data)
 
         return super().to_internal_value(data)
 

+ 4 - 2
netbox/netbox/api/serializers/generic.py

@@ -4,8 +4,9 @@ from rest_framework import serializers
 
 from core.models import ObjectType
 from netbox.api.fields import ContentTypeField
-from utilities.api import get_serializer_for_model
+from utilities.api import get_related_object_by_attrs, get_serializer_for_model
 from utilities.object_types import object_type_identifier
+from utilities.permissions import restrict_queryset
 
 __all__ = (
     'GenericObjectSerializer',
@@ -25,7 +26,8 @@ class GenericObjectSerializer(serializers.Serializer):
     def to_internal_value(self, data):
         data = super().to_internal_value(data)
         model = data['object_type'].model_class()
-        return model.objects.get(pk=data['object_id'])
+        queryset = restrict_queryset(model.objects.all(), self.context.get('request'))
+        return get_related_object_by_attrs(queryset, data['object_id'])
 
     def to_representation(self, instance):
         object_type = ObjectType.objects.get_for_model(instance)

+ 1 - 2
netbox/netbox/api/serializers/nested.py

@@ -16,8 +16,7 @@ class WritableNestedSerializer(BaseModelSerializer):
     subclassed to return a full representation of the related object on read.
     """
     def to_internal_value(self, data):
-        queryset = self.Meta.model.objects.all()
-        return get_related_object_by_attrs(queryset, data)
+        return get_related_object_by_attrs(self.get_related_object_queryset(), data)
 
 
 # Declared here for use by PrimaryModelSerializer

+ 7 - 0
netbox/netbox/tests/test_authentication.py

@@ -611,6 +611,10 @@ class ObjectPermissionAPIViewTestCase(TestCase):
         }
         initial_count = Rack.objects.count()
 
+        # Permit resolving the related Site. This test is concerned with
+        # constrained Rack add permissions, not Site visibility.
+        self.add_permissions('dcim.view_site')
+
         # Attempt to create an object without permission
         response = self.client.post(url, data, format='json', **self.header)
         self.assertEqual(response.status_code, 403)
@@ -638,6 +642,9 @@ class ObjectPermissionAPIViewTestCase(TestCase):
 
     @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
     def test_edit_object(self):
+        # Permit resolving the related Site. This test is concerned with
+        # constrained Rack change permissions, not Site visibility.
+        self.add_permissions('dcim.view_site')
 
         # Attempt to edit an object without permission
         data = {'site': self.sites[0].pk}

+ 196 - 0
netbox/netbox/tests/test_serializers.py

@@ -0,0 +1,196 @@
+from django.core.exceptions import ValidationError
+from django.test import RequestFactory, TestCase
+from rest_framework.exceptions import ValidationError as DRFValidationError
+
+from core.models import ObjectType
+from netbox.api.fields import SerializedPKRelatedField
+from netbox.api.serializers import NetBoxModelSerializer, WritableNestedSerializer
+from netbox.api.serializers.generic import GenericObjectSerializer
+from netbox.tests.dummy_plugin.models import DummyNetBoxModel
+from users.models import ObjectPermission, User
+
+
+class NestedDummyNetBoxModelSerializer(NetBoxModelSerializer):
+    class Meta:
+        model = DummyNetBoxModel
+        fields = ['id', 'url', 'display_url', 'display']
+        brief_fields = ['id', 'display']
+
+
+class WritableNestedDummyNetBoxModelSerializer(WritableNestedSerializer):
+    class Meta:
+        model = DummyNetBoxModel
+        fields = ['id', 'display']
+
+
+class NestedRelatedObjectPermissionTest(TestCase):
+    """
+    Validate that nested related-object resolution honors object permissions.
+
+    This exercises the shared serializer behavior directly rather than relying
+    on any specific app's API endpoint.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='testuser')
+
+        cls.visible_object = DummyNetBoxModel.objects.create()
+        cls.hidden_object = DummyNetBoxModel.objects.create()
+
+        obj_perm = ObjectPermission(
+            name='View visible dummy object', actions=['view'], constraints={'pk': cls.visible_object.pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(cls.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(DummyNetBoxModel))
+
+    def setUp(self):
+        self.request = RequestFactory().get('/api/')
+        self.request.user = self.user
+
+    def test_nested_serializer_represents_visible_object(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        data = serializer.to_representation(self.visible_object)
+
+        self.assertEqual(data['id'], self.visible_object.pk)
+        self.assertIn('display', data)
+
+    def test_writable_nested_serializer_represents_visible_object(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        data = serializer.to_representation(self.visible_object)
+
+        self.assertEqual(data['id'], self.visible_object.pk)
+        self.assertIn('display', data)
+
+    def test_nested_serializer_resolves_visible_object_by_id(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        self.assertEqual(serializer.to_internal_value(self.visible_object.pk), self.visible_object)
+
+    def test_nested_serializer_rejects_hidden_object_by_id(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value(self.hidden_object.pk)
+
+    def test_nested_serializer_rejects_hidden_object_by_attrs(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value({'id': self.hidden_object.pk})
+
+    def test_writable_nested_serializer_resolves_visible_object_by_id(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        self.assertEqual(serializer.to_internal_value(self.visible_object.pk), self.visible_object)
+
+    def test_writable_nested_serializer_rejects_hidden_object_by_id(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value(self.hidden_object.pk)
+
+    def test_writable_nested_serializer_rejects_hidden_object_by_attrs(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value({'id': self.hidden_object.pk})
+
+    def test_nested_serializer_resolves_object_without_request_context(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True)
+
+        self.assertEqual(serializer.to_internal_value(self.hidden_object.pk), self.hidden_object)
+
+    def test_writable_nested_serializer_resolves_object_without_request_context(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer()
+
+        self.assertEqual(serializer.to_internal_value(self.hidden_object.pk), self.hidden_object)
+
+
+class SerializedPKRelatedFieldPermissionTest(TestCase):
+    """ManyToMany input resolution honors object permissions."""
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='m2m_user')
+        cls.visible_object = DummyNetBoxModel.objects.create()
+        cls.hidden_object = DummyNetBoxModel.objects.create()
+
+        obj_perm = ObjectPermission(
+            name='View visible dummy object (m2m)', actions=['view'], constraints={'pk': cls.visible_object.pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(cls.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(DummyNetBoxModel))
+
+    def setUp(self):
+        self.request = RequestFactory().get('/api/')
+        self.request.user = self.user
+
+    def _bind_field(self, context):
+        field = SerializedPKRelatedField(
+            queryset=DummyNetBoxModel.objects.all(),
+            serializer=NestedDummyNetBoxModelSerializer,
+            required=False,
+        )
+        # A bound parent serializer supplies the request context to the field.
+        parent = NestedDummyNetBoxModelSerializer(nested=True, context=context)
+        field.bind('related', parent)
+        return field
+
+    def test_resolves_visible_object(self):
+        field = self._bind_field({'request': self.request})
+        self.assertEqual(field.to_internal_value(self.visible_object.pk), self.visible_object)
+
+    def test_rejects_hidden_object(self):
+        field = self._bind_field({'request': self.request})
+        with self.assertRaises(DRFValidationError):
+            field.to_internal_value(self.hidden_object.pk)
+
+    def test_resolves_object_without_request_context(self):
+        field = self._bind_field({})
+        self.assertEqual(field.to_internal_value(self.hidden_object.pk), self.hidden_object)
+
+
+class GenericObjectSerializerPermissionTest(TestCase):
+    """Writable generic-FK input resolution honors object permissions."""
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='gfk_user')
+        cls.visible_object = DummyNetBoxModel.objects.create()
+        cls.hidden_object = DummyNetBoxModel.objects.create()
+        cls.object_type = ObjectType.objects.get_for_model(DummyNetBoxModel)
+
+        obj_perm = ObjectPermission(
+            name='View visible dummy object (gfk)', actions=['view'], constraints={'pk': cls.visible_object.pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(cls.user)
+        obj_perm.object_types.add(cls.object_type)
+
+    def setUp(self):
+        self.request = RequestFactory().get('/api/')
+        self.request.user = self.user
+
+    def _payload(self, obj):
+        return {
+            'object_type': f'{self.object_type.app_label}.{self.object_type.model}',
+            'object_id': obj.pk,
+        }
+
+    def test_resolves_visible_object(self):
+        serializer = GenericObjectSerializer(context={'request': self.request})
+        self.assertEqual(serializer.to_internal_value(self._payload(self.visible_object)), self.visible_object)
+
+    def test_rejects_hidden_object(self):
+        serializer = GenericObjectSerializer(context={'request': self.request})
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value(self._payload(self.hidden_object))
+
+    def test_resolves_object_without_request_context(self):
+        serializer = GenericObjectSerializer()
+        self.assertEqual(serializer.to_internal_value(self._payload(self.hidden_object)), self.hidden_object)

+ 1 - 1
netbox/users/tests/query_counts.json

@@ -9,6 +9,6 @@
   "ownergroup:list_objects_with_permission": 17,
   "token:api_list_objects": 10,
   "token:list_objects_with_permission": 17,
-  "user:api_list_objects": 12,
+  "user:api_list_objects": 13,
   "user:list_objects_with_permission": 17
 }

+ 19 - 2
netbox/users/tests/test_api.py

@@ -21,6 +21,9 @@ class UserTestCase(APIViewTestCases.APIViewTestCase):
     model = User
     brief_fields = ['display', 'id', 'url', 'username']
     validation_excluded_fields = ['password']
+    # 'permissions' is a SerializedPKRelatedField sourced from object_permissions, so its view perm
+    # cannot be auto-derived from the model field name
+    user_permissions = ('users.view_objectpermission',)
     bulk_update_data = {
         'email': 'test@example.com',
     }
@@ -140,6 +143,9 @@ class UserTestCase(APIViewTestCases.APIViewTestCase):
 class GroupTestCase(APIViewTestCases.APIViewTestCase):
     model = Group
     brief_fields = ['description', 'display', 'id', 'name', 'url']
+    # 'permissions' is a SerializedPKRelatedField sourced from object_permissions, so its view perm
+    # cannot be auto-derived from the model field name
+    user_permissions = ('users.view_objectpermission',)
 
     @classmethod
     def setUpTestData(cls):
@@ -199,6 +205,7 @@ class TokenTestCase(
 ):
     model = Token
     brief_fields = ['description', 'display', 'enabled', 'id', 'key', 'url', 'version', 'write_enabled']
+    user_permissions = ('users.view_user',)
     bulk_update_data = {
         'description': 'New description',
     }
@@ -207,7 +214,10 @@ class TokenTestCase(
         super().setUp()
 
         # Apply grant_token permission to enable the creation of Tokens for other Users
-        self.add_permissions('users.grant_token')
+        self.add_permissions(
+            'users.grant_token',
+            'users.view_user',
+        )
 
     @classmethod
     def setUpTestData(cls):
@@ -294,7 +304,10 @@ class TokenTestCase(
         # Clear grant_token permission assigned by setUpTestData
         ObjectPermission.objects.filter(users=self.user).delete()
 
-        self.add_permissions('users.add_token')
+        self.add_permissions(
+            'users.add_token',
+            'users.view_user',
+        )
         user2 = User.objects.create_user(username='testuser2')
         data = {
             'user': user2.id,
@@ -677,6 +690,10 @@ class OwnerGroupTestCase(APIViewTestCases.APIViewTestCase):
 
 class OwnerTestCase(APIViewTestCases.APIViewTestCase):
     model = Owner
+    user_permissions = (
+        'users.view_group',
+        'users.view_user',
+    )
     brief_fields = ['description', 'display', 'id', 'name', 'url']
 
     @classmethod

+ 16 - 0
netbox/utilities/permissions.py

@@ -15,6 +15,7 @@ __all__ = (
     'qs_filter_from_constraints',
     'resolve_permission',
     'resolve_permission_type',
+    'restrict_queryset',
 )
 
 
@@ -159,3 +160,18 @@ def qs_filter_from_constraints(constraints, tokens=None):
             return Q()
 
     return params
+
+
+def restrict_queryset(queryset, request, action='view'):
+    """
+    Restrict a queryset to the objects the request user may act on.
+
+    Used when resolving related objects from REST API write input so that an object the user
+    cannot view fails resolution exactly as a nonexistent object would. When no request context
+    is available (internal calls), or the queryset does not support restriction (models without a
+    RestrictedQuerySet manager have no object-level permissions to enforce), it is returned unchanged.
+    """
+    if request is not None and hasattr(queryset, 'restrict'):
+        # An unauthenticated request resolves no objects (restrict() returns none()).
+        return queryset.restrict(getattr(request, 'user', None), action)
+    return queryset

+ 9 - 0
netbox/utilities/testing/api.py

@@ -302,6 +302,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
+            self.add_related_view_permissions(self.create_data[0])
+
             data = copy.deepcopy(self.create_data[0])
 
             # If supported, add a changelog message
@@ -343,6 +345,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
+            self.add_related_view_permissions(*self.create_data)
+
             # If supported, add a changelog message
             changelog_message = get_random_string(10)
             if issubclass(self.model, ChangeLoggingMixin):
@@ -418,6 +422,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
+            self.add_related_view_permissions(update_data)
+
             data = copy.deepcopy(update_data)
 
             # If supported, add a changelog message
@@ -458,6 +464,7 @@ class APIViewTestCases:
             instance = self._get_queryset().first()
             url = self._get_detail_url(instance)
             update_data = self.update_data or getattr(self, 'create_data')[0]
+            self.add_related_view_permissions(update_data)
 
             # Fetch current ETag
             get_response = self.client.get(url, **self.header)
@@ -499,6 +506,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
+            self.add_related_view_permissions(self.bulk_update_data)
+
             id_list = list(self._get_queryset().values_list('id', flat=True)[:3])
             self.assertEqual(len(id_list), 3, "Insufficient number of objects to test bulk update")
             data = [

+ 24 - 1
netbox/utilities/testing/base.py

@@ -18,7 +18,7 @@ from core.models import ObjectType
 from users.models import ObjectPermission, User
 from utilities.data import ranges_to_string
 from utilities.object_types import object_type_identifier
-from utilities.permissions import resolve_permission_type
+from utilities.permissions import get_permission_for_model, resolve_permission_type
 
 from .utils import DUMMY_CF_DATA, extract_form_failures
 
@@ -138,6 +138,29 @@ class ModelTestCase(TestCase):
         """
         return self.model.objects.all()
 
+    def add_related_view_permissions(self, *payloads):
+        """
+        Grant the test user view permission on each permission-controlled related model referenced
+        by the given write payload(s), so nested related-object resolution succeeds while object-level
+        permission enforcement stays active (no global view exemption).
+
+        Foreign-key and many-to-many references are derived automatically from the payload. References
+        that cannot be derived from the model fields (e.g. generic-FK targets) should be granted via the
+        test case's user_permissions.
+        """
+        permissions = set()
+        for payload in payloads:
+            if not isinstance(payload, dict):
+                continue
+            for field in self.model._meta.get_fields():
+                related_model = getattr(field, 'related_model', None)
+                if field.name not in payload or related_model is None:
+                    continue
+                if hasattr(related_model.objects, 'restrict'):
+                    permissions.add(get_permission_for_model(related_model, 'view'))
+        if permissions:
+            self.add_permissions(*permissions)
+
     def prepare_instance(self, instance):
         """
         Test cases can override this method to perform any necessary manipulation of an instance prior to its evaluation

+ 3 - 3
netbox/utilities/tests/test_api.py

@@ -38,7 +38,7 @@ class WritableNestedSerializerTestCase(APITestCase):
             'site': self.site1.pk,
         }
         url = reverse('ipam-api:vlan-list')
-        self.add_permissions('ipam.add_vlan')
+        self.add_permissions('dcim.view_site', 'ipam.add_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -70,7 +70,7 @@ class WritableNestedSerializerTestCase(APITestCase):
             },
         }
         url = reverse('ipam-api:vlan-list')
-        self.add_permissions('ipam.add_vlan')
+        self.add_permissions('dcim.view_site', 'ipam.add_vlan')
 
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -106,7 +106,7 @@ class WritableNestedSerializerTestCase(APITestCase):
             },
         }
         url = reverse('ipam-api:vlan-list')
-        self.add_permissions('ipam.add_vlan')
+        self.add_permissions('dcim.view_region', 'dcim.view_site', 'ipam.add_vlan')
 
         with disable_warnings('django.request'):
             response = self.client.post(url, data, format='json', **self.header)

+ 14 - 3
netbox/virtualization/tests/test_api.py

@@ -357,7 +357,12 @@ class VirtualMachineTestCase(APIViewTestCases.APIViewTestCase):
             'vcpus': None,
             'memory': None,
         }
-        self.add_permissions('virtualization.add_virtualmachine')
+        self.add_permissions(
+            'virtualization.add_virtualmachine',
+            'dcim.view_site',
+            'virtualization.view_cluster',
+            'virtualization.view_virtualmachinetype',
+        )
 
         response = self.client.post(self._get_list_url(), data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -378,7 +383,13 @@ class VirtualMachineTestCase(APIViewTestCases.APIViewTestCase):
             'vcpus': 6,
             'memory': 12288,
         }
-        self.add_permissions('virtualization.add_virtualmachine')
+        self.add_permissions(
+            'virtualization.add_virtualmachine',
+            'dcim.view_site',
+            'virtualization.view_cluster',
+            'virtualization.view_virtualmachinetype',
+            'dcim.view_platform',
+        )
 
         response = self.client.post(self._get_list_url(), data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -677,7 +688,7 @@ class VMInterfaceTestCase(APIViewTestCases.APIViewTestCase):
             },
         }
 
-        self.add_permissions('ipam.change_prefix')
+        self.add_permissions('ipam.change_prefix', 'virtualization.view_vminterface')
 
         response = self.client.patch(url, data, format='json', **self.header)
         self.assertEqual(response.status_code, 200)