views.py 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160
  1. import csv
  2. from django.conf import settings
  3. from django.contrib.contenttypes.models import ContentType
  4. from django.core.exceptions import ObjectDoesNotExist
  5. from django.db.models import ForeignKey
  6. from django.test import override_settings
  7. from django.urls import reverse
  8. from django.utils.translation import gettext as _
  9. from core.choices import ObjectChangeActionChoices
  10. from core.models import ObjectChange, ObjectType
  11. from netbox.choices import CSVDelimiterChoices, ImportFormatChoices
  12. from netbox.models.features import ChangeLoggingMixin, CustomFieldsMixin
  13. from users.models import ObjectPermission
  14. from .base import ModelTestCase
  15. from .utils import add_custom_field_data, disable_warnings, get_random_string, post_data
  16. __all__ = (
  17. 'ModelViewTestCase',
  18. 'ViewTestCases',
  19. )
  20. #
  21. # UI Tests
  22. #
  class ModelViewTestCase(ModelTestCase):
      """
      Base TestCase for model views; provides URL resolution for the model under test.
      Subclass it to test individual views.
      """

      def _get_base_url(self):
          """
          Build the URL-name template for the test's model, of the form
          ``<app_label>:<model_name>_{}`` where the placeholder receives the action name.
          Override this to test a model which belongs to a different app (e.g. testing
          Interfaces within the virtualization app).
          """
          meta = self.model._meta
          return f'{meta.app_label}:{meta.model_name}_{{}}'

      def _get_url(self, action, instance=None):
          """
          Reverse the URL for the given action. When an instance is supplied, its primary key
          is passed to reverse() as the ``pk`` keyword argument.
          """
          view_name = self._get_base_url().format(action)

          # An instance means the URL needs a unique identifier
          if instance is not None:
              return reverse(view_name, kwargs={'pk': instance.pk})

          return reverse(view_name)
  45. class ViewTestCases:
  46. """
  47. We keep any TestCases with test_* methods inside a class to prevent unittest from trying to run them.
  48. """
  class GetObjectViewTestCase(ModelViewTestCase):
      """
      Retrieve a single instance: anonymously, without any permission, and with model-level
      and object-level (constrained) "view" permissions.
      """

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], LOGIN_REQUIRED=False)
      def test_get_object_anonymous(self):
          # Log out so that the request is issued by an unauthenticated user
          self.client.logout()

          content_type = ContentType.objects.get_for_model(self.model)
          model_key = (content_type.app_label, content_type.model)

          if model_key not in settings.EXEMPT_EXCLUDE_MODELS:
              response = self.client.get(self._get_queryset().first().get_absolute_url())
              self.assertHttpStatus(response, 200)
              return

          # Models listed in EXEMPT_EXCLUDE_MODELS should not be accessible to anonymous users
          with disable_warnings('django.request'):
              response = self.client.get(self._get_queryset().first().get_absolute_url())
              self.assertHttpStatus(response, 302)

      def test_get_object_without_permission(self):
          target = self._get_queryset().first()

          # No permission has been granted, so the GET is expected to return 403
          with disable_warnings('django.request'):
              response = self.client.get(target.get_absolute_url())
              self.assertHttpStatus(response, 403)

      def test_get_object_with_permission(self):
          target = self._get_queryset().first()

          # Grant an unconstrained (model-level) "view" permission to the test user
          permission = ObjectPermission(name='Test permission', actions=['view'])
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # The GET should now succeed
          response = self.client.get(target.get_absolute_url())
          self.assertHttpStatus(response, 200)

      def test_get_object_with_constrained_permission(self):
          permitted, denied = self._get_queryset().all()[:2]

          # Grant an object-level "view" permission limited to the first instance
          permission = ObjectPermission(
              name='Test permission',
              constraints={'pk': permitted.pk},
              actions=['view'],
          )
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # The permitted object is returned
          permitted_response = self.client.get(permitted.get_absolute_url())
          self.assertHttpStatus(permitted_response, 200)

          # The non-permitted object is reported as not found
          denied_response = self.client.get(denied.get_absolute_url())
          self.assertHttpStatus(denied_response, 404)
  class GetObjectChangelogViewTestCase(ModelViewTestCase):
      """
      View the changelog for an instance.
      """

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
      def test_get_object_changelog(self):
          # Request the changelog view of the first object in the queryset
          first_object = self._get_queryset().first()
          changelog_url = self._get_url('changelog', first_object)
          self.assertHttpStatus(self.client.get(changelog_url), 200)
  class CreateObjectViewTestCase(ModelViewTestCase):
      """
      Create a single new instance through the model's "add" view.

      :form_data: Data to be used when creating a new object.
      :validation_excluded_fields: Passed as ``exclude`` to assertInstanceEqual() when the
          created object is compared against form_data.
      """
      form_data = {}
      validation_excluded_fields = []

      def test_create_object_without_permission(self):
          add_url = self._get_url('add')

          # Try GET without permission
          with disable_warnings('django.request'):
              self.assertHttpStatus(self.client.get(add_url), 403)

          # Try POST without permission. The request itself is issued inside the
          # disable_warnings() context (as for the GET above), not just the assertion.
          with disable_warnings('django.request'):
              response = self.client.post(path=add_url, data=post_data(self.form_data))
              self.assertHttpStatus(response, 403)

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
      def test_create_object_with_permission(self):
          add_url = self._get_url('add')
          tracks_changes = issubclass(self.model, ChangeLoggingMixin)

          # Assign unconstrained permission
          obj_perm = ObjectPermission(name='Test permission', actions=['add'])
          obj_perm.save()
          obj_perm.users.add(self.user)
          obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))

          # Try GET with model-level permission
          self.assertHttpStatus(self.client.get(add_url), 200)

          # NOTE(review): the two steps below write into self.form_data itself, so the added
          # keys remain visible to anything else that shares that dictionary.

          # Add custom field data if the model supports it
          if issubclass(self.model, CustomFieldsMixin):
              add_custom_field_data(self.form_data, self.model)

          # If supported, add a changelog message (unless the test already supplied one)
          if tracks_changes and 'changelog_message' not in self.form_data:
              self.form_data['changelog_message'] = get_random_string(10)

          # Try POST with model-level permission
          initial_count = self._get_queryset().count()
          response = self.client.post(path=add_url, data=post_data(self.form_data))
          self.assertHttpStatus(response, 302)
          self.assertEqual(initial_count + 1, self._get_queryset().count())

          # The object with the highest pk is taken to be the one just created
          instance = self._get_queryset().order_by('pk').last()
          self.assertInstanceEqual(instance, self.form_data, exclude=self.validation_excluded_fields)

          # Verify ObjectChange creation
          if tracks_changes:
              objectchanges = ObjectChange.objects.filter(
                  changed_object_type=ContentType.objects.get_for_model(instance),
                  changed_object_id=instance.pk
              )
              self.assertEqual(len(objectchanges), 1)
              self.assertObjectChange(
                  objectchanges[0],
                  action=ObjectChangeActionChoices.ACTION_CREATE,
                  message=self.form_data['changelog_message'],
              )

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
      def test_create_object_with_constrained_permission(self):
          add_url = self._get_url('add')

          # Assign constrained permission
          obj_perm = ObjectPermission(
              name='Test permission',
              constraints={'pk': 0},  # Dummy permission to deny all
              actions=['add']
          )
          obj_perm.save()
          obj_perm.users.add(self.user)
          obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))

          # Try GET with object-level permission
          self.assertHttpStatus(self.client.get(add_url), 200)

          # Try to create an object (not permitted): the form is re-rendered with a 200
          initial_count = self._get_queryset().count()
          response = self.client.post(path=add_url, data=post_data(self.form_data))
          self.assertHttpStatus(response, 200)
          self.assertEqual(initial_count, self._get_queryset().count())  # Check that no object was created

          # Update the ObjectPermission to allow creation
          obj_perm.constraints = {'pk__gt': 0}
          obj_perm.save()

          # Try to create an object (permitted)
          response = self.client.post(path=add_url, data=post_data(self.form_data))
          self.assertHttpStatus(response, 302)
          self.assertEqual(initial_count + 1, self._get_queryset().count())
          instance = self._get_queryset().order_by('pk').last()
          self.assertInstanceEqual(instance, self.form_data, exclude=self.validation_excluded_fields)
  class EditObjectViewTestCase(ModelViewTestCase):
      """
      Edit a single existing instance through the model's "edit" view.

      :form_data: Data to be used when updating the first existing object.
      :validation_excluded_fields: Passed as ``exclude`` to assertInstanceEqual() when the
          updated object is compared against form_data.
      """
      form_data = {}
      validation_excluded_fields = []

      def test_edit_object_without_permission(self):
          edit_url = self._get_url('edit', self._get_queryset().first())
          payload = post_data(self.form_data)

          # GET without permission is rejected
          with disable_warnings('django.request'):
              self.assertHttpStatus(self.client.get(edit_url), 403)

          # POST without permission is rejected as well
          with disable_warnings('django.request'):
              self.assertHttpStatus(self.client.post(path=edit_url, data=payload), 403)

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
      def test_edit_object_with_permission(self):
          target = self._get_queryset().first()
          edit_url = self._get_url('edit', target)
          tracks_changes = issubclass(self.model, ChangeLoggingMixin)

          # Grant an unconstrained (model-level) "change" permission
          permission = ObjectPermission(name='Test permission', actions=['change'])
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # The edit form is now reachable
          self.assertHttpStatus(self.client.get(edit_url), 200)

          # Models supporting custom fields get custom field data added to the form data
          if issubclass(self.model, CustomFieldsMixin):
              add_custom_field_data(self.form_data, self.model)

          # Change-logged models get a changelog message unless one is already present
          if tracks_changes and 'changelog_message' not in self.form_data:
              self.form_data['changelog_message'] = get_random_string(10)

          # Submit the edit and expect a redirect
          response = self.client.post(path=edit_url, data=post_data(self.form_data))
          self.assertHttpStatus(response, 302)

          # Re-fetch the object and compare it with the submitted data
          updated = self._get_queryset().get(pk=target.pk)
          self.assertInstanceEqual(updated, self.form_data, exclude=self.validation_excluded_fields)

          if not tracks_changes:
              return

          # Exactly one ObjectChange (an update) must have been recorded for the object
          changes = ObjectChange.objects.filter(
              changed_object_type=ContentType.objects.get_for_model(updated),
              changed_object_id=updated.pk,
          )
          self.assertEqual(len(changes), 1)
          self.assertObjectChange(
              changes[0],
              action=ObjectChangeActionChoices.ACTION_UPDATE,
              message=self.form_data['changelog_message'],
          )

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
      def test_edit_object_with_constrained_permission(self):
          permitted, denied = self._get_queryset().all()[:2]
          permitted_url = self._get_url('edit', permitted)
          denied_url = self._get_url('edit', denied)

          # Grant a "change" permission constrained to the first instance only
          permission = ObjectPermission(
              name='Test permission',
              constraints={'pk': permitted.pk},
              actions=['change'],
          )
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # GET: the permitted object is editable, the other one is not found
          self.assertHttpStatus(self.client.get(permitted_url), 200)
          self.assertHttpStatus(self.client.get(denied_url), 404)

          # POST to the permitted object succeeds and applies the form data
          response = self.client.post(path=permitted_url, data=post_data(self.form_data))
          self.assertHttpStatus(response, 302)
          updated = self._get_queryset().get(pk=permitted.pk)
          self.assertInstanceEqual(updated, self.form_data, exclude=self.validation_excluded_fields)

          # POST to the non-permitted object is answered with 404
          response = self.client.post(path=denied_url, data=post_data(self.form_data))
          self.assertHttpStatus(response, 404)
  class DeleteObjectViewTestCase(ModelViewTestCase):
      """
      Delete a single instance through the model's "delete" view.
      """

      def test_delete_object_without_permission(self):
          delete_url = self._get_url('delete', self._get_queryset().first())
          payload = post_data({'confirm': True})

          # GET without permission is rejected
          with disable_warnings('django.request'):
              self.assertHttpStatus(self.client.get(delete_url), 403)

          # POST without permission is rejected as well
          with disable_warnings('django.request'):
              self.assertHttpStatus(self.client.post(path=delete_url, data=payload), 403)

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
      def test_delete_object_with_permission(self):
          target = self._get_queryset().first()
          delete_url = self._get_url('delete', target)
          tracks_changes = issubclass(self.model, ChangeLoggingMixin)
          confirmation = {'confirm': True}

          # Grant an unconstrained (model-level) "delete" permission
          permission = ObjectPermission(name='Test permission', actions=['delete'])
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # The confirmation page is now reachable
          self.assertHttpStatus(self.client.get(delete_url), 200)

          # Change-logged models also submit a changelog message
          if tracks_changes:
              confirmation['changelog_message'] = get_random_string(10)

          # Confirm the deletion and expect a redirect
          response = self.client.post(path=delete_url, data=post_data(confirmation))
          self.assertHttpStatus(response, 302)

          # The object must be gone
          with self.assertRaises(ObjectDoesNotExist):
              self._get_queryset().get(pk=target.pk)

          if not tracks_changes:
              return

          # Exactly one ObjectChange (a deletion) must have been recorded for the object
          changes = ObjectChange.objects.filter(
              changed_object_type=ContentType.objects.get_for_model(target),
              changed_object_id=target.pk,
          )
          self.assertEqual(len(changes), 1)
          self.assertObjectChange(
              changes[0],
              action=ObjectChangeActionChoices.ACTION_DELETE,
              message=confirmation['changelog_message'],
          )

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
      def test_delete_object_with_constrained_permission(self):
          permitted, denied = self._get_queryset().all()[:2]
          permitted_url = self._get_url('delete', permitted)
          denied_url = self._get_url('delete', denied)

          # Grant a "delete" permission constrained to the first instance only
          permission = ObjectPermission(
              name='Test permission',
              constraints={'pk': permitted.pk},
              actions=['delete'],
          )
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # GET: the permitted object can be deleted, the other one is not found
          self.assertHttpStatus(self.client.get(permitted_url), 200)
          self.assertHttpStatus(self.client.get(denied_url), 404)

          # Deleting the permitted object succeeds and removes it
          response = self.client.post(path=permitted_url, data=post_data({'confirm': True}))
          self.assertHttpStatus(response, 302)
          with self.assertRaises(ObjectDoesNotExist):
              self._get_queryset().get(pk=permitted.pk)

          # Deleting the non-permitted object is answered with 404 and leaves it in place
          response = self.client.post(path=denied_url, data=post_data({'confirm': True}))
          self.assertHttpStatus(response, 404)
          self.assertTrue(self._get_queryset().filter(pk=denied.pk).exists())
  class ListObjectsViewTestCase(ModelViewTestCase):
      """
      Retrieve multiple instances through the model's "list" view, including CSV export.
      """

      @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], LOGIN_REQUIRED=False)
      def test_list_objects_anonymous(self):
          # Log out so that the request is issued by an unauthenticated user
          self.client.logout()

          content_type = ContentType.objects.get_for_model(self.model)
          model_key = (content_type.app_label, content_type.model)

          if model_key not in settings.EXEMPT_EXCLUDE_MODELS:
              response = self.client.get(self._get_url('list'))
              self.assertHttpStatus(response, 200)
              return

          # Models listed in EXEMPT_EXCLUDE_MODELS should not be accessible to anonymous users
          with disable_warnings('django.request'):
              response = self.client.get(self._get_url('list'))
              self.assertHttpStatus(response, 302)

      def test_list_objects_without_permission(self):
          # No permission has been granted, so the GET is expected to return 403
          with disable_warnings('django.request'):
              response = self.client.get(self._get_url('list'))
              self.assertHttpStatus(response, 403)

      def test_list_objects_with_permission(self):
          # Grant an unconstrained (model-level) "view" permission
          permission = ObjectPermission(name='Test permission', actions=['view'])
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # The list view is now reachable
          response = self.client.get(self._get_url('list'))
          self.assertHttpStatus(response, 200)

      def test_list_objects_with_constrained_permission(self):
          permitted, denied = self._get_queryset().all()[:2]

          # Grant a "view" permission constrained to the first instance only
          permission = ObjectPermission(
              name='Test permission',
              constraints={'pk': permitted.pk},
              actions=['view'],
          )
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # The list renders, linking to the permitted object but not to the other one
          response = self.client.get(self._get_url('list'))
          self.assertHttpStatus(response, 200)
          rendered = str(response.content)
          self.assertIn(permitted.get_absolute_url(), rendered)
          self.assertNotIn(denied.get_absolute_url(), rendered)

      def test_export_objects(self):
          list_url = self._get_url('list')
          csv_content_type = 'text/csv; charset=utf-8'

          # Grant an unconstrained (model-level) "view" permission
          permission = ObjectPermission(name='Test permission', actions=['view'])
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # Default CSV export
          default_export = self.client.get(f'{list_url}?export')
          self.assertHttpStatus(default_export, 200)
          self.assertEqual(default_export.get('Content-Type'), csv_content_type)

          # Table-based export
          table_export = self.client.get(f'{list_url}?export=table')
          self.assertHttpStatus(table_export, 200)
          self.assertEqual(table_export.get('Content-Type'), csv_content_type)
  class CreateMultipleObjectsViewTestCase(ModelViewTestCase):
      """
      Create multiple instances using a single form submitted to the "add" view.

      :bulk_create_count: The number of objects expected to be created (default: 3).
      :bulk_create_data: A dictionary of data to be used for bulk object creation.
      :validation_excluded_fields: Passed as ``exclude`` to assertInstanceEqual() when the
          created objects are compared against bulk_create_data.
      """
      bulk_create_count = 3
      bulk_create_data = {}
      validation_excluded_fields = []

      def test_create_multiple_objects_without_permission(self):
          add_url = self._get_url('add')
          payload = post_data(self.bulk_create_data)

          # POST without permission is rejected
          with disable_warnings('django.request'):
              self.assertHttpStatus(self.client.post(path=add_url, data=payload), 403)

      def test_create_multiple_objects_with_permission(self):
          count_before = self._get_queryset().count()
          add_url = self._get_url('add')
          payload = post_data(self.bulk_create_data)

          # Grant an unconstrained "add" permission
          permission = ObjectPermission(name='Test permission', actions=['add'])
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # Submit the bulk creation form and expect a redirect
          response = self.client.post(path=add_url, data=payload)
          self.assertHttpStatus(response, 302)
          self.assertEqual(count_before + self.bulk_create_count, self._get_queryset().count())

          # The objects with the highest pks are taken to be the ones just created
          newest = self._get_queryset().order_by('-pk')[:self.bulk_create_count]
          for created in newest:
              self.assertInstanceEqual(created, self.bulk_create_data, exclude=self.validation_excluded_fields)

      def test_create_multiple_objects_with_constrained_permission(self):
          count_before = self._get_queryset().count()
          add_url = self._get_url('add')
          payload = post_data(self.bulk_create_data)

          # Grant an "add" permission whose dummy constraint denies everything
          permission = ObjectPermission(
              name='Test permission',
              actions=['add'],
              constraints={'pk': 0},
          )
          permission.save()
          permission.users.add(self.user)
          permission.object_types.add(ObjectType.objects.get_for_model(self.model))

          # With unmet constraints the form is re-rendered (200) and nothing is created
          self.assertHttpStatus(self.client.post(path=add_url, data=payload), 200)
          self.assertEqual(self._get_queryset().count(), count_before)

          # Relax the constraint so that every object is allowed
          permission.constraints = {'pk__gt': 0}
          permission.save()

          # The same submission now succeeds
          response = self.client.post(path=add_url, data=payload)
          self.assertHttpStatus(response, 302)
          self.assertEqual(count_before + self.bulk_create_count, self._get_queryset().count())

          newest = self._get_queryset().order_by('-pk')[:self.bulk_create_count]
          for created in newest:
              self.assertInstanceEqual(created, self.bulk_create_data, exclude=self.validation_excluded_fields)
  494. class BulkImportObjectsViewTestCase(ModelViewTestCase):
  495. """
  496. Create multiple instances from imported data.
  497. :csv_data: CSV data for bulk import testing. Supports two formats:
  498. 1. Tuple/list format (backwards compatible):
  499. csv_data = (
  500. "name,slug,description",
  501. "Object 1,object-1,First object",
  502. "Object 2,object-2,Second object",
  503. )
  504. 2. Dictionary format for multiple scenarios:
  505. csv_data = {
  506. 'default': (
  507. "name,slug,description",
  508. "Object 1,object-1,First object",
  509. ),
  510. 'with_optional_fields': (
  511. "name,slug,description,comments",
  512. "Object 2,object-2,Second object,With comments",
  513. )
  514. }
  515. When using dictionary format, test_bulk_import_objects_with_permission()
  516. runs each scenario as a separate subtest with clear output:
  517. test_bulk_import_objects_with_permission (scenario=default) ... ok
  518. test_bulk_import_objects_with_permission (scenario=with_optional_fields) ... ok
  519. """
  520. csv_data = ()
  def get_scenarios(self):
      """
      Return the scenario names to run: the keys of csv_data when it is a dictionary,
      otherwise the single-entry list ['default'].
      """
      if not isinstance(self.csv_data, dict):
          return ['default']
      return self.csv_data.keys()
  523. def _get_csv_data(self, scenario_name='default'):
  524. """
  525. Get CSV data for testing. Supports both tuple/list and dictionary formats.
  526. """
  527. if isinstance(self.csv_data, dict):
  528. if scenario_name not in self.csv_data:
  529. available = ', '.join(self.csv_data.keys())
  530. raise ValueError(f"Scenario '{scenario_name}' not found in csv_data. Available: {available}")
  531. return '\n'.join(self.csv_data[scenario_name])
  532. if isinstance(self.csv_data, (tuple, list)):
  533. return '\n'.join(self.csv_data)
  534. raise TypeError(f'csv_data must be a tuple, list, or dictionary, got {type(self.csv_data)}')
  535. def _get_update_csv_data(self):
  536. return self.csv_update_data, '\n'.join(self.csv_update_data)
  def test_bulk_import_objects_without_permission(self):
      """
      Verify that both GET and POST to the bulk import view return 403 when the user has
      been granted no permission.
      """
      import_url = self._get_url('bulk_import')
      data = {
          'data': self._get_csv_data(),
          'format': ImportFormatChoices.CSV,
          'csv_delimiter': CSVDelimiterChoices.AUTO,
      }

      # Test GET without permission
      with disable_warnings('django.request'):
          self.assertHttpStatus(self.client.get(import_url), 403)

      # Try POST without permission. The request itself is issued inside the
      # disable_warnings() context (as for the GET above), not just the assertion.
      with disable_warnings('django.request'):
          response = self.client.post(import_url, data)
          self.assertHttpStatus(response, 403)
  550. @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
  def test_bulk_import_objects_with_permission(self, post_import_callback=None):
      """
      Import every configured scenario with an unconstrained "add" permission, each one in
      its own subtest. If ``post_import_callback`` is given, it is called with the scenario
      name after that scenario's import checks.
      """
      # A single model-level permission covers all scenarios
      permission = ObjectPermission(
          name='Test permission',
          actions=['add'],
      )
      permission.save()
      permission.users.add(self.user)
      permission.object_types.add(ObjectType.objects.get_for_model(self.model))

      # The import form only needs to be fetched once
      form_response = self.client.get(self._get_url('bulk_import'))
      self.assertHttpStatus(form_response, 200)

      # Run each scenario as a separate subtest
      for name in self.get_scenarios():
          with self.cleanupSubTest(scenario=name):
              self._test_bulk_import_with_permission_scenario(name)

              if post_import_callback:
                  post_import_callback(name)
  def _test_bulk_import_with_permission_scenario(self, scenario_name):
      """
      Import one scenario's CSV data and check the redirect, the object count and, for
      change-logged models, the ObjectChange records created by the request.
      """
      count_before = self._get_queryset().count()
      tracks_changes = issubclass(self.model, ChangeLoggingMixin)

      # Every CSV line after the header row is expected to yield one new object
      csv_text = self._get_csv_data(scenario_name)
      expected_new_objects = len(csv_text.splitlines()) - 1

      data = {
          'data': csv_text,
          'format': ImportFormatChoices.CSV,
          'csv_delimiter': CSVDelimiterChoices.AUTO,
      }

      # Change-logged models also submit a changelog message
      if tracks_changes:
          data['changelog_message'] = get_random_string(10)

      # POST with permission should redirect
      response = self.client.post(self._get_url('bulk_import'), data)
      self.assertHttpStatus(response, 302)

      # The object count must have grown by the number of imported rows
      self.assertEqual(self._get_queryset().count(), count_before + expected_new_objects)

      if not tracks_changes:
          return

      # Look up the ObjectChanges recorded under this request's ID
      request_id = response.headers.get('X-Request-ID')
      self.assertIsNotNone(request_id, 'Unable to determine request ID from response')

      changes = ObjectChange.objects.filter(
          changed_object_type=ContentType.objects.get_for_model(self.model),
          request_id=request_id,
          action=ObjectChangeActionChoices.ACTION_CREATE,
      )
      self.assertEqual(len(changes), expected_new_objects)

      for change in changes:
          self.assertObjectChange(
              change,
              action=ObjectChangeActionChoices.ACTION_CREATE,
              message=data['changelog_message'],
          )
  599. @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
  def test_bulk_update_objects_with_permission(self):
      """
      Submit csv_update_data to the bulk import view and verify that existing objects were
      updated rather than created.

      Each CSV row is matched to an object through its ``id`` column. Every other column must
      name a model field. For attributes whose stored value is a string or a (non-boolean)
      integer, the stored value is compared with the CSV text.

      Raises NotImplementedError if the test class does not define csv_update_data.
      """
      if not hasattr(self, 'csv_update_data'):
          raise NotImplementedError(_("The test must define csv_update_data."))

      initial_count = self._get_queryset().count()
      array, csv_data = self._get_update_csv_data()
      data = {
          'format': ImportFormatChoices.CSV,
          'data': csv_data,
          'csv_delimiter': CSVDelimiterChoices.AUTO,
      }

      # Assign model-level permission
      obj_perm = ObjectPermission(
          name='Test permission',
          actions=['add']
      )
      obj_perm.save()
      obj_perm.users.add(self.user)
      obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))

      # Test POST with permission; an update must not change the number of objects
      self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 302)
      self.assertEqual(initial_count, self._get_queryset().count())

      reader = csv.DictReader(array, delimiter=',')
      for line in reader:
          obj = self.model.objects.get(id=line["id"])
          for attr, expected in line.items():
              if attr == "id":
                  continue

              field = self.model._meta.get_field(attr)
              # Keep the stored value under its own name: the expected CSV value must not be
              # overwritten, otherwise the comparison below could never fail.
              actual = getattr(obj, attr)

              # cannot verify FK fields as don't know what name the CSV maps to
              if actual is None or isinstance(field, ForeignKey):
                  continue

              # Only plain strings and integers have an unambiguous CSV text form; other
              # value types (booleans, decimals, related managers, ...) are not compared.
              if isinstance(actual, bool) or not isinstance(actual, (str, int)):
                  continue

              self.assertEqual(str(actual), expected, f"Unexpected value for '{attr}' after bulk update")
  632. @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
  633. def test_bulk_import_objects_with_constrained_permission(self, post_import_callback=None):
  634. # Assign constrained permission (deny all initially)
  635. obj_perm = ObjectPermission(
  636. name='Test permission',
  637. constraints={'pk': 0}, # Dummy permission to deny all
  638. actions=['add'],
  639. )
  640. obj_perm.save()
  641. obj_perm.users.add(self.user)
  642. obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
  643. # Test each scenario with constrained permissions
  644. for scenario_name in self.get_scenarios():
  645. with self.cleanupSubTest(scenario=scenario_name):
  646. self._test_bulk_import_constrained_scenario(scenario_name, obj_perm)
  647. if post_import_callback:
  648. post_import_callback(scenario_name)
  649. def _test_bulk_import_constrained_scenario(self, scenario_name, obj_perm):
  650. """
  651. Helper method to test a single bulk import scenario with constrained permissions.
  652. """
  653. initial_count = self._get_queryset().count()
  654. # Get CSV data for this scenario
  655. scenario_data = self._get_csv_data(scenario_name)
  656. expected_new_objects = len(scenario_data.splitlines()) - 1
  657. data = {
  658. 'data': scenario_data,
  659. 'format': ImportFormatChoices.CSV,
  660. 'csv_delimiter': CSVDelimiterChoices.AUTO,
  661. }
  662. # Attempt to import non-permitted objects (should fail)
  663. self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 200)
  664. self.assertEqual(self._get_queryset().count(), initial_count)
  665. # Update permission constraints to allow all
  666. obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all
  667. obj_perm.save()
  668. # Import permitted objects (should succeed)
  669. self.assertHttpStatus(self.client.post(self._get_url('bulk_import'), data), 302)
  670. self.assertEqual(self._get_queryset().count(), initial_count + expected_new_objects)
  671. class BulkEditObjectsViewTestCase(ModelViewTestCase):
  672. """
  673. Edit multiple instances.
  674. :bulk_edit_data: A dictionary of data to be used when bulk editing a set of objects. This data should differ
  675. from that used for initial object creation within setUpTestData().
  676. """
  677. bulk_edit_data = {}
  678. def test_bulk_edit_objects_without_permission(self):
  679. pk_list = self._get_queryset().values_list('pk', flat=True)[:3]
  680. data = {
  681. 'pk': pk_list,
  682. '_apply': True, # Form button
  683. }
  684. # Test GET without permission
  685. with disable_warnings('django.request'):
  686. self.assertHttpStatus(self.client.get(self._get_url('bulk_edit')), 403)
  687. # Try POST without permission
  688. with disable_warnings('django.request'):
  689. self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 403)
  690. @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
  691. def test_bulk_edit_objects_with_permission(self):
  692. pk_list = list(self._get_queryset().values_list('pk', flat=True)[:3])
  693. data = {
  694. 'pk': pk_list,
  695. '_apply': True, # Form button
  696. }
  697. # If supported, add a changelog message
  698. if issubclass(self.model, ChangeLoggingMixin):
  699. data['changelog_message'] = get_random_string(10)
  700. # Append the form data to the request
  701. data.update(post_data(self.bulk_edit_data))
  702. # Assign model-level permission
  703. obj_perm = ObjectPermission(
  704. name='Test permission',
  705. actions=['view', 'change']
  706. )
  707. obj_perm.save()
  708. obj_perm.users.add(self.user)
  709. obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
  710. # Try POST with model-level permission
  711. response = self.client.post(self._get_url('bulk_edit'), data)
  712. self.assertHttpStatus(response, 302)
  713. for i, instance in enumerate(self._get_queryset().filter(pk__in=pk_list)):
  714. self.assertInstanceEqual(instance, self.bulk_edit_data)
  715. # Verify ObjectChange creation
  716. if issubclass(self.model, ChangeLoggingMixin):
  717. request_id = response.headers.get('X-Request-ID')
  718. self.assertIsNotNone(request_id, "Unable to determine request ID from response")
  719. objectchanges = ObjectChange.objects.filter(
  720. changed_object_type=ContentType.objects.get_for_model(self.model),
  721. changed_object_id__in=pk_list
  722. )
  723. self.assertEqual(len(objectchanges), len(pk_list))
  724. for oc in objectchanges:
  725. self.assertObjectChange(oc, action=ObjectChangeActionChoices.ACTION_UPDATE,
  726. message=data['changelog_message'])
  727. @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'], EXEMPT_EXCLUDE_MODELS=[])
  728. def test_bulk_edit_objects_with_constrained_permission(self):
  729. pk_list = list(self._get_queryset().values_list('pk', flat=True)[:3])
  730. data = {
  731. 'pk': pk_list,
  732. '_apply': True, # Form button
  733. }
  734. # Append the form data to the request
  735. data.update(post_data(self.bulk_edit_data))
  736. # Dynamically determine a constraint that will *not* be matched by the updated objects.
  737. attr_name = list(self.bulk_edit_data.keys())[0]
  738. field = self.model._meta.get_field(attr_name)
  739. value = field.value_from_object(self._get_queryset().first())
  740. # Assign constrained permission
  741. obj_perm = ObjectPermission(
  742. name='Test permission',
  743. constraints={attr_name: value},
  744. actions=['view', 'change']
  745. )
  746. obj_perm.save()
  747. obj_perm.users.add(self.user)
  748. obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
  749. # Attempt to bulk edit permitted objects into a non-permitted state
  750. response = self.client.post(self._get_url('bulk_edit'), data)
  751. self.assertHttpStatus(response, 200)
  752. # Update permission constraints
  753. obj_perm.constraints = {'pk__gt': 0}
  754. obj_perm.save()
  755. # Bulk edit permitted objects
  756. self.assertHttpStatus(self.client.post(self._get_url('bulk_edit'), data), 302)
  757. for i, instance in enumerate(self._get_queryset().filter(pk__in=pk_list)):
  758. self.assertInstanceEqual(instance, self.bulk_edit_data)
  759. class BulkDeleteObjectsViewTestCase(ModelViewTestCase):
  760. """
  761. Delete multiple instances.
  762. """
  763. def test_bulk_delete_objects_without_permission(self):
  764. pk_list = self._get_queryset().values_list('pk', flat=True)[:3]
  765. data = {
  766. 'pk': pk_list,
  767. 'confirm': True,
  768. '_confirm': True, # Form button
  769. }
  770. # Test GET without permission
  771. with disable_warnings('django.request'):
  772. self.assertHttpStatus(self.client.get(self._get_url('bulk_delete')), 403)
  773. # Try POST without permission
  774. with disable_warnings('django.request'):
  775. self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 403)
  776. def test_bulk_delete_objects_with_permission(self):
  777. pk_list = list(self._get_queryset().values_list('pk', flat=True))[:3]
  778. data = {
  779. 'pk': pk_list,
  780. 'confirm': True,
  781. '_confirm': True, # Form button
  782. }
  783. # If supported, add a changelog message
  784. if issubclass(self.model, ChangeLoggingMixin):
  785. data['changelog_message'] = get_random_string(10)
  786. # Assign unconstrained permission
  787. obj_perm = ObjectPermission(
  788. name='Test permission',
  789. actions=['delete']
  790. )
  791. obj_perm.save()
  792. obj_perm.users.add(self.user)
  793. obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
  794. # Try POST with model-level permission
  795. response = self.client.post(self._get_url('bulk_delete'), data)
  796. self.assertHttpStatus(response, 302)
  797. self.assertFalse(self._get_queryset().filter(pk__in=pk_list).exists())
  798. # Verify ObjectChange creation
  799. if issubclass(self.model, ChangeLoggingMixin):
  800. objectchanges = ObjectChange.objects.filter(
  801. changed_object_type=ContentType.objects.get_for_model(self.model),
  802. changed_object_id__in=pk_list
  803. )
  804. self.assertEqual(len(objectchanges), len(pk_list))
  805. for oc in objectchanges:
  806. self.assertObjectChange(oc, action=ObjectChangeActionChoices.ACTION_DELETE,
  807. message=data['changelog_message'])
  808. def test_bulk_delete_objects_with_constrained_permission(self):
  809. pk_list = self._get_queryset().values_list('pk', flat=True)
  810. data = {
  811. 'pk': pk_list,
  812. 'confirm': True,
  813. '_confirm': True, # Form button
  814. }
  815. # Assign constrained permission
  816. obj_perm = ObjectPermission(
  817. name='Test permission',
  818. constraints={'pk': 0}, # Dummy permission to deny all
  819. actions=['delete']
  820. )
  821. obj_perm.save()
  822. obj_perm.users.add(self.user)
  823. obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
  824. # Attempt to bulk delete non-permitted objects
  825. initial_count = self._get_queryset().count()
  826. self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)
  827. self.assertEqual(self._get_queryset().count(), initial_count)
  828. # Update permission constraints
  829. obj_perm.constraints = {'pk__gt': 0} # Dummy permission to allow all
  830. obj_perm.save()
  831. # Bulk delete permitted objects
  832. self.assertHttpStatus(self.client.post(self._get_url('bulk_delete'), data), 302)
  833. self.assertEqual(self._get_queryset().count(), 0)
  834. class BulkRenameObjectsViewTestCase(ModelViewTestCase):
  835. """
  836. Rename multiple instances.
  837. """
  838. rename_data = {
  839. 'find': '^(.*)$',
  840. 'replace': '\\1X', # Append an X to the original value
  841. 'use_regex': True,
  842. }
  843. def test_bulk_rename_objects_without_permission(self):
  844. pk_list = self._get_queryset().values_list('pk', flat=True)[:3]
  845. data = {
  846. 'pk': pk_list,
  847. '_apply': True, # Form button
  848. }
  849. data.update(self.rename_data)
  850. # Test GET without permission
  851. with disable_warnings('django.request'):
  852. self.assertHttpStatus(self.client.get(self._get_url('bulk_rename')), 403)
  853. # Try POST without permission
  854. with disable_warnings('django.request'):
  855. self.assertHttpStatus(self.client.post(self._get_url('bulk_rename'), data), 403)
  856. @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
  857. def test_bulk_rename_objects_with_permission(self):
  858. objects = self._get_queryset().all()[:3]
  859. pk_list = [obj.pk for obj in objects]
  860. data = {
  861. 'pk': pk_list,
  862. '_apply': True, # Form button
  863. }
  864. data.update(self.rename_data)
  865. # Assign model-level permission
  866. obj_perm = ObjectPermission(
  867. name='Test permission',
  868. actions=['change']
  869. )
  870. obj_perm.save()
  871. obj_perm.users.add(self.user)
  872. obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
  873. # Try POST with model-level permission
  874. self.assertHttpStatus(self.client.post(self._get_url('bulk_rename'), data), 302)
  875. for i, instance in enumerate(self._get_queryset().filter(pk__in=pk_list)):
  876. self.assertEqual(instance.name, f'{objects[i].name}X')
  877. @override_settings(EXEMPT_VIEW_PERMISSIONS=['*'])
  878. def test_bulk_rename_objects_with_constrained_permission(self):
  879. objects = self._get_queryset().all()[:3]
  880. pk_list = [obj.pk for obj in objects]
  881. data = {
  882. 'pk': pk_list,
  883. '_apply': True, # Form button
  884. }
  885. data.update(self.rename_data)
  886. # Assign constrained permission
  887. obj_perm = ObjectPermission(
  888. name='Test permission',
  889. constraints={'name__regex': '[^X]$'},
  890. actions=['change']
  891. )
  892. obj_perm.save()
  893. obj_perm.users.add(self.user)
  894. obj_perm.object_types.add(ObjectType.objects.get_for_model(self.model))
  895. # Attempt to bulk edit permitted objects into a non-permitted state
  896. response = self.client.post(self._get_url('bulk_rename'), data)
  897. self.assertHttpStatus(response, 200)
  898. # Update permission constraints
  899. obj_perm.constraints = {'pk__gt': 0}
  900. obj_perm.save()
  901. # Bulk rename permitted objects
  902. self.assertHttpStatus(self.client.post(self._get_url('bulk_rename'), data), 302)
  903. for i, instance in enumerate(self._get_queryset().filter(pk__in=pk_list)):
  904. self.assertEqual(instance.name, f'{objects[i].name}X')
