api.py 12 KB


  1. from collections import OrderedDict
  2. import pytz
  3. from django.conf import settings
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.core.exceptions import FieldError, MultipleObjectsReturned, ObjectDoesNotExist
  6. from django.db.models import ManyToManyField, ProtectedError
  7. from django.http import Http404
  8. from django.urls import reverse
  9. from rest_framework.exceptions import APIException
  10. from rest_framework.permissions import BasePermission
  11. from rest_framework.relations import PrimaryKeyRelatedField, RelatedField
  12. from rest_framework.response import Response
  13. from rest_framework.serializers import Field, ModelSerializer, ValidationError
  14. from rest_framework.viewsets import ModelViewSet as _ModelViewSet, ViewSet
  15. from .utils import dict_to_filter_params, dynamic_import
  16. class ServiceUnavailable(APIException):
  17. status_code = 503
  18. default_detail = "Service temporarily unavailable, please try again later."
  19. class SerializerNotFound(Exception):
  20. pass
  21. def get_serializer_for_model(model, prefix=''):
  22. """
  23. Dynamically resolve and return the appropriate serializer for a model.
  24. """
  25. app_name, model_name = model._meta.label.split('.')
  26. serializer_name = '{}.api.serializers.{}{}Serializer'.format(
  27. app_name, prefix, model_name
  28. )
  29. try:
  30. return dynamic_import(serializer_name)
  31. except AttributeError:
  32. raise SerializerNotFound(
  33. "Could not determine serializer for {}.{} with prefix '{}'".format(app_name, model_name, prefix)
  34. )
  35. def is_api_request(request):
  36. """
  37. Return True of the request is being made via the REST API.
  38. """
  39. api_path = reverse('api-root')
  40. return request.path_info.startswith(api_path)
  41. #
  42. # Authentication
  43. #
  44. class IsAuthenticatedOrLoginNotRequired(BasePermission):
  45. """
  46. Returns True if the user is authenticated or LOGIN_REQUIRED is False.
  47. """
  48. def has_permission(self, request, view):
  49. if not settings.LOGIN_REQUIRED:
  50. return True
  51. return request.user.is_authenticated
  52. #
  53. # Fields
  54. #
  55. class ChoiceField(Field):
  56. """
  57. Represent a ChoiceField as {'value': <DB value>, 'label': <string>}. Accepts a single value on write.
  58. :param choices: An iterable of choices in the form (value, key).
  59. :param allow_blank: Allow blank values in addition to the listed choices.
  60. """
  61. def __init__(self, choices, allow_blank=False, **kwargs):
  62. self.choiceset = choices
  63. self.allow_blank = allow_blank
  64. self._choices = dict()
  65. # Unpack grouped choices
  66. for k, v in choices:
  67. if type(v) in [list, tuple]:
  68. for k2, v2 in v:
  69. self._choices[k2] = v2
  70. else:
  71. self._choices[k] = v
  72. super().__init__(**kwargs)
  73. def validate_empty_values(self, data):
  74. # Convert null to an empty string unless allow_null == True
  75. if data is None:
  76. if self.allow_null:
  77. return True, None
  78. else:
  79. data = ''
  80. return super().validate_empty_values(data)
  81. def to_representation(self, obj):
  82. if obj is '':
  83. return None
  84. data = OrderedDict([
  85. ('value', obj),
  86. ('label', self._choices[obj])
  87. ])
  88. # TODO: Remove in v2.8
  89. # Include legacy numeric ID (where applicable)
  90. if hasattr(self.choiceset, 'LEGACY_MAP') and obj in self.choiceset.LEGACY_MAP:
  91. data['id'] = self.choiceset.LEGACY_MAP.get(obj)
  92. return data
  93. def to_internal_value(self, data):
  94. if data is '':
  95. if self.allow_blank:
  96. return data
  97. raise ValidationError("This field may not be blank.")
  98. # Provide an explicit error message if the request is trying to write a dict or list
  99. if isinstance(data, (dict, list)):
  100. raise ValidationError('Value must be passed directly (e.g. "foo": 123); do not use a dictionary or list.')
  101. # Check for string representations of boolean/integer values
  102. if hasattr(data, 'lower'):
  103. if data.lower() == 'true':
  104. data = True
  105. elif data.lower() == 'false':
  106. data = False
  107. else:
  108. try:
  109. data = int(data)
  110. except ValueError:
  111. pass
  112. try:
  113. if data in self._choices:
  114. return data
  115. # Check if data is a legacy numeric ID
  116. slug = self.choiceset.id_to_slug(data)
  117. if slug is not None:
  118. return slug
  119. except TypeError: # Input is an unhashable type
  120. pass
  121. raise ValidationError("{} is not a valid choice.".format(data))
  122. @property
  123. def choices(self):
  124. return self._choices
  125. class ContentTypeField(RelatedField):
  126. """
  127. Represent a ContentType as '<app_label>.<model>'
  128. """
  129. default_error_messages = {
  130. "does_not_exist": "Invalid content type: {content_type}",
  131. "invalid": "Invalid value. Specify a content type as '<app_label>.<model_name>'.",
  132. }
  133. def to_internal_value(self, data):
  134. try:
  135. app_label, model = data.split('.')
  136. return ContentType.objects.get_by_natural_key(app_label=app_label, model=model)
  137. except ObjectDoesNotExist:
  138. self.fail('does_not_exist', content_type=data)
  139. except (TypeError, ValueError):
  140. self.fail('invalid')
  141. def to_representation(self, obj):
  142. return "{}.{}".format(obj.app_label, obj.model)
  143. class TimeZoneField(Field):
  144. """
  145. Represent a pytz time zone.
  146. """
  147. def to_representation(self, obj):
  148. return obj.zone if obj else None
  149. def to_internal_value(self, data):
  150. if not data:
  151. return ""
  152. if data not in pytz.common_timezones:
  153. raise ValidationError('Unknown time zone "{}" (see pytz.common_timezones for all options)'.format(data))
  154. return pytz.timezone(data)
  155. class SerializedPKRelatedField(PrimaryKeyRelatedField):
  156. """
  157. Extends PrimaryKeyRelatedField to return a serialized object on read. This is useful for representing related
  158. objects in a ManyToManyField while still allowing a set of primary keys to be written.
  159. """
  160. def __init__(self, serializer, **kwargs):
  161. self.serializer = serializer
  162. self.pk_field = kwargs.pop('pk_field', None)
  163. super().__init__(**kwargs)
  164. def to_representation(self, value):
  165. return self.serializer(value, context={'request': self.context['request']}).data
  166. #
  167. # Serializers
  168. #
  169. # TODO: We should probably take a fresh look at exactly what we're doing with this. There might be a more elegant
  170. # way to enforce model validation on the serializer.
  171. class ValidatedModelSerializer(ModelSerializer):
  172. """
  173. Extends the built-in ModelSerializer to enforce calling clean() on the associated model during validation.
  174. """
  175. def validate(self, data):
  176. # Remove custom fields data and tags (if any) prior to model validation
  177. attrs = data.copy()
  178. attrs.pop('custom_fields', None)
  179. attrs.pop('tags', None)
  180. # Skip ManyToManyFields
  181. for field in self.Meta.model._meta.get_fields():
  182. if isinstance(field, ManyToManyField):
  183. attrs.pop(field.name, None)
  184. # Run clean() on an instance of the model
  185. if self.instance is None:
  186. instance = self.Meta.model(**attrs)
  187. else:
  188. instance = self.instance
  189. for k, v in attrs.items():
  190. setattr(instance, k, v)
  191. instance.clean()
  192. instance.validate_unique()
  193. return data
  194. class WritableNestedSerializer(ModelSerializer):
  195. """
  196. Returns a nested representation of an object on read, but accepts only a primary key on write.
  197. """
  198. def to_internal_value(self, data):
  199. if data is None:
  200. return None
  201. # Dictionary of related object attributes
  202. if isinstance(data, dict):
  203. params = dict_to_filter_params(data)
  204. try:
  205. return self.Meta.model.objects.get(**params)
  206. except ObjectDoesNotExist:
  207. raise ValidationError(
  208. "Related object not found using the provided attributes: {}".format(params)
  209. )
  210. except MultipleObjectsReturned:
  211. raise ValidationError(
  212. "Multiple objects match the provided attributes: {}".format(params)
  213. )
  214. except FieldError as e:
  215. raise ValidationError(e)
  216. # Integer PK of related object
  217. if isinstance(data, int):
  218. pk = data
  219. else:
  220. try:
  221. # PK might have been mistakenly passed as a string
  222. pk = int(data)
  223. except (TypeError, ValueError):
  224. raise ValidationError(
  225. "Related objects must be referenced by numeric ID or by dictionary of attributes. Received an "
  226. "unrecognized value: {}".format(data)
  227. )
  228. # Look up object by PK
  229. try:
  230. return self.Meta.model.objects.get(pk=int(data))
  231. except ObjectDoesNotExist:
  232. raise ValidationError(
  233. "Related object not found using the provided numeric ID: {}".format(pk)
  234. )
  235. #
  236. # Viewsets
  237. #
  238. class ModelViewSet(_ModelViewSet):
  239. """
  240. Accept either a single object or a list of objects to create.
  241. """
  242. def get_serializer(self, *args, **kwargs):
  243. # If a list of objects has been provided, initialize the serializer with many=True
  244. if isinstance(kwargs.get('data', {}), list):
  245. kwargs['many'] = True
  246. return super().get_serializer(*args, **kwargs)
  247. def get_serializer_class(self):
  248. # If 'brief' has been passed as a query param, find and return the nested serializer for this model, if one
  249. # exists
  250. request = self.get_serializer_context()['request']
  251. if request.query_params.get('brief', False):
  252. try:
  253. return get_serializer_for_model(self.queryset.model, prefix='Nested')
  254. except SerializerNotFound:
  255. pass
  256. # Fall back to the hard-coded serializer class
  257. return self.serializer_class
  258. def dispatch(self, request, *args, **kwargs):
  259. try:
  260. return super().dispatch(request, *args, **kwargs)
  261. except ProtectedError as e:
  262. models = ['{} ({})'.format(o, o._meta) for o in e.protected_objects.all()]
  263. msg = 'Unable to delete object. The following dependent objects were found: {}'.format(', '.join(models))
  264. return self.finalize_response(
  265. request,
  266. Response({'detail': msg}, status=409),
  267. *args,
  268. **kwargs
  269. )
  270. def list(self, *args, **kwargs):
  271. """
  272. Call to super to allow for caching
  273. """
  274. return super().list(*args, **kwargs)
  275. def retrieve(self, *args, **kwargs):
  276. """
  277. Call to super to allow for caching
  278. """
  279. return super().retrieve(*args, **kwargs)
  280. class FieldChoicesViewSet(ViewSet):
  281. """
  282. Expose the built-in numeric values which represent static choices for a model's field.
  283. """
  284. permission_classes = [IsAuthenticatedOrLoginNotRequired]
  285. fields = []
  286. def __init__(self, *args, **kwargs):
  287. super().__init__(*args, **kwargs)
  288. # Compile a dict of all fields in this view
  289. self._fields = OrderedDict()
  290. for serializer_class, field_list in self.fields:
  291. for field_name in field_list:
  292. model_name = serializer_class.Meta.model._meta.verbose_name
  293. key = ':'.join([model_name.lower().replace(' ', '-'), field_name])
  294. serializer = serializer_class()
  295. choices = []
  296. for k, v in serializer.get_fields()[field_name].choices.items():
  297. if type(v) in [list, tuple]:
  298. for k2, v2 in v:
  299. choices.append({
  300. 'value': k2,
  301. 'label': v2,
  302. })
  303. else:
  304. choices.append({
  305. 'value': k,
  306. 'label': v,
  307. })
  308. self._fields[key] = choices
  309. def list(self, request):
  310. return Response(self._fields)
  311. def retrieve(self, request, pk):
  312. if pk not in self._fields:
  313. raise Http404
  314. return Response(self._fields[pk])
  315. def get_view_name(self):
  316. return "Field Choices"