ソースを参照

fix(ipam): Honor filters for child availability views

Retain the instantiated child FilterSet on ObjectChildrenView and expose
whether child object filters are active. Use this in IPAM child views to
avoid rendering synthetic availability rows when the child queryset has
been filtered.

This ensures Saved Filters and direct filters are respected on
Prefix IP Address, Child Prefix, Aggregate Prefix,
and VLAN Group VLAN tabs.

Fixes #22210
Martin Hauser 1 日 前
コミット
103bab2019

+ 229 - 0
netbox/ipam/tests/test_views.py

@@ -8,6 +8,7 @@ from core.choices import ObjectChangeActionChoices
 from core.models import ObjectChange, ObjectType
 from dcim.constants import InterfaceTypeChoices
 from dcim.models import Device, DeviceRole, DeviceType, Interface, Manufacturer, Site
+from extras.models import SavedFilter
 from ipam.choices import *
 from ipam.models import *
 from netbox.choices import CSVDelimiterChoices, ImportFormatChoices
@@ -353,6 +354,69 @@ class AggregateTestCase(ViewTestCases.PrimaryObjectViewTestCase):
         url = reverse('ipam:aggregate_prefixes', kwargs={'pk': aggregate.pk})
         self.assertHttpStatus(self.client.get(url), 200)
 
+    def test_aggregate_prefixes_filter_suppresses_available_prefixes(self):
+        self.add_permissions('ipam.view_aggregate', 'ipam.view_prefix')
+
+        tenants = (
+            Tenant(name='Aggregate Tenant 1', slug='aggregate-tenant-1'),
+            Tenant(name='Aggregate Tenant 2', slug='aggregate-tenant-2'),
+        )
+        Tenant.objects.bulk_create(tenants)
+
+        aggregate = Aggregate.objects.create(
+            prefix=IPNetwork('203.0.113.0/24'),
+            rir=RIR.objects.first()
+        )
+        prefixes = (
+            Prefix(prefix=IPNetwork('203.0.113.0/26'), tenant=tenants[0]),
+            Prefix(prefix=IPNetwork('203.0.113.64/26'), tenant=tenants[1]),
+        )
+        Prefix.objects.bulk_create(prefixes)
+
+        url = reverse('ipam:aggregate_prefixes', kwargs={'pk': aggregate.pk})
+        response = self.client.get(url, {'tenant_id': tenants[0].pk})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, '203.0.113.0/26')
+        self.assertNotContains(response, '203.0.113.64/26')
+
+    def test_aggregate_prefixes_saved_filter(self):
+        self.add_permissions('ipam.view_aggregate', 'ipam.view_prefix')
+
+        tenants = (
+            Tenant(name='Aggregate Saved Tenant 1', slug='aggregate-saved-tenant-1'),
+            Tenant(name='Aggregate Saved Tenant 2', slug='aggregate-saved-tenant-2'),
+        )
+        Tenant.objects.bulk_create(tenants)
+
+        aggregate = Aggregate.objects.create(
+            prefix=IPNetwork('203.0.114.0/24'),
+            rir=RIR.objects.first()
+        )
+        prefixes = (
+            Prefix(prefix=IPNetwork('203.0.114.0/26'), tenant=tenants[0]),
+            Prefix(prefix=IPNetwork('203.0.114.64/26'), tenant=tenants[1]),
+        )
+        Prefix.objects.bulk_create(prefixes)
+
+        saved_filter = SavedFilter.objects.create(
+            name='Aggregate Tenant 1 prefixes',
+            slug='aggregate-tenant-1-prefixes',
+            parameters={
+                'tenant_id': [str(tenants[0].pk)],
+            },
+        )
+        saved_filter.object_types.add(ObjectType.objects.get_for_model(Prefix))
+
+        url = reverse('ipam:aggregate_prefixes', kwargs={'pk': aggregate.pk})
+        response = self.client.get(url, {'filter_id': saved_filter.pk})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, '203.0.114.0/26')
+        self.assertNotContains(response, '203.0.114.64/26')
+
 
 class RoleTestCase(ViewTestCases.OrganizationalObjectViewTestCase):
     model = Role
@@ -588,6 +652,63 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
         url = reverse('ipam:prefix_prefixes', kwargs={'pk': prefixes[0].pk})
         self.assertHttpStatus(self.client.get(url), 200)
 
