Răsfoiți Sursa

Merge pull request #22013 from netbox-community/21988-authorization-bypass-in-nested-object-resolution-via

Fixes #21988: Enforce object permissions for nested related objects in the REST API
bctiemann 1 săptămână în urmă
părinte
comite
b3489cd529

+ 19 - 4
netbox/core/tests/test_changelog.py

@@ -149,7 +149,14 @@ class ChangeLogViewTestCase(ModelViewTestCase):
             'path': self._get_url('delete', instance=site),
             'path': self._get_url('delete', instance=site),
             'data': post_data({'confirm': True}),
             'data': post_data({'confirm': True}),
         }
         }
-        self.add_permissions('dcim.delete_site')
+        self.add_permissions(
+            'dcim.add_module',
+            'dcim.delete_site',
+            'dcim.view_device',
+            'dcim.view_modulebay',
+            'dcim.view_moduletype',
+            'extras.view_tag',
+        )
         response = self.client.post(**request)
         response = self.client.post(**request)
         self.assertHttpStatus(response, 302)
         self.assertHttpStatus(response, 302)
 
 
@@ -449,7 +456,7 @@ class ChangeLogAPITestCase(APITestCase):
         }
         }
         self.assertEqual(ObjectChange.objects.count(), 0)
         self.assertEqual(ObjectChange.objects.count(), 0)
         url = reverse('dcim-api:site-list')
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'extras.view_tag')
 
 
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -481,7 +488,7 @@ class ChangeLogAPITestCase(APITestCase):
             ]
             ]
         }
         }
         self.assertEqual(ObjectChange.objects.count(), 0)
         self.assertEqual(ObjectChange.objects.count(), 0)
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'extras.view_tag')
         url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
         url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
 
 
         response = self.client.put(url, data, format='json', **self.header)
         response = self.client.put(url, data, format='json', **self.header)
@@ -648,7 +655,15 @@ class ChangeLogAPITestCase(APITestCase):
         device = create_test_device('device1')
         device = create_test_device('device1')
         module_bay = ModuleBay.objects.create(device=device, name='Module Bay 1')
         module_bay = ModuleBay.objects.create(device=device, name='Module Bay 1')
         module_type = ModuleType.objects.create(manufacturer=Manufacturer.objects.first(), model='Module Type 1')
         module_type = ModuleType.objects.create(manufacturer=Manufacturer.objects.first(), model='Module Type 1')
-        self.add_permissions('dcim.add_module', 'dcim.add_interface', 'dcim.delete_module')
+        self.add_permissions(
+            'dcim.add_module',
+            'dcim.add_interface',
+            'dcim.delete_module',
+            'dcim.view_device',
+            'dcim.view_module',
+            'dcim.view_modulebay',
+            'dcim.view_moduletype',
+        )
         self.assertEqual(ObjectChange.objects.count(), 0)  # Sanity check
         self.assertEqual(ObjectChange.objects.count(), 0)  # Sanity check
 
 
         # Create a new Module
         # Create a new Module

+ 4 - 4
netbox/dcim/api/serializers_/device_components.py

@@ -21,7 +21,7 @@ from dcim.models import (
 from ipam.api.serializers_.vlans import VLANSerializer, VLANTranslationPolicySerializer
 from ipam.api.serializers_.vlans import VLANSerializer, VLANTranslationPolicySerializer
 from ipam.api.serializers_.vrfs import VRFSerializer
 from ipam.api.serializers_.vrfs import VRFSerializer
 from ipam.models import VLAN
 from ipam.models import VLAN
-from netbox.api.fields import ChoiceField, ContentTypeField, SerializedPKRelatedField
+from netbox.api.fields import ChoiceField, ContentTypeField, RestrictedPrimaryKeyRelatedField, SerializedPKRelatedField
 from netbox.api.gfk_fields import GFKSerializerField
 from netbox.api.gfk_fields import GFKSerializerField
 from netbox.api.serializers import NetBoxModelSerializer
 from netbox.api.serializers import NetBoxModelSerializer
 from users.api.serializers_.mixins import OwnerMixin
 from users.api.serializers_.mixins import OwnerMixin
@@ -336,7 +336,7 @@ class RearPortMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
     position = serializers.IntegerField(
         source='rear_port_position'
         source='rear_port_position'
     )
     )
-    front_port = serializers.PrimaryKeyRelatedField(
+    front_port = RestrictedPrimaryKeyRelatedField(
         queryset=FrontPort.objects.all(),
         queryset=FrontPort.objects.all(),
     )
     )
 
 
@@ -374,7 +374,7 @@ class FrontPortMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
     position = serializers.IntegerField(
         source='front_port_position'
         source='front_port_position'
     )
     )
-    rear_port = serializers.PrimaryKeyRelatedField(
+    rear_port = RestrictedPrimaryKeyRelatedField(
         queryset=RearPort.objects.all(),
         queryset=RearPort.objects.all(),
     )
     )
 
 
@@ -450,7 +450,7 @@ class DeviceBaySerializer(OwnerMixin, NetBoxModelSerializer):
 
 
 class InventoryItemSerializer(OwnerMixin, NetBoxModelSerializer):
 class InventoryItemSerializer(OwnerMixin, NetBoxModelSerializer):
     device = DeviceSerializer(nested=True)
     device = DeviceSerializer(nested=True)
-    parent = serializers.PrimaryKeyRelatedField(queryset=InventoryItem.objects.all(), allow_null=True, default=None)
+    parent = RestrictedPrimaryKeyRelatedField(queryset=InventoryItem.objects.all(), allow_null=True, default=None)
     role = InventoryItemRoleSerializer(nested=True, required=False, allow_null=True)
     role = InventoryItemRoleSerializer(nested=True, required=False, allow_null=True)
     manufacturer = ManufacturerSerializer(nested=True, required=False, allow_null=True, default=None)
     manufacturer = ManufacturerSerializer(nested=True, required=False, allow_null=True, default=None)
     component_type = ContentTypeField(
     component_type = ContentTypeField(

+ 4 - 4
netbox/dcim/api/serializers_/devicetype_components.py

@@ -16,7 +16,7 @@ from dcim.models import (
     PowerPortTemplate,
     PowerPortTemplate,
     RearPortTemplate,
     RearPortTemplate,
 )
 )
-from netbox.api.fields import ChoiceField, ContentTypeField
+from netbox.api.fields import ChoiceField, ContentTypeField, RestrictedPrimaryKeyRelatedField
 from netbox.api.gfk_fields import GFKSerializerField
 from netbox.api.gfk_fields import GFKSerializerField
 from netbox.api.serializers import ChangeLogMessageSerializer, ValidatedModelSerializer
 from netbox.api.serializers import ChangeLogMessageSerializer, ValidatedModelSerializer
 from wireless.choices import *
 from wireless.choices import *
@@ -220,7 +220,7 @@ class RearPortTemplateMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
     position = serializers.IntegerField(
         source='rear_port_position'
         source='rear_port_position'
     )
     )
