jobs.py

import sys
from datetime import timedelta
from importlib import import_module

import requests
from django.conf import settings
from django.core.cache import cache
from django.utils import timezone
from packaging import version

from core.models import Job, ObjectChange
from netbox.config import Config
from netbox.jobs import JobRunner, system_job
from netbox.search.backends import search_backend
from utilities.proxy import resolve_proxies

from .choices import DataSourceStatusChoices, JobIntervalChoices
from .models import DataSource


class SyncDataSourceJob(JobRunner):
    """
    Call sync() on a DataSource.
    """
    class Meta:
        name = 'Synchronization'

    @classmethod
    def enqueue(cls, *args, **kwargs):
        job = super().enqueue(*args, **kwargs)

        # Update the DataSource's synchronization status to queued
        if datasource := job.object:
            datasource.status = DataSourceStatusChoices.QUEUED
            DataSource.objects.filter(pk=datasource.pk).update(status=datasource.status)

        return job

    def run(self, *args, **kwargs):
        datasource = DataSource.objects.get(pk=self.job.object_id)
        self.logger.debug(f"Found DataSource ID {datasource.pk}")

        try:
            self.logger.info(f"Syncing data source {datasource}")
            datasource.sync()

            # Update the search cache for DataFiles belonging to this source
            self.logger.debug("Updating search cache for data files")
            search_backend.cache(datasource.datafiles.iterator())
        except Exception as e:
            self.logger.error(f"Error syncing data source: {e}")
            DataSource.objects.filter(pk=datasource.pk).update(status=DataSourceStatusChoices.FAILED)
            raise e

        self.logger.info("Syncing completed successfully")
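
# A minimal usage sketch (hypothetical caller), assuming JobRunner.enqueue()
# accepts the object to operate on and the requesting user, as in recent
# NetBox releases:
#
#   SyncDataSourceJob.enqueue(instance=datasource, user=request.user)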

@system_job(interval=JobIntervalChoices.INTERVAL_DAILY)
class SystemHousekeepingJob(JobRunner):
    """
    Perform daily system housekeeping functions.
    """
    class Meta:
        name = "System Housekeeping"

    def run(self, *args, **kwargs):
        # Skip if running in development or test mode
        if settings.DEBUG:
            self.logger.warning("Aborting execution: Debug is enabled")
            return
        if 'test' in sys.argv:
            self.logger.warning("Aborting execution: Tests are running")
            return

        self.send_census_report()
        self.clear_expired_sessions()
        self.prune_changelog()
        self.delete_expired_jobs()
        self.check_for_new_releases()
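
    # Note: the @system_job decorator above registers this class for automatic
    # scheduled execution at the given interval (daily here); it is not meant
    # to be enqueued manually.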

    def send_census_report(self):
        """
        Send a census report (if enabled).
        """
        self.logger.info("Reporting census data...")

        if settings.ISOLATED_DEPLOYMENT:
            self.logger.info("ISOLATED_DEPLOYMENT is enabled; skipping")
            return
        if not settings.CENSUS_REPORTING_ENABLED:
            self.logger.info("CENSUS_REPORTING_ENABLED is disabled; skipping")
            return

        census_data = {
            'version': settings.RELEASE.full_version,
            'python_version': sys.version.split()[0],
            'deployment_id': settings.DEPLOYMENT_ID,
        }
        try:
            requests.get(
                url=settings.CENSUS_URL,
                params=census_data,
                timeout=3,
                proxies=resolve_proxies(url=settings.CENSUS_URL)
            )
        except requests.exceptions.RequestException:
            pass
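
    # Illustrative only: the census report amounts to a single GET with query
    # parameters, e.g. <CENSUS_URL>?version=4.2.0&python_version=3.12.1&deployment_id=...
    # (values invented here); any network failure is deliberately swallowed.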

    def clear_expired_sessions(self):
        """
        Clear any expired sessions from the database.
        """
        self.logger.info("Clearing expired sessions...")

        engine = import_module(settings.SESSION_ENGINE)
        try:
            engine.SessionStore.clear_expired()
            self.logger.info("Sessions cleared.")
        except NotImplementedError:
            self.logger.warning(
                f"The configured session engine ({settings.SESSION_ENGINE}) does not support "
                f"clearing sessions; skipping."
            )
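
    # For reference: Django's default database-backed engine
    # ('django.contrib.sessions.backends.db') implements SessionStore.clear_expired(),
    # while the base SessionBase class raises NotImplementedError, which is what
    # the handler above tolerates for engines that don't support it.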

    def prune_changelog(self):
        """
        Delete any ObjectChange records older than the configured changelog retention time (if any).
        """
        self.logger.info("Pruning old changelog entries...")

        config = Config()
        if not config.CHANGELOG_RETENTION:
            self.logger.info("No retention period specified; skipping.")
            return

        cutoff = timezone.now() - timedelta(days=config.CHANGELOG_RETENTION)
        self.logger.debug(
            f"Changelog retention period: {config.CHANGELOG_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})"
        )
        count = ObjectChange.objects.filter(time__lt=cutoff).delete()[0]
        self.logger.info(f"Deleted {count} expired changelog records")
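
    # QuerySet.delete() returns a (total, per_model_counts) tuple, hence the [0]
    # here and in delete_expired_jobs() below to report the total rows removed.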

    def delete_expired_jobs(self):
        """
        Delete any jobs older than the configured retention period (if any).
        """
        self.logger.info("Deleting expired jobs...")

        config = Config()
        if not config.JOB_RETENTION:
            self.logger.info("No retention period specified; skipping.")
            return

        cutoff = timezone.now() - timedelta(days=config.JOB_RETENTION)
        self.logger.debug(
            f"Job retention period: {config.JOB_RETENTION} days ({cutoff:%Y-%m-%d %H:%M:%S})"
        )
        count = Job.objects.filter(created__lt=cutoff).delete()[0]
        self.logger.info(f"Deleted {count} expired jobs")

    def check_for_new_releases(self):
        """
        Check for new releases and cache the latest release.
        """
        self.logger.info("Checking for new releases...")

        if settings.ISOLATED_DEPLOYMENT:
            self.logger.info("ISOLATED_DEPLOYMENT is enabled; skipping")
            return
        if not settings.RELEASE_CHECK_URL:
            self.logger.info("RELEASE_CHECK_URL is not set; skipping")
            return

        # Fetch the latest releases
        self.logger.debug(f"Release check URL: {settings.RELEASE_CHECK_URL}")
        try:
            response = requests.get(
                url=settings.RELEASE_CHECK_URL,
                headers={'Accept': 'application/vnd.github.v3+json'},
                proxies=resolve_proxies(url=settings.RELEASE_CHECK_URL)
            )
            response.raise_for_status()
        except requests.exceptions.RequestException as exc:
            self.logger.error(f"Error fetching release: {exc}")
            return

        # Determine the most recent stable release
        releases = []
        for release in response.json():
            if 'tag_name' not in release or release.get('devrelease') or release.get('prerelease'):
                continue
            releases.append((version.parse(release['tag_name']), release.get('html_url')))

        # Guard against an empty list (e.g. only prereleases returned), which
        # would otherwise make max() raise a ValueError
        if not releases:
            self.logger.info("No stable releases found; skipping")
            return
        latest_release = max(releases)
        self.logger.debug(f"Found {len(response.json())} releases; {len(releases)} usable")
        self.logger.info(f"Latest release: {latest_release[0]}")

        # Cache the most recent release
        cache.set('latest_release', latest_release, None)
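
# Readers of the cache get back the (Version, html_url) tuple stored above;
# a hypothetical consumer:
#
#   from django.core.cache import cache
#   latest = cache.get('latest_release')  # e.g. (Version('4.2.1'), 'https://github.com/...')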