| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494 |
- from django import forms
- from django.core.exceptions import ValidationError
- from django.utils.translation import gettext_lazy as _
- from dcim.models import Device, Interface
- from ipam.models import IPAddress, RouteTarget, VLAN
- from netbox.forms import NetBoxModelForm
- from tenancy.forms import TenancyForm
- from utilities.forms.fields import CommentField, DynamicModelChoiceField, DynamicModelMultipleChoiceField, SlugField
- from utilities.forms.rendering import FieldSet, TabbedGroups
- from utilities.forms.utils import add_blank_choice, get_field_value
- from utilities.forms.widgets import HTMXSelect
- from virtualization.models import VirtualMachine, VMInterface
- from vpn.choices import *
- from vpn.models import *
- __all__ = (
- 'IKEPolicyForm',
- 'IKEProposalForm',
- 'IPSecPolicyForm',
- 'IPSecProfileForm',
- 'IPSecProposalForm',
- 'L2VPNForm',
- 'L2VPNTerminationForm',
- 'TunnelCreateForm',
- 'TunnelForm',
- 'TunnelGroupForm',
- 'TunnelTerminationForm',
- )
- class TunnelGroupForm(NetBoxModelForm):
- slug = SlugField()
- fieldsets = (
- FieldSet('name', 'slug', 'description', 'tags', name=_('Tunnel Group')),
- )
- class Meta:
- model = TunnelGroup
- fields = [
- 'name', 'slug', 'description', 'tags',
- ]
- class TunnelForm(TenancyForm, NetBoxModelForm):
- group = DynamicModelChoiceField(
- queryset=TunnelGroup.objects.all(),
- label=_('Tunnel Group'),
- required=False,
- quick_add=True
- )
- ipsec_profile = DynamicModelChoiceField(
- queryset=IPSecProfile.objects.all(),
- label=_('IPSec Profile'),
- required=False
- )
- comments = CommentField()
- fieldsets = (
- FieldSet('name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'tags', name=_('Tunnel')),
- FieldSet('ipsec_profile', name=_('Security')),
- FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
- )
- class Meta:
- model = Tunnel
- fields = [
- 'name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'ipsec_profile', 'tenant_group',
- 'tenant', 'comments', 'tags',
- ]
- class TunnelCreateForm(TunnelForm):
- # First termination
- termination1_role = forms.ChoiceField(
- choices=add_blank_choice(TunnelTerminationRoleChoices),
- required=False,
- label=_('Role')
- )
- termination1_type = forms.ChoiceField(
- choices=TunnelTerminationTypeChoices,
- required=False,
- widget=HTMXSelect(),
- label=_('Type')
- )
- termination1_parent = DynamicModelChoiceField(
- queryset=Device.objects.all(),
- required=False,
- selector=True,
- label=_('Device')
- )
- termination1_termination = DynamicModelChoiceField(
- queryset=Interface.objects.all(),
- required=False,
- label=_('Tunnel interface'),
- query_params={
- 'device_id': '$termination1_parent',
- }
- )
- termination1_outside_ip = DynamicModelChoiceField(
- queryset=IPAddress.objects.all(),
- label=_('Outside IP'),
- required=False,
- query_params={
- 'device_id': '$termination1_parent',
- }
- )
- # Second termination
- termination2_role = forms.ChoiceField(
- choices=add_blank_choice(TunnelTerminationRoleChoices),
- required=False,
- label=_('Role')
- )
- termination2_type = forms.ChoiceField(
- choices=TunnelTerminationTypeChoices,
- required=False,
- widget=HTMXSelect(),
- label=_('Type')
- )
- termination2_parent = DynamicModelChoiceField(
- queryset=Device.objects.all(),
- required=False,
- selector=True,
- label=_('Device')
- )
- termination2_termination = DynamicModelChoiceField(
- queryset=Interface.objects.all(),
- required=False,
- label=_('Tunnel interface'),
- query_params={
- 'device_id': '$termination2_parent',
- }
- )
- termination2_outside_ip = DynamicModelChoiceField(
- queryset=IPAddress.objects.all(),
- required=False,
- label=_('Outside IP'),
- query_params={
- 'device_id': '$termination2_parent',
- }
- )
- fieldsets = (
- FieldSet('name', 'status', 'group', 'encapsulation', 'description', 'tunnel_id', 'tags', name=_('Tunnel')),
- FieldSet('ipsec_profile', name=_('Security')),
- FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
- FieldSet(
- 'termination1_role', 'termination1_type', 'termination1_parent', 'termination1_termination',
- 'termination1_outside_ip', name=_('First Termination')),
- FieldSet(
- 'termination2_role', 'termination2_type', 'termination2_parent', 'termination2_termination',
- 'termination2_outside_ip', name=_('Second Termination')),
- )
- def __init__(self, *args, initial=None, **kwargs):
- super().__init__(*args, initial=initial, **kwargs)
- if get_field_value(self, 'termination1_type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
- self.fields['termination1_parent'].label = _('Virtual Machine')
- self.fields['termination1_parent'].queryset = VirtualMachine.objects.all()
- self.fields['termination1_termination'].queryset = VMInterface.objects.all()
- self.fields['termination1_termination'].widget.add_query_params({
- 'virtual_machine_id': '$termination1_parent',
- })
- self.fields['termination1_outside_ip'].widget.add_query_params({
- 'virtual_machine_id': '$termination1_parent',
- })
- if get_field_value(self, 'termination2_type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
- self.fields['termination2_parent'].label = _('Virtual Machine')
- self.fields['termination2_parent'].queryset = VirtualMachine.objects.all()
- self.fields['termination2_termination'].queryset = VMInterface.objects.all()
- self.fields['termination2_termination'].widget.add_query_params({
- 'virtual_machine_id': '$termination2_parent',
- })
- self.fields['termination2_outside_ip'].widget.add_query_params({
- 'virtual_machine_id': '$termination2_parent',
- })
- def clean(self):
- super().clean()
- # Validate attributes for each termination (if any)
- for term in ('termination1', 'termination2'):
- required_parameters = (
- f'{term}_role', f'{term}_parent', f'{term}_termination',
- )
- parameters = (
- *required_parameters,
- f'{term}_outside_ip',
- )
- if any([self.cleaned_data[param] for param in parameters]):
- for param in required_parameters:
- if not self.cleaned_data[param]:
- raise forms.ValidationError({
- param: _("This parameter is required when defining a termination.")
- })
- def save(self, *args, **kwargs):
- instance = super().save(*args, **kwargs)
- # Create first termination
- if self.cleaned_data['termination1_termination']:
- TunnelTermination.objects.create(
- tunnel=instance,
- role=self.cleaned_data['termination1_role'],
- termination=self.cleaned_data['termination1_termination'],
- outside_ip=self.cleaned_data['termination1_outside_ip'],
- )
- # Create second termination, if defined
- if self.cleaned_data['termination2_termination']:
- TunnelTermination.objects.create(
- tunnel=instance,
- role=self.cleaned_data['termination2_role'],
- termination=self.cleaned_data['termination2_termination'],
- outside_ip=self.cleaned_data.get('termination2_outside_ip'),
- )
- return instance
- class TunnelTerminationForm(NetBoxModelForm):
- tunnel = DynamicModelChoiceField(
- queryset=Tunnel.objects.all()
- )
- type = forms.ChoiceField(
- choices=TunnelTerminationTypeChoices,
- widget=HTMXSelect(),
- label=_('Type')
- )
- parent = DynamicModelChoiceField(
- queryset=Device.objects.all(),
- selector=True,
- label=_('Device')
- )
- termination = DynamicModelChoiceField(
- queryset=Interface.objects.all(),
- label=_('Tunnel interface'),
- query_params={
- 'device_id': '$parent',
- }
- )
- outside_ip = DynamicModelChoiceField(
- queryset=IPAddress.objects.all(),
- label=_('Outside IP'),
- required=False,
- query_params={
- 'device_id': '$parent',
- }
- )
- fieldsets = (
- FieldSet('tunnel', 'role', 'type', 'parent', 'termination', 'outside_ip', 'tags'),
- )
- class Meta:
- model = TunnelTermination
- fields = [
- 'tunnel', 'role', 'outside_ip', 'tags',
- ]
- def __init__(self, *args, initial=None, **kwargs):
- super().__init__(*args, initial=initial, **kwargs)
- if (get_field_value(self, 'type') is None and
- self.instance.pk and isinstance(self.instance.termination.parent_object, VirtualMachine)):
- self.fields['type'].initial = TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE
- # If initial or self.data is set and the type is a VIRTUALMACHINE type, swap the field querysets.
- if get_field_value(self, 'type') == TunnelTerminationTypeChoices.TYPE_VIRTUALMACHINE:
- self.fields['parent'].label = _('Virtual Machine')
- self.fields['parent'].queryset = VirtualMachine.objects.all()
- self.fields['parent'].widget.attrs['selector'] = 'virtualization.virtualmachine'
- self.fields['termination'].queryset = VMInterface.objects.all()
- self.fields['termination'].widget.add_query_params({
- 'virtual_machine_id': '$parent',
- })
- self.fields['outside_ip'].widget.add_query_params({
- 'virtual_machine_id': '$parent',
- })
- if self.instance.pk:
- self.fields['parent'].initial = self.instance.termination.parent_object
- self.fields['termination'].initial = self.instance.termination
- def clean(self):
- super().clean()
- # Set the terminated object
- self.instance.termination = self.cleaned_data.get('termination')
- class IKEProposalForm(NetBoxModelForm):
- fieldsets = (
- FieldSet('name', 'description', 'tags', name=_('Proposal')),
- FieldSet(
- 'authentication_method', 'encryption_algorithm', 'authentication_algorithm', 'group', 'sa_lifetime',
- name=_('Parameters')
- ),
- )
- class Meta:
- model = IKEProposal
- fields = [
- 'name', 'description', 'authentication_method', 'encryption_algorithm', 'authentication_algorithm', 'group',
- 'sa_lifetime', 'comments', 'tags',
- ]
- class IKEPolicyForm(NetBoxModelForm):
- proposals = DynamicModelMultipleChoiceField(
- queryset=IKEProposal.objects.all(),
- label=_('Proposals'),
- quick_add=True
- )
- fieldsets = (
- FieldSet('name', 'description', 'tags', name=_('Policy')),
- FieldSet('version', 'mode', 'proposals', 'preshared_key', name=_('Parameters')),
- )
- class Meta:
- model = IKEPolicy
- fields = [
- 'name', 'description', 'version', 'mode', 'proposals', 'preshared_key', 'comments', 'tags',
- ]
- class IPSecProposalForm(NetBoxModelForm):
- fieldsets = (
- FieldSet('name', 'description', 'tags', name=_('Proposal')),
- FieldSet(
- 'encryption_algorithm', 'authentication_algorithm', 'sa_lifetime_seconds', 'sa_lifetime_data',
- name=_('Parameters')
- ),
- )
- class Meta:
- model = IPSecProposal
- fields = [
- 'name', 'description', 'encryption_algorithm', 'authentication_algorithm', 'sa_lifetime_seconds',
- 'sa_lifetime_data', 'comments', 'tags',
- ]
- class IPSecPolicyForm(NetBoxModelForm):
- proposals = DynamicModelMultipleChoiceField(
- queryset=IPSecProposal.objects.all(),
- label=_('Proposals'),
- quick_add=True
- )
- fieldsets = (
- FieldSet('name', 'description', 'tags', name=_('Policy')),
- FieldSet('proposals', 'pfs_group', name=_('Parameters')),
- )
- class Meta:
- model = IPSecPolicy
- fields = [
- 'name', 'description', 'proposals', 'pfs_group', 'comments', 'tags',
- ]
- class IPSecProfileForm(NetBoxModelForm):
- ike_policy = DynamicModelChoiceField(
- queryset=IKEPolicy.objects.all(),
- label=_('IKE policy')
- )
- ipsec_policy = DynamicModelChoiceField(
- queryset=IPSecPolicy.objects.all(),
- label=_('IPSec policy')
- )
- comments = CommentField()
- fieldsets = (
- FieldSet('name', 'description', 'tags', name=_('Profile')),
- FieldSet('mode', 'ike_policy', 'ipsec_policy', name=_('Parameters')),
- )
- class Meta:
- model = IPSecProfile
- fields = [
- 'name', 'description', 'mode', 'ike_policy', 'ipsec_policy', 'description', 'comments', 'tags',
- ]
- #
- # L2VPN
- #
- class L2VPNForm(TenancyForm, NetBoxModelForm):
- slug = SlugField()
- import_targets = DynamicModelMultipleChoiceField(
- label=_('Import targets'),
- queryset=RouteTarget.objects.all(),
- required=False
- )
- export_targets = DynamicModelMultipleChoiceField(
- label=_('Export targets'),
- queryset=RouteTarget.objects.all(),
- required=False
- )
- comments = CommentField()
- fieldsets = (
- FieldSet('name', 'slug', 'type', 'identifier', 'description', 'tags', name=_('L2VPN')),
- FieldSet('import_targets', 'export_targets', name=_('Route Targets')),
- FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
- )
- class Meta:
- model = L2VPN
- fields = (
- 'name', 'slug', 'type', 'identifier', 'import_targets', 'export_targets', 'tenant', 'description',
- 'comments', 'tags'
- )
- class L2VPNTerminationForm(NetBoxModelForm):
- l2vpn = DynamicModelChoiceField(
- queryset=L2VPN.objects.all(),
- required=True,
- query_params={},
- label=_('L2VPN')
- )
- vlan = DynamicModelChoiceField(
- queryset=VLAN.objects.all(),
- required=False,
- selector=True,
- label=_('VLAN')
- )
- interface = DynamicModelChoiceField(
- label=_('Interface'),
- queryset=Interface.objects.all(),
- required=False,
- selector=True
- )
- vminterface = DynamicModelChoiceField(
- queryset=VMInterface.objects.all(),
- required=False,
- selector=True,
- label=_('Interface')
- )
- fieldsets = (
- FieldSet(
- 'l2vpn',
- TabbedGroups(
- FieldSet('vlan', name=_('VLAN')),
- FieldSet('interface', name=_('Device')),
- FieldSet('vminterface', name=_('Virtual Machine')),
- ),
- 'tags',
- ),
- )
- class Meta:
- model = L2VPNTermination
- fields = ('l2vpn', 'tags')
- def __init__(self, *args, **kwargs):
- instance = kwargs.get('instance')
- initial = kwargs.get('initial', {}).copy()
- if instance:
- if type(instance.assigned_object) is Interface:
- initial['interface'] = instance.assigned_object
- elif type(instance.assigned_object) is VLAN:
- initial['vlan'] = instance.assigned_object
- elif type(instance.assigned_object) is VMInterface:
- initial['vminterface'] = instance.assigned_object
- kwargs['initial'] = initial
- super().__init__(*args, **kwargs)
- def clean(self):
- super().clean()
- interface = self.cleaned_data.get('interface')
- vminterface = self.cleaned_data.get('vminterface')
- vlan = self.cleaned_data.get('vlan')
- if not (interface or vminterface or vlan):
- raise ValidationError(_('A termination must specify an interface or VLAN.'))
- if len([x for x in (interface, vminterface, vlan) if x]) > 1:
- raise ValidationError(_('A termination can only have one terminating object (an interface or VLAN).'))
- self.instance.assigned_object = interface or vminterface or vlan
|