Просмотр исходного кода

Removes the `housekeeping` management command (closes #21565)

The command was deprecated in v4.6.0; all housekeeping tasks are now
handled automatically by NetBox's built-in job scheduler.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Jeremy Stretch 2 недель назад
Родитель
Сommit
a59ab6f216
2 измененных файлов с 13 добавлено и 180 удалено
  1. 13 0
      docs/release-notes/version-4.7.md
  2. 0 180
      netbox/extras/management/commands/housekeeping.py

+ 13 - 0
docs/release-notes/version-4.7.md

@@ -0,0 +1,13 @@
+# NetBox v4.7
+
+## v4.7.0 (FUTURE)
+
+### Breaking Changes
+
+### Enhancements
+
+### Other Changes
+
+* [#21565](https://github.com/netbox-community/netbox/issues/21565) - Remove the obsolete `housekeeping` management command
+
+### REST API Changes

+ 0 - 180
netbox/extras/management/commands/housekeeping.py

@@ -1,180 +0,0 @@
-import warnings
-from datetime import timedelta
-from importlib import import_module
-
-import requests
-from django.conf import settings
-from django.core.cache import cache
-from django.core.management.base import BaseCommand
-from django.db.models import Exists, OuterRef, Subquery
-from django.utils import timezone
-from packaging import version
-
-from core.choices import ObjectChangeActionChoices
-from core.models import Job, ObjectChange
-from netbox.config import Config
-from utilities.proxy import resolve_proxies
-
-
-class Command(BaseCommand):
-    help = "Perform nightly housekeeping tasks [DEPRECATED]"
-
-    def handle(self, *args, **options):
-        warnings.warn(
-            "\n\nDEPRECATION WARNING\n"
-            "Running this command is no longer necessary: All housekeeping tasks\n"
-            "are addressed automatically via NetBox's built-in job scheduler. It\n"
-            "will be removed in a future release.\n",
-            category=FutureWarning,
-        )
-
-        config = Config()
-
-        # Clear expired authentication sessions (essentially replicating the `clearsessions` command)
-        if options['verbosity']:
-            self.stdout.write("[*] Clearing expired authentication sessions")
-            if options['verbosity'] >= 2:
-                self.stdout.write(f"\tConfigured session engine: {settings.SESSION_ENGINE}")
-        engine = import_module(settings.SESSION_ENGINE)
-        try:
-            engine.SessionStore.clear_expired()
-            if options['verbosity']:
-                self.stdout.write("\tSessions cleared.", self.style.SUCCESS)
-        except NotImplementedError:
-            if options['verbosity']:
-                self.stdout.write(
-                    f"\tThe configured session engine ({settings.SESSION_ENGINE}) does not support "
-                    f"clearing sessions; skipping."
-                )
-
-        # Delete expired ObjectChanges
-        if options['verbosity']:
-            self.stdout.write('[*] Checking for expired changelog records')
-        if config.CHANGELOG_RETENTION:
-            cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
-            if options['verbosity'] >= 2:
-                self.stdout.write(f'\tRetention period: {config.CHANGELOG_RETENTION} days')
-                self.stdout.write(f'\tCut-off time: {cutoff}')
-
-            expired_qs = ObjectChange.objects.filter(time__lt=cutoff)
-
-            # When enabled, retain each object's original create and most recent update record while pruning expired
-            # changelog entries. This applies only to objects without a delete record.
-            if config.CHANGELOG_RETAIN_CREATE_LAST_UPDATE:
-                if options['verbosity'] >= 2:
-                    self.stdout.write('\tRetaining create & last update records for non-deleted objects')
-
-                deleted_exists = ObjectChange.objects.filter(
-                    action=ObjectChangeActionChoices.ACTION_DELETE,
-                    changed_object_type_id=OuterRef('changed_object_type_id'),
-                    changed_object_id=OuterRef('changed_object_id'),
-                )
-
-                # Keep create records only where no delete exists for that object
-                create_pks_to_keep = (
-                    ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_CREATE)
-                    .annotate(has_delete=Exists(deleted_exists))
-                    .filter(has_delete=False)
-                    .values('pk')
-                )
-
-                # Keep the most recent update per object only where no delete exists for the object
-                latest_update_pks_to_keep = (
-                    ObjectChange.objects.filter(action=ObjectChangeActionChoices.ACTION_UPDATE)
-                    .annotate(has_delete=Exists(deleted_exists))
-                    .filter(has_delete=False)
-                    .order_by('changed_object_type_id', 'changed_object_id', '-time', '-pk')
-                    .distinct('changed_object_type_id', 'changed_object_id')
-                    .values('pk')
-                )
-
-                expired_qs = expired_qs.exclude(pk__in=Subquery(create_pks_to_keep))
-                expired_qs = expired_qs.exclude(pk__in=Subquery(latest_update_pks_to_keep))
-
-            expired_records = expired_qs.count()
-            if expired_records:
-                if options['verbosity']:
-                    self.stdout.write(
-                        f'\tDeleting {expired_records} expired records... ', self.style.WARNING, ending=''
-                    )
-                    self.stdout.flush()
-                expired_qs.delete()
-                if options['verbosity']:
-                    self.stdout.write('Done.', self.style.SUCCESS)
-            elif options['verbosity']:
-                self.stdout.write('\tNo expired records found.', self.style.SUCCESS)
-        elif options['verbosity']:
-            self.stdout.write(
-                f'\tSkipping: No retention period specified (CHANGELOG_RETENTION = {config.CHANGELOG_RETENTION})'
-            )
-
-        # Delete expired Jobs
-        if options['verbosity']:
-            self.stdout.write("[*] Checking for expired jobs")
-        if config.JOB_RETENTION:
-            cutoff = timezone.now() - timedelta(days=config.JOB_RETENTION)
-            if options['verbosity'] >= 2:
-                self.stdout.write(f"\tRetention period: {config.JOB_RETENTION} days")
-                self.stdout.write(f"\tCut-off time: {cutoff}")
-            expired_records = Job.objects.filter(created__lt=cutoff).count()
-            if expired_records:
-                if options['verbosity']:
-                    self.stdout.write(
-                        f"\tDeleting {expired_records} expired records... ",
-                        self.style.WARNING,
-                        ending=""
-                    )
-                    self.stdout.flush()
-                Job.objects.filter(created__lt=cutoff).delete()
-                if options['verbosity']:
-                    self.stdout.write("Done.", self.style.SUCCESS)
-            elif options['verbosity']:
-                self.stdout.write("\tNo expired records found.", self.style.SUCCESS)
-        elif options['verbosity']:
-            self.stdout.write(
-                f"\tSkipping: No retention period specified (JOB_RETENTION = {config.JOB_RETENTION})"
-            )
-
-        # Check for new releases (if enabled)
-        if options['verbosity']:
-            self.stdout.write("[*] Checking for latest release")
-        if settings.ISOLATED_DEPLOYMENT:
-            if options['verbosity']:
-                self.stdout.write("\tSkipping: ISOLATED_DEPLOYMENT is enabled")
-        elif settings.RELEASE_CHECK_URL:
-            headers = {
-                'Accept': 'application/vnd.github.v3+json',
-            }
-
-            try:
-                if options['verbosity'] >= 2:
-                    self.stdout.write(f"\tFetching {settings.RELEASE_CHECK_URL}")
-                response = requests.get(
-                    url=settings.RELEASE_CHECK_URL,
-                    headers=headers,
-                    proxies=resolve_proxies(url=settings.RELEASE_CHECK_URL)
-                )
-                response.raise_for_status()
-
-                releases = []
-                for release in response.json():
-                    if 'tag_name' not in release or release.get('devrelease') or release.get('prerelease'):
-                        continue
-                    releases.append((version.parse(release['tag_name']), release.get('html_url')))
-                latest_release = max(releases)
-                if options['verbosity'] >= 2:
-                    self.stdout.write(f"\tFound {len(response.json())} releases; {len(releases)} usable")
-                if options['verbosity']:
-                    self.stdout.write(f"\tLatest release: {latest_release[0]}", self.style.SUCCESS)
-
-                # Cache the most recent release
-                cache.set('latest_release', latest_release, None)
-
-            except requests.exceptions.RequestException as exc:
-                self.stdout.write(f"\tRequest error: {exc}", self.style.ERROR)
-        else:
-            if options['verbosity']:
-                self.stdout.write("\tSkipping: RELEASE_CHECK_URL not set")
-
-        if options['verbosity']:
-            self.stdout.write("Finished.", self.style.SUCCESS)