utils.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. import datetime
  2. import json
  3. from collections import OrderedDict
  4. from django.core.serializers import serialize
  5. from django.db.models import Count, OuterRef, Subquery
  6. from dcim.choices import CableLengthUnitChoices
  7. from extras.utils import is_taggable
  8. def csv_format(data):
  9. """
  10. Encapsulate any data which contains a comma within double quotes.
  11. """
  12. csv = []
  13. for value in data:
  14. # Represent None or False with empty string
  15. if value is None or value is False:
  16. csv.append('')
  17. continue
  18. # Convert dates to ISO format
  19. if isinstance(value, (datetime.date, datetime.datetime)):
  20. value = value.isoformat()
  21. # Force conversion to string first so we can check for any commas
  22. if not isinstance(value, str):
  23. value = '{}'.format(value)
  24. # Double-quote the value if it contains a comma
  25. if ',' in value or '\n' in value:
  26. csv.append('"{}"'.format(value))
  27. else:
  28. csv.append('{}'.format(value))
  29. return ','.join(csv)
  30. def foreground_color(bg_color):
  31. """
  32. Return the ideal foreground color (black or white) for a given background color in hexadecimal RGB format.
  33. """
  34. bg_color = bg_color.strip('#')
  35. r, g, b = [int(bg_color[c:c + 2], 16) for c in (0, 2, 4)]
  36. if r * 0.299 + g * 0.587 + b * 0.114 > 186:
  37. return '000000'
  38. else:
  39. return 'ffffff'
  40. def dynamic_import(name):
  41. """
  42. Dynamically import a class from an absolute path string
  43. """
  44. components = name.split('.')
  45. mod = __import__(components[0])
  46. for comp in components[1:]:
  47. mod = getattr(mod, comp)
  48. return mod
  49. def model_names_to_filter_dict(names):
  50. """
  51. Accept a list of content types in the format ['<app>.<model>', '<app>.<model>', ...] and return a dictionary
  52. suitable for QuerySet filtering.
  53. """
  54. # TODO: This should match on the app_label as well as the model name to avoid potential duplicate names
  55. return {
  56. 'model__in': [model.split('.')[1] for model in names],
  57. }
  58. def get_subquery(model, field):
  59. """
  60. Return a Subquery suitable for annotating a child object count.
  61. """
  62. subquery = Subquery(
  63. model.objects.filter(
  64. **{field: OuterRef('pk')}
  65. ).order_by().values(
  66. field
  67. ).annotate(
  68. c=Count('*')
  69. ).values('c')
  70. )
  71. return subquery
  72. def serialize_object(obj, extra=None):
  73. """
  74. Return a generic JSON representation of an object using Django's built-in serializer. (This is used for things like
  75. change logging, not the REST API.) Optionally include a dictionary to supplement the object data.
  76. """
  77. json_str = serialize('json', [obj])
  78. data = json.loads(json_str)[0]['fields']
  79. # Include any custom fields
  80. if hasattr(obj, 'get_custom_fields'):
  81. data['custom_fields'] = {
  82. field: str(value) for field, value in obj.cf.items()
  83. }
  84. # Include any tags
  85. if is_taggable(obj):
  86. data['tags'] = [tag.name for tag in obj.tags.all()]
  87. # Append any extra data
  88. if extra is not None:
  89. data.update(extra)
  90. return data
  91. def dict_to_filter_params(d, prefix=''):
  92. """
  93. Translate a dictionary of attributes to a nested set of parameters suitable for QuerySet filtering. For example:
  94. {
  95. "name": "Foo",
  96. "rack": {
  97. "facility_id": "R101"
  98. }
  99. }
  100. Becomes:
  101. {
  102. "name": "Foo",
  103. "rack__facility_id": "R101"
  104. }
  105. And can be employed as filter parameters:
  106. Device.objects.filter(**dict_to_filter(attrs_dict))
  107. """
  108. params = {}
  109. for key, val in d.items():
  110. k = prefix + key
  111. if isinstance(val, dict):
  112. params.update(dict_to_filter_params(val, k + '__'))
  113. else:
  114. params[k] = val
  115. return params
  116. def deepmerge(original, new):
  117. """
  118. Deep merge two dictionaries (new into original) and return a new dict
  119. """
  120. merged = OrderedDict(original)
  121. for key, val in new.items():
  122. if key in original and isinstance(original[key], dict) and isinstance(val, dict):
  123. merged[key] = deepmerge(original[key], val)
  124. else:
  125. merged[key] = val
  126. return merged
  127. def to_meters(length, unit):
  128. """
  129. Convert the given length to meters.
  130. """
  131. length = int(length)
  132. if length < 0:
  133. raise ValueError("Length must be a positive integer")
  134. valid_units = [u[0] for u in CableLengthUnitChoices]
  135. if unit not in valid_units:
  136. raise ValueError(
  137. "Unknown unit {}. Must be one of the following: {}".format(unit, ', '.join(valid_units))
  138. )
  139. if unit == CableLengthUnitChoices.UNIT_METER:
  140. return length
  141. if unit == CableLengthUnitChoices.UNIT_CENTIMETER:
  142. return length / 100
  143. if unit == CableLengthUnitChoices.UNIT_FOOT:
  144. return length * 0.3048
  145. if unit == CableLengthUnitChoices.UNIT_INCH:
  146. return length * 0.3048 * 12
  147. def prepare_cloned_fields(instance):
  148. """
  149. Compile an object's `clone_fields` list into a string of URL query parameters. Tags are automatically cloned where
  150. applicable.
  151. """
  152. params = {}
  153. for field_name in getattr(instance, 'clone_fields', []):
  154. field = instance._meta.get_field(field_name)
  155. field_value = field.value_from_object(instance)
  156. # Swap out False with URL-friendly value
  157. if field_value is False:
  158. field_value = ''
  159. # Omit empty values
  160. if field_value not in (None, ''):
  161. params[field_name] = field_value
  162. # Copy tags
  163. if is_taggable(instance):
  164. params['tags'] = ','.join([t.name for t in instance.tags.all()])
  165. # Concatenate parameters into a URL query string
  166. param_string = '&'.join(
  167. ['{}={}'.format(k, v) for k, v in params.items()]
  168. )
  169. return param_string