| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188 |
- from django.conf import settings
- from django.contrib.auth import authenticate
- from django.contrib.auth import get_user_model
- from drf_spectacular.utils import extend_schema_field
- from drf_spectacular.types import OpenApiTypes
- from rest_framework import serializers
- from rest_framework.exceptions import AuthenticationFailed, PermissionDenied
- from core.models import ObjectType
- from netbox.api.fields import ContentTypeField, IPNetworkSerializer, SerializedPKRelatedField
- from netbox.api.serializers import ValidatedModelSerializer
- from users.models import Group, ObjectPermission, Token
- from .nested_serializers import *
- __all__ = (
- 'GroupSerializer',
- 'ObjectPermissionSerializer',
- 'TokenSerializer',
- 'UserSerializer',
- )
- class UserSerializer(ValidatedModelSerializer):
- url = serializers.HyperlinkedIdentityField(view_name='users-api:user-detail')
- groups = SerializedPKRelatedField(
- queryset=Group.objects.all(),
- serializer=NestedGroupSerializer,
- required=False,
- many=True
- )
- class Meta:
- model = get_user_model()
- fields = (
- 'id', 'url', 'display', 'username', 'password', 'first_name', 'last_name', 'email', 'is_staff', 'is_active',
- 'date_joined', 'last_login', 'groups',
- )
- brief_fields = ('id', 'url', 'display', 'username')
- extra_kwargs = {
- 'password': {'write_only': True}
- }
- def create(self, validated_data):
- """
- Extract the password from validated data and set it separately to ensure proper hash generation.
- """
- password = validated_data.pop('password')
- user = super().create(validated_data)
- user.set_password(password)
- user.save()
- return user
- def update(self, instance, validated_data):
- """
- Ensure proper updated password hash generation.
- """
- password = validated_data.pop('password', None)
- if password is not None:
- instance.set_password(password)
- return super().update(instance, validated_data)
- @extend_schema_field(OpenApiTypes.STR)
- def get_display(self, obj):
- if full_name := obj.get_full_name():
- return f"{obj.username} ({full_name})"
- return obj.username
- class GroupSerializer(ValidatedModelSerializer):
- url = serializers.HyperlinkedIdentityField(view_name='users-api:group-detail')
- user_count = serializers.IntegerField(read_only=True)
- class Meta:
- model = Group
- fields = ('id', 'url', 'display', 'name', 'user_count')
- brief_fields = ('id', 'url', 'display', 'name')
- class TokenSerializer(ValidatedModelSerializer):
- url = serializers.HyperlinkedIdentityField(view_name='users-api:token-detail')
- key = serializers.CharField(
- min_length=40,
- max_length=40,
- allow_blank=True,
- required=False,
- write_only=not settings.ALLOW_TOKEN_RETRIEVAL
- )
- user = NestedUserSerializer()
- allowed_ips = serializers.ListField(
- child=IPNetworkSerializer(),
- required=False,
- allow_empty=True,
- default=[]
- )
- class Meta:
- model = Token
- fields = (
- 'id', 'url', 'display', 'user', 'created', 'expires', 'last_used', 'key', 'write_enabled', 'description',
- 'allowed_ips',
- )
- brief_fields = ('id', 'url', 'display', 'key', 'write_enabled', 'description')
- def to_internal_value(self, data):
- if 'key' not in data:
- data['key'] = Token.generate_key()
- return super().to_internal_value(data)
- def validate(self, data):
- # If the Token is being created on behalf of another user, enforce the grant_token permission.
- request = self.context.get('request')
- token_user = data.get('user')
- if token_user and token_user != request.user and not request.user.has_perm('users.grant_token'):
- raise PermissionDenied("This user does not have permission to create tokens for other users.")
- return super().validate(data)
- class TokenProvisionSerializer(TokenSerializer):
- user = NestedUserSerializer(
- read_only=True
- )
- username = serializers.CharField(
- write_only=True
- )
- password = serializers.CharField(
- write_only=True
- )
- last_used = serializers.DateTimeField(
- read_only=True
- )
- key = serializers.CharField(
- read_only=True
- )
- class Meta:
- model = Token
- fields = (
- 'id', 'url', 'display', 'user', 'created', 'expires', 'last_used', 'key', 'write_enabled', 'description',
- 'allowed_ips', 'username', 'password',
- )
- def validate(self, data):
- # Validate the username and password
- username = data.pop('username')
- password = data.pop('password')
- user = authenticate(request=self.context.get('request'), username=username, password=password)
- if user is None:
- raise AuthenticationFailed("Invalid username/password")
- # Inject the user into the validated data
- data['user'] = user
- return data
- class ObjectPermissionSerializer(ValidatedModelSerializer):
- url = serializers.HyperlinkedIdentityField(view_name='users-api:objectpermission-detail')
- object_types = ContentTypeField(
- queryset=ObjectType.objects.all(),
- many=True
- )
- groups = SerializedPKRelatedField(
- queryset=Group.objects.all(),
- serializer=NestedGroupSerializer,
- required=False,
- many=True
- )
- users = SerializedPKRelatedField(
- queryset=get_user_model().objects.all(),
- serializer=NestedUserSerializer,
- required=False,
- many=True
- )
- class Meta:
- model = ObjectPermission
- fields = (
- 'id', 'url', 'display', 'name', 'description', 'enabled', 'object_types', 'groups', 'users', 'actions',
- 'constraints',
- )
- brief_fields = (
- 'id', 'url', 'display', 'name', 'description', 'enabled', 'object_types', 'groups', 'users', 'actions',
- )
|