utils.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. import datetime
  2. import json
  3. from collections import OrderedDict
  4. from django.core.serializers import serialize
  5. from django.db.models import Count, OuterRef, Subquery
  6. from jinja2 import Environment
  7. from dcim.choices import CableLengthUnitChoices
  8. from extras.utils import is_taggable
  9. from utilities.constants import HTTP_REQUEST_META_SAFE_COPY
  10. def csv_format(data):
  11. """
  12. Encapsulate any data which contains a comma within double quotes.
  13. """
  14. csv = []
  15. for value in data:
  16. # Represent None or False with empty string
  17. if value is None or value is False:
  18. csv.append('')
  19. continue
  20. # Convert dates to ISO format
  21. if isinstance(value, (datetime.date, datetime.datetime)):
  22. value = value.isoformat()
  23. # Force conversion to string first so we can check for any commas
  24. if not isinstance(value, str):
  25. value = '{}'.format(value)
  26. # Double-quote the value if it contains a comma or line break
  27. if ',' in value or '\n' in value:
  28. value = value.replace('"', '""') # Escape double-quotes
  29. csv.append('"{}"'.format(value))
  30. else:
  31. csv.append('{}'.format(value))
  32. return ','.join(csv)
  33. def foreground_color(bg_color):
  34. """
  35. Return the ideal foreground color (black or white) for a given background color in hexadecimal RGB format.
  36. """
  37. bg_color = bg_color.strip('#')
  38. r, g, b = [int(bg_color[c:c + 2], 16) for c in (0, 2, 4)]
  39. if r * 0.299 + g * 0.587 + b * 0.114 > 186:
  40. return '000000'
  41. else:
  42. return 'ffffff'
  43. def dynamic_import(name):
  44. """
  45. Dynamically import a class from an absolute path string
  46. """
  47. components = name.split('.')
  48. mod = __import__(components[0])
  49. for comp in components[1:]:
  50. mod = getattr(mod, comp)
  51. return mod
  52. def get_subquery(model, field):
  53. """
  54. Return a Subquery suitable for annotating a child object count.
  55. """
  56. subquery = Subquery(
  57. model.objects.filter(
  58. **{field: OuterRef('pk')}
  59. ).order_by().values(
  60. field
  61. ).annotate(
  62. c=Count('*')
  63. ).values('c')
  64. )
  65. return subquery
  66. def serialize_object(obj, extra=None, exclude=None):
  67. """
  68. Return a generic JSON representation of an object using Django's built-in serializer. (This is used for things like
  69. change logging, not the REST API.) Optionally include a dictionary to supplement the object data. A list of keys
  70. can be provided to exclude them from the returned dictionary. Private fields (prefaced with an underscore) are
  71. implicitly excluded.
  72. """
  73. json_str = serialize('json', [obj])
  74. data = json.loads(json_str)[0]['fields']
  75. # Include any custom fields
  76. if hasattr(obj, 'get_custom_fields'):
  77. data['custom_fields'] = {
  78. field: str(value) for field, value in obj.cf.items()
  79. }
  80. # Include any tags
  81. if is_taggable(obj):
  82. data['tags'] = [tag.name for tag in obj.tags.all()]
  83. # Append any extra data
  84. if extra is not None:
  85. data.update(extra)
  86. # Copy keys to list to avoid 'dictionary changed size during iteration' exception
  87. for key in list(data):
  88. # Private fields shouldn't be logged in the object change
  89. if isinstance(key, str) and key.startswith('_'):
  90. data.pop(key)
  91. # Explicitly excluded keys
  92. if isinstance(exclude, (list, tuple)) and key in exclude:
  93. data.pop(key)
  94. return data
  95. def dict_to_filter_params(d, prefix=''):
  96. """
  97. Translate a dictionary of attributes to a nested set of parameters suitable for QuerySet filtering. For example:
  98. {
  99. "name": "Foo",
  100. "rack": {
  101. "facility_id": "R101"
  102. }
  103. }
  104. Becomes:
  105. {
  106. "name": "Foo",
  107. "rack__facility_id": "R101"
  108. }
  109. And can be employed as filter parameters:
  110. Device.objects.filter(**dict_to_filter(attrs_dict))
  111. """
  112. params = {}
  113. for key, val in d.items():
  114. k = prefix + key
  115. if isinstance(val, dict):
  116. params.update(dict_to_filter_params(val, k + '__'))
  117. else:
  118. params[k] = val
  119. return params
  120. def normalize_querydict(querydict):
  121. """
  122. Convert a QueryDict to a normal, mutable dictionary, preserving list values. For example,
  123. QueryDict('foo=1&bar=2&bar=3&baz=')
  124. becomes:
  125. {'foo': '1', 'bar': ['2', '3'], 'baz': ''}
  126. This function is necessary because QueryDict does not provide any built-in mechanism which preserves multiple
  127. values.
  128. """
  129. return {
  130. k: v if len(v) > 1 else v[0] for k, v in querydict.lists()
  131. }
  132. def deepmerge(original, new):
  133. """
  134. Deep merge two dictionaries (new into original) and return a new dict
  135. """
  136. merged = OrderedDict(original)
  137. for key, val in new.items():
  138. if key in original and isinstance(original[key], dict) and isinstance(val, dict):
  139. merged[key] = deepmerge(original[key], val)
  140. else:
  141. merged[key] = val
  142. return merged
  143. def to_meters(length, unit):
  144. """
  145. Convert the given length to meters.
  146. """
  147. length = int(length)
  148. if length < 0:
  149. raise ValueError("Length must be a positive integer")
  150. valid_units = CableLengthUnitChoices.values()
  151. if unit not in valid_units:
  152. raise ValueError(
  153. "Unknown unit {}. Must be one of the following: {}".format(unit, ', '.join(valid_units))
  154. )
  155. if unit == CableLengthUnitChoices.UNIT_METER:
  156. return length
  157. if unit == CableLengthUnitChoices.UNIT_CENTIMETER:
  158. return length / 100
  159. if unit == CableLengthUnitChoices.UNIT_FOOT:
  160. return length * 0.3048
  161. if unit == CableLengthUnitChoices.UNIT_INCH:
  162. return length * 0.3048 * 12
  163. raise ValueError("Unknown unit {}. Must be 'm', 'cm', 'ft', or 'in'.".format(unit))
  164. def render_jinja2(template_code, context):
  165. """
  166. Render a Jinja2 template with the provided context. Return the rendered content.
  167. """
  168. return Environment().from_string(source=template_code).render(**context)
  169. def prepare_cloned_fields(instance):
  170. """
  171. Compile an object's `clone_fields` list into a string of URL query parameters. Tags are automatically cloned where
  172. applicable.
  173. """
  174. params = []
  175. for field_name in getattr(instance, 'clone_fields', []):
  176. field = instance._meta.get_field(field_name)
  177. field_value = field.value_from_object(instance)
  178. # Swap out False with URL-friendly value
  179. if field_value is False:
  180. field_value = ''
  181. # Omit empty values
  182. if field_value not in (None, ''):
  183. params.append((field_name, field_value))
  184. # Copy tags
  185. if is_taggable(instance):
  186. for tag in instance.tags.all():
  187. params.append(('tags', tag.pk))
  188. # Concatenate parameters into a URL query string
  189. param_string = '&'.join([f'{k}={v}' for k, v in params])
  190. return param_string
  191. def shallow_compare_dict(source_dict, destination_dict, exclude=None):
  192. """
  193. Return a new dictionary of the different keys. The values of `destination_dict` are returned. Only the equality of
  194. the first layer of keys/values is checked. `exclude` is a list or tuple of keys to be ignored.
  195. """
  196. difference = {}
  197. for key in destination_dict:
  198. if source_dict.get(key) != destination_dict[key]:
  199. if isinstance(exclude, (list, tuple)) and key in exclude:
  200. continue
  201. difference[key] = destination_dict[key]
  202. return difference
  203. def flatten_dict(d, prefix='', separator='.'):
  204. """
  205. Flatten netsted dictionaries into a single level by joining key names with a separator.
  206. :param d: The dictionary to be flattened
  207. :param prefix: Initial prefix (if any)
  208. :param separator: The character to use when concatenating key names
  209. """
  210. ret = {}
  211. for k, v in d.items():
  212. key = separator.join([prefix, k]) if prefix else k
  213. if type(v) is dict:
  214. ret.update(flatten_dict(v, prefix=key))
  215. else:
  216. ret[key] = v
  217. return ret
  218. # Taken from django.utils.functional (<3.0)
  219. def curry(_curried_func, *args, **kwargs):
  220. def _curried(*moreargs, **morekwargs):
  221. return _curried_func(*args, *moreargs, **{**kwargs, **morekwargs})
  222. return _curried
  223. #
  224. # Fake request object
  225. #
  226. class NetBoxFakeRequest:
  227. """
  228. A fake request object which is explicitly defined at the module level so it is able to be pickled. It simply
  229. takes what is passed to it as kwargs on init and sets them as instance variables.
  230. """
  231. def __init__(self, _dict):
  232. self.__dict__ = _dict
  233. def copy_safe_request(request):
  234. """
  235. Copy selected attributes from a request object into a new fake request object. This is needed in places where
  236. thread safe pickling of the useful request data is needed.
  237. """
  238. meta = {
  239. k: request.META[k]
  240. for k in HTTP_REQUEST_META_SAFE_COPY
  241. if k in request.META and isinstance(request.META[k], str)
  242. }
  243. return NetBoxFakeRequest({
  244. 'META': meta,
  245. 'POST': request.POST,
  246. 'GET': request.GET,
  247. 'FILES': request.FILES,
  248. 'user': request.user,
  249. 'path': request.path,
  250. 'id': getattr(request, 'id', None), # UUID assigned by middleware
  251. })