model_forms.py 16 KB


  1. from django import forms
  2. from django.core.exceptions import ValidationError
  3. from django.utils.translation import gettext_lazy as _
  4. from dcim.models import Device, Interface
  5. from ipam.models import IPAddress, RouteTarget, VLAN
  6. from netbox.forms import NetBoxModelForm
  7. from tenancy.forms import TenancyForm
  8. from utilities.forms.fields import CommentField, DynamicModelChoiceField, DynamicModelMultipleChoiceField, SlugField
  9. from utilities.forms.rendering import FieldSet, TabbedGroups
  10. from utilities.forms.utils import add_blank_choice, get_field_value
  11. from utilities.forms.widgets import HTMXSelect
  12. from virtualization.models import VirtualMachine, VMInterface
  13. from vpn.choices import *
  14. from vpn.models import *
  15. __all__ = (
  16. 'IKEPolicyForm',
  17. 'IKEProposalForm',
  18. 'IPSecPolicyForm',
  19. 'IPSecProfileForm',
  20. 'IPSecProposalForm',
  21. 'L2VPNForm',
  22. 'L2VPNTerminationForm',
  23. 'TunnelCreateForm',
  24. 'TunnelForm',
  25. 'TunnelGroupForm',
  26. 'TunnelTerminationForm',
  27. )
  28. class TunnelGroupForm(NetBoxModelForm):
  29. slug = SlugField()
  30. fieldsets = (
  31. FieldSet('name', 'slug', 'description', 'tags', name=_('Tunnel Group')),
  32. )
  33. class Meta:
  34. model = TunnelGroup
  35. fields = [
  36. 'name', 'slug', 'description', 'tags',
  37. ]
  38. class TunnelForm(TenancyForm, NetBoxModelForm):
  39. group = DynamicModelChoiceField(
  40. queryset=TunnelGroup.objects.all(),
  41. label=_('Tunnel Group'),
  42. required=False,
  43. quick_add=True
  44. )
  45. ipsec_profile = DynamicModelChoiceField(
  46. queryset=IPSecProfile.objects.all(),
  47. label=_('IPSec Profile'),
  48. required=False
  49. )
  50. comments = CommentField()
  51. fieldsets = (
  52. FieldSet('name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'tags', name=_('Tunnel')),
  53. FieldSet('ipsec_profile', name=_('Security')),
  54. FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
  55. )
  56. class Meta:
  57. model = Tunnel
  58. fields = [
  59. 'name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'ipsec_profile', 'tenant_group',
  60. 'tenant', 'comments', 'tags',
  61. ]
  62. class TunnelCreateForm(TunnelForm):
  63. # First termination
  64. termination1_role = forms.ChoiceField(
  65. choices=add_blank_choice(TunnelTerminationRoleChoices),
  66. required=False,
  67. label=_('Role')
  68. )
  69. termination1_type = forms.ChoiceField(
  70. choices=TunnelTerminationTypeChoices,
  71. required=False,
  72. widget=HTMXSelect(),
  73. label=_('Type')
  74. )
  75. termination1_parent = DynamicModelChoiceField(
  76. queryset=Device.objects.all(),
  77. required=False,
  78. selector=True,
  79. label=_('Device')
  80. )
  81. termination1_termination = DynamicModelChoiceField(
  82. queryset=Interface.objects.all(),
  83. required=False,
  84. label=_('Tunnel interface'),
  85. query_params={
  86. 'device_id': '$termination1_parent',
  87. }
  88. )
  89. termination1_outside_ip = DynamicModelChoiceField(
  90. queryset=IPAddress.objects.all(),
  91. label=_('Outside IP'),
  92. required=False,
  93. query_params={
  94. 'device_id': '$termination1_parent',
  95. }
  96. )
  97. # Second termination
  98. termination2_role = forms.ChoiceField(
  99. choices=add_blank_choice(TunnelTerminationRoleChoices),
  100. required=False,
  101. label=_('Role')
  102. )
  103. termination2_type = forms.ChoiceField(
  104. choices=TunnelTerminationTypeChoices,
  105. required=False,
  106. widget=HTMXSelect(),
  107. label=_('Type')
  108. )
  109. termination2_parent = DynamicModelChoiceField(
  110. queryset=Device.objects.all(),
  111. required=False,
  112. selector=True,
  113. label=_('Device')
  114. )
  115. termination2_termination = DynamicModelChoiceField(
  116. queryset=Interface.objects.all(),
  117. required=False,
  118. label=_('Tunnel interface'),
  119. query_params={
  120. 'device_id': '$termination2_parent',
  121. }
  122. )
  123. termination2_outside_ip = DynamicModelChoiceField(
  124. queryset=IPAddress.objects.all(),
  125. required=False,
  126. label=_('Outside IP'),
  127. query_params={
  128. 'device_id': '$termination2_parent',
  129. }
  130. )
  131. fieldsets = (
  132. FieldSet('name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'tags', name=_('Tunnel')),
  133. FieldSet('ipsec_profile', name=_('Security')),
  134. FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
  135. FieldSet(
  136. 'termination1_role', 'termination1_type', 'termination1_parent', 'termination1_termination',
  137. 'termination1_outside_ip', name=_('First Termination')),
  138. FieldSet(
  139. 'termination2_role', 'termination2_type', 'termination2_parent', 'termination2_termination',
  140. 'termination2_outside_ip', name=_('Second Termination')),
  141. )
  142. def __init__(self, *args, initial=None, **kwargs):
  143. super().__init__(*args, initial=initial, **kwargs)
  144. if get_field_value(self, 'termination1_type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
  145. self.fields['termination1_parent'].label = _('Virtual Machine')
  146. self.fields['termination1_parent'].queryset = VirtualMachine.objects.all()
  147. self.fields['termination1_termination'].queryset = VMInterface.objects.all()
  148. self.fields['termination1_termination'].widget.add_query_params({
  149. 'virtual_machine_id': '$termination1_parent',
  150. })
  151. self.fields['termination1_outside_ip'].widget.add_query_params({
  152. 'virtual_machine_id': '$termination1_parent',
  153. })
  154. if get_field_value(self, 'termination2_type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
  155. self.fields['termination2_parent'].label = _('Virtual Machine')
  156. self.fields['termination2_parent'].queryset = VirtualMachine.objects.all()
  157. self.fields['termination2_termination'].queryset = VMInterface.objects.all()
  158. self.fields['termination2_termination'].widget.add_query_params({
  159. 'virtual_machine_id': '$termination2_parent',
  160. })
  161. self.fields['termination2_outside_ip'].widget.add_query_params({
  162. 'virtual_machine_id': '$termination2_parent',
  163. })
  164. def clean(self):
  165. super().clean()
  166. # Validate attributes for each termination (if any)
  167. for term in ('termination1', 'termination2'):
  168. required_parameters = (
  169. f'{term}_role', f'{term}_parent', f'{term}_termination',
  170. )
  171. parameters = (
  172. *required_parameters,
  173. f'{term}_outside_ip',
  174. )
  175. if any([self.cleaned_data[param] for param in parameters]):
  176. for param in required_parameters:
  177. if not self.cleaned_data[param]:
  178. raise forms.ValidationError({
  179. param: _("This parameter is required when defining a termination.")
  180. })
  181. def save(self, *args, **kwargs):
  182. instance = super().save(*args, **kwargs)
  183. # Create first termination
  184. if self.cleaned_data['termination1_termination']:
  185. TunnelTermination.objects.create(
  186. tunnel=instance,
  187. role=self.cleaned_data['termination1_role'],
  188. termination=self.cleaned_data['termination1_termination'],
  189. outside_ip=self.cleaned_data['termination1_outside_ip'],
  190. )
  191. # Create second termination, if defined
  192. if self.cleaned_data['termination2_termination']:
  193. TunnelTermination.objects.create(
  194. tunnel=instance,
  195. role=self.cleaned_data['termination2_role'],
  196. termination=self.cleaned_data['termination2_termination'],
  197. outside_ip=self.cleaned_data.get('termination2_outside_ip'),
  198. )
  199. return instance
  200. class TunnelTerminationForm(NetBoxModelForm):
  201. tunnel = DynamicModelChoiceField(
  202. queryset=Tunnel.objects.all()
  203. )
  204. type = forms.ChoiceField(
  205. choices=TunnelTerminationTypeChoices,
  206. widget=HTMXSelect(),
  207. label=_('Type')
  208. )
  209. parent = DynamicModelChoiceField(
  210. queryset=Device.objects.all(),
  211. selector=True,
  212. label=_('Device')
  213. )
  214. termination = DynamicModelChoiceField(
  215. queryset=Interface.objects.all(),
  216. label=_('Tunnel interface'),
  217. query_params={
  218. 'device_id': '$parent',
  219. }
  220. )
  221. outside_ip = DynamicModelChoiceField(
  222. queryset=IPAddress.objects.all(),
  223. label=_('Outside IP'),
  224. required=False,
  225. query_params={
  226. 'device_id': '$parent',
  227. }
  228. )
  229. fieldsets = (
  230. FieldSet('tunnel', 'role', 'type', 'parent', 'termination', 'outside_ip', 'tags'),
  231. )
  232. class Meta:
  233. model = TunnelTermination
  234. fields = [
  235. 'tunnel', 'role', 'outside_ip', 'tags',
  236. ]
  237. def __init__(self, *args, initial=None, **kwargs):
  238. super().__init__(*args, initial=initial, **kwargs)
  239. if (get_field_value(self, 'type') is None and
  240. self.instance.pk and isinstance(self.instance.termination.parent_object, VirtualMachine)):
  241. self.fields['type'].initial = TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE
  242. # If initial or self.data is set and the type is a VIRTUALMACHINE type, swap the field querysets.
  243. if get_field_value(self, 'type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
  244. self.fields['parent'].label = _('Virtual Machine')
  245. self.fields['parent'].queryset = VirtualMachine.objects.all()
  246. self.fields['parent'].widget.attrs['selector'] = 'virtualization.virtualmachine'
  247. self.fields['termination'].queryset = VMInterface.objects.all()
  248. self.fields['termination'].widget.add_query_params({
  249. 'virtual_machine_id': '$parent',
  250. })
  251. self.fields['outside_ip'].widget.add_query_params({
  252. 'virtual_machine_id': '$parent',
  253. })
  254. if self.instance.pk:
  255. self.fields['parent'].initial = self.instance.termination.parent_object
  256. self.fields['termination'].initial = self.instance.termination
  257. def clean(self):
  258. super().clean()
  259. # Set the terminated object
  260. self.instance.termination = self.cleaned_data.get('termination')
  261. class IKEProposalForm(NetBoxModelForm):
  262. fieldsets = (
  263. FieldSet('name', 'description', 'tags', name=_('Proposal')),
  264. FieldSet(
  265. 'authentication_method', 'encryption_algorithm', 'authentication_algorithm', 'group', 'sa_lifetime',
  266. name=_('Parameters')
  267. ),
  268. )
  269. class Meta:
  270. model = IKEProposal
  271. fields = [
  272. 'name', 'description', 'authentication_method', 'encryption_algorithm', 'authentication_algorithm', 'group',
  273. 'sa_lifetime', 'comments', 'tags',
  274. ]
  275. class IKEPolicyForm(NetBoxModelForm):
  276. proposals = DynamicModelMultipleChoiceField(
  277. queryset=IKEProposal.objects.all(),
  278. label=_('Proposals'),
  279. quick_add=True
  280. )
  281. fieldsets = (
  282. FieldSet('name', 'description', 'tags', name=_('Policy')),
  283. FieldSet('version', 'mode', 'proposals', 'preshared_key', name=_('Parameters')),
  284. )
  285. class Meta:
  286. model = IKEPolicy
  287. fields = [
  288. 'name', 'description', 'version', 'mode', 'proposals', 'preshared_key', 'comments', 'tags',
  289. ]
  290. class IPSecProposalForm(NetBoxModelForm):
  291. fieldsets = (
  292. FieldSet('name', 'description', 'tags', name=_('Proposal')),
  293. FieldSet(
  294. 'encryption_algorithm', 'authentication_algorithm', 'sa_lifetime_seconds', 'sa_lifetime_data',
  295. name=_('Parameters')
  296. ),
  297. )
  298. class Meta:
  299. model = IPSecProposal
  300. fields = [
  301. 'name', 'description', 'encryption_algorithm', 'authentication_algorithm', 'sa_lifetime_seconds',
  302. 'sa_lifetime_data', 'comments', 'tags',
  303. ]
  304. class IPSecPolicyForm(NetBoxModelForm):
  305. proposals = DynamicModelMultipleChoiceField(
  306. queryset=IPSecProposal.objects.all(),
  307. label=_('Proposals'),
  308. quick_add=True
  309. )
  310. fieldsets = (
  311. FieldSet('name', 'description', 'tags', name=_('Policy')),
  312. FieldSet('proposals', 'pfs_group', name=_('Parameters')),
  313. )
  314. class Meta:
  315. model = IPSecPolicy
  316. fields = [
  317. 'name', 'description', 'proposals', 'pfs_group', 'comments', 'tags',
  318. ]
  319. class IPSecProfileForm(NetBoxModelForm):
  320. ike_policy = DynamicModelChoiceField(
  321. queryset=IKEPolicy.objects.all(),
  322. label=_('IKE policy')
  323. )
  324. ipsec_policy = DynamicModelChoiceField(
  325. queryset=IPSecPolicy.objects.all(),
  326. label=_('IPSec policy')
  327. )
  328. comments = CommentField()
  329. fieldsets = (
  330. FieldSet('name', 'description', 'tags', name=_('Profile')),
  331. FieldSet('mode', 'ike_policy', 'ipsec_policy', name=_('Parameters')),
  332. )
  333. class Meta:
  334. model = IPSecProfile
  335. fields = [
  336. 'name', 'description', 'mode', 'ike_policy', 'ipsec_policy', 'description', 'comments', 'tags',
  337. ]
  338. #
  339. # L2VPN
  340. #
  341. class L2VPNForm(TenancyForm, NetBoxModelForm):
  342. slug = SlugField()
  343. import_targets = DynamicModelMultipleChoiceField(
  344. label=_('Import targets'),
  345. queryset=RouteTarget.objects.all(),
  346. required=False
  347. )
  348. export_targets = DynamicModelMultipleChoiceField(
  349. label=_('Export targets'),
  350. queryset=RouteTarget.objects.all(),
  351. required=False
  352. )
  353. comments = CommentField()
  354. fieldsets = (
  355. FieldSet('name', 'slug', 'type', 'identifier', 'description', 'tags', name=_('L2VPN')),
  356. FieldSet('import_targets', 'export_targets', name=_('Route Targets')),
  357. FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
  358. )
  359. class Meta:
  360. model = L2VPN
  361. fields = (
  362. 'name', 'slug', 'type', 'identifier', 'import_targets', 'export_targets', 'tenant', 'description',
  363. 'comments', 'tags'
  364. )
  365. class L2VPNTerminationForm(NetBoxModelForm):
  366. l2vpn = DynamicModelChoiceField(
  367. queryset=L2VPN.objects.all(),
  368. required=True,
  369. query_params={},
  370. label=_('L2VPN')
  371. )
  372. vlan = DynamicModelChoiceField(
  373. queryset=VLAN.objects.all(),
  374. required=False,
  375. selector=True,
  376. label=_('VLAN')
  377. )
  378. interface = DynamicModelChoiceField(
  379. label=_('Interface'),
  380. queryset=Interface.objects.all(),
  381. required=False,
  382. selector=True
  383. )
  384. vminterface = DynamicModelChoiceField(
  385. queryset=VMInterface.objects.all(),
  386. required=False,
  387. selector=True,
  388. label=_('Interface')
  389. )
  390. fieldsets = (
  391. FieldSet(
  392. 'l2vpn',
  393. TabbedGroups(
  394. FieldSet('vlan', name=_('VLAN')),
  395. FieldSet('interface', name=_('Device')),
  396. FieldSet('vminterface', name=_('Virtual Machine')),
  397. ),
  398. 'tags',
  399. ),
  400. )
  401. class Meta:
  402. model = L2VPNTermination
  403. fields = ('l2vpn', 'tags')
  404. def __init__(self, *args, **kwargs):
  405. instance = kwargs.get('instance')
  406. initial = kwargs.get('initial', {}).copy()
  407. if instance:
  408. if type(instance.assigned_object) is Interface:
  409. initial['interface'] = instance.assigned_object
  410. elif type(instance.assigned_object) is VLAN:
  411. initial['vlan'] = instance.assigned_object
  412. elif type(instance.assigned_object) is VMInterface:
  413. initial['vminterface'] = instance.assigned_object
  414. kwargs['initial'] = initial
  415. super().__init__(*args, **kwargs)
  416. def clean(self):
  417. super().clean()
  418. interface = self.cleaned_data.get('interface')
  419. vminterface = self.cleaned_data.get('vminterface')
  420. vlan = self.cleaned_data.get('vlan')
  421. if not (interface or vminterface or vlan):
  422. raise ValidationError(_('A termination must specify an interface or VLAN.'))
  423. if len([x for x in (interface, vminterface, vlan) if x]) > 1:
  424. raise ValidationError(_('A termination can only have one terminating object (an interface or VLAN).'))
  425. self.instance.assigned_object = interface or vminterface or vlan