data.py

import hashlib
import logging
import os
from fnmatch import fnmatchcase
from urllib.parse import urlparse

import yaml
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin
from netbox.registry import registry
from utilities.querysets import RestrictedQuerySet

from ..choices import *
from ..exceptions import SyncError

__all__ = (
    'AutoSyncRecord',
    'DataFile',
    'DataSource',
)

logger = logging.getLogger('netbox.core.data')


class DataSource(JobsMixin, PrimaryModel):
    """
    A remote source, such as a git repository, from which DataFiles are synchronized.
    """
    name = models.CharField(
        verbose_name=_('name'),
        max_length=100,
        unique=True
    )
    type = models.CharField(
        verbose_name=_('type'),
        max_length=50
    )
    source_url = models.CharField(
        max_length=200,
        verbose_name=_('URL')
    )
    status = models.CharField(
        verbose_name=_('status'),
        max_length=50,
        choices=DataSourceStatusChoices,
        default=DataSourceStatusChoices.NEW,
        editable=False
    )
    enabled = models.BooleanField(
        verbose_name=_('enabled'),
        default=True
    )
    sync_interval = models.PositiveSmallIntegerField(
        verbose_name=_('sync interval'),
        choices=JobIntervalChoices,
        blank=True,
        null=True
    )
    ignore_rules = models.TextField(
        verbose_name=_('ignore rules'),
        blank=True,
        help_text=_("Patterns (one per line) matching files or paths to ignore when syncing")
    )
    parameters = models.JSONField(
        verbose_name=_('parameters'),
        blank=True,
        null=True
    )
    last_synced = models.DateTimeField(
        verbose_name=_('last synced'),
        blank=True,
        null=True,
        editable=False
    )

    class Meta:
        ordering = ('name',)
        verbose_name = _('data source')
        verbose_name_plural = _('data sources')

    def __str__(self):
        return f'{self.name}'

    @property
    def docs_url(self):
        return f'{settings.STATIC_URL}docs/models/{self._meta.app_label}/{self._meta.model_name}/'

    def get_type_display(self):
        if backend := registry['data_backends'].get(self.type):
            return backend.label
        return None

    def get_status_color(self):
        return DataSourceStatusChoices.colors.get(self.status)

    @property
    def url_scheme(self):
        return urlparse(self.source_url).scheme.lower()

    @property
    def backend_class(self):
        return registry['data_backends'].get(self.type)

    @property
    def ready_for_sync(self):
        return self.enabled and self.status not in (
            DataSourceStatusChoices.QUEUED,
            DataSourceStatusChoices.SYNCING
        )

    def clean(self):
        super().clean()

        # Validate data backend type
        if self.type and self.type not in registry['data_backends']:
            raise ValidationError({
                # Call format() on the result of gettext, not on the template string,
                # so the untranslated template is what gets looked up for translation
                'type': _("Unknown backend type: {type}").format(type=self.type)
            })

        # Ensure URL scheme matches selected type
        if self.backend_class.is_local and self.url_scheme not in ('file', ''):
            raise ValidationError({
                'source_url': _("URLs for local sources must start with {scheme} (or specify no scheme)").format(
                    scheme='file://'
                )
            })
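
    # Validation sketch (illustrative only; assumes a backend registered under
    # the type 'local' whose class sets is_local = True):
    #
    #   >>> ds = DataSource(name='configs', type='local', source_url='http://example.com/repo')
    #   >>> ds.clean()
    #   ValidationError: {'source_url': ['URLs for local sources must start with file:// ...']}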

    def save(self, *args, **kwargs):
        # If recurring sync is disabled for an existing DataSource, clear any pending sync jobs for it and reset
        # its "queued" status
        if not self._state.adding and not self.sync_interval:
            self.jobs.filter(status=JobStatusChoices.STATUS_PENDING).delete()
            if self.status == DataSourceStatusChoices.QUEUED and self.last_synced:
                self.status = DataSourceStatusChoices.COMPLETED
            elif self.status == DataSourceStatusChoices.QUEUED:
                self.status = DataSourceStatusChoices.NEW

        super().save(*args, **kwargs)

    def to_objectchange(self, action):
        objectchange = super().to_objectchange(action)

        # Censor any backend parameters marked as sensitive in the serialized data
        pre_change_params = {}
        post_change_params = {}
        if objectchange.prechange_data:
            pre_change_params = objectchange.prechange_data.get('parameters') or {}  # parameters may be None
        if objectchange.postchange_data:
            post_change_params = objectchange.postchange_data.get('parameters') or {}
        for param in self.backend_class.sensitive_parameters:
            if post_change_params.get(param):
                if post_change_params[param] != pre_change_params.get(param):
                    # Set the "changed" token if the parameter's value has been modified
                    post_change_params[param] = CENSOR_TOKEN_CHANGED
                else:
                    post_change_params[param] = CENSOR_TOKEN
            if pre_change_params.get(param):
                pre_change_params[param] = CENSOR_TOKEN

        return objectchange
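
    # Censoring sketch (hypothetical parameter values; assumes 'password' appears
    # in the backend's sensitive_parameters). After to_objectchange():
    #
    #   prechange:  {'parameters': {'password': 'hunter2'}}  ->  CENSOR_TOKEN
    #   postchange: {'parameters': {'password': 'hunter3'}}  ->  CENSOR_TOKEN_CHANGED
    #                                            (value differs from the prechange value)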

    def get_backend(self):
        backend_params = self.parameters or {}
        return self.backend_class(self.source_url, **backend_params)

    def sync(self):
        """
        Create/update/delete child DataFiles as necessary to synchronize with the remote source.
        """
        from core.signals import post_sync, pre_sync

        if self.status == DataSourceStatusChoices.SYNCING:
            raise SyncError(_("Cannot initiate sync; syncing already in progress."))

        # Emit the pre_sync signal
        pre_sync.send(sender=self.__class__, instance=self)

        self.status = DataSourceStatusChoices.SYNCING
        DataSource.objects.filter(pk=self.pk).update(status=self.status)

        # Replicate source data locally
        try:
            backend = self.get_backend()
        except ModuleNotFoundError as e:
            raise SyncError(
                _("There was an error initializing the backend. A dependency needs to be installed: ") + str(e)
            )
        with backend.fetch() as local_path:
            logger.debug(f'Syncing files from source root {local_path}')
            data_files = self.datafiles.all()
            known_paths = {df.path for df in data_files}
            logger.debug(f'Starting with {len(known_paths)} known files')

            # Check for any updated/deleted files
            updated_files = []
            deleted_file_ids = []
            for datafile in data_files:
                try:
                    if datafile.refresh_from_disk(source_root=local_path):
                        updated_files.append(datafile)
                except FileNotFoundError:
                    # File no longer exists
                    deleted_file_ids.append(datafile.pk)
                    continue

            # Bulk update modified files
            updated_count = DataFile.objects.bulk_update(updated_files, ('last_updated', 'size', 'hash', 'data'))
            logger.debug(f"Updated {updated_count} files")

            # Bulk delete deleted files
            deleted_count, __ = DataFile.objects.filter(pk__in=deleted_file_ids).delete()
            logger.debug(f"Deleted {deleted_count} files")

            # Walk the local replication to find new files
            new_paths = self._walk(local_path) - known_paths

            # Bulk create new files
            new_datafiles = []
            for path in new_paths:
                datafile = DataFile(source=self, path=path)
                datafile.refresh_from_disk(source_root=local_path)
                datafile.full_clean()
                new_datafiles.append(datafile)
            created_count = len(DataFile.objects.bulk_create(new_datafiles, batch_size=100))
            logger.debug(f"Created {created_count} data files")

        # Update status & last_synced time
        self.status = DataSourceStatusChoices.COMPLETED
        self.last_synced = timezone.now()
        DataSource.objects.filter(pk=self.pk).update(status=self.status, last_synced=self.last_synced)

        # Emit the post_sync signal
        post_sync.send(sender=self.__class__, instance=self)

    sync.alters_data = True
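
    # Usage sketch (assumes a registered backend type and a reachable source;
    # illustrative only):
    #
    #   >>> ds = DataSource.objects.get(name='configs')
    #   >>> if ds.ready_for_sync:
    #   ...     ds.sync()
    #   >>> ds.status == DataSourceStatusChoices.COMPLETED
    #   True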

    def _walk(self, root):
        """
        Return a set of all non-excluded files within the root path.
        """
        logger.debug(f"Walking {root}...")
        paths = set()

        for path, dir_names, file_names in os.walk(root):
            path = path.split(root)[1].lstrip('/')  # Strip root path
            if path.startswith('.'):
                continue
            for file_name in file_names:
                file_path = os.path.join(path, file_name)
                if not self._ignore(file_path):
                    paths.add(file_path)

        logger.debug(f"Found {len(paths)} files")
        return paths

    def _ignore(self, file_path):
        """
        Returns a boolean indicating whether the file should be ignored per the DataSource's configured
        ignore rules. file_path is the full relative path (e.g. "subdir/file.txt").
        """
        if os.path.basename(file_path).startswith('.'):
            return True
        for rule in self.ignore_rules.splitlines():
            if fnmatchcase(file_path, rule) or fnmatchcase(os.path.basename(file_path), rule):
                return True
        return False
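
    # Matching sketch (hypothetical rules; each pattern is tried against both the
    # full relative path and the bare filename, and fnmatch's '*' crosses '/'):
    #
    #   >>> ds = DataSource(name='example', type='git', ignore_rules='*.log\nbackup/*')
    #   >>> ds._ignore('app/debug.log')        # matches "*.log"
    #   True
    #   >>> ds._ignore('backup/config.yml')    # matches "backup/*"
    #   True
    #   >>> ds._ignore('.hidden')              # dotfiles are always ignored
    #   True
    #   >>> ds._ignore('configs/router1.yml')
    #   False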


class DataFile(models.Model):
    """
    The database representation of a file fetched from a remote DataSource. DataFile instances should be created,
    updated, or deleted only by calling DataSource.sync().
    """
    created = models.DateTimeField(
        verbose_name=_('created'),
        auto_now_add=True
    )
    last_updated = models.DateTimeField(
        verbose_name=_('last updated'),
        editable=False
    )
    source = models.ForeignKey(
        to='core.DataSource',
        on_delete=models.CASCADE,
        related_name='datafiles',
        editable=False
    )
    path = models.CharField(
        verbose_name=_('path'),
        max_length=1000,
        editable=False,
        help_text=_("File path relative to the data source's root")
    )
    size = models.PositiveIntegerField(
        editable=False,
        verbose_name=_('size')
    )
    hash = models.CharField(
        verbose_name=_('hash'),
        max_length=64,
        editable=False,
        validators=[
            RegexValidator(regex='^[0-9a-f]{64}$', message=_("Length must be 64 hexadecimal characters."))
        ],
        help_text=_('SHA256 hash of the file data')
    )
    data = models.BinaryField()

    objects = RestrictedQuerySet.as_manager()

    class Meta:
        ordering = ('source', 'path')
        constraints = (
            models.UniqueConstraint(
                fields=('source', 'path'),
                name='%(app_label)s_%(class)s_unique_source_path'
            ),
        )
        verbose_name = _('data file')
        verbose_name_plural = _('data files')

    def __str__(self):
        return self.path

    def get_absolute_url(self):
        return reverse('core:datafile', args=[self.pk])

    @property
    def data_as_string(self):
        if not self.data:
            return None
        try:
            return self.data.decode('utf-8')
        except UnicodeDecodeError:
            return None

    def get_data(self):
        """
        Attempt to read the file data as JSON/YAML and return a native Python object.
        """
        # TODO: Something more robust
        return yaml.safe_load(self.data_as_string)
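
    # Parsing sketch (hypothetical content): yaml.safe_load() handles both YAML
    # and, because JSON is a subset of YAML, JSON documents:
    #
    #   >>> df = DataFile(data=b'foo: [1, 2]')
    #   >>> df.get_data()
    #   {'foo': [1, 2]}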

    def refresh_from_disk(self, source_root):
        """
        Update instance attributes from the file on disk. Returns True if any attribute
        has changed.
        """
        file_path = os.path.join(source_root, self.path)

        with open(file_path, 'rb') as f:
            file_hash = hashlib.sha256(f.read()).hexdigest()

        # Update instance file attributes & data
        if is_modified := file_hash != self.hash:
            self.last_updated = timezone.now()
            self.size = os.path.getsize(file_path)
            self.hash = file_hash
            with open(file_path, 'rb') as f:
                self.data = f.read()

        return is_modified
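
    # Hashing sketch (illustrative): the stored hash is the plain SHA256 hex
    # digest of the raw file bytes, which is why the field's validator requires
    # exactly 64 hexadecimal characters:
    #
    #   >>> digest = hashlib.sha256(b'foo: [1, 2]').hexdigest()
    #   >>> len(digest)
    #   64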


class AutoSyncRecord(models.Model):
    """
    Maps a DataFile to a synced object for efficient automatic updating.
    """
    datafile = models.ForeignKey(
        to=DataFile,
        on_delete=models.CASCADE,
        related_name='+'
    )
    object_type = models.ForeignKey(
        to='contenttypes.ContentType',
        on_delete=models.CASCADE,
        related_name='+'
    )
    object_id = models.PositiveBigIntegerField()
    object = GenericForeignKey(
        ct_field='object_type',
        fk_field='object_id'
    )

    _netbox_private = True

    class Meta:
        constraints = (
            models.UniqueConstraint(
                fields=('object_type', 'object_id'),
                name='%(app_label)s_%(class)s_object'
            ),
        )
        verbose_name = _('auto sync record')
        verbose_name_plural = _('auto sync records')