widgets.py 15 KB


  1. import logging
  2. import uuid
  3. from functools import cached_property
  4. from hashlib import sha256
  5. from urllib.parse import urlencode
  6. import feedparser
  7. import requests
  8. from django import forms
  9. from django.conf import settings
  10. from django.core.cache import cache
  11. from django.db.models import Model
  12. from django.template.loader import render_to_string
  13. from django.urls import NoReverseMatch, resolve
  14. from django.utils.translation import gettext as _
  15. from core.models import ObjectType
  16. from extras.choices import BookmarkOrderingChoices
  17. from utilities.object_types import object_type_identifier, object_type_name
  18. from utilities.permissions import get_permission_for_model
  19. from utilities.proxy import resolve_proxies
  20. from utilities.querydict import dict_to_querydict
  21. from utilities.templatetags.builtins.filters import render_markdown
  22. from utilities.views import get_action_url
  23. from .utils import register_widget
  24. __all__ = (
  25. 'BookmarksWidget',
  26. 'DashboardWidget',
  27. 'NoteWidget',
  28. 'ObjectCountsWidget',
  29. 'ObjectListWidget',
  30. 'RSSFeedWidget',
  31. 'WidgetConfigForm',
  32. )
  33. logger = logging.getLogger('netbox.data_backends')
  34. def get_object_type_choices():
  35. return [
  36. (object_type_identifier(ot), object_type_name(ot))
  37. for ot in ObjectType.objects.public().order_by('app_label', 'model')
  38. ]
  39. def object_list_widget_supports_model(model: Model) -> bool:
  40. """Test whether a model is supported by the ObjectListWidget
  41. In theory there could be more than one reason why a model isn't supported by the
  42. ObjectListWidget, although we've only identified one so far--there's no resolve-able 'list' URL
  43. for the model. Add more tests if more conditions arise.
  44. """
  45. def can_resolve_model_list_view(model: Model) -> bool:
  46. try:
  47. get_action_url(model, action='list')
  48. return True
  49. except NoReverseMatch:
  50. return False
  51. tests = [
  52. can_resolve_model_list_view,
  53. ]
  54. return all(test(model) for test in tests)
  55. def get_bookmarks_object_type_choices():
  56. return [
  57. (object_type_identifier(ot), object_type_name(ot))
  58. for ot in ObjectType.objects.with_feature('bookmarks').order_by('app_label', 'model')
  59. ]
  60. def get_models_from_content_types(content_types):
  61. """
  62. Return a list of models corresponding to the given content types, identified by natural key.
  63. Accepts both lowercase (e.g. "dcim.site") and PascalCase (e.g. "dcim.Site") model names.
  64. """
  65. models = []
  66. for content_type_id in content_types:
  67. app_label, model_name = content_type_id.lower().split('.')
  68. try:
  69. content_type = ObjectType.objects.get_by_natural_key(app_label, model_name)
  70. if content_type.model_class():
  71. models.append(content_type.model_class())
  72. else:
  73. logger.debug(f"Dashboard Widget model_class not found: {app_label}:{model_name}")
  74. except ObjectType.DoesNotExist:
  75. logger.debug(f"Dashboard Widget ObjectType not found: {app_label}:{model_name}")
  76. return models
  77. class WidgetConfigForm(forms.Form):
  78. pass
  79. class DashboardWidget:
  80. """
  81. Base class for custom dashboard widgets.
  82. Attributes:
  83. description: A brief, user-friendly description of the widget's function
  84. default_title: The string to show for the widget's title when none has been specified.
  85. default_config: Default configuration parameters, as a dictionary mapping
  86. width: The widget's default width (1 to 12)
  87. height: The widget's default height; the number of rows it consumes
  88. """
  89. description = None
  90. default_title = None
  91. default_config = {}
  92. width = 4
  93. height = 3
  94. class ConfigForm(WidgetConfigForm):
  95. """
  96. The widget's configuration form.
  97. """
  98. pass
  99. def __init__(self, id=None, title=None, color=None, config=None, width=None, height=None, x=None, y=None):
  100. self.id = id or str(uuid.uuid4())
  101. self.config = config or self.default_config
  102. self.title = title or self.default_title
  103. self.color = color
  104. if width:
  105. self.width = width
  106. if height:
  107. self.height = height
  108. self.x, self.y = x, y
  109. def __str__(self):
  110. return self.title or self.__class__.__name__
  111. def set_layout(self, grid_item):
  112. self.width = grid_item.get('w', 1)
  113. self.height = grid_item.get('h', 1)
  114. self.x = grid_item.get('x')
  115. self.y = grid_item.get('y')
  116. def render(self, request):
  117. """
  118. This method is called to render the widget's content.
  119. Params:
  120. request: The current request
  121. """
  122. raise NotImplementedError(_("{class_name} must define a render() method.").format(
  123. class_name=self.__class__
  124. ))
  125. @property
  126. def name(self):
  127. return f'{self.__class__.__module__.split(".")[0]}.{self.__class__.__name__}'
  128. @property
  129. def form_data(self):
  130. return {
  131. 'title': self.title,
  132. 'color': self.color,
  133. 'config': self.config,
  134. }
  135. @register_widget
  136. class NoteWidget(DashboardWidget):
  137. default_title = _('Note')
  138. description = _('Display some arbitrary custom content. Markdown is supported.')
  139. class ConfigForm(WidgetConfigForm):
  140. content = forms.CharField(
  141. widget=forms.Textarea()
  142. )
  143. def render(self, request):
  144. return render_markdown(self.config.get('content'))
  145. @register_widget
  146. class ObjectCountsWidget(DashboardWidget):
  147. default_title = _('Object Counts')
  148. description = _('Display a set of NetBox models and the number of objects created for each type.')
  149. template_name = 'extras/dashboard/widgets/objectcounts.html'
  150. class ConfigForm(WidgetConfigForm):
  151. models = forms.MultipleChoiceField(
  152. choices=get_object_type_choices
  153. )
  154. filters = forms.JSONField(
  155. required=False,
  156. label='Object filters',
  157. help_text=_("Filters to apply when counting the number of objects")
  158. )
  159. def clean_filters(self):
  160. if data := self.cleaned_data['filters']:
  161. try:
  162. dict(data)
  163. except TypeError:
  164. raise forms.ValidationError(_("Invalid format. Object filters must be passed as a dictionary."))
  165. return data
  166. def render(self, request):
  167. counts = []
  168. for model in get_models_from_content_types(self.config['models']):
  169. permission = get_permission_for_model(model, 'view')
  170. if request.user.has_perm(permission):
  171. try:
  172. url = get_action_url(model, action='list')
  173. except NoReverseMatch:
  174. url = None
  175. try:
  176. qs = model.objects.restrict(request.user, 'view')
  177. except AttributeError:
  178. qs = model.objects.all()
  179. # Apply any specified filters
  180. if url and (filters := self.config.get('filters')):
  181. params = dict_to_querydict(filters)
  182. filterset = getattr(resolve(url).func.view_class, 'filterset', None)
  183. qs = filterset(params, qs).qs
  184. url = f'{url}?{params.urlencode()}'
  185. object_count = qs.count
  186. counts.append((model, object_count, url))
  187. else:
  188. counts.append((model, None, None))
  189. return render_to_string(self.template_name, {
  190. 'counts': counts,
  191. })
  192. @register_widget
  193. class ObjectListWidget(DashboardWidget):
  194. default_title = _('Object List')
  195. description = _('Display an arbitrary list of objects.')
  196. template_name = 'extras/dashboard/widgets/objectlist.html'
  197. width = 12
  198. height = 4
  199. class ConfigForm(WidgetConfigForm):
  200. model = forms.ChoiceField(
  201. choices=get_object_type_choices
  202. )
  203. page_size = forms.IntegerField(
  204. required=False,
  205. min_value=1,
  206. max_value=100,
  207. help_text=_('The default number of objects to display')
  208. )
  209. url_params = forms.JSONField(
  210. required=False,
  211. label='URL parameters'
  212. )
  213. def clean_url_params(self):
  214. if data := self.cleaned_data['url_params']:
  215. try:
  216. urlencode(data)
  217. except (TypeError, ValueError):
  218. raise forms.ValidationError(_("Invalid format. URL parameters must be passed as a dictionary."))
  219. return data
  220. def clean_model(self):
  221. if model_info := self.cleaned_data['model']:
  222. app_label, model_name = model_info.split('.')
  223. model = ObjectType.objects.get_by_natural_key(app_label, model_name).model_class()
  224. if not object_list_widget_supports_model(model):
  225. raise forms.ValidationError(
  226. _(f"Invalid model selection: {self['model'].data} is not supported.")
  227. )
  228. return model_info
  229. def render(self, request):
  230. app_label, model_name = self.config['model'].split('.')
  231. model = ObjectType.objects.get_by_natural_key(app_label, model_name).model_class()
  232. if not model:
  233. logger.debug(f"Dashboard Widget model_class not found: {app_label}:{model_name}")
  234. return
  235. # Evaluate user's permission. Note that this controls only whether the HTMX element is
  236. # embedded on the page: The view itself will also evaluate permissions separately.
  237. permission = get_permission_for_model(model, 'view')
  238. has_permission = request.user.has_perm(permission)
  239. try:
  240. htmx_url = get_action_url(model, action='list')
  241. except NoReverseMatch:
  242. htmx_url = None
  243. parameters = self.config.get('url_params') or {}
  244. if page_size := self.config.get('page_size'):
  245. parameters['per_page'] = page_size
  246. parameters['embedded'] = True
  247. if parameters and htmx_url is not None:
  248. try:
  249. htmx_url = f'{htmx_url}?{urlencode(parameters, doseq=True)}'
  250. except ValueError:
  251. pass
  252. return render_to_string(self.template_name, {
  253. 'model_name': model_name,
  254. 'has_permission': has_permission,
  255. 'htmx_url': htmx_url,
  256. })
  257. @register_widget
  258. class RSSFeedWidget(DashboardWidget):
  259. default_title = _('RSS Feed')
  260. default_config = {
  261. 'max_entries': 10,
  262. 'cache_timeout': 3600, # seconds
  263. 'request_timeout': 3, # seconds
  264. 'requires_internet': True,
  265. }
  266. description = _('Embed an RSS feed from an external website.')
  267. template_name = 'extras/dashboard/widgets/rssfeed.html'
  268. width = 6
  269. height = 4
  270. class ConfigForm(WidgetConfigForm):
  271. feed_url = forms.URLField(
  272. label=_('Feed URL'),
  273. assume_scheme='https'
  274. )
  275. requires_internet = forms.BooleanField(
  276. label=_('Requires external connection'),
  277. required=False,
  278. )
  279. max_entries = forms.IntegerField(
  280. min_value=1,
  281. max_value=1000,
  282. help_text=_('The maximum number of objects to display')
  283. )
  284. cache_timeout = forms.IntegerField(
  285. min_value=600, # 10 minutes
  286. max_value=86400, # 24 hours
  287. help_text=_('How long to stored the cached content (in seconds)')
  288. )
  289. request_timeout = forms.IntegerField(
  290. min_value=1,
  291. max_value=60,
  292. required=False,
  293. help_text=_('Timeout value for fetching the feed (in seconds)')
  294. )
  295. def render(self, request):
  296. return render_to_string(self.template_name, {
  297. 'url': self.config['feed_url'],
  298. **self.get_feed()
  299. })
  300. @cached_property
  301. def cache_key(self):
  302. url = self.config['feed_url']
  303. url_checksum = sha256(url.encode('utf-8')).hexdigest()
  304. return f'dashboard_rss_{url_checksum}'
  305. def get_feed(self):
  306. if self.config.get('requires_internet') and settings.ISOLATED_DEPLOYMENT:
  307. return {
  308. 'isolated_deployment': True,
  309. }
  310. # Fetch RSS content from cache if available
  311. if feed_content := cache.get(self.cache_key):
  312. return {
  313. 'feed': feedparser.FeedParserDict(feed_content),
  314. }
  315. # Fetch feed content from remote server
  316. try:
  317. response = requests.get(
  318. url=self.config['feed_url'],
  319. headers={'User-Agent': f'NetBox/{settings.RELEASE.version}'},
  320. proxies=resolve_proxies(url=self.config['feed_url'], context={'client': self}),
  321. timeout=self.config.get('request_timeout', 3),
  322. )
  323. response.raise_for_status()
  324. except requests.exceptions.RequestException as e:
  325. return {
  326. 'error': e,
  327. }
  328. # Parse feed content
  329. feed = feedparser.parse(response.content)
  330. if not feed.bozo:
  331. # Cap number of entries
  332. max_entries = self.config.get('max_entries')
  333. feed['entries'] = feed['entries'][:max_entries]
  334. # Cache the feed content
  335. cache.set(self.cache_key, dict(feed), self.config.get('cache_timeout'))
  336. return {
  337. 'feed': feed,
  338. }
  339. @register_widget
  340. class BookmarksWidget(DashboardWidget):
  341. default_title = _('Bookmarks')
  342. default_config = {
  343. 'order_by': BookmarkOrderingChoices.ORDERING_NEWEST,
  344. }
  345. description = _('Show your personal bookmarks')
  346. template_name = 'extras/dashboard/widgets/bookmarks.html'
  347. class ConfigForm(WidgetConfigForm):
  348. object_types = forms.MultipleChoiceField(
  349. choices=get_bookmarks_object_type_choices,
  350. required=False
  351. )
  352. order_by = forms.ChoiceField(
  353. choices=BookmarkOrderingChoices
  354. )
  355. max_items = forms.IntegerField(
  356. min_value=1,
  357. required=False
  358. )
  359. def render(self, request):
  360. from extras.models import Bookmark
  361. if request.user.is_anonymous:
  362. bookmarks = list()
  363. else:
  364. bookmarks = Bookmark.objects.filter(user=request.user)
  365. if object_types := self.config.get('object_types'):
  366. models = get_models_from_content_types(object_types)
  367. content_types = ObjectType.objects.get_for_models(*models).values()
  368. bookmarks = bookmarks.filter(object_type__in=content_types)
  369. if self.config['order_by'] == BookmarkOrderingChoices.ORDERING_ALPHABETICAL_AZ:
  370. bookmarks = sorted(bookmarks, key=lambda bookmark: bookmark.__str__().lower())
  371. elif self.config['order_by'] == BookmarkOrderingChoices.ORDERING_ALPHABETICAL_ZA:
  372. bookmarks = sorted(bookmarks, key=lambda bookmark: bookmark.__str__().lower(), reverse=True)
  373. else:
  374. bookmarks = bookmarks.order_by(self.config['order_by'])
  375. if max_items := self.config.get('max_items'):
  376. bookmarks = bookmarks[:max_items]
  377. return render_to_string(self.template_name, {
  378. 'bookmarks': bookmarks,
  379. })