widgets.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. import uuid
  2. from functools import cached_property
  3. from hashlib import sha256
  4. from urllib.parse import urlencode
  5. import feedparser
  6. import requests
  7. from django import forms
  8. from django.conf import settings
  9. from django.contrib.contenttypes.models import ContentType
  10. from django.core.cache import cache
  11. from django.db.models import Q
  12. from django.template.loader import render_to_string
  13. from django.urls import NoReverseMatch, resolve, reverse
  14. from django.utils.translation import gettext as _
  15. from extras.utils import FeatureQuery
  16. from utilities.forms import BootstrapMixin
  17. from utilities.permissions import get_permission_for_model
  18. from utilities.templatetags.builtins.filters import render_markdown
  19. from utilities.utils import content_type_identifier, content_type_name, dict_to_querydict, get_viewname
  20. from .utils import register_widget
  21. __all__ = (
  22. 'DashboardWidget',
  23. 'NoteWidget',
  24. 'ObjectCountsWidget',
  25. 'ObjectListWidget',
  26. 'RSSFeedWidget',
  27. 'WidgetConfigForm',
  28. )
  29. def get_content_type_labels():
  30. return [
  31. (content_type_identifier(ct), content_type_name(ct))
  32. for ct in ContentType.objects.filter(
  33. FeatureQuery('export_templates').get_query() | Q(app_label='extras', model='objectchange') |
  34. Q(app_label='extras', model='configcontext')
  35. ).order_by('app_label', 'model')
  36. ]
  37. def get_models_from_content_types(content_types):
  38. """
  39. Return a list of models corresponding to the given content types, identified by natural key.
  40. """
  41. models = []
  42. for content_type_id in content_types:
  43. app_label, model_name = content_type_id.split('.')
  44. content_type = ContentType.objects.get_by_natural_key(app_label, model_name)
  45. models.append(content_type.model_class())
  46. return models
  47. class WidgetConfigForm(BootstrapMixin, forms.Form):
  48. pass
  49. class DashboardWidget:
  50. """
  51. Base class for custom dashboard widgets.
  52. Attributes:
  53. description: A brief, user-friendly description of the widget's function
  54. default_title: The string to show for the widget's title when none has been specified.
  55. default_config: Default configuration parameters, as a dictionary mapping
  56. width: The widget's default width (1 to 12)
  57. height: The widget's default height; the number of rows it consumes
  58. """
  59. description = None
  60. default_title = None
  61. default_config = {}
  62. width = 4
  63. height = 3
  64. class ConfigForm(WidgetConfigForm):
  65. """
  66. The widget's configuration form.
  67. """
  68. pass
  69. def __init__(self, id=None, title=None, color=None, config=None, width=None, height=None, x=None, y=None):
  70. self.id = id or str(uuid.uuid4())
  71. self.config = config or self.default_config
  72. self.title = title or self.default_title
  73. self.color = color
  74. if width:
  75. self.width = width
  76. if height:
  77. self.height = height
  78. self.x, self.y = x, y
  79. def __str__(self):
  80. return self.title or self.__class__.__name__
  81. def set_layout(self, grid_item):
  82. self.width = grid_item.get('w', 1)
  83. self.height = grid_item.get('h', 1)
  84. self.x = grid_item.get('x')
  85. self.y = grid_item.get('y')
  86. def render(self, request):
  87. """
  88. This method is called to render the widget's content.
  89. Params:
  90. request: The current request
  91. """
  92. raise NotImplementedError(f"{self.__class__} must define a render() method.")
  93. @property
  94. def name(self):
  95. return f'{self.__class__.__module__.split(".")[0]}.{self.__class__.__name__}'
  96. @property
  97. def form_data(self):
  98. return {
  99. 'title': self.title,
  100. 'color': self.color,
  101. 'config': self.config,
  102. }
  103. @register_widget
  104. class NoteWidget(DashboardWidget):
  105. default_title = _('Note')
  106. description = _('Display some arbitrary custom content. Markdown is supported.')
  107. class ConfigForm(WidgetConfigForm):
  108. content = forms.CharField(
  109. widget=forms.Textarea()
  110. )
  111. def render(self, request):
  112. return render_markdown(self.config.get('content'))
  113. @register_widget
  114. class ObjectCountsWidget(DashboardWidget):
  115. default_title = _('Object Counts')
  116. description = _('Display a set of NetBox models and the number of objects created for each type.')
  117. template_name = 'extras/dashboard/widgets/objectcounts.html'
  118. class ConfigForm(WidgetConfigForm):
  119. models = forms.MultipleChoiceField(
  120. choices=get_content_type_labels
  121. )
  122. filters = forms.JSONField(
  123. required=False,
  124. label='Object filters',
  125. help_text=_("Filters to apply when counting the number of objects")
  126. )
  127. def clean_filters(self):
  128. if data := self.cleaned_data['filters']:
  129. try:
  130. dict(data)
  131. except TypeError:
  132. raise forms.ValidationError("Invalid format. Object filters must be passed as a dictionary.")
  133. return data
  134. def render(self, request):
  135. counts = []
  136. for model in get_models_from_content_types(self.config['models']):
  137. permission = get_permission_for_model(model, 'view')
  138. if request.user.has_perm(permission):
  139. url = reverse(get_viewname(model, 'list'))
  140. qs = model.objects.restrict(request.user, 'view')
  141. # Apply any specified filters
  142. if filters := self.config.get('filters'):
  143. params = dict_to_querydict(filters)
  144. filterset = getattr(resolve(url).func.view_class, 'filterset', None)
  145. qs = filterset(params, qs).qs
  146. url = f'{url}?{params.urlencode()}'
  147. object_count = qs.count
  148. counts.append((model, object_count, url))
  149. else:
  150. counts.append((model, None, None))
  151. return render_to_string(self.template_name, {
  152. 'counts': counts,
  153. })
  154. @register_widget
  155. class ObjectListWidget(DashboardWidget):
  156. default_title = _('Object List')
  157. description = _('Display an arbitrary list of objects.')
  158. template_name = 'extras/dashboard/widgets/objectlist.html'
  159. width = 12
  160. height = 4
  161. class ConfigForm(WidgetConfigForm):
  162. model = forms.ChoiceField(
  163. choices=get_content_type_labels
  164. )
  165. page_size = forms.IntegerField(
  166. required=False,
  167. min_value=1,
  168. max_value=100,
  169. help_text=_('The default number of objects to display')
  170. )
  171. url_params = forms.JSONField(
  172. required=False,
  173. label='URL parameters'
  174. )
  175. def clean_url_params(self):
  176. if data := self.cleaned_data['url_params']:
  177. try:
  178. urlencode(data)
  179. except (TypeError, ValueError):
  180. raise forms.ValidationError("Invalid format. URL parameters must be passed as a dictionary.")
  181. return data
  182. def render(self, request):
  183. app_label, model_name = self.config['model'].split('.')
  184. model = ContentType.objects.get_by_natural_key(app_label, model_name).model_class()
  185. viewname = get_viewname(model, action='list')
  186. # Evaluate user's permission. Note that this controls only whether the HTMX element is
  187. # embedded on the page: The view itself will also evaluate permissions separately.
  188. permission = get_permission_for_model(model, 'view')
  189. has_permission = request.user.has_perm(permission)
  190. try:
  191. htmx_url = reverse(viewname)
  192. except NoReverseMatch:
  193. htmx_url = None
  194. parameters = self.config.get('url_params') or {}
  195. if page_size := self.config.get('page_size'):
  196. parameters['per_page'] = page_size
  197. if parameters:
  198. try:
  199. htmx_url = f'{htmx_url}?{urlencode(parameters, doseq=True)}'
  200. except ValueError:
  201. pass
  202. return render_to_string(self.template_name, {
  203. 'viewname': viewname,
  204. 'has_permission': has_permission,
  205. 'htmx_url': htmx_url,
  206. })
  207. @register_widget
  208. class RSSFeedWidget(DashboardWidget):
  209. default_title = _('RSS Feed')
  210. default_config = {
  211. 'max_entries': 10,
  212. 'cache_timeout': 3600, # seconds
  213. }
  214. description = _('Embed an RSS feed from an external website.')
  215. template_name = 'extras/dashboard/widgets/rssfeed.html'
  216. width = 6
  217. height = 4
  218. class ConfigForm(WidgetConfigForm):
  219. feed_url = forms.URLField(
  220. label=_('Feed URL')
  221. )
  222. max_entries = forms.IntegerField(
  223. min_value=1,
  224. max_value=1000,
  225. help_text=_('The maximum number of objects to display')
  226. )
  227. cache_timeout = forms.IntegerField(
  228. min_value=600, # 10 minutes
  229. max_value=86400, # 24 hours
  230. help_text=_('How long to stored the cached content (in seconds)')
  231. )
  232. def render(self, request):
  233. return render_to_string(self.template_name, {
  234. 'url': self.config['feed_url'],
  235. **self.get_feed()
  236. })
  237. @cached_property
  238. def cache_key(self):
  239. url = self.config['feed_url']
  240. url_checksum = sha256(url.encode('utf-8')).hexdigest()
  241. return f'dashboard_rss_{url_checksum}'
  242. def get_feed(self):
  243. # Fetch RSS content from cache if available
  244. if feed_content := cache.get(self.cache_key):
  245. return {
  246. 'feed': feedparser.FeedParserDict(feed_content),
  247. }
  248. # Fetch feed content from remote server
  249. try:
  250. response = requests.get(
  251. url=self.config['feed_url'],
  252. headers={'User-Agent': f'NetBox/{settings.VERSION}'},
  253. proxies=settings.HTTP_PROXIES,
  254. timeout=3
  255. )
  256. response.raise_for_status()
  257. except requests.exceptions.RequestException as e:
  258. return {
  259. 'error': e,
  260. }
  261. # Parse feed content
  262. feed = feedparser.parse(response.content)
  263. if not feed.bozo:
  264. # Cap number of entries
  265. max_entries = self.config.get('max_entries')
  266. feed['entries'] = feed['entries'][:max_entries]
  267. # Cache the feed content
  268. cache.set(self.cache_key, dict(feed), self.config.get('cache_timeout'))
  269. return {
  270. 'feed': feed,
  271. }