| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535 |
- import django_filters
- import netaddr
- from django.core.exceptions import ValidationError
- from django.db.models import Q
- from netaddr.core import AddrFormatError
- from dcim.models import Device, Interface, Region, Site
- from extras.filters import CustomFieldFilterSet, CreatedUpdatedFilterSet
- from tenancy.filtersets import TenancyFilterSet
- from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilter, TreeNodeMultipleChoiceFilter
- from virtualization.models import VirtualMachine
- from .choices import *
- from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
- __all__ = (
- 'AggregateFilterSet',
- 'IPAddressFilterSet',
- 'PrefixFilterSet',
- 'RIRFilterSet',
- 'RoleFilterSet',
- 'ServiceFilterSet',
- 'VLANFilterSet',
- 'VLANGroupFilterSet',
- 'VRFFilterSet',
- )
- class VRFFilterSet(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- id__in = NumericInFilter(
- field_name='id',
- lookup_expr='in'
- )
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- tag = TagFilter()
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- return queryset.filter(
- Q(name__icontains=value) |
- Q(rd__icontains=value) |
- Q(description__icontains=value)
- )
- class Meta:
- model = VRF
- fields = ['name', 'rd', 'enforce_unique']
- class RIRFilterSet(NameSlugSearchFilterSet):
- id__in = NumericInFilter(
- field_name='id',
- lookup_expr='in'
- )
- class Meta:
- model = RIR
- fields = ['name', 'slug', 'is_private']
- class AggregateFilterSet(CustomFieldFilterSet, CreatedUpdatedFilterSet):
- id__in = NumericInFilter(
- field_name='id',
- lookup_expr='in'
- )
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- prefix = django_filters.CharFilter(
- method='filter_prefix',
- label='Prefix',
- )
- rir_id = django_filters.ModelMultipleChoiceFilter(
- queryset=RIR.objects.all(),
- label='RIR (ID)',
- )
- rir = django_filters.ModelMultipleChoiceFilter(
- field_name='rir__slug',
- queryset=RIR.objects.all(),
- to_field_name='slug',
- label='RIR (slug)',
- )
- tag = TagFilter()
- class Meta:
- model = Aggregate
- fields = ['family', 'date_added']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(description__icontains=value)
- try:
- prefix = str(netaddr.IPNetwork(value.strip()).cidr)
- qs_filter |= Q(prefix__net_contains_or_equals=prefix)
- except (AddrFormatError, ValueError):
- pass
- return queryset.filter(qs_filter)
- def filter_prefix(self, queryset, name, value):
- if not value.strip():
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix=query)
- except ValidationError:
- return queryset.none()
- class RoleFilterSet(NameSlugSearchFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- class Meta:
- model = Role
- fields = ['id', 'name', 'slug']
- class PrefixFilterSet(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- id__in = NumericInFilter(
- field_name='id',
- lookup_expr='in'
- )
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- prefix = django_filters.CharFilter(
- method='filter_prefix',
- label='Prefix',
- )
- within = django_filters.CharFilter(
- method='search_within',
- label='Within prefix',
- )
- within_include = django_filters.CharFilter(
- method='search_within_include',
- label='Within and including prefix',
- )
- contains = django_filters.CharFilter(
- method='search_contains',
- label='Prefixes which contain this prefix or IP',
- )
- mask_length = django_filters.NumberFilter(
- method='filter_mask_length',
- label='Mask length',
- )
- vrf_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VRF.objects.all(),
- label='VRF',
- )
- vrf = django_filters.ModelMultipleChoiceFilter(
- field_name='vrf__rd',
- queryset=VRF.objects.all(),
- to_field_name='rd',
- label='VRF (RD)',
- )
- region_id = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region__in',
- label='Region (ID)',
- )
- region = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region__in',
- to_field_name='slug',
- label='Region (slug)',
- )
- site_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Site.objects.all(),
- label='Site (ID)',
- )
- site = django_filters.ModelMultipleChoiceFilter(
- field_name='site__slug',
- queryset=Site.objects.all(),
- to_field_name='slug',
- label='Site (slug)',
- )
- vlan_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VLAN.objects.all(),
- label='VLAN (ID)',
- )
- vlan_vid = django_filters.NumberFilter(
- field_name='vlan__vid',
- label='VLAN number (1-4095)',
- )
- role_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Role.objects.all(),
- label='Role (ID)',
- )
- role = django_filters.ModelMultipleChoiceFilter(
- field_name='role__slug',
- queryset=Role.objects.all(),
- to_field_name='slug',
- label='Role (slug)',
- )
- status = django_filters.MultipleChoiceFilter(
- choices=PrefixStatusChoices,
- null_value=None
- )
- tag = TagFilter()
- class Meta:
- model = Prefix
- fields = ['family', 'is_pool']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(description__icontains=value)
- try:
- prefix = str(netaddr.IPNetwork(value.strip()).cidr)
- qs_filter |= Q(prefix__net_contains_or_equals=prefix)
- except (AddrFormatError, ValueError):
- pass
- return queryset.filter(qs_filter)
- def filter_prefix(self, queryset, name, value):
- if not value.strip():
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix=query)
- except ValidationError:
- return queryset.none()
- def search_within(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix__net_contained=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- def search_within_include(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix__net_contained_or_equal=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- def search_contains(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- # Searching by prefix
- if '/' in value:
- return queryset.filter(prefix__net_contains_or_equals=str(netaddr.IPNetwork(value).cidr))
- # Searching by IP address
- else:
- return queryset.filter(prefix__net_contains=str(netaddr.IPAddress(value)))
- except (AddrFormatError, ValueError):
- return queryset.none()
- def filter_mask_length(self, queryset, name, value):
- if not value:
- return queryset
- return queryset.filter(prefix__net_mask_length=value)
- class IPAddressFilterSet(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- id__in = NumericInFilter(
- field_name='id',
- lookup_expr='in'
- )
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- parent = django_filters.CharFilter(
- method='search_by_parent',
- label='Parent prefix',
- )
- address = django_filters.CharFilter(
- method='filter_address',
- label='Address',
- )
- mask_length = django_filters.NumberFilter(
- method='filter_mask_length',
- label='Mask length',
- )
- vrf_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VRF.objects.all(),
- label='VRF',
- )
- vrf = django_filters.ModelMultipleChoiceFilter(
- field_name='vrf__rd',
- queryset=VRF.objects.all(),
- to_field_name='rd',
- label='VRF (RD)',
- )
- device = django_filters.CharFilter(
- method='filter_device',
- field_name='name',
- label='Device',
- )
- device_id = django_filters.NumberFilter(
- method='filter_device',
- field_name='pk',
- label='Device (ID)',
- )
- virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
- field_name='interface__virtual_machine',
- queryset=VirtualMachine.objects.all(),
- label='Virtual machine (ID)',
- )
- virtual_machine = django_filters.ModelMultipleChoiceFilter(
- field_name='interface__virtual_machine__name',
- queryset=VirtualMachine.objects.all(),
- to_field_name='name',
- label='Virtual machine (name)',
- )
- interface = django_filters.ModelMultipleChoiceFilter(
- field_name='interface__name',
- queryset=Interface.objects.all(),
- to_field_name='name',
- label='Interface (ID)',
- )
- interface_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Interface.objects.all(),
- label='Interface (ID)',
- )
- assigned_to_interface = django_filters.BooleanFilter(
- method='_assigned_to_interface',
- label='Is assigned to an interface',
- )
- status = django_filters.MultipleChoiceFilter(
- choices=IPAddressStatusChoices,
- null_value=None
- )
- role = django_filters.MultipleChoiceFilter(
- choices=IPAddressRoleChoices
- )
- tag = TagFilter()
- class Meta:
- model = IPAddress
- fields = ['family', 'dns_name']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = (
- Q(dns_name__icontains=value) |
- Q(description__icontains=value) |
- Q(address__istartswith=value)
- )
- return queryset.filter(qs_filter)
- def search_by_parent(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- query = str(netaddr.IPNetwork(value.strip()).cidr)
- return queryset.filter(address__net_host_contained=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- def filter_address(self, queryset, name, value):
- if not value.strip():
- return queryset
- try:
- # Match address and subnet mask
- if '/' in value:
- return queryset.filter(address=value)
- return queryset.filter(address__net_host=value)
- except ValidationError:
- return queryset.none()
- def filter_mask_length(self, queryset, name, value):
- if not value:
- return queryset
- return queryset.filter(address__net_mask_length=value)
- def filter_device(self, queryset, name, value):
- try:
- device = Device.objects.prefetch_related('device_type').get(**{name: value})
- vc_interface_ids = [i['id'] for i in device.vc_interfaces.values('id')]
- return queryset.filter(interface_id__in=vc_interface_ids)
- except Device.DoesNotExist:
- return queryset.none()
- def _assigned_to_interface(self, queryset, name, value):
- return queryset.exclude(interface__isnull=value)
- class VLANGroupFilterSet(NameSlugSearchFilterSet):
- region_id = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region__in',
- label='Region (ID)',
- )
- region = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region__in',
- to_field_name='slug',
- label='Region (slug)',
- )
- site_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Site.objects.all(),
- label='Site (ID)',
- )
- site = django_filters.ModelMultipleChoiceFilter(
- field_name='site__slug',
- queryset=Site.objects.all(),
- to_field_name='slug',
- label='Site (slug)',
- )
- class Meta:
- model = VLANGroup
- fields = ['id', 'name', 'slug']
- class VLANFilterSet(TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- id__in = NumericInFilter(
- field_name='id',
- lookup_expr='in'
- )
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- region_id = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region__in',
- label='Region (ID)',
- )
- region = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region__in',
- to_field_name='slug',
- label='Region (slug)',
- )
- site_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Site.objects.all(),
- label='Site (ID)',
- )
- site = django_filters.ModelMultipleChoiceFilter(
- field_name='site__slug',
- queryset=Site.objects.all(),
- to_field_name='slug',
- label='Site (slug)',
- )
- group_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VLANGroup.objects.all(),
- label='Group (ID)',
- )
- group = django_filters.ModelMultipleChoiceFilter(
- field_name='group__slug',
- queryset=VLANGroup.objects.all(),
- to_field_name='slug',
- label='Group',
- )
- role_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Role.objects.all(),
- label='Role (ID)',
- )
- role = django_filters.ModelMultipleChoiceFilter(
- field_name='role__slug',
- queryset=Role.objects.all(),
- to_field_name='slug',
- label='Role (slug)',
- )
- status = django_filters.MultipleChoiceFilter(
- choices=VLANStatusChoices,
- null_value=None
- )
- tag = TagFilter()
- class Meta:
- model = VLAN
- fields = ['vid', 'name']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
- try:
- qs_filter |= Q(vid=int(value.strip()))
- except ValueError:
- pass
- return queryset.filter(qs_filter)
- class ServiceFilterSet(CreatedUpdatedFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- device_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Device.objects.all(),
- label='Device (ID)',
- )
- device = django_filters.ModelMultipleChoiceFilter(
- field_name='device__name',
- queryset=Device.objects.all(),
- to_field_name='name',
- label='Device (name)',
- )
- virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VirtualMachine.objects.all(),
- label='Virtual machine (ID)',
- )
- virtual_machine = django_filters.ModelMultipleChoiceFilter(
- field_name='virtual_machine__name',
- queryset=VirtualMachine.objects.all(),
- to_field_name='name',
- label='Virtual machine (name)',
- )
- tag = TagFilter()
- class Meta:
- model = Service
- fields = ['id', 'name', 'protocol', 'port']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
- return queryset.filter(qs_filter)
|