|
@@ -1,14 +1,8 @@
|
|
|
-import csv
|
|
|
|
|
-import json
|
|
|
|
|
import re
|
|
import re
|
|
|
-from io import StringIO
|
|
|
|
|
|
|
|
|
|
-import yaml
|
|
|
|
|
from django import forms
|
|
from django import forms
|
|
|
from django.utils.translation import gettext as _
|
|
from django.utils.translation import gettext as _
|
|
|
|
|
|
|
|
-from utilities.choices import ImportFormatChoices
|
|
|
|
|
-from utilities.forms.utils import parse_csv
|
|
|
|
|
from .widgets import APISelect, APISelectMultiple, ClearableFileInput
|
|
from .widgets import APISelect, APISelectMultiple, ClearableFileInput
|
|
|
|
|
|
|
|
__all__ = (
|
|
__all__ = (
|
|
@@ -18,7 +12,6 @@ __all__ = (
|
|
|
'ConfirmationForm',
|
|
'ConfirmationForm',
|
|
|
'CSVModelForm',
|
|
'CSVModelForm',
|
|
|
'FilterForm',
|
|
'FilterForm',
|
|
|
- 'ImportForm',
|
|
|
|
|
'ReturnURLForm',
|
|
'ReturnURLForm',
|
|
|
'TableConfigForm',
|
|
'TableConfigForm',
|
|
|
)
|
|
)
|
|
@@ -157,126 +150,6 @@ class CSVModelForm(forms.ModelForm):
|
|
|
del self.fields[field]
|
|
del self.fields[field]
|
|
|
|
|
|
|
|
|
|
|
|
|
-class ImportForm(BootstrapMixin, forms.Form):
|
|
|
|
|
- data = forms.CharField(
|
|
|
|
|
- required=False,
|
|
|
|
|
- widget=forms.Textarea(attrs={'class': 'font-monospace'}),
|
|
|
|
|
- help_text=_("Enter object data in CSV, JSON or YAML format.")
|
|
|
|
|
- )
|
|
|
|
|
- data_file = forms.FileField(
|
|
|
|
|
- label="Data file",
|
|
|
|
|
- required=False
|
|
|
|
|
- )
|
|
|
|
|
- format = forms.ChoiceField(
|
|
|
|
|
- choices=ImportFormatChoices,
|
|
|
|
|
- initial=ImportFormatChoices.AUTO
|
|
|
|
|
- )
|
|
|
|
|
-
|
|
|
|
|
- data_field = 'data'
|
|
|
|
|
-
|
|
|
|
|
- def clean(self):
|
|
|
|
|
- super().clean()
|
|
|
|
|
-
|
|
|
|
|
- # Determine whether we're reading from form data or an uploaded file
|
|
|
|
|
- if self.cleaned_data['data'] and self.cleaned_data['data_file']:
|
|
|
|
|
- raise forms.ValidationError("Form data must be empty when uploading a file.")
|
|
|
|
|
- if 'data_file' in self.files:
|
|
|
|
|
- self.data_field = 'data_file'
|
|
|
|
|
- file = self.files.get('data_file')
|
|
|
|
|
- data = file.read().decode('utf-8-sig')
|
|
|
|
|
- else:
|
|
|
|
|
- data = self.cleaned_data['data']
|
|
|
|
|
-
|
|
|
|
|
- # Determine the data format
|
|
|
|
|
- if self.cleaned_data['format'] == ImportFormatChoices.AUTO:
|
|
|
|
|
- format = self._detect_format(data)
|
|
|
|
|
- else:
|
|
|
|
|
- format = self.cleaned_data['format']
|
|
|
|
|
-
|
|
|
|
|
- # Process data according to the selected format
|
|
|
|
|
- if format == ImportFormatChoices.CSV:
|
|
|
|
|
- self.cleaned_data['data'] = self._clean_csv(data)
|
|
|
|
|
- elif format == ImportFormatChoices.JSON:
|
|
|
|
|
- self.cleaned_data['data'] = self._clean_json(data)
|
|
|
|
|
- elif format == ImportFormatChoices.YAML:
|
|
|
|
|
- self.cleaned_data['data'] = self._clean_yaml(data)
|
|
|
|
|
- else:
|
|
|
|
|
- raise forms.ValidationError(f"Unknown data format: {format}")
|
|
|
|
|
-
|
|
|
|
|
- def _detect_format(self, data):
|
|
|
|
|
- """
|
|
|
|
|
- Attempt to automatically detect the format (CSV, JSON, or YAML) of the given data, or raise
|
|
|
|
|
- a ValidationError.
|
|
|
|
|
- """
|
|
|
|
|
- try:
|
|
|
|
|
- if data[0] in ('{', '['):
|
|
|
|
|
- return ImportFormatChoices.JSON
|
|
|
|
|
- if data.startswith('---') or data.startswith('- '):
|
|
|
|
|
- return ImportFormatChoices.YAML
|
|
|
|
|
- if ',' in data.split('\n', 1)[0]:
|
|
|
|
|
- return ImportFormatChoices.CSV
|
|
|
|
|
- except IndexError:
|
|
|
|
|
- pass
|
|
|
|
|
- raise forms.ValidationError({
|
|
|
|
|
- 'format': _('Unable to detect data format. Please specify.')
|
|
|
|
|
- })
|
|
|
|
|
-
|
|
|
|
|
- def _clean_csv(self, data):
|
|
|
|
|
- """
|
|
|
|
|
- Clean CSV-formatted data. The first row will be treated as column headers.
|
|
|
|
|
- """
|
|
|
|
|
- stream = StringIO(data.strip())
|
|
|
|
|
- reader = csv.reader(stream)
|
|
|
|
|
- headers, records = parse_csv(reader)
|
|
|
|
|
-
|
|
|
|
|
- # Set CSV headers for reference by the model form
|
|
|
|
|
- self._csv_headers = headers
|
|
|
|
|
-
|
|
|
|
|
- return records
|
|
|
|
|
-
|
|
|
|
|
- def _clean_json(self, data):
|
|
|
|
|
- """
|
|
|
|
|
- Clean JSON-formatted data. If only a single object is defined, it will be encapsulated as a list.
|
|
|
|
|
- """
|
|
|
|
|
- try:
|
|
|
|
|
- data = json.loads(data)
|
|
|
|
|
- # Accommodate for users entering single objects
|
|
|
|
|
- if type(data) is not list:
|
|
|
|
|
- data = [data]
|
|
|
|
|
- return data
|
|
|
|
|
- except json.decoder.JSONDecodeError as err:
|
|
|
|
|
- raise forms.ValidationError({
|
|
|
|
|
- self.data_field: f"Invalid JSON data: {err}"
|
|
|
|
|
- })
|
|
|
|
|
-
|
|
|
|
|
- def _clean_yaml(self, data):
|
|
|
|
|
- """
|
|
|
|
|
- Clean YAML-formatted data. Data must be either
|
|
|
|
|
- a) A single document comprising a list of dictionaries (each representing an object), or
|
|
|
|
|
- b) Multiple documents, separated with the '---' token
|
|
|
|
|
- """
|
|
|
|
|
- records = []
|
|
|
|
|
- try:
|
|
|
|
|
- for data in yaml.load_all(data, Loader=yaml.SafeLoader):
|
|
|
|
|
- if type(data) == list:
|
|
|
|
|
- records.extend(data)
|
|
|
|
|
- elif type(data) == dict:
|
|
|
|
|
- records.append(data)
|
|
|
|
|
- else:
|
|
|
|
|
- raise forms.ValidationError({
|
|
|
|
|
- self.data_field: _(
|
|
|
|
|
- "Invalid YAML data. Data must be in the form of multiple documents, or a single document "
|
|
|
|
|
- "comprising a list of dictionaries."
|
|
|
|
|
- )
|
|
|
|
|
- })
|
|
|
|
|
- except yaml.error.YAMLError as err:
|
|
|
|
|
- raise forms.ValidationError({
|
|
|
|
|
- self.data_field: f"Invalid YAML data: {err}"
|
|
|
|
|
- })
|
|
|
|
|
-
|
|
|
|
|
- return records
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
class FilterForm(BootstrapMixin, forms.Form):
|
|
class FilterForm(BootstrapMixin, forms.Form):
|
|
|
"""
|
|
"""
|
|
|
Base Form class for FilterSet forms.
|
|
Base Form class for FilterSet forms.
|