| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243 |
- import copy
- import json
- from django import forms
- from django.conf import settings
- from django.forms.fields import JSONField as _JSONField
- from django.utils.translation import gettext_lazy as _
- from core.forms.mixins import SyncedDataMixin
- from core.models import *
- from netbox.config import PARAMS, get_config
- from netbox.forms import NetBoxModelForm, PrimaryModelForm
- from netbox.registry import registry
- from netbox.utils import get_data_backend_choices
- from utilities.forms import get_field_value
- from utilities.forms.fields import JSONField
- from utilities.forms.rendering import FieldSet
- from utilities.forms.widgets import HTMXSelect
- __all__ = (
- 'ConfigRevisionForm',
- 'DataSourceForm',
- 'ManagedFileForm',
- )
- EMPTY_VALUES = ('', None, [], ())
- class DataSourceForm(PrimaryModelForm):
- type = forms.ChoiceField(
- choices=get_data_backend_choices,
- widget=HTMXSelect()
- )
- class Meta:
- model = DataSource
- fields = [
- 'name', 'type', 'source_url', 'enabled', 'description', 'sync_interval', 'ignore_rules', 'owner',
- 'comments', 'tags',
- ]
- widgets = {
- 'ignore_rules': forms.Textarea(
- attrs={
- 'rows': 5,
- 'class': 'font-monospace',
- 'placeholder': '.cache\n*.txt'
- }
- ),
- }
- @property
- def fieldsets(self):
- fieldsets = [
- FieldSet(
- 'name', 'type', 'source_url', 'description', 'tags', 'ignore_rules', name=_('Source')
- ),
- FieldSet('enabled', 'sync_interval', name=_('Sync')),
- ]
- if self.backend_fields:
- fieldsets.append(
- FieldSet(*self.backend_fields, name=_('Backend Parameters'))
- )
- return fieldsets
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Determine the selected backend type
- backend_type = get_field_value(self, 'type')
- backend = registry['data_backends'].get(backend_type)
- # Add backend-specific form fields
- self.backend_fields = []
- if backend:
- for name, form_field in backend.parameters.items():
- field_name = f'backend_{name}'
- self.backend_fields.append(field_name)
- self.fields[field_name] = copy.copy(form_field)
- if self.instance and self.instance.parameters:
- self.fields[field_name].initial = self.instance.parameters.get(name)
- def save(self, *args, **kwargs):
- parameters = {}
- for name in self.fields:
- if name.startswith('backend_'):
- parameters[name[8:]] = self.cleaned_data[name]
- self.instance.parameters = parameters
- return super().save(*args, **kwargs)
- class ManagedFileForm(SyncedDataMixin, NetBoxModelForm):
- upload_file = forms.FileField(
- required=False
- )
- fieldsets = (
- FieldSet('upload_file', name=_('File Upload')),
- FieldSet('data_source', 'data_file', 'auto_sync_enabled', name=_('Data Source')),
- )
- class Meta:
- model = ManagedFile
- fields = ('data_source', 'data_file', 'auto_sync_enabled')
- def clean(self):
- super().clean()
- if self.cleaned_data.get('upload_file') and self.cleaned_data.get('data_file'):
- raise forms.ValidationError(_("Cannot upload a file and sync from an existing file"))
- if not self.cleaned_data.get('upload_file') and not self.cleaned_data.get('data_file'):
- raise forms.ValidationError(_("Must upload a file or select a data file to sync"))
- return self.cleaned_data
- def save(self, *args, **kwargs):
- # If a file was uploaded, save it to disk
- if self.cleaned_data['upload_file']:
- self.instance.file_path = self.cleaned_data['upload_file'].name
- with open(self.instance.full_path, 'wb+') as new_file:
- new_file.write(self.cleaned_data['upload_file'].read())
- return super().save(*args, **kwargs)
- class ConfigFormMetaclass(forms.models.ModelFormMetaclass):
- def __new__(mcs, name, bases, attrs):
- # Emulate a declared field for each supported configuration parameter
- param_fields = {}
- for param in PARAMS:
- field_kwargs = {
- 'required': False,
- 'label': param.label,
- 'help_text': param.description,
- }
- field_kwargs.update(**param.field_kwargs)
- if param.field is _JSONField:
- # Replace with our own JSONField to get pretty JSON in config editor
- param.field = JSONField
- param_fields[param.name] = param.field(**field_kwargs)
- attrs.update(param_fields)
- return super().__new__(mcs, name, bases, attrs)
- class ConfigRevisionForm(forms.ModelForm, metaclass=ConfigFormMetaclass):
- """
- Form for creating a new ConfigRevision.
- """
- fieldsets = (
- FieldSet(
- 'RACK_ELEVATION_DEFAULT_UNIT_HEIGHT', 'RACK_ELEVATION_DEFAULT_UNIT_WIDTH', name=_('Rack Elevations')
- ),
- FieldSet(
- 'POWERFEED_DEFAULT_VOLTAGE', 'POWERFEED_DEFAULT_AMPERAGE', 'POWERFEED_DEFAULT_MAX_UTILIZATION',
- name=_('Power')
- ),
- FieldSet('ENFORCE_GLOBAL_UNIQUE', 'PREFER_IPV4', name=_('IPAM')),
- FieldSet('ALLOWED_URL_SCHEMES', name=_('Security')),
- FieldSet('BANNER_LOGIN', 'BANNER_MAINTENANCE', 'BANNER_TOP', 'BANNER_BOTTOM', name=_('Banners')),
- FieldSet('PAGINATE_COUNT', 'MAX_PAGE_SIZE', name=_('Pagination')),
- FieldSet('CUSTOM_VALIDATORS', 'PROTECTION_RULES', name=_('Validation')),
- FieldSet('DEFAULT_USER_PREFERENCES', name=_('User Preferences')),
- FieldSet(
- 'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'CHANGELOG_RETENTION', 'JOB_RETENTION',
- 'MAPS_URL', name=_('Miscellaneous'),
- ),
- FieldSet('comment', name=_('Config Revision'))
- )
- class Meta:
- model = ConfigRevision
- fields = '__all__'
- widgets = {
- 'BANNER_LOGIN': forms.Textarea(attrs={'class': 'font-monospace'}),
- 'BANNER_MAINTENANCE': forms.Textarea(attrs={'class': 'font-monospace'}),
- 'BANNER_TOP': forms.Textarea(attrs={'class': 'font-monospace'}),
- 'BANNER_BOTTOM': forms.Textarea(attrs={'class': 'font-monospace'}),
- 'CUSTOM_VALIDATORS': forms.Textarea(attrs={'class': 'font-monospace'}),
- 'PROTECTION_RULES': forms.Textarea(attrs={'class': 'font-monospace'}),
- 'comment': forms.Textarea(),
- }
- def __init__(self, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Append current parameter values to form field help texts and check for static configurations
- config = get_config()
- for param in PARAMS:
- value = getattr(config, param.name)
- # Set the field's initial value, if it can be serialized. (This may not be the case e.g. for
- # CUSTOM_VALIDATORS, which may reference Python objects.)
- try:
- json.dumps(value)
- if type(value) in (tuple, list):
- self.fields[param.name].initial = ', '.join(value)
- else:
- self.fields[param.name].initial = value
- except TypeError:
- pass
- # Check whether this parameter is statically configured (e.g. in configuration.py)
- if hasattr(settings, param.name):
- self.fields[param.name].disabled = True
- self.fields[param.name].help_text = _(
- 'This parameter has been defined statically and cannot be modified.'
- )
- continue
- # Set the field's help text
- help_text = self.fields[param.name].help_text
- if help_text:
- help_text += '<br />' # Line break
- help_text += _('Current value: <strong>{value}</strong>').format(value=value or '—')
- if value == param.default:
- help_text += _(' (default)')
- self.fields[param.name].help_text = help_text
- def save(self, commit=True):
- instance = super().save(commit=False)
- # Populate JSON data on the instance
- instance.data = self.render_json()
- if commit:
- instance.save()
- return instance
- def render_json(self):
- json = {}
- # Iterate through each field and populate non-empty values
- for field_name in self.declared_fields:
- if field_name in self.cleaned_data and self.cleaned_data[field_name] not in EMPTY_VALUES:
- json[field_name] = self.cleaned_data[field_name]
- return json
|