| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289 |
- from django.http import Http404, HttpResponse
- from django.shortcuts import get_object_or_404
- from django.utils.translation import gettext_lazy as _
- from django_rq.queues import get_redis_connection
- from django_rq.settings import QUEUES_LIST
- from django_rq.utils import get_statistics
- from drf_spectacular.types import OpenApiTypes
- from drf_spectacular.utils import OpenApiParameter, extend_schema
- from rest_framework import viewsets
- from rest_framework.decorators import action
- from rest_framework.exceptions import PermissionDenied
- from rest_framework.response import Response
- from rest_framework.routers import APIRootView
- from rq.job import Job as RQ_Job
- from rq.worker import Worker
- from core import filtersets
- from core.jobs import SyncDataSourceJob
- from core.models import *
- from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs, requeue_rq_job, stop_rq_job
- from netbox.api.authentication import IsAuthenticatedOrLoginNotRequired
- from netbox.api.metadata import ContentTypeMetadata
- from netbox.api.pagination import LimitOffsetListPagination
- from netbox.api.viewsets import NetBoxModelViewSet, NetBoxReadOnlyModelViewSet
- from utilities.api import IsSuperuser
- from . import serializers
- class CoreRootView(APIRootView):
- """
- Core API root view
- """
- def get_view_name(self):
- return 'Core'
- class DataSourceViewSet(NetBoxModelViewSet):
- queryset = DataSource.objects.all()
- serializer_class = serializers.DataSourceSerializer
- filterset_class = filtersets.DataSourceFilterSet
- @action(detail=True, methods=['post'])
- def sync(self, request, pk):
- """
- Enqueue a job to synchronize the DataSource.
- """
- datasource = get_object_or_404(DataSource, pk=pk)
- if not request.user.has_perm('core.sync_datasource', obj=datasource):
- raise PermissionDenied(_("This user does not have permission to synchronize this data source."))
- # Enqueue the sync job
- SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
- serializer = serializers.DataSourceSerializer(datasource, context={'request': request})
- return Response(serializer.data)
- class DataFileViewSet(NetBoxReadOnlyModelViewSet):
- queryset = DataFile.objects.defer('data')
- serializer_class = serializers.DataFileSerializer
- filterset_class = filtersets.DataFileFilterSet
- class JobViewSet(NetBoxReadOnlyModelViewSet):
- """
- Retrieve a list of job results
- """
- queryset = Job.objects.all()
- serializer_class = serializers.JobSerializer
- filterset_class = filtersets.JobFilterSet
- class ObjectChangeViewSet(NetBoxReadOnlyModelViewSet):
- """
- Retrieve a list of recent changes.
- """
- metadata_class = ContentTypeMetadata
- queryset = ObjectChange.objects.all()
- serializer_class = serializers.ObjectChangeSerializer
- filterset_class = filtersets.ObjectChangeFilterSet
- def get_queryset(self):
- return super().get_queryset().valid_models()
- class ObjectTypeViewSet(NetBoxReadOnlyModelViewSet):
- """
- Read-only list of ObjectTypes.
- """
- permission_classes = [IsAuthenticatedOrLoginNotRequired]
- queryset = ObjectType.objects.order_by('app_label', 'model')
- serializer_class = serializers.ObjectTypeSerializer
- filterset_class = filtersets.ObjectTypeFilterSet
- def initial(self, request, *args, **kwargs):
- """
- Override initial() to skip the restrict() call since ObjectType (a ContentType proxy)
- doesn't use RestrictedQuerySet and is publicly accessible metadata.
- """
- # Call GenericViewSet.initial() directly, skipping BaseViewSet.initial()
- # which would try to call restrict() on the queryset
- from rest_framework.viewsets import GenericViewSet
- GenericViewSet.initial(self, request, *args, **kwargs)
- class BaseRQViewSet(viewsets.ViewSet):
- """
- Base class for RQ view sets. Provides a list() method. Subclasses must implement get_data().
- """
- permission_classes = [IsSuperuser]
- serializer_class = None
- def get_data(self):
- raise NotImplementedError()
- @extend_schema(responses={200: OpenApiTypes.OBJECT})
- def list(self, request):
- data = self.get_data()
- paginator = LimitOffsetListPagination()
- data = paginator.paginate_list(data, request)
- serializer = self.serializer_class(data, many=True, context={'request': request})
- return paginator.get_paginated_response(serializer.data)
- def get_serializer(self, *args, **kwargs):
- """
- Return the serializer instance that should be used for validating and
- deserializing input and for serializing output.
- """
- serializer_class = self.get_serializer_class()
- kwargs['context'] = self.get_serializer_context()
- return serializer_class(*args, **kwargs)
- def get_serializer_class(self):
- """
- Return the class to use for the serializer.
- """
- return self.serializer_class
- def get_serializer_context(self):
- """
- Extra context provided to the serializer class.
- """
- return {
- 'request': self.request,
- 'format': self.format_kwarg,
- 'view': self,
- }
- class BackgroundQueueViewSet(BaseRQViewSet):
- """
- Retrieve a list of RQ Queues.
- Note: Queue names are not URL safe, so not returning a detail view.
- """
- serializer_class = serializers.BackgroundQueueSerializer
- lookup_field = 'name'
- lookup_value_regex = r'[\w.@+-]+'
- def get_view_name(self):
- return 'Background Queues'
- def get_data(self):
- return get_statistics(run_maintenance_tasks=True)['queues']
- @extend_schema(
- operation_id='core_background_queues_retrieve_by_name',
- parameters=[OpenApiParameter(name='name', type=OpenApiTypes.STR, location=OpenApiParameter.PATH)],
- responses={200: OpenApiTypes.OBJECT},
- )
- def retrieve(self, request, name):
- data = self.get_data()
- if not data:
- raise Http404
- for queue in data:
- if queue['name'] == name:
- serializer = self.serializer_class(queue, context={'request': request})
- return Response(serializer.data)
- raise Http404
- class BackgroundWorkerViewSet(BaseRQViewSet):
- """
- Retrieve a list of RQ Workers.
- """
- serializer_class = serializers.BackgroundWorkerSerializer
- lookup_field = 'name'
- def get_view_name(self):
- return 'Background Workers'
- def get_data(self):
- config = QUEUES_LIST[0]
- return Worker.all(get_redis_connection(config['connection_config']))
- @extend_schema(
- operation_id='core_background_workers_retrieve_by_name',
- parameters=[OpenApiParameter(name='name', type=OpenApiTypes.STR, location=OpenApiParameter.PATH)],
- responses={200: OpenApiTypes.OBJECT},
- )
- def retrieve(self, request, name):
- # all the RQ queues should use the same connection
- config = QUEUES_LIST[0]
- workers = Worker.all(get_redis_connection(config['connection_config']))
- worker = next((item for item in workers if item.name == name), None)
- if not worker:
- raise Http404
- serializer = serializers.BackgroundWorkerSerializer(worker, context={'request': request})
- return Response(serializer.data)
- class BackgroundTaskViewSet(BaseRQViewSet):
- """
- Retrieve a list of RQ Tasks.
- """
- serializer_class = serializers.BackgroundTaskSerializer
- lookup_field = 'id'
- def get_view_name(self):
- return 'Background Tasks'
- def get_data(self):
- return get_rq_jobs()
- def get_task_from_id(self, task_id):
- config = QUEUES_LIST[0]
- task = RQ_Job.fetch(task_id, connection=get_redis_connection(config['connection_config']))
- if not task:
- raise Http404
- return task
- @extend_schema(
- operation_id='core_background_tasks_retrieve_by_id',
- parameters=[OpenApiParameter(name='id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH)],
- responses={200: OpenApiTypes.OBJECT},
- )
- def retrieve(self, request, id):
- """
- Retrieve the details of the specified RQ Task.
- """
- task = self.get_task_from_id(id)
- serializer = self.serializer_class(task, context={'request': request})
- return Response(serializer.data)
- @extend_schema(parameters=[OpenApiParameter(name='id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH)])
- @action(methods=['POST'], detail=True)
- def delete(self, request, id):
- """
- Delete the specified RQ Task.
- """
- delete_rq_job(id)
- return HttpResponse(status=200)
- @extend_schema(parameters=[OpenApiParameter(name='id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH)])
- @action(methods=['POST'], detail=True)
- def requeue(self, request, id):
- """
- Requeues the specified RQ Task.
- """
- requeue_rq_job(id)
- return HttpResponse(status=200)
- @extend_schema(parameters=[OpenApiParameter(name='id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH)])
- @action(methods=['POST'], detail=True)
- def enqueue(self, request, id):
- """
- Enqueues the specified RQ Task.
- """
- enqueue_rq_job(id)
- return HttpResponse(status=200)
- @extend_schema(parameters=[OpenApiParameter(name='id', type=OpenApiTypes.STR, location=OpenApiParameter.PATH)])
- @action(methods=['POST'], detail=True)
- def stop(self, request, id):
- """
- Stops the specified RQ Task.
- """
- stopped_jobs = stop_rq_job(id)
- if len(stopped_jobs) == 1:
- return HttpResponse(status=200)
- else:
- return HttpResponse(status=204)
|