files.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. import logging
  2. import os
  3. from functools import cached_property
  4. from django.conf import settings
  5. from django.core.exceptions import ValidationError
  6. from django.core.files.storage import storages
  7. from django.db import models
  8. from django.utils.translation import gettext as _
  9. from extras.storage import ScriptFileSystemStorage
  10. from netbox.models.features import SyncedDataMixin
  11. from utilities.querysets import RestrictedQuerySet
  12. from ..choices import ManagedFileRootPathChoices
  13. __all__ = (
  14. 'ManagedFile',
  15. )
  16. logger = logging.getLogger('netbox.core.files')
  17. class ManagedFile(SyncedDataMixin, models.Model):
  18. """
  19. Database representation for a file on disk. This class is typically wrapped by a proxy class (e.g. ScriptModule)
  20. to provide additional functionality.
  21. """
  22. created = models.DateTimeField(
  23. verbose_name=_('created'),
  24. auto_now_add=True
  25. )
  26. last_updated = models.DateTimeField(
  27. verbose_name=_('last updated'),
  28. editable=False,
  29. blank=True,
  30. null=True
  31. )
  32. file_root = models.CharField(
  33. verbose_name=_('file root'),
  34. max_length=1000,
  35. choices=ManagedFileRootPathChoices
  36. )
  37. file_path = models.FilePathField(
  38. verbose_name=_('file path'),
  39. editable=False,
  40. help_text=_('File path relative to the designated root path')
  41. )
  42. objects = RestrictedQuerySet.as_manager()
  43. _netbox_private = True
  44. class Meta:
  45. ordering = ('file_root', 'file_path')
  46. constraints = (
  47. models.UniqueConstraint(
  48. fields=('file_root', 'file_path'),
  49. name='%(app_label)s_%(class)s_unique_root_path'
  50. ),
  51. )
  52. verbose_name = _('managed file')
  53. verbose_name_plural = _('managed files')
  54. def __str__(self):
  55. return self.name
  56. @property
  57. def name(self):
  58. return self.file_path
  59. @property
  60. def full_path(self):
  61. return os.path.join(self._resolve_root_path(), self.file_path)
  62. def _resolve_root_path(self):
  63. storage = self.storage
  64. if isinstance(storage, ScriptFileSystemStorage):
  65. return {
  66. 'scripts': settings.SCRIPTS_ROOT,
  67. 'reports': settings.REPORTS_ROOT,
  68. }[self.file_root]
  69. else:
  70. return ""
  71. def sync_data(self):
  72. if self.data_file:
  73. self.file_path = os.path.basename(self.data_path)
  74. storage = self.storage
  75. with storage.open(self.full_path, 'wb+') as new_file:
  76. new_file.write(self.data_file.data)
  77. sync_data.alters_data = True
  78. @cached_property
  79. def storage(self):
  80. return storages.create_storage(storages.backends["scripts"])
  81. def clean(self):
  82. super().clean()
  83. if self.data_file and not self.file_path:
  84. self.file_path = os.path.basename(self.data_path)
  85. # Ensure that the file root and path make a unique pair
  86. if self._meta.model.objects.filter(
  87. file_root=self.file_root, file_path=self.file_path
  88. ).exclude(pk=self.pk).exists():
  89. raise ValidationError(
  90. _("A {model} with this file path already exists ({path}).").format(
  91. model=self._meta.verbose_name.lower(),
  92. path=f"{self.file_root}/{self.file_path}"
  93. ))
  94. def delete(self, *args, **kwargs):
  95. # Delete file from disk
  96. storage = self.storage
  97. try:
  98. storage.delete(self.full_path)
  99. except FileNotFoundError:
  100. pass
  101. return super().delete(*args, **kwargs)