| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555 |
- import django_filters
- import netaddr
- from django.core.exceptions import ValidationError
- from django.db.models import Q
- from netaddr.core import AddrFormatError
- from dcim.models import Device, Interface, Region, Site
- from extras.filters import CustomFieldFilterSet, CreatedUpdatedFilterSet
- from tenancy.filters import TenancyFilterSet
- from utilities.filters import (
- BaseFilterSet, MultiValueCharFilter, MultiValueNumberFilter, NameSlugSearchFilterSet, TagFilter,
- TreeNodeMultipleChoiceFilter,
- )
- from virtualization.models import VirtualMachine, VMInterface
- from .choices import *
- from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
- __all__ = (
- 'AggregateFilterSet',
- 'IPAddressFilterSet',
- 'PrefixFilterSet',
- 'RIRFilterSet',
- 'RoleFilterSet',
- 'ServiceFilterSet',
- 'VLANFilterSet',
- 'VLANGroupFilterSet',
- 'VRFFilterSet',
- )
- class VRFFilterSet(BaseFilterSet, TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- tag = TagFilter()
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- return queryset.filter(
- Q(name__icontains=value) |
- Q(rd__icontains=value) |
- Q(description__icontains=value)
- )
- class Meta:
- model = VRF
- fields = ['id', 'name', 'rd', 'enforce_unique']
- class RIRFilterSet(BaseFilterSet, NameSlugSearchFilterSet):
- class Meta:
- model = RIR
- fields = ['id', 'name', 'slug', 'is_private', 'description']
- class AggregateFilterSet(BaseFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- family = django_filters.NumberFilter(
- field_name='prefix',
- lookup_expr='family'
- )
- prefix = django_filters.CharFilter(
- method='filter_prefix',
- label='Prefix',
- )
- rir_id = django_filters.ModelMultipleChoiceFilter(
- queryset=RIR.objects.all(),
- label='RIR (ID)',
- )
- rir = django_filters.ModelMultipleChoiceFilter(
- field_name='rir__slug',
- queryset=RIR.objects.all(),
- to_field_name='slug',
- label='RIR (slug)',
- )
- tag = TagFilter()
- class Meta:
- model = Aggregate
- fields = ['id', 'date_added']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(description__icontains=value)
- try:
- prefix = str(netaddr.IPNetwork(value.strip()).cidr)
- qs_filter |= Q(prefix__net_contains_or_equals=prefix)
- except (AddrFormatError, ValueError):
- pass
- return queryset.filter(qs_filter)
- def filter_prefix(self, queryset, name, value):
- if not value.strip():
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- class RoleFilterSet(BaseFilterSet, NameSlugSearchFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- class Meta:
- model = Role
- fields = ['id', 'name', 'slug']
- class PrefixFilterSet(BaseFilterSet, TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- family = django_filters.NumberFilter(
- field_name='prefix',
- lookup_expr='family'
- )
- prefix = django_filters.CharFilter(
- method='filter_prefix',
- label='Prefix',
- )
- within = django_filters.CharFilter(
- method='search_within',
- label='Within prefix',
- )
- within_include = django_filters.CharFilter(
- method='search_within_include',
- label='Within and including prefix',
- )
- contains = django_filters.CharFilter(
- method='search_contains',
- label='Prefixes which contain this prefix or IP',
- )
- mask_length = django_filters.NumberFilter(
- field_name='prefix',
- lookup_expr='net_mask_length'
- )
- mask_length__gte = django_filters.NumberFilter(
- field_name='prefix',
- lookup_expr='net_mask_length__gte'
- )
- mask_length__lte = django_filters.NumberFilter(
- field_name='prefix',
- lookup_expr='net_mask_length__lte'
- )
- vrf_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VRF.objects.all(),
- label='VRF',
- )
- vrf = django_filters.ModelMultipleChoiceFilter(
- field_name='vrf__rd',
- queryset=VRF.objects.all(),
- to_field_name='rd',
- label='VRF (RD)',
- )
- region_id = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region',
- lookup_expr='in',
- label='Region (ID)',
- )
- region = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region',
- lookup_expr='in',
- to_field_name='slug',
- label='Region (slug)',
- )
- site_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Site.objects.all(),
- label='Site (ID)',
- )
- site = django_filters.ModelMultipleChoiceFilter(
- field_name='site__slug',
- queryset=Site.objects.all(),
- to_field_name='slug',
- label='Site (slug)',
- )
- vlan_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VLAN.objects.all(),
- label='VLAN (ID)',
- )
- vlan_vid = django_filters.NumberFilter(
- field_name='vlan__vid',
- label='VLAN number (1-4095)',
- )
- role_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Role.objects.all(),
- label='Role (ID)',
- )
- role = django_filters.ModelMultipleChoiceFilter(
- field_name='role__slug',
- queryset=Role.objects.all(),
- to_field_name='slug',
- label='Role (slug)',
- )
- status = django_filters.MultipleChoiceFilter(
- choices=PrefixStatusChoices,
- null_value=None
- )
- tag = TagFilter()
- class Meta:
- model = Prefix
- fields = ['id', 'is_pool']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(description__icontains=value)
- try:
- prefix = str(netaddr.IPNetwork(value.strip()).cidr)
- qs_filter |= Q(prefix__net_contains_or_equals=prefix)
- except (AddrFormatError, ValueError):
- pass
- return queryset.filter(qs_filter)
- def filter_prefix(self, queryset, name, value):
- if not value.strip():
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- def search_within(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix__net_contained=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- def search_within_include(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- query = str(netaddr.IPNetwork(value).cidr)
- return queryset.filter(prefix__net_contained_or_equal=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- def search_contains(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- # Searching by prefix
- if '/' in value:
- return queryset.filter(prefix__net_contains_or_equals=str(netaddr.IPNetwork(value).cidr))
- # Searching by IP address
- else:
- return queryset.filter(prefix__net_contains=str(netaddr.IPAddress(value)))
- except (AddrFormatError, ValueError):
- return queryset.none()
- class IPAddressFilterSet(BaseFilterSet, TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- family = django_filters.NumberFilter(
- field_name='address',
- lookup_expr='family'
- )
- parent = django_filters.CharFilter(
- method='search_by_parent',
- label='Parent prefix',
- )
- address = MultiValueCharFilter(
- method='filter_address',
- label='Address',
- )
- mask_length = django_filters.NumberFilter(
- method='filter_mask_length',
- label='Mask length',
- )
- vrf_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VRF.objects.all(),
- label='VRF',
- )
- vrf = django_filters.ModelMultipleChoiceFilter(
- field_name='vrf__rd',
- queryset=VRF.objects.all(),
- to_field_name='rd',
- label='VRF (RD)',
- )
- device = MultiValueCharFilter(
- method='filter_device',
- field_name='name',
- label='Device (name)',
- )
- device_id = MultiValueNumberFilter(
- method='filter_device',
- field_name='pk',
- label='Device (ID)',
- )
- virtual_machine = MultiValueCharFilter(
- method='filter_virtual_machine',
- field_name='name',
- label='Virtual machine (name)',
- )
- virtual_machine_id = MultiValueNumberFilter(
- method='filter_virtual_machine',
- field_name='pk',
- label='Virtual machine (ID)',
- )
- interface = django_filters.ModelMultipleChoiceFilter(
- field_name='interface__name',
- queryset=Interface.objects.all(),
- to_field_name='name',
- label='Interface (name)',
- )
- interface_id = django_filters.ModelMultipleChoiceFilter(
- field_name='interface',
- queryset=Interface.objects.all(),
- label='Interface (ID)',
- )
- vminterface = django_filters.ModelMultipleChoiceFilter(
- field_name='vminterface__name',
- queryset=VMInterface.objects.all(),
- to_field_name='name',
- label='VM interface (name)',
- )
- vminterface_id = django_filters.ModelMultipleChoiceFilter(
- field_name='vminterface',
- queryset=VMInterface.objects.all(),
- label='VM interface (ID)',
- )
- assigned_to_interface = django_filters.BooleanFilter(
- method='_assigned_to_interface',
- label='Is assigned to an interface',
- )
- status = django_filters.MultipleChoiceFilter(
- choices=IPAddressStatusChoices,
- null_value=None
- )
- role = django_filters.MultipleChoiceFilter(
- choices=IPAddressRoleChoices
- )
- tag = TagFilter()
- class Meta:
- model = IPAddress
- fields = ['id', 'dns_name']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = (
- Q(dns_name__icontains=value) |
- Q(description__icontains=value) |
- Q(address__istartswith=value)
- )
- return queryset.filter(qs_filter)
- def search_by_parent(self, queryset, name, value):
- value = value.strip()
- if not value:
- return queryset
- try:
- query = str(netaddr.IPNetwork(value.strip()).cidr)
- return queryset.filter(address__net_host_contained=query)
- except (AddrFormatError, ValueError):
- return queryset.none()
- def filter_address(self, queryset, name, value):
- try:
- return queryset.filter(address__net_in=value)
- except ValidationError:
- return queryset.none()
- def filter_mask_length(self, queryset, name, value):
- if not value:
- return queryset
- return queryset.filter(address__net_mask_length=value)
- def filter_device(self, queryset, name, value):
- devices = Device.objects.filter(**{'{}__in'.format(name): value})
- if not devices.exists():
- return queryset.none()
- interface_ids = []
- for device in devices:
- interface_ids.extend(device.vc_interfaces.values_list('id', flat=True))
- return queryset.filter(
- interface__in=interface_ids
- )
- def filter_virtual_machine(self, queryset, name, value):
- virtual_machines = VirtualMachine.objects.filter(**{'{}__in'.format(name): value})
- if not virtual_machines.exists():
- return queryset.none()
- interface_ids = []
- for vm in virtual_machines:
- interface_ids.extend(vm.interfaces.values_list('id', flat=True))
- return queryset.filter(
- vminterface__in=interface_ids
- )
- def _assigned_to_interface(self, queryset, name, value):
- return queryset.exclude(assigned_object_id__isnull=value)
- class VLANGroupFilterSet(BaseFilterSet, NameSlugSearchFilterSet):
- region_id = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region',
- lookup_expr='in',
- label='Region (ID)',
- )
- region = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region',
- lookup_expr='in',
- to_field_name='slug',
- label='Region (slug)',
- )
- site_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Site.objects.all(),
- label='Site (ID)',
- )
- site = django_filters.ModelMultipleChoiceFilter(
- field_name='site__slug',
- queryset=Site.objects.all(),
- to_field_name='slug',
- label='Site (slug)',
- )
- class Meta:
- model = VLANGroup
- fields = ['id', 'name', 'slug', 'description']
- class VLANFilterSet(BaseFilterSet, TenancyFilterSet, CustomFieldFilterSet, CreatedUpdatedFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- region_id = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region',
- lookup_expr='in',
- label='Region (ID)',
- )
- region = TreeNodeMultipleChoiceFilter(
- queryset=Region.objects.all(),
- field_name='site__region',
- lookup_expr='in',
- to_field_name='slug',
- label='Region (slug)',
- )
- site_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Site.objects.all(),
- label='Site (ID)',
- )
- site = django_filters.ModelMultipleChoiceFilter(
- field_name='site__slug',
- queryset=Site.objects.all(),
- to_field_name='slug',
- label='Site (slug)',
- )
- group_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VLANGroup.objects.all(),
- label='Group (ID)',
- )
- group = django_filters.ModelMultipleChoiceFilter(
- field_name='group__slug',
- queryset=VLANGroup.objects.all(),
- to_field_name='slug',
- label='Group',
- )
- role_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Role.objects.all(),
- label='Role (ID)',
- )
- role = django_filters.ModelMultipleChoiceFilter(
- field_name='role__slug',
- queryset=Role.objects.all(),
- to_field_name='slug',
- label='Role (slug)',
- )
- status = django_filters.MultipleChoiceFilter(
- choices=VLANStatusChoices,
- null_value=None
- )
- tag = TagFilter()
- class Meta:
- model = VLAN
- fields = ['id', 'vid', 'name']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
- try:
- qs_filter |= Q(vid=int(value.strip()))
- except ValueError:
- pass
- return queryset.filter(qs_filter)
- class ServiceFilterSet(BaseFilterSet, CreatedUpdatedFilterSet):
- q = django_filters.CharFilter(
- method='search',
- label='Search',
- )
- device_id = django_filters.ModelMultipleChoiceFilter(
- queryset=Device.objects.all(),
- label='Device (ID)',
- )
- device = django_filters.ModelMultipleChoiceFilter(
- field_name='device__name',
- queryset=Device.objects.all(),
- to_field_name='name',
- label='Device (name)',
- )
- virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
- queryset=VirtualMachine.objects.all(),
- label='Virtual machine (ID)',
- )
- virtual_machine = django_filters.ModelMultipleChoiceFilter(
- field_name='virtual_machine__name',
- queryset=VirtualMachine.objects.all(),
- to_field_name='name',
- label='Virtual machine (name)',
- )
- tag = TagFilter()
- class Meta:
- model = Service
- fields = ['id', 'name', 'protocol', 'port']
- def search(self, queryset, name, value):
- if not value.strip():
- return queryset
- qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
- return queryset.filter(qs_filter)
|