utils.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import datetime
  2. import json
  3. from collections import OrderedDict
  4. from django.core.serializers import serialize
  5. from django.db.models import Count, OuterRef, Subquery
  6. from dcim.constants import LENGTH_UNIT_CENTIMETER, LENGTH_UNIT_FOOT, LENGTH_UNIT_INCH, LENGTH_UNIT_METER
  7. def csv_format(data):
  8. """
  9. Encapsulate any data which contains a comma within double quotes.
  10. """
  11. csv = []
  12. for value in data:
  13. # Represent None or False with empty string
  14. if value is None or value is False:
  15. csv.append('')
  16. continue
  17. # Convert dates to ISO format
  18. if isinstance(value, (datetime.date, datetime.datetime)):
  19. value = value.isoformat()
  20. # Force conversion to string first so we can check for any commas
  21. if not isinstance(value, str):
  22. value = '{}'.format(value)
  23. # Double-quote the value if it contains a comma
  24. if ',' in value or '\n' in value:
  25. csv.append('"{}"'.format(value))
  26. else:
  27. csv.append('{}'.format(value))
  28. return ','.join(csv)
  29. def foreground_color(bg_color):
  30. """
  31. Return the ideal foreground color (black or white) for a given background color in hexadecimal RGB format.
  32. """
  33. bg_color = bg_color.strip('#')
  34. r, g, b = [int(bg_color[c:c + 2], 16) for c in (0, 2, 4)]
  35. if r * 0.299 + g * 0.587 + b * 0.114 > 186:
  36. return '000000'
  37. else:
  38. return 'ffffff'
  39. def dynamic_import(name):
  40. """
  41. Dynamically import a class from an absolute path string
  42. """
  43. components = name.split('.')
  44. mod = __import__(components[0])
  45. for comp in components[1:]:
  46. mod = getattr(mod, comp)
  47. return mod
  48. def model_names_to_filter_dict(names):
  49. """
  50. Accept a list of content types in the format ['<app>.<model>', '<app>.<model>', ...] and return a dictionary
  51. suitable for QuerySet filtering.
  52. """
  53. # TODO: This should match on the app_label as well as the model name to avoid potential duplicate names
  54. return {
  55. 'model__in': [model.split('.')[1] for model in names],
  56. }
  57. def get_subquery(model, field):
  58. """
  59. Return a Subquery suitable for annotating a child object count.
  60. """
  61. subquery = Subquery(
  62. model.objects.filter(
  63. **{field: OuterRef('pk')}
  64. ).order_by().values(
  65. field
  66. ).annotate(
  67. c=Count('*')
  68. ).values('c')
  69. )
  70. return subquery
  71. def serialize_object(obj, extra=None):
  72. """
  73. Return a generic JSON representation of an object using Django's built-in serializer. (This is used for things like
  74. change logging, not the REST API.) Optionally include a dictionary to supplement the object data.
  75. """
  76. json_str = serialize('json', [obj])
  77. data = json.loads(json_str)[0]['fields']
  78. # Include any custom fields
  79. if hasattr(obj, 'get_custom_fields'):
  80. data['custom_fields'] = {
  81. field.name: str(value) for field, value in obj.get_custom_fields().items()
  82. }
  83. # Include any tags
  84. if hasattr(obj, 'tags'):
  85. data['tags'] = [tag.name for tag in obj.tags.all()]
  86. # Append any extra data
  87. if extra is not None:
  88. data.update(extra)
  89. return data
  90. def dict_to_filter_params(d, prefix=''):
  91. """
  92. Translate a dictionary of attributes to a nested set of parameters suitable for QuerySet filtering. For example:
  93. {
  94. "name": "Foo",
  95. "rack": {
  96. "facility_id": "R101"
  97. }
  98. }
  99. Becomes:
  100. {
  101. "name": "Foo",
  102. "rack__facility_id": "R101"
  103. }
  104. And can be employed as filter parameters:
  105. Device.objects.filter(**dict_to_filter(attrs_dict))
  106. """
  107. params = {}
  108. for key, val in d.items():
  109. k = prefix + key
  110. if isinstance(val, dict):
  111. params.update(dict_to_filter_params(val, k + '__'))
  112. else:
  113. params[k] = val
  114. return params
  115. def deepmerge(original, new):
  116. """
  117. Deep merge two dictionaries (new into original) and return a new dict
  118. """
  119. merged = OrderedDict(original)
  120. for key, val in new.items():
  121. if key in original and isinstance(original[key], dict) and isinstance(val, dict):
  122. merged[key] = deepmerge(original[key], val)
  123. else:
  124. merged[key] = val
  125. return merged
  126. def to_meters(length, unit):
  127. """
  128. Convert the given length to meters.
  129. """
  130. length = int(length)
  131. if length < 0:
  132. raise ValueError("Length must be a positive integer")
  133. if unit == LENGTH_UNIT_METER:
  134. return length
  135. if unit == LENGTH_UNIT_CENTIMETER:
  136. return length / 100
  137. if unit == LENGTH_UNIT_FOOT:
  138. return length * 0.3048
  139. if unit == LENGTH_UNIT_INCH:
  140. return length * 0.3048 * 12
  141. raise ValueError("Unknown unit {}. Must be 'm', 'cm', 'ft', or 'in'.".format(unit))