class PrimaryObjectViewTestCase(
    GetObjectViewTestCase,
    GetObjectChangelogViewTestCase,
    CreateObjectViewTestCase,
    EditObjectViewTestCase,
    DeleteObjectViewTestCase,
    ListObjectsViewTestCase,
    BulkImportObjectsViewTestCase,
    BulkEditObjectsViewTestCase,
    BulkDeleteObjectsViewTestCase,
):
    """
    TestCase suitable for testing all standard View functions for primary objects.

    Combines the get, changelog, create, edit, delete, list, bulk import, bulk edit and bulk delete test
    cases; it defines no tests of its own.
    """
    # None removes unittest's length limit on the diffs shown in assertion failure messages
    maxDiff = None
class OrganizationalObjectViewTestCase(
    GetObjectViewTestCase,
    GetObjectChangelogViewTestCase,
    CreateObjectViewTestCase,
    EditObjectViewTestCase,
    DeleteObjectViewTestCase,
    ListObjectsViewTestCase,
    BulkImportObjectsViewTestCase,
    BulkEditObjectsViewTestCase,
    BulkDeleteObjectsViewTestCase,
):
    """
    TestCase suitable for all organizational objects.

    Combines the get, changelog, create, edit, delete, list, bulk import, bulk edit and bulk delete test
    cases; it defines no tests of its own.
    """
    # None removes unittest's length limit on the diffs shown in assertion failure messages
    maxDiff = None
