import hashlib
import logging
import os
from fnmatch import fnmatchcase
from urllib.parse import urlparse

import yaml
from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext as _

from netbox.constants import CENSOR_TOKEN, CENSOR_TOKEN_CHANGED
from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin
from netbox.registry import registry
from utilities.querysets import RestrictedQuerySet
from ..choices import *
from ..exceptions import SyncError

__all__ = (
    'AutoSyncRecord',
    'DataFile',
    'DataSource',
)

logger = logging.getLogger('netbox.core.data')

class DataSource(JobsMixin, PrimaryModel):
    """
    A remote source, such as a git repository, from which DataFiles are synchronized.
    """
    name = models.CharField(
        verbose_name=_('name'),
        max_length=100,
        unique=True
    )
    type = models.CharField(
        verbose_name=_('type'),
        max_length=50
    )
    source_url = models.CharField(
        max_length=200,
        verbose_name=_('URL')
    )
    status = models.CharField(
        verbose_name=_('status'),
        max_length=50,
        choices=DataSourceStatusChoices,
        default=DataSourceStatusChoices.NEW,
        editable=False
    )
    enabled = models.BooleanField(
        verbose_name=_('enabled'),
        default=True
    )
    sync_interval = models.PositiveSmallIntegerField(
        verbose_name=_('sync interval'),
        choices=JobIntervalChoices,
        blank=True,
        null=True
    )
    ignore_rules = models.TextField(
        verbose_name=_('ignore rules'),
        blank=True,
        help_text=_("Patterns (one per line) matching files to ignore when syncing")
    )
    parameters = models.JSONField(
        verbose_name=_('parameters'),
        blank=True,
        null=True
    )
    last_synced = models.DateTimeField(
        verbose_name=_('last synced'),
        blank=True,
        null=True,
        editable=False
    )

    class Meta:
        ordering = ('name',)
        verbose_name = _('data source')
        verbose_name_plural = _('data sources')

    def __str__(self):
        return f'{self.name}'

    @property
    def docs_url(self):
        return f'{settings.STATIC_URL}docs/models/{self._meta.app_label}/{self._meta.model_name}/'

    def get_type_display(self):
        if backend := registry['data_backends'].get(self.type):
            return backend.label

    def get_status_color(self):
        return DataSourceStatusChoices.colors.get(self.status)

    @property
    def url_scheme(self):
        return urlparse(self.source_url).scheme.lower()

    @property
    def backend_class(self):
        return registry['data_backends'].get(self.type)

    @property
    def ready_for_sync(self):
        return self.enabled and self.status not in (
            DataSourceStatusChoices.QUEUED,
            DataSourceStatusChoices.SYNCING
        )
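
    # Note: backend implementations are looked up by `type` in registry['data_backends'];
    # an unregistered type therefore yields no display label above and is rejected by
    # clean() below.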

    def clean(self):
        super().clean()

        # Validate data backend type
        if self.type and self.type not in registry['data_backends']:
            raise ValidationError({
                'type': _("Unknown backend type: {type}").format(type=self.type)
            })

        # Ensure URL scheme matches selected type
        if self.backend_class.is_local and self.url_scheme not in ('file', ''):
            raise ValidationError({
                'source_url': _("URLs for local sources must start with file:// (or specify no scheme)")
            })

    def to_objectchange(self, action):
        objectchange = super().to_objectchange(action)

        # Censor any backend parameters marked as sensitive in the serialized data
        pre_change_params = {}
        post_change_params = {}
        if objectchange.prechange_data:
            pre_change_params = objectchange.prechange_data.get('parameters') or {}  # parameters may be None
        if objectchange.postchange_data:
            post_change_params = objectchange.postchange_data.get('parameters') or {}
        for param in self.backend_class.sensitive_parameters:
            if post_change_params.get(param):
                if post_change_params[param] != pre_change_params.get(param):
                    # Set the "changed" token if the parameter's value has been modified
                    post_change_params[param] = CENSOR_TOKEN_CHANGED
                else:
                    post_change_params[param] = CENSOR_TOKEN
            if pre_change_params.get(param):
                pre_change_params[param] = CENSOR_TOKEN

        return objectchange
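
    # For example, a sensitive backend parameter (such as a password) is recorded in the
    # change log as the censor token rather than its actual value, or as the "changed"
    # token when its value differs between the pre- and post-change snapshots.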

    def get_backend(self):
        backend_params = self.parameters or {}
        return self.backend_class(self.source_url, **backend_params)

    def sync(self):
        """
        Create/update/delete child DataFiles as necessary to synchronize with the remote source.
        """
        from core.signals import post_sync, pre_sync

        if self.status == DataSourceStatusChoices.SYNCING:
            raise SyncError(_("Cannot initiate sync; syncing already in progress."))

        # Emit the pre_sync signal
        pre_sync.send(sender=self.__class__, instance=self)

        self.status = DataSourceStatusChoices.SYNCING
        DataSource.objects.filter(pk=self.pk).update(status=self.status)

        # Replicate source data locally
        try:
            backend = self.get_backend()
        except ModuleNotFoundError as e:
            raise SyncError(
                _("There was an error initializing the backend. A dependency needs to be installed: ") + str(e)
            )
        with backend.fetch() as local_path:

            logger.debug(f'Syncing files from source root {local_path}')
            data_files = self.datafiles.all()
            known_paths = {df.path for df in data_files}
            logger.debug(f'Starting with {len(known_paths)} known files')

            # Check for any updated/deleted files
            updated_files = []
            deleted_file_ids = []
            for datafile in data_files:
                try:
                    if datafile.refresh_from_disk(source_root=local_path):
                        updated_files.append(datafile)
                except FileNotFoundError:
                    # File no longer exists
                    deleted_file_ids.append(datafile.pk)
                    continue

            # Bulk update modified files
            updated_count = DataFile.objects.bulk_update(updated_files, ('last_updated', 'size', 'hash', 'data'))
            logger.debug(f"Updated {updated_count} files")

            # Bulk delete deleted files
            deleted_count, __ = DataFile.objects.filter(pk__in=deleted_file_ids).delete()
            logger.debug(f"Deleted {deleted_count} files")

            # Walk the local replication to find new files
            new_paths = self._walk(local_path) - known_paths

            # Bulk create new files
            new_datafiles = []
            for path in new_paths:
                datafile = DataFile(source=self, path=path)
                datafile.refresh_from_disk(source_root=local_path)
                datafile.full_clean()
                new_datafiles.append(datafile)
            created_count = len(DataFile.objects.bulk_create(new_datafiles, batch_size=100))
            logger.debug(f"Created {created_count} data files")

        # Update status & last_synced time
        self.status = DataSourceStatusChoices.COMPLETED
        self.last_synced = timezone.now()
        DataSource.objects.filter(pk=self.pk).update(status=self.status, last_synced=self.last_synced)

        # Emit the post_sync signal
        post_sync.send(sender=self.__class__, instance=self)

    sync.alters_data = True

    def _walk(self, root):
        """
        Return a set of all non-excluded files within the root path.
        """
        logger.debug(f"Walking {root}...")
        paths = set()

        for path, dir_names, file_names in os.walk(root):
            path = path.split(root)[1].lstrip('/')  # Strip root path
            if path.startswith('.'):
                continue
            for file_name in file_names:
                if not self._ignore(file_name):
                    paths.add(os.path.join(path, file_name))

        logger.debug(f"Found {len(paths)} files")
        return paths

    def _ignore(self, filename):
        """
        Returns a boolean indicating whether the file should be ignored per the DataSource's configured
        ignore rules.
        """
        if filename.startswith('.'):
            return True
        for rule in self.ignore_rules.splitlines():
            if fnmatchcase(filename, rule):
                return True

        return False
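
    # Illustrative notes (not part of the upstream module):
    #
    #   - Each line of ignore_rules is matched against bare filenames with fnmatchcase(),
    #     so a value containing the lines "*.tmp" and "*.bak" would skip temporary and
    #     backup files during a sync.
    #   - Syncing is normally triggered from a background job or view, roughly:
    #
    #         source = DataSource.objects.get(name='config-repo')  # hypothetical source
    #         if source.ready_for_sync:
    #             source.sync()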

class DataFile(models.Model):
    """
    The database representation of a remote file fetched from a remote DataSource. DataFile instances should be
    created, updated, or deleted only by calling DataSource.sync().
    """
    created = models.DateTimeField(
        verbose_name=_('created'),
        auto_now_add=True
    )
    last_updated = models.DateTimeField(
        verbose_name=_('last updated'),
        editable=False
    )
    source = models.ForeignKey(
        to='core.DataSource',
        on_delete=models.CASCADE,
        related_name='datafiles',
        editable=False
    )
    path = models.CharField(
        verbose_name=_('path'),
        max_length=1000,
        editable=False,
        help_text=_("File path relative to the data source's root")
    )
    size = models.PositiveIntegerField(
        editable=False,
        verbose_name=_('size')
    )
    hash = models.CharField(
        verbose_name=_('hash'),
        max_length=64,
        editable=False,
        validators=[
            RegexValidator(regex='^[0-9a-f]{64}$', message=_("Length must be 64 hexadecimal characters."))
        ],
        help_text=_('SHA256 hash of the file data')
    )
    data = models.BinaryField()

    objects = RestrictedQuerySet.as_manager()

    class Meta:
        ordering = ('source', 'path')
        constraints = (
            models.UniqueConstraint(
                fields=('source', 'path'),
                name='%(app_label)s_%(class)s_unique_source_path'
            ),
        )
        indexes = [
            models.Index(fields=('source', 'path'), name='core_datafile_source_path'),
        ]
        verbose_name = _('data file')
        verbose_name_plural = _('data files')

    def __str__(self):
        return self.path

    def get_absolute_url(self):
        return reverse('core:datafile', args=[self.pk])

    @property
    def data_as_string(self):
        if not self.data:
            return None
        try:
            return self.data.decode('utf-8')
        except UnicodeDecodeError:
            return None

    def get_data(self):
        """
        Attempt to read the file data as JSON/YAML and return a native Python object.
        """
        # TODO: Something more robust
        return yaml.safe_load(self.data_as_string)
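
    # Note: yaml.safe_load() also parses JSON documents (JSON is, for practical purposes,
    # a subset of YAML), which is why a single parser suffices for both formats above.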

    def refresh_from_disk(self, source_root):
        """
        Update instance attributes from the file on disk. Returns True if any attribute
        has changed.
        """
        file_path = os.path.join(source_root, self.path)
        with open(file_path, 'rb') as f:
            file_hash = hashlib.sha256(f.read()).hexdigest()

        # Update instance file attributes & data
        if is_modified := file_hash != self.hash:
            self.last_updated = timezone.now()
            self.size = os.path.getsize(file_path)
            self.hash = file_hash
            with open(file_path, 'rb') as f:
                self.data = f.read()

        return is_modified

    def write_to_disk(self, path, overwrite=False):
        """
        Write the object's data to disk at the specified path
        """
        # Check whether file already exists
        if os.path.isfile(path) and not overwrite:
            raise FileExistsError()

        with open(path, 'wb+') as new_file:
            new_file.write(self.data)
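
    # Illustrative usage sketch (hypothetical lookup values, not part of the upstream module):
    #
    #   datafile = DataFile.objects.get(source__name='config-repo', path='devices.yaml')
    #   config = datafile.get_data()                      # parsed YAML/JSON content
    #   datafile.write_to_disk('/tmp/devices.yaml', overwrite=True)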

class AutoSyncRecord(models.Model):
    """
    Maps a DataFile to a synced object for efficient automatic updating.
    """
    datafile = models.ForeignKey(
        to=DataFile,
        on_delete=models.CASCADE,
        related_name='+'
    )
    object_type = models.ForeignKey(
        to='contenttypes.ContentType',
        on_delete=models.CASCADE,
        related_name='+'
    )
    object_id = models.PositiveBigIntegerField()
    object = GenericForeignKey(
        ct_field='object_type',
        fk_field='object_id'
    )

    _netbox_private = True

    class Meta:
        constraints = (
            models.UniqueConstraint(
                fields=('object_type', 'object_id'),
                name='%(app_label)s_%(class)s_object'
            ),
        )
        indexes = (
            models.Index(fields=('object_type', 'object_id')),
        )
        verbose_name = _('auto sync record')
        verbose_name_plural = _('auto sync records')
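
# Illustrative sketch (assumes a hypothetical `device_config` object whose data is backed
# by a DataFile): an AutoSyncRecord ties that object to its source file so it can be
# updated automatically when the file changes during DataSource.sync().
#
#   AutoSyncRecord.objects.create(datafile=datafile, object=device_config)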