+    def test_prefix_prefixes_filter_suppresses_available_prefixes(self):
+        self.add_permissions('ipam.view_prefix')
+
+        tenants = (
+            Tenant(name='Prefix Tenant 1', slug='prefix-tenant-1'),
+            Tenant(name='Prefix Tenant 2', slug='prefix-tenant-2'),
+        )
+        Tenant.objects.bulk_create(tenants)
+
+        parent = Prefix.objects.create(prefix=IPNetwork('198.51.100.0/24'))
+        prefixes = (
+            Prefix(prefix=IPNetwork('198.51.100.0/26'), tenant=tenants[0]),
+            Prefix(prefix=IPNetwork('198.51.100.64/26'), tenant=tenants[1]),
+        )
+        Prefix.objects.bulk_create(prefixes)
+
+        url = reverse('ipam:prefix_prefixes', kwargs={'pk': parent.pk})
+        response = self.client.get(url, {'tenant_id': tenants[0].pk})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, '198.51.100.0/26')
+        self.assertNotContains(response, '198.51.100.64/26')
+
+    def test_prefix_prefixes_saved_filter_suppresses_available_prefixes(self):
+        self.add_permissions('ipam.view_prefix')
+
+        tenants = (
+            Tenant(name='Prefix Saved Tenant 1', slug='prefix-saved-tenant-1'),
+            Tenant(name='Prefix Saved Tenant 2', slug='prefix-saved-tenant-2'),
+        )
+        Tenant.objects.bulk_create(tenants)
+
+        parent = Prefix.objects.create(prefix=IPNetwork('198.51.101.0/24'))
+        prefixes = (
+            Prefix(prefix=IPNetwork('198.51.101.0/26'), tenant=tenants[0]),
+            Prefix(prefix=IPNetwork('198.51.101.64/26'), tenant=tenants[1]),
+        )
+        Prefix.objects.bulk_create(prefixes)
+
+        saved_filter = SavedFilter.objects.create(
+            name='Prefix Tenant 1 prefixes',
+            slug='prefix-tenant-1-prefixes',
+            parameters={
+                'tenant_id': [str(tenants[0].pk)],
+            },
+        )
+        saved_filter.object_types.add(ObjectType.objects.get_for_model(Prefix))
+
+        url = reverse('ipam:prefix_prefixes', kwargs={'pk': parent.pk})
+        response = self.client.get(url, {'filter_id': saved_filter.pk})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, '198.51.101.0/26')
+        self.assertNotContains(response, '198.51.101.64/26')
+
     def test_prefix_ipranges(self):
         self.add_permissions('ipam.view_prefix', 'ipam.view_iprange')
         prefix = Prefix.objects.create(prefix=IPNetwork('192.168.0.0/16'))
@@ -616,6 +737,63 @@ class PrefixTestCase(ViewTestCases.PrimaryObjectViewTestCase):
         url = reverse('ipam:prefix_ipaddresses', kwargs={'pk': prefix.pk})
         self.assertHttpStatus(self.client.get(url), 200)
 
+    def test_prefix_ipaddresses_filter(self):
+        self.add_permissions('ipam.view_prefix', 'ipam.view_ipaddress', 'ipam.view_iprange')
+
+        tenants = (
+            Tenant(name='IP Address Tenant 1', slug='ip-address-tenant-1'),
+            Tenant(name='IP Address Tenant 2', slug='ip-address-tenant-2'),
+        )
+        Tenant.objects.bulk_create(tenants)
+
+        prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
+        ip_addresses = (
+            IPAddress(address=IPNetwork('192.0.2.1/24'), tenant=tenants[0]),
+            IPAddress(address=IPNetwork('192.0.2.2/24'), tenant=tenants[1]),
+        )
+        IPAddress.objects.bulk_create(ip_addresses)
+
+        url = reverse('ipam:prefix_ipaddresses', kwargs={'pk': prefix.pk})
+        response = self.client.get(url, {'tenant_id': tenants[0].pk})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, '192.0.2.1/24')
+        self.assertNotContains(response, '192.0.2.2/24')
+
+    def test_prefix_ipaddresses_saved_filter(self):
+        self.add_permissions('ipam.view_prefix', 'ipam.view_ipaddress', 'ipam.view_iprange')
+
+        tenants = (
+            Tenant(name='Saved Filter Tenant 1', slug='saved-filter-tenant-1'),
+            Tenant(name='Saved Filter Tenant 2', slug='saved-filter-tenant-2'),
+        )
+        Tenant.objects.bulk_create(tenants)
+
+        prefix = Prefix.objects.create(prefix=IPNetwork('192.0.2.0/24'))
+        ip_addresses = (
+            IPAddress(address=IPNetwork('192.0.2.1/24'), tenant=tenants[0]),
+            IPAddress(address=IPNetwork('192.0.2.2/24'), tenant=tenants[1]),
+        )
+        IPAddress.objects.bulk_create(ip_addresses)
+
+        saved_filter = SavedFilter.objects.create(
+            name='Tenant 1 IP addresses',
+            slug='tenant-1-ip-addresses',
+            parameters={
+                'tenant_id': [str(tenants[0].pk)],
+            },
+        )
+        saved_filter.object_types.add(ObjectType.objects.get_for_model(IPAddress))
+
+        url = reverse('ipam:prefix_ipaddresses', kwargs={'pk': prefix.pk})
+        response = self.client.get(url, {'filter_id': saved_filter.pk})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, '192.0.2.1/24')
+        self.assertNotContains(response, '192.0.2.2/24')
+
     def test_prefix_ipaddresses_with_single_address_range(self):
         self.add_permissions('ipam.view_prefix', 'ipam.view_ipaddress', 'ipam.view_iprange')
         # The IP Addresses tab annotates child IP addresses alongside any