class AdminModelViewTestCase(
    GetObjectViewTestCase,
    CreateObjectViewTestCase,
    EditObjectViewTestCase,
    DeleteObjectViewTestCase,
    ListObjectsViewTestCase,
    BulkImportObjectsViewTestCase,
    BulkEditObjectsViewTestCase,
    BulkDeleteObjectsViewTestCase,
):
    """
    TestCase suitable for testing all standard View functions for objects which inherit from AdminModel.

    Combines the get, create, edit, delete, list, bulk import, bulk edit and bulk delete test cases; the
    changelog view test case is not included.
    """
    # None removes unittest's length limit on the diffs shown in assertion failure messages
    maxDiff = None
class DeviceComponentTemplateViewTestCase(
    EditObjectViewTestCase,
    DeleteObjectViewTestCase,
    CreateMultipleObjectsViewTestCase,
    BulkEditObjectsViewTestCase,
    BulkRenameObjectsViewTestCase,
    BulkDeleteObjectsViewTestCase,
):
    """
    TestCase suitable for testing device component template models (ConsolePortTemplates, InterfaceTemplates, etc.)

    Combines the edit, delete, create-multiple, bulk edit, bulk rename and bulk delete test cases; the get,
    changelog, list and bulk import test cases are not included.
    """
    # None removes unittest's length limit on the diffs shown in assertion failure messages
    maxDiff = None
class DeviceComponentViewTestCase(
    GetObjectViewTestCase,
    GetObjectChangelogViewTestCase,
    EditObjectViewTestCase,
    DeleteObjectViewTestCase,
    ListObjectsViewTestCase,
    CreateMultipleObjectsViewTestCase,
    BulkImportObjectsViewTestCase,
    BulkEditObjectsViewTestCase,
    BulkRenameObjectsViewTestCase,
    BulkDeleteObjectsViewTestCase,
):
    """
    TestCase suitable for testing device component models (ConsolePorts, Interfaces, etc.)

    Combines the get, changelog, edit, delete, list, create-multiple, bulk import, bulk edit, bulk rename
    and bulk delete test cases; it defines no tests of its own.
    """
    # None removes unittest's length limit on the diffs shown in assertion failure messages
    maxDiff = None