-    front_port = serializers.PrimaryKeyRelatedField(
+    front_port = RestrictedPrimaryKeyRelatedField(
         queryset=FrontPortTemplate.objects.all(),
         queryset=FrontPortTemplate.objects.all(),
     )
     )
 
 
@@ -262,7 +262,7 @@ class FrontPortTemplateMappingSerializer(serializers.ModelSerializer):
     position = serializers.IntegerField(
     position = serializers.IntegerField(
         source='front_port_position'
         source='front_port_position'
     )
     )
-    rear_port = serializers.PrimaryKeyRelatedField(
+    rear_port = RestrictedPrimaryKeyRelatedField(
         queryset=RearPortTemplate.objects.all(),
         queryset=RearPortTemplate.objects.all(),
     )
     )
 
 
@@ -341,7 +341,7 @@ class InventoryItemTemplateSerializer(ComponentTemplateSerializer):
     device_type = DeviceTypeSerializer(
     device_type = DeviceTypeSerializer(
         nested=True
         nested=True
     )
     )
-    parent = serializers.PrimaryKeyRelatedField(
+    parent = RestrictedPrimaryKeyRelatedField(
         queryset=InventoryItemTemplate.objects.all(),
         queryset=InventoryItemTemplate.objects.all(),
         allow_null=True,
         allow_null=True,
         default=None
         default=None

+ 28 - 3
netbox/dcim/tests/test_api.py

@@ -66,7 +66,10 @@ class Mixins:
             cable = Cable(a_terminations=[obj], b_terminations=[peer_obj], label='Cable 1')
             cable = Cable(a_terminations=[obj], b_terminations=[peer_obj], label='Cable 1')
             cable.save()
             cable.save()
 
 
-            self.add_permissions(f'dcim.view_{self.model._meta.model_name}')
+            self.add_permissions(
+                f'dcim.view_{self.model._meta.model_name}',
+                f'dcim.view_{self.peer_termination_type._meta.model_name}',
+            )
             url = reverse(f'dcim-api:{self.model._meta.model_name}-trace', kwargs={'pk': obj.pk})
             url = reverse(f'dcim-api:{self.model._meta.model_name}-trace', kwargs={'pk': obj.pk})
             response = self.client.get(url, **self.header)
             response = self.client.get(url, **self.header)
 
 
@@ -1444,6 +1447,7 @@ class RearPortTemplateTestCase(APIViewTestCases.APIViewTestCase):
     bulk_update_data = {
     bulk_update_data = {
         'description': 'New description',
         'description': 'New description',
     }
     }
+    user_permissions = ('dcim.view_frontporttemplate', )
 
 
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -2624,7 +2628,7 @@ class InterfaceTestCase(Mixins.ComponentTraceMixin, APIViewTestCases.APIViewTest
         'description': 'New description',
         'description': 'New description',
     }
     }
     peer_termination_type = Interface
     peer_termination_type = Interface
-    user_permissions = ('dcim.view_device', )
+    user_permissions = ('dcim.view_device', 'ipam.view_vlan')
 
 
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -2944,6 +2948,25 @@ class FrontPortTestCase(APIViewTestCases.APIViewTestCase):
 
 
         self.assertHttpStatus(response, status.HTTP_200_OK)
         self.assertHttpStatus(response, status.HTTP_200_OK)
 
 
+    def test_create_front_port_with_unviewable_rear_port_fails(self):
+        """A front port cannot be created against a rear port the user cannot view."""
+        self.remove_permissions('dcim.view_rearport')
+        self.add_permissions('dcim.add_frontport')
+
+        device = Device.objects.first()
+        rear_port = RearPort.objects.first()
+        data = {
+            'device': device.pk,
+            'name': 'Front Port Hidden',
+            'type': PortTypeChoices.TYPE_8P8C,
+            'rear_ports': [
+                {'position': 1, 'rear_port': rear_port.pk, 'rear_port_position': 1},
+            ],
+        }
+        response = self.client.post(self._get_list_url(), data, format='json', **self.header)
+        self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+        self.assertIn('rear_ports', response.data)
+
 
 
 class RearPortTestCase(APIViewTestCases.APIViewTestCase):
 class RearPortTestCase(APIViewTestCases.APIViewTestCase):
     model = RearPort
     model = RearPort
@@ -2952,7 +2975,7 @@ class RearPortTestCase(APIViewTestCases.APIViewTestCase):
         'description': 'New description',
         'description': 'New description',
     }
     }
     peer_termination_type = Interface
     peer_termination_type = Interface
-    user_permissions = ('dcim.view_device', )
+    user_permissions = ('dcim.view_device', 'dcim.view_frontport')
 
 
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -3326,6 +3349,8 @@ class CableBundleTestCase(APIViewTestCases.APIViewTestCase):
 class CableTestCase(APIViewTestCases.APIViewTestCase):
 class CableTestCase(APIViewTestCases.APIViewTestCase):
     model = Cable
     model = Cable
     brief_fields = ['description', 'display', 'id', 'label', 'url']
     brief_fields = ['description', 'display', 'id', 'label', 'url']
+    # Cable terminations are generic-FK references; view permission cannot be auto-derived
+    user_permissions = ('dcim.view_interface',)
     bulk_update_data = {
     bulk_update_data = {
         'length': 100,
         'length': 100,
         'length_unit': 'm',
         'length_unit': 'm',

+ 3 - 0
netbox/extras/tests/test_api.py

@@ -596,6 +596,7 @@ class BookmarkTestCase(
 ):
 ):
     model = Bookmark
     model = Bookmark
     brief_fields = ['display', 'id', 'object_id', 'object_type', 'url']
     brief_fields = ['display', 'id', 'object_id', 'object_type', 'url']
+    user_permissions = ('users.view_user',)
 
 
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -1417,6 +1418,7 @@ class SubscriptionTestCase(APIViewTestCases.APIViewTestCase):
     graphql_filter = {
     graphql_filter = {
         'id': {'lookup': 'gt', 'value': '0'},
         'id': {'lookup': 'gt', 'value': '0'},
     }
     }
+    user_permissions = ('users.view_user',)
 
 
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -1553,6 +1555,7 @@ class NotificationGroupTestCase(APIViewTestCases.APIViewTestCase):
 class NotificationTestCase(APIViewTestCases.APIViewTestCase):
 class NotificationTestCase(APIViewTestCases.APIViewTestCase):
     model = Notification
     model = Notification
     brief_fields = ['display', 'event_type', 'id', 'object_id', 'object_type', 'read', 'url', 'user']
     brief_fields = ['display', 'event_type', 'id', 'object_id', 'object_type', 'read', 'url', 'user']
+    user_permissions = ('users.view_user',)
     bulk_update_data = {
     bulk_update_data = {
         'read': now(),
         'read': now(),
     }
     }

+ 3 - 3
netbox/extras/tests/test_customfields.py

@@ -1065,7 +1065,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
             },
         }
         }
         url = reverse('dcim-api:site-list')
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
 
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1209,7 +1209,7 @@ class CustomFieldAPITestCase(APITestCase):
             },
             },
         )
         )
         url = reverse('dcim-api:site-list')
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'ipam.view_vlan')
 
 
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -1389,7 +1389,7 @@ class CustomFieldAPITestCase(APITestCase):
         site1 = Site.objects.get(name='Site 1')
         site1 = Site.objects.get(name='Site 1')
         vlans = VLAN.objects.all()[:3]
         vlans = VLAN.objects.all()[:3]
         url = reverse('dcim-api:site-detail', kwargs={'pk': site1.pk})
         url = reverse('dcim-api:site-detail', kwargs={'pk': site1.pk})
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'ipam.view_vlan')
 
 
         # Set related objects by PK
         # Set related objects by PK
         data = {
         data = {

+ 6 - 1
netbox/extras/tests/test_event_rules.py

@@ -31,6 +31,11 @@ from utilities.testing.mixins import RQQueueTestMixin
 
 
 
 
 class EventRuleTestCase(RQQueueTestMixin, APITestCase):
 class EventRuleTestCase(RQQueueTestMixin, APITestCase):
+    user_permissions = (
+        'dcim.add_site',
+        'dcim.change_site',
+        'extras.view_tag',
+    )
 
 
     def setUp(self):
     def setUp(self):
         super().setUp()
         super().setUp()
@@ -573,7 +578,7 @@ class EventRuleTestCase(RQQueueTestMixin, APITestCase):
             'b_terminations': [{'object_type': 'dcim.interface', 'object_id': interface_b.pk}],
             'b_terminations': [{'object_type': 'dcim.interface', 'object_id': interface_b.pk}],
         }
         }
         url = reverse('dcim-api:cable-list')
         url = reverse('dcim-api:cable-list')
-        self.add_permissions('dcim.add_cable')
+        self.add_permissions('dcim.add_cable', 'dcim.view_interface')
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
 
 

+ 2 - 2
netbox/extras/tests/test_tags.py

@@ -22,7 +22,7 @@ class TaggedItemTestCase(APITestCase):
             'tags': [t.pk for t in tags]
             'tags': [t.pk for t in tags]
         }
         }
         url = reverse('dcim-api:site-list')
         url = reverse('dcim-api:site-list')
-        self.add_permissions('dcim.add_site')
+        self.add_permissions('dcim.add_site', 'extras.view_tag')
 
 
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -50,7 +50,7 @@ class TaggedItemTestCase(APITestCase):
                 {"name": "New Tag"},
                 {"name": "New Tag"},
             ]
             ]
         }
         }
-        self.add_permissions('dcim.change_site')
+        self.add_permissions('dcim.change_site', 'extras.view_tag')
         url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
         url = reverse('dcim-api:site-detail', kwargs={'pk': site.pk})
 
 
         response = self.client.patch(url, data, format='json', **self.header)
         response = self.client.patch(url, data, format='json', **self.header)

+ 49 - 7
netbox/ipam/tests/test_api.py

@@ -6,10 +6,12 @@ from django.urls import reverse
 from netaddr import IPNetwork
 from netaddr import IPNetwork
 from rest_framework import status
 from rest_framework import status
 
 
+from core.models import ObjectType
 from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
 from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
 from ipam.choices import *
 from ipam.choices import *
 from ipam.models import *
 from ipam.models import *
 from tenancy.models import Tenant
 from tenancy.models import Tenant
+from users.models import ObjectPermission
 from utilities.data import string_to_ranges
 from utilities.data import string_to_ranges
 from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging
 from utilities.testing import APITestCase, APIViewTestCases, create_test_device, disable_logging
 
 
@@ -99,7 +101,7 @@ class ASNRangeTestCase(APIViewTestCases.APIViewTestCase):
         rir = RIR.objects.first()
         rir = RIR.objects.first()
         asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
         asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
         url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
         url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
-        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn')
+        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn', 'ipam.view_rir')
 
 
         data = {
         data = {
             'description': 'New ASN'
             'description': 'New ASN'
@@ -116,7 +118,7 @@ class ASNRangeTestCase(APIViewTestCases.APIViewTestCase):
         rir = RIR.objects.first()
         rir = RIR.objects.first()
         asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
         asnrange = ASNRange.objects.create(name='Range 1', slug='range-1', rir=rir, start=101, end=110)
         url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
         url = reverse('ipam-api:asnrange-available-asns', kwargs={'pk': asnrange.pk})
-        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn')
+        self.add_permissions('ipam.view_asnrange', 'ipam.add_asn', 'ipam.view_rir')
 
 
         # Try to create eleven ASNs (only ten are available)
         # Try to create eleven ASNs (only ten are available)
         data = [
         data = [
@@ -488,7 +490,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), vrf=vrf, is_pool=True)
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
         url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix', 'ipam.view_vrf')
 
 
         # Create four available prefixes with individual requests
         # Create four available prefixes with individual requests
         prefixes_to_be_created = [
         prefixes_to_be_created = [
@@ -525,7 +527,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), vrf=vrf, is_pool=True)
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/28'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
         url = reverse('ipam-api:prefix-available-prefixes', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_prefix', 'ipam.view_vrf')
 
 
         # Try to create five /30s (only four are available)
         # Try to create five /30s (only four are available)
         data = [
         data = [
@@ -576,7 +578,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), vrf=vrf, is_pool=True)
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/30'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
         url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress', 'ipam.view_vrf')
 
 
         # Create all four available IPs with individual requests
         # Create all four available IPs with individual requests
         for i in range(1, 5):
         for i in range(1, 5):
@@ -600,7 +602,7 @@ class PrefixTestCase(APIViewTestCases.APIViewTestCase):
         vrf = VRF.objects.create(name='VRF 1')
         vrf = VRF.objects.create(name='VRF 1')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), vrf=vrf, is_pool=True)
         prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/29'), vrf=vrf, is_pool=True)
         url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
         url = reverse('ipam-api:prefix-available-ips', kwargs={'pk': prefix.pk})
-        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress')
+        self.add_permissions('ipam.view_prefix', 'ipam.add_ipaddress', 'ipam.view_vrf')
 
 
         # Try to create nine IPs (only eight are available)
         # Try to create nine IPs (only eight are available)
         data = [{'description': f'Test IP {i}'} for i in range(1, 10)]  # 9 IPs
         data = [{'description': f'Test IP {i}'} for i in range(1, 10)]  # 9 IPs
@@ -727,7 +729,7 @@ class IPRangeTestCase(APIViewTestCases.APIViewTestCase):
             vrf=vrf
             vrf=vrf
         )
         )
         url = reverse('ipam-api:iprange-available-ips', kwargs={'pk': iprange.pk})
         url = reverse('ipam-api:iprange-available-ips', kwargs={'pk': iprange.pk})
-        self.add_permissions('ipam.view_iprange', 'ipam.add_ipaddress')
+        self.add_permissions('ipam.view_iprange', 'ipam.add_ipaddress', 'ipam.view_vrf')
 
 
         # Create all three available IPs with individual requests
         # Create all three available IPs with individual requests
         for i in range(1, 4):
         for i in range(1, 4):
@@ -1501,3 +1503,43 @@ class ServiceTestCase(APIViewTestCases.APIViewTestCase):
                 'ports': [6],
                 'ports': [6],
             },
             },
         ]
         ]
+
+
+class NestedObjectPermissionAPITest(APITestCase):
+    """
+    End-to-end: a constrained view permission on a related model is enforced when that object is
+    referenced via a nested serializer on write, so an object outside the user's constraint is
+    rejected as though it did not exist (the #21988 vector).
+    """
+    model = Prefix
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.tenants = Tenant.objects.bulk_create((
+            Tenant(name='Tenant 1', slug='tenant-1'),
+            Tenant(name='Tenant 2', slug='tenant-2'),
+        ))
+
+    def test_create_rejects_related_object_outside_constraint(self):
+        self.add_permissions('ipam.add_prefix')
+        obj_perm = ObjectPermission(
+            name='View Tenant 1 only', actions=['view'], constraints={'pk': self.tenants[0].pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(Tenant))
+
+        url = self._get_list_url()
+
+        # The viewable tenant resolves and the prefix is created
+        response = self.client.post(
+            url, {'prefix': '198.51.100.0/24', 'tenant': self.tenants[0].pk}, format='json', **self.header
+        )
+        self.assertHttpStatus(response, status.HTTP_201_CREATED)
+
+        # The tenant outside the constraint fails resolution as a nonexistent object would
+        response = self.client.post(
+            url, {'prefix': '198.51.101.0/24', 'tenant': self.tenants[1].pk}, format='json', **self.header
+        )
+        self.assertHttpStatus(response, status.HTTP_400_BAD_REQUEST)
+        self.assertIn('tenant', response.data)

+ 16 - 3
netbox/netbox/api/fields.py

@@ -10,6 +10,7 @@ from rest_framework.exceptions import ValidationError
 from rest_framework.relations import PrimaryKeyRelatedField, RelatedField
 from rest_framework.relations import PrimaryKeyRelatedField, RelatedField
 
 
 from utilities.data import get_inclusive_integer_range_bounds
 from utilities.data import get_inclusive_integer_range_bounds
+from utilities.permissions import restrict_queryset
 
 
 __all__ = (
 __all__ = (
     'AttributesField',
     'AttributesField',
@@ -18,6 +19,7 @@ __all__ = (
     'IPNetworkSerializer',
     'IPNetworkSerializer',
     'IntegerRangeSerializer',
     'IntegerRangeSerializer',
     'RelatedObjectCountField',
     'RelatedObjectCountField',
+    'RestrictedPrimaryKeyRelatedField',
     'SerializedPKRelatedField',
     'SerializedPKRelatedField',
 )
 )
 
 
@@ -133,10 +135,21 @@ class IPNetworkSerializer(serializers.Serializer):
         return IPNetwork(value)
         return IPNetwork(value)
 
 
 
 
-class SerializedPKRelatedField(PrimaryKeyRelatedField):
+class RestrictedPrimaryKeyRelatedField(PrimaryKeyRelatedField):
     """
     """
-    Extends PrimaryKeyRelatedField to return a serialized object on read. This is useful for representing related
-    objects in a ManyToManyField while still allowing a set of primary keys to be written.
+    PrimaryKeyRelatedField that resolves write input from a permission-restricted queryset, so a
+    referenced object the user cannot view fails validation as though it did not exist. It represents the
+    object by its bare PK on read; SerializedPKRelatedField extends it to return a serialized object instead.
+    """
+    def get_queryset(self):
+        return restrict_queryset(super().get_queryset(), self.context.get('request'))
+
+
+class SerializedPKRelatedField(RestrictedPrimaryKeyRelatedField):
+    """
+    Extends RestrictedPrimaryKeyRelatedField to return a serialized object on read, inheriting its
+    permission-restricted resolution of write input. This is useful for representing related objects in a
+    ManyToManyField while still allowing a set of primary keys to be written.
     """
     """
     def __init__(self, serializer, nested=False, **kwargs):
     def __init__(self, serializer, nested=False, **kwargs):
         self.serializer = serializer
         self.serializer = serializer

+ 12 - 3
netbox/netbox/api/serializers/base.py

@@ -5,6 +5,7 @@ from drf_spectacular.utils import extend_schema_field
 from rest_framework import serializers
 from rest_framework import serializers
 
 
 from utilities.api import get_related_object_by_attrs
 from utilities.api import get_related_object_by_attrs
+from utilities.permissions import restrict_queryset
 
 
 from .fields import NetBoxAPIHyperlinkedIdentityField, NetBoxURLHyperlinkedIdentityField
 from .fields import NetBoxAPIHyperlinkedIdentityField, NetBoxURLHyperlinkedIdentityField
 
 
@@ -43,13 +44,21 @@ class BaseModelSerializer(serializers.ModelSerializer):
 
 
         super().__init__(*args, **kwargs)
         super().__init__(*args, **kwargs)
 
 
-    def to_internal_value(self, data):
+    def get_related_object_queryset(self):
+        """
+        Return the queryset used to resolve a related object supplied via nested
+        serializer input.
+        """
+        return restrict_queryset(self.Meta.model.objects.all(), self.context.get('request'))
 
 
+    def to_internal_value(self, data):
+        """
+        Override to_internal_value() to handle nested serializer input.
+        """
         # If initialized as a nested serializer, we should expect to receive the attrs or PK
         # If initialized as a nested serializer, we should expect to receive the attrs or PK
         # identifying a related object.
         # identifying a related object.
         if self.nested:
         if self.nested:
-            queryset = self.Meta.model.objects.all()
-            return get_related_object_by_attrs(queryset, data)
+            return get_related_object_by_attrs(self.get_related_object_queryset(), data)
 
 
         return super().to_internal_value(data)
         return super().to_internal_value(data)
 
 

+ 4 - 2
netbox/netbox/api/serializers/generic.py

@@ -4,8 +4,9 @@ from rest_framework import serializers
 
 
 from core.models import ObjectType
 from core.models import ObjectType
 from netbox.api.fields import ContentTypeField
 from netbox.api.fields import ContentTypeField
-from utilities.api import get_serializer_for_model
+from utilities.api import get_related_object_by_attrs, get_serializer_for_model
 from utilities.object_types import object_type_identifier
 from utilities.object_types import object_type_identifier
+from utilities.permissions import restrict_queryset
 
 
 __all__ = (
 __all__ = (
     'GenericObjectSerializer',
     'GenericObjectSerializer',
@@ -25,7 +26,8 @@ class GenericObjectSerializer(serializers.Serializer):
     def to_internal_value(self, data):
     def to_internal_value(self, data):
         data = super().to_internal_value(data)
         data = super().to_internal_value(data)
         model = data['object_type'].model_class()
         model = data['object_type'].model_class()
-        return model.objects.get(pk=data['object_id'])
+        queryset = restrict_queryset(model.objects.all(), self.context.get('request'))
+        return get_related_object_by_attrs(queryset, data['object_id'])
 
 
     def to_representation(self, instance):
     def to_representation(self, instance):
         object_type = ObjectType.objects.get_for_model(instance)
         object_type = ObjectType.objects.get_for_model(instance)

+ 1 - 2
netbox/netbox/api/serializers/nested.py

@@ -16,8 +16,7 @@ class WritableNestedSerializer(BaseModelSerializer):
     subclassed to return a full representation of the related object on read.
     subclassed to return a full representation of the related object on read.
     """
     """
     def to_internal_value(self, data):
     def to_internal_value(self, data):
-        queryset = self.Meta.model.objects.all()
-        return get_related_object_by_attrs(queryset, data)
+        return get_related_object_by_attrs(self.get_related_object_queryset(), data)
 
 
 
 
 # Declared here for use by PrimaryModelSerializer
 # Declared here for use by PrimaryModelSerializer

+ 7 - 0
netbox/netbox/tests/test_authentication.py

@@ -611,6 +611,10 @@ class ObjectPermissionAPIViewTestCase(TestCase):
         }
         }
         initial_count = Rack.objects.count()
         initial_count = Rack.objects.count()
 
 
+        # Permit resolving the related Site. This test is concerned with
+        # constrained Rack add permissions, not Site visibility.
+        self.add_permissions('dcim.view_site')
+
         # Attempt to create an object without permission
         # Attempt to create an object without permission
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertEqual(response.status_code, 403)
         self.assertEqual(response.status_code, 403)
@@ -638,6 +642,9 @@ class ObjectPermissionAPIViewTestCase(TestCase):
 
 
     @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
     @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
     def test_edit_object(self):
     def test_edit_object(self):
+        # Permit resolving the related Site. This test is concerned with
+        # constrained Rack change permissions, not Site visibility.
+        self.add_permissions('dcim.view_site')
 
 
         # Attempt to edit an object without permission
         # Attempt to edit an object without permission
         data = {'site': self.sites[0].pk}
         data = {'site': self.sites[0].pk}

+ 196 - 0
netbox/netbox/tests/test_serializers.py

@@ -0,0 +1,196 @@
+from django.core.exceptions import ValidationError
+from django.test import RequestFactory, TestCase
+from rest_framework.exceptions import ValidationError as DRFValidationError
+
+from core.models import ObjectType
+from netbox.api.fields import SerializedPKRelatedField
+from netbox.api.serializers import NetBoxModelSerializer, WritableNestedSerializer
+from netbox.api.serializers.generic import GenericObjectSerializer
+from netbox.tests.dummy_plugin.models import DummyNetBoxModel
+from users.models import ObjectPermission, User
+
+
+class NestedDummyNetBoxModelSerializer(NetBoxModelSerializer):
+    class Meta:
+        model = DummyNetBoxModel
+        fields = ['id', 'url', 'display_url', 'display']
+        brief_fields = ['id', 'display']
+
+
+class WritableNestedDummyNetBoxModelSerializer(WritableNestedSerializer):
+    class Meta:
+        model = DummyNetBoxModel
+        fields = ['id', 'display']
+
+
+class NestedRelatedObjectPermissionTest(TestCase):
+    """
+    Validate that nested related-object resolution honors object permissions.
+
+    This exercises the shared serializer behavior directly rather than relying
+    on any specific app's API endpoint.
+    """
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='testuser')
+
+        cls.visible_object = DummyNetBoxModel.objects.create()
+        cls.hidden_object = DummyNetBoxModel.objects.create()
+
+        obj_perm = ObjectPermission(
+            name='View visible dummy object', actions=['view'], constraints={'pk': cls.visible_object.pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(cls.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(DummyNetBoxModel))
+
+    def setUp(self):
+        self.request = RequestFactory().get('/api/')
+        self.request.user = self.user
+
+    def test_nested_serializer_represents_visible_object(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        data = serializer.to_representation(self.visible_object)
+
+        self.assertEqual(data['id'], self.visible_object.pk)
+        self.assertIn('display', data)
+
+    def test_writable_nested_serializer_represents_visible_object(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        data = serializer.to_representation(self.visible_object)
+
+        self.assertEqual(data['id'], self.visible_object.pk)
+        self.assertIn('display', data)
+
+    def test_nested_serializer_resolves_visible_object_by_id(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        self.assertEqual(serializer.to_internal_value(self.visible_object.pk), self.visible_object)
+
+    def test_nested_serializer_rejects_hidden_object_by_id(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value(self.hidden_object.pk)
+
+    def test_nested_serializer_rejects_hidden_object_by_attrs(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True, context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value({'id': self.hidden_object.pk})
+
+    def test_writable_nested_serializer_resolves_visible_object_by_id(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        self.assertEqual(serializer.to_internal_value(self.visible_object.pk), self.visible_object)
+
+    def test_writable_nested_serializer_rejects_hidden_object_by_id(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value(self.hidden_object.pk)
+
+    def test_writable_nested_serializer_rejects_hidden_object_by_attrs(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer(context={'request': self.request})
+
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value({'id': self.hidden_object.pk})
+
+    def test_nested_serializer_resolves_object_without_request_context(self):
+        serializer = NestedDummyNetBoxModelSerializer(nested=True)
+
+        self.assertEqual(serializer.to_internal_value(self.hidden_object.pk), self.hidden_object)
+
+    def test_writable_nested_serializer_resolves_object_without_request_context(self):
+        serializer = WritableNestedDummyNetBoxModelSerializer()
+
+        self.assertEqual(serializer.to_internal_value(self.hidden_object.pk), self.hidden_object)
+
+
+class SerializedPKRelatedFieldPermissionTest(TestCase):
+    """ManyToMany input resolution honors object permissions."""
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='m2m_user')
+        cls.visible_object = DummyNetBoxModel.objects.create()
+        cls.hidden_object = DummyNetBoxModel.objects.create()
+
+        obj_perm = ObjectPermission(
+            name='View visible dummy object (m2m)', actions=['view'], constraints={'pk': cls.visible_object.pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(cls.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(DummyNetBoxModel))
+
+    def setUp(self):
+        self.request = RequestFactory().get('/api/')
+        self.request.user = self.user
+
+    def _bind_field(self, context):
+        field = SerializedPKRelatedField(
+            queryset=DummyNetBoxModel.objects.all(),
+            serializer=NestedDummyNetBoxModelSerializer,
+            required=False,
+        )
+        # A bound parent serializer supplies the request context to the field.
+        parent = NestedDummyNetBoxModelSerializer(nested=True, context=context)
+        field.bind('related', parent)
+        return field
+
+    def test_resolves_visible_object(self):
+        field = self._bind_field({'request': self.request})
+        self.assertEqual(field.to_internal_value(self.visible_object.pk), self.visible_object)
+
+    def test_rejects_hidden_object(self):
+        field = self._bind_field({'request': self.request})
+        with self.assertRaises(DRFValidationError):
+            field.to_internal_value(self.hidden_object.pk)
+
+    def test_resolves_object_without_request_context(self):
+        field = self._bind_field({})
+        self.assertEqual(field.to_internal_value(self.hidden_object.pk), self.hidden_object)
+
+
+class GenericObjectSerializerPermissionTest(TestCase):
+    """Writable generic-FK input resolution honors object permissions."""
+
+    @classmethod
+    def setUpTestData(cls):
+        cls.user = User.objects.create_user(username='gfk_user')
+        cls.visible_object = DummyNetBoxModel.objects.create()
+        cls.hidden_object = DummyNetBoxModel.objects.create()
+        cls.object_type = ObjectType.objects.get_for_model(DummyNetBoxModel)
+
+        obj_perm = ObjectPermission(
+            name='View visible dummy object (gfk)', actions=['view'], constraints={'pk': cls.visible_object.pk}
+        )
+        obj_perm.save()
+        obj_perm.users.add(cls.user)
+        obj_perm.object_types.add(cls.object_type)
+
+    def setUp(self):
+        self.request = RequestFactory().get('/api/')
+        self.request.user = self.user
+
+    def _payload(self, obj):
+        return {
+            'object_type': f'{self.object_type.app_label}.{self.object_type.model}',
+            'object_id': obj.pk,
+        }
+
+    def test_resolves_visible_object(self):
+        serializer = GenericObjectSerializer(context={'request': self.request})
+        self.assertEqual(serializer.to_internal_value(self._payload(self.visible_object)), self.visible_object)
+
+    def test_rejects_hidden_object(self):
+        serializer = GenericObjectSerializer(context={'request': self.request})
+        with self.assertRaises(ValidationError):
+            serializer.to_internal_value(self._payload(self.hidden_object))
+
+    def test_resolves_object_without_request_context(self):
+        serializer = GenericObjectSerializer()
+        self.assertEqual(serializer.to_internal_value(self._payload(self.hidden_object)), self.hidden_object)

+ 1 - 1
netbox/users/tests/query_counts.json

@@ -9,6 +9,6 @@
   "ownergroup:list_objects_with_permission": 17,
   "ownergroup:list_objects_with_permission": 17,
   "token:api_list_objects": 10,
   "token:api_list_objects": 10,
   "token:list_objects_with_permission": 17,
   "token:list_objects_with_permission": 17,
-  "user:api_list_objects": 12,
+  "user:api_list_objects": 13,
   "user:list_objects_with_permission": 17
   "user:list_objects_with_permission": 17
 }
 }

+ 19 - 2
netbox/users/tests/test_api.py

@@ -21,6 +21,9 @@ class UserTestCase(APIViewTestCases.APIViewTestCase):
     model = User
     model = User
     brief_fields = ['display', 'id', 'url', 'username']
     brief_fields = ['display', 'id', 'url', 'username']
     validation_excluded_fields = ['password']
     validation_excluded_fields = ['password']
+    # 'permissions' is a SerializedPKRelatedField sourced from object_permissions, so its view perm
+    # cannot be auto-derived from the model field name
+    user_permissions = ('users.view_objectpermission',)
     bulk_update_data = {
     bulk_update_data = {
         'email': 'test@example.com',
         'email': 'test@example.com',
     }
     }
@@ -140,6 +143,9 @@ class UserTestCase(APIViewTestCases.APIViewTestCase):
 class GroupTestCase(APIViewTestCases.APIViewTestCase):
 class GroupTestCase(APIViewTestCases.APIViewTestCase):
     model = Group
     model = Group
     brief_fields = ['description', 'display', 'id', 'name', 'url']
     brief_fields = ['description', 'display', 'id', 'name', 'url']
+    # 'permissions' is a SerializedPKRelatedField sourced from object_permissions, so its view perm
+    # cannot be auto-derived from the model field name
+    user_permissions = ('users.view_objectpermission',)
 
 
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -199,6 +205,7 @@ class TokenTestCase(
 ):
 ):
     model = Token
     model = Token
     brief_fields = ['description', 'display', 'enabled', 'id', 'key', 'url', 'version', 'write_enabled']
     brief_fields = ['description', 'display', 'enabled', 'id', 'key', 'url', 'version', 'write_enabled']
+    user_permissions = ('users.view_user',)
     bulk_update_data = {
     bulk_update_data = {
         'description': 'New description',
         'description': 'New description',
     }
     }
@@ -207,7 +214,10 @@ class TokenTestCase(
         super().setUp()
         super().setUp()
 
 
         # Apply grant_token permission to enable the creation of Tokens for other Users
         # Apply grant_token permission to enable the creation of Tokens for other Users
-        self.add_permissions('users.grant_token')
+        self.add_permissions(
+            'users.grant_token',
+            'users.view_user',
+        )
 
 
     @classmethod
     @classmethod
     def setUpTestData(cls):
     def setUpTestData(cls):
@@ -294,7 +304,10 @@ class TokenTestCase(
         # Clear grant_token permission assigned by setUpTestData
         # Clear grant_token permission assigned by setUpTestData
         ObjectPermission.objects.filter(users=self.user).delete()
         ObjectPermission.objects.filter(users=self.user).delete()
 
 
-        self.add_permissions('users.add_token')
+        self.add_permissions(
+            'users.add_token',
+            'users.view_user',
+        )
         user2 = User.objects.create_user(username='testuser2')
         user2 = User.objects.create_user(username='testuser2')
         data = {
         data = {
             'user': user2.id,
             'user': user2.id,
@@ -677,6 +690,10 @@ class OwnerGroupTestCase(APIViewTestCases.APIViewTestCase):
 
 
 class OwnerTestCase(APIViewTestCases.APIViewTestCase):
 class OwnerTestCase(APIViewTestCases.APIViewTestCase):
     model = Owner
     model = Owner
+    user_permissions = (
+        'users.view_group',
+        'users.view_user',
+    )
     brief_fields = ['description', 'display', 'id', 'name', 'url']
     brief_fields = ['description', 'display', 'id', 'name', 'url']
 
 
     @classmethod
     @classmethod

+ 16 - 0
netbox/utilities/permissions.py

@@ -15,6 +15,7 @@ __all__ = (
     'qs_filter_from_constraints',
     'qs_filter_from_constraints',
     'resolve_permission',
     'resolve_permission',
     'resolve_permission_type',
     'resolve_permission_type',
+    'restrict_queryset',
 )
 )
 
 
 
 
@@ -159,3 +160,18 @@ def qs_filter_from_constraints(constraints, tokens=None):
             return Q()
             return Q()
 
 
     return params
     return params
+
+
+def restrict_queryset(queryset, request, action='view'):
+    """
+    Restrict a queryset to the objects the request user may act on.
+
+    Used when resolving related objects from REST API write input so that an object the user
+    cannot view fails resolution exactly as a nonexistent object would. When no request context
+    is available (internal calls), or the queryset does not support restriction (models without a
+    RestrictedQuerySet manager have no object-level permissions to enforce), it is returned unchanged.
+    """
+    if request is not None and hasattr(queryset, 'restrict'):
+        # An unauthenticated request resolves no objects (restrict() returns none()).
+        return queryset.restrict(getattr(request, 'user', None), action)
+    return queryset

+ 9 - 0
netbox/utilities/testing/api.py

@@ -302,6 +302,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
 
+            self.add_related_view_permissions(self.create_data[0])
+
             data = copy.deepcopy(self.create_data[0])
             data = copy.deepcopy(self.create_data[0])
 
 
             # If supported, add a changelog message
             # If supported, add a changelog message
@@ -343,6 +345,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
 
+            self.add_related_view_permissions(*self.create_data)
+
             # If supported, add a changelog message
             # If supported, add a changelog message
             changelog_message = get_random_string(10)
             changelog_message = get_random_string(10)
             if issubclass(self.model, ChangeLoggingMixin):
             if issubclass(self.model, ChangeLoggingMixin):
@@ -418,6 +422,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
 
+            self.add_related_view_permissions(update_data)
+
             data = copy.deepcopy(update_data)
             data = copy.deepcopy(update_data)
 
 
             # If supported, add a changelog message
             # If supported, add a changelog message
@@ -458,6 +464,7 @@ class APIViewTestCases:
             instance = self._get_queryset().first()
             instance = self._get_queryset().first()
             url = self._get_detail_url(instance)
             url = self._get_detail_url(instance)
             update_data = self.update_data or getattr(self, 'create_data')[0]
             update_data = self.update_data or getattr(self, 'create_data')[0]
+            self.add_related_view_permissions(update_data)
 
 
             # Fetch current ETag
             # Fetch current ETag
             get_response = self.client.get(url, **self.header)
             get_response = self.client.get(url, **self.header)
@@ -499,6 +506,8 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
 
+            self.add_related_view_permissions(self.bulk_update_data)
+
             id_list = list(self._get_queryset().values_list('id', flat=True)[:3])
             id_list = list(self._get_queryset().values_list('id', flat=True)[:3])
             self.assertEqual(len(id_list), 3, "Insufficient number of objects to test bulk update")
             self.assertEqual(len(id_list), 3, "Insufficient number of objects to test bulk update")
             data = [
             data = [

+ 24 - 1
netbox/utilities/testing/base.py

@@ -18,7 +18,7 @@ from core.models import ObjectType
 from users.models import ObjectPermission, User
 from users.models import ObjectPermission, User
 from utilities.data import ranges_to_string
 from utilities.data import ranges_to_string
 from utilities.object_types import object_type_identifier
 from utilities.object_types import object_type_identifier
-from utilities.permissions import resolve_permission_type
+from utilities.permissions import get_permission_for_model, resolve_permission_type
 
 
 from .utils import DUMMY_CF_DATA, extract_form_failures
 from .utils import DUMMY_CF_DATA, extract_form_failures
 
 
@@ -138,6 +138,29 @@ class ModelTestCase(TestCase):
         """
         """
         return self.model.objects.all()
         return self.model.objects.all()
 
 
+    def add_related_view_permissions(self, *payloads):
+        """
+        Grant the test user view permission on each permission-controlled related model referenced
+        by the given write payload(s), so nested related-object resolution succeeds while object-level
+        permission enforcement stays active (no global view exemption).
+
+        Foreign-key and many-to-many references are derived automatically from the payload. References
+        that cannot be derived from the model fields (e.g. generic-FK targets) should be granted via the
+        test case's user_permissions.
+        """
+        permissions = set()
+        for payload in payloads:
+            if not isinstance(payload, dict):
+                continue
+            for field in self.model._meta.get_fields():
+                related_model = getattr(field, 'related_model', None)
+                if field.name not in payload or related_model is None:
+                    continue
+                if hasattr(related_model.objects, 'restrict'):
+                    permissions.add(get_permission_for_model(related_model, 'view'))
+        if permissions:
+            self.add_permissions(*permissions)
+
     def prepare_instance(self, instance):
     def prepare_instance(self, instance):
         """
         """
         Test cases can override this method to perform any necessary manipulation of an instance prior to its evaluation
         Test cases can override this method to perform any necessary manipulation of an instance prior to its evaluation

+ 3 - 3
netbox/utilities/tests/test_api.py

@@ -38,7 +38,7 @@ class WritableNestedSerializerTestCase(APITestCase):
             'site': self.site1.pk,
             'site': self.site1.pk,
         }
         }
         url = reverse('ipam-api:vlan-list')
         url = reverse('ipam-api:vlan-list')
-        self.add_permissions('ipam.add_vlan')
+        self.add_permissions('dcim.view_site', 'ipam.add_vlan')
 
 
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -70,7 +70,7 @@ class WritableNestedSerializerTestCase(APITestCase):
             },
             },
         }
         }
         url = reverse('ipam-api:vlan-list')
         url = reverse('ipam-api:vlan-list')
-        self.add_permissions('ipam.add_vlan')
+        self.add_permissions('dcim.view_site', 'ipam.add_vlan')
 
 
         response = self.client.post(url, data, format='json', **self.header)
         response = self.client.post(url, data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -106,7 +106,7 @@ class WritableNestedSerializerTestCase(APITestCase):
             },
             },
         }
         }
         url = reverse('ipam-api:vlan-list')
         url = reverse('ipam-api:vlan-list')
-        self.add_permissions('ipam.add_vlan')
+        self.add_permissions('dcim.view_region', 'dcim.view_site', 'ipam.add_vlan')
 
 
         with disable_warnings('django.request'):
         with disable_warnings('django.request'):
             response = self.client.post(url, data, format='json', **self.header)
             response = self.client.post(url, data, format='json', **self.header)

+ 14 - 3
netbox/virtualization/tests/test_api.py

@@ -357,7 +357,12 @@ class VirtualMachineTestCase(APIViewTestCases.APIViewTestCase):
             'vcpus': None,
             'vcpus': None,
             'memory': None,
             'memory': None,
         }
         }
-        self.add_permissions('virtualization.add_virtualmachine')
+        self.add_permissions(
+            'virtualization.add_virtualmachine',
+            'dcim.view_site',
+            'virtualization.view_cluster',
+            'virtualization.view_virtualmachinetype',
+        )
 
 
         response = self.client.post(self._get_list_url(), data, format='json', **self.header)
         response = self.client.post(self._get_list_url(), data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -378,7 +383,13 @@ class VirtualMachineTestCase(APIViewTestCases.APIViewTestCase):
             'vcpus': 6,
             'vcpus': 6,
             'memory': 12288,
             'memory': 12288,
         }
         }
-        self.add_permissions('virtualization.add_virtualmachine')
+        self.add_permissions(
+            'virtualization.add_virtualmachine',
+            'dcim.view_site',
+            'virtualization.view_cluster',
+            'virtualization.view_virtualmachinetype',
+            'dcim.view_platform',
+        )
 
 
         response = self.client.post(self._get_list_url(), data, format='json', **self.header)
         response = self.client.post(self._get_list_url(), data, format='json', **self.header)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
         self.assertHttpStatus(response, status.HTTP_201_CREATED)
@@ -677,7 +688,7 @@ class VMInterfaceTestCase(APIViewTestCases.APIViewTestCase):
             },
             },
         }
         }
 
 
-        self.add_permissions('ipam.change_prefix')
+        self.add_permissions('ipam.change_prefix', 'virtualization.view_vminterface')
 
 
         response = self.client.patch(url, data, format='json', **self.header)
         response = self.client.patch(url, data, format='json', **self.header)
         self.assertEqual(response.status_code, 200)
         self.assertEqual(response.status_code, 200)