ソースを参照

Merge pull request #22384 from netbox-community/15569-add-better-tests-for-graphql-filtering-and-lookup

Closes #15569: Auto-generate GraphQL filter tests for API test cases
bctiemann 1 ヶ月 前
コミット
f4d95e6e9d

+ 1 - 1
netbox/dcim/tests/query_counts.json

@@ -72,7 +72,7 @@
   "rearporttemplate:api_list_objects": 12,
   "region:api_list_objects": 13,
   "region:list_objects_with_permission": 20,
-  "site:api_list_objects": 16,
+  "site:api_list_objects": 17,
   "site:list_objects_with_permission": 22,
   "sitegroup:api_list_objects": 13,
   "sitegroup:list_objects_with_permission": 20,

+ 71 - 2
netbox/dcim/tests/test_api.py

@@ -20,6 +20,8 @@ from users.models import ObjectPermission, Token, User
 from utilities.testing import (
     APITestCase,
     APIViewTestCases,
+    GraphQLFilterTest,
+    GraphQLQueryTest,
     create_test_device,
     create_test_nat_ip_pair,
     disable_logging,
@@ -146,6 +148,19 @@ class SiteTestCase(APIViewTestCases.APIViewTestCase):
     bulk_update_data = {
         'status': 'planned',
     }
+    graphql_filter_tests = (
+        GraphQLFilterTest(
+            name='tenant__name__exact',
+            filters='tenant: {name: {exact: "Tenant 1"}}',
+            expected=lambda qs: qs.filter(tenant__name='Tenant 1'),
+            permissions=('tenancy.view_tenant',),
+        ),
+    )
+
+    def assert_nested_locations_active(self, data):
+        site_data = data.get('site') or {}
+        location_names = sorted(location['name'] for location in site_data.get('locations', []))
+        self.assertEqual(location_names, ['Site1 Active A', 'Site1 Active B'])
 
     @classmethod
     def setUpTestData(cls):
@@ -160,15 +175,32 @@ class SiteTestCase(APIViewTestCases.APIViewTestCase):
             SiteGroup.objects.create(name='Site Group 2', slug='site-group-2'),
         )
 
+        tenant = Tenant.objects.create(name='Tenant 1', slug='tenant-1')
+
+        # Site 1's tenant activates the dynamic tenant prefetch (+1 in api_list_objects baseline).
         sites = (
-            Site(region=regions[0], group=groups[0], name='Site 1', slug='site-1'),
+            Site(region=regions[0], group=groups[0], tenant=tenant, name='Site 1', slug='site-1'),
             Site(region=regions[0], group=groups[0], name='Site 2', slug='site-2'),
             Site(region=regions[0], group=groups[0], name='Site 3', slug='site-3'),
         )
         Site.objects.bulk_create(sites)
 
+        nested_site = Site.objects.get(slug='site-1')
+        cls.nested_site_pk = nested_site.pk
+        Location.objects.create(
+            site=nested_site, name='Site1 Active A', slug='site1-active-a',
+            status=LocationStatusChoices.STATUS_ACTIVE,
+        )
+        Location.objects.create(
+            site=nested_site, name='Site1 Active B', slug='site1-active-b',
+            status=LocationStatusChoices.STATUS_ACTIVE,
+        )
+        Location.objects.create(
+            site=nested_site, name='Site1 Planned', slug='site1-planned',
+            status=LocationStatusChoices.STATUS_PLANNED,
+        )
+
         rir = RIR.objects.create(name='RFC 6996', is_private=True)
-        tenant = Tenant.objects.create(name='Tenant 1', slug='tenant-1')
 
         asns = [
             ASN(asn=65000 + i, rir=rir) for i in range(8)
@@ -203,6 +235,19 @@ class SiteTestCase(APIViewTestCases.APIViewTestCase):
             },
         ]
 
