data.py

import logging
import os
import yaml
from fnmatch import fnmatchcase
from urllib.parse import urlparse

from django.conf import settings
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from netbox.models import PrimaryModel
from netbox.models.features import JobsMixin
from netbox.registry import registry
from utilities.files import sha256_hash
from utilities.querysets import RestrictedQuerySet

from ..choices import *
from ..exceptions import SyncError
from ..signals import post_sync, pre_sync
from .jobs import Job

__all__ = (
    'AutoSyncRecord',
    'DataFile',
    'DataSource',
)

logger = logging.getLogger('netbox.core.data')
class DataSource(JobsMixin, PrimaryModel):
    """
    A remote source, such as a git repository, from which DataFiles are synchronized.
    """
    name = models.CharField(
        verbose_name=_('name'),
        max_length=100,
        unique=True
    )
    type = models.CharField(
        verbose_name=_('type'),
        max_length=50
    )
    source_url = models.CharField(
        max_length=200,
        verbose_name=_('URL')
    )
    status = models.CharField(
        verbose_name=_('status'),
        max_length=50,
        choices=DataSourceStatusChoices,
        default=DataSourceStatusChoices.NEW,
        editable=False
    )
    enabled = models.BooleanField(
        verbose_name=_('enabled'),
        default=True
    )
    ignore_rules = models.TextField(
        verbose_name=_('ignore rules'),
        blank=True,
        help_text=_("Patterns (one per line) matching files to ignore when syncing")
    )
    parameters = models.JSONField(
        verbose_name=_('parameters'),
        blank=True,
        null=True
    )
    last_synced = models.DateTimeField(
        verbose_name=_('last synced'),
        blank=True,
        null=True,
        editable=False
    )

    class Meta:
        ordering = ('name',)
        verbose_name = _('data source')
        verbose_name_plural = _('data sources')

    def __str__(self):
        return f'{self.name}'

    def get_absolute_url(self):
        return reverse('core:datasource', args=[self.pk])

    @property
    def docs_url(self):
        return f'{settings.STATIC_URL}docs/models/{self._meta.app_label}/{self._meta.model_name}/'

    def get_type_display(self):
        if backend := registry['data_backends'].get(self.type):
            return backend.label

    def get_status_color(self):
        return DataSourceStatusChoices.colors.get(self.status)

    @property
    def url_scheme(self):
        return urlparse(self.source_url).scheme.lower()

    @property
    def backend_class(self):
        return registry['data_backends'].get(self.type)

    @property
    def ready_for_sync(self):
        return self.enabled and self.status not in (
            DataSourceStatusChoices.QUEUED,
            DataSourceStatusChoices.SYNCING
        )
    def clean(self):

        # Validate data backend type
        if self.type and self.type not in registry['data_backends']:
            raise ValidationError({
                'type': _("Unknown backend type: {type}".format(type=self.type))
            })

        # Ensure URL scheme matches selected type
        if self.backend_class.is_local and self.url_scheme not in ('file', ''):
            raise ValidationError({
                'source_url': "URLs for local sources must start with file:// (or specify no scheme)"
            })
    def enqueue_sync_job(self, request):
        """
        Enqueue a background job to synchronize the DataSource by calling sync().
        """
        # Set the status to "queued"
        self.status = DataSourceStatusChoices.QUEUED
        DataSource.objects.filter(pk=self.pk).update(status=self.status)

        # Enqueue a sync job
        return Job.enqueue(
            import_string('core.jobs.sync_datasource'),
            instance=self,
            user=request.user
        )
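    # Illustrative usage (not part of the model): a view would typically pass its
    # request so the resulting Job is attributed to the requesting user, e.g.
    # `job = datasource.enqueue_sync_job(request)`; the queued job then performs
    # the actual sync() in the background via core.jobs.sync_datasource.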
    def get_backend(self):
        backend_params = self.parameters or {}
        return self.backend_class(self.source_url, **backend_params)
    def sync(self):
        """
        Create/update/delete child DataFiles as necessary to synchronize with the remote source.
        """
        if self.status == DataSourceStatusChoices.SYNCING:
            raise SyncError("Cannot initiate sync; syncing already in progress.")

        # Emit the pre_sync signal
        pre_sync.send(sender=self.__class__, instance=self)

        self.status = DataSourceStatusChoices.SYNCING
        DataSource.objects.filter(pk=self.pk).update(status=self.status)

        # Replicate source data locally
        try:
            backend = self.get_backend()
        except ModuleNotFoundError as e:
            raise SyncError(
                f"There was an error initializing the backend. A dependency needs to be installed: {e}"
            )
        with backend.fetch() as local_path:

            logger.debug(f'Syncing files from source root {local_path}')
            data_files = self.datafiles.all()
            known_paths = {df.path for df in data_files}
            logger.debug(f'Starting with {len(known_paths)} known files')

            # Check for any updated/deleted files
            updated_files = []
            deleted_file_ids = []
            for datafile in data_files:
                try:
                    if datafile.refresh_from_disk(source_root=local_path):
                        updated_files.append(datafile)
                except FileNotFoundError:
                    # File no longer exists
                    deleted_file_ids.append(datafile.pk)
                    continue

            # Bulk update modified files
            updated_count = DataFile.objects.bulk_update(updated_files, ('last_updated', 'size', 'hash', 'data'))
            logger.debug(f"Updated {updated_count} files")

            # Bulk delete deleted files
            deleted_count, _ = DataFile.objects.filter(pk__in=deleted_file_ids).delete()
            logger.debug(f"Deleted {deleted_count} files")

            # Walk the local replication to find new files
            new_paths = self._walk(local_path) - known_paths

            # Bulk create new files
            new_datafiles = []
            for path in new_paths:
                datafile = DataFile(source=self, path=path)
                datafile.refresh_from_disk(source_root=local_path)
                datafile.full_clean()
                new_datafiles.append(datafile)
            created_count = len(DataFile.objects.bulk_create(new_datafiles, batch_size=100))
            logger.debug(f"Created {created_count} data files")

        # Update status & last_synced time
        self.status = DataSourceStatusChoices.COMPLETED
        self.last_synced = timezone.now()
        DataSource.objects.filter(pk=self.pk).update(status=self.status, last_synced=self.last_synced)

        # Emit the post_sync signal
        post_sync.send(sender=self.__class__, instance=self)
    sync.alters_data = True
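    # Minimal sketch of a direct (foreground) sync, assuming an existing DataSource
    # named 'config-repo'; in practice enqueue_sync_job() is preferred so the work
    # runs as a background Job:
    #
    #   source = DataSource.objects.get(name='config-repo')
    #   if source.ready_for_sync:
    #       source.sync()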
    def _walk(self, root):
        """
        Return a set of all non-excluded files within the root path.
        """
        logger.debug(f"Walking {root}...")
        paths = set()

        for path, dir_names, file_names in os.walk(root):
            path = path.split(root)[1].lstrip('/')  # Strip root path
            if path.startswith('.'):
                continue
            for file_name in file_names:
                if not self._ignore(file_name):
                    paths.add(os.path.join(path, file_name))

        logger.debug(f"Found {len(paths)} files")
        return paths
    def _ignore(self, filename):
        """
        Returns a boolean indicating whether the file should be ignored per the DataSource's configured
        ignore rules.
        """
        if filename.startswith('.'):
            return True
        for rule in self.ignore_rules.splitlines():
            if fnmatchcase(filename, rule):
                return True
        return False
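    # Note: ignore rules are matched against individual file names (not full paths)
    # using fnmatchcase(). For example, an ignore_rules value of "*.pyc\nREADME*"
    # would skip compiled Python files and any README variant during sync; files
    # whose names begin with a dot are always skipped.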
class DataFile(models.Model):
    """
    The database representation of a remote file fetched from a remote DataSource. DataFile instances should be
    created, updated, or deleted only by calling DataSource.sync().
    """
    created = models.DateTimeField(
        verbose_name=_('created'),
        auto_now_add=True
    )
    last_updated = models.DateTimeField(
        verbose_name=_('last updated'),
        editable=False
    )
    source = models.ForeignKey(
        to='core.DataSource',
        on_delete=models.CASCADE,
        related_name='datafiles',
        editable=False
    )
    path = models.CharField(
        verbose_name=_('path'),
        max_length=1000,
        editable=False,
        help_text=_("File path relative to the data source's root")
    )
    size = models.PositiveIntegerField(
        editable=False,
        verbose_name=_('size')
    )
    hash = models.CharField(
        verbose_name=_('hash'),
        max_length=64,
        editable=False,
        validators=[
            RegexValidator(regex='^[0-9a-f]{64}$', message=_("Length must be 64 hexadecimal characters."))
        ],
        help_text=_('SHA256 hash of the file data')
    )
    data = models.BinaryField()

    objects = RestrictedQuerySet.as_manager()

    class Meta:
        ordering = ('source', 'path')
        constraints = (
            models.UniqueConstraint(
                fields=('source', 'path'),
                name='%(app_label)s_%(class)s_unique_source_path'
            ),
        )
        indexes = [
            models.Index(fields=('source', 'path'), name='core_datafile_source_path'),
        ]
        verbose_name = _('data file')
        verbose_name_plural = _('data files')
    def __str__(self):
        return self.path

    def get_absolute_url(self):
        return reverse('core:datafile', args=[self.pk])

    @property
    def data_as_string(self):
        if not self.data:
            return None
        try:
            return self.data.decode('utf-8')
        except UnicodeDecodeError:
            return None

    def get_data(self):
        """
        Attempt to read the file data as JSON/YAML and return a native Python object.
        """
        # TODO: Something more robust
        return yaml.safe_load(self.data_as_string)
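    # Hypothetical example: for a DataFile whose data holds the YAML document
    # "foo: 1\nbar: [a, b]", get_data() would return {'foo': 1, 'bar': ['a', 'b']}.
    # Plain JSON content is parsed the same way, since YAML accepts JSON syntax.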
    def refresh_from_disk(self, source_root):
        """
        Update instance attributes from the file on disk. Returns True if any attribute
        has changed.
        """
        file_path = os.path.join(source_root, self.path)
        file_hash = sha256_hash(file_path).hexdigest()

        # Update instance file attributes & data
        if is_modified := file_hash != self.hash:
            self.last_updated = timezone.now()
            self.size = os.path.getsize(file_path)
            self.hash = file_hash
            with open(file_path, 'rb') as f:
                self.data = f.read()

        return is_modified
    def write_to_disk(self, path, overwrite=False):
        """
        Write the object's data to disk at the specified path.
        """
        # Check whether file already exists
        if os.path.isfile(path) and not overwrite:
            raise FileExistsError()

        with open(path, 'wb+') as new_file:
            new_file.write(self.data)
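    # Illustrative usage (hypothetical path): export a synced file to the local
    # filesystem, allowing an existing file at that path to be replaced:
    #
    #   datafile.write_to_disk('/tmp/exported.yaml', overwrite=True)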
class AutoSyncRecord(models.Model):
    """
    Maps a DataFile to a synced object for efficient automatic updating.
    """
    datafile = models.ForeignKey(
        to=DataFile,
        on_delete=models.CASCADE,
        related_name='+'
    )
    object_type = models.ForeignKey(
        to=ContentType,
        on_delete=models.CASCADE,
        related_name='+'
    )
    object_id = models.PositiveBigIntegerField()
    object = GenericForeignKey(
        ct_field='object_type',
        fk_field='object_id'
    )

    class Meta:
        constraints = (
            models.UniqueConstraint(
                fields=('object_type', 'object_id'),
                name='%(app_label)s_%(class)s_object'
            ),
        )
        indexes = (
            models.Index(fields=('object_type', 'object_id')),
        )
        verbose_name = _('auto sync record')
        verbose_name_plural = _('auto sync records')
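# Interpretation (not defined in this module): AutoSyncRecord instances are intended to
# be looked up by a post_sync signal receiver, so that objects mapped to an updated
# DataFile can be refreshed automatically after their DataSource syncs.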