filtersets.py 41 KB


  1. import django_filters
  2. import netaddr
  3. from dcim.base_filtersets import ScopedFilterSet
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.core.exceptions import ValidationError
  6. from django.db.models import Q
  7. from django.utils.translation import gettext as _
  8. from drf_spectacular.types import OpenApiTypes
  9. from drf_spectacular.utils import extend_schema_field
  10. from netaddr.core import AddrFormatError
  11. from circuits.models import Provider
  12. from dcim.models import Device, Interface, Region, Site, SiteGroup
  13. from netbox.filtersets import ChangeLoggedModelFilterSet, OrganizationalModelFilterSet, NetBoxModelFilterSet
  14. from tenancy.filtersets import ContactModelFilterSet, TenancyFilterSet
  15. from utilities.filters import (
  16. ContentTypeFilter, MultiValueCharFilter, MultiValueNumberFilter, NumericArrayFilter, TreeNodeMultipleChoiceFilter,
  17. )
  18. from virtualization.models import VirtualMachine, VMInterface
  19. from vpn.models import L2VPN
  20. from .choices import *
  21. from .models import *
  22. __all__ = (
  23. 'AggregateFilterSet',
  24. 'ASNFilterSet',
  25. 'ASNRangeFilterSet',
  26. 'FHRPGroupAssignmentFilterSet',
  27. 'FHRPGroupFilterSet',
  28. 'IPAddressFilterSet',
  29. 'IPRangeFilterSet',
  30. 'PrefixFilterSet',
  31. 'PrimaryIPFilterSet',
  32. 'RIRFilterSet',
  33. 'RoleFilterSet',
  34. 'RouteTargetFilterSet',
  35. 'ServiceFilterSet',
  36. 'ServiceTemplateFilterSet',
  37. 'VLANFilterSet',
  38. 'VLANGroupFilterSet',
  39. 'VLANTranslationPolicyFilterSet',
  40. 'VLANTranslationRuleFilterSet',
  41. 'VRFFilterSet',
  42. )
  43. class VRFFilterSet(NetBoxModelFilterSet, TenancyFilterSet):
  44. import_target_id = django_filters.ModelMultipleChoiceFilter(
  45. field_name='import_targets',
  46. queryset=RouteTarget.objects.all(),
  47. label=_('Import target'),
  48. )
  49. import_target = django_filters.ModelMultipleChoiceFilter(
  50. field_name='import_targets__name',
  51. queryset=RouteTarget.objects.all(),
  52. to_field_name='name',
  53. label=_('Import target (name)'),
  54. )
  55. export_target_id = django_filters.ModelMultipleChoiceFilter(
  56. field_name='export_targets',
  57. queryset=RouteTarget.objects.all(),
  58. label=_('Export target'),
  59. )
  60. export_target = django_filters.ModelMultipleChoiceFilter(
  61. field_name='export_targets__name',
  62. queryset=RouteTarget.objects.all(),
  63. to_field_name='name',
  64. label=_('Export target (name)'),
  65. )
  66. def search(self, queryset, name, value):
  67. if not value.strip():
  68. return queryset
  69. return queryset.filter(
  70. Q(name__icontains=value) |
  71. Q(rd__icontains=value) |
  72. Q(description__icontains=value)
  73. )
  74. class Meta:
  75. model = VRF
  76. fields = ('id', 'name', 'rd', 'enforce_unique', 'description')
  77. class RouteTargetFilterSet(NetBoxModelFilterSet, TenancyFilterSet):
  78. importing_vrf_id = django_filters.ModelMultipleChoiceFilter(
  79. field_name='importing_vrfs',
  80. queryset=VRF.objects.all(),
  81. label=_('Importing VRF'),
  82. )
  83. importing_vrf = django_filters.ModelMultipleChoiceFilter(
  84. field_name='importing_vrfs__rd',
  85. queryset=VRF.objects.all(),
  86. to_field_name='rd',
  87. label=_('Import VRF (RD)'),
  88. )
  89. exporting_vrf_id = django_filters.ModelMultipleChoiceFilter(
  90. field_name='exporting_vrfs',
  91. queryset=VRF.objects.all(),
  92. label=_('Exporting VRF'),
  93. )
  94. exporting_vrf = django_filters.ModelMultipleChoiceFilter(
  95. field_name='exporting_vrfs__rd',
  96. queryset=VRF.objects.all(),
  97. to_field_name='rd',
  98. label=_('Export VRF (RD)'),
  99. )
  100. importing_l2vpn_id = django_filters.ModelMultipleChoiceFilter(
  101. field_name='importing_l2vpns',
  102. queryset=L2VPN.objects.all(),
  103. label=_('Importing L2VPN'),
  104. )
  105. importing_l2vpn = django_filters.ModelMultipleChoiceFilter(
  106. field_name='importing_l2vpns__identifier',
  107. queryset=L2VPN.objects.all(),
  108. to_field_name='identifier',
  109. label=_('Importing L2VPN (identifier)'),
  110. )
  111. exporting_l2vpn_id = django_filters.ModelMultipleChoiceFilter(
  112. field_name='exporting_l2vpns',
  113. queryset=L2VPN.objects.all(),
  114. label=_('Exporting L2VPN'),
  115. )
  116. exporting_l2vpn = django_filters.ModelMultipleChoiceFilter(
  117. field_name='exporting_l2vpns__identifier',
  118. queryset=L2VPN.objects.all(),
  119. to_field_name='identifier',
  120. label=_('Exporting L2VPN (identifier)'),
  121. )
  122. def search(self, queryset, name, value):
  123. if not value.strip():
  124. return queryset
  125. return queryset.filter(
  126. Q(name__icontains=value) |
  127. Q(description__icontains=value)
  128. )
  129. class Meta:
  130. model = RouteTarget
  131. fields = ('id', 'name', 'description')
  132. class RIRFilterSet(OrganizationalModelFilterSet):
  133. class Meta:
  134. model = RIR
  135. fields = ('id', 'name', 'slug', 'is_private', 'description')
  136. class AggregateFilterSet(NetBoxModelFilterSet, TenancyFilterSet, ContactModelFilterSet):
  137. family = django_filters.NumberFilter(
  138. field_name='prefix',
  139. lookup_expr='family'
  140. )
  141. prefix = django_filters.CharFilter(
  142. method='filter_prefix',
  143. label=_('Prefix'),
  144. )
  145. rir_id = django_filters.ModelMultipleChoiceFilter(
  146. queryset=RIR.objects.all(),
  147. label=_('RIR (ID)'),
  148. )
  149. rir = django_filters.ModelMultipleChoiceFilter(
  150. field_name='rir__slug',
  151. queryset=RIR.objects.all(),
  152. to_field_name='slug',
  153. label=_('RIR (slug)'),
  154. )
  155. class Meta:
  156. model = Aggregate
  157. fields = ('id', 'date_added', 'description')
  158. def search(self, queryset, name, value):
  159. if not value.strip():
  160. return queryset
  161. qs_filter = Q(description__icontains=value)
  162. qs_filter |= Q(prefix__contains=value.strip())
  163. try:
  164. prefix = str(netaddr.IPNetwork(value.strip()).cidr)
  165. qs_filter |= Q(prefix__net_contains_or_equals=prefix)
  166. qs_filter |= Q(prefix__contains=value.strip())
  167. except (AddrFormatError, ValueError):
  168. pass
  169. return queryset.filter(qs_filter)
  170. def filter_prefix(self, queryset, name, value):
  171. if not value.strip():
  172. return queryset
  173. try:
  174. query = str(netaddr.IPNetwork(value).cidr)
  175. return queryset.filter(prefix=query)
  176. except (AddrFormatError, ValueError):
  177. return queryset.none()
  178. class ASNRangeFilterSet(OrganizationalModelFilterSet, TenancyFilterSet):
  179. rir_id = django_filters.ModelMultipleChoiceFilter(
  180. queryset=RIR.objects.all(),
  181. label=_('RIR (ID)'),
  182. )
  183. rir = django_filters.ModelMultipleChoiceFilter(
  184. field_name='rir__slug',
  185. queryset=RIR.objects.all(),
  186. to_field_name='slug',
  187. label=_('RIR (slug)'),
  188. )
  189. class Meta:
  190. model = ASNRange
  191. fields = ('id', 'name', 'slug', 'start', 'end', 'description')
  192. def search(self, queryset, name, value):
  193. if not value.strip():
  194. return queryset
  195. return queryset.filter(
  196. Q(name__icontains=value) |
  197. Q(description__icontains=value)
  198. )
  199. class ASNFilterSet(OrganizationalModelFilterSet, TenancyFilterSet):
  200. rir_id = django_filters.ModelMultipleChoiceFilter(
  201. queryset=RIR.objects.all(),
  202. label=_('RIR (ID)'),
  203. )
  204. rir = django_filters.ModelMultipleChoiceFilter(
  205. field_name='rir__slug',
  206. queryset=RIR.objects.all(),
  207. to_field_name='slug',
  208. label=_('RIR (slug)'),
  209. )
  210. site_group_id = TreeNodeMultipleChoiceFilter(
  211. queryset=SiteGroup.objects.all(),
  212. field_name='sites__group',
  213. lookup_expr='in',
  214. label=_('Site group (ID)'),
  215. )
  216. site_group = TreeNodeMultipleChoiceFilter(
  217. queryset=SiteGroup.objects.all(),
  218. field_name='sites__group',
  219. lookup_expr='in',
  220. to_field_name='slug',
  221. label=_('Site group (slug)'),
  222. )
  223. site_id = django_filters.ModelMultipleChoiceFilter(
  224. field_name='sites',
  225. queryset=Site.objects.all(),
  226. label=_('Site (ID)'),
  227. )
  228. site = django_filters.ModelMultipleChoiceFilter(
  229. field_name='sites__slug',
  230. queryset=Site.objects.all(),
  231. to_field_name='slug',
  232. label=_('Site (slug)'),
  233. )
  234. provider_id = django_filters.ModelMultipleChoiceFilter(
  235. field_name='providers',
  236. queryset=Provider.objects.all(),
  237. label=_('Provider (ID)'),
  238. )
  239. provider = django_filters.ModelMultipleChoiceFilter(
  240. field_name='providers__slug',
  241. queryset=Provider.objects.all(),
  242. to_field_name='slug',
  243. label=_('Provider (slug)'),
  244. )
  245. class Meta:
  246. model = ASN
  247. fields = ('id', 'asn', 'description')
  248. def search(self, queryset, name, value):
  249. if not value.strip():
  250. return queryset
  251. qs_filter = Q(description__icontains=value)
  252. try:
  253. qs_filter |= Q(asn=int(value))
  254. except ValueError:
  255. pass
  256. return queryset.filter(qs_filter)
  257. class RoleFilterSet(OrganizationalModelFilterSet):
  258. class Meta:
  259. model = Role
  260. fields = ('id', 'name', 'slug', 'description', 'weight')
  261. class PrefixFilterSet(NetBoxModelFilterSet, ScopedFilterSet, TenancyFilterSet, ContactModelFilterSet):
  262. family = django_filters.NumberFilter(
  263. field_name='prefix',
  264. lookup_expr='family'
  265. )
  266. prefix = MultiValueCharFilter(
  267. method='filter_prefix',
  268. label=_('Prefix'),
  269. )
  270. within = django_filters.CharFilter(
  271. method='search_within',
  272. label=_('Within prefix'),
  273. )
  274. within_include = django_filters.CharFilter(
  275. method='search_within_include',
  276. label=_('Within and including prefix'),
  277. )
  278. contains = django_filters.CharFilter(
  279. method='search_contains',
  280. label=_('Prefixes which contain this prefix or IP'),
  281. )
  282. depth = MultiValueNumberFilter(
  283. field_name='_depth'
  284. )
  285. children = MultiValueNumberFilter(
  286. field_name='_children'
  287. )
  288. mask_length = MultiValueNumberFilter(
  289. field_name='prefix',
  290. lookup_expr='net_mask_length',
  291. label=_('Mask length')
  292. )
  293. mask_length__gte = django_filters.NumberFilter(
  294. field_name='prefix',
  295. lookup_expr='net_mask_length__gte'
  296. )
  297. mask_length__lte = django_filters.NumberFilter(
  298. field_name='prefix',
  299. lookup_expr='net_mask_length__lte'
  300. )
  301. vrf_id = django_filters.ModelMultipleChoiceFilter(
  302. queryset=VRF.objects.all(),
  303. label=_('VRF'),
  304. )
  305. vrf = django_filters.ModelMultipleChoiceFilter(
  306. field_name='vrf__rd',
  307. queryset=VRF.objects.all(),
  308. to_field_name='rd',
  309. label=_('VRF (RD)'),
  310. )
  311. present_in_vrf_id = django_filters.ModelChoiceFilter(
  312. queryset=VRF.objects.all(),
  313. method='filter_present_in_vrf',
  314. label=_('VRF')
  315. )
  316. present_in_vrf = django_filters.ModelChoiceFilter(
  317. queryset=VRF.objects.all(),
  318. method='filter_present_in_vrf',
  319. to_field_name='rd',
  320. label=_('VRF (RD)'),
  321. )
  322. vlan_group_id = django_filters.ModelMultipleChoiceFilter(
  323. field_name='vlan__group',
  324. queryset=VLANGroup.objects.all(),
  325. to_field_name='id',
  326. label=_('VLAN Group (ID)'),
  327. )
  328. vlan_group = django_filters.ModelMultipleChoiceFilter(
  329. field_name='vlan__group__slug',
  330. queryset=VLANGroup.objects.all(),
  331. to_field_name='slug',
  332. label=_('VLAN Group (slug)'),
  333. )
  334. vlan_id = django_filters.ModelMultipleChoiceFilter(
  335. queryset=VLAN.objects.all(),
  336. label=_('VLAN (ID)'),
  337. )
  338. vlan_vid = django_filters.NumberFilter(
  339. field_name='vlan__vid',
  340. label=_('VLAN number (1-4094)'),
  341. )
  342. role_id = django_filters.ModelMultipleChoiceFilter(
  343. queryset=Role.objects.all(),
  344. label=_('Role (ID)'),
  345. )
  346. role = django_filters.ModelMultipleChoiceFilter(
  347. field_name='role__slug',
  348. queryset=Role.objects.all(),
  349. to_field_name='slug',
  350. label=_('Role (slug)'),
  351. )
  352. status = django_filters.MultipleChoiceFilter(
  353. choices=PrefixStatusChoices,
  354. null_value=None
  355. )
  356. class Meta:
  357. model = Prefix
  358. fields = ('id', 'scope_id', 'is_pool', 'mark_utilized', 'description')
  359. def search(self, queryset, name, value):
  360. if not value.strip():
  361. return queryset
  362. qs_filter = Q(description__icontains=value)
  363. qs_filter |= Q(prefix__contains=value.strip())
  364. try:
  365. prefix = str(netaddr.IPNetwork(value.strip()).cidr)
  366. qs_filter |= Q(prefix__net_contains_or_equals=prefix)
  367. qs_filter |= Q(prefix__contains=value.strip())
  368. except (AddrFormatError, ValueError):
  369. pass
  370. return queryset.filter(qs_filter)
  371. def filter_prefix(self, queryset, name, value):
  372. query_values = []
  373. for v in value:
  374. try:
  375. query_values.append(netaddr.IPNetwork(v))
  376. except (AddrFormatError, ValueError):
  377. pass
  378. return queryset.filter(prefix__in=query_values)
  379. def search_within(self, queryset, name, value):
  380. value = value.strip()
  381. if not value:
  382. return queryset
  383. try:
  384. query = str(netaddr.IPNetwork(value).cidr)
  385. return queryset.filter(prefix__net_contained=query)
  386. except (AddrFormatError, ValueError):
  387. return queryset.none()
  388. def search_within_include(self, queryset, name, value):
  389. value = value.strip()
  390. if not value:
  391. return queryset
  392. try:
  393. query = str(netaddr.IPNetwork(value).cidr)
  394. return queryset.filter(prefix__net_contained_or_equal=query)
  395. except (AddrFormatError, ValueError):
  396. return queryset.none()
  397. def search_contains(self, queryset, name, value):
  398. value = value.strip()
  399. if not value:
  400. return queryset
  401. try:
  402. # Searching by prefix
  403. if '/' in value:
  404. return queryset.filter(prefix__net_contains_or_equals=str(netaddr.IPNetwork(value).cidr))
  405. # Searching by IP address
  406. else:
  407. return queryset.filter(prefix__net_contains=str(netaddr.IPAddress(value)))
  408. except (AddrFormatError, ValueError):
  409. return queryset.none()
  410. @extend_schema_field(OpenApiTypes.STR)
  411. def filter_present_in_vrf(self, queryset, name, vrf):
  412. if vrf is None:
  413. return queryset.none()
  414. return queryset.filter(
  415. Q(vrf=vrf) |
  416. Q(vrf__export_targets__in=vrf.import_targets.all())
  417. ).distinct()
  418. class IPRangeFilterSet(TenancyFilterSet, NetBoxModelFilterSet, ContactModelFilterSet):
  419. family = django_filters.NumberFilter(
  420. field_name='start_address',
  421. lookup_expr='family'
  422. )
  423. start_address = MultiValueCharFilter(
  424. method='filter_address',
  425. label=_('Address'),
  426. )
  427. end_address = MultiValueCharFilter(
  428. method='filter_address',
  429. label=_('Address'),
  430. )
  431. contains = django_filters.CharFilter(
  432. method='search_contains',
  433. label=_('Ranges which contain this prefix or IP'),
  434. )
  435. vrf_id = django_filters.ModelMultipleChoiceFilter(
  436. queryset=VRF.objects.all(),
  437. label=_('VRF'),
  438. )
  439. vrf = django_filters.ModelMultipleChoiceFilter(
  440. field_name='vrf__rd',
  441. queryset=VRF.objects.all(),
  442. to_field_name='rd',
  443. label=_('VRF (RD)'),
  444. )
  445. role_id = django_filters.ModelMultipleChoiceFilter(
  446. queryset=Role.objects.all(),
  447. label=_('Role (ID)'),
  448. )
  449. role = django_filters.ModelMultipleChoiceFilter(
  450. field_name='role__slug',
  451. queryset=Role.objects.all(),
  452. to_field_name='slug',
  453. label=_('Role (slug)'),
  454. )
  455. status = django_filters.MultipleChoiceFilter(
  456. choices=IPRangeStatusChoices,
  457. null_value=None
  458. )
  459. parent = MultiValueCharFilter(
  460. method='search_by_parent',
  461. label=_('Parent prefix'),
  462. )
  463. class Meta:
  464. model = IPRange
  465. fields = ('id', 'mark_populated', 'mark_utilized', 'size', 'description')
  466. def search(self, queryset, name, value):
  467. if not value.strip():
  468. return queryset
  469. qs_filter = Q(description__icontains=value) | Q(start_address__contains=value) | Q(end_address__contains=value)
  470. try:
  471. ipaddress = str(netaddr.IPNetwork(value.strip()))
  472. qs_filter |= Q(start_address=ipaddress)
  473. qs_filter |= Q(end_address=ipaddress)
  474. except (AddrFormatError, ValueError):
  475. pass
  476. return queryset.filter(qs_filter)
  477. def search_contains(self, queryset, name, value):
  478. value = value.strip()
  479. if not value:
  480. return queryset
  481. try:
  482. # Strip mask
  483. ipaddress = netaddr.IPNetwork(value)
  484. return queryset.filter(start_address__lte=ipaddress, end_address__gte=ipaddress)
  485. except (AddrFormatError, ValueError):
  486. return queryset.none()
  487. def filter_address(self, queryset, name, value):
  488. try:
  489. return queryset.filter(**{f'{name}__net_in': value})
  490. except ValidationError:
  491. return queryset.none()
  492. def search_by_parent(self, queryset, name, value):
  493. if not value:
  494. return queryset
  495. q = Q()
  496. for prefix in value:
  497. try:
  498. query = str(netaddr.IPNetwork(prefix.strip()).cidr)
  499. q |= Q(start_address__net_host_contained=query, end_address__net_host_contained=query)
  500. except (AddrFormatError, ValueError):
  501. return queryset.none()
  502. return queryset.filter(q)
  503. class IPAddressFilterSet(NetBoxModelFilterSet, TenancyFilterSet, ContactModelFilterSet):
  504. family = django_filters.NumberFilter(
  505. field_name='address',
  506. lookup_expr='family'
  507. )
  508. parent = MultiValueCharFilter(
  509. method='search_by_parent',
  510. label=_('Parent prefix'),
  511. )
  512. address = MultiValueCharFilter(
  513. method='filter_address',
  514. label=_('Address'),
  515. )
  516. mask_length = MultiValueNumberFilter(
  517. field_name='address',
  518. lookup_expr='net_mask_length',
  519. label=_('Mask length')
  520. )
  521. mask_length__gte = django_filters.NumberFilter(
  522. field_name='address',
  523. lookup_expr='net_mask_length__gte'
  524. )
  525. mask_length__lte = django_filters.NumberFilter(
  526. field_name='address',
  527. lookup_expr='net_mask_length__lte'
  528. )
  529. vrf_id = django_filters.ModelMultipleChoiceFilter(
  530. queryset=VRF.objects.all(),
  531. label=_('VRF'),
  532. )
  533. vrf = django_filters.ModelMultipleChoiceFilter(
  534. field_name='vrf__rd',
  535. queryset=VRF.objects.all(),
  536. to_field_name='rd',
  537. label=_('VRF (RD)'),
  538. )
  539. present_in_vrf_id = django_filters.ModelChoiceFilter(
  540. queryset=VRF.objects.all(),
  541. method='filter_present_in_vrf',
  542. label=_('VRF')
  543. )
  544. present_in_vrf = django_filters.ModelChoiceFilter(
  545. queryset=VRF.objects.all(),
  546. method='filter_present_in_vrf',
  547. to_field_name='rd',
  548. label=_('VRF (RD)'),
  549. )
  550. assigned_object_type = ContentTypeFilter()
  551. device = MultiValueCharFilter(
  552. method='filter_device',
  553. field_name='name',
  554. label=_('Device (name)'),
  555. )
  556. device_id = MultiValueNumberFilter(
  557. method='filter_device',
  558. field_name='pk',
  559. label=_('Device (ID)'),
  560. )
  561. virtual_machine = MultiValueCharFilter(
  562. method='filter_virtual_machine',
  563. field_name='name',
  564. label=_('Virtual machine (name)'),
  565. )
  566. virtual_machine_id = MultiValueNumberFilter(
  567. method='filter_virtual_machine',
  568. field_name='pk',
  569. label=_('Virtual machine (ID)'),
  570. )
  571. interface = django_filters.ModelMultipleChoiceFilter(
  572. field_name='interface__name',
  573. queryset=Interface.objects.all(),
  574. to_field_name='name',
  575. label=_('Interface (name)'),
  576. )
  577. interface_id = django_filters.ModelMultipleChoiceFilter(
  578. field_name='interface',
  579. queryset=Interface.objects.all(),
  580. label=_('Interface (ID)'),
  581. )
  582. vminterface = django_filters.ModelMultipleChoiceFilter(
  583. field_name='vminterface__name',
  584. queryset=VMInterface.objects.all(),
  585. to_field_name='name',
  586. label=_('VM interface (name)'),
  587. )
  588. vminterface_id = django_filters.ModelMultipleChoiceFilter(
  589. field_name='vminterface',
  590. queryset=VMInterface.objects.all(),
  591. label=_('VM interface (ID)'),
  592. )
  593. fhrpgroup_id = django_filters.ModelMultipleChoiceFilter(
  594. field_name='fhrpgroup',
  595. queryset=FHRPGroup.objects.all(),
  596. label=_('FHRP group (ID)'),
  597. )
  598. assigned_to_interface = django_filters.BooleanFilter(
  599. method='_assigned_to_interface',
  600. label=_('Is assigned to an interface'),
  601. )
  602. assigned = django_filters.BooleanFilter(
  603. method='_assigned',
  604. label=_('Is assigned'),
  605. )
  606. status = django_filters.MultipleChoiceFilter(
  607. choices=IPAddressStatusChoices,
  608. null_value=None
  609. )
  610. role = django_filters.MultipleChoiceFilter(
  611. choices=IPAddressRoleChoices
  612. )
  613. service_id = django_filters.ModelMultipleChoiceFilter(
  614. field_name='services',
  615. queryset=Service.objects.all(),
  616. label=_('Application Service (ID)'),
  617. )
  618. nat_inside_id = django_filters.ModelMultipleChoiceFilter(
  619. field_name='nat_inside',
  620. queryset=IPAddress.objects.all(),
  621. label=_('NAT inside IP address (ID)'),
  622. )
  623. class Meta:
  624. model = IPAddress
  625. fields = ('id', 'dns_name', 'description', 'assigned_object_type', 'assigned_object_id')
  626. def search(self, queryset, name, value):
  627. if not value.strip():
  628. return queryset
  629. qs_filter = (
  630. Q(dns_name__icontains=value) |
  631. Q(description__icontains=value) |
  632. Q(address__istartswith=value)
  633. )
  634. return queryset.filter(qs_filter)
  635. def search_by_parent(self, queryset, name, value):
  636. if not value:
  637. return queryset
  638. q = Q()
  639. for prefix in value:
  640. try:
  641. query = str(netaddr.IPNetwork(prefix.strip()).cidr)
  642. q |= Q(address__net_host_contained=query)
  643. except (AddrFormatError, ValueError):
  644. return queryset.none()
  645. return queryset.filter(q)
  646. def parse_inet_addresses(self, value):
  647. """
  648. Parse networks or IP addresses and cast to a format
  649. acceptable by the Postgres inet type.
  650. Skips invalid values.
  651. """
  652. parsed = []
  653. for addr in value:
  654. if netaddr.valid_ipv4(addr) or netaddr.valid_ipv6(addr):
  655. parsed.append(addr)
  656. continue
  657. try:
  658. network = netaddr.IPNetwork(addr)
  659. parsed.append(str(network))
  660. except (AddrFormatError, ValueError):
  661. continue
  662. return parsed
  663. def filter_address(self, queryset, name, value):
  664. # Let's first parse the addresses passed
  665. # as argument. If they are all invalid,
  666. # we return an empty queryset
  667. value = self.parse_inet_addresses(value)
  668. if len(value) == 0:
  669. return queryset.none()
  670. try:
  671. return queryset.filter(address__net_in=value)
  672. except ValidationError:
  673. return queryset.none()
  674. @extend_schema_field(OpenApiTypes.STR)
  675. def filter_present_in_vrf(self, queryset, name, vrf):
  676. if vrf is None:
  677. return queryset.none()
  678. return queryset.filter(
  679. Q(vrf=vrf) |
  680. Q(vrf__export_targets__in=vrf.import_targets.all())
  681. ).distinct()
  682. def filter_device(self, queryset, name, value):
  683. devices = Device.objects.filter(**{'{}__in'.format(name): value})
  684. if not devices.exists():
  685. return queryset.none()
  686. interface_ids = []
  687. for device in devices:
  688. interface_ids.extend(device.vc_interfaces().values_list('id', flat=True))
  689. return queryset.filter(
  690. interface__in=interface_ids
  691. )
  692. def filter_virtual_machine(self, queryset, name, value):
  693. virtual_machines = VirtualMachine.objects.filter(**{'{}__in'.format(name): value})
  694. if not virtual_machines.exists():
  695. return queryset.none()
  696. interface_ids = []
  697. for vm in virtual_machines:
  698. interface_ids.extend(vm.interfaces.values_list('id', flat=True))
  699. return queryset.filter(
  700. vminterface__in=interface_ids
  701. )
  702. def _assigned_to_interface(self, queryset, name, value):
  703. content_types = ContentType.objects.get_for_models(Interface, VMInterface).values()
  704. if value:
  705. return queryset.filter(
  706. assigned_object_type__in=content_types,
  707. assigned_object_id__isnull=False
  708. )
  709. else:
  710. return queryset.exclude(
  711. assigned_object_type__in=content_types,
  712. assigned_object_id__isnull=False
  713. )
  714. def _assigned(self, queryset, name, value):
  715. if value:
  716. return queryset.exclude(
  717. assigned_object_type__isnull=True,
  718. assigned_object_id__isnull=True
  719. )
  720. else:
  721. return queryset.filter(
  722. assigned_object_type__isnull=True,
  723. assigned_object_id__isnull=True
  724. )
  725. class FHRPGroupFilterSet(NetBoxModelFilterSet):
  726. protocol = django_filters.MultipleChoiceFilter(
  727. choices=FHRPGroupProtocolChoices
  728. )
  729. auth_type = django_filters.MultipleChoiceFilter(
  730. choices=FHRPGroupAuthTypeChoices
  731. )
  732. related_ip = django_filters.ModelMultipleChoiceFilter(
  733. queryset=IPAddress.objects.all(),
  734. method='filter_related_ip'
  735. )
  736. class Meta:
  737. model = FHRPGroup
  738. fields = ('id', 'group_id', 'name', 'auth_key', 'description')
  739. def search(self, queryset, name, value):
  740. if not value.strip():
  741. return queryset
  742. return queryset.filter(
  743. Q(description__icontains=value) |
  744. Q(group_id__contains=value) |
  745. Q(name__icontains=value)
  746. )
  747. @extend_schema_field(OpenApiTypes.STR)
  748. def filter_related_ip(self, queryset, name, value):
  749. """
  750. Filter by VRF & prefix of assigned IP addresses.
  751. """
  752. ip_filter = Q()
  753. for ipaddress in value:
  754. if ipaddress.vrf:
  755. q = Q(
  756. ip_addresses__address__net_contained_or_equal=ipaddress.address,
  757. ip_addresses__vrf=ipaddress.vrf
  758. )
  759. else:
  760. q = Q(
  761. ip_addresses__address__net_contained_or_equal=ipaddress.address,
  762. ip_addresses__vrf__isnull=True
  763. )
  764. ip_filter |= q
  765. return queryset.filter(ip_filter)
  766. class FHRPGroupAssignmentFilterSet(ChangeLoggedModelFilterSet):
  767. interface_type = ContentTypeFilter()
  768. group_id = django_filters.ModelMultipleChoiceFilter(
  769. queryset=FHRPGroup.objects.all(),
  770. label=_('Group (ID)'),
  771. )
  772. device = MultiValueCharFilter(
  773. method='filter_device',
  774. field_name='name',
  775. label=_('Device (name)'),
  776. )
  777. device_id = MultiValueNumberFilter(
  778. method='filter_device',
  779. field_name='pk',
  780. label=_('Device (ID)'),
  781. )
  782. virtual_machine = MultiValueCharFilter(
  783. method='filter_virtual_machine',
  784. field_name='name',
  785. label=_('Virtual machine (name)'),
  786. )
  787. virtual_machine_id = MultiValueNumberFilter(
  788. method='filter_virtual_machine',
  789. field_name='pk',
  790. label=_('Virtual machine (ID)'),
  791. )
  792. class Meta:
  793. model = FHRPGroupAssignment
  794. fields = ('id', 'group_id', 'interface_type', 'interface_id', 'priority')
  795. def filter_device(self, queryset, name, value):
  796. devices = Device.objects.filter(**{f'{name}__in': value})
  797. if not devices.exists():
  798. return queryset.none()
  799. interface_ids = []
  800. for device in devices:
  801. interface_ids.extend(device.vc_interfaces().values_list('id', flat=True))
  802. return queryset.filter(
  803. Q(interface_type=ContentType.objects.get_for_model(Interface), interface_id__in=interface_ids)
  804. )
  805. def filter_virtual_machine(self, queryset, name, value):
  806. virtual_machines = VirtualMachine.objects.filter(**{f'{name}__in': value})
  807. if not virtual_machines.exists():
  808. return queryset.none()
  809. interface_ids = []
  810. for vm in virtual_machines:
  811. interface_ids.extend(vm.interfaces.values_list('id', flat=True))
  812. return queryset.filter(
  813. Q(interface_type=ContentType.objects.get_for_model(VMInterface), interface_id__in=interface_ids)
  814. )
  815. class VLANGroupFilterSet(OrganizationalModelFilterSet, TenancyFilterSet):
  816. scope_type = ContentTypeFilter()
  817. region = django_filters.NumberFilter(
  818. method='filter_scope'
  819. )
  820. site_group = django_filters.NumberFilter(
  821. method='filter_scope'
  822. )
  823. site = django_filters.NumberFilter(
  824. method='filter_scope'
  825. )
  826. location = django_filters.NumberFilter(
  827. method='filter_scope'
  828. )
  829. rack = django_filters.NumberFilter(
  830. method='filter_scope'
  831. )
  832. cluster_group = django_filters.NumberFilter(
  833. method='filter_scope'
  834. )
  835. cluster = django_filters.NumberFilter(
  836. method='filter_scope'
  837. )
  838. contains_vid = django_filters.NumberFilter(
  839. field_name='vid_ranges',
  840. lookup_expr='range_contains',
  841. )
  842. class Meta:
  843. model = VLANGroup
  844. fields = ('id', 'name', 'slug', 'description', 'scope_id')
  845. def search(self, queryset, name, value):
  846. if not value.strip():
  847. return queryset
  848. qs_filter = (
  849. Q(name__icontains=value) |
  850. Q(description__icontains=value)
  851. )
  852. return queryset.filter(qs_filter)
  853. def filter_scope(self, queryset, name, value):
  854. model_name = name.replace('_', '')
  855. return queryset.filter(
  856. scope_type=ContentType.objects.get(model=model_name),
  857. scope_id=value
  858. )
  859. class VLANFilterSet(NetBoxModelFilterSet, TenancyFilterSet):
  860. region_id = TreeNodeMultipleChoiceFilter(
  861. queryset=Region.objects.all(),
  862. field_name='site__region',
  863. lookup_expr='in',
  864. label=_('Region (ID)'),
  865. )
  866. region = TreeNodeMultipleChoiceFilter(
  867. queryset=Region.objects.all(),
  868. field_name='site__region',
  869. lookup_expr='in',
  870. to_field_name='slug',
  871. label=_('Region (slug)'),
  872. )
  873. site_group_id = TreeNodeMultipleChoiceFilter(
  874. queryset=SiteGroup.objects.all(),
  875. field_name='site__group',
  876. lookup_expr='in',
  877. label=_('Site group (ID)'),
  878. )
  879. site_group = TreeNodeMultipleChoiceFilter(
  880. queryset=SiteGroup.objects.all(),
  881. field_name='site__group',
  882. lookup_expr='in',
  883. to_field_name='slug',
  884. label=_('Site group (slug)'),
  885. )
  886. site_id = django_filters.ModelMultipleChoiceFilter(
  887. queryset=Site.objects.all(),
  888. label=_('Site (ID)'),
  889. )
  890. site = django_filters.ModelMultipleChoiceFilter(
  891. field_name='site__slug',
  892. queryset=Site.objects.all(),
  893. to_field_name='slug',
  894. label=_('Site (slug)'),
  895. )
  896. group_id = django_filters.ModelMultipleChoiceFilter(
  897. queryset=VLANGroup.objects.all(),
  898. label=_('Group (ID)'),
  899. )
  900. group = django_filters.ModelMultipleChoiceFilter(
  901. field_name='group__slug',
  902. queryset=VLANGroup.objects.all(),
  903. to_field_name='slug',
  904. label=_('Group'),
  905. )
  906. role_id = django_filters.ModelMultipleChoiceFilter(
  907. queryset=Role.objects.all(),
  908. label=_('Role (ID)'),
  909. )
  910. role = django_filters.ModelMultipleChoiceFilter(
  911. field_name='role__slug',
  912. queryset=Role.objects.all(),
  913. to_field_name='slug',
  914. label=_('Role (slug)'),
  915. )
  916. status = django_filters.MultipleChoiceFilter(
  917. choices=VLANStatusChoices,
  918. null_value=None
  919. )
  920. available_at_site = django_filters.ModelChoiceFilter(
  921. queryset=Site.objects.all(),
  922. method='get_for_site'
  923. )
  924. available_on_device = django_filters.ModelChoiceFilter(
  925. queryset=Device.objects.all(),
  926. method='get_for_device'
  927. )
  928. available_on_virtualmachine = django_filters.ModelChoiceFilter(
  929. queryset=VirtualMachine.objects.all(),
  930. method='get_for_virtualmachine'
  931. )
  932. qinq_role = django_filters.MultipleChoiceFilter(
  933. choices=VLANQinQRoleChoices
  934. )
  935. qinq_svlan_id = django_filters.ModelMultipleChoiceFilter(
  936. queryset=VLAN.objects.all(),
  937. label=_('Q-in-Q SVLAN (ID)'),
  938. )
  939. qinq_svlan_vid = MultiValueNumberFilter(
  940. field_name='qinq_svlan__vid',
  941. label=_('Q-in-Q SVLAN number (1-4094)'),
  942. )
  943. l2vpn_id = django_filters.ModelMultipleChoiceFilter(
  944. field_name='l2vpn_terminations__l2vpn',
  945. queryset=L2VPN.objects.all(),
  946. label=_('L2VPN (ID)'),
  947. )
  948. l2vpn = django_filters.ModelMultipleChoiceFilter(
  949. field_name='l2vpn_terminations__l2vpn__identifier',
  950. queryset=L2VPN.objects.all(),
  951. to_field_name='identifier',
  952. label=_('L2VPN'),
  953. )
  954. interface_id = django_filters.ModelChoiceFilter(
  955. queryset=Interface.objects.all(),
  956. method='filter_interface_id',
  957. label=_('Assigned interface')
  958. )
  959. vminterface_id = django_filters.ModelChoiceFilter(
  960. queryset=VMInterface.objects.all(),
  961. method='filter_vminterface_id',
  962. label=_('Assigned VM interface')
  963. )
  964. class Meta:
  965. model = VLAN
  966. fields = ('id', 'vid', 'name', 'description')
  967. def search(self, queryset, name, value):
  968. if not value.strip():
  969. return queryset
  970. qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
  971. try:
  972. qs_filter |= Q(vid=int(value.strip()))
  973. except ValueError:
  974. pass
  975. return queryset.filter(qs_filter)
  976. @extend_schema_field(OpenApiTypes.STR)
  977. def get_for_site(self, queryset, name, value):
  978. return queryset.get_for_site(value)
  979. @extend_schema_field(OpenApiTypes.STR)
  980. def get_for_device(self, queryset, name, value):
  981. return queryset.get_for_device(value)
  982. @extend_schema_field(OpenApiTypes.STR)
  983. def get_for_virtualmachine(self, queryset, name, value):
  984. return queryset.get_for_virtualmachine(value)
  985. @extend_schema_field(OpenApiTypes.INT)
  986. def filter_interface_id(self, queryset, name, value):
  987. if value is None:
  988. return queryset.none()
  989. return queryset.filter(
  990. Q(interfaces_as_tagged=value) |
  991. Q(interfaces_as_untagged=value)
  992. ).distinct()
  993. @extend_schema_field(OpenApiTypes.INT)
  994. def filter_vminterface_id(self, queryset, name, value):
  995. if value is None:
  996. return queryset.none()
  997. return queryset.filter(
  998. Q(vminterfaces_as_tagged=value) |
  999. Q(vminterfaces_as_untagged=value)
  1000. ).distinct()
  1001. class VLANTranslationPolicyFilterSet(NetBoxModelFilterSet):
  1002. class Meta:
  1003. model = VLANTranslationPolicy
  1004. fields = ('id', 'name', 'description')
  1005. def search(self, queryset, name, value):
  1006. if not value.strip():
  1007. return queryset
  1008. qs_filter = (
  1009. Q(name__icontains=value) |
  1010. Q(description__icontains=value)
  1011. )
  1012. return queryset.filter(qs_filter)
  1013. class VLANTranslationRuleFilterSet(NetBoxModelFilterSet):
  1014. policy_id = django_filters.ModelMultipleChoiceFilter(
  1015. queryset=VLANTranslationPolicy.objects.all(),
  1016. label=_('VLAN Translation Policy (ID)'),
  1017. )
  1018. policy = django_filters.ModelMultipleChoiceFilter(
  1019. field_name='policy__name',
  1020. queryset=VLANTranslationPolicy.objects.all(),
  1021. to_field_name='name',
  1022. label=_('VLAN Translation Policy (name)'),
  1023. )
  1024. class Meta:
  1025. model = VLANTranslationRule
  1026. fields = ('id', 'policy_id', 'policy', 'local_vid', 'remote_vid', 'description')
  1027. def search(self, queryset, name, value):
  1028. if not value.strip():
  1029. return queryset
  1030. qs_filter = (
  1031. Q(policy__name__icontains=value)
  1032. )
  1033. try:
  1034. int_value = int(value.strip())
  1035. qs_filter |= Q(local_vid=int_value)
  1036. qs_filter |= Q(remote_vid=int_value)
  1037. except ValueError:
  1038. pass
  1039. return queryset.filter(qs_filter)
  1040. class ServiceTemplateFilterSet(NetBoxModelFilterSet):
  1041. port = NumericArrayFilter(
  1042. field_name='ports',
  1043. lookup_expr='contains'
  1044. )
  1045. class Meta:
  1046. model = ServiceTemplate
  1047. fields = ('id', 'name', 'protocol', 'description')
  1048. def search(self, queryset, name, value):
  1049. if not value.strip():
  1050. return queryset
  1051. qs_filter = (
  1052. Q(name__icontains=value) |
  1053. Q(description__icontains=value)
  1054. )
  1055. return queryset.filter(qs_filter)
  1056. class ServiceFilterSet(ContactModelFilterSet, NetBoxModelFilterSet):
  1057. parent_object_type = ContentTypeFilter()
  1058. device = MultiValueCharFilter(
  1059. method='filter_device',
  1060. field_name='name',
  1061. label=_('Device (name)'),
  1062. )
  1063. device_id = MultiValueNumberFilter(
  1064. method='filter_device',
  1065. field_name='pk',
  1066. label=_('Device (ID)'),
  1067. )
  1068. virtual_machine = MultiValueCharFilter(
  1069. method='filter_virtual_machine',
  1070. field_name='name',
  1071. label=_('Virtual machine (name)'),
  1072. )
  1073. virtual_machine_id = MultiValueNumberFilter(
  1074. method='filter_virtual_machine',
  1075. field_name='pk',
  1076. label=_('Virtual machine (ID)'),
  1077. )
  1078. fhrpgroup = MultiValueCharFilter(
  1079. method='filter_fhrp_group',
  1080. field_name='name',
  1081. label=_('FHRP Group (name)'),
  1082. )
  1083. fhrpgroup_id = MultiValueNumberFilter(
  1084. method='filter_fhrp_group',
  1085. field_name='pk',
  1086. label=_('FHRP Group (ID)'),
  1087. )
  1088. ip_address_id = django_filters.ModelMultipleChoiceFilter(
  1089. field_name='ipaddresses',
  1090. queryset=IPAddress.objects.all(),
  1091. label=_('IP address (ID)'),
  1092. )
  1093. ip_address = django_filters.ModelMultipleChoiceFilter(
  1094. field_name='ipaddresses__address',
  1095. queryset=IPAddress.objects.all(),
  1096. to_field_name='address',
  1097. label=_('IP address'),
  1098. )
  1099. port = NumericArrayFilter(
  1100. field_name='ports',
  1101. lookup_expr='contains'
  1102. )
  1103. class Meta:
  1104. model = Service
  1105. fields = ('id', 'name', 'protocol', 'description', 'parent_object_type', 'parent_object_id')
  1106. def search(self, queryset, name, value):
  1107. if not value.strip():
  1108. return queryset
  1109. qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
  1110. return queryset.filter(qs_filter)
  1111. def filter_device(self, queryset, name, value):
  1112. devices = Device.objects.filter(**{'{}__in'.format(name): value})
  1113. if not devices.exists():
  1114. return queryset.none()
  1115. service_ids = []
  1116. for device in devices:
  1117. service_ids.extend(device.services.values_list('id', flat=True))
  1118. return queryset.filter(id__in=service_ids)
  1119. def filter_fhrp_group(self, queryset, name, value):
  1120. groups = FHRPGroup.objects.filter(**{'{}__in'.format(name): value})
  1121. if not groups.exists():
  1122. return queryset.none()
  1123. service_ids = []
  1124. for group in groups:
  1125. service_ids.extend(group.services.values_list('id', flat=True))
  1126. return queryset.filter(id__in=service_ids)
  1127. def filter_virtual_machine(self, queryset, name, value):
  1128. virtual_machines = VirtualMachine.objects.filter(**{'{}__in'.format(name): value})
  1129. if not virtual_machines.exists():
  1130. return queryset.none()
  1131. service_ids = []
  1132. for vm in virtual_machines:
  1133. service_ids.extend(vm.services.values_list('id', flat=True))
  1134. return queryset.filter(id__in=service_ids)
  1135. class PrimaryIPFilterSet(django_filters.FilterSet):
  1136. """
  1137. An inheritable FilterSet for models which support primary IP assignment.
  1138. """
  1139. primary_ip4_id = django_filters.ModelMultipleChoiceFilter(
  1140. field_name='primary_ip4',
  1141. queryset=IPAddress.objects.all(),
  1142. label=_('Primary IPv4 (ID)'),
  1143. )
  1144. primary_ip4 = django_filters.ModelMultipleChoiceFilter(
  1145. field_name='primary_ip4__address',
  1146. queryset=IPAddress.objects.all(),
  1147. to_field_name='address',
  1148. label=_('Primary IPv4 (address)'),
  1149. )
  1150. primary_ip6_id = django_filters.ModelMultipleChoiceFilter(
  1151. field_name='primary_ip6',
  1152. queryset=IPAddress.objects.all(),
  1153. label=_('Primary IPv6 (ID)'),
  1154. )
  1155. primary_ip6 = django_filters.ModelMultipleChoiceFilter(
  1156. field_name='primary_ip6__address',
  1157. queryset=IPAddress.objects.all(),
  1158. to_field_name='address',
  1159. label=_('Primary IPv6 (address)'),
  1160. )