- import logging
- import os
- import yaml
- from fnmatch import fnmatchcase
- from urllib.parse import urlparse
- from django.conf import settings
- from django.contrib.contenttypes.models import ContentType
- from django.core.exceptions import ValidationError
- from django.core.validators import RegexValidator
- from django.db import models
- from django.urls import reverse
- from django.utils import timezone
- from django.utils.module_loading import import_string
- from django.utils.translation import gettext as _
- from extras.models import JobResult
- from netbox.models import PrimaryModel
- from netbox.models.features import ChangeLoggingMixin
- from netbox.registry import registry
- from utilities.files import sha256_hash
- from utilities.querysets import RestrictedQuerySet
- from ..choices import *
- from ..exceptions import SyncError
- from ..signals import post_sync, pre_sync
- __all__ = (
- 'DataFile',
- 'DataSource',
- )
- logger = logging.getLogger('netbox.core.data')
- class DataSource(PrimaryModel):
- """
- A remote source, such as a git repository, from which DataFiles are synchronized.
- """
- name = models.CharField(
- max_length=100,
- unique=True
- )
- type = models.CharField(
- max_length=50,
- choices=DataSourceTypeChoices,
- default=DataSourceTypeChoices.LOCAL
- )
- source_url = models.CharField(
- max_length=200,
- verbose_name=_('URL')
- )
- status = models.CharField(
- max_length=50,
- choices=DataSourceStatusChoices,
- default=DataSourceStatusChoices.NEW,
- editable=False
- )
- enabled = models.BooleanField(
- default=True
- )
- ignore_rules = models.TextField(
- blank=True,
- help_text=_("Patterns (one per line) matching files to ignore when syncing")
- )
- parameters = models.JSONField(
- blank=True,
- null=True
- )
- last_synced = models.DateTimeField(
- blank=True,
- null=True,
- editable=False
- )
- class Meta:
- ordering = ('name',)
- def __str__(self):
- return self.name
- def get_absolute_url(self):
- return reverse('core:datasource', args=[self.pk])
- @property
- def docs_url(self):
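- """
- Return the URL of the static documentation page for this model.
- """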
- return f'{settings.STATIC_URL}docs/models/{self._meta.app_label}/{self._meta.model_name}/'
- def get_type_color(self):
- return DataSourceTypeChoices.colors.get(self.type)
- def get_status_color(self):
- return DataSourceStatusChoices.colors.get(self.status)
- @property
- def url_scheme(self):
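- """
- Return the lowercase scheme (e.g. 'file' or 'https') of the source URL.
- """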
- return urlparse(self.source_url).scheme.lower()
- @property
- def ready_for_sync(self):
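- """
- Return True if the source is enabled and not already queued for or undergoing a sync.
- """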
- return self.enabled and self.status not in (
- DataSourceStatusChoices.QUEUED,
- DataSourceStatusChoices.SYNCING
- )
- def clean(self):
- # Ensure URL scheme matches selected type
- if self.type == DataSourceTypeChoices.LOCAL and self.url_scheme not in ('file', ''):
- raise ValidationError({
- 'source_url': "URLs for local sources must start with file:// (or omit the scheme)"
- })
- def enqueue_sync_job(self, request):
- """
- Enqueue a background job to synchronize the DataSource by calling sync().
- """
- # Set the status to "queued" and persist it so the source cannot be queued a second time
- self.status = DataSourceStatusChoices.QUEUED
- DataSource.objects.filter(pk=self.pk).update(status=self.status)
- # Enqueue a sync job, passing this DataSource as a keyword argument for the job to act on
- job_result = JobResult.enqueue_job(
- import_string('core.jobs.sync_datasource'),
- name=self.name,
- obj_type=ContentType.objects.get_for_model(DataSource),
- user=request.user,
- datasource=self
- )
- return job_result
- def get_backend(self):
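- """
- Return an instance of the backend class registered for this source's type, initialized with the
- source URL and any stored parameters.
- """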
- backend_cls = registry['data_backends'].get(self.type)
- backend_params = self.parameters or {}
- return backend_cls(self.source_url, **backend_params)
- def sync(self):
- """
- Create/update/delete child DataFiles as necessary to synchronize with the remote source.
- """
- if not self.ready_for_sync:
- raise SyncError("Cannot initiate sync; data source not ready/enabled")
- # Emit the pre_sync signal
- pre_sync.send(sender=self.__class__, instance=self)
- self.status = DataSourceStatusChoices.SYNCING
- DataSource.objects.filter(pk=self.pk).update(status=self.status)
- # Replicate source data locally
- backend = self.get_backend()
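- # The backend's fetch() context manager yields a local filesystem path containing a copy of the source data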
- with backend.fetch() as local_path:
- logger.debug(f'Syncing files from source root {local_path}')
- data_files = self.datafiles.all()
- known_paths = {df.path for df in data_files}
- logger.debug(f'Starting with {len(known_paths)} known files')
- # Check for any updated/deleted files
- updated_files = []
- deleted_file_ids = []
- for datafile in data_files:
- try:
- if datafile.refresh_from_disk(source_root=local_path):
- updated_files.append(datafile)
- except FileNotFoundError:
- # File no longer exists
- deleted_file_ids.append(datafile.pk)
- continue
- # Bulk update modified files
- updated_count = DataFile.objects.bulk_update(updated_files, ('last_updated', 'size', 'hash', 'data'))
- logger.debug(f"Updated {updated_count} files")
- # Bulk delete deleted files
- deleted_count, _ = DataFile.objects.filter(pk__in=deleted_file_ids).delete()
- logger.debug(f"Deleted {deleted_count} files")
- # Walk the local replication to find new files
- new_paths = self._walk(local_path) - known_paths
- # Bulk create new files
- new_datafiles = []
- for path in new_paths:
- datafile = DataFile(source=self, path=path)
- datafile.refresh_from_disk(source_root=local_path)
- datafile.full_clean()
- new_datafiles.append(datafile)
- created_count = len(DataFile.objects.bulk_create(new_datafiles, batch_size=100))
- logger.debug(f"Created {created_count} data files")
- # Update status & last_synced time
- self.status = DataSourceStatusChoices.COMPLETED
- self.last_synced = timezone.now()
- DataSource.objects.filter(pk=self.pk).update(status=self.status, last_synced=self.last_synced)
- # Emit the post_sync signal
- post_sync.send(sender=self.__class__, instance=self)
- def _walk(self, root):
- """
- Return a set of all non-excluded files within the root path.
- """
- logger.debug(f"Walking {root}...")
- paths = set()
- for path, dir_names, file_names in os.walk(root):
- path = path.split(root)[1].lstrip('/') # Strip root path
- if path.startswith('.'):
- continue
- for file_name in file_names:
- if not self._ignore(file_name):
- paths.add(os.path.join(path, file_name))
- logger.debug(f"Found {len(paths)} files")
- return paths
- def _ignore(self, filename):
- """
- Returns a boolean indicating whether the file should be ignored per the DataSource's configured
- ignore rules.
- """
- if filename.startswith('.'):
- return True
- for rule in self.ignore_rules.splitlines():
- if fnmatchcase(filename, rule):
- return True
- return False
- class DataFile(ChangeLoggingMixin, models.Model):
- """
- The database representation of a file fetched from a remote DataSource. DataFile instances should be created,
- updated, or deleted only by calling DataSource.sync().
- """
- source = models.ForeignKey(
- to='core.DataSource',
- on_delete=models.CASCADE,
- related_name='datafiles',
- editable=False
- )
- path = models.CharField(
- max_length=1000,
- editable=False,
- help_text=_("File path relative to the data source's root")
- )
- last_updated = models.DateTimeField(
- editable=False
- )
- size = models.PositiveIntegerField(
- editable=False
- )
- hash = models.CharField(
- max_length=64,
- editable=False,
- validators=[
- RegexValidator(regex='^[0-9a-f]{64}$', message=_("Length must be 64 hexadecimal characters."))
- ],
- help_text=_("SHA256 hash of the file data")
- )
- data = models.BinaryField()
- objects = RestrictedQuerySet.as_manager()
- class Meta:
- ordering = ('source', 'path')
- constraints = (
- models.UniqueConstraint(
- fields=('source', 'path'),
- name='%(app_label)s_%(class)s_unique_source_path'
- ),
- )
- def __str__(self):
- return self.path
- def get_absolute_url(self):
- return reverse('core:datafile', args=[self.pk])
- @property
- def data_as_string(self):
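- """
- Return the stored file data decoded as UTF-8, or None if the data is not valid UTF-8 text.
- """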
- try:
- # BinaryField data may be a memoryview (loaded from the database) or bytes (freshly assigned)
- return bytes(self.data).decode('utf-8')
- except UnicodeDecodeError:
- return None
- def get_data(self):
- """
- Attempt to read the file data as JSON/YAML and return a native Python object.
- """
- # TODO: Something more robust
- return yaml.safe_load(self.data_as_string)
- def refresh_from_disk(self, source_root):
- """
- Update instance attributes from the file on disk. Returns True if any attribute
- has changed.
- """
- file_path = os.path.join(source_root, self.path)
- file_hash = sha256_hash(file_path).hexdigest()
- # Update instance file attributes & data
- if is_modified := file_hash != self.hash:
- self.last_updated = timezone.now()
- self.size = os.path.getsize(file_path)
- self.hash = file_hash
- with open(file_path, 'rb') as f:
- self.data = f.read()
- return is_modified