schema.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import re
  2. import typing
  3. from collections import OrderedDict
  4. from drf_spectacular.extensions import OpenApiSerializerFieldExtension
  5. from drf_spectacular.openapi import AutoSchema
  6. from drf_spectacular.plumbing import (
  7. build_basic_type, build_choice_field, build_media_type_object, build_object_type, get_doc,
  8. )
  9. from drf_spectacular.types import OpenApiTypes
  10. from rest_framework import serializers
  11. from rest_framework.relations import ManyRelatedField
  12. from netbox.api.fields import ChoiceField, SerializedPKRelatedField
  13. from netbox.api.serializers import WritableNestedSerializer
  14. # see netbox.api.routers.NetBoxRouter
  15. BULK_ACTIONS = ("bulk_destroy", "bulk_partial_update", "bulk_update")
  16. WRITABLE_ACTIONS = ("PATCH", "POST", "PUT")
  17. class FixTimeZoneSerializerField(OpenApiSerializerFieldExtension):
  18. target_class = 'timezone_field.rest_framework.TimeZoneSerializerField'
  19. def map_serializer_field(self, auto_schema, direction):
  20. return build_basic_type(OpenApiTypes.STR)
  21. class ChoiceFieldFix(OpenApiSerializerFieldExtension):
  22. target_class = 'netbox.api.fields.ChoiceField'
  23. def map_serializer_field(self, auto_schema, direction):
  24. build_cf = build_choice_field(self.target)
  25. if direction == 'request':
  26. return build_cf
  27. elif direction == "response":
  28. value = build_cf
  29. label = {**build_basic_type(OpenApiTypes.STR), "enum": list(OrderedDict.fromkeys(self.target.choices.values()))}
  30. return build_object_type(
  31. properties={
  32. "value": value,
  33. "label": label
  34. }
  35. )
  36. class NetBoxAutoSchema(AutoSchema):
  37. """
  38. Overrides to drf_spectacular.openapi.AutoSchema to fix following issues:
  39. 1. bulk serializers cause operation_id conflicts with non-bulk ones
  40. 2. bulk operations should specify a list
  41. 3. bulk operations don't have filter params
  42. 4. bulk operations don't have pagination
  43. 5. bulk delete should specify input
  44. """
  45. writable_serializers = {}
  46. @property
  47. def is_bulk_action(self):
  48. if hasattr(self.view, "action") and self.view.action in BULK_ACTIONS:
  49. return True
  50. else:
  51. return False
  52. def get_operation_id(self):
  53. """
  54. bulk serializers cause operation_id conflicts with non-bulk ones
  55. bulk operations cause id conflicts in spectacular resulting in numerous:
  56. Warning: operationId "xxx" has collisions [xxx]. "resolving with numeral suffixes"
  57. code is modified from drf_spectacular.openapi.AutoSchema.get_operation_id
  58. """
  59. if self.is_bulk_action:
  60. tokenized_path = self._tokenize_path()
  61. # replace dashes as they can be problematic later in code generation
  62. tokenized_path = [t.replace('-', '_') for t in tokenized_path]
  63. if self.method == 'GET' and self._is_list_view():
  64. # this shouldn't happen, but keeping it here to follow base code
  65. action = 'list'
  66. else:
  67. # action = self.method_mapping[self.method.lower()]
  68. # use bulk name so partial_update -> bulk_partial_update
  69. action = self.view.action.lower()
  70. if not tokenized_path:
  71. tokenized_path.append('root')
  72. if re.search(r'<drf_format_suffix\w*:\w+>', self.path_regex):
  73. tokenized_path.append('formatted')
  74. return '_'.join(tokenized_path + [action])
  75. # if not bulk - just return normal id
  76. return super().get_operation_id()
  77. def get_request_serializer(self) -> typing.Any:
  78. # bulk operations should specify a list
  79. serializer = super().get_request_serializer()
  80. if self.is_bulk_action:
  81. return type(serializer)(many=True)
  82. # handle mapping for Writable serializers - adapted from dansheps original code
  83. # for drf-yasg
  84. if serializer is not None and self.method in WRITABLE_ACTIONS:
  85. writable_class = self.get_writable_class(serializer)
  86. if writable_class is not None:
  87. if hasattr(serializer, "child"):
  88. child_serializer = self.get_writable_class(serializer.child)
  89. serializer = writable_class(context=serializer.context, child=child_serializer)
  90. else:
  91. serializer = writable_class(context=serializer.context)
  92. return serializer
  93. def get_response_serializers(self) -> typing.Any:
  94. # bulk operations should specify a list
  95. response_serializers = super().get_response_serializers()
  96. if self.is_bulk_action:
  97. return type(response_serializers)(many=True)
  98. return response_serializers
  99. def get_serializer_ref_name(self, serializer):
  100. # from drf-yasg.utils
  101. """Get serializer's ref_name (or None for ModelSerializer if it is named 'NestedSerializer')
  102. :param serializer: Serializer instance
  103. :return: Serializer's ``ref_name`` or ``None`` for inline serializer
  104. :rtype: str or None
  105. """
  106. serializer_meta = getattr(serializer, 'Meta', None)
  107. serializer_name = type(serializer).__name__
  108. if hasattr(serializer_meta, 'ref_name'):
  109. ref_name = serializer_meta.ref_name
  110. elif serializer_name == 'NestedSerializer' and isinstance(serializer, serializers.ModelSerializer):
  111. ref_name = None
  112. else:
  113. ref_name = serializer_name
  114. if ref_name.endswith('Serializer'):
  115. ref_name = ref_name[: -len('Serializer')]
  116. return ref_name
  117. def get_writable_class(self, serializer):
  118. properties = {}
  119. fields = {} if hasattr(serializer, 'child') else serializer.fields
  120. remove_fields = []
  121. for child_name, child in fields.items():
  122. # read_only fields don't need to be in writable (write only) serializers
  123. if 'read_only' in dir(child) and child.read_only:
  124. remove_fields.append(child_name)
  125. if isinstance(child, (ChoiceField, WritableNestedSerializer)):
  126. properties[child_name] = None
  127. elif isinstance(child, ManyRelatedField) and isinstance(child.child_relation, SerializedPKRelatedField):
  128. properties[child_name] = None
  129. if not properties:
  130. return None
  131. if type(serializer) not in self.writable_serializers:
  132. writable_name = 'Writable' + type(serializer).__name__
  133. meta_class = getattr(type(serializer), 'Meta', None)
  134. if meta_class:
  135. ref_name = 'Writable' + self.get_serializer_ref_name(serializer)
  136. # remove read_only fields from write-only serializers
  137. fields = list(meta_class.fields)
  138. for field in remove_fields:
  139. fields.remove(field)
  140. writable_meta = type('Meta', (meta_class,), {'ref_name': ref_name, 'fields': fields})
  141. properties['Meta'] = writable_meta
  142. self.writable_serializers[type(serializer)] = type(writable_name, (type(serializer),), properties)
  143. writable_class = self.writable_serializers[type(serializer)]
  144. return writable_class
  145. def get_filter_backends(self):
  146. # bulk operations don't have filter params
  147. if self.is_bulk_action:
  148. return []
  149. return super().get_filter_backends()
  150. def _get_paginator(self):
  151. # bulk operations don't have pagination
  152. if self.is_bulk_action:
  153. return None
  154. return super()._get_paginator()
  155. def _get_request_body(self, direction='request'):
  156. # bulk delete should specify input
  157. if (not self.is_bulk_action) or (self.method != 'DELETE'):
  158. return super()._get_request_body(direction)
  159. # rest from drf_spectacular.openapi.AutoSchema._get_request_body
  160. # but remove the unsafe method check
  161. request_serializer = self.get_request_serializer()
  162. if isinstance(request_serializer, dict):
  163. content = []
  164. request_body_required = True
  165. for media_type, serializer in request_serializer.items():
  166. schema, partial_request_body_required = self._get_request_for_media_type(serializer, direction)
  167. examples = self._get_examples(serializer, direction, media_type)
  168. if schema is None:
  169. continue
  170. content.append((media_type, schema, examples))
  171. request_body_required &= partial_request_body_required
  172. else:
  173. schema, request_body_required = self._get_request_for_media_type(request_serializer, direction)
  174. if schema is None:
  175. return None
  176. content = [
  177. (media_type, schema, self._get_examples(request_serializer, direction, media_type))
  178. for media_type in self.map_parsers()
  179. ]
  180. request_body = {
  181. 'content': {
  182. media_type: build_media_type_object(schema, examples) for media_type, schema, examples in content
  183. }
  184. }
  185. if request_body_required:
  186. request_body['required'] = request_body_required
  187. return request_body
  188. def get_description(self):
  189. """
  190. Return a string description for the ViewSet.
  191. """
  192. # If a docstring is provided, use it.
  193. if self.view.__doc__:
  194. return get_doc(self.view.__class__)
  195. # When the action method is decorated with @action, use the docstring of the method.
  196. action_or_method = getattr(self.view, getattr(self.view, 'action', self.method.lower()), None)
  197. if action_or_method and action_or_method.__doc__:
  198. return get_doc(action_or_method)
  199. # Else, generate a description from the class name.
  200. return self._generate_description()
  201. def _generate_description(self):
  202. """
  203. Generate a docstring for the method. It also takes into account whether the method is for list or detail.
  204. """
  205. model_name = self.view.queryset.model._meta.verbose_name
  206. # Determine if the method is for list or detail.
  207. if '{id}' in self.path:
  208. return f"{self.method.capitalize()} a {model_name} object."
  209. return f"{self.method.capitalize()} a list of {model_name} objects."