filters.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. import django_filters
  2. import netaddr
  3. from django.core.exceptions import ValidationError
  4. from django.db.models import Q
  5. from netaddr.core import AddrFormatError
  6. from dcim.models import Site, Device, Interface
  7. from extras.filters import CustomFieldFilterSet
  8. from tenancy.models import Tenant
  9. from utilities.filters import NameSlugSearchFilterSet, TagFilter
  10. from virtualization.models import VirtualMachine
  11. from .constants import IPADDRESS_ROLE_CHOICES, IPADDRESS_STATUS_CHOICES, PREFIX_STATUS_CHOICES, VLAN_STATUS_CHOICES
  12. from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
  13. class VRFFilter(CustomFieldFilterSet):
  14. q = django_filters.CharFilter(
  15. method='search',
  16. label='Search',
  17. )
  18. tenant_id = django_filters.ModelMultipleChoiceFilter(
  19. queryset=Tenant.objects.all(),
  20. label='Tenant (ID)',
  21. )
  22. tenant = django_filters.ModelMultipleChoiceFilter(
  23. field_name='tenant__slug',
  24. queryset=Tenant.objects.all(),
  25. to_field_name='slug',
  26. label='Tenant (slug)',
  27. )
  28. tag = TagFilter()
  29. class Meta:
  30. model = VRF
  31. fields = ['id', 'name', 'rd', 'enforce_unique']
  32. def search(self, queryset, name, value):
  33. if not value.strip():
  34. return queryset
  35. return queryset.filter(
  36. Q(name__icontains=value) |
  37. Q(rd__icontains=value) |
  38. Q(description__icontains=value)
  39. )
  40. class RIRFilter(NameSlugSearchFilterSet):
  41. class Meta:
  42. model = RIR
  43. fields = ['id', 'name', 'slug', 'is_private']
  44. class AggregateFilter(CustomFieldFilterSet):
  45. q = django_filters.CharFilter(
  46. method='search',
  47. label='Search',
  48. )
  49. prefix = django_filters.CharFilter(
  50. method='filter_prefix',
  51. label='Prefix',
  52. )
  53. rir_id = django_filters.ModelMultipleChoiceFilter(
  54. queryset=RIR.objects.all(),
  55. label='RIR (ID)',
  56. )
  57. rir = django_filters.ModelMultipleChoiceFilter(
  58. field_name='rir__slug',
  59. queryset=RIR.objects.all(),
  60. to_field_name='slug',
  61. label='RIR (slug)',
  62. )
  63. tag = TagFilter()
  64. class Meta:
  65. model = Aggregate
  66. fields = ['id', 'family', 'date_added']
  67. def search(self, queryset, name, value):
  68. if not value.strip():
  69. return queryset
  70. qs_filter = Q(description__icontains=value)
  71. try:
  72. prefix = str(netaddr.IPNetwork(value.strip()).cidr)
  73. qs_filter |= Q(prefix__net_contains_or_equals=prefix)
  74. except (AddrFormatError, ValueError):
  75. pass
  76. return queryset.filter(qs_filter)
  77. def filter_prefix(self, queryset, name, value):
  78. if not value.strip():
  79. return queryset
  80. try:
  81. query = str(netaddr.IPNetwork(value).cidr)
  82. return queryset.filter(prefix=query)
  83. except ValidationError:
  84. return queryset.none()
  85. class RoleFilter(NameSlugSearchFilterSet):
  86. q = django_filters.CharFilter(
  87. method='search',
  88. label='Search',
  89. )
  90. class Meta:
  91. model = Role
  92. fields = ['id', 'name', 'slug']
  93. class PrefixFilter(CustomFieldFilterSet):
  94. q = django_filters.CharFilter(
  95. method='search',
  96. label='Search',
  97. )
  98. prefix = django_filters.CharFilter(
  99. method='filter_prefix',
  100. label='Prefix',
  101. )
  102. within = django_filters.CharFilter(
  103. method='search_within',
  104. label='Within prefix',
  105. )
  106. within_include = django_filters.CharFilter(
  107. method='search_within_include',
  108. label='Within and including prefix',
  109. )
  110. contains = django_filters.CharFilter(
  111. method='search_contains',
  112. label='Prefixes which contain this prefix or IP',
  113. )
  114. mask_length = django_filters.NumberFilter(
  115. method='filter_mask_length',
  116. label='Mask length',
  117. )
  118. vrf_id = django_filters.ModelMultipleChoiceFilter(
  119. queryset=VRF.objects.all(),
  120. label='VRF',
  121. )
  122. vrf = django_filters.ModelMultipleChoiceFilter(
  123. field_name='vrf__rd',
  124. queryset=VRF.objects.all(),
  125. to_field_name='rd',
  126. label='VRF (RD)',
  127. )
  128. tenant_id = django_filters.ModelMultipleChoiceFilter(
  129. queryset=Tenant.objects.all(),
  130. label='Tenant (ID)',
  131. )
  132. tenant = django_filters.ModelMultipleChoiceFilter(
  133. field_name='tenant__slug',
  134. queryset=Tenant.objects.all(),
  135. to_field_name='slug',
  136. label='Tenant (slug)',
  137. )
  138. site_id = django_filters.ModelMultipleChoiceFilter(
  139. queryset=Site.objects.all(),
  140. label='Site (ID)',
  141. )
  142. site = django_filters.ModelMultipleChoiceFilter(
  143. field_name='site__slug',
  144. queryset=Site.objects.all(),
  145. to_field_name='slug',
  146. label='Site (slug)',
  147. )
  148. vlan_id = django_filters.ModelMultipleChoiceFilter(
  149. queryset=VLAN.objects.all(),
  150. label='VLAN (ID)',
  151. )
  152. vlan_vid = django_filters.NumberFilter(
  153. field_name='vlan__vid',
  154. label='VLAN number (1-4095)',
  155. )
  156. role_id = django_filters.ModelMultipleChoiceFilter(
  157. queryset=Role.objects.all(),
  158. label='Role (ID)',
  159. )
  160. role = django_filters.ModelMultipleChoiceFilter(
  161. field_name='role__slug',
  162. queryset=Role.objects.all(),
  163. to_field_name='slug',
  164. label='Role (slug)',
  165. )
  166. status = django_filters.MultipleChoiceFilter(
  167. choices=PREFIX_STATUS_CHOICES,
  168. null_value=None
  169. )
  170. tag = TagFilter()
  171. class Meta:
  172. model = Prefix
  173. fields = ['id', 'family', 'is_pool']
  174. def search(self, queryset, name, value):
  175. if not value.strip():
  176. return queryset
  177. qs_filter = Q(description__icontains=value)
  178. try:
  179. prefix = str(netaddr.IPNetwork(value.strip()).cidr)
  180. qs_filter |= Q(prefix__net_contains_or_equals=prefix)
  181. except (AddrFormatError, ValueError):
  182. pass
  183. return queryset.filter(qs_filter)
  184. def filter_prefix(self, queryset, name, value):
  185. if not value.strip():
  186. return queryset
  187. try:
  188. query = str(netaddr.IPNetwork(value).cidr)
  189. return queryset.filter(prefix=query)
  190. except ValidationError:
  191. return queryset.none()
  192. def search_within(self, queryset, name, value):
  193. value = value.strip()
  194. if not value:
  195. return queryset
  196. try:
  197. query = str(netaddr.IPNetwork(value).cidr)
  198. return queryset.filter(prefix__net_contained=query)
  199. except (AddrFormatError, ValueError):
  200. return queryset.none()
  201. def search_within_include(self, queryset, name, value):
  202. value = value.strip()
  203. if not value:
  204. return queryset
  205. try:
  206. query = str(netaddr.IPNetwork(value).cidr)
  207. return queryset.filter(prefix__net_contained_or_equal=query)
  208. except (AddrFormatError, ValueError):
  209. return queryset.none()
  210. def search_contains(self, queryset, name, value):
  211. value = value.strip()
  212. if not value:
  213. return queryset
  214. try:
  215. # Searching by prefix
  216. if '/' in value:
  217. return queryset.filter(prefix__net_contains_or_equals=str(netaddr.IPNetwork(value).cidr))
  218. # Searching by IP address
  219. else:
  220. return queryset.filter(prefix__net_contains=str(netaddr.IPAddress(value)))
  221. except (AddrFormatError, ValueError):
  222. return queryset.none()
  223. def filter_mask_length(self, queryset, name, value):
  224. if not value:
  225. return queryset
  226. return queryset.filter(prefix__net_mask_length=value)
  227. class IPAddressFilter(CustomFieldFilterSet):
  228. q = django_filters.CharFilter(
  229. method='search',
  230. label='Search',
  231. )
  232. parent = django_filters.CharFilter(
  233. method='search_by_parent',
  234. label='Parent prefix',
  235. )
  236. address = django_filters.CharFilter(
  237. method='filter_address',
  238. label='Address',
  239. )
  240. mask_length = django_filters.NumberFilter(
  241. method='filter_mask_length',
  242. label='Mask length',
  243. )
  244. vrf_id = django_filters.ModelMultipleChoiceFilter(
  245. queryset=VRF.objects.all(),
  246. label='VRF',
  247. )
  248. vrf = django_filters.ModelMultipleChoiceFilter(
  249. field_name='vrf__rd',
  250. queryset=VRF.objects.all(),
  251. to_field_name='rd',
  252. label='VRF (RD)',
  253. )
  254. tenant_id = django_filters.ModelMultipleChoiceFilter(
  255. queryset=Tenant.objects.all(),
  256. label='Tenant (ID)',
  257. )
  258. tenant = django_filters.ModelMultipleChoiceFilter(
  259. field_name='tenant__slug',
  260. queryset=Tenant.objects.all(),
  261. to_field_name='slug',
  262. label='Tenant (slug)',
  263. )
  264. device = django_filters.CharFilter(
  265. method='filter_device',
  266. field_name='name',
  267. label='Device',
  268. )
  269. device_id = django_filters.NumberFilter(
  270. method='filter_device',
  271. field_name='pk',
  272. label='Device (ID)',
  273. )
  274. virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
  275. field_name='interface__virtual_machine',
  276. queryset=VirtualMachine.objects.all(),
  277. label='Virtual machine (ID)',
  278. )
  279. virtual_machine = django_filters.ModelMultipleChoiceFilter(
  280. field_name='interface__virtual_machine__name',
  281. queryset=VirtualMachine.objects.all(),
  282. to_field_name='name',
  283. label='Virtual machine (name)',
  284. )
  285. interface_id = django_filters.ModelMultipleChoiceFilter(
  286. queryset=Interface.objects.all(),
  287. label='Interface (ID)',
  288. )
  289. status = django_filters.MultipleChoiceFilter(
  290. choices=IPADDRESS_STATUS_CHOICES,
  291. null_value=None
  292. )
  293. role = django_filters.MultipleChoiceFilter(
  294. choices=IPADDRESS_ROLE_CHOICES
  295. )
  296. tag = TagFilter()
  297. class Meta:
  298. model = IPAddress
  299. fields = ['id', 'family', 'dns_name']
  300. def search(self, queryset, name, value):
  301. if not value.strip():
  302. return queryset
  303. qs_filter = (
  304. Q(dns_name__icontains=value) |
  305. Q(description__icontains=value) |
  306. Q(address__istartswith=value)
  307. )
  308. return queryset.filter(qs_filter)
  309. def search_by_parent(self, queryset, name, value):
  310. value = value.strip()
  311. if not value:
  312. return queryset
  313. try:
  314. query = str(netaddr.IPNetwork(value.strip()).cidr)
  315. return queryset.filter(address__net_host_contained=query)
  316. except (AddrFormatError, ValueError):
  317. return queryset.none()
  318. def filter_address(self, queryset, name, value):
  319. if not value.strip():
  320. return queryset
  321. try:
  322. # Match address and subnet mask
  323. if '/' in value:
  324. return queryset.filter(address=value)
  325. return queryset.filter(address__net_host=value)
  326. except ValidationError:
  327. return queryset.none()
  328. def filter_mask_length(self, queryset, name, value):
  329. if not value:
  330. return queryset
  331. return queryset.filter(address__net_mask_length=value)
  332. def filter_device(self, queryset, name, value):
  333. try:
  334. device = Device.objects.select_related('device_type').get(**{name: value})
  335. vc_interface_ids = [i['id'] for i in device.vc_interfaces.values('id')]
  336. return queryset.filter(interface_id__in=vc_interface_ids)
  337. except Device.DoesNotExist:
  338. return queryset.none()
  339. class VLANGroupFilter(NameSlugSearchFilterSet):
  340. site_id = django_filters.ModelMultipleChoiceFilter(
  341. queryset=Site.objects.all(),
  342. label='Site (ID)',
  343. )
  344. site = django_filters.ModelMultipleChoiceFilter(
  345. field_name='site__slug',
  346. queryset=Site.objects.all(),
  347. to_field_name='slug',
  348. label='Site (slug)',
  349. )
  350. class Meta:
  351. model = VLANGroup
  352. fields = ['id', 'name', 'slug']
  353. class VLANFilter(CustomFieldFilterSet):
  354. q = django_filters.CharFilter(
  355. method='search',
  356. label='Search',
  357. )
  358. site_id = django_filters.ModelMultipleChoiceFilter(
  359. queryset=Site.objects.all(),
  360. label='Site (ID)',
  361. )
  362. site = django_filters.ModelMultipleChoiceFilter(
  363. field_name='site__slug',
  364. queryset=Site.objects.all(),
  365. to_field_name='slug',
  366. label='Site (slug)',
  367. )
  368. group_id = django_filters.ModelMultipleChoiceFilter(
  369. queryset=VLANGroup.objects.all(),
  370. label='Group (ID)',
  371. )
  372. group = django_filters.ModelMultipleChoiceFilter(
  373. field_name='group__slug',
  374. queryset=VLANGroup.objects.all(),
  375. to_field_name='slug',
  376. label='Group',
  377. )
  378. tenant_id = django_filters.ModelMultipleChoiceFilter(
  379. queryset=Tenant.objects.all(),
  380. label='Tenant (ID)',
  381. )
  382. tenant = django_filters.ModelMultipleChoiceFilter(
  383. field_name='tenant__slug',
  384. queryset=Tenant.objects.all(),
  385. to_field_name='slug',
  386. label='Tenant (slug)',
  387. )
  388. role_id = django_filters.ModelMultipleChoiceFilter(
  389. queryset=Role.objects.all(),
  390. label='Role (ID)',
  391. )
  392. role = django_filters.ModelMultipleChoiceFilter(
  393. field_name='role__slug',
  394. queryset=Role.objects.all(),
  395. to_field_name='slug',
  396. label='Role (slug)',
  397. )
  398. status = django_filters.MultipleChoiceFilter(
  399. choices=VLAN_STATUS_CHOICES,
  400. null_value=None
  401. )
  402. tag = TagFilter()
  403. class Meta:
  404. model = VLAN
  405. fields = ['id', 'vid', 'name']
  406. def search(self, queryset, name, value):
  407. if not value.strip():
  408. return queryset
  409. qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
  410. try:
  411. qs_filter |= Q(vid=int(value.strip()))
  412. except ValueError:
  413. pass
  414. return queryset.filter(qs_filter)
  415. class ServiceFilter(django_filters.FilterSet):
  416. q = django_filters.CharFilter(
  417. method='search',
  418. label='Search',
  419. )
  420. device_id = django_filters.ModelMultipleChoiceFilter(
  421. queryset=Device.objects.all(),
  422. label='Device (ID)',
  423. )
  424. device = django_filters.ModelMultipleChoiceFilter(
  425. field_name='device__name',
  426. queryset=Device.objects.all(),
  427. to_field_name='name',
  428. label='Device (name)',
  429. )
  430. virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
  431. queryset=VirtualMachine.objects.all(),
  432. label='Virtual machine (ID)',
  433. )
  434. virtual_machine = django_filters.ModelMultipleChoiceFilter(
  435. field_name='virtual_machine__name',
  436. queryset=VirtualMachine.objects.all(),
  437. to_field_name='name',
  438. label='Virtual machine (name)',
  439. )
  440. tag = TagFilter()
  441. class Meta:
  442. model = Service
  443. fields = ['id', 'name', 'protocol', 'port']
  444. def search(self, queryset, name, value):
  445. if not value.strip():
  446. return queryset
  447. qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
  448. return queryset.filter(qs_filter)