filters.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482
  1. import django_filters
  2. import netaddr
  3. from django.core.exceptions import ValidationError
  4. from django.db.models import Q
  5. from netaddr.core import AddrFormatError
  6. from dcim.models import Site, Device, Interface
  7. from extras.filters import CustomFieldFilterSet, ChangeLoggedFilter
  8. from tenancy.filtersets import TenancyFilterSet
  9. from utilities.filters import NameSlugSearchFilterSet, NumericInFilter, TagFilter
  10. from virtualization.models import VirtualMachine
  11. from .constants import *
  12. from .models import Aggregate, IPAddress, Prefix, RIR, Role, Service, VLAN, VLANGroup, VRF
  13. class VRFFilter(TenancyFilterSet, CustomFieldFilterSet, ChangeLoggedFilter):
  14. id__in = NumericInFilter(
  15. field_name='id',
  16. lookup_expr='in'
  17. )
  18. q = django_filters.CharFilter(
  19. method='search',
  20. label='Search',
  21. )
  22. tag = TagFilter()
  23. def search(self, queryset, name, value):
  24. if not value.strip():
  25. return queryset
  26. return queryset.filter(
  27. Q(name__icontains=value) |
  28. Q(rd__icontains=value) |
  29. Q(description__icontains=value)
  30. )
  31. class Meta:
  32. model = VRF
  33. fields = ['name', 'rd', 'enforce_unique']
  34. class RIRFilter(NameSlugSearchFilterSet):
  35. id__in = NumericInFilter(
  36. field_name='id',
  37. lookup_expr='in'
  38. )
  39. class Meta:
  40. model = RIR
  41. fields = ['name', 'slug', 'is_private']
  42. class AggregateFilter(CustomFieldFilterSet, ChangeLoggedFilter):
  43. id__in = NumericInFilter(
  44. field_name='id',
  45. lookup_expr='in'
  46. )
  47. q = django_filters.CharFilter(
  48. method='search',
  49. label='Search',
  50. )
  51. prefix = django_filters.CharFilter(
  52. method='filter_prefix',
  53. label='Prefix',
  54. )
  55. rir_id = django_filters.ModelMultipleChoiceFilter(
  56. queryset=RIR.objects.all(),
  57. label='RIR (ID)',
  58. )
  59. rir = django_filters.ModelMultipleChoiceFilter(
  60. field_name='rir__slug',
  61. queryset=RIR.objects.all(),
  62. to_field_name='slug',
  63. label='RIR (slug)',
  64. )
  65. tag = TagFilter()
  66. class Meta:
  67. model = Aggregate
  68. fields = ['family', 'date_added']
  69. def search(self, queryset, name, value):
  70. if not value.strip():
  71. return queryset
  72. qs_filter = Q(description__icontains=value)
  73. try:
  74. prefix = str(netaddr.IPNetwork(value.strip()).cidr)
  75. qs_filter |= Q(prefix__net_contains_or_equals=prefix)
  76. except (AddrFormatError, ValueError):
  77. pass
  78. return queryset.filter(qs_filter)
  79. def filter_prefix(self, queryset, name, value):
  80. if not value.strip():
  81. return queryset
  82. try:
  83. query = str(netaddr.IPNetwork(value).cidr)
  84. return queryset.filter(prefix=query)
  85. except ValidationError:
  86. return queryset.none()
  87. class RoleFilter(NameSlugSearchFilterSet):
  88. q = django_filters.CharFilter(
  89. method='search',
  90. label='Search',
  91. )
  92. class Meta:
  93. model = Role
  94. fields = ['id', 'name', 'slug']
  95. class PrefixFilter(TenancyFilterSet, CustomFieldFilterSet, ChangeLoggedFilter):
  96. id__in = NumericInFilter(
  97. field_name='id',
  98. lookup_expr='in'
  99. )
  100. q = django_filters.CharFilter(
  101. method='search',
  102. label='Search',
  103. )
  104. prefix = django_filters.CharFilter(
  105. method='filter_prefix',
  106. label='Prefix',
  107. )
  108. within = django_filters.CharFilter(
  109. method='search_within',
  110. label='Within prefix',
  111. )
  112. within_include = django_filters.CharFilter(
  113. method='search_within_include',
  114. label='Within and including prefix',
  115. )
  116. contains = django_filters.CharFilter(
  117. method='search_contains',
  118. label='Prefixes which contain this prefix or IP',
  119. )
  120. mask_length = django_filters.NumberFilter(
  121. method='filter_mask_length',
  122. label='Mask length',
  123. )
  124. vrf_id = django_filters.ModelMultipleChoiceFilter(
  125. queryset=VRF.objects.all(),
  126. label='VRF',
  127. )
  128. vrf = django_filters.ModelMultipleChoiceFilter(
  129. field_name='vrf__rd',
  130. queryset=VRF.objects.all(),
  131. to_field_name='rd',
  132. label='VRF (RD)',
  133. )
  134. site_id = django_filters.ModelMultipleChoiceFilter(
  135. queryset=Site.objects.all(),
  136. label='Site (ID)',
  137. )
  138. site = django_filters.ModelMultipleChoiceFilter(
  139. field_name='site__slug',
  140. queryset=Site.objects.all(),
  141. to_field_name='slug',
  142. label='Site (slug)',
  143. )
  144. vlan_id = django_filters.ModelMultipleChoiceFilter(
  145. queryset=VLAN.objects.all(),
  146. label='VLAN (ID)',
  147. )
  148. vlan_vid = django_filters.NumberFilter(
  149. field_name='vlan__vid',
  150. label='VLAN number (1-4095)',
  151. )
  152. role_id = django_filters.ModelMultipleChoiceFilter(
  153. queryset=Role.objects.all(),
  154. label='Role (ID)',
  155. )
  156. role = django_filters.ModelMultipleChoiceFilter(
  157. field_name='role__slug',
  158. queryset=Role.objects.all(),
  159. to_field_name='slug',
  160. label='Role (slug)',
  161. )
  162. status = django_filters.MultipleChoiceFilter(
  163. choices=PREFIX_STATUS_CHOICES,
  164. null_value=None
  165. )
  166. tag = TagFilter()
  167. class Meta:
  168. model = Prefix
  169. fields = ['family', 'is_pool']
  170. def search(self, queryset, name, value):
  171. if not value.strip():
  172. return queryset
  173. qs_filter = Q(description__icontains=value)
  174. try:
  175. prefix = str(netaddr.IPNetwork(value.strip()).cidr)
  176. qs_filter |= Q(prefix__net_contains_or_equals=prefix)
  177. except (AddrFormatError, ValueError):
  178. pass
  179. return queryset.filter(qs_filter)
  180. def filter_prefix(self, queryset, name, value):
  181. if not value.strip():
  182. return queryset
  183. try:
  184. query = str(netaddr.IPNetwork(value).cidr)
  185. return queryset.filter(prefix=query)
  186. except ValidationError:
  187. return queryset.none()
  188. def search_within(self, queryset, name, value):
  189. value = value.strip()
  190. if not value:
  191. return queryset
  192. try:
  193. query = str(netaddr.IPNetwork(value).cidr)
  194. return queryset.filter(prefix__net_contained=query)
  195. except (AddrFormatError, ValueError):
  196. return queryset.none()
  197. def search_within_include(self, queryset, name, value):
  198. value = value.strip()
  199. if not value:
  200. return queryset
  201. try:
  202. query = str(netaddr.IPNetwork(value).cidr)
  203. return queryset.filter(prefix__net_contained_or_equal=query)
  204. except (AddrFormatError, ValueError):
  205. return queryset.none()
  206. def search_contains(self, queryset, name, value):
  207. value = value.strip()
  208. if not value:
  209. return queryset
  210. try:
  211. # Searching by prefix
  212. if '/' in value:
  213. return queryset.filter(prefix__net_contains_or_equals=str(netaddr.IPNetwork(value).cidr))
  214. # Searching by IP address
  215. else:
  216. return queryset.filter(prefix__net_contains=str(netaddr.IPAddress(value)))
  217. except (AddrFormatError, ValueError):
  218. return queryset.none()
  219. def filter_mask_length(self, queryset, name, value):
  220. if not value:
  221. return queryset
  222. return queryset.filter(prefix__net_mask_length=value)
  223. class IPAddressFilter(TenancyFilterSet, CustomFieldFilterSet, ChangeLoggedFilter):
  224. id__in = NumericInFilter(
  225. field_name='id',
  226. lookup_expr='in'
  227. )
  228. q = django_filters.CharFilter(
  229. method='search',
  230. label='Search',
  231. )
  232. parent = django_filters.CharFilter(
  233. method='search_by_parent',
  234. label='Parent prefix',
  235. )
  236. address = django_filters.CharFilter(
  237. method='filter_address',
  238. label='Address',
  239. )
  240. mask_length = django_filters.NumberFilter(
  241. method='filter_mask_length',
  242. label='Mask length',
  243. )
  244. vrf_id = django_filters.ModelMultipleChoiceFilter(
  245. queryset=VRF.objects.all(),
  246. label='VRF',
  247. )
  248. vrf = django_filters.ModelMultipleChoiceFilter(
  249. field_name='vrf__rd',
  250. queryset=VRF.objects.all(),
  251. to_field_name='rd',
  252. label='VRF (RD)',
  253. )
  254. device = django_filters.CharFilter(
  255. method='filter_device',
  256. field_name='name',
  257. label='Device',
  258. )
  259. device_id = django_filters.NumberFilter(
  260. method='filter_device',
  261. field_name='pk',
  262. label='Device (ID)',
  263. )
  264. virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
  265. field_name='interface__virtual_machine',
  266. queryset=VirtualMachine.objects.all(),
  267. label='Virtual machine (ID)',
  268. )
  269. virtual_machine = django_filters.ModelMultipleChoiceFilter(
  270. field_name='interface__virtual_machine__name',
  271. queryset=VirtualMachine.objects.all(),
  272. to_field_name='name',
  273. label='Virtual machine (name)',
  274. )
  275. interface = django_filters.ModelMultipleChoiceFilter(
  276. field_name='interface__name',
  277. queryset=Interface.objects.all(),
  278. to_field_name='name',
  279. label='Interface (ID)',
  280. )
  281. interface_id = django_filters.ModelMultipleChoiceFilter(
  282. queryset=Interface.objects.all(),
  283. label='Interface (ID)',
  284. )
  285. status = django_filters.MultipleChoiceFilter(
  286. choices=IPADDRESS_STATUS_CHOICES,
  287. null_value=None
  288. )
  289. role = django_filters.MultipleChoiceFilter(
  290. choices=IPADDRESS_ROLE_CHOICES
  291. )
  292. tag = TagFilter()
  293. class Meta:
  294. model = IPAddress
  295. fields = ['family', 'dns_name']
  296. def search(self, queryset, name, value):
  297. if not value.strip():
  298. return queryset
  299. qs_filter = (
  300. Q(dns_name__icontains=value) |
  301. Q(description__icontains=value) |
  302. Q(address__istartswith=value)
  303. )
  304. return queryset.filter(qs_filter)
  305. def search_by_parent(self, queryset, name, value):
  306. value = value.strip()
  307. if not value:
  308. return queryset
  309. try:
  310. query = str(netaddr.IPNetwork(value.strip()).cidr)
  311. return queryset.filter(address__net_host_contained=query)
  312. except (AddrFormatError, ValueError):
  313. return queryset.none()
  314. def filter_address(self, queryset, name, value):
  315. if not value.strip():
  316. return queryset
  317. try:
  318. # Match address and subnet mask
  319. if '/' in value:
  320. return queryset.filter(address=value)
  321. return queryset.filter(address__net_host=value)
  322. except ValidationError:
  323. return queryset.none()
  324. def filter_mask_length(self, queryset, name, value):
  325. if not value:
  326. return queryset
  327. return queryset.filter(address__net_mask_length=value)
  328. def filter_device(self, queryset, name, value):
  329. try:
  330. device = Device.objects.prefetch_related('device_type').get(**{name: value})
  331. vc_interface_ids = [i['id'] for i in device.vc_interfaces.values('id')]
  332. return queryset.filter(interface_id__in=vc_interface_ids)
  333. except Device.DoesNotExist:
  334. return queryset.none()
  335. class VLANGroupFilter(NameSlugSearchFilterSet):
  336. site_id = django_filters.ModelMultipleChoiceFilter(
  337. queryset=Site.objects.all(),
  338. label='Site (ID)',
  339. )
  340. site = django_filters.ModelMultipleChoiceFilter(
  341. field_name='site__slug',
  342. queryset=Site.objects.all(),
  343. to_field_name='slug',
  344. label='Site (slug)',
  345. )
  346. class Meta:
  347. model = VLANGroup
  348. fields = ['id', 'name', 'slug']
  349. class VLANFilter(TenancyFilterSet, CustomFieldFilterSet, ChangeLoggedFilter):
  350. id__in = NumericInFilter(
  351. field_name='id',
  352. lookup_expr='in'
  353. )
  354. q = django_filters.CharFilter(
  355. method='search',
  356. label='Search',
  357. )
  358. site_id = django_filters.ModelMultipleChoiceFilter(
  359. queryset=Site.objects.all(),
  360. label='Site (ID)',
  361. )
  362. site = django_filters.ModelMultipleChoiceFilter(
  363. field_name='site__slug',
  364. queryset=Site.objects.all(),
  365. to_field_name='slug',
  366. label='Site (slug)',
  367. )
  368. group_id = django_filters.ModelMultipleChoiceFilter(
  369. queryset=VLANGroup.objects.all(),
  370. label='Group (ID)',
  371. )
  372. group = django_filters.ModelMultipleChoiceFilter(
  373. field_name='group__slug',
  374. queryset=VLANGroup.objects.all(),
  375. to_field_name='slug',
  376. label='Group',
  377. )
  378. role_id = django_filters.ModelMultipleChoiceFilter(
  379. queryset=Role.objects.all(),
  380. label='Role (ID)',
  381. )
  382. role = django_filters.ModelMultipleChoiceFilter(
  383. field_name='role__slug',
  384. queryset=Role.objects.all(),
  385. to_field_name='slug',
  386. label='Role (slug)',
  387. )
  388. status = django_filters.MultipleChoiceFilter(
  389. choices=VLAN_STATUS_CHOICES,
  390. null_value=None
  391. )
  392. tag = TagFilter()
  393. class Meta:
  394. model = VLAN
  395. fields = ['vid', 'name']
  396. def search(self, queryset, name, value):
  397. if not value.strip():
  398. return queryset
  399. qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
  400. try:
  401. qs_filter |= Q(vid=int(value.strip()))
  402. except ValueError:
  403. pass
  404. return queryset.filter(qs_filter)
  405. class ServiceFilter(ChangeLoggedFilter):
  406. q = django_filters.CharFilter(
  407. method='search',
  408. label='Search',
  409. )
  410. device_id = django_filters.ModelMultipleChoiceFilter(
  411. queryset=Device.objects.all(),
  412. label='Device (ID)',
  413. )
  414. device = django_filters.ModelMultipleChoiceFilter(
  415. field_name='device__name',
  416. queryset=Device.objects.all(),
  417. to_field_name='name',
  418. label='Device (name)',
  419. )
  420. virtual_machine_id = django_filters.ModelMultipleChoiceFilter(
  421. queryset=VirtualMachine.objects.all(),
  422. label='Virtual machine (ID)',
  423. )
  424. virtual_machine = django_filters.ModelMultipleChoiceFilter(
  425. field_name='virtual_machine__name',
  426. queryset=VirtualMachine.objects.all(),
  427. to_field_name='name',
  428. label='Virtual machine (name)',
  429. )
  430. tag = TagFilter()
  431. class Meta:
  432. model = Service
  433. fields = ['id', 'name', 'protocol', 'port']
  434. def search(self, queryset, name, value):
  435. if not value.strip():
  436. return queryset
  437. qs_filter = Q(name__icontains=value) | Q(description__icontains=value)
  438. return queryset.filter(qs_filter)