@@ -1130,6 +1308,57 @@ class VLANGroupTestCase(ViewTestCases.OrganizationalObjectViewTestCase):
             'description': 'New description',
         }
 
+    def test_vlans_filter_suppresses_available_vlans(self):
+        self.add_permissions('ipam.view_vlangroup', 'ipam.view_vlan')
+
+        group = VLANGroup.objects.create(
+            name='Filtered VLAN Group',
+            slug='filtered-vlan-group'
+        )
+        vlans = (
+            VLAN(group=group, vid=100, name='VLAN100'),
+            VLAN(group=group, vid=200, name='VLAN200'),
+        )
+        VLAN.objects.bulk_create(vlans)
+
+        url = reverse('ipam:vlangroup_vlans', kwargs={'pk': group.pk})
+        response = self.client.get(url, {'vid': 100})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, 'VLAN100')
+        self.assertNotContains(response, 'VLAN200')
+
+    def test_vlans_saved_filter_suppresses_available_vlans(self):
+        self.add_permissions('ipam.view_vlangroup', 'ipam.view_vlan')
+
+        group = VLANGroup.objects.create(
+            name='Saved Filter VLAN Group',
+            slug='saved-filter-vlan-group'
+        )
+        vlans = (
+            VLAN(group=group, vid=100, name='VLAN100'),
+            VLAN(group=group, vid=200, name='VLAN200'),
+        )
+        VLAN.objects.bulk_create(vlans)
+
+        saved_filter = SavedFilter.objects.create(
+            name='VLAN 100',
+            slug='vlan-100',
+            parameters={
+                'vid': ['100'],
+            },
+        )
+        saved_filter.object_types.add(ObjectType.objects.get_for_model(VLAN))
+
+        url = reverse('ipam:vlangroup_vlans', kwargs={'pk': group.pk})
+        response = self.client.get(url, {'filter_id': saved_filter.pk})
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(len(response.context['table'].data), 1)
+        self.assertContains(response, 'VLAN100')
+        self.assertNotContains(response, 'VLAN200')
+
 
 class VLANTestCase(ViewTestCases.PrimaryObjectViewTestCase):
     model = VLAN

+ 19 - 6
netbox/ipam/views.py

@@ -572,13 +572,18 @@ class AggregatePrefixesView(generic.ObjectChildrenView):
         show_available = bool(request.GET.get('show_available', 'true') == 'true')
         show_assigned = bool(request.GET.get('show_assigned', 'true') == 'true')
 
+        if self.has_active_filters():
+            show_available = False
+
         return add_requested_prefixes(parent.prefix, queryset, show_available, show_assigned)
 
     def get_extra_context(self, request, instance):
+        show_available = bool(request.GET.get('show_available', 'true') == 'true') and not self.has_active_filters()
+
         return {
             'bulk_querystring': f'within={instance.prefix}',
             'first_available_prefix': instance.get_first_available_prefix(),
-            'show_available': bool(request.GET.get('show_available', 'true') == 'true'),
+            'show_available': show_available,
             'show_assigned': bool(request.GET.get('show_assigned', 'true') == 'true'),
         }
 
@@ -794,13 +799,18 @@ class PrefixPrefixesView(generic.ObjectChildrenView):
         show_available = bool(request.GET.get('show_available', 'true') == 'true')
         show_assigned = bool(request.GET.get('show_assigned', 'true') == 'true')
 
+        if self.has_active_filters():
+            show_available = False
+
         return add_requested_prefixes(parent.prefix, queryset, show_available, show_assigned)
 
     def get_extra_context(self, request, instance):
+        show_available = bool(request.GET.get('show_available', 'true') == 'true') and not self.has_active_filters()
+
         return {
             'bulk_querystring': f"vrf_id={instance.vrf.pk if instance.vrf else '0'}&within={instance.prefix}",
             'first_available_prefix': instance.get_first_available_prefix(),
-            'show_available': bool(request.GET.get('show_available', 'true') == 'true'),
+            'show_available': show_available,
             'show_assigned': bool(request.GET.get('show_assigned', 'true') == 'true'),
         }
 
@@ -851,9 +861,10 @@ class PrefixIPAddressesView(generic.ObjectChildrenView):
         return parent.get_child_ips().restrict(request.user, 'view').prefetch_related('vrf', 'tenant', 'tenant__group')
 
     def prep_table_data(self, request, queryset, parent):
-        if not request.GET.get('q') and not get_table_ordering(request, self.table):
+        if not self.has_active_filters() and not get_table_ordering(request, self.table):
             return annotate_ip_space(parent)
-        return queryset
+
+        return super().prep_table_data(request, queryset, parent)
 
     def get_extra_context(self, request, instance):
         return {
@@ -1312,9 +1323,11 @@ class VLANGroupVLANsView(generic.ObjectChildrenView):
         )
 
     def prep_table_data(self, request, queryset, parent):
-        if not get_table_ordering(request, self.table):
+        # Skip synthetic available rows under active filters: filtered-out VLANs would otherwise look available.
+        if not self.has_active_filters() and not get_table_ordering(request, self.table):
             return add_available_vlans(queryset, parent)
-        return queryset
+
+        return super().prep_table_data(request, queryset, parent)
 
 
 #

+ 63 - 3
netbox/netbox/views/generic/object_views.py

@@ -11,6 +11,7 @@ from django.urls import reverse
 from django.utils.html import escape
 from django.utils.safestring import mark_safe
 from django.utils.translation import gettext as _
+from django_filters.constants import EMPTY_VALUES
 
 from core.signals import clear_events
 from netbox.object_actions import BulkDelete, BulkEdit, CloneObject, DeleteObject, EditObject
@@ -104,6 +105,7 @@ class ObjectChildrenView(ObjectView, ActionsMixin, TableMixin):
     table = None
     filterset = None
     filterset_form = None
+    filterset_instance = None
     actions = (CloneObject, EditObject, DeleteObject, BulkEdit, BulkDelete)
     template_name = 'generic/object_children.html'
 
@@ -119,6 +121,66 @@ class ObjectChildrenView(ObjectView, ActionsMixin, TableMixin):
             class_name=self.__class__.__name__
         ))
 
+    def get_filterset(self, request, queryset):
+        """
+        Instantiate and return the child object FilterSet for the request.
+        """
+        if self.filterset is None:
+            return None
+
+        return self.filterset(request.GET, queryset, request=request)
+
+    def filter_queryset(self, request, queryset):
+        """
+        Apply the configured FilterSet to the child object queryset.
+        """
+        self.filterset_instance = self.get_filterset(request, queryset)
+
+        if self.filterset_instance is not None:
+            return self.filterset_instance.qs
+
+        return queryset
+
+    def has_active_filters(self):
+        """
+        Return True if the child object FilterSet has any active filter parameters.
+
+        Only declared filterset parameters are counted. This excludes table controls and other view-specific
+        query parameters, while still accounting for SavedFilter parameters after they have been expanded by the
+        FilterSet.
+
+        This must be called only after filter_queryset() has initialized self.filterset_instance.
+        """
+        if self.filterset_instance is None:
+            if self.filterset is not None:
+                raise RuntimeError(
+                    f"{self.__class__.__name__}.has_active_filters() must be called after filter_queryset()."
+                )
+            return False
+
+        filter_names = set(self.filterset_instance.filters)
+        data = self.filterset_instance.data
+
+        if not data:
+            return False
+
+        if hasattr(data, 'lists'):
+            data_items = data.lists()
+        else:
+            data_items = data.items()
+
+        for key, values in data_items:
+            if key not in filter_names:
+                continue
+
+            if not isinstance(values, (list, tuple)):
+                values = [values]
+
+            if any(value not in EMPTY_VALUES for value in values):
+                return True
+
+        return False
+
     def prep_table_data(self, request, queryset, parent):
         """
         Provides a hook for subclassed views to modify data before initializing the table.
@@ -140,9 +202,7 @@ class ObjectChildrenView(ObjectView, ActionsMixin, TableMixin):
         """
         instance = self.get_object(**kwargs)
         child_objects = self.get_children(request, instance)
-
-        if self.filterset:
-            child_objects = self.filterset(request.GET, child_objects, request=request).qs
+        child_objects = self.filter_queryset(request, child_objects)
 
         # Determine the available actions
         actions = self.get_permitted_actions(request.user, model=self.child_model)