# data.py

import logging
import os

import yaml
from fnmatch import fnmatchcase
from urllib.parse import urlparse

from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.core.exceptions import ValidationError
from django.core.validators import RegexValidator
from django.db import models
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.translation import gettext as _

from extras.models import JobResult
from netbox.models import PrimaryModel
from netbox.models.features import ChangeLoggingMixin
from netbox.registry import registry
from utilities.files import sha256_hash
from utilities.querysets import RestrictedQuerySet
from ..choices import *
from ..exceptions import SyncError
from ..signals import post_sync, pre_sync

__all__ = (
    'DataFile',
    'DataSource',
)

logger = logging.getLogger('netbox.core.data')


class DataSource(PrimaryModel):
    """
    A remote source, such as a git repository, from which DataFiles are synchronized.
    """
    name = models.CharField(
        max_length=100,
        unique=True
    )
    type = models.CharField(
        max_length=50,
        choices=DataSourceTypeChoices,
        default=DataSourceTypeChoices.LOCAL
    )
    source_url = models.CharField(
        max_length=200,
        verbose_name=_('URL')
    )
    status = models.CharField(
        max_length=50,
        choices=DataSourceStatusChoices,
        default=DataSourceStatusChoices.NEW,
        editable=False
    )
    enabled = models.BooleanField(
        default=True
    )
    ignore_rules = models.TextField(
        blank=True,
        help_text=_("Patterns (one per line) matching files to ignore when syncing")
    )
    parameters = models.JSONField(
        blank=True,
        null=True
    )
    last_synced = models.DateTimeField(
        blank=True,
        null=True,
        editable=False
    )

    class Meta:
        ordering = ('name',)

    def __str__(self):
        return f'{self.name}'

    def get_absolute_url(self):
        return reverse('core:datasource', args=[self.pk])

    @property
    def docs_url(self):
        return f'{settings.STATIC_URL}docs/models/{self._meta.app_label}/{self._meta.model_name}/'

    def get_type_color(self):
        return DataSourceTypeChoices.colors.get(self.type)

    def get_status_color(self):
        return DataSourceStatusChoices.colors.get(self.status)

    @property
    def url_scheme(self):
        return urlparse(self.source_url).scheme.lower()

    @property
    def ready_for_sync(self):
        return self.enabled and self.status not in (
            DataSourceStatusChoices.QUEUED,
            DataSourceStatusChoices.SYNCING
        )

    def clean(self):
        # Ensure the URL scheme matches the selected source type
        if self.type == DataSourceTypeChoices.LOCAL and self.url_scheme not in ('file', ''):
            raise ValidationError({
                'source_url': "URLs for local sources must start with file:// (or omit the scheme)"
            })
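
    # A minimal sketch of how clean() behaves for LOCAL sources: a bare filesystem path or an
    # explicit file:// URL passes validation, while any other scheme is rejected. The names and
    # paths below are assumptions for illustration only.
    #
    #   DataSource(name='local-configs', type=DataSourceTypeChoices.LOCAL,
    #              source_url='file:///opt/data/configs').full_clean()    # passes
    #   DataSource(name='bad-local', type=DataSourceTypeChoices.LOCAL,
    #              source_url='https://example.com/repo').full_clean()    # raises ValidationError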

    def enqueue_sync_job(self, request):
        """
        Enqueue a background job to synchronize the DataSource by calling sync().
        """
        # Set the status to "queued" and persist it so other requests see the pending sync
        self.status = DataSourceStatusChoices.QUEUED
        DataSource.objects.filter(pk=self.pk).update(status=self.status)

        # Enqueue a sync job
        job_result = JobResult.enqueue_job(
            import_string('core.jobs.sync_datasource'),
            name=self.name,
            obj_type=ContentType.objects.get_for_model(DataSource),
            user=request.user,
        )
        return job_result
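
    # A minimal sketch of what the job referenced above ('core.jobs.sync_datasource') might do;
    # its real signature and error handling live in core/jobs.py and may differ. The job_result
    # parameter and the name-based lookup are assumptions for illustration.
    #
    #   def sync_datasource(job_result, *args, **kwargs):
    #       datasource = DataSource.objects.get(name=job_result.name)
    #       datasource.sync()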

    def get_backend(self):
        """
        Return an instance of the backend class registered for this data source type, initialized
        with the source URL and any configured parameters.
        """
        backend_cls = registry['data_backends'].get(self.type)
        backend_params = self.parameters or {}
        return backend_cls(self.source_url, **backend_params)
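
    # get_backend() assumes each class registered under registry['data_backends'] accepts
    # (source_url, **parameters) and exposes fetch() as a context manager yielding a local
    # filesystem path (see sync() below). A hypothetical minimal backend might look like:
    #
    #   from contextlib import contextmanager
    #
    #   class LocalBackend:
    #       def __init__(self, url, **kwargs):
    #           self.path = urlparse(url).path or url
    #
    #       @contextmanager
    #       def fetch(self):
    #           yield self.path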

    def sync(self):
        """
        Create/update/delete child DataFiles as necessary to synchronize with the remote source.
        """
        if not self.ready_for_sync:
            raise SyncError("Cannot initiate sync; data source not ready/enabled")

        # Emit the pre_sync signal
        pre_sync.send(sender=self.__class__, instance=self)

        self.status = DataSourceStatusChoices.SYNCING
        DataSource.objects.filter(pk=self.pk).update(status=self.status)

        # Replicate source data locally
        backend = self.get_backend()
        with backend.fetch() as local_path:
            logger.debug(f'Syncing files from source root {local_path}')
            data_files = self.datafiles.all()
            known_paths = {df.path for df in data_files}
            logger.debug(f'Starting with {len(known_paths)} known files')

            # Check for any updated/deleted files
            updated_files = []
            deleted_file_ids = []
            for datafile in data_files:
                try:
                    if datafile.refresh_from_disk(source_root=local_path):
                        updated_files.append(datafile)
                except FileNotFoundError:
                    # File no longer exists
                    deleted_file_ids.append(datafile.pk)
                    continue

            # Bulk update modified files
            updated_count = DataFile.objects.bulk_update(updated_files, ('last_updated', 'size', 'hash', 'data'))
            logger.debug(f"Updated {updated_count} files")

            # Bulk delete deleted files
            deleted_count, _ = DataFile.objects.filter(pk__in=deleted_file_ids).delete()
            logger.debug(f"Deleted {deleted_count} files")

            # Walk the local replication to find new files
            new_paths = self._walk(local_path) - known_paths

            # Bulk create new files
            new_datafiles = []
            for path in new_paths:
                datafile = DataFile(source=self, path=path)
                datafile.refresh_from_disk(source_root=local_path)
                datafile.full_clean()
                new_datafiles.append(datafile)
            created_count = len(DataFile.objects.bulk_create(new_datafiles, batch_size=100))
            logger.debug(f"Created {created_count} data files")

        # Update status & last_synced time
        self.status = DataSourceStatusChoices.COMPLETED
        self.last_synced = timezone.now()
        DataSource.objects.filter(pk=self.pk).update(status=self.status, last_synced=self.last_synced)

        # Emit the post_sync signal
        post_sync.send(sender=self.__class__, instance=self)
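
    # Typical usage, sketched under the assumption that a DataSource named 'configs' exists.
    # ready_for_sync guards against concurrent runs, so callers should handle SyncError:
    #
    #   datasource = DataSource.objects.get(name='configs')
    #   try:
    #       datasource.sync()
    #   except SyncError as e:
    #       logger.warning(f"Sync of {datasource} failed: {e}")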

    def _walk(self, root):
        """
        Return a set of all non-excluded files within the root path.
        """
        logger.debug(f"Walking {root}...")
        paths = set()

        for path, dir_names, file_names in os.walk(root):
            path = path.split(root)[1].lstrip('/')  # Strip root path
            if path.startswith('.'):
                continue
            for file_name in file_names:
                if not self._ignore(file_name):
                    paths.add(os.path.join(path, file_name))

        logger.debug(f"Found {len(paths)} files")
        return paths

    def _ignore(self, filename):
        """
        Returns a boolean indicating whether the file should be ignored per the DataSource's configured
        ignore rules.
        """
        if filename.startswith('.'):
            return True
        for rule in self.ignore_rules.splitlines():
            if fnmatchcase(filename, rule):
                return True
        return False
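
    # _ignore() applies shell-style patterns via fnmatchcase(), one rule per line of
    # ignore_rules. With the hypothetical rules below, 'secrets.env' and 'debug.log' are
    # skipped during sync while 'router1.yaml' is kept:
    #
    #   datasource.ignore_rules = "*.env\n*.log"
    #   datasource._ignore('secrets.env')    # True
    #   datasource._ignore('debug.log')      # True
    #   datasource._ignore('router1.yaml')   # False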


class DataFile(ChangeLoggingMixin, models.Model):
    """
    The database representation of a file fetched from a remote DataSource. DataFile instances
    should be created, updated, or deleted only by calling DataSource.sync().
    """
    source = models.ForeignKey(
        to='core.DataSource',
        on_delete=models.CASCADE,
        related_name='datafiles',
        editable=False
    )
    path = models.CharField(
        max_length=1000,
        editable=False,
        help_text=_("File path relative to the data source's root")
    )
    last_updated = models.DateTimeField(
        editable=False
    )
    size = models.PositiveIntegerField(
        editable=False
    )
    hash = models.CharField(
        max_length=64,
        editable=False,
        validators=[
            RegexValidator(regex='^[0-9a-f]{64}$', message=_("Length must be 64 hexadecimal characters."))
        ],
        help_text=_("SHA256 hash of the file data")
    )
    data = models.BinaryField()

    objects = RestrictedQuerySet.as_manager()

    class Meta:
        ordering = ('source', 'path')
        constraints = (
            models.UniqueConstraint(
                fields=('source', 'path'),
                name='%(app_label)s_%(class)s_unique_source_path'
            ),
        )

    def __str__(self):
        return self.path

    def get_absolute_url(self):
        return reverse('core:datafile', args=[self.pk])

    @property
    def data_as_string(self):
        try:
            return self.data.tobytes().decode('utf-8')
        except UnicodeDecodeError:
            return None

    def get_data(self):
        """
        Attempt to read the file data as JSON/YAML and return a native Python object.
        """
        # TODO: Something more robust
        return yaml.safe_load(self.data_as_string)
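
    # Because YAML is a superset of JSON, yaml.safe_load() handles both formats here. A small
    # sketch with hypothetical file content; BinaryField values read back from the database are
    # typically memoryview objects, hence tobytes() in data_as_string above:
    #
    #   datafile.data = memoryview(b'{"site": "dc1", "devices": ["r1", "r2"]}')
    #   datafile.get_data()   # -> {'site': 'dc1', 'devices': ['r1', 'r2']}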

    def refresh_from_disk(self, source_root):
        """
        Update instance attributes from the file on disk. Returns True if any attribute has changed.
        """
        file_path = os.path.join(source_root, self.path)
        file_hash = sha256_hash(file_path).hexdigest()

        # Update instance file attributes & data only if the hash has changed
        if is_modified := file_hash != self.hash:
            self.last_updated = timezone.now()
            self.size = os.path.getsize(file_path)
            self.hash = file_hash
            with open(file_path, 'rb') as f:
                self.data = f.read()

        return is_modified
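
    # refresh_from_disk() only mutates the in-memory instance; persisting the change is the
    # caller's responsibility, as sync() does via bulk_update(). A sketch, assuming
    # '/tmp/replica' is a local checkout of the source:
    #
    #   if datafile.refresh_from_disk(source_root='/tmp/replica'):
    #       datafile.save()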