model_forms.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. import copy
  2. import json
  3. from django import forms
  4. from django.conf import settings
  5. from django.forms.fields import JSONField as _JSONField
  6. from django.utils.translation import gettext_lazy as _
  7. from core.forms.mixins import SyncedDataMixin
  8. from core.models import *
  9. from netbox.config import PARAMS, get_config
  10. from netbox.forms import NetBoxModelForm, PrimaryModelForm
  11. from netbox.registry import registry
  12. from netbox.utils import get_data_backend_choices
  13. from utilities.forms import get_field_value
  14. from utilities.forms.fields import JSONField
  15. from utilities.forms.rendering import FieldSet
  16. from utilities.forms.widgets import HTMXSelect
  17. __all__ = (
  18. 'ConfigRevisionForm',
  19. 'DataSourceForm',
  20. 'ManagedFileForm',
  21. )
  22. EMPTY_VALUES = ('', None, [], ())
  23. class DataSourceForm(PrimaryModelForm):
  24. type = forms.ChoiceField(
  25. choices=get_data_backend_choices,
  26. widget=HTMXSelect()
  27. )
  28. class Meta:
  29. model = DataSource
  30. fields = [
  31. 'name', 'type', 'source_url', 'enabled', 'description', 'sync_interval', 'ignore_rules', 'owner',
  32. 'comments', 'tags',
  33. ]
  34. widgets = {
  35. 'ignore_rules': forms.Textarea(
  36. attrs={
  37. 'rows': 5,
  38. 'class': 'font-monospace',
  39. 'placeholder': '.cache\n*.txt'
  40. }
  41. ),
  42. }
  43. @property
  44. def fieldsets(self):
  45. fieldsets = [
  46. FieldSet(
  47. 'name', 'type', 'source_url', 'description', 'tags', 'ignore_rules', name=_('Source')
  48. ),
  49. FieldSet('enabled', 'sync_interval', name=_('Sync')),
  50. ]
  51. if self.backend_fields:
  52. fieldsets.append(
  53. FieldSet(*self.backend_fields, name=_('Backend Parameters'))
  54. )
  55. return fieldsets
  56. def __init__(self, *args, **kwargs):
  57. super().__init__(*args, **kwargs)
  58. # Determine the selected backend type
  59. backend_type = get_field_value(self, 'type')
  60. backend = registry['data_backends'].get(backend_type)
  61. # Add backend-specific form fields
  62. self.backend_fields = []
  63. if backend:
  64. for name, form_field in backend.parameters.items():
  65. field_name = f'backend_{name}'
  66. self.backend_fields.append(field_name)
  67. self.fields[field_name] = copy.copy(form_field)
  68. if self.instance and self.instance.parameters:
  69. self.fields[field_name].initial = self.instance.parameters.get(name)
  70. def save(self, *args, **kwargs):
  71. parameters = {}
  72. for name in self.fields:
  73. if name.startswith('backend_'):
  74. parameters[name[8:]] = self.cleaned_data[name]
  75. self.instance.parameters = parameters
  76. return super().save(*args, **kwargs)
  77. class ManagedFileForm(SyncedDataMixin, NetBoxModelForm):
  78. upload_file = forms.FileField(
  79. required=False
  80. )
  81. fieldsets = (
  82. FieldSet('upload_file', name=_('File Upload')),
  83. FieldSet('data_source', 'data_file', 'auto_sync_enabled', name=_('Data Source')),
  84. )
  85. class Meta:
  86. model = ManagedFile
  87. fields = ('data_source', 'data_file', 'auto_sync_enabled')
  88. def clean(self):
  89. super().clean()
  90. if self.cleaned_data.get('upload_file') and self.cleaned_data.get('data_file'):
  91. raise forms.ValidationError(_("Cannot upload a file and sync from an existing file"))
  92. if not self.cleaned_data.get('upload_file') and not self.cleaned_data.get('data_file'):
  93. raise forms.ValidationError(_("Must upload a file or select a data file to sync"))
  94. return self.cleaned_data
  95. def save(self, *args, **kwargs):
  96. # If a file was uploaded, save it to disk
  97. if self.cleaned_data['upload_file']:
  98. self.instance.file_path = self.cleaned_data['upload_file'].name
  99. with open(self.instance.full_path, 'wb+') as new_file:
  100. new_file.write(self.cleaned_data['upload_file'].read())
  101. return super().save(*args, **kwargs)
  102. class ConfigFormMetaclass(forms.models.ModelFormMetaclass):
  103. def __new__(mcs, name, bases, attrs):
  104. # Emulate a declared field for each supported configuration parameter
  105. param_fields = {}
  106. for param in PARAMS:
  107. field_kwargs = {
  108. 'required': False,
  109. 'label': param.label,
  110. 'help_text': param.description,
  111. }
  112. field_kwargs.update(**param.field_kwargs)
  113. if param.field is _JSONField:
  114. # Replace with our own JSONField to get pretty JSON in config editor
  115. param.field = JSONField
  116. param_fields[param.name] = param.field(**field_kwargs)
  117. attrs.update(param_fields)
  118. return super().__new__(mcs, name, bases, attrs)
  119. class ConfigRevisionForm(forms.ModelForm, metaclass=ConfigFormMetaclass):
  120. """
  121. Form for creating a new ConfigRevision.
  122. """
  123. fieldsets = (
  124. FieldSet(
  125. 'RACK_ELEVATION_DEFAULT_UNIT_HEIGHT', 'RACK_ELEVATION_DEFAULT_UNIT_WIDTH', name=_('Rack Elevations')
  126. ),
  127. FieldSet(
  128. 'POWERFEED_DEFAULT_VOLTAGE', 'POWERFEED_DEFAULT_AMPERAGE', 'POWERFEED_DEFAULT_MAX_UTILIZATION',
  129. name=_('Power')
  130. ),
  131. FieldSet('ENFORCE_GLOBAL_UNIQUE', 'PREFER_IPV4', name=_('IPAM')),
  132. FieldSet('ALLOWED_URL_SCHEMES', name=_('Security')),
  133. FieldSet('BANNER_LOGIN', 'BANNER_MAINTENANCE', 'BANNER_TOP', 'BANNER_BOTTOM', name=_('Banners')),
  134. FieldSet('PAGINATE_COUNT', 'MAX_PAGE_SIZE', name=_('Pagination')),
  135. FieldSet('CUSTOM_VALIDATORS', 'PROTECTION_RULES', name=_('Validation')),
  136. FieldSet('DEFAULT_USER_PREFERENCES', name=_('User Preferences')),
  137. FieldSet(
  138. 'MAINTENANCE_MODE', 'COPILOT_ENABLED', 'GRAPHQL_ENABLED', 'CHANGELOG_RETENTION', 'JOB_RETENTION',
  139. 'MAPS_URL', name=_('Miscellaneous'),
  140. ),
  141. FieldSet('comment', name=_('Config Revision'))
  142. )
  143. class Meta:
  144. model = ConfigRevision
  145. fields = '__all__'
  146. widgets = {
  147. 'BANNER_LOGIN': forms.Textarea(attrs={'class': 'font-monospace'}),
  148. 'BANNER_MAINTENANCE': forms.Textarea(attrs={'class': 'font-monospace'}),
  149. 'BANNER_TOP': forms.Textarea(attrs={'class': 'font-monospace'}),
  150. 'BANNER_BOTTOM': forms.Textarea(attrs={'class': 'font-monospace'}),
  151. 'CUSTOM_VALIDATORS': forms.Textarea(attrs={'class': 'font-monospace'}),
  152. 'PROTECTION_RULES': forms.Textarea(attrs={'class': 'font-monospace'}),
  153. 'comment': forms.Textarea(),
  154. }
  155. def __init__(self, *args, **kwargs):
  156. super().__init__(*args, **kwargs)
  157. # Append current parameter values to form field help texts and check for static configurations
  158. config = get_config()
  159. for param in PARAMS:
  160. value = getattr(config, param.name)
  161. # Set the field's initial value, if it can be serialized. (This may not be the case e.g. for
  162. # CUSTOM_VALIDATORS, which may reference Python objects.)
  163. try:
  164. json.dumps(value)
  165. if type(value) in (tuple, list):
  166. self.fields[param.name].initial = ', '.join(value)
  167. else:
  168. self.fields[param.name].initial = value
  169. except TypeError:
  170. pass
  171. # Check whether this parameter is statically configured (e.g. in configuration.py)
  172. if hasattr(settings, param.name):
  173. self.fields[param.name].disabled = True
  174. self.fields[param.name].help_text = _(
  175. 'This parameter has been defined statically and cannot be modified.'
  176. )
  177. continue
  178. # Set the field's help text
  179. help_text = self.fields[param.name].help_text
  180. if help_text:
  181. help_text += '<br />' # Line break
  182. help_text += _('Current value: <strong>{value}</strong>').format(value=value or '&mdash;')
  183. if value == param.default:
  184. help_text += _(' (default)')
  185. self.fields[param.name].help_text = help_text
  186. def save(self, commit=True):
  187. instance = super().save(commit=False)
  188. # Populate JSON data on the instance
  189. instance.data = self.render_json()
  190. if commit:
  191. instance.save()
  192. return instance
  193. def render_json(self):
  194. json = {}
  195. # Iterate through each field and populate non-empty values
  196. for field_name in self.declared_fields:
  197. if field_name in self.cleaned_data and self.cleaned_data[field_name] not in EMPTY_VALUES:
  198. json[field_name] = self.cleaned_data[field_name]
  199. return json