tokens.py 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. import hashlib
  2. import hmac
  3. import random
  4. import zoneinfo
  5. from django.conf import settings
  6. from django.contrib.postgres.fields import ArrayField
  7. from django.core.exceptions import ValidationError
  8. from django.core.validators import MinLengthValidator
  9. from django.db import models
  10. from django.db.models import Q
  11. from django.urls import reverse
  12. from django.utils import timezone
  13. from django.utils.translation import gettext_lazy as _
  14. from netaddr import IPNetwork
  15. from ipam.fields import IPNetworkField
  16. from users.choices import TokenVersionChoices
  17. from users.constants import TOKEN_CHARSET, TOKEN_DEFAULT_LENGTH, TOKEN_KEY_LENGTH, TOKEN_PREFIX
  18. from users.utils import get_current_pepper
  19. from utilities.querysets import RestrictedQuerySet
  20. __all__ = (
  21. 'Token',
  22. )
  23. class Token(models.Model):
  24. """
  25. An API token used for user authentication. This extends the stock model to allow each user to have multiple tokens.
  26. It also supports setting an expiration time and toggling write ability.
  27. """
  28. _token = None
  29. version = models.PositiveSmallIntegerField(
  30. verbose_name=_('version'),
  31. choices=TokenVersionChoices,
  32. default=TokenVersionChoices.V2,
  33. )
  34. user = models.ForeignKey(
  35. to='users.User',
  36. on_delete=models.CASCADE,
  37. related_name='tokens'
  38. )
  39. description = models.CharField(
  40. verbose_name=_('description'),
  41. max_length=200,
  42. blank=True
  43. )
  44. created = models.DateTimeField(
  45. verbose_name=_('created'),
  46. auto_now_add=True
  47. )
  48. expires = models.DateTimeField(
  49. verbose_name=_('expires'),
  50. blank=True,
  51. null=True
  52. )
  53. last_used = models.DateTimeField(
  54. verbose_name=_('last used'),
  55. blank=True,
  56. null=True
  57. )
  58. write_enabled = models.BooleanField(
  59. verbose_name=_('write enabled'),
  60. default=True,
  61. help_text=_('Permit create/update/delete operations using this key')
  62. )
  63. # For legacy v1 tokens, this field stores the plaintext 40-char token value. Not used for v2.
  64. plaintext = models.CharField(
  65. verbose_name=_('plaintext'),
  66. max_length=40,
  67. unique=True,
  68. blank=True,
  69. null=True,
  70. validators=[MinLengthValidator(40)],
  71. )
  72. key = models.CharField(
  73. verbose_name=_('key'),
  74. max_length=TOKEN_KEY_LENGTH,
  75. unique=True,
  76. blank=True,
  77. null=True,
  78. validators=[MinLengthValidator(TOKEN_KEY_LENGTH)],
  79. help_text=_('v2 token identification key'),
  80. )
  81. pepper_id = models.PositiveSmallIntegerField(
  82. verbose_name=_('pepper ID'),
  83. blank=True,
  84. null=True,
  85. help_text=_('ID of the cryptographic pepper used to hash the token (v2 only)'),
  86. )
  87. hmac_digest = models.CharField(
  88. verbose_name=_('digest'),
  89. max_length=64,
  90. blank=True,
  91. null=True,
  92. help_text=_('SHA256 hash of the token and pepper (v2 only)'),
  93. )
  94. allowed_ips = ArrayField(
  95. base_field=IPNetworkField(),
  96. blank=True,
  97. null=True,
  98. verbose_name=_('allowed IPs'),
  99. help_text=_(
  100. 'Allowed IPv4/IPv6 networks from where the token can be used. Leave blank for no restrictions. '
  101. 'Ex: "10.1.1.0/24, 192.168.10.16/32, 2001:DB8:1::/64"'
  102. ),
  103. )
  104. objects = RestrictedQuerySet.as_manager()
  105. class Meta:
  106. ordering = ('-created',)
  107. verbose_name = _('token')
  108. verbose_name_plural = _('tokens')
  109. constraints = [
  110. models.CheckConstraint(
  111. name='enforce_version_dependent_fields',
  112. condition=(
  113. Q(
  114. version=1,
  115. key__isnull=True,
  116. pepper_id__isnull=True,
  117. hmac_digest__isnull=True,
  118. plaintext__isnull=False
  119. ) |
  120. Q(
  121. version=2,
  122. key__isnull=False,
  123. pepper_id__isnull=False,
  124. hmac_digest__isnull=False,
  125. plaintext__isnull=True
  126. )
  127. ),
  128. ),
  129. ]
  130. def __init__(self, *args, token=None, **kwargs):
  131. super().__init__(*args, **kwargs)
  132. # This stores the initial plaintext value (if given) on the creation of a new Token. If not provided, a
  133. # random token value will be generated and assigned immediately prior to saving the Token instance.
  134. self.token = token
  135. def __str__(self):
  136. return self.key if self.v2 else self.partial
  137. def get_absolute_url(self):
  138. return reverse('users:token', args=[self.pk])
  139. @property
  140. def v1(self):
  141. return self.version == 1
  142. @property
  143. def v2(self):
  144. return self.version == 2
  145. @property
  146. def partial(self):
  147. """
  148. Return a sanitized representation of a v1 token.
  149. """
  150. return f'**********************************{self.plaintext[-6:]}' if self.plaintext else ''
  151. @property
  152. def token(self):
  153. return self._token
  154. @token.setter
  155. def token(self, value):
  156. if not self._state.adding:
  157. raise ValueError("Cannot assign a new plaintext value for an existing token.")
  158. self._token = value
  159. if value is not None:
  160. if self.v1:
  161. self.plaintext = value
  162. elif self.v2:
  163. self.key = self.key or self.generate_key()
  164. self.update_digest()
  165. def clean(self):
  166. super().clean()
  167. if self._state.adding:
  168. if self.pepper_id is not None and self.pepper_id not in settings.API_TOKEN_PEPPERS:
  169. raise ValidationError(_(
  170. "Invalid pepper ID: {id}. Check configured API_TOKEN_PEPPERS."
  171. ).format(id=self.pepper_id))
  172. # Prevent creating a token with a past expiration date
  173. # while allowing updates to existing tokens.
  174. if self.pk is None and self.is_expired:
  175. current_tz = zoneinfo.ZoneInfo(settings.TIME_ZONE)
  176. now = timezone.now().astimezone(current_tz)
  177. current_time_str = f'{now.date().isoformat()} {now.time().isoformat(timespec="seconds")}'
  178. # Translators: {current_time} is the current server date and time in ISO format,
  179. # {timezone} is the configured server time zone (for example, "UTC" or "Europe/Berlin").
  180. message = _(
  181. 'Expiration time must be in the future. Current server time is {current_time} ({timezone}).'
  182. ).format(current_time=current_time_str, timezone=current_tz.key)
  183. raise ValidationError({'expires': message})
  184. def save(self, *args, **kwargs):
  185. # If creating a new Token and no token value has been specified, generate one
  186. if self._state.adding and self.token is None:
  187. self.token = self.generate()
  188. return super().save(*args, **kwargs)
  189. @classmethod
  190. def generate_key(cls):
  191. """
  192. Generate and return a random alphanumeric key for v2 tokens.
  193. """
  194. return cls.generate(length=TOKEN_KEY_LENGTH)
  195. @staticmethod
  196. def generate(length=TOKEN_DEFAULT_LENGTH):
  197. """
  198. Generate and return a random token value of the given length.
  199. """
  200. return ''.join(random.choice(TOKEN_CHARSET) for _ in range(length))
  201. def update_digest(self):
  202. """
  203. Recalculate and save the HMAC digest using the currently defined pepper and token values.
  204. """
  205. self.pepper_id, pepper = get_current_pepper()
  206. self.hmac_digest = hmac.new(
  207. pepper.encode('utf-8'),
  208. self.token.encode('utf-8'),
  209. hashlib.sha256
  210. ).hexdigest()
  211. @property
  212. def is_expired(self):
  213. if self.expires is None or timezone.now() < self.expires:
  214. return False
  215. return True
  216. def validate(self, token):
  217. """
  218. Validate the given plaintext against the token.
  219. For v1 tokens, check that the given value is equal to the stored plaintext. For v2 tokens, calculate an HMAC
  220. from the Token's pepper ID and the given plaintext value, and check whether the result matches the recorded
  221. digest.
  222. """
  223. if self.v1:
  224. return token == self.token
  225. if self.v2:
  226. token = token.removeprefix(TOKEN_PREFIX)
  227. try:
  228. pepper = settings.API_TOKEN_PEPPERS[self.pepper_id]
  229. except KeyError:
  230. # Invalid pepper ID
  231. return False
  232. digest = hmac.new(pepper.encode('utf-8'), token.encode('utf-8'), hashlib.sha256).hexdigest()
  233. return digest == self.hmac_digest
  234. def validate_client_ip(self, client_ip):
  235. """
  236. Validate the API client IP address against the source IP restrictions (if any) set on the token.
  237. """
  238. if not self.allowed_ips:
  239. return True
  240. for ip_network in self.allowed_ips:
  241. if client_ip in IPNetwork(ip_network):
  242. return True
  243. return False