model_forms.py 16 KB


  1. from django import forms
  2. from django.core.exceptions import ValidationError
  3. from django.utils.translation import gettext_lazy as _
  4. from dcim.models import Device, Interface
  5. from ipam.models import IPAddress, RouteTarget, VLAN
  6. from netbox.forms import NetBoxModelForm
  7. from tenancy.forms import TenancyForm
  8. from utilities.forms.fields import CommentField, DynamicModelChoiceField, DynamicModelMultipleChoiceField, SlugField
  9. from utilities.forms.utils import add_blank_choice, get_field_value
  10. from utilities.forms.widgets import HTMXSelect
  11. from virtualization.models import VirtualMachine, VMInterface
  12. from vpn.choices import *
  13. from vpn.models import *
  14. __all__ = (
  15. 'IKEPolicyForm',
  16. 'IKEProposalForm',
  17. 'IPSecPolicyForm',
  18. 'IPSecProfileForm',
  19. 'IPSecProposalForm',
  20. 'L2VPNForm',
  21. 'L2VPNTerminationForm',
  22. 'TunnelCreateForm',
  23. 'TunnelForm',
  24. 'TunnelGroupForm',
  25. 'TunnelTerminationForm',
  26. )
  27. class TunnelGroupForm(NetBoxModelForm):
  28. slug = SlugField()
  29. fieldsets = (
  30. (_('Tunnel Group'), ('name', 'slug', 'description', 'tags')),
  31. )
  32. class Meta:
  33. model = TunnelGroup
  34. fields = [
  35. 'name', 'slug', 'description', 'tags',
  36. ]
  37. class TunnelForm(TenancyForm, NetBoxModelForm):
  38. group = DynamicModelChoiceField(
  39. queryset=TunnelGroup.objects.all(),
  40. label=_('Tunnel Group'),
  41. required=False
  42. )
  43. ipsec_profile = DynamicModelChoiceField(
  44. queryset=IPSecProfile.objects.all(),
  45. label=_('IPSec Profile'),
  46. required=False
  47. )
  48. comments = CommentField()
  49. fieldsets = (
  50. (_('Tunnel'), ('name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'tags')),
  51. (_('Security'), ('ipsec_profile',)),
  52. (_('Tenancy'), ('tenant_group', 'tenant')),
  53. )
  54. class Meta:
  55. model = Tunnel
  56. fields = [
  57. 'name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'ipsec_profile', 'tenant_group',
  58. 'tenant', 'comments', 'tags',
  59. ]
  60. class TunnelCreateForm(TunnelForm):
  61. # First termination
  62. termination1_role = forms.ChoiceField(
  63. choices=add_blank_choice(TunnelTerminationRoleChoices),
  64. required=False,
  65. label=_('Role')
  66. )
  67. termination1_type = forms.ChoiceField(
  68. choices=TunnelTerminationTypeChoices,
  69. required=False,
  70. widget=HTMXSelect(),
  71. label=_('Type')
  72. )
  73. termination1_parent = DynamicModelChoiceField(
  74. queryset=Device.objects.all(),
  75. required=False,
  76. selector=True,
  77. label=_('Device')
  78. )
  79. termination1_termination = DynamicModelChoiceField(
  80. queryset=Interface.objects.all(),
  81. required=False,
  82. label=_('Interface'),
  83. query_params={
  84. 'device_id': '$termination1_parent',
  85. }
  86. )
  87. termination1_outside_ip = DynamicModelChoiceField(
  88. queryset=IPAddress.objects.all(),
  89. label=_('Outside IP'),
  90. required=False,
  91. query_params={
  92. 'device_id': '$termination1_parent',
  93. }
  94. )
  95. # Second termination
  96. termination2_role = forms.ChoiceField(
  97. choices=add_blank_choice(TunnelTerminationRoleChoices),
  98. required=False,
  99. label=_('Role')
  100. )
  101. termination2_type = forms.ChoiceField(
  102. choices=TunnelTerminationTypeChoices,
  103. required=False,
  104. widget=HTMXSelect(),
  105. label=_('Type')
  106. )
  107. termination2_parent = DynamicModelChoiceField(
  108. queryset=Device.objects.all(),
  109. required=False,
  110. selector=True,
  111. label=_('Device')
  112. )
  113. termination2_termination = DynamicModelChoiceField(
  114. queryset=Interface.objects.all(),
  115. required=False,
  116. label=_('Interface'),
  117. query_params={
  118. 'device_id': '$termination2_parent',
  119. }
  120. )
  121. termination2_outside_ip = DynamicModelChoiceField(
  122. queryset=IPAddress.objects.all(),
  123. required=False,
  124. label=_('Outside IP'),
  125. query_params={
  126. 'device_id': '$termination2_parent',
  127. }
  128. )
  129. fieldsets = (
  130. (_('Tunnel'), ('name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'tags')),
  131. (_('Security'), ('ipsec_profile',)),
  132. (_('Tenancy'), ('tenant_group', 'tenant')),
  133. (_('First Termination'), (
  134. 'termination1_role', 'termination1_type', 'termination1_parent', 'termination1_termination',
  135. 'termination1_outside_ip',
  136. )),
  137. (_('Second Termination'), (
  138. 'termination2_role', 'termination2_type', 'termination2_parent', 'termination2_termination',
  139. 'termination2_outside_ip',
  140. )),
  141. )
  142. def __init__(self, *args, initial=None, **kwargs):
  143. super().__init__(*args, initial=initial, **kwargs)
  144. if get_field_value(self, 'termination1_type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
  145. self.fields['termination1_parent'].label = _('Virtual Machine')
  146. self.fields['termination1_parent'].queryset = VirtualMachine.objects.all()
  147. self.fields['termination1_termination'].queryset = VMInterface.objects.all()
  148. self.fields['termination1_termination'].widget.add_query_params({
  149. 'virtual_machine_id': '$termination1_parent',
  150. })
  151. self.fields['termination1_outside_ip'].widget.add_query_params({
  152. 'virtual_machine_id': '$termination1_parent',
  153. })
  154. if get_field_value(self, 'termination2_type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
  155. self.fields['termination2_parent'].label = _('Virtual Machine')
  156. self.fields['termination2_parent'].queryset = VirtualMachine.objects.all()
  157. self.fields['termination2_termination'].queryset = VMInterface.objects.all()
  158. self.fields['termination2_termination'].widget.add_query_params({
  159. 'virtual_machine_id': '$termination2_parent',
  160. })
  161. self.fields['termination2_outside_ip'].widget.add_query_params({
  162. 'virtual_machine_id': '$termination2_parent',
  163. })
  164. def clean(self):
  165. super().clean()
  166. # Validate attributes for each termination (if any)
  167. for term in ('termination1', 'termination2'):
  168. required_parameters = (
  169. f'{term}_role', f'{term}_parent', f'{term}_termination',
  170. )
  171. parameters = (
  172. *required_parameters,
  173. f'{term}_outside_ip',
  174. )
  175. if any([self.cleaned_data[param] for param in parameters]):
  176. for param in required_parameters:
  177. if not self.cleaned_data[param]:
  178. raise forms.ValidationError({
  179. param: _("This parameter is required when defining a termination.")
  180. })
  181. def save(self, *args, **kwargs):
  182. instance = super().save(*args, **kwargs)
  183. # Create first termination
  184. if self.cleaned_data['termination1_termination']:
  185. TunnelTermination.objects.create(
  186. tunnel=instance,
  187. role=self.cleaned_data['termination1_role'],
  188. termination=self.cleaned_data['termination1_termination'],
  189. outside_ip=self.cleaned_data['termination1_outside_ip'],
  190. )
  191. # Create second termination, if defined
  192. if self.cleaned_data['termination2_termination']:
  193. TunnelTermination.objects.create(
  194. tunnel=instance,
  195. role=self.cleaned_data['termination2_role'],
  196. termination=self.cleaned_data['termination2_termination'],
  197. outside_ip=self.cleaned_data.get('termination2_outside_ip'),
  198. )
  199. return instance
  200. class TunnelTerminationForm(NetBoxModelForm):
  201. tunnel = DynamicModelChoiceField(
  202. queryset=Tunnel.objects.all()
  203. )
  204. type = forms.ChoiceField(
  205. choices=TunnelTerminationTypeChoices,
  206. widget=HTMXSelect(),
  207. label=_('Type')
  208. )
  209. parent = DynamicModelChoiceField(
  210. queryset=Device.objects.all(),
  211. selector=True,
  212. label=_('Device')
  213. )
  214. termination = DynamicModelChoiceField(
  215. queryset=Interface.objects.all(),
  216. label=_('Interface'),
  217. query_params={
  218. 'device_id': '$parent',
  219. }
  220. )
  221. outside_ip = DynamicModelChoiceField(
  222. queryset=IPAddress.objects.all(),
  223. label=_('Outside IP'),
  224. required=False,
  225. query_params={
  226. 'device_id': '$parent',
  227. }
  228. )
  229. fieldsets = (
  230. (None, ('tunnel', 'role', 'type', 'parent', 'termination', 'outside_ip', 'tags')),
  231. )
  232. class Meta:
  233. model = TunnelTermination
  234. fields = [
  235. 'tunnel', 'role', 'termination', 'outside_ip', 'tags',
  236. ]
  237. def __init__(self, *args, initial=None, **kwargs):
  238. super().__init__(*args, initial=initial, **kwargs)
  239. if (get_field_value(self, 'type') is None and
  240. self.instance.pk and isinstance(self.instance.termination.parent_object, VirtualMachine)):
  241. self.fields['type'].initial = TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE
  242. # If initial or self.data is set and the type is a VIRTUALMACHINE type, swap the field querysets.
  243. if get_field_value(self, 'type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
  244. self.fields['parent'].label = _('Virtual Machine')
  245. self.fields['parent'].queryset = VirtualMachine.objects.all()
  246. self.fields['parent'].widget.attrs['selector'] = 'virtualization.virtualmachine'
  247. self.fields['termination'].queryset = VMInterface.objects.all()
  248. self.fields['termination'].widget.add_query_params({
  249. 'virtual_machine_id': '$parent',
  250. })
  251. self.fields['outside_ip'].widget.add_query_params({
  252. 'virtual_machine_id': '$parent',
  253. })
  254. if self.instance.pk:
  255. self.fields['parent'].initial = self.instance.termination.parent_object
  256. self.fields['termination'].initial = self.instance.termination
  257. def clean(self):
  258. super().clean()
  259. # Set the terminated object
  260. self.instance.termination = self.cleaned_data.get('termination')
  261. class IKEProposalForm(NetBoxModelForm):
  262. fieldsets = (
  263. (_('Proposal'), ('name', 'description', 'tags')),
  264. (_('Parameters'), (
  265. 'authentication_method', 'encryption_algorithm', 'authentication_algorithm', 'group', 'sa_lifetime',
  266. )),
  267. )
  268. class Meta:
  269. model = IKEProposal
  270. fields = [
  271. 'name', 'description', 'authentication_method', 'encryption_algorithm', 'authentication_algorithm', 'group',
  272. 'sa_lifetime', 'comments', 'tags',
  273. ]
  274. class IKEPolicyForm(NetBoxModelForm):
  275. proposals = DynamicModelMultipleChoiceField(
  276. queryset=IKEProposal.objects.all(),
  277. label=_('Proposals')
  278. )
  279. fieldsets = (
  280. (_('Policy'), ('name', 'description', 'tags')),
  281. (_('Parameters'), ('version', 'mode', 'proposals', 'preshared_key')),
  282. )
  283. class Meta:
  284. model = IKEPolicy
  285. fields = [
  286. 'name', 'description', 'version', 'mode', 'proposals', 'preshared_key', 'comments', 'tags',
  287. ]
  288. class IPSecProposalForm(NetBoxModelForm):
  289. fieldsets = (
  290. (_('Proposal'), ('name', 'description', 'tags')),
  291. (_('Parameters'), (
  292. 'encryption_algorithm', 'authentication_algorithm', 'sa_lifetime_seconds', 'sa_lifetime_data',
  293. )),
  294. )
  295. class Meta:
  296. model = IPSecProposal
  297. fields = [
  298. 'name', 'description', 'encryption_algorithm', 'authentication_algorithm', 'sa_lifetime_seconds',
  299. 'sa_lifetime_data', 'comments', 'tags',
  300. ]
  301. class IPSecPolicyForm(NetBoxModelForm):
  302. proposals = DynamicModelMultipleChoiceField(
  303. queryset=IPSecProposal.objects.all(),
  304. label=_('Proposals')
  305. )
  306. fieldsets = (
  307. (_('Policy'), ('name', 'description', 'tags')),
  308. (_('Parameters'), ('proposals', 'pfs_group')),
  309. )
  310. class Meta:
  311. model = IPSecPolicy
  312. fields = [
  313. 'name', 'description', 'proposals', 'pfs_group', 'comments', 'tags',
  314. ]
  315. class IPSecProfileForm(NetBoxModelForm):
  316. ike_policy = DynamicModelChoiceField(
  317. queryset=IKEPolicy.objects.all(),
  318. label=_('IKE policy')
  319. )
  320. ipsec_policy = DynamicModelChoiceField(
  321. queryset=IPSecPolicy.objects.all(),
  322. label=_('IPSec policy')
  323. )
  324. comments = CommentField()
  325. fieldsets = (
  326. (_('Profile'), ('name', 'description', 'tags')),
  327. (_('Parameters'), ('mode', 'ike_policy', 'ipsec_policy')),
  328. )
  329. class Meta:
  330. model = IPSecProfile
  331. fields = [
  332. 'name', 'description', 'mode', 'ike_policy', 'ipsec_policy', 'description', 'comments', 'tags',
  333. ]
  334. #
  335. # L2VPN
  336. #
  337. class L2VPNForm(TenancyForm, NetBoxModelForm):
  338. slug = SlugField()
  339. import_targets = DynamicModelMultipleChoiceField(
  340. label=_('Import targets'),
  341. queryset=RouteTarget.objects.all(),
  342. required=False
  343. )
  344. export_targets = DynamicModelMultipleChoiceField(
  345. label=_('Export targets'),
  346. queryset=RouteTarget.objects.all(),
  347. required=False
  348. )
  349. comments = CommentField()
  350. fieldsets = (
  351. (_('L2VPN'), ('name', 'slug', 'type', 'identifier', 'description', 'tags')),
  352. (_('Route Targets'), ('import_targets', 'export_targets')),
  353. (_('Tenancy'), ('tenant_group', 'tenant')),
  354. )
  355. class Meta:
  356. model = L2VPN
  357. fields = (
  358. 'name', 'slug', 'type', 'identifier', 'import_targets', 'export_targets', 'tenant', 'description',
  359. 'comments', 'tags'
  360. )
  361. class L2VPNTerminationForm(NetBoxModelForm):
  362. l2vpn = DynamicModelChoiceField(
  363. queryset=L2VPN.objects.all(),
  364. required=True,
  365. query_params={},
  366. label=_('L2VPN'),
  367. fetch_trigger='open'
  368. )
  369. vlan = DynamicModelChoiceField(
  370. queryset=VLAN.objects.all(),
  371. required=False,
  372. selector=True,
  373. label=_('VLAN')
  374. )
  375. interface = DynamicModelChoiceField(
  376. label=_('Interface'),
  377. queryset=Interface.objects.all(),
  378. required=False,
  379. selector=True
  380. )
  381. vminterface = DynamicModelChoiceField(
  382. queryset=VMInterface.objects.all(),
  383. required=False,
  384. selector=True,
  385. label=_('Interface')
  386. )
  387. class Meta:
  388. model = L2VPNTermination
  389. fields = ('l2vpn', 'tags')
  390. def __init__(self, *args, **kwargs):
  391. instance = kwargs.get('instance')
  392. initial = kwargs.get('initial', {}).copy()
  393. if instance:
  394. if type(instance.assigned_object) is Interface:
  395. initial['interface'] = instance.assigned_object
  396. elif type(instance.assigned_object) is VLAN:
  397. initial['vlan'] = instance.assigned_object
  398. elif type(instance.assigned_object) is VMInterface:
  399. initial['vminterface'] = instance.assigned_object
  400. kwargs['initial'] = initial
  401. super().__init__(*args, **kwargs)
  402. def clean(self):
  403. super().clean()
  404. interface = self.cleaned_data.get('interface')
  405. vminterface = self.cleaned_data.get('vminterface')
  406. vlan = self.cleaned_data.get('vlan')
  407. if not (interface or vminterface or vlan):
  408. raise ValidationError(_('A termination must specify an interface or VLAN.'))
  409. if len([x for x in (interface, vminterface, vlan) if x]) > 1:
  410. raise ValidationError(_('A termination can only have one terminating object (an interface or VLAN).'))
  411. self.instance.assigned_object = interface or vminterface or vlan