customfields.py 4.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. from django.contrib.contenttypes.models import ContentType
  2. from django.utils.translation import gettext as _
  3. from drf_spectacular.utils import extend_schema_field
  4. from drf_spectacular.types import OpenApiTypes
  5. from rest_framework.fields import Field
  6. from rest_framework.serializers import ValidationError
  7. from extras.choices import CustomFieldTypeChoices
  8. from extras.models import CustomField
  9. from netbox.constants import NESTED_SERIALIZER_PREFIX
  10. from utilities.api import get_serializer_for_model
  11. #
  12. # Custom fields
  13. #
  14. class CustomFieldDefaultValues:
  15. """
  16. Return a dictionary of all CustomFields assigned to the parent model and their default values.
  17. """
  18. requires_context = True
  19. def __call__(self, serializer_field):
  20. self.model = serializer_field.parent.Meta.model
  21. # Retrieve the CustomFields for the parent model
  22. content_type = ContentType.objects.get_for_model(self.model)
  23. fields = CustomField.objects.filter(content_types=content_type)
  24. # Populate the default value for each CustomField
  25. value = {}
  26. for field in fields:
  27. if field.default is not None:
  28. value[field.name] = field.default
  29. else:
  30. value[field.name] = None
  31. return value
  32. @extend_schema_field(OpenApiTypes.OBJECT)
  33. class CustomFieldsDataField(Field):
  34. def _get_custom_fields(self):
  35. """
  36. Cache CustomFields assigned to this model to avoid redundant database queries
  37. """
  38. if not hasattr(self, '_custom_fields'):
  39. content_type = ContentType.objects.get_for_model(self.parent.Meta.model)
  40. self._custom_fields = CustomField.objects.filter(content_types=content_type)
  41. return self._custom_fields
  42. def to_representation(self, obj):
  43. # TODO: Fix circular import
  44. from utilities.api import get_serializer_for_model
  45. data = {}
  46. for cf in self._get_custom_fields():
  47. value = cf.deserialize(obj.get(cf.name))
  48. if value is not None and cf.type == CustomFieldTypeChoices.TYPE_OBJECT:
  49. serializer = get_serializer_for_model(cf.object_type.model_class(), prefix=NESTED_SERIALIZER_PREFIX)
  50. value = serializer(value, context=self.parent.context).data
  51. elif value is not None and cf.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT:
  52. serializer = get_serializer_for_model(cf.object_type.model_class(), prefix=NESTED_SERIALIZER_PREFIX)
  53. value = serializer(value, many=True, context=self.parent.context).data
  54. data[cf.name] = value
  55. return data
  56. def to_internal_value(self, data):
  57. if type(data) is not dict:
  58. raise ValidationError(
  59. "Invalid data format. Custom field data must be passed as a dictionary mapping field names to their "
  60. "values."
  61. )
  62. # Serialize object and multi-object values
  63. for cf in self._get_custom_fields():
  64. if cf.name in data and data[cf.name] not in (None, []) and cf.type in (
  65. CustomFieldTypeChoices.TYPE_OBJECT,
  66. CustomFieldTypeChoices.TYPE_MULTIOBJECT
  67. ):
  68. serializer_class = get_serializer_for_model(
  69. model=cf.object_type.model_class(),
  70. prefix=NESTED_SERIALIZER_PREFIX
  71. )
  72. many = cf.type == CustomFieldTypeChoices.TYPE_MULTIOBJECT
  73. serializer = serializer_class(data=data[cf.name], many=many, context=self.parent.context)
  74. if serializer.is_valid():
  75. data[cf.name] = [obj['id'] for obj in serializer.data] if many else serializer.data['id']
  76. else:
  77. raise ValidationError(_("Unknown related object(s): {name}").format(name=data[cf.name]))
  78. # If updating an existing instance, start with existing custom_field_data
  79. if self.parent.instance:
  80. data = {**self.parent.instance.custom_field_data, **data}
  81. return data