+        cls.graphql_query_tests = (
+            GraphQLQueryTest(
+                name='nested_locations_by_status',
+                query=(
+                    '{ site(id: ' + str(cls.nested_site_pk) + ') { '
+                    'locations(filters: {status: {exact: STATUS_ACTIVE}}) { name } '
+                    '} }'
+                ),
+                assert_result=cls.assert_nested_locations_active,
+                permissions=('dcim.view_location',),
+            ),
+        )
+
     def test_add_tags(self):
         """
         Add tags to an existing object via the add_tags field.
@@ -427,6 +472,16 @@ class LocationTestCase(APIViewTestCases.APIViewTestCase):
         'description': 'New description',
     }
     user_permissions = ('dcim.view_site',)
+    graphql_filter_tests = (
+        GraphQLFilterTest(
+            name='status__in_list',
+            filters='status: {in_list: [STATUS_PLANNED, STATUS_STAGING]}',
+            expected=lambda qs: qs.filter(status__in=[
+                LocationStatusChoices.STATUS_PLANNED,
+                LocationStatusChoices.STATUS_STAGING,
+            ]),
+        ),
+    )
 
     @classmethod
     def setUpTestData(cls):
@@ -476,6 +531,20 @@ class LocationTestCase(APIViewTestCases.APIViewTestCase):
             parent=parent_locations[0],
             status=LocationStatusChoices.STATUS_ACTIVE,
         )
+        Location.objects.create(
+            site=sites[0],
+            name='GraphQL Planned Location',
+            slug='graphql-planned-location',
+            parent=parent_locations[0],
+            status=LocationStatusChoices.STATUS_PLANNED,
+        )
+        Location.objects.create(
+            site=sites[0],
+            name='GraphQL Staging Location',
+            slug='graphql-staging-location',
+            parent=parent_locations[0],
+            status=LocationStatusChoices.STATUS_STAGING,
+        )
 
         cls.create_data = [
             {

+ 67 - 1
netbox/netbox/tests/test_graphql.py

@@ -10,7 +10,7 @@ from strawberry.schema.config import StrawberryConfig
 
 from dcim.choices import LocationStatusChoices
 from dcim.models import Device, DeviceRole, DeviceType, Location, Manufacturer, Site, VirtualChassis
-from extras.models import TableConfig
+from extras.models import TableConfig, Tag
 from netbox.graphql.scalars import BigInt, BigIntScalar
 from netbox.graphql.schema import Query, get_schema_extensions
 from utilities.tables import get_table_for_model
@@ -185,6 +185,72 @@ class GraphQLAPITestCase(APITestCase):
         self.assertNotIn('errors', data)
         self.assertEqual(len(data['data']['site']['locations']), 0)
 
+    @override_settings(LOGIN_REQUIRED=True)
+    def test_graphql_nested_filter_objects(self):
+        """
+        Test filtering of nested GraphQL object lists.
+        """
+        self.add_permissions('dcim.view_site', 'dcim.view_location', 'extras.view_tag')
+
+        site = Site.objects.create(
+            name='Nested Filter Site',
+            slug='nested-filter-site'
+        )
+
+        # Location is MPTT-managed; bulk_create skips tree-init hooks. Use per-instance create.
+        Location.objects.create(
+            site=site,
+            name='Nested Active 1',
+            slug='nested-active-1',
+            status=LocationStatusChoices.STATUS_ACTIVE,
+        )
+        Location.objects.create(
+            site=site,
+            name='Nested Active 2',
+            slug='nested-active-2',
+            status=LocationStatusChoices.STATUS_ACTIVE,
+        )
+        Location.objects.create(
+            site=site,
+            name='Nested Planned',
+            slug='nested-planned',
+            status=LocationStatusChoices.STATUS_PLANNED,
+        )
+
+        planned = Tag.objects.create(name='Planned', slug='planned')
+        production = Tag.objects.create(name='Production', slug='production')
+        staging = Tag.objects.create(name='Staging', slug='staging')
+        site.tags.add(planned, production, staging)
+
+        url = reverse('graphql')
+        query = f"""
+        {{
+          site(id: {site.pk}) {{
+            locations(filters: {{status: {{exact: STATUS_ACTIVE}}}}) {{
+              name
+            }}
+            tags(filters: {{name: {{i_starts_with: "P"}}}}) {{
+              name
+            }}
+          }}
+        }}
+        """
+
+        response = self.client.post(url, data={'query': query}, format="json", **self.header)
+        self.assertHttpStatus(response, status.HTTP_200_OK)
+
+        data = json.loads(response.content)
+        self.assertNotIn('errors', data)
+
+        self.assertEqual(
+            {location['name'] for location in data['data']['site']['locations']},
+            {'Nested Active 1', 'Nested Active 2'}
+        )
+        self.assertEqual(
+            {tag['name'] for tag in data['data']['site']['tags']},
+            {'Planned', 'Production'}
+        )
+
     def test_graphql_integer_range_lookup(self):
         """
         Test that range_lookup works for integer fields (e.g. vc_position). Regression test for #20468.

+ 766 - 20
netbox/utilities/testing/api.py

@@ -1,21 +1,50 @@
 import copy
+import importlib
 import inspect
 import json
+import types
+import typing
+from collections.abc import Callable
+from dataclasses import dataclass
+from decimal import Decimal
 
+import strawberry
 import strawberry_django
 from django.conf import settings
 from django.contrib.contenttypes.models import ContentType
+from django.contrib.postgres.fields import ArrayField
+from django.db import models
 from django.test import override_settings
 from django.urls import reverse
+from django.utils.module_loading import import_string
 from rest_framework import status
 from rest_framework.test import APIClient
 from strawberry.types.base import StrawberryList, StrawberryOptional
 from strawberry.types.lazy_type import LazyType
 from strawberry.types.union import StrawberryUnion
+from strawberry_django import (
+    BaseFilterLookup,
+    ComparisonFilterLookup,
+    DateFilterLookup,
+    DatetimeFilterLookup,
+    FilterLookup,
+    RangeLookup,
+    StrFilterLookup,
+    TimeFilterLookup,
+)
 
 from core.choices import ObjectChangeActionChoices
 from core.models import ObjectChange, ObjectType
 from ipam.graphql.types import IPAddressFamilyType
+from netbox.graphql.filter_lookups import (
+    ArrayLookup,
+    BigIntegerLookup,
+    FloatLookup,
+    IntegerLookup,
+    IntegerRangeArrayLookup,
+    JSONFilter,
+    TreeNodeFilter,
+)
 from netbox.models.features import ChangeLoggingMixin
 from users.constants import TOKEN_PREFIX
 from users.models import ObjectPermission, Token, User
@@ -28,9 +57,48 @@ from .utils import disable_logging, disable_warnings, get_random_string
 __all__ = (
     'APITestCase',
     'APIViewTestCases',
+    'GraphQLFilterTest',
+    'GraphQLQueryTest',
 )
 
 
+@dataclass(frozen=True)
+class GraphQLFilterTest:
+    """
+    Declarative GraphQL filter test case for APIViewTestCases.GraphQLTestCase.
+
+    ``filters`` is the raw content to place inside the GraphQL ``filters`` input,
+    e.g. ``name: {i_contains: "site"}``.
+
+    ``expected`` may be a callable accepting the model queryset, an ORM filter
+    dict, a queryset, an iterable of model instances, or an iterable of object
+    IDs. When omitted, the test only asserts that the filter returns at least one
+    result; this preserves compatibility with the legacy ``graphql_filter``
+    attribute.
+    """
+    name: str
+    filters: str
+    expected: object = None
+    permissions: tuple[str, ...] = ()
+
+
+@dataclass(frozen=True)
+class GraphQLQueryTest:
+    """
+    Declarative GraphQL query test case for model-specific complex queries.
+
+    ``assert_result`` is called as ``assert_result(testcase, data)`` where
+    ``testcase`` is the running ``GraphQLTestCase`` instance (use it for
+    ``testcase.assertEqual`` etc.) and ``data`` is the decoded GraphQL
+    ``data`` object (the inner ``response.json()['data']``, not the full HTTP
+    response).
+    """
+    name: str
+    query: str
+    assert_result: Callable
+    permissions: tuple[str, ...] = ()
+
+
 #
 # REST/GraphQL API Tests
 #
@@ -556,6 +624,21 @@ class APIViewTestCases:
                         message=changelog_message)
 
     class GraphQLTestCase(APITestCase):
+        graphql_auto_filter_tests = True
+        graphql_auto_filter_exclude = ()
+
+        # Cap fields per lookup kind to keep test counts balanced across kinds
+        # (string fields shouldn't crowd out numeric/date/array fields).
+        graphql_auto_filter_fields_per_kind = 2
+
+        # Fail when auto mode is on and no tests were generated.
+        graphql_auto_filter_required = True
+
+        # Additional explicit-list filter cases as GraphQLFilterTest instances.
+        graphql_filter_tests = ()
+
+        # Additional full-query cases (e.g. nested filters) as GraphQLQueryTest instances.
+        graphql_query_tests = ()
 
         def _get_graphql_base_name(self):
             """
@@ -622,26 +705,627 @@ class APIViewTestCases:
 
             return query
 
+        @staticmethod
+        def _graphql_literal(value):
+            """
+            Render a Python value as a GraphQL literal.
+            """
+            if value is None:
+                return 'null'
+            if isinstance(value, bool):
+                return 'true' if value else 'false'
+            if isinstance(value, (int, float)):
+                return str(value)
+            if isinstance(value, Decimal):
+                return str(float(value))
+            if isinstance(value, (list, tuple)):
+                items = ', '.join(
+                    APIViewTestCases.GraphQLTestCase._graphql_literal(v) for v in value
+                )
+                return f'[{items}]'
+            if isinstance(value, str):
+                return json.dumps(value)
+
+            return json.dumps(str(value))
+
+        def _render_graphql_filter_value(self, params):
+            """
+            Render the legacy graphql_filter dict value to a GraphQL filter value.
+            """
+            if isinstance(params, str):
+                return params
+
+            if not isinstance(params, dict):
+                return self._graphql_literal(params)
+
+            lookup = params.get('lookup')
+            value = params['value']
+
+            if lookup:
+                return f'{{{lookup}: {self._graphql_literal(value)}}}'
+
+            return self._graphql_literal(value)
+
+        def _build_graphql_filter_string(self, **filters):
+            if not filters:
+                return ''
+
+            filter_expressions = [
+                f'{field_name}: {self._render_graphql_filter_value(params)}'
+                for field_name, params in filters.items()
+            ]
+
+            return f'(filters: {{{", ".join(filter_expressions)}}})'
+
         def _build_filtered_query(self, name, **filters):
             """
             Create a filtered query: i.e. device_list(filters: {name: {i_contains: "akron"}}){.
             """
-            # TODO: This should be extended to support AND, OR multi-lookups
-            if filters:
-                for field_name, params in filters.items():
-                    lookup = params['lookup']
-                    value = params['value']
-                    if lookup:
-                        query = f'{{{lookup}: "{value}"}}'
-                        filter_string = f'{field_name}: {query}'
-                    else:
-                        filter_string = f'{field_name}: "{value}"'
-                filter_string = f'(filters: {{{filter_string}}})'
-            else:
-                filter_string = ''
+            filter_string = self._build_graphql_filter_string(**filters)
 
             return self._build_query_with_filter(name, filter_string)
 
+        def _build_graphql_id_list_query(self, name, filters):
+            filter_string = f'(filters: {{{filters}}})' if filters else ''
+            selection = 'id' if self._graphql_type_exposes_id() else '__typename'
+
+            return f"""
+            {{
+              {name}{filter_string} {{
+                {selection}
+              }}
+            }}
+            """
+
+        def _graphql_type_exposes_id(self):
+            """
+            Return True when the model's GraphQL type exposes ``id`` as a
+            queryable selection. Some NetBox types (e.g. Notification,
+            Subscription) omit ``id`` from the output type; for those, the
+            assertion path falls back to length-only comparison.
+            """
+            type_class = get_graphql_type_for_model(self.model)
+            strawberry_definition = getattr(type_class, '__strawberry_definition__', None)
+            if strawberry_definition is None:
+                return False
+            return any(field.name == 'id' for field in strawberry_definition.fields)
+
+        def _get_model_graphql_filter_class(self, model=None):
+            """
+            Return the model's GraphQL filter class, if one follows NetBox's
+            conventional <app>.graphql.filters.<Model>Filter path. ``None`` if
+            the filter module (or any of its parent packages) is absent or the
+            class is not present in the module. Import errors originating
+            inside an existing filter module are re-raised.
+            """
+            model = model or self.model
+            module_path = f'{model._meta.app_label}.graphql.filters'
+            class_name = f'{model.__name__}Filter'
+
+            try:
+                module = importlib.import_module(module_path)
+            except ModuleNotFoundError as exc:
+                # Treat both "<app>.graphql.filters" absent and any missing
+                # parent (e.g. "<app>.graphql" or "<app>") as "no conventional
+                # filter class". Real ImportErrors from inside an existing
+                # filter module still propagate.
+                if exc.name == module_path or module_path.startswith(f'{exc.name}.'):
+                    return None
+                raise
+
+            return getattr(module, class_name, None)
+
+        def _get_graphql_filter_field_names(self):
+            """
+            Return the names exposed by the model's GraphQL filter input, sourced
+            only from the conventional <app>.graphql.filters.<Model>Filter path.
+            """
+            filter_class = self._get_model_graphql_filter_class()
+            if filter_class is None:
+                return set()
+
+            return self._collect_filter_class_annotation_names(filter_class)
+
+        @staticmethod
+        def _collect_filter_class_annotation_names(filter_class):
+            field_names = set()
+            for cls in reversed(getattr(filter_class, '__mro__', ())):
+                field_names.update(
+                    field_name for field_name in getattr(cls, '__annotations__', {})
+                    if not field_name.startswith('_')
+                )
+            return field_names
+
+        def _assert_graphql_filter_class_present(self, filter_fields, handwritten_tests=()):
+            """
+            Raise when the model has no discoverable filter class or the class
+            declares no fields. Skipped when auto-filter generation is disabled,
+            the per-model opt-out attribute is set, or hand-written (legacy or
+            explicit) filter tests are declared for the model.
+            """
+            if handwritten_tests:
+                return
+            if not getattr(self, 'graphql_auto_filter_required', True):
+                return
+            if not getattr(self, 'graphql_auto_filter_tests', True):
+                return
+
+            label = self.model._meta.label
+            path = f'{self.model._meta.app_label}.graphql.filters.{self.model.__name__}Filter'
+
+            filter_class = self._get_model_graphql_filter_class()
+            self.assertIsNotNone(
+                filter_class,
+                f'No GraphQL filter class found for {label} at {path}. '
+                f'Set graphql_auto_filter_required = False on this test case if intentional.'
+            )
+            self.assertTrue(
+                filter_fields,
+                f'GraphQL filter class for {label} declares no fields. '
+                f'Set graphql_auto_filter_required = False on this test case if intentional.'
+            )
+
+        def _get_nonempty_field_value(self, field):
+            queryset = self._get_queryset()
+
+            if getattr(field, 'null', False):
+                queryset = queryset.exclude(**{f'{field.name}__isnull': True})
+
+            if isinstance(field, (models.CharField, models.TextField)):
+                queryset = queryset.exclude(**{field.name: ''})
+
+            return queryset.values_list(field.name, flat=True).first()
+
+        def _get_model_field_for_filter_field(self, field_name):
+            """
+            Find the Django model field matching a filter field name. Filter
+            fields are declared with either the model field name (e.g. `name`)
+            or the FK attname (e.g. `tenant_id`).
+            """
+            for field in self.model._meta.fields:
+                if field.name == field_name or getattr(field, 'attname', None) == field_name:
+                    return field
+            return None
+
+        def _iter_filter_class_annotations(self, filter_class):
+            """
+            Yield (field_name, annotation) pairs for the filter class, walking
+            its MRO so inherited fields surface. Subclass annotations override
+            inherited ones (private `_`-prefixed names are skipped).
+            """
+            annotations = {}
+            for cls in reversed(filter_class.__mro__):
+                annotations.update({
+                    name: ann for name, ann in getattr(cls, '__annotations__', {}).items()
+                    if not name.startswith('_')
+                })
+            yield from annotations.items()
+
+        @staticmethod
+        def _unwrap_filter_annotation(annotation):
+            """
+            Strip ``X | None`` / ``Optional[X]`` and ``Annotated[X, ...]``
+            layers. Resolve `strawberry.lazy('...')` metadata so lazily-annotated
+            lookup types (e.g. ``Annotated['FloatLookup', strawberry.lazy('mod')] | None``)
+            are returned as the actual class. When an ``Annotated`` layer carries
+            multiple metadata entries, the first ``module``-bearing entry wins.
+            Returns None when the inner type cannot be resolved.
+            """
+            if annotation is None:
+                return None
+
+            lazy_module = None
+            # Cap iterations at 8: typical NetBox annotations nest at most 3 layers
+            # (Union > Annotated > ForwardRef). 8 is a generous safety net to
+            # prevent infinite loops on pathological / future annotation shapes.
+            for _ in range(8):
+                origin = typing.get_origin(annotation)
+                args = typing.get_args(annotation)
+
+                if origin in (typing.Union, types.UnionType):
+                    non_none = [a for a in args if a is not type(None)]
+                    if len(non_none) != 1:
+                        return None
+                    annotation = non_none[0]
+                    continue
+
+                if hasattr(annotation, '__metadata__'):
+                    for meta in annotation.__metadata__:
+                        module_name = getattr(meta, 'module', None)
+                        if module_name:
+                            lazy_module = module_name
+                            break
+                    inner = args[0] if args else None
+                    if inner is None:
+                        return None
+                    annotation = inner
+                    continue
+
+                break
+
+            if isinstance(annotation, (str, typing.ForwardRef)):
+                if lazy_module is None:
+                    return None
+                name = annotation.__forward_arg__ if isinstance(annotation, typing.ForwardRef) else annotation
+                try:
+                    return import_string(f'{lazy_module}.{name}')
+                except ImportError:
+                    return None
+
+            return annotation
+
+        @classmethod
+        def _classify_filter_annotation(cls, annotation):
+            """
+            Resolve a filter field annotation to a (kind, kind_arg) tuple keyed
+            on the declared GraphQL lookup type. Returns (None, None) for
+            annotations the dispatcher does not handle (those fields are
+            silently skipped).
+            """
+            annotation = cls._unwrap_filter_annotation(annotation)
+            if annotation is None or isinstance(annotation, str):
+                return None, None
+
+            if annotation is strawberry.ID:
+                return 'id', None
+
+            origin = typing.get_origin(annotation)
+            target = origin if isinstance(origin, type) else annotation
+            type_args = typing.get_args(annotation)
+
+            if not isinstance(target, type):
+                return None, None
+
+            if target in (IntegerLookup, BigIntegerLookup, FloatLookup):
+                return 'numeric', target
+
+            # TreeNodeFilter schema requires {id, match_type}; skip auto-emit.
+            if target is TreeNodeFilter:
+                return None, None
+
+            if issubclass(target, (DateFilterLookup, DatetimeFilterLookup, TimeFilterLookup)):
+                return 'date_lookup', None
+
+            if target is RangeLookup or issubclass(target, RangeLookup):
+                return 'range_lookup', type_args[0] if type_args else None
+
+            if issubclass(target, ArrayLookup):
+                return 'array_lookup', None
+            if target is IntegerRangeArrayLookup or issubclass(target, IntegerRangeArrayLookup):
+                return 'range_array_lookup', None
+            if target is JSONFilter:
+                # JSONFilter requires explicit (path, typed lookup); no general auto shape.
+                return None, None
+
+            if issubclass(target, StrFilterLookup):
+                return 'str_lookup', None
+            if issubclass(target, ComparisonFilterLookup):
+                return 'comparison_lookup', type_args[0] if type_args else None
+            if issubclass(target, FilterLookup):
+                return 'filter_lookup', type_args[0] if type_args else None
+            # Enum-typed BaseFilterLookup needs an enum literal; skip auto-emit.
+            if issubclass(target, BaseFilterLookup):
+                return None, None
+
+            return None, None
+
+        def _emit_id_filter_tests(self, field_name, _kind_arg):
+            if field_name == 'id':
+                instance = self._get_queryset().first()
+                if instance is None:
+                    return
+                yield GraphQLFilterTest(
+                    name='id__exact',
+                    filters=f'id: {self._graphql_literal(str(instance.pk))}',
+                    expected=lambda qs, pk=instance.pk: qs.filter(pk=pk),
+                )
+                return
+
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None or not isinstance(model_field, models.ForeignKey):
+                return
+            queryset = self._get_queryset().exclude(**{f'{model_field.name}__isnull': True})
+            value = queryset.values_list(model_field.attname, flat=True).first()
+            if value is None:
+                return
+            yield GraphQLFilterTest(
+                name=f'{field_name}__exact',
+                filters=f'{field_name}: {self._graphql_literal(str(value))}',
+                expected=lambda qs, attname=model_field.attname, v=value: qs.filter(**{attname: v}),
+            )
+
+        def _emit_str_lookup_filter_tests(self, field_name, _kind_arg):
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            value = self._get_nonempty_field_value(model_field)
+            if value in (None, ''):
+                return
+            value = str(value)
+            token = max(1, min(3, len(value)))
+            lookups = (
+                ('exact', 'exact', value),
+                ('i_contains', 'icontains', value[:token]),
+                ('i_starts_with', 'istartswith', value[:token]),
+                ('i_ends_with', 'iendswith', value[-token:]),
+            )
+            for lookup, orm_lookup, filter_value in lookups:
+                yield GraphQLFilterTest(
+                    name=f'{field_name}__{lookup}',
+                    filters=f'{field_name}: {{{lookup}: {self._graphql_literal(filter_value)}}}',
+                    expected=(
+                        lambda qs, fn=model_field.name, ol=orm_lookup, v=filter_value:
+                        qs.filter(**{f'{fn}__{ol}': v})
+                    ),
+                )
+
+        def _emit_filter_lookup_filter_tests(self, field_name, type_arg):
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            value = self._get_nonempty_field_value(model_field)
+            if value is None:
+                return
+            if type_arg is bool or isinstance(value, bool):
+                yield GraphQLFilterTest(
+                    name=f'{field_name}__exact',
+                    filters=f'{field_name}: {{exact: {self._graphql_literal(value)}}}',
+                    expected=lambda qs, fn=model_field.name, v=value: qs.filter(**{fn: v}),
+                )
+                return
+            yield GraphQLFilterTest(
+                name=f'{field_name}__exact',
+                filters=f'{field_name}: {{exact: {self._graphql_literal(value)}}}',
+                expected=lambda qs, fn=model_field.name, v=value: qs.filter(**{f'{fn}__exact': v}),
+            )
+
+        def _emit_comparison_lookup_filter_tests(self, field_name, _type_arg):
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            value = self._get_nonempty_field_value(model_field)
+            if value is None:
+                return
+            yield GraphQLFilterTest(
+                name=f'{field_name}__exact',
+                filters=f'{field_name}: {{exact: {self._graphql_literal(value)}}}',
+                expected=lambda qs, fn=model_field.name, v=value: qs.filter(**{f'{fn}__exact': v}),
+            )
+
+        def _emit_numeric_filter_tests(self, field_name, _type_arg):
+            # NetBox numeric wrapper: {filter_lookup: {exact: N}}.
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            if isinstance(model_field, ArrayField):
+                return
+            value = self._get_nonempty_field_value(model_field)
+            if value is None:
+                return
+            if isinstance(value, Decimal):
+                value = float(value)
+            yield GraphQLFilterTest(
+                name=f'{field_name}__filter_lookup__exact',
+                filters=(
+                    f'{field_name}: {{filter_lookup: '
+                    f'{{exact: {self._graphql_literal(value)}}}}}'
+                ),
+                expected=lambda qs, fn=model_field.name, v=value: qs.filter(**{f'{fn}__exact': v}),
+            )
+
+        def _emit_date_lookup_filter_tests(self, field_name, _kind_arg):
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            value = self._get_nonempty_field_value(model_field)
+            if value is None:
+                return
+            iso_value = value.isoformat() if hasattr(value, 'isoformat') else str(value)
+            yield GraphQLFilterTest(
+                name=f'{field_name}__exact',
+                filters=f'{field_name}: {{exact: "{iso_value}"}}',
+                expected=lambda qs, fn=model_field.name, v=value: qs.filter(**{fn: v}),
+            )
+
+        def _emit_range_lookup_filter_tests(self, field_name, _kind_arg):
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            aggregates = self._get_queryset().aggregate(
+                _min=models.Min(model_field.name), _max=models.Max(model_field.name),
+            )
+            start, end = aggregates['_min'], aggregates['_max']
+            if start is None or end is None or start == end:
+                return
+            yield GraphQLFilterTest(
+                name=f'{field_name}__range_lookup',
+                filters=(
+                    f'{field_name}: {{range_lookup: '
+                    f'{{start: {self._graphql_literal(start)}, end: {self._graphql_literal(end)}}}}}'
+                ),
+                expected=(
+                    lambda qs, fn=model_field.name, lo=start, hi=end:
+                    qs.filter(**{f'{fn}__gte': lo, f'{fn}__lte': hi})
+                ),
+            )
+
+        def _emit_array_lookup_filter_tests(self, field_name, _kind_arg):
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            if not isinstance(model_field, ArrayField):
+                return
+            queryset = self._get_queryset().exclude(**{field_name: []})
+            sample = queryset.values_list(field_name, flat=True).first()
+            if not sample:
+                return
+            element = sample[0]
+            yield GraphQLFilterTest(
+                name=f'{field_name}__contains',
+                filters=(
+                    f'{field_name}: {{contains: [{self._graphql_literal(element)}]}}'
+                ),
+                expected=(
+                    lambda qs, fn=model_field.name, v=element: qs.filter(**{f'{fn}__contains': [v]})
+                ),
+            )
+
+        def _emit_range_array_lookup_filter_tests(self, field_name, _kind_arg):
+            model_field = self._get_model_field_for_filter_field(field_name)
+            if model_field is None:
+                return
+            queryset = self._get_queryset().exclude(**{f'{field_name}__isnull': True})
+            sample = queryset.values_list(field_name, flat=True).first()
+            if not sample:
+                return
+            first_range = sample[0]
+            lower = getattr(first_range, 'lower', None)
+            if lower is None:
+                return
+            yield GraphQLFilterTest(
+                name=f'{field_name}__contains',
+                filters=f'{field_name}: {{contains: {self._graphql_literal(lower)}}}',
+                expected=(
+                    lambda qs, fn=model_field.name, v=lower: qs.filter(**{f'{fn}__range_contains': v})
+                ),
+            )
+
+        def _iter_auto_graphql_filter_tests(self):
+            if not getattr(self, 'graphql_auto_filter_tests', True):
+                return
+
+            filter_class = self._get_model_graphql_filter_class()
+            if filter_class is None:
+                return
+
+            exclude = set(getattr(self, 'graphql_auto_filter_exclude', ()))
+            per_kind = self.graphql_auto_filter_fields_per_kind
+
+            # Bucket eligible fields by lookup kind so per-kind budgeting balances coverage.
+            by_kind: dict[str, list[tuple[str, object]]] = {}
+            for field_name, annotation in self._iter_filter_class_annotations(filter_class):
+                if field_name in exclude:
+                    continue
+                kind, kind_arg = self._classify_filter_annotation(annotation)
+                if kind is None:
+                    continue
+                by_kind.setdefault(kind, []).append((field_name, kind_arg))
+
+            # Emit per-kind; the cap counts SUCCESSFUL emissions, not candidate fields, so
+            # early null/empty fields don't shadow later fields with usable fixture data.
+            for kind, fields in by_kind.items():
+                emitter = getattr(self, f'_emit_{kind}_filter_tests', None)
+                if emitter is None:
+                    continue
+
+                emitted_fields = 0
+                for field_name, kind_arg in fields:
+                    tests = list(emitter(field_name, kind_arg))
+                    if not tests:
+                        continue
+                    yield from tests
+                    emitted_fields += 1
+                    if emitted_fields >= per_kind:
+                        break
+
+        def _iter_legacy_graphql_filter_tests(self):
+            if not hasattr(self, 'graphql_filter'):
+                return
+
+            filter_expressions = [
+                f'{field_name}: {self._render_graphql_filter_value(params)}'
+                for field_name, params in self.graphql_filter.items()
+            ]
+
+            yield GraphQLFilterTest(
+                name='graphql_filter',
+                filters=', '.join(filter_expressions),
+            )
+
+        def _coerce_graphql_filter_test(self, filter_test):
+            if isinstance(filter_test, GraphQLFilterTest):
+                return filter_test
+
+            filter_test = dict(filter_test)
+            if 'filter' in filter_test and 'filters' not in filter_test:
+                filter_test['filters'] = filter_test.pop('filter')
+
+            return GraphQLFilterTest(**filter_test)
+
+        def _iter_explicit_graphql_filter_tests(self):
+            for filter_test in getattr(self, 'graphql_filter_tests', ()):
+                yield self._coerce_graphql_filter_test(filter_test)
+
+        def _get_expected_id_set(self, filter_test):
+            expected = filter_test.expected
+
+            if callable(expected):
+                expected = expected(self._get_queryset())
+
+            if isinstance(expected, dict):
+                expected = self._get_queryset().filter(**expected)
+
+            if hasattr(expected, 'values_list'):
+                values = expected.distinct().values_list('pk', flat=True)
+            else:
+                values = [getattr(value, 'pk', value) for value in expected]
+
+            return {str(value) for value in values}
+
+        def _assert_graphql_filter_test(self, url, field_name, filter_test):
+            query = self._build_graphql_id_list_query(field_name, filter_test.filters)
+
+            for permission in filter_test.permissions:
+                self.add_permissions(permission)
+
+            response = self.client.post(url, data={'query': query}, format="json", **self.header)
+            self.assertHttpStatus(response, status.HTTP_200_OK)
+
+            data = json.loads(response.content)
+            self.assertNotIn('errors', data)
+
+            results = data['data'][field_name]
+
+            if filter_test.expected is None:
+                self.assertGreater(len(results), 0)
+                return
+
+            expected_ids = self._get_expected_id_set(filter_test)
+
+            self.assertGreater(
+                len(expected_ids), 0,
+                msg=(
+                    f'{self.model._meta.label}: filter "{filter_test.name}" produced an empty '
+                    f'expected set; the test would tautologically pass. Adjust fixtures or the '
+                    f'filter so the expected ORM queryset is non-empty.'
+                ),
+            )
+
+            if self._graphql_type_exposes_id():
+                result_ids = [str(result['id']) for result in results]
+                self.assertEqual(
+                    set(result_ids), expected_ids,
+                    msg=f'{self.model._meta.label}: filter "{filter_test.name}" ID set mismatch',
+                )
+
+            self.assertEqual(
+                len(results), len(expected_ids),
+                msg=(
+                    f'{self.model._meta.label}: filter "{filter_test.name}" result count mismatch '
+                    f'(GraphQL type does not expose id; comparing by length).'
+                ),
+            )
+
+        def _coerce_graphql_query_test(self, query_test):
+            if isinstance(query_test, GraphQLQueryTest):
+                return query_test
+
+            query_test = dict(query_test)
+            if 'assertion' in query_test and 'assert_result' not in query_test:
+                query_test['assert_result'] = query_test.pop('assertion')
+
+            return GraphQLQueryTest(**query_test)
+
         def _build_query(self, name, **filters):
             """
             Create a normal query - unfiltered or with a string query: i.e. site(name: "aaa"){.
@@ -740,14 +1424,44 @@ class APIViewTestCases:
             self.assertNotIn('errors', data)
             self.assertEqual(len(data['data'][field_name]), self.model.objects.count())
 
+        def _assert_graphql_filter_tests_exist(self, auto_tests, legacy_tests, explicit_tests):
+            """
+            Fail loudly when auto mode is required and no GraphQL filter tests
+            (auto, legacy, or explicit) exist for the current model.
+            """
+            if (
+                getattr(self, 'graphql_auto_filter_tests', True)
+                and getattr(self, 'graphql_auto_filter_required', True)
+                and not auto_tests
+                and not legacy_tests
+                and not explicit_tests
+            ):
+                self.fail(
+                    f'No GraphQL filter tests were generated for {self.model._meta.label}. '
+                    f'Set graphql_auto_filter_required = False or add explicit graphql_filter_tests '
+                    f'if intentional.'
+                )
+
         @override_settings(LOGIN_REQUIRED=True)
         def test_graphql_filter_objects(self):
-            if not hasattr(self, 'graphql_filter'):
+            legacy_tests = list(self._iter_legacy_graphql_filter_tests())
+            explicit_tests = list(self._iter_explicit_graphql_filter_tests())
+
+            filter_fields = self._get_graphql_filter_field_names()
+            self._assert_graphql_filter_class_present(
+                filter_fields, handwritten_tests=[*legacy_tests, *explicit_tests]
+            )
+
+            auto_tests = list(self._iter_auto_graphql_filter_tests())
+
+            self._assert_graphql_filter_tests_exist(auto_tests, legacy_tests, explicit_tests)
+
+            filter_tests = [*auto_tests, *legacy_tests, *explicit_tests]
+            if not filter_tests:
                 return
 
             url = reverse('graphql')
             field_name = f'{self._get_graphql_base_name()}_list'
-            query = self._build_filtered_query(field_name, **self.graphql_filter)
 
             # Add object-level permission
             obj_perm = ObjectPermission(
@@ -758,11 +1472,43 @@ class APIViewTestCases:
             obj_perm.users.add(self.user)
             obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
 
-            response = self.client.post(url, data={'query': query}, format="json", **self.header)
-            self.assertHttpStatus(response, status.HTTP_200_OK)
-            data = json.loads(response.content)
-            self.assertNotIn('errors', data)
-            self.assertGreater(len(data['data'][field_name]), 0)
+            for filter_test in filter_tests:
+                with self.subTest(filter=filter_test.name):
+                    self._assert_graphql_filter_test(url, field_name, filter_test)
+
+        @override_settings(LOGIN_REQUIRED=True)
+        def test_graphql_extra_queries(self):
+            query_tests = [
+                self._coerce_graphql_query_test(query_test)
+                for query_test in getattr(self, 'graphql_query_tests', ())
+            ]
+
+            if not query_tests:
+                return
+
+            url = reverse('graphql')
+
+            # Add object-level permission for this model. Additional permissions
+            # required by the query can be declared on the GraphQLQueryTest.
+            obj_perm = ObjectPermission(
+                name='Test permission',
+                actions=['view']
+            )
+            obj_perm.save()
+            obj_perm.users.add(self.user)
+            obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
+
+            for query_test in query_tests:
+                with self.subTest(query=query_test.name):
+                    for permission in query_test.permissions:
+                        self.add_permissions(permission)
+
+                    response = self.client.post(url, data={'query': query_test.query}, format="json", **self.header)
+                    self.assertHttpStatus(response, status.HTTP_200_OK)
+
+                    data = json.loads(response.content)
+                    self.assertNotIn('errors', data)
+                    query_test.assert_result(self, data['data'])
 
     class APIViewTestCase(
         GetObjectViewTestCase,

+ 260 - 0
netbox/utilities/tests/test_api_graphql.py

@@ -0,0 +1,260 @@
+"""Tests for the GraphQL filter test framework in utilities/testing/api.py."""
+
+import sys
+import types
+from decimal import Decimal
+from typing import Annotated
+
+import strawberry
+from django.test import TestCase
+
+from netbox.graphql.filter_lookups import FloatLookup
+from utilities.testing.api import APIViewTestCases, GraphQLFilterTest
+
+
+class GraphQLFilterAnnotationMROTestCase(TestCase):
+    """Cover MRO override, import error propagation, lazy annotation resolution, and the zero-auto-test gate."""
+
+    def test_subclass_annotation_overrides_base(self):
+        """Subclass annotations win over base in `_iter_filter_class_annotations`."""
+        class Base:
+            __annotations__ = {'shared': int}
+
+        class Sub(Base):
+            __annotations__ = {'shared': str}
+
+        # Stand up a throwaway instance just to access the method as bound.
+        instance = APIViewTestCases.GraphQLTestCase()
+        pairs = dict(instance._iter_filter_class_annotations(Sub))
+        self.assertEqual(pairs['shared'], str)
+
+    def test_get_filter_class_propagates_real_import_errors(self):
+        """A broken import inside a model's filters module must surface, not silently return None."""
+        broken = types.ModuleType('netbox_broken_filter_fixture.graphql.filters')
+
+        def _raise(*args, **kwargs):
+            raise ImportError('simulated downstream breakage')
+
+        broken.__getattr__ = _raise
+        # sys.modules mutation is safe under --parallel (separate processes), not threads.
+        sys.modules['netbox_broken_filter_fixture'] = types.ModuleType('netbox_broken_filter_fixture')
+        sys.modules['netbox_broken_filter_fixture.graphql'] = types.ModuleType(
+            'netbox_broken_filter_fixture.graphql'
+        )
+        sys.modules['netbox_broken_filter_fixture.graphql.filters'] = broken
+
+        try:
+            class FakeMeta:
+                app_label = 'netbox_broken_filter_fixture'
+
+            class FakeModel:
+                _meta = FakeMeta()
+                __name__ = 'BrokenModel'
+
+            instance = APIViewTestCases.GraphQLTestCase()
+            with self.assertRaises(ImportError):
+                instance._get_model_graphql_filter_class(FakeModel)
+        finally:
+            for key in (
+                'netbox_broken_filter_fixture',
+                'netbox_broken_filter_fixture.graphql',
+                'netbox_broken_filter_fixture.graphql.filters',
+            ):
+                sys.modules.pop(key, None)
+
+    def test_zero_auto_filter_tests_fails_loudly(self):
+        """Helper fails when auto mode is required and no tests of any kind exist."""
+
+        class FakeMeta:
+            label = 'fake.FakeModel'
+
+        class FakeModel:
+            _meta = FakeMeta()
+
+        class Case(APIViewTestCases.GraphQLTestCase):
+            model = FakeModel
+            graphql_auto_filter_tests = True
+            graphql_auto_filter_required = True
+
+        case = Case()
+        with self.assertRaisesRegex(AssertionError, r'No GraphQL filter tests.*fake\.FakeModel'):
+            case._assert_graphql_filter_tests_exist([], [], [])
+
+    def test_lazy_annotated_lookup_resolves(self):
+        """Annotated['FloatLookup', strawberry.lazy(...)] | None resolves to FloatLookup."""
+        annotation = Annotated['FloatLookup', strawberry.lazy('netbox.graphql.filter_lookups')] | None
+        self.assertIs(
+            APIViewTestCases.GraphQLTestCase._unwrap_filter_annotation(annotation),
+            FloatLookup,
+        )
+
+    def test_str_lookup_emits_all_four_variants(self):
+        """`_emit_str_lookup_filter_tests` emits exact, i_contains, i_starts_with, i_ends_with."""
+        captured_value = 'production'
+
+        class FakeMeta:
+            label = 'fake.FakeModel'
+            app_label = 'fake'
+
+        class Case(APIViewTestCases.GraphQLTestCase):
+            def _get_model_field_for_filter_field(self, field_name):
+                class FakeField:
+                    name = field_name
+                return FakeField()
+
+            def _get_nonempty_field_value(self, field):
+                return captured_value
+
+            def _graphql_literal(self, value):
+                return f'"{value}"'
+
+        case = Case()
+        tests = list(case._emit_str_lookup_filter_tests('name', None))
+        self.assertEqual(
+            [t.name for t in tests],
+            ['name__exact', 'name__i_contains', 'name__i_starts_with', 'name__i_ends_with'],
+        )
+
+    def test_explicit_tests_satisfy_auto_required_gate(self):
+        """Helper does NOT fail when explicit tests exist, even if auto/legacy are empty."""
+
+        class FakeMeta:
+            label = 'fake.FakeModel'
+
+        class FakeModel:
+            _meta = FakeMeta()
+
+        class Case(APIViewTestCases.GraphQLTestCase):
+            model = FakeModel
+            graphql_auto_filter_tests = True
+            graphql_auto_filter_required = True
+
+        case = Case()
+        # Should not raise.
+        case._assert_graphql_filter_tests_exist(
+            auto_tests=[],
+            legacy_tests=[],
+            explicit_tests=[GraphQLFilterTest(name='x', filters='x: 1')],
+        )
+
+    def test_graphql_literal_renders_lists(self):
+        """List and tuple values render as GraphQL list literals, not quoted strings."""
+        literal = APIViewTestCases.GraphQLTestCase._graphql_literal
+        self.assertEqual(literal([1, 2, 3]), '[1, 2, 3]')
+        self.assertEqual(literal(('a', 'b')), '["a", "b"]')
+        self.assertEqual(literal([]), '[]')
+
+    def test_graphql_literal_renders_decimal_as_number(self):
+        """Decimal values render as numeric literals, not quoted strings."""
+        literal = APIViewTestCases.GraphQLTestCase._graphql_literal
+        self.assertEqual(literal(Decimal('1.23')), '1.23')
+        self.assertEqual(literal([Decimal('1.5'), Decimal('2.5')]), '[1.5, 2.5]')
+
+    def test_per_kind_cap_counts_successful_emissions(self):
+        """Later candidate fields are tried until per-kind successful emissions reach the cap."""
+
+        class FakeMeta:
+            label = 'fake.FakeModel'
+            app_label = 'fake'
+
+        class FakeModel:
+            _meta = FakeMeta()
+            __name__ = 'FakeModel'
+
+        emit_calls = []
+
+        class Case(APIViewTestCases.GraphQLTestCase):
+            model = FakeModel
+            graphql_auto_filter_fields_per_kind = 2
+
+            def _get_model_graphql_filter_class(self, model=None):
+                class FilterClass:
+                    __annotations__ = {
+                        'empty_field_1': str,
+                        'empty_field_2': str,
+                        'useful_field_1': str,
+                        'useful_field_2': str,
+                        'useful_field_3': str,
+                    }
+                return FilterClass
+
+            def _classify_filter_annotation(self, annotation):
+                return 'str_lookup', None
+
+            def _emit_str_lookup_filter_tests(self, field_name, _kind_arg):
+                emit_calls.append(field_name)
+                if field_name.startswith('empty_'):
+                    return iter(())
+                return iter((GraphQLFilterTest(name=field_name, filters=f'{field_name}: "x"'),))
+
+        case = Case()
+        list(case._iter_auto_graphql_filter_tests())
+
+        # The candidate-counting bug stops at 'empty_field_1' and 'empty_field_2' (the slice
+        # captures the first 2). After the fix, the emitter is invoked on all 5 candidates
+        # in order until 2 SUCCESSFUL fields have emitted.
+        self.assertEqual(
+            emit_calls,
+            ['empty_field_1', 'empty_field_2', 'useful_field_1', 'useful_field_2'],
+        )
+
+    def test_get_filter_class_returns_none_when_parent_module_missing(self):
+        """When the parent `<app>.graphql` package is absent, return None instead of re-raising."""
+
+        class FakeMeta:
+            app_label = 'netbox_missing_graphql_fixture'
+
+        class FakeModel:
+            _meta = FakeMeta()
+            __name__ = 'BrokenModel'
+
+        instance = APIViewTestCases.GraphQLTestCase()
+        # No `netbox_missing_graphql_fixture` package is registered in sys.modules,
+        # so import_module raises ModuleNotFoundError with exc.name == 'netbox_missing_graphql_fixture'
+        # (the parent), not the full path 'netbox_missing_graphql_fixture.graphql.filters'.
+        # The fix accepts both shapes.
+        self.assertIsNone(instance._get_model_graphql_filter_class(FakeModel))
+
+    def test_filter_class_assertion_skipped_with_handwritten_tests(self):
+        """Hand-written tests exempt a model from the conventional filter class requirement."""
+
+        class FakeMeta:
+            label = 'fake.FakeModel'
+            app_label = 'fake'
+
+        class FakeModel:
+            _meta = FakeMeta()
+            __name__ = 'FakeModel'
+
+        class Case(APIViewTestCases.GraphQLTestCase):
+            model = FakeModel
+
+            def _get_model_graphql_filter_class(self, model=None):
+                return None
+
+        case = Case()
+        # Should not raise despite the missing conventional filter class.
+        case._assert_graphql_filter_class_present(
+            set(), handwritten_tests=[GraphQLFilterTest(name='x', filters='x: 1')]
+        )
+
+    def test_filter_class_assertion_fails_without_filter_class(self):
+        """Missing conventional filter class raises when no hand-written tests exist."""
+
+        class FakeMeta:
+            label = 'fake.FakeModel'
+            app_label = 'fake'
+
+        class FakeModel:
+            _meta = FakeMeta()
+            __name__ = 'FakeModel'
+
+        class Case(APIViewTestCases.GraphQLTestCase):
+            model = FakeModel
+
+            def _get_model_graphql_filter_class(self, model=None):
+                return None
+
+        case = Case()
+        with self.assertRaisesRegex(AssertionError, r'No GraphQL filter class found for fake\.FakeModel'):
+            case._assert_graphql_filter_class_present(set())