|
@@ -8,6 +8,7 @@ from rest_framework.test import APIClient
|
|
|
|
|
|
|
|
from core.models import ObjectType
|
|
from core.models import ObjectType
|
|
|
from dcim.models import Rack, Site
|
|
from dcim.models import Rack, Site
|
|
|
|
|
+from users.constants import TOKEN_PREFIX
|
|
|
from users.models import Group, ObjectPermission, Token, User
|
|
from users.models import Group, ObjectPermission, Token, User
|
|
|
from utilities.testing import TestCase
|
|
from utilities.testing import TestCase
|
|
|
from utilities.testing.api import APITestCase
|
|
from utilities.testing.api import APITestCase
|
|
@@ -16,67 +17,159 @@ from utilities.testing.api import APITestCase
|
|
|
class TokenAuthenticationTestCase(APITestCase):
|
|
class TokenAuthenticationTestCase(APITestCase):
|
|
|
|
|
|
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
- def test_token_authentication(self):
|
|
|
|
|
- url = reverse('dcim-api:site-list')
|
|
|
|
|
-
|
|
|
|
|
|
|
+ def test_no_token(self):
|
|
|
# Request without a token should return a 403
|
|
# Request without a token should return a 403
|
|
|
- response = self.client.get(url)
|
|
|
|
|
|
|
+ response = self.client.get(reverse('dcim-api:site-list'))
|
|
|
self.assertEqual(response.status_code, 403)
|
|
self.assertEqual(response.status_code, 403)
|
|
|
|
|
|
|
|
|
|
+ @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
|
|
+ def test_v1_token_valid(self):
|
|
|
|
|
+ # Create a v1 token
|
|
|
|
|
+ token = Token.objects.create(version=1, user=self.user)
|
|
|
|
|
+
|
|
|
# Valid token should return a 200
|
|
# Valid token should return a 200
|
|
|
- token = Token.objects.create(user=self.user)
|
|
|
|
|
- response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
|
|
|
|
|
- self.assertEqual(response.status_code, 200)
|
|
|
|
|
|
|
+ header = f'Token {token.token}'
|
|
|
|
|
+ response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 200, response.data)
|
|
|
|
|
|
|
|
# Check that the token's last_used time has been updated
|
|
# Check that the token's last_used time has been updated
|
|
|
token.refresh_from_db()
|
|
token.refresh_from_db()
|
|
|
self.assertIsNotNone(token.last_used)
|
|
self.assertIsNotNone(token.last_used)
|
|
|
|
|
|
|
|
|
|
+ @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
|
|
+ def test_v1_token_invalid(self):
|
|
|
|
|
+ # Invalid token should return a 403
|
|
|
|
|
+ header = 'Token XXXXXXXXXX'
|
|
|
|
|
+ response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 403)
|
|
|
|
|
+ self.assertEqual(response.data['detail'], "Invalid v1 token")
|
|
|
|
|
+
|
|
|
|
|
+ @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
|
|
+ def test_v2_token_valid(self):
|
|
|
|
|
+ # Create a v2 token
|
|
|
|
|
+ token = Token.objects.create(version=2, user=self.user)
|
|
|
|
|
+
|
|
|
|
|
+ # Valid token should return a 200
|
|
|
|
|
+ header = f'Bearer {TOKEN_PREFIX}{token.key}.{token.token}'
|
|
|
|
|
+ response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 200, response.data)
|
|
|
|
|
+
|
|
|
|
|
+ # Check that the token's last_used time has been updated
|
|
|
|
|
+ token.refresh_from_db()
|
|
|
|
|
+ self.assertIsNotNone(token.last_used)
|
|
|
|
|
+
|
|
|
|
|
+ @override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
|
|
+ def test_v2_token_invalid(self):
|
|
|
|
|
+ # Invalid token should return a 403
|
|
|
|
|
+ header = f'Bearer {TOKEN_PREFIX}XXXXXX.XXXXXXXXXX'
|
|
|
|
|
+ response = self.client.get(reverse('dcim-api:site-list'), HTTP_AUTHORIZATION=header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 403)
|
|
|
|
|
+ self.assertEqual(response.data['detail'], "Invalid v2 token")
|
|
|
|
|
+
|
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
def test_token_expiration(self):
|
|
def test_token_expiration(self):
|
|
|
url = reverse('dcim-api:site-list')
|
|
url = reverse('dcim-api:site-list')
|
|
|
|
|
|
|
|
- # Request without a non-expired token should succeed
|
|
|
|
|
- token = Token.objects.create(user=self.user)
|
|
|
|
|
- response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
|
|
|
|
|
|
|
+ # Create v1 & v2 tokens
|
|
|
|
|
+ future = datetime.datetime(2100, 1, 1, tzinfo=datetime.timezone.utc)
|
|
|
|
|
+ token1 = Token.objects.create(version=1, user=self.user, expires=future)
|
|
|
|
|
+ token2 = Token.objects.create(version=2, user=self.user, expires=future)
|
|
|
|
|
+
|
|
|
|
|
+ # Request with a non-expired token should succeed
|
|
|
|
|
+ response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.token}')
|
|
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
|
|
+ response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}')
|
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
|
|
|
|
# Request with an expired token should fail
|
|
# Request with an expired token should fail
|
|
|
- token.expires = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
|
|
|
|
|
- token.save()
|
|
|
|
|
- response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}')
|
|
|
|
|
|
|
+ past = datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)
|
|
|
|
|
+ token1.expires = past
|
|
|
|
|
+ token1.save()
|
|
|
|
|
+ token2.expires = past
|
|
|
|
|
+ token2.save()
|
|
|
|
|
+ response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token1.key}')
|
|
|
|
|
+ self.assertEqual(response.status_code, 403)
|
|
|
|
|
+ response = self.client.get(url, HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}')
|
|
|
self.assertEqual(response.status_code, 403)
|
|
self.assertEqual(response.status_code, 403)
|
|
|
|
|
|
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
def test_token_write_enabled(self):
|
|
def test_token_write_enabled(self):
|
|
|
url = reverse('dcim-api:site-list')
|
|
url = reverse('dcim-api:site-list')
|
|
|
- data = {
|
|
|
|
|
- 'name': 'Site 1',
|
|
|
|
|
- 'slug': 'site-1',
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ data = [
|
|
|
|
|
+ {
|
|
|
|
|
+ 'name': 'Site 1',
|
|
|
|
|
+ 'slug': 'site-1',
|
|
|
|
|
+ },
|
|
|
|
|
+ {
|
|
|
|
|
+ 'name': 'Site 2',
|
|
|
|
|
+ 'slug': 'site-2',
|
|
|
|
|
+ },
|
|
|
|
|
+ ]
|
|
|
|
|
+ self.add_permissions('dcim.view_site', 'dcim.add_site')
|
|
|
|
|
+
|
|
|
|
|
+ # Create v1 & v2 tokens
|
|
|
|
|
+ token1 = Token.objects.create(version=1, user=self.user, write_enabled=False)
|
|
|
|
|
+ token2 = Token.objects.create(version=2, user=self.user, write_enabled=False)
|
|
|
|
|
+
|
|
|
|
|
+ token1_header = f'Token {token1.token}'
|
|
|
|
|
+ token2_header = f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}'
|
|
|
|
|
+
|
|
|
|
|
+ # GET request with a write-disabled token should succeed
|
|
|
|
|
+ response = self.client.get(url, HTTP_AUTHORIZATION=token1_header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
|
|
+ response = self.client.get(url, HTTP_AUTHORIZATION=token2_header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
|
|
|
|
|
- # Request with a write-disabled token should fail
|
|
|
|
|
- token = Token.objects.create(user=self.user, write_enabled=False)
|
|
|
|
|
- response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
|
|
|
|
|
|
|
+ # POST request with a write-disabled token should fail
|
|
|
|
|
+ response = self.client.post(url, data[0], format='json', HTTP_AUTHORIZATION=token1_header)
|
|
|
self.assertEqual(response.status_code, 403)
|
|
self.assertEqual(response.status_code, 403)
|
|
|
-
|
|
|
|
|
- # Request with a write-enabled token should succeed
|
|
|
|
|
- token.write_enabled = True
|
|
|
|
|
- token.save()
|
|
|
|
|
- response = self.client.post(url, data, format='json', HTTP_AUTHORIZATION=f'Token {token.key}')
|
|
|
|
|
|
|
+ response = self.client.post(url, data[1], format='json', HTTP_AUTHORIZATION=token2_header)
|
|
|
self.assertEqual(response.status_code, 403)
|
|
self.assertEqual(response.status_code, 403)
|
|
|
|
|
|
|
|
|
|
+ # POST request with a write-enabled token should succeed
|
|
|
|
|
+ token1.write_enabled = True
|
|
|
|
|
+ token1.save()
|
|
|
|
|
+ token2.write_enabled = True
|
|
|
|
|
+ token2.save()
|
|
|
|
|
+ response = self.client.post(url, data[0], format='json', HTTP_AUTHORIZATION=token1_header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 201)
|
|
|
|
|
+ response = self.client.post(url, data[1], format='json', HTTP_AUTHORIZATION=token2_header)
|
|
|
|
|
+ self.assertEqual(response.status_code, 201)
|
|
|
|
|
+
|
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
@override_settings(LOGIN_REQUIRED=True, EXEMPT_VIEW_PERMISSIONS=['*'])
|
|
|
def test_token_allowed_ips(self):
|
|
def test_token_allowed_ips(self):
|
|
|
url = reverse('dcim-api:site-list')
|
|
url = reverse('dcim-api:site-list')
|
|
|
|
|
|
|
|
|
|
+ # Create v1 & v2 tokens
|
|
|
|
|
+ token1 = Token.objects.create(version=1, user=self.user, allowed_ips=['192.0.2.0/24'])
|
|
|
|
|
+ token2 = Token.objects.create(version=2, user=self.user, allowed_ips=['192.0.2.0/24'])
|
|
|
|
|
+
|
|
|
# Request from a non-allowed client IP should fail
|
|
# Request from a non-allowed client IP should fail
|
|
|
- token = Token.objects.create(user=self.user, allowed_ips=['192.0.2.0/24'])
|
|
|
|
|
- response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='127.0.0.1')
|
|
|
|
|
|
|
+ response = self.client.get(
|
|
|
|
|
+ url,
|
|
|
|
|
+ HTTP_AUTHORIZATION=f'Token {token1.token}',
|
|
|
|
|
+ REMOTE_ADDR='127.0.0.1'
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertEqual(response.status_code, 403)
|
|
|
|
|
+ response = self.client.get(
|
|
|
|
|
+ url,
|
|
|
|
|
+ HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}',
|
|
|
|
|
+ REMOTE_ADDR='127.0.0.1'
|
|
|
|
|
+ )
|
|
|
self.assertEqual(response.status_code, 403)
|
|
self.assertEqual(response.status_code, 403)
|
|
|
|
|
|
|
|
- # Request with an expired token should fail
|
|
|
|
|
- response = self.client.get(url, HTTP_AUTHORIZATION=f'Token {token.key}', REMOTE_ADDR='192.0.2.1')
|
|
|
|
|
|
|
+ # Request from an allowed client IP should succeed
|
|
|
|
|
+ response = self.client.get(
|
|
|
|
|
+ url,
|
|
|
|
|
+ HTTP_AUTHORIZATION=f'Token {token1.token}',
|
|
|
|
|
+ REMOTE_ADDR='192.0.2.1'
|
|
|
|
|
+ )
|
|
|
|
|
+ self.assertEqual(response.status_code, 200)
|
|
|
|
|
+ response = self.client.get(
|
|
|
|
|
+ url,
|
|
|
|
|
+ HTTP_AUTHORIZATION=f'Bearer {TOKEN_PREFIX}{token2.key}.{token2.token}',
|
|
|
|
|
+ REMOTE_ADDR='192.0.2.1'
|
|
|
|
|
+ )
|
|
|
self.assertEqual(response.status_code, 200)
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
|
|
|
|
|
|
|
|
@@ -427,7 +520,7 @@ class ObjectPermissionAPIViewTestCase(TestCase):
|
|
|
"""
|
|
"""
|
|
|
self.user = User.objects.create(username='testuser')
|
|
self.user = User.objects.create(username='testuser')
|
|
|
self.token = Token.objects.create(user=self.user)
|
|
self.token = Token.objects.create(user=self.user)
|
|
|
- self.header = {'HTTP_AUTHORIZATION': 'Token {}'.format(self.token.key)}
|
|
|
|
|
|
|
+ self.header = {'HTTP_AUTHORIZATION': f'Bearer {TOKEN_PREFIX}{self.token.key}.{self.token.token}'}
|
|
|
|
|
|
|
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
|
|
@override_settings(EXEMPT_VIEW_PERMISSIONS=[])
|
|
|
def test_get_object(self):
|
|
def test_get_object(self):
|