Просмотр исходного кода

Enable many-to-many model assignment for ObjectPermissions

Jeremy Stretch 5 лет назад
Родитель
Сommit
f65b2278f0

+ 52 - 26
netbox/netbox/tests/test_authentication.py

@@ -201,11 +201,13 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertHttpStatus(response, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Retrieve permitted object
         response = self.client.get(self.prefixes[0].get_absolute_url())
@@ -223,11 +225,13 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertHttpStatus(response, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Retrieve all objects. Only permitted objects should be returned.
         response = self.client.get(reverse('ipam:prefix_list'))
@@ -255,12 +259,14 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertEqual(initial_count, Prefix.objects.count())
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True,
             can_add=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to create a non-permitted object
         request = {
@@ -301,12 +307,14 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertHttpStatus(response, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True,
             can_change=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to edit a non-permitted object
         request = {
@@ -343,12 +351,14 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertHttpStatus(response, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True,
             can_delete=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Delete permitted object
         request = {
@@ -390,11 +400,13 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertEqual(initial_count, Prefix.objects.count())
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_add=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to create non-permitted objects
         request = {
@@ -437,11 +449,13 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertHttpStatus(response, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_change=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to edit non-permitted objects
         request = {
@@ -479,12 +493,14 @@ class ObjectPermissionViewTestCase(TestCase):
         self.assertHttpStatus(response, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True,
             can_delete=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to delete non-permitted object
         request = {
@@ -549,11 +565,13 @@ class ObjectPermissionAPIViewTestCase(TestCase):
         self.assertEqual(response.status_code, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Retrieve permitted object
         url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[0].pk})
@@ -574,11 +592,13 @@ class ObjectPermissionAPIViewTestCase(TestCase):
         self.assertEqual(response.status_code, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_view=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Retrieve all objects. Only permitted objects should be returned.
         response = self.client.get(url, **self.header)
@@ -599,11 +619,13 @@ class ObjectPermissionAPIViewTestCase(TestCase):
         self.assertEqual(response.status_code, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_add=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to create a non-permitted object
         response = self.client.post(url, data, format='json', **self.header)
@@ -626,11 +648,13 @@ class ObjectPermissionAPIViewTestCase(TestCase):
         self.assertEqual(response.status_code, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_change=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to edit a non-permitted object
         data = {'site': self.sites[0].pk}
@@ -659,11 +683,13 @@ class ObjectPermissionAPIViewTestCase(TestCase):
         self.assertEqual(response.status_code, 403)
 
         # Assign object permission
-        self.user.object_permissions.create(
-            model=ContentType.objects.get_for_model(Prefix),
+        obj_perm = ObjectPermission(
             attrs={'site__name': 'Site 1'},
             can_delete=True
         )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.content_types.add(ContentType.objects.get_for_model(Prefix))
 
         # Attempt to delete a non-permitted object
         url = reverse('ipam-api:prefix-detail', kwargs={'pk': self.prefixes[3].pk})

+ 2 - 2
netbox/users/admin.py

@@ -89,8 +89,8 @@ class ObjectPermissionForm(forms.ModelForm):
         super().__init__(*args, **kwargs)
 
         # Format ContentType choices
-        order_content_types(self.fields['model'])
-        self.fields['model'].choices.insert(0, ('', '---------'))
+        order_content_types(self.fields['content_types'])
+        self.fields['content_types'].choices.insert(0, ('', '---------'))
 
 
 @admin.register(ObjectPermission)

+ 2 - 6
netbox/users/migrations/0007_objectpermission.py

@@ -1,9 +1,8 @@
-# Generated by Django 3.0.6 on 2020-05-27 14:17
+# Generated by Django 3.0.6 on 2020-05-28 18:24
 
 from django.conf import settings
 import django.contrib.postgres.fields.jsonb
 from django.db import migrations, models
-import django.db.models.deletion
 
 
 class Migration(migrations.Migration):
@@ -25,12 +24,9 @@ class Migration(migrations.Migration):
                 ('can_add', models.BooleanField(default=False)),
                 ('can_change', models.BooleanField(default=False)),
                 ('can_delete', models.BooleanField(default=False)),
+                ('content_types', models.ManyToManyField(limit_choices_to={'app_label__in': ['circuits', 'dcim', 'extras', 'ipam', 'secrets', 'tenancy', 'virtualization']}, related_name='object_permissions', to='contenttypes.ContentType')),
                 ('groups', models.ManyToManyField(blank=True, related_name='object_permissions', to='auth.Group')),
-                ('model', models.ForeignKey(limit_choices_to={'app_label__in': ['circuits', 'dcim', 'extras', 'ipam', 'secrets', 'tenancy', 'virtualization']}, on_delete=django.db.models.deletion.CASCADE, to='contenttypes.ContentType')),
                 ('users', models.ManyToManyField(blank=True, related_name='object_permissions', to=settings.AUTH_USER_MODEL)),
             ],
-            options={
-                'unique_together': {('model', 'attrs')},
-            },
         ),
     ]

+ 4 - 4
netbox/users/models.py

@@ -212,14 +212,14 @@ class ObjectPermission(models.Model):
         blank=True,
         related_name='object_permissions'
     )
-    model = models.ForeignKey(
+    content_types = models.ManyToManyField(
         to=ContentType,
         limit_choices_to={
             'app_label__in': [
                 'circuits', 'dcim', 'extras', 'ipam', 'secrets', 'tenancy', 'virtualization',
             ],
         },
-        on_delete=models.CASCADE
+        related_name='object_permissions'
     )
     attrs = JSONField(
         blank=True,
@@ -239,8 +239,8 @@ class ObjectPermission(models.Model):
         default=False
     )
 
-    class Meta:
-        unique_together = ('model', 'attrs')
+    def __str__(self):
+        return "Object permission"
 
     def clean(self):
 

+ 13 - 12
netbox/utilities/auth_backends.py

@@ -26,18 +26,19 @@ class ObjectPermissionBackend(ModelBackend):
         object_permissions = ObjectPermission.objects.filter(
             Q(users=user_obj) |
             Q(groups__user=user_obj)
-        ).prefetch_related('model')
+        ).prefetch_related('content_types')
 
         # Create a dictionary mapping permissions to their attributes
         perms = dict()
         for obj_perm in object_permissions:
-            for action in ['view', 'add', 'change', 'delete']:
-                if getattr(obj_perm, f"can_{action}"):
-                    perm_name = f"{obj_perm.model.app_label}.{action}_{obj_perm.model.model}"
-                    if perm_name in perms:
-                        perms[perm_name].append(obj_perm.attrs)
-                    else:
-                        perms[perm_name] = [obj_perm.attrs]
+            for content_type in obj_perm.content_types.all():
+                for action in ['view', 'add', 'change', 'delete']:
+                    if getattr(obj_perm, f"can_{action}"):
+                        perm_name = f"{content_type.app_label}.{action}_{content_type.model}"
+                        if perm_name in perms:
+                            perms[perm_name].append(obj_perm.attrs)
+                        else:
+                            perms[perm_name] = [obj_perm.attrs]
 
         return perms
 
@@ -122,10 +123,10 @@ class RemoteUserBackend(_RemoteUserBackend):
         for permission_name in settings.REMOTE_AUTH_DEFAULT_PERMISSIONS:
             try:
                 content_type, action = resolve_permission(permission_name)
-                user.object_permissions.create(**{
-                    'model': content_type,
-                    f'can_{action}': True
-                })
+                obj_perm = ObjectPermission(**{f'can_{action}': True})
+                obj_perm.save()
+                obj_perm.users.add(user)
+                obj_perm.content_types.add(content_type)
                 permissions_list.append(permission_name)
             except ValueError:
                 logging.error(

+ 49 - 44
netbox/utilities/testing/testcases.py

@@ -34,21 +34,10 @@ class TestCase(_TestCase):
         """
         for name in names:
             ct, action = resolve_permission(name)
-            self.user.object_permissions.create(**{
-                'model': ct,
-                f'can_{action}': True
-            })
-
-    def remove_permissions(self, *names):
-        """
-        Remove a set of permissions from the test user, if assigned.
-        """
-        for name in names:
-            ct, action = resolve_permission(name)
-            self.user.object_permissions.filter(**{
-                'model': ct,
-                f'can_{action}': True
-            }).delete()
+            obj_perm = ObjectPermission(**{f'can_{action}': True})
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ct)
 
     #
     # Convenience methods
@@ -175,10 +164,12 @@ class ViewTestCases:
             instance = self.model.objects.first()
 
             # Add model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_view=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with model-level permission
             self.assertHttpStatus(self.client.get(instance.get_absolute_url()), 200)
@@ -188,11 +179,13 @@ class ViewTestCases:
             instance1, instance2 = self.model.objects.all()[:2]
 
             # Add object-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 attrs={'pk': instance1.pk},
                 can_view=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET to permitted object
             self.assertHttpStatus(self.client.get(instance1.get_absolute_url()), 200)
@@ -227,10 +220,12 @@ class ViewTestCases:
             initial_count = self.model.objects.count()
 
             # Assign model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_add=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with model-level permission
             self.assertHttpStatus(self.client.get(self._get_url('add')), 200)
@@ -250,12 +245,12 @@ class ViewTestCases:
 
             # Assign object-level permission
             obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
                 attrs={'pk__gt': 0},  # Dummy permission to allow all
                 can_add=True
             )
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with object-level permission
             self.assertHttpStatus(self.client.get(self._get_url('add')), 200)
@@ -309,10 +304,12 @@ class ViewTestCases:
             instance = self.model.objects.first()
 
             # Assign model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_change=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with model-level permission
             self.assertHttpStatus(self.client.get(self._get_url('edit', instance)), 200)
@@ -331,12 +328,12 @@ class ViewTestCases:
 
             # Assign object-level permission
             obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
                 attrs={'pk': instance1.pk},
                 can_change=True
             )
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with a permitted object
             self.assertHttpStatus(self.client.get(self._get_url('edit', instance1)), 200)
@@ -384,10 +381,12 @@ class ViewTestCases:
             instance = self.model.objects.first()
 
             # Assign model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_delete=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with model-level permission
             self.assertHttpStatus(self.client.get(self._get_url('delete', instance)), 200)
@@ -407,12 +406,12 @@ class ViewTestCases:
 
             # Assign object-level permission
             obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
                 attrs={'pk': instance1.pk},
                 can_delete=True
             )
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with a permitted object
             self.assertHttpStatus(self.client.get(self._get_url('delete', instance1)), 200)
@@ -459,10 +458,12 @@ class ViewTestCases:
         def test_list_objects_with_model_permission(self):
 
             # Add model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_view=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with model-level permission
             self.assertHttpStatus(self.client.get(self._get_url('list')), 200)
@@ -479,12 +480,12 @@ class ViewTestCases:
 
             # Add object-level permission
             obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
                 attrs={'pk': instance1.pk},
                 can_view=True
             )
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with object-level permission
             self.assertHttpStatus(self.client.get(self._get_url('list')), 200)
@@ -511,12 +512,10 @@ class ViewTestCases:
                 self.assertHttpStatus(self.client.post(**request), 403)
 
             # Assign object-level permission
-            obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
-                can_add=True
-            )
+            obj_perm = ObjectPermission(can_add=True)
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             response = self.client.post(**request)
             self.assertHttpStatus(response, 302)
@@ -557,10 +556,12 @@ class ViewTestCases:
             }
 
             # Assign model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_add=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try GET with model-level permission
             self.assertHttpStatus(self.client.get(self._get_url('import')), 200)
@@ -578,12 +579,12 @@ class ViewTestCases:
 
             # Assign object-level permission
             obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
                 attrs={'pk__gt': 0},  # Dummy permission to allow all
                 can_add=True
             )
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Test import with object-level permission
             self.assertHttpStatus(self.client.post(self._get_url('import'), data), 200)
@@ -625,10 +626,12 @@ class ViewTestCases:
             data.update(post_data(self.bulk_edit_data))
 
             # Assign model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_change=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try POST with model-level permission
             self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302)
@@ -648,12 +651,12 @@ class ViewTestCases:
 
             # Assign object-level permission
             obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
                 attrs={'pk__in': list(pk_list)},
                 can_change=True
             )
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try POST with model-level permission
             self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302)
@@ -693,10 +696,12 @@ class ViewTestCases:
             }
 
             # Assign model-level permission
-            self.user.object_permissions.create(
-                model=ContentType.objects.get_for_model(self.model),
+            obj_perm = ObjectPermission(
                 can_delete=True
             )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try POST with model-level permission
             self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)
@@ -713,12 +718,12 @@ class ViewTestCases:
 
             # Assign object-level permission
             obj_perm = ObjectPermission(
-                model=ContentType.objects.get_for_model(self.model),
                 attrs={'pk__in': list(pk_list)},
                 can_delete=True
             )
             obj_perm.save()
             obj_perm.users.add(self.user)
+            obj_perm.content_types.add(ContentType.objects.get_for_model(self.model))
 
             # Try POST with object-level permission
             self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)