utils.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. import datetime
  2. import json
  3. from collections import OrderedDict
  4. from django.core.serializers import serialize
  5. from django.db.models import Count, OuterRef, Subquery
  6. from django.http import QueryDict
  7. from jinja2 import Environment
  8. from dcim.choices import CableLengthUnitChoices
  9. from extras.utils import is_taggable
  10. def csv_format(data):
  11. """
  12. Encapsulate any data which contains a comma within double quotes.
  13. """
  14. csv = []
  15. for value in data:
  16. # Represent None or False with empty string
  17. if value is None or value is False:
  18. csv.append('')
  19. continue
  20. # Convert dates to ISO format
  21. if isinstance(value, (datetime.date, datetime.datetime)):
  22. value = value.isoformat()
  23. # Force conversion to string first so we can check for any commas
  24. if not isinstance(value, str):
  25. value = '{}'.format(value)
  26. # Double-quote the value if it contains a comma
  27. if ',' in value or '\n' in value:
  28. csv.append('"{}"'.format(value))
  29. else:
  30. csv.append('{}'.format(value))
  31. return ','.join(csv)
  32. def foreground_color(bg_color):
  33. """
  34. Return the ideal foreground color (black or white) for a given background color in hexadecimal RGB format.
  35. """
  36. bg_color = bg_color.strip('#')
  37. r, g, b = [int(bg_color[c:c + 2], 16) for c in (0, 2, 4)]
  38. if r * 0.299 + g * 0.587 + b * 0.114 > 186:
  39. return '000000'
  40. else:
  41. return 'ffffff'
  42. def dynamic_import(name):
  43. """
  44. Dynamically import a class from an absolute path string
  45. """
  46. components = name.split('.')
  47. mod = __import__(components[0])
  48. for comp in components[1:]:
  49. mod = getattr(mod, comp)
  50. return mod
  51. def get_subquery(model, field):
  52. """
  53. Return a Subquery suitable for annotating a child object count.
  54. """
  55. subquery = Subquery(
  56. model.objects.filter(
  57. **{field: OuterRef('pk')}
  58. ).order_by().values(
  59. field
  60. ).annotate(
  61. c=Count('*')
  62. ).values('c')
  63. )
  64. return subquery
  65. def serialize_object(obj, extra=None):
  66. """
  67. Return a generic JSON representation of an object using Django's built-in serializer. (This is used for things like
  68. change logging, not the REST API.) Optionally include a dictionary to supplement the object data.
  69. """
  70. json_str = serialize('json', [obj])
  71. data = json.loads(json_str)[0]['fields']
  72. # Include any custom fields
  73. if hasattr(obj, 'get_custom_fields'):
  74. data['custom_fields'] = {
  75. field: str(value) for field, value in obj.cf.items()
  76. }
  77. # Include any tags
  78. if is_taggable(obj):
  79. data['tags'] = [tag.name for tag in obj.tags.all()]
  80. # Append any extra data
  81. if extra is not None:
  82. data.update(extra)
  83. return data
  84. def dict_to_filter_params(d, prefix=''):
  85. """
  86. Translate a dictionary of attributes to a nested set of parameters suitable for QuerySet filtering. For example:
  87. {
  88. "name": "Foo",
  89. "rack": {
  90. "facility_id": "R101"
  91. }
  92. }
  93. Becomes:
  94. {
  95. "name": "Foo",
  96. "rack__facility_id": "R101"
  97. }
  98. And can be employed as filter parameters:
  99. Device.objects.filter(**dict_to_filter(attrs_dict))
  100. """
  101. params = {}
  102. for key, val in d.items():
  103. k = prefix + key
  104. if isinstance(val, dict):
  105. params.update(dict_to_filter_params(val, k + '__'))
  106. else:
  107. params[k] = val
  108. return params
  109. def deepmerge(original, new):
  110. """
  111. Deep merge two dictionaries (new into original) and return a new dict
  112. """
  113. merged = OrderedDict(original)
  114. for key, val in new.items():
  115. if key in original and isinstance(original[key], dict) and isinstance(val, dict):
  116. merged[key] = deepmerge(original[key], val)
  117. else:
  118. merged[key] = val
  119. return merged
  120. def to_meters(length, unit):
  121. """
  122. Convert the given length to meters.
  123. """
  124. length = int(length)
  125. if length < 0:
  126. raise ValueError("Length must be a positive integer")
  127. valid_units = CableLengthUnitChoices.values()
  128. if unit not in valid_units:
  129. raise ValueError(
  130. "Unknown unit {}. Must be one of the following: {}".format(unit, ', '.join(valid_units))
  131. )
  132. if unit == CableLengthUnitChoices.UNIT_METER:
  133. return length
  134. if unit == CableLengthUnitChoices.UNIT_CENTIMETER:
  135. return length / 100
  136. if unit == CableLengthUnitChoices.UNIT_FOOT:
  137. return length * 0.3048
  138. if unit == CableLengthUnitChoices.UNIT_INCH:
  139. return length * 0.3048 * 12
  140. raise ValueError("Unknown unit {}. Must be 'm', 'cm', 'ft', or 'in'.".format(unit))
  141. def render_jinja2(template_code, context):
  142. """
  143. Render a Jinja2 template with the provided context. Return the rendered content.
  144. """
  145. return Environment().from_string(source=template_code).render(**context)
  146. def prepare_cloned_fields(instance):
  147. """
  148. Compile an object's `clone_fields` list into a string of URL query parameters. Tags are automatically cloned where
  149. applicable.
  150. """
  151. params = {}
  152. for field_name in getattr(instance, 'clone_fields', []):
  153. field = instance._meta.get_field(field_name)
  154. field_value = field.value_from_object(instance)
  155. # Swap out False with URL-friendly value
  156. if field_value is False:
  157. field_value = ''
  158. # Omit empty values
  159. if field_value not in (None, ''):
  160. params[field_name] = field_value
  161. # Copy tags
  162. if is_taggable(instance):
  163. params['tags'] = ','.join([t.name for t in instance.tags.all()])
  164. # Concatenate parameters into a URL query string
  165. param_string = '&'.join(
  166. ['{}={}'.format(k, v) for k, v in params.items()]
  167. )
  168. return param_string
  169. def querydict_to_dict(querydict):
  170. """
  171. Convert a django.http.QueryDict object to a regular Python dictionary, preserving lists of multiple values.
  172. (QueryDict.dict() will return only the last value in a list for each key.)
  173. """
  174. assert isinstance(querydict, QueryDict)
  175. return {
  176. key: querydict.get(key) if len(value) == 1 and key != 'pk' else querydict.getlist(key)
  177. for key, value in querydict.lists()
  178. }
  179. def shallow_compare_dict(source_dict, destination_dict, exclude=None):
  180. """
  181. Return a new dictionary of the different keys. The values of `destination_dict` are returned. Only the equality of
  182. the first layer of keys/values is checked. `exclude` is a list or tuple of keys to be ignored.
  183. """
  184. difference = {}
  185. for key in destination_dict:
  186. if source_dict.get(key) != destination_dict[key]:
  187. if isinstance(exclude, (list, tuple)) and key in exclude:
  188. continue
  189. difference[key] = destination_dict[key]
  190. return difference