Просмотр исходного кода

Add object permission tests for get and list API views

Jeremy Stretch 5 лет назад
Родитель
Сommit
8c40148ca7
2 измененных файлов с 128 добавлено и 3 удалено
  1. 121 0
      netbox/netbox/tests/test_authentication.py
  2. 7 3
      netbox/utilities/api.py

+ 121 - 0
netbox/netbox/tests/test_authentication.py

@@ -371,3 +371,124 @@ class ObjectPermissionViewTestCase(TestCase):
         response = self.client.post(**request)
         self.assertHttpStatus(response, 404)
         self.assertTrue(Prefix.objects.filter(pk=self.prefixes[3].pk).exists())
+
+
+class ObjectPermissionAPIViewTestCase(TestCase):
+    client_class = APIClient
+
+    @classmethod
+    def setUpTestData(cls):
+
+        cls.sites = (
+            Site(name='Site 1', slug='site-1'),
+            Site(name='Site 2', slug='site-2'),
+            Site(name='Site 3', slug='site-3'),
+        )
+        Site.objects.bulk_create(cls.sites)
+
+        cls.prefixes = (
+            Prefix(prefix=IPNetwork('10.0.0.0/24'), site=cls.sites[0]),
+            Prefix(prefix=IPNetwork('10.0.1.0/24'), site=cls.sites[0]),
+            Prefix(prefix=IPNetwork('10.0.2.0/24'), site=cls.sites[0]),
+            Prefix(prefix=IPNetwork('10.0.3.0/24'), site=cls.sites[1]),
+            Prefix(prefix=IPNetwork('10.0.4.0/24'), site=cls.sites[1]),
+            Prefix(prefix=IPNetwork('10.0.5.0/24'), site=cls.sites[1]),
+            Prefix(prefix=IPNetwork('10.0.6.0/24'), site=cls.sites[2]),
+            Prefix(prefix=IPNetwork('10.0.7.0/24'), site=cls.sites[2]),
+            Prefix(prefix=IPNetwork('10.0.8.0/24'), site=cls.sites[2]),
+        )
+        Prefix.objects.bulk_create(cls.prefixes)
+
+    def setUp(self):
+        """
+        Create a test user and token for API calls.
+        """
+        self.user = User.objects.create(username='testuser')
+        self.token = Token.objects.create(user=self.user)
+        self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)}
+
+    @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
+    def test_get_object(self):
+
+        # Attempt to retrieve object without permission
+        url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
+        response = self.client.get(url, **self.header)
+        self.assertEqual(response.status_code, 403)
+
+        # Assign object permission
+        obj_perm = ObjectPermission(
+            model=ContentType.objects.get_for_model(Prefix),
+            attrs={
+                'site__name': 'Site 1',
+            },
+            can_view=True
+        )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+
+        # Retrieve permitted object
+        url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
+        response = self.client.get(url, **self.header)
+        self.assertEqual(response.status_code, 200)
+
+        # Attempt to retrieve non-permitted object
+        url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk})
+        response = self.client.get(url, **self.header)
+        self.assertEqual(response.status_code, 404)
+
+    @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
+    def test_list_objects(self):
+        url = reverse('ipam-api:prefix-list')
+
+        # Attempt to list objects without permission
+        response = self.client.get(url, **self.header)
+        self.assertEqual(response.status_code, 403)
+
+        # Assign object permission
+        obj_perm = ObjectPermission(
+            model=ContentType.objects.get_for_model(Prefix),
+            attrs={
+                'site__name': 'Site 1',
+            },
+            can_view=True
+        )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+
+        # Retrieve all objects. Only permitted objects should be returned.
+        response = self.client.get(url, **self.header)
+        self.assertEqual(response.status_code, 200)
+        self.assertEqual(response.data['count'], 3)
+
+    # TODO
+    # @override_settings(EXEMPT_VIEW_PERMISSIONS=[])
+    # def test_create_object(self):
+    #     url = reverse('ipam-api:prefix-list')
+    #     data = {
+    #         'prefix': '10.0.9.0/24',
+    #         'site': self.sites[1].pk,
+    #     }
+    #     initial_count = Prefix.objects.count()
+    #
+    #     # Attempt to create an object without permission
+    #     response = self.client.post(url, data, format='json', **self.header)
+    #     self.assertEqual(response.status_code, 403)
+    #
+    #     # Assign object permission
+    #     obj_perm = ObjectPermission(
+    #         model=ContentType.objects.get_for_model(Prefix),
+    #         attrs={'site__name': 'Site 1'},
+    #         can_view=True
+    #     )
+    #     obj_perm.save()
+    #     obj_perm.users.add(self.user)
+    #
+    #     # Attempt to create a non-permitted object
+    #     response = self.client.post(url, data, format='json', **self.header)
+    #     self.assertEqual(response.status_code, 403)
+    #     self.assertEqual(Prefix.objects.count(), initial_count)
+    #
+    #     # Create a permitted object
+    #     response = self.client.post(url, data, format='json', **self.header)
+    #     self.assertEqual(response.status_code, 200)
+    #     self.assertEqual(Prefix.objects.count(), initial_count + 1)

+ 7 - 3
netbox/utilities/api.py

@@ -329,11 +329,15 @@ class ModelViewSet(_ModelViewSet):
         if not request.user.is_authenticated or request.user.is_superuser:
             return
 
-        permission_required = 'dcim.view_site'
+        # Determine the required permission
+        permission_required = "{}.view_{}".format(
+            self.queryset.model._meta.app_label,
+            self.queryset.model._meta.model_name
+        )
 
         # Enforce object-level permissions
-        if permission_required not in self.request.user._perm_cache:
-            attrs = ObjectPermission.objects.get_attr_constraints(self.request.user, permission_required)
+        if permission_required not in {*request.user._user_perm_cache, *request.user._group_perm_cache}:
+            attrs = ObjectPermission.objects.get_attr_constraints(request.user, permission_required)
             if attrs:
                 # Update the view's QuerySet to filter only the permitted objects
                 self.queryset = self.queryset.filter(attrs)