schema.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. import re
  2. import typing
  3. from collections import OrderedDict
  4. from drf_spectacular.extensions import OpenApiSerializerFieldExtension
  5. from drf_spectacular.openapi import AutoSchema
  6. from drf_spectacular.plumbing import (
  7. build_basic_type, build_choice_field, build_media_type_object, build_object_type, get_doc,
  8. )
  9. from drf_spectacular.types import OpenApiTypes
  10. from rest_framework.relations import ManyRelatedField
  11. from netbox.api.fields import ChoiceField, SerializedPKRelatedField
  12. from netbox.api.serializers import WritableNestedSerializer
  13. # see netbox.api.routers.NetBoxRouter
  14. BULK_ACTIONS = ("bulk_destroy", "bulk_partial_update", "bulk_update")
  15. WRITABLE_ACTIONS = ("PATCH", "POST", "PUT")
  16. class FixTimeZoneSerializerField(OpenApiSerializerFieldExtension):
  17. target_class = 'timezone_field.rest_framework.TimeZoneSerializerField'
  18. def map_serializer_field(self, auto_schema, direction):
  19. return build_basic_type(OpenApiTypes.STR)
  20. class ChoiceFieldFix(OpenApiSerializerFieldExtension):
  21. target_class = 'netbox.api.fields.ChoiceField'
  22. def map_serializer_field(self, auto_schema, direction):
  23. build_cf = build_choice_field(self.target)
  24. if direction == 'request':
  25. return build_cf
  26. elif direction == "response":
  27. value = build_cf
  28. label = {**build_basic_type(OpenApiTypes.STR), "enum": list(OrderedDict.fromkeys(self.target.choices.values()))}
  29. return build_object_type(
  30. properties={
  31. "value": value,
  32. "label": label
  33. }
  34. )
  35. class NetBoxAutoSchema(AutoSchema):
  36. """
  37. Overrides to drf_spectacular.openapi.AutoSchema to fix following issues:
  38. 1. bulk serializers cause operation_id conflicts with non-bulk ones
  39. 2. bulk operations should specify a list
  40. 3. bulk operations don't have filter params
  41. 4. bulk operations don't have pagination
  42. 5. bulk delete should specify input
  43. """
  44. writable_serializers = {}
  45. @property
  46. def is_bulk_action(self):
  47. if hasattr(self.view, "action") and self.view.action in BULK_ACTIONS:
  48. return True
  49. else:
  50. return False
  51. def get_operation_id(self):
  52. """
  53. bulk serializers cause operation_id conflicts with non-bulk ones
  54. bulk operations cause id conflicts in spectacular resulting in numerous:
  55. Warning: operationId "xxx" has collisions [xxx]. "resolving with numeral suffixes"
  56. code is modified from drf_spectacular.openapi.AutoSchema.get_operation_id
  57. """
  58. if self.is_bulk_action:
  59. tokenized_path = self._tokenize_path()
  60. # replace dashes as they can be problematic later in code generation
  61. tokenized_path = [t.replace('-', '_') for t in tokenized_path]
  62. if self.method == 'GET' and self._is_list_view():
  63. # this shouldn't happen, but keeping it here to follow base code
  64. action = 'list'
  65. else:
  66. # action = self.method_mapping[self.method.lower()]
  67. # use bulk name so partial_update -> bulk_partial_update
  68. action = self.view.action.lower()
  69. if not tokenized_path:
  70. tokenized_path.append('root')
  71. if re.search(r'<drf_format_suffix\w*:\w+>', self.path_regex):
  72. tokenized_path.append('formatted')
  73. return '_'.join(tokenized_path + [action])
  74. # if not bulk - just return normal id
  75. return super().get_operation_id()
  76. def get_request_serializer(self) -> typing.Any:
  77. # bulk operations should specify a list
  78. serializer = super().get_request_serializer()
  79. if self.is_bulk_action:
  80. return type(serializer)(many=True)
  81. # handle mapping for Writable serializers - adapted from dansheps original code
  82. # for drf-yasg
  83. if serializer is not None and self.method in WRITABLE_ACTIONS:
  84. writable_class = self.get_writable_class(serializer)
  85. if writable_class is not None:
  86. if hasattr(serializer, "child"):
  87. child_serializer = self.get_writable_class(serializer.child)
  88. serializer = writable_class(context=serializer.context, child=child_serializer)
  89. else:
  90. serializer = writable_class(context=serializer.context)
  91. return serializer
  92. def get_response_serializers(self) -> typing.Any:
  93. # bulk operations should specify a list
  94. response_serializers = super().get_response_serializers()
  95. if self.is_bulk_action:
  96. return type(response_serializers)(many=True)
  97. return response_serializers
  98. def get_serializer_ref_name(self, serializer):
  99. # from drf-yasg.utils
  100. """Get serializer's ref_name (or None for ModelSerializer if it is named 'NestedSerializer')
  101. :param serializer: Serializer instance
  102. :return: Serializer's ``ref_name`` or ``None`` for inline serializer
  103. :rtype: str or None
  104. """
  105. serializer_meta = getattr(serializer, 'Meta', None)
  106. serializer_name = type(serializer).__name__
  107. if hasattr(serializer_meta, 'ref_name'):
  108. ref_name = serializer_meta.ref_name
  109. elif serializer_name == 'NestedSerializer' and isinstance(serializer, serializers.ModelSerializer):
  110. ref_name = None
  111. else:
  112. ref_name = serializer_name
  113. if ref_name.endswith('Serializer'):
  114. ref_name = ref_name[: -len('Serializer')]
  115. return ref_name
  116. def get_writable_class(self, serializer):
  117. properties = {}
  118. fields = {} if hasattr(serializer, 'child') else serializer.fields
  119. remove_fields = []
  120. for child_name, child in fields.items():
  121. # read_only fields don't need to be in writable (write only) serializers
  122. if 'read_only' in dir(child) and child.read_only:
  123. remove_fields.append(child_name)
  124. if isinstance(child, (ChoiceField, WritableNestedSerializer)):
  125. properties[child_name] = None
  126. elif isinstance(child, ManyRelatedField) and isinstance(child.child_relation, SerializedPKRelatedField):
  127. properties[child_name] = None
  128. if not properties:
  129. return None
  130. if type(serializer) not in self.writable_serializers:
  131. writable_name = 'Writable' + type(serializer).__name__
  132. meta_class = getattr(type(serializer), 'Meta', None)
  133. if meta_class:
  134. ref_name = 'Writable' + self.get_serializer_ref_name(serializer)
  135. # remove read_only fields from write-only serializers
  136. fields = list(meta_class.fields)
  137. for field in remove_fields:
  138. fields.remove(field)
  139. writable_meta = type('Meta', (meta_class,), {'ref_name': ref_name, 'fields': fields})
  140. properties['Meta'] = writable_meta
  141. self.writable_serializers[type(serializer)] = type(writable_name, (type(serializer),), properties)
  142. writable_class = self.writable_serializers[type(serializer)]
  143. return writable_class
  144. def get_filter_backends(self):
  145. # bulk operations don't have filter params
  146. if self.is_bulk_action:
  147. return []
  148. return super().get_filter_backends()
  149. def _get_paginator(self):
  150. # bulk operations don't have pagination
  151. if self.is_bulk_action:
  152. return None
  153. return super()._get_paginator()
  154. def _get_request_body(self, direction='request'):
  155. # bulk delete should specify input
  156. if (not self.is_bulk_action) or (self.method != 'DELETE'):
  157. return super()._get_request_body(direction)
  158. # rest from drf_spectacular.openapi.AutoSchema._get_request_body
  159. # but remove the unsafe method check
  160. request_serializer = self.get_request_serializer()
  161. if isinstance(request_serializer, dict):
  162. content = []
  163. request_body_required = True
  164. for media_type, serializer in request_serializer.items():
  165. schema, partial_request_body_required = self._get_request_for_media_type(serializer, direction)
  166. examples = self._get_examples(serializer, direction, media_type)
  167. if schema is None:
  168. continue
  169. content.append((media_type, schema, examples))
  170. request_body_required &= partial_request_body_required
  171. else:
  172. schema, request_body_required = self._get_request_for_media_type(request_serializer, direction)
  173. if schema is None:
  174. return None
  175. content = [
  176. (media_type, schema, self._get_examples(request_serializer, direction, media_type))
  177. for media_type in self.map_parsers()
  178. ]
  179. request_body = {
  180. 'content': {
  181. media_type: build_media_type_object(schema, examples) for media_type, schema, examples in content
  182. }
  183. }
  184. if request_body_required:
  185. request_body['required'] = request_body_required
  186. return request_body
  187. def get_description(self):
  188. """
  189. Return a string description for the ViewSet.
  190. """
  191. # If a docstring is provided, use it.
  192. if self.view.__doc__:
  193. return get_doc(self.view.__class__)
  194. # When the action method is decorated with @action, use the docstring of the method.
  195. action_or_method = getattr(self.view, getattr(self.view, 'action', self.method.lower()), None)
  196. if action_or_method and action_or_method.__doc__:
  197. return get_doc(action_or_method)
  198. # Else, generate a description from the class name.
  199. return self._generate_description()
  200. def _generate_description(self):
  201. """
  202. Generate a docstring for the method. It also takes into account whether the method is for list or detail.
  203. """
  204. model_name = self.view.queryset.model._meta.verbose_name
  205. # Determine if the method is for list or detail.
  206. if '{id}' in self.path:
  207. return f"{self.method.capitalize()} a {model_name} object."
  208. return f"{self.method.capitalize()} a list of {model_name} objects."