models.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. import binascii
  2. import os
  3. from django.contrib.auth.models import Group, User
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.contrib.postgres.fields import JSONField
  6. from django.core.exceptions import FieldError, ValidationError
  7. from django.core.validators import MinLengthValidator
  8. from django.db import models
  9. from django.db.models import Q
  10. from django.db.models.signals import post_save
  11. from django.dispatch import receiver
  12. from django.utils import timezone
  13. from utilities.utils import flatten_dict
  14. __all__ = (
  15. 'Token',
  16. 'UserConfig',
  17. )
  18. class UserConfig(models.Model):
  19. """
  20. This model stores arbitrary user-specific preferences in a JSON data structure.
  21. """
  22. user = models.OneToOneField(
  23. to=User,
  24. on_delete=models.CASCADE,
  25. related_name='config'
  26. )
  27. data = JSONField(
  28. default=dict
  29. )
  30. class Meta:
  31. ordering = ['user']
  32. verbose_name = verbose_name_plural = 'User Preferences'
  33. def get(self, path, default=None):
  34. """
  35. Retrieve a configuration parameter specified by its dotted path. Example:
  36. userconfig.get('foo.bar.baz')
  37. :param path: Dotted path to the configuration key. For example, 'foo.bar' returns self.data['foo']['bar'].
  38. :param default: Default value to return for a nonexistent key (default: None).
  39. """
  40. d = self.data
  41. keys = path.split('.')
  42. # Iterate down the hierarchy, returning the default value if any invalid key is encountered
  43. for key in keys:
  44. if type(d) is dict and key in d:
  45. d = d.get(key)
  46. else:
  47. return default
  48. return d
  49. def all(self):
  50. """
  51. Return a dictionary of all defined keys and their values.
  52. """
  53. return flatten_dict(self.data)
  54. def set(self, path, value, commit=False):
  55. """
  56. Define or overwrite a configuration parameter. Example:
  57. userconfig.set('foo.bar.baz', 123)
  58. Leaf nodes (those which are not dictionaries of other nodes) cannot be overwritten as dictionaries. Similarly,
  59. branch nodes (dictionaries) cannot be overwritten as single values. (A TypeError exception will be raised.) In
  60. both cases, the existing key must first be cleared. This safeguard is in place to help avoid inadvertently
  61. overwriting the wrong key.
  62. :param path: Dotted path to the configuration key. For example, 'foo.bar' sets self.data['foo']['bar'].
  63. :param value: The value to be written. This can be any type supported by JSON.
  64. :param commit: If true, the UserConfig instance will be saved once the new value has been applied.
  65. """
  66. d = self.data
  67. keys = path.split('.')
  68. # Iterate through the hierarchy to find the key we're setting. Raise TypeError if we encounter any
  69. # interim leaf nodes (keys which do not contain dictionaries).
  70. for i, key in enumerate(keys[:-1]):
  71. if key in d and type(d[key]) is dict:
  72. d = d[key]
  73. elif key in d:
  74. err_path = '.'.join(path.split('.')[:i + 1])
  75. raise TypeError(f"Key '{err_path}' is a leaf node; cannot assign new keys")
  76. else:
  77. d = d.setdefault(key, {})
  78. # Set a key based on the last item in the path. Raise TypeError if attempting to overwrite a non-leaf node.
  79. key = keys[-1]
  80. if key in d and type(d[key]) is dict:
  81. raise TypeError(f"Key '{path}' has child keys; cannot assign a value")
  82. else:
  83. d[key] = value
  84. if commit:
  85. self.save()
  86. def clear(self, path, commit=False):
  87. """
  88. Delete a configuration parameter specified by its dotted path. The key and any child keys will be deleted.
  89. Example:
  90. userconfig.clear('foo.bar.baz')
  91. Invalid keys will be ignored silently.
  92. :param path: Dotted path to the configuration key. For example, 'foo.bar' deletes self.data['foo']['bar'].
  93. :param commit: If true, the UserConfig instance will be saved once the new value has been applied.
  94. """
  95. d = self.data
  96. keys = path.split('.')
  97. for key in keys[:-1]:
  98. if key not in d:
  99. break
  100. if type(d[key]) is dict:
  101. d = d[key]
  102. key = keys[-1]
  103. d.pop(key, None) # Avoid a KeyError on invalid keys
  104. if commit:
  105. self.save()
  106. @receiver(post_save, sender=User)
  107. def create_userconfig(instance, created, **kwargs):
  108. """
  109. Automatically create a new UserConfig when a new User is created.
  110. """
  111. if created:
  112. UserConfig(user=instance).save()
  113. class Token(models.Model):
  114. """
  115. An API token used for user authentication. This extends the stock model to allow each user to have multiple tokens.
  116. It also supports setting an expiration time and toggling write ability.
  117. """
  118. user = models.ForeignKey(
  119. to=User,
  120. on_delete=models.CASCADE,
  121. related_name='tokens'
  122. )
  123. created = models.DateTimeField(
  124. auto_now_add=True
  125. )
  126. expires = models.DateTimeField(
  127. blank=True,
  128. null=True
  129. )
  130. key = models.CharField(
  131. max_length=40,
  132. unique=True,
  133. validators=[MinLengthValidator(40)]
  134. )
  135. write_enabled = models.BooleanField(
  136. default=True,
  137. help_text='Permit create/update/delete operations using this key'
  138. )
  139. description = models.CharField(
  140. max_length=200,
  141. blank=True
  142. )
  143. class Meta:
  144. pass
  145. def __str__(self):
  146. # Only display the last 24 bits of the token to avoid accidental exposure.
  147. return "{} ({})".format(self.key[-6:], self.user)
  148. def save(self, *args, **kwargs):
  149. if not self.key:
  150. self.key = self.generate_key()
  151. return super().save(*args, **kwargs)
  152. def generate_key(self):
  153. # Generate a random 160-bit key expressed in hexadecimal.
  154. return binascii.hexlify(os.urandom(20)).decode()
  155. @property
  156. def is_expired(self):
  157. if self.expires is None or timezone.now() < self.expires:
  158. return False
  159. return True
  160. class ObjectPermissionManager(models.Manager):
  161. def get_attr_constraints(self, user, perm):
  162. """
  163. Compile all ObjectPermission attributes applicable to a specific combination of user, model, and action. Returns
  164. a dictionary that can be passed directly to .filter() on a QuerySet.
  165. """
  166. app_label, codename = perm.split('.')
  167. action, model_name = codename.split('_')
  168. assert action in ['view', 'add', 'change', 'delete'], f"Invalid action: {action}"
  169. content_type = ContentType.objects.get(app_label=app_label, model=model_name)
  170. qs = self.get_queryset().filter(
  171. Q(users=user) | Q(groups__user=user),
  172. model=content_type,
  173. **{f'can_{action}': True}
  174. )
  175. attrs = Q()
  176. for perm in qs:
  177. attrs |= Q(**perm.attrs)
  178. return attrs
  179. class ObjectPermission(models.Model):
  180. """
  181. A mapping of view, add, change, and/or delete permission for users and/or groups to an arbitrary set of objects
  182. identified by ORM query parameters.
  183. """
  184. users = models.ManyToManyField(
  185. to=User,
  186. blank=True,
  187. related_name='object_permissions'
  188. )
  189. groups = models.ManyToManyField(
  190. to=Group,
  191. blank=True,
  192. related_name='object_permissions'
  193. )
  194. model = models.ForeignKey(
  195. to=ContentType,
  196. on_delete=models.CASCADE
  197. )
  198. attrs = JSONField(
  199. blank=True,
  200. null=True,
  201. verbose_name='Attributes'
  202. )
  203. can_view = models.BooleanField(
  204. default=False
  205. )
  206. can_add = models.BooleanField(
  207. default=False
  208. )
  209. can_change = models.BooleanField(
  210. default=False
  211. )
  212. can_delete = models.BooleanField(
  213. default=False
  214. )
  215. objects = ObjectPermissionManager()
  216. class Meta:
  217. unique_together = ('model', 'attrs')
  218. def clean(self):
  219. # Validate the specified model attributes by attempting to execute a query. We don't care whether the query
  220. # returns anything; we just want to make sure the specified attributes are valid.
  221. if self.attrs:
  222. model = self.model.model_class()
  223. try:
  224. model.objects.filter(**self.attrs).exists()
  225. except FieldError as e:
  226. raise ValidationError({
  227. 'attrs': f'Invalid attributes for {model}: {e}'
  228. })