| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714 |
- import json
- import platform
- from copy import deepcopy
- from django import __version__ as django_version
- from django.conf import settings
- from django.contrib import messages
- from django.contrib.auth.mixins import UserPassesTestMixin
- from django.core.cache import cache
- from django.db import connection, ProgrammingError
- from django.http import HttpResponse, HttpResponseForbidden, Http404
- from django.shortcuts import get_object_or_404, redirect, render
- from django.urls import reverse
- from django.utils.translation import gettext_lazy as _
- from django.views.generic import View
- from django_rq.queues import get_connection, get_queue_by_index, get_redis_connection
- from django_rq.settings import QUEUES_MAP, QUEUES_LIST
- from django_rq.utils import get_statistics
- from rq.exceptions import NoSuchJobError
- from rq.job import Job as RQ_Job, JobStatus as RQJobStatus
- from rq.worker import Worker
- from rq.worker_registration import clean_worker_registry
- from core.utils import delete_rq_job, enqueue_rq_job, get_rq_jobs_from_status, requeue_rq_job, stop_rq_job
- from netbox.config import get_config, PARAMS
- from netbox.object_actions import AddObject, BulkDelete, BulkExport, DeleteObject
- from netbox.plugins.utils import get_installed_plugins
- from netbox.views import generic
- from netbox.views.generic.base import BaseObjectView
- from netbox.views.generic.mixins import TableMixin
- from utilities.apps import get_installed_apps
- from utilities.data import shallow_compare_dict
- from utilities.forms import ConfirmationForm
- from utilities.htmx import htmx_partial
- from utilities.json import ConfigJSONEncoder
- from utilities.query import count_related
- from utilities.views import (
- ContentTypePermissionRequiredMixin,
- GetRelatedModelsMixin,
- GetReturnURLMixin,
- ViewTab,
- register_model_view,
- )
- from . import filtersets, forms, tables
- from .jobs import SyncDataSourceJob
- from .models import *
- from .plugins import get_catalog_plugins, get_local_plugins
- from .tables import CatalogPluginTable, JobLogEntryTable, PluginVersionTable
- #
- # Data sources
- #
- @register_model_view(DataSource, 'list', path='', detail=False)
- class DataSourceListView(generic.ObjectListView):
- queryset = DataSource.objects.annotate(
- file_count=count_related(DataFile, 'source')
- )
- filterset = filtersets.DataSourceFilterSet
- filterset_form = forms.DataSourceFilterForm
- table = tables.DataSourceTable
- @register_model_view(DataSource)
- class DataSourceView(GetRelatedModelsMixin, generic.ObjectView):
- queryset = DataSource.objects.all()
- def get_extra_context(self, request, instance):
- return {
- 'related_models': self.get_related_models(request, instance),
- }
- @register_model_view(DataSource, 'sync')
- class DataSourceSyncView(GetReturnURLMixin, BaseObjectView):
- queryset = DataSource.objects.all()
- def get_required_permission(self):
- return 'core.sync_datasource'
- def get(self, request, pk):
- # Redirect GET requests to the object view
- datasource = get_object_or_404(self.queryset, pk=pk)
- return redirect(datasource.get_absolute_url())
- def post(self, request, pk):
- datasource = get_object_or_404(self.queryset, pk=pk)
- # Enqueue the sync job
- job = SyncDataSourceJob.enqueue(instance=datasource, user=request.user)
- messages.success(
- request,
- _("Queued job #{id} to sync {datasource}").format(id=job.pk, datasource=datasource)
- )
- return redirect(self.get_return_url(request, datasource))
- @register_model_view(DataSource, 'add', detail=False)
- @register_model_view(DataSource, 'edit')
- class DataSourceEditView(generic.ObjectEditView):
- queryset = DataSource.objects.all()
- form = forms.DataSourceForm
- @register_model_view(DataSource, 'delete')
- class DataSourceDeleteView(generic.ObjectDeleteView):
- queryset = DataSource.objects.all()
- @register_model_view(DataSource, 'bulk_import', path='import', detail=False)
- class DataSourceBulkImportView(generic.BulkImportView):
- queryset = DataSource.objects.all()
- model_form = forms.DataSourceImportForm
- @register_model_view(DataSource, 'bulk_edit', path='edit', detail=False)
- class DataSourceBulkEditView(generic.BulkEditView):
- queryset = DataSource.objects.annotate(
- count_files=count_related(DataFile, 'source')
- )
- filterset = filtersets.DataSourceFilterSet
- table = tables.DataSourceTable
- form = forms.DataSourceBulkEditForm
- @register_model_view(DataSource, 'bulk_rename', path='rename', detail=False)
- class DataSourceBulkRenameView(generic.BulkRenameView):
- queryset = DataSource.objects.all()
- filterset = filtersets.DataSourceFilterSet
- @register_model_view(DataSource, 'bulk_delete', path='delete', detail=False)
- class DataSourceBulkDeleteView(generic.BulkDeleteView):
- queryset = DataSource.objects.annotate(
- count_files=count_related(DataFile, 'source')
- )
- filterset = filtersets.DataSourceFilterSet
- table = tables.DataSourceTable
- #
- # Data files
- #
- @register_model_view(DataFile, 'list', path='', detail=False)
- class DataFileListView(generic.ObjectListView):
- queryset = DataFile.objects.defer('data')
- filterset = filtersets.DataFileFilterSet
- filterset_form = forms.DataFileFilterForm
- table = tables.DataFileTable
- actions = (BulkDelete,)
- @register_model_view(DataFile)
- class DataFileView(generic.ObjectView):
- queryset = DataFile.objects.all()
- actions = (DeleteObject,)
- @register_model_view(DataFile, 'delete')
- class DataFileDeleteView(generic.ObjectDeleteView):
- queryset = DataFile.objects.all()
- @register_model_view(DataFile, 'bulk_delete', path='delete', detail=False)
- class DataFileBulkDeleteView(generic.BulkDeleteView):
- queryset = DataFile.objects.defer('data')
- filterset = filtersets.DataFileFilterSet
- table = tables.DataFileTable
- #
- # Jobs
- #
- @register_model_view(Job, 'list', path='', detail=False)
- class JobListView(generic.ObjectListView):
- queryset = Job.objects.defer('data')
- filterset = filtersets.JobFilterSet
- filterset_form = forms.JobFilterForm
- table = tables.JobTable
- actions = (BulkExport, BulkDelete)
- @register_model_view(Job)
- class JobView(generic.ObjectView):
- queryset = Job.objects.all()
- actions = (DeleteObject,)
- @register_model_view(Job, 'log')
- class JobLogView(generic.ObjectView):
- queryset = Job.objects.all()
- actions = (DeleteObject,)
- template_name = 'core/job/log.html'
- tab = ViewTab(
- label=_('Log'),
- badge=lambda obj: len(obj.log_entries),
- weight=500,
- )
- def get_extra_context(self, request, instance):
- table = JobLogEntryTable(instance.log_entries)
- table.configure(request)
- return {
- 'table': table,
- }
- @register_model_view(Job, 'delete')
- class JobDeleteView(generic.ObjectDeleteView):
- queryset = Job.objects.defer('data')
- @register_model_view(Job, 'bulk_delete', path='delete', detail=False)
- class JobBulkDeleteView(generic.BulkDeleteView):
- queryset = Job.objects.defer('data')
- filterset = filtersets.JobFilterSet
- table = tables.JobTable
- #
- # Change logging
- #
- @register_model_view(ObjectChange, 'list', path='', detail=False)
- class ObjectChangeListView(generic.ObjectListView):
- queryset = None
- filterset = filtersets.ObjectChangeFilterSet
- filterset_form = forms.ObjectChangeFilterForm
- table = tables.ObjectChangeTable
- template_name = 'core/objectchange_list.html'
- actions = (BulkExport,)
- def get_queryset(self, request):
- return ObjectChange.objects.valid_models()
- @register_model_view(ObjectChange)
- class ObjectChangeView(generic.ObjectView):
- queryset = None
- def get_queryset(self, request):
- return ObjectChange.objects.valid_models()
- def get_extra_context(self, request, instance):
- related_changes = ObjectChange.objects.valid_models().restrict(request.user, 'view').filter(
- request_id=instance.request_id
- ).exclude(
- pk=instance.pk
- )
- related_changes_table = tables.ObjectChangeTable(
- data=related_changes[:50],
- orderable=False
- )
- related_changes_table.configure(request)
- objectchanges = ObjectChange.objects.valid_models().restrict(request.user, 'view').filter(
- changed_object_type=instance.changed_object_type,
- changed_object_id=instance.changed_object_id,
- )
- next_change = objectchanges.filter(time__gt=instance.time).order_by('time').first()
- prev_change = objectchanges.filter(time__lt=instance.time).order_by('-time').first()
- if not instance.prechange_data and instance.action in ['update', 'delete'] and prev_change:
- non_atomic_change = True
- prechange_data = prev_change.postchange_data_clean
- else:
- non_atomic_change = False
- prechange_data = instance.prechange_data_clean
- if prechange_data and instance.postchange_data:
- diff_added = shallow_compare_dict(
- prechange_data or dict(),
- instance.postchange_data_clean or dict(),
- exclude=['last_updated'],
- )
- diff_removed = {
- x: prechange_data.get(x) for x in diff_added
- } if prechange_data else {}
- else:
- diff_added = None
- diff_removed = None
- return {
- 'diff_added': diff_added,
- 'diff_removed': diff_removed,
- 'next_change': next_change,
- 'prev_change': prev_change,
- 'related_changes_table': related_changes_table,
- 'related_changes_count': related_changes.count(),
- 'non_atomic_change': non_atomic_change
- }
- #
- # Config Revisions
- #
- @register_model_view(ConfigRevision, 'list', path='', detail=False)
- class ConfigRevisionListView(generic.ObjectListView):
- queryset = ConfigRevision.objects.all()
- filterset = filtersets.ConfigRevisionFilterSet
- filterset_form = forms.ConfigRevisionFilterForm
- table = tables.ConfigRevisionTable
- actions = (AddObject, BulkExport)
- @register_model_view(ConfigRevision)
- class ConfigRevisionView(generic.ObjectView):
- queryset = ConfigRevision.objects.all()
- def get_extra_context(self, request, instance):
- """
- Retrieve additional context for a given request and instance.
- """
- # Copy the revision data to avoid modifying the original
- config = deepcopy(instance.data or {})
- # Serialize any JSON-based classes
- for attr in ['CUSTOM_VALIDATORS', 'DEFAULT_USER_PREFERENCES', 'PROTECTION_RULES']:
- if attr in config:
- config[attr] = json.dumps(config[attr], cls=ConfigJSONEncoder, indent=4)
- return {
- 'config': config,
- }
- @register_model_view(ConfigRevision, 'add', detail=False)
- class ConfigRevisionEditView(generic.ObjectEditView):
- queryset = ConfigRevision.objects.all()
- form = forms.ConfigRevisionForm
- @register_model_view(ConfigRevision, 'delete')
- class ConfigRevisionDeleteView(generic.ObjectDeleteView):
- queryset = ConfigRevision.objects.all()
- @register_model_view(ConfigRevision, 'bulk_delete', path='delete', detail=False)
- class ConfigRevisionBulkDeleteView(generic.BulkDeleteView):
- queryset = ConfigRevision.objects.all()
- filterset = filtersets.ConfigRevisionFilterSet
- table = tables.ConfigRevisionTable
- @register_model_view(ConfigRevision, 'restore')
- class ConfigRevisionRestoreView(ContentTypePermissionRequiredMixin, View):
- def get_required_permission(self):
- return 'core.configrevision_edit'
- def get(self, request, pk):
- candidate_config = get_object_or_404(ConfigRevision, pk=pk)
- # Get the current ConfigRevision
- config_version = get_config().version
- current_config = ConfigRevision.objects.filter(pk=config_version).first()
- params = []
- for param in PARAMS:
- params.append((
- param.name,
- current_config.data.get(param.name, None) if current_config else None,
- candidate_config.data.get(param.name, None)
- ))
- return render(request, 'core/configrevision_restore.html', {
- 'object': candidate_config,
- 'params': params,
- })
- def post(self, request, pk):
- if not request.user.has_perm('core.configrevision_edit'):
- return HttpResponseForbidden()
- candidate_config = get_object_or_404(ConfigRevision, pk=pk)
- candidate_config.activate()
- messages.success(request, _("Restored configuration revision #{id}").format(id=pk))
- return redirect(candidate_config.get_absolute_url())
- #
- # Background Tasks (RQ)
- #
- class BaseRQView(UserPassesTestMixin, View):
- def test_func(self):
- return self.request.user.is_superuser
- class BackgroundQueueListView(TableMixin, BaseRQView):
- table = tables.BackgroundQueueTable
- def get(self, request):
- data = get_statistics(run_maintenance_tasks=True)["queues"]
- table = self.get_table(data, request, bulk_actions=False)
- return render(request, 'core/rq_queue_list.html', {
- 'table': table,
- })
- class BackgroundTaskListView(TableMixin, BaseRQView):
- table = tables.BackgroundTaskTable
- def get_table_data(self, request, queue, status):
- # Call get_jobs() to returned queued tasks
- if status == RQJobStatus.QUEUED:
- return queue.get_jobs()
- return get_rq_jobs_from_status(queue, status)
- def get(self, request, queue_index, status):
- queue = get_queue_by_index(queue_index)
- data = self.get_table_data(request, queue, status)
- table = self.get_table(data, request, False)
- # If this is an HTMX request, return only the rendered table HTML
- if htmx_partial(request):
- return render(request, 'htmx/table.html', {
- 'table': table,
- })
- return render(request, 'core/rq_task_list.html', {
- 'table': table,
- 'queue': queue,
- 'status': status,
- })
- class BackgroundTaskView(BaseRQView):
- def get(self, request, job_id):
- # all the RQ queues should use the same connection
- config = QUEUES_LIST[0]
- try:
- job = RQ_Job.fetch(job_id, connection=get_redis_connection(config['connection_config']),)
- except NoSuchJobError:
- raise Http404(_("Job {job_id} not found").format(job_id=job_id))
- queue_index = QUEUES_MAP[job.origin]
- queue = get_queue_by_index(queue_index)
- try:
- exc_info = job._exc_info
- except AttributeError:
- exc_info = None
- return render(request, 'core/rq_task.html', {
- 'queue': queue,
- 'job': job,
- 'queue_index': queue_index,
- 'dependency_id': job._dependency_id,
- 'exc_info': exc_info,
- })
- class BackgroundTaskDeleteView(BaseRQView):
- def get(self, request, job_id):
- if not request.htmx:
- return redirect(reverse('core:background_queue_list'))
- form = ConfirmationForm(initial=request.GET)
- return render(request, 'htmx/delete_form.html', {
- 'object_type': 'background task',
- 'object': job_id,
- 'form': form,
- 'form_url': reverse('core:background_task_delete', kwargs={'job_id': job_id})
- })
- def post(self, request, job_id):
- form = ConfirmationForm(request.POST)
- if form.is_valid():
- delete_rq_job(job_id)
- messages.success(request, _('Job {id} has been deleted.').format(id=job_id))
- else:
- messages.error(request, _('Error deleting job {id}: {error}').format(id=job_id, error=form.errors[0]))
- return redirect(reverse('core:background_queue_list'))
- class BackgroundTaskRequeueView(BaseRQView):
- def get(self, request, job_id):
- requeue_rq_job(job_id)
- messages.success(request, _('Job {id} has been re-enqueued.').format(id=job_id))
- return redirect(reverse('core:background_task', args=[job_id]))
- class BackgroundTaskEnqueueView(BaseRQView):
- def get(self, request, job_id):
- # all the RQ queues should use the same connection
- enqueue_rq_job(job_id)
- messages.success(request, _('Job {id} has been enqueued.').format(id=job_id))
- return redirect(reverse('core:background_task', args=[job_id]))
- class BackgroundTaskStopView(BaseRQView):
- def get(self, request, job_id):
- stopped_jobs = stop_rq_job(job_id)
- if len(stopped_jobs) == 1:
- messages.success(request, _('Job {id} has been stopped.').format(id=job_id))
- else:
- messages.error(request, _('Failed to stop job {id}').format(id=job_id))
- return redirect(reverse('core:background_task', args=[job_id]))
- class WorkerListView(TableMixin, BaseRQView):
- table = tables.WorkerTable
- def get_table_data(self, request, queue):
- clean_worker_registry(queue)
- all_workers = Worker.all(queue.connection)
- workers = [worker for worker in all_workers if queue.name in worker.queue_names()]
- return workers
- def get(self, request, queue_index):
- queue = get_queue_by_index(queue_index)
- data = self.get_table_data(request, queue)
- table = self.get_table(data, request, False)
- # If this is an HTMX request, return only the rendered table HTML
- if htmx_partial(request):
- if not request.htmx.target:
- table.embedded = True
- # Hide selection checkboxes
- if 'pk' in table.base_columns:
- table.columns.hide('pk')
- return render(request, 'htmx/table.html', {
- 'table': table,
- 'queue': queue,
- })
- return render(request, 'core/rq_worker_list.html', {
- 'table': table,
- 'queue': queue,
- })
- class WorkerView(BaseRQView):
- def get(self, request, key):
- # all the RQ queues should use the same connection
- config = QUEUES_LIST[0]
- worker = Worker.find_by_key('rq:worker:' + key, connection=get_redis_connection(config['connection_config']))
- # Convert microseconds to milliseconds
- worker.total_working_time = worker.total_working_time / 1000
- return render(request, 'core/rq_worker.html', {
- 'worker': worker,
- 'job': worker.get_current_job(),
- 'total_working_time': worker.total_working_time * 1000,
- })
- #
- # System
- #
- class SystemView(UserPassesTestMixin, View):
- def test_func(self):
- return self.request.user.is_superuser
- def get(self, request):
- # System status
- psql_version = db_name = db_size = None
- try:
- with connection.cursor() as cursor:
- cursor.execute("SELECT version()")
- psql_version = cursor.fetchone()[0]
- psql_version = psql_version.split('(')[0].strip()
- cursor.execute("SELECT current_database()")
- db_name = cursor.fetchone()[0]
- cursor.execute(f"SELECT pg_size_pretty(pg_database_size('{db_name}'))")
- db_size = cursor.fetchone()[0]
- except (ProgrammingError, IndexError):
- pass
- stats = {
- 'netbox_release': settings.RELEASE,
- 'django_version': django_version,
- 'python_version': platform.python_version(),
- 'postgresql_version': psql_version,
- 'database_name': db_name,
- 'database_size': db_size,
- 'rq_worker_count': Worker.count(get_connection('default')),
- }
- # Django apps
- django_apps = get_installed_apps()
- # Configuration
- config = get_config()
- # Plugins
- plugins = get_installed_plugins()
- # Object counts
- objects = {}
- for ot in ObjectType.objects.public().order_by('app_label', 'model'):
- if model := ot.model_class():
- objects[ot] = model.objects.count()
- # Raw data export
- if 'export' in request.GET:
- stats['netbox_release'] = stats['netbox_release'].asdict()
- params = [param.name for param in PARAMS]
- data = {
- **stats,
- 'django_apps': django_apps,
- 'plugins': plugins,
- 'config': {
- k: getattr(config, k) for k in sorted(params)
- },
- 'objects': {
- f'{ot.app_label}.{ot.model}': count for ot, count in objects.items()
- },
- }
- response = HttpResponse(json.dumps(data, cls=ConfigJSONEncoder, indent=4), content_type='text/json')
- response['Content-Disposition'] = 'attachment; filename="netbox.json"'
- return response
- # Serialize any JSON-based classes
- for attr in ['CUSTOM_VALIDATORS', 'DEFAULT_USER_PREFERENCES', 'PROTECTION_RULES']:
- if hasattr(config, attr) and getattr(config, attr, None):
- setattr(config, attr, json.dumps(getattr(config, attr), cls=ConfigJSONEncoder, indent=4))
- return render(request, 'core/system.html', {
- 'stats': stats,
- 'django_apps': django_apps,
- 'config': config,
- 'plugins': plugins,
- 'objects': objects,
- })
- #
- # Plugins
- #
- class BasePluginView(UserPassesTestMixin, View):
- CACHE_KEY_CATALOG_ERROR = 'plugins-catalog-error'
- def test_func(self):
- return self.request.user.is_superuser
- def get_cached_plugins(self, request):
- catalog_plugins = {}
- catalog_plugins_error = cache.get(self.CACHE_KEY_CATALOG_ERROR, default=False)
- if not catalog_plugins_error:
- catalog_plugins = get_catalog_plugins()
- if not catalog_plugins and not settings.ISOLATED_DEPLOYMENT:
- # Cache for 5 minutes to avoid spamming connection
- cache.set(self.CACHE_KEY_CATALOG_ERROR, True, 300)
- messages.warning(request, _("Plugins catalog could not be loaded"))
- return get_local_plugins(catalog_plugins)
- class PluginListView(BasePluginView):
- def get(self, request):
- q = request.GET.get('q', None)
- plugins = self.get_cached_plugins(request).values()
- if q:
- plugins = [obj for obj in plugins if q.casefold() in obj.title_short.casefold()]
- plugins = [plugin for plugin in plugins if not plugin.hidden]
- table = CatalogPluginTable(plugins, user=request.user)
- table.configure(request)
- # If this is an HTMX request, return only the rendered table HTML
- if htmx_partial(request):
- return render(request, 'htmx/table.html', {
- 'table': table,
- })
- return render(request, 'core/plugin_list.html', {
- 'table': table,
- })
- class PluginView(BasePluginView):
- def get(self, request, name):
- plugins = self.get_cached_plugins(request)
- if name not in plugins:
- raise Http404(_("Plugin {name} not found").format(name=name))
- plugin = plugins[name]
- table = PluginVersionTable(plugin.release_recent_history, user=request.user)
- table.configure(request)
- return render(request, 'core/plugin.html', {
- 'plugin': plugin,
- 'table': table,
- })
|