| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431 |
- import json
- from django import forms
- from django.conf import settings
- from django.contrib.auth import password_validation
- from django.contrib.postgres.forms import SimpleArrayField
- from django.core.exceptions import FieldError
- from django.utils.safestring import mark_safe
- from django.utils.translation import gettext_lazy as _
- from core.models import ObjectType
- from ipam.formfields import IPNetworkFormField
- from ipam.validators import prefix_validator
- from netbox.preferences import PREFERENCES
- from users.choices import TokenVersionChoices
- from users.constants import *
- from users.models import *
- from utilities.data import flatten_dict
- from utilities.forms.fields import ContentTypeMultipleChoiceField, DynamicModelMultipleChoiceField, JSONField
- from utilities.forms.rendering import FieldSet
- from utilities.forms.widgets import DateTimePicker, SplitMultiSelectWidget
- from utilities.permissions import qs_filter_from_constraints
# Names exported by `from <this module> import *`. Each form is listed exactly once.
__all__ = (
    'GroupForm',
    'ObjectPermissionForm',
    'TokenForm',
    'UserConfigForm',
    'UserForm',
    'UserTokenForm',
)
class UserConfigFormMetaclass(forms.models.ModelFormMetaclass):
    """
    ModelForm metaclass which adds one TypedChoiceField to the class being built for every entry
    in PREFERENCES, keyed by the preference's name.
    """

    def __new__(mcs, name, bases, attrs):
        # Inject a select field for each supported user preference, as though it had been declared on the class
        for pref_name, pref in PREFERENCES.items():
            # The help text always ends with the preference name; the description and the warning
            # (when set) are stacked in front of it, warning first.
            hint = f'<code>{pref_name}</code>'
            if pref.description:
                hint = f'{pref.description}<br />{hint}'
            if warning := pref.warning:
                hint = f'<span class="text-danger"><i class="mdi mdi-alert"></i> {warning}</span><br />{hint}'

            attrs[pref_name] = forms.TypedChoiceField(
                label=pref.label,
                choices=pref.choices,
                help_text=mark_safe(hint),
                coerce=pref.coerce,
                required=False,
                widget=forms.Select,
            )

        return super().__new__(mcs, name, bases, attrs)
class UserConfigForm(forms.ModelForm, metaclass=UserConfigFormMetaclass):
    """
    Form for editing the preferences held by a UserConfig instance.

    The preference fields themselves are supplied by UserConfigFormMetaclass. The `pk` field lists
    stored preferences (one per saved table entry) which the user may select to be cleared.
    """
    fieldsets = (
        FieldSet(
            'locale.language', 'pagination.per_page', 'pagination.placement', 'ui.htmx_navigation',
            'ui.tables.striping',
            name=_('User Interface')
        ),
        FieldSet('data_format', 'csv_delimiter', name=_('Miscellaneous')),
    )

    # Clearable preferences; the choices are filled in per instance by __init__()
    pk = forms.MultipleChoiceField(
        choices=[],
        required=False
    )

    class Meta:
        model = UserConfig
        fields = ()

    def __init__(self, *args, instance=None, **kwargs):
        # Seed the form with the flattened preference data stored on the UserConfig instance
        kwargs['initial'] = flatten_dict(instance.data)
        super().__init__(*args, instance=instance, **kwargs)

        # Offer every entry under the instance's "tables" data as a clearable preference
        table_names = instance.data.get('tables', [])
        self.fields['pk'].choices = (
            (f'tables.{table_name}', '') for table_name in table_names
        )

    def save(self, *args, **kwargs):
        # Stage each submitted preference on the instance; "pk" only names preferences to clear
        for key, value in self.cleaned_data.items():
            if key != 'pk':
                self.instance.set(key, value, commit=False)

        # Drop the preferences that were selected for clearing
        for key in self.cleaned_data['pk']:
            self.instance.clear(key)

        return super().save(*args, **kwargs)

    @property
    def plugin_fields(self):
        # Names of all form fields whose name begins with "plugins."
        return [field_name for field_name in self.fields if field_name.startswith('plugins.')]
class UserTokenForm(forms.ModelForm):
    """
    Form for creating or editing a Token without exposing a field for its user.

    This form does not declare a `user` field; subclasses (such as TokenForm) may add one. When
    editing an existing Token, the `version` field (and `user`, if present) is disabled, and the
    `token` field is shown only if the instance reports `v1` and settings.ALLOW_TOKEN_RETRIEVAL
    is enabled. For a new Token with no initial value for `token`, the initial version is set to
    TokenVersionChoices.V2 and an initial token value is taken from Token.generate().
    """
    token = forms.CharField(
        label=_('Token'),
        help_text=_(
            'Tokens must be at least 40 characters in length. <strong>Be sure to record your key</strong> prior to '
            'submitting this form, as it may no longer be accessible once the token has been created.'
        ),
        widget=forms.TextInput(
            attrs={'data-clipboard': 'true'}
        )
    )
    allowed_ips = SimpleArrayField(
        base_field=IPNetworkFormField(validators=[prefix_validator]),
        required=False,
        label=_('Allowed IPs'),
        help_text=_(
            'Allowed IPv4/IPv6 networks from where the token can be used. Leave blank for no restrictions. '
            'Example: <code>10.1.1.0/24,192.168.10.16/32,2001:db8:1::/64</code>'
        ),
    )

    class Meta:
        model = Token
        fields = [
            'version', 'token', 'write_enabled', 'expires', 'description', 'allowed_ips',
        ]
        widgets = {
            'expires': DateTimePicker(),
        }

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.instance.pk:
            # Disable the version & user fields for existing Tokens. This form has no `user` field of
            # its own (only subclasses which declare one do), so skip any field that is absent
            # instead of raising a KeyError.
            for field_name in ('version', 'user'):
                if field_name in self.fields:
                    self.fields[field_name].disabled = True

            # Omit the key field when editing an existing token if token retrieval is not permitted
            if self.instance.v1 and settings.ALLOW_TOKEN_RETRIEVAL:
                self.initial['token'] = self.instance.plaintext
            else:
                del self.fields['token']

        # Generate an initial random key if none has been specified
        elif self.instance._state.adding and not self.initial.get('token'):
            self.initial['version'] = TokenVersionChoices.V2
            self.initial['token'] = Token.generate()

    def save(self, commit=True):
        # `token` is a form-only field here, so copy the submitted value onto a new instance by hand.
        # Nothing is copied when editing, or when the field was removed/left empty.
        if self.instance._state.adding and self.cleaned_data.get('token'):
            self.instance.token = self.cleaned_data['token']

        return super().save(commit=commit)
class TokenForm(UserTokenForm):
    """
    Variant of UserTokenForm which additionally exposes a `user` field, with Users offered in
    username order.
    """
    user = forms.ModelChoiceField(
        label=_('User'),
        queryset=User.objects.order_by('username')
    )

    class Meta(UserTokenForm.Meta):
        # Same fields as the parent form, with `user` inserted after `token`
        fields = [
            'version',
            'token',
            'user',
            'write_enabled',
            'expires',
            'description',
            'allowed_ips',
        ]
class UserForm(forms.ModelForm):
    """
    Form for creating or editing a User, including password, group and permission assignment.

    The password and its confirmation are required when creating a User and optional when editing
    an existing one. A submitted password must match its confirmation and pass the configured
    password validators; it is applied to the instance via set_password() in save().
    """
    password = forms.CharField(
        label=_('Password'),
        widget=forms.PasswordInput(),
        required=True,
    )
    confirm_password = forms.CharField(
        label=_('Confirm password'),
        widget=forms.PasswordInput(),
        required=True,
        help_text=_("Enter the same password as before, for verification."),
    )
    groups = DynamicModelMultipleChoiceField(
        label=_('Groups'),
        required=False,
        queryset=Group.objects.all()
    )
    object_permissions = DynamicModelMultipleChoiceField(
        required=False,
        label=_('Permissions'),
        queryset=ObjectPermission.objects.all()
    )

    fieldsets = (
        FieldSet('username', 'password', 'confirm_password', 'first_name', 'last_name', 'email', name=_('User')),
        FieldSet('groups', name=_('Groups')),
        FieldSet('is_active', 'is_superuser', name=_('Status')),
        FieldSet('object_permissions', name=_('Permissions')),
    )

    class Meta:
        model = User
        fields = [
            'username', 'first_name', 'last_name', 'email', 'groups', 'object_permissions',
            'is_active', 'is_superuser',
        ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        if self.instance.pk:
            # Password fields are optional for existing Users
            self.fields['password'].required = False
            self.fields['confirm_password'].required = False

    def save(self, *args, **kwargs):
        instance = super().save(*args, **kwargs)

        # On edit, check if we have to save the password
        # NOTE(review): when a password was submitted, the instance is saved here regardless of any
        # commit=False passed by the caller.
        if self.cleaned_data.get('password'):
            instance.set_password(self.cleaned_data.get('password'))
            instance.save()

        return instance

    def clean(self):
        """
        Validate the password against its confirmation and the configured password validators.

        Returns the form's cleaned data. Raises forms.ValidationError if the passwords differ or if
        the password is rejected by password validation.
        """
        # ModelForm.clean() must run so that model uniqueness validation is not skipped
        super().clean()

        # Either value is missing from cleaned_data if the field failed its own validation, so read
        # them with get() rather than indexing.
        password = self.cleaned_data.get('password')
        confirm_password = self.cleaned_data.get('confirm_password')

        if password:
            # Check that password confirmation matches if password is set
            if password != confirm_password:
                raise forms.ValidationError(_("Passwords do not match! Please check your input and try again."))

            # Enforce password validation rules (if configured)
            password_validation.validate_password(password, self.instance)

        return self.cleaned_data
class GroupForm(forms.ModelForm):
    """
    Form for creating or editing a Group together with its assigned users and object permissions.
    """
    users = DynamicModelMultipleChoiceField(
        queryset=User.objects.all(),
        required=False,
        label=_('Users')
    )
    object_permissions = DynamicModelMultipleChoiceField(
        queryset=ObjectPermission.objects.all(),
        required=False,
        label=_('Permissions')
    )

    fieldsets = (
        FieldSet('name', 'description'),
        FieldSet('users', name=_('Users')),
        FieldSet('object_permissions', name=_('Permissions')),
    )

    class Meta:
        model = Group
        fields = [
            'name', 'description', 'users', 'object_permissions',
        ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # For an existing Group, preselect the IDs of the users currently assigned to it
        group = self.instance
        if group.pk:
            self.fields['users'].initial = group.users.values_list('id', flat=True)

    def save(self, *args, **kwargs):
        group = super().save(*args, **kwargs)

        # Replace the Group's users with the submitted selection
        group.users.set(self.cleaned_data['users'])

        return group
def get_object_types_choices():
    """
    Return a list of (pk, label) pairs for the ObjectTypes matching OBJECTPERMISSION_OBJECT_TYPES,
    ordered by app label and then model.
    """
    object_types = ObjectType.objects.filter(OBJECTPERMISSION_OBJECT_TYPES).order_by('app_label', 'model')
    return [(object_type.pk, str(object_type)) for object_type in object_types]
class ObjectPermissionForm(forms.ModelForm):
    """
    Form for creating or editing an ObjectPermission.

    The four CRUD actions (view, add, change, delete) are presented as individual checkboxes, while
    the `actions` field carries any additional actions. clean() merges the checked boxes back into
    `actions`, requires at least one action, and checks that the constraints can be used as a
    queryset filter on each selected object type. save() also stores the user and group assignments.
    """
    object_types = ContentTypeMultipleChoiceField(
        label=_('Object types'),
        queryset=ObjectType.objects.all(),
        widget=SplitMultiSelectWidget(
            choices=get_object_types_choices
        ),
        # NOTE(review): "appy" looks like a typo for "apply". The literal is left untouched here
        # because it is passed to gettext and changing it would no longer match existing translations.
        help_text=_('Select the types of objects to which the permission will appy.')
    )
    can_view = forms.BooleanField(
        required=False
    )
    can_add = forms.BooleanField(
        required=False
    )
    can_change = forms.BooleanField(
        required=False
    )
    can_delete = forms.BooleanField(
        required=False
    )
    actions = SimpleArrayField(
        label=_('Additional actions'),
        base_field=forms.CharField(),
        required=False,
        help_text=_('Actions granted in addition to those listed above')
    )
    users = DynamicModelMultipleChoiceField(
        label=_('Users'),
        required=False,
        queryset=User.objects.all()
    )
    groups = DynamicModelMultipleChoiceField(
        label=_('Groups'),
        required=False,
        queryset=Group.objects.all()
    )
    constraints = JSONField(
        required=False,
        label=_('Constraints'),
        help_text=_(
            'JSON expression of a queryset filter that will return only permitted objects. Leave null '
            'to match all objects of this type. A list of multiple objects will result in a logical OR '
            'operation.'
        ),
    )

    fieldsets = (
        FieldSet('name', 'description', 'enabled'),
        FieldSet('can_view', 'can_add', 'can_change', 'can_delete', 'actions', name=_('Actions')),
        FieldSet('object_types', name=_('Objects')),
        FieldSet('groups', 'users', name=_('Assignment')),
        FieldSet('constraints', name=_('Constraints')),
    )

    class Meta:
        model = ObjectPermission
        fields = [
            'name', 'description', 'enabled', 'object_types', 'users', 'groups', 'constraints', 'actions',
        ]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # Make the actions field optional since the form uses it only for non-CRUD actions
        self.fields['actions'].required = False

        # Prepare the appropriate fields when editing an existing ObjectPermission
        if self.instance.pk:
            # Populate assigned users and groups
            self.fields['groups'].initial = self.instance.groups.values_list('id', flat=True)
            self.fields['users'].initial = self.instance.users.values_list('id', flat=True)

            # Check the appropriate checkboxes when editing an existing ObjectPermission. Each CRUD
            # action is moved out of the instance's actions list and onto its checkbox.
            for action in ('view', 'add', 'change', 'delete'):
                if action in self.instance.actions:
                    self.fields[f'can_{action}'].initial = True
                    self.instance.actions.remove(action)

        # Populate initial data for a new ObjectPermission
        elif self.initial:

            # Handle cloned objects - actions come from initial data (URL parameters)
            if cloned_actions := self.initial.get('actions'):
                # Work on a fresh list: this avoids mutating the caller's object, and copes with a
                # bare string (presumably what a single URL parameter value yields - confirm against
                # the caller), which has no remove() and would otherwise be matched by substring.
                if isinstance(cloned_actions, str):
                    cloned_actions = [cloned_actions]
                else:
                    cloned_actions = list(cloned_actions)

                for action in ('view', 'add', 'change', 'delete'):
                    if action in cloned_actions:
                        self.fields[f'can_{action}'].initial = True
                        cloned_actions.remove(action)

                # Whatever remains is shown in the "additional actions" field
                self.initial['actions'] = cloned_actions

            # Convert data delivered via initial data to JSON data
            if isinstance(self.initial.get('constraints'), str):
                self.initial['constraints'] = json.loads(self.initial['constraints'])

    def clean(self):
        """
        Merge the CRUD checkboxes into `actions` and validate the actions and constraints.

        Raises forms.ValidationError if no action is selected, or (against the `constraints` field)
        if the constraints cannot be applied as a filter to one of the selected object types.
        """
        super().clean()

        object_types = self.cleaned_data.get('object_types')
        constraints = self.cleaned_data.get('constraints')

        # Append any of the selected CRUD checkboxes to the actions list
        if not self.cleaned_data.get('actions'):
            self.cleaned_data['actions'] = []
        for action in ('view', 'add', 'change', 'delete'):
            if self.cleaned_data.get(f'can_{action}') and action not in self.cleaned_data['actions']:
                self.cleaned_data['actions'].append(action)

        # At least one action must be specified
        if not self.cleaned_data['actions']:
            raise forms.ValidationError(_("At least one action must be selected."))

        # Validate the specified model constraints by attempting to execute a query. We don't care whether the query
        # returns anything; we just want to make sure the specified constraints are valid.
        if object_types and constraints:
            # Normalize the constraints to a list of dicts
            if not isinstance(constraints, list):
                constraints = [constraints]

            tokens = {
                CONSTRAINT_TOKEN_USER: 0,  # Replace token with a null user ID
            }
            for ct in object_types:
                model = ct.model_class()
                try:
                    model.objects.filter(qs_filter_from_constraints(constraints, tokens)).exists()
                except (FieldError, ValueError) as e:
                    raise forms.ValidationError({
                        'constraints': _('Invalid filter for {model}: {error}').format(model=model, error=e)
                    }) from e

    def save(self, *args, **kwargs):
        instance = super().save(*args, **kwargs)

        # Update assigned users and groups
        instance.users.set(self.cleaned_data['users'])
        instance.groups.set(self.cleaned_data['groups'])

        return instance
|