bulk_import.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658
  1. from django import forms
  2. from django.contrib.contenttypes.models import ContentType
  3. from django.utils.translation import gettext_lazy as _
  4. from dcim.models import Device, Interface, Site
  5. from dcim.forms.mixins import ScopedImportForm
  6. from ipam.choices import *
  7. from ipam.constants import *
  8. from ipam.models import *
  9. from netbox.forms import NetBoxModelImportForm
  10. from tenancy.models import Tenant
  11. from utilities.forms.fields import (
  12. CSVChoiceField, CSVContentTypeField, CSVModelChoiceField, CSVModelMultipleChoiceField, SlugField,
  13. NumericRangeArrayField,
  14. )
  15. from virtualization.models import VirtualMachine, VMInterface
  16. __all__ = (
  17. 'AggregateImportForm',
  18. 'ASNImportForm',
  19. 'ASNRangeImportForm',
  20. 'FHRPGroupImportForm',
  21. 'IPAddressImportForm',
  22. 'IPRangeImportForm',
  23. 'PrefixImportForm',
  24. 'RIRImportForm',
  25. 'RoleImportForm',
  26. 'RouteTargetImportForm',
  27. 'ServiceImportForm',
  28. 'ServiceTemplateImportForm',
  29. 'VLANImportForm',
  30. 'VLANGroupImportForm',
  31. 'VLANTranslationPolicyImportForm',
  32. 'VLANTranslationRuleImportForm',
  33. 'VRFImportForm',
  34. )
  35. class VRFImportForm(NetBoxModelImportForm):
  36. tenant = CSVModelChoiceField(
  37. label=_('Tenant'),
  38. queryset=Tenant.objects.all(),
  39. required=False,
  40. to_field_name='name',
  41. help_text=_('Assigned tenant')
  42. )
  43. import_targets = CSVModelMultipleChoiceField(
  44. queryset=RouteTarget.objects.all(),
  45. required=False,
  46. to_field_name='name',
  47. help_text=_('Import route targets')
  48. )
  49. export_targets = CSVModelMultipleChoiceField(
  50. queryset=RouteTarget.objects.all(),
  51. required=False,
  52. to_field_name='name',
  53. help_text=_('Export route targets')
  54. )
  55. class Meta:
  56. model = VRF
  57. fields = (
  58. 'name', 'rd', 'tenant', 'enforce_unique', 'description', 'import_targets', 'export_targets', 'comments',
  59. 'tags',
  60. )
  61. class RouteTargetImportForm(NetBoxModelImportForm):
  62. tenant = CSVModelChoiceField(
  63. label=_('Tenant'),
  64. queryset=Tenant.objects.all(),
  65. required=False,
  66. to_field_name='name',
  67. help_text=_('Assigned tenant')
  68. )
  69. class Meta:
  70. model = RouteTarget
  71. fields = ('name', 'tenant', 'description', 'comments', 'tags')
  72. class RIRImportForm(NetBoxModelImportForm):
  73. slug = SlugField()
  74. class Meta:
  75. model = RIR
  76. fields = ('name', 'slug', 'is_private', 'description', 'tags')
  77. class AggregateImportForm(NetBoxModelImportForm):
  78. rir = CSVModelChoiceField(
  79. label=_('RIR'),
  80. queryset=RIR.objects.all(),
  81. to_field_name='name',
  82. help_text=_('Assigned RIR')
  83. )
  84. tenant = CSVModelChoiceField(
  85. label=_('Tenant'),
  86. queryset=Tenant.objects.all(),
  87. required=False,
  88. to_field_name='name',
  89. help_text=_('Assigned tenant')
  90. )
  91. class Meta:
  92. model = Aggregate
  93. fields = ('prefix', 'rir', 'tenant', 'date_added', 'description', 'comments', 'tags')
  94. class ASNRangeImportForm(NetBoxModelImportForm):
  95. rir = CSVModelChoiceField(
  96. label=_('RIR'),
  97. queryset=RIR.objects.all(),
  98. to_field_name='name',
  99. help_text=_('Assigned RIR')
  100. )
  101. tenant = CSVModelChoiceField(
  102. label=_('Tenant'),
  103. queryset=Tenant.objects.all(),
  104. required=False,
  105. to_field_name='name',
  106. help_text=_('Assigned tenant')
  107. )
  108. class Meta:
  109. model = ASNRange
  110. fields = ('name', 'slug', 'rir', 'start', 'end', 'tenant', 'description', 'tags')
  111. class ASNImportForm(NetBoxModelImportForm):
  112. rir = CSVModelChoiceField(
  113. label=_('RIR'),
  114. queryset=RIR.objects.all(),
  115. to_field_name='name',
  116. help_text=_('Assigned RIR')
  117. )
  118. tenant = CSVModelChoiceField(
  119. label=_('Tenant'),
  120. queryset=Tenant.objects.all(),
  121. required=False,
  122. to_field_name='name',
  123. help_text=_('Assigned tenant')
  124. )
  125. class Meta:
  126. model = ASN
  127. fields = ('asn', 'rir', 'tenant', 'description', 'comments', 'tags')
  128. class RoleImportForm(NetBoxModelImportForm):
  129. slug = SlugField()
  130. class Meta:
  131. model = Role
  132. fields = ('name', 'slug', 'weight', 'description', 'tags')
  133. class PrefixImportForm(ScopedImportForm, NetBoxModelImportForm):
  134. vrf = CSVModelChoiceField(
  135. label=_('VRF'),
  136. queryset=VRF.objects.all(),
  137. to_field_name='name',
  138. required=False,
  139. help_text=_('Assigned VRF')
  140. )
  141. tenant = CSVModelChoiceField(
  142. label=_('Tenant'),
  143. queryset=Tenant.objects.all(),
  144. required=False,
  145. to_field_name='name',
  146. help_text=_('Assigned tenant')
  147. )
  148. vlan_group = CSVModelChoiceField(
  149. label=_('VLAN group'),
  150. queryset=VLANGroup.objects.all(),
  151. required=False,
  152. to_field_name='name',
  153. help_text=_("VLAN's group (if any)")
  154. )
  155. vlan_site = CSVModelChoiceField(
  156. label=_('VLAN Site'),
  157. queryset=Site.objects.all(),
  158. required=False,
  159. to_field_name='name',
  160. help_text=_("VLAN's site (if any)")
  161. )
  162. vlan = CSVModelChoiceField(
  163. label=_('VLAN'),
  164. queryset=VLAN.objects.all(),
  165. required=False,
  166. to_field_name='vid',
  167. help_text=_("Assigned VLAN")
  168. )
  169. status = CSVChoiceField(
  170. label=_('Status'),
  171. choices=PrefixStatusChoices,
  172. help_text=_('Operational status')
  173. )
  174. role = CSVModelChoiceField(
  175. label=_('Role'),
  176. queryset=Role.objects.all(),
  177. required=False,
  178. to_field_name='name',
  179. help_text=_('Functional role')
  180. )
  181. class Meta:
  182. model = Prefix
  183. fields = (
  184. 'prefix', 'vrf', 'tenant', 'vlan_group', 'vlan_site', 'vlan', 'status', 'role', 'scope_type', 'scope_id',
  185. 'is_pool', 'mark_utilized', 'description', 'comments', 'tags',
  186. )
  187. labels = {
  188. 'scope_id': _('Scope ID'),
  189. }
  190. def __init__(self, data=None, *args, **kwargs):
  191. super().__init__(data, *args, **kwargs)
  192. if not data:
  193. return
  194. vlan_site = data.get('vlan_site')
  195. vlan_group = data.get('vlan_group')
  196. # Limit VLAN queryset by assigned site and/or group (if specified)
  197. query = Q()
  198. if vlan_site:
  199. query |= Q(**{
  200. f"site__{self.fields['vlan_site'].to_field_name}": vlan_site
  201. })
  202. # Don't Forget to include VLANs without a site in the filter
  203. query |= Q(**{
  204. f"site__{self.fields['vlan_site'].to_field_name}__isnull": True
  205. })
  206. if vlan_group:
  207. query &= Q(**{
  208. f"group__{self.fields['vlan_group'].to_field_name}": vlan_group
  209. })
  210. queryset = self.fields['vlan'].queryset.filter(query)
  211. self.fields['vlan'].queryset = queryset
  212. class IPRangeImportForm(NetBoxModelImportForm):
  213. vrf = CSVModelChoiceField(
  214. label=_('VRF'),
  215. queryset=VRF.objects.all(),
  216. to_field_name='name',
  217. required=False,
  218. help_text=_('Assigned VRF')
  219. )
  220. tenant = CSVModelChoiceField(
  221. label=_('Tenant'),
  222. queryset=Tenant.objects.all(),
  223. required=False,
  224. to_field_name='name',
  225. help_text=_('Assigned tenant')
  226. )
  227. status = CSVChoiceField(
  228. label=_('Status'),
  229. choices=IPRangeStatusChoices,
  230. help_text=_('Operational status')
  231. )
  232. role = CSVModelChoiceField(
  233. label=_('Role'),
  234. queryset=Role.objects.all(),
  235. required=False,
  236. to_field_name='name',
  237. help_text=_('Functional role')
  238. )
  239. class Meta:
  240. model = IPRange
  241. fields = (
  242. 'start_address', 'end_address', 'vrf', 'tenant', 'status', 'role', 'mark_populated', 'mark_utilized',
  243. 'description', 'comments', 'tags',
  244. )
  245. class IPAddressImportForm(NetBoxModelImportForm):
  246. vrf = CSVModelChoiceField(
  247. label=_('VRF'),
  248. queryset=VRF.objects.all(),
  249. to_field_name='name',
  250. required=False,
  251. help_text=_('Assigned VRF')
  252. )
  253. tenant = CSVModelChoiceField(
  254. label=_('Tenant'),
  255. queryset=Tenant.objects.all(),
  256. to_field_name='name',
  257. required=False,
  258. help_text=_('Assigned tenant')
  259. )
  260. status = CSVChoiceField(
  261. label=_('Status'),
  262. choices=IPAddressStatusChoices,
  263. help_text=_('Operational status')
  264. )
  265. role = CSVChoiceField(
  266. label=_('Role'),
  267. choices=IPAddressRoleChoices,
  268. required=False,
  269. help_text=_('Functional role')
  270. )
  271. device = CSVModelChoiceField(
  272. label=_('Device'),
  273. queryset=Device.objects.all(),
  274. required=False,
  275. to_field_name='name',
  276. help_text=_('Parent device of assigned interface (if any)')
  277. )
  278. virtual_machine = CSVModelChoiceField(
  279. label=_('Virtual machine'),
  280. queryset=VirtualMachine.objects.all(),
  281. required=False,
  282. to_field_name='name',
  283. help_text=_('Parent VM of assigned interface (if any)')
  284. )
  285. interface = CSVModelChoiceField(
  286. label=_('Interface'),
  287. queryset=Interface.objects.none(), # Can also refer to VMInterface
  288. required=False,
  289. to_field_name='name',
  290. help_text=_('Assigned interface')
  291. )
  292. fhrp_group = CSVModelChoiceField(
  293. label=_('FHRP Group'),
  294. queryset=FHRPGroup.objects.all(),
  295. required=False,
  296. to_field_name='name',
  297. help_text=_('Assigned FHRP Group name')
  298. )
  299. is_primary = forms.BooleanField(
  300. label=_('Is primary'),
  301. help_text=_('Make this the primary IP for the assigned device'),
  302. required=False
  303. )
  304. is_oob = forms.BooleanField(
  305. label=_('Is out-of-band'),
  306. help_text=_('Designate this as the out-of-band IP address for the assigned device'),
  307. required=False
  308. )
  309. class Meta:
  310. model = IPAddress
  311. fields = [
  312. 'address', 'vrf', 'tenant', 'status', 'role', 'device', 'virtual_machine', 'interface', 'fhrp_group',
  313. 'is_primary', 'is_oob', 'dns_name', 'description', 'comments', 'tags',
  314. ]
  315. def __init__(self, data=None, *args, **kwargs):
  316. super().__init__(data, *args, **kwargs)
  317. if data:
  318. # Limit interface queryset by assigned device
  319. if data.get('device'):
  320. self.fields['interface'].queryset = Interface.objects.filter(
  321. **{f"device__{self.fields['device'].to_field_name}": data['device']}
  322. )
  323. # Limit interface queryset by assigned VM
  324. elif data.get('virtual_machine'):
  325. self.fields['interface'].queryset = VMInterface.objects.filter(
  326. **{f"virtual_machine__{self.fields['virtual_machine'].to_field_name}": data['virtual_machine']}
  327. )
  328. def clean_is_primary(self):
  329. # Make sure is_primary is None when it's not included in the uploaded data
  330. if 'is_primary' not in self.data:
  331. return None
  332. else:
  333. return self.cleaned_data['is_primary']
  334. def clean_is_oob(self):
  335. # Make sure is_oob is None when it's not included in the uploaded data
  336. if 'is_oob' not in self.data:
  337. return None
  338. else:
  339. return self.cleaned_data['is_oob']
  340. def clean(self):
  341. super().clean()
  342. device = self.cleaned_data.get('device')
  343. virtual_machine = self.cleaned_data.get('virtual_machine')
  344. interface = self.cleaned_data.get('interface')
  345. is_primary = self.cleaned_data.get('is_primary')
  346. is_oob = self.cleaned_data.get('is_oob')
  347. # Validate is_primary and is_oob
  348. if is_primary and not device and not virtual_machine:
  349. raise forms.ValidationError({
  350. "is_primary": _("No device or virtual machine specified; cannot set as primary IP")
  351. })
  352. if is_oob and not device:
  353. raise forms.ValidationError({
  354. "is_oob": _("No device specified; cannot set as out-of-band IP")
  355. })
  356. if is_oob and virtual_machine:
  357. raise forms.ValidationError({
  358. "is_oob": _("Cannot set out-of-band IP for virtual machines")
  359. })
  360. if is_primary and not interface:
  361. raise forms.ValidationError({
  362. "is_primary": _("No interface specified; cannot set as primary IP")
  363. })
  364. if is_oob and not interface:
  365. raise forms.ValidationError({
  366. "is_oob": _("No interface specified; cannot set as out-of-band IP")
  367. })
  368. def save(self, *args, **kwargs):
  369. # Set interface assignment
  370. if self.cleaned_data.get('interface'):
  371. self.instance.assigned_object = self.cleaned_data['interface']
  372. if self.cleaned_data.get('fhrp_group'):
  373. self.instance.assigned_object = self.cleaned_data['fhrp_group']
  374. ipaddress = super().save(*args, **kwargs)
  375. # Set as primary for device/VM
  376. if self.cleaned_data.get('is_primary') is not None:
  377. parent = self.cleaned_data.get('device') or self.cleaned_data.get('virtual_machine')
  378. if self.instance.address.version == 4:
  379. parent.primary_ip4 = ipaddress if self.cleaned_data.get('is_primary') else None
  380. elif self.instance.address.version == 6:
  381. parent.primary_ip6 = ipaddress if self.cleaned_data.get('is_primary') else None
  382. parent.save()
  383. # Set as OOB for device
  384. if self.cleaned_data.get('is_oob') is not None:
  385. parent = self.cleaned_data.get('device')
  386. parent.oob_ip = ipaddress if self.cleaned_data.get('is_oob') else None
  387. parent.save()
  388. return ipaddress
  389. class FHRPGroupImportForm(NetBoxModelImportForm):
  390. protocol = CSVChoiceField(
  391. label=_('Protocol'),
  392. choices=FHRPGroupProtocolChoices
  393. )
  394. auth_type = CSVChoiceField(
  395. label=_('Auth type'),
  396. choices=FHRPGroupAuthTypeChoices,
  397. required=False
  398. )
  399. class Meta:
  400. model = FHRPGroup
  401. fields = ('protocol', 'group_id', 'auth_type', 'auth_key', 'name', 'description', 'comments', 'tags')
  402. class VLANGroupImportForm(NetBoxModelImportForm):
  403. slug = SlugField()
  404. scope_type = CSVContentTypeField(
  405. queryset=ContentType.objects.filter(model__in=VLANGROUP_SCOPE_TYPES),
  406. required=False,
  407. label=_('Scope type (app & model)')
  408. )
  409. vid_ranges = NumericRangeArrayField(
  410. required=False
  411. )
  412. tenant = CSVModelChoiceField(
  413. label=_('Tenant'),
  414. queryset=Tenant.objects.all(),
  415. required=False,
  416. to_field_name='name',
  417. help_text=_('Assigned tenant')
  418. )
  419. class Meta:
  420. model = VLANGroup
  421. fields = ('name', 'slug', 'scope_type', 'scope_id', 'vid_ranges', 'tenant', 'description', 'tags')
  422. labels = {
  423. 'scope_id': 'Scope ID',
  424. }
  425. class VLANImportForm(NetBoxModelImportForm):
  426. site = CSVModelChoiceField(
  427. label=_('Site'),
  428. queryset=Site.objects.all(),
  429. required=False,
  430. to_field_name='name',
  431. help_text=_('Assigned site')
  432. )
  433. group = CSVModelChoiceField(
  434. label=_('Group'),
  435. queryset=VLANGroup.objects.all(),
  436. required=False,
  437. to_field_name='name',
  438. help_text=_('Assigned VLAN group')
  439. )
  440. tenant = CSVModelChoiceField(
  441. label=_('Tenant'),
  442. queryset=Tenant.objects.all(),
  443. to_field_name='name',
  444. required=False,
  445. help_text=_('Assigned tenant')
  446. )
  447. status = CSVChoiceField(
  448. label=_('Status'),
  449. choices=VLANStatusChoices,
  450. help_text=_('Operational status')
  451. )
  452. role = CSVModelChoiceField(
  453. label=_('Role'),
  454. queryset=Role.objects.all(),
  455. required=False,
  456. to_field_name='name',
  457. help_text=_('Functional role')
  458. )
  459. qinq_role = CSVChoiceField(
  460. label=_('Q-in-Q role'),
  461. choices=VLANQinQRoleChoices,
  462. required=False,
  463. help_text=_('Operational status')
  464. )
  465. qinq_svlan = CSVModelChoiceField(
  466. label=_('Q-in-Q SVLAN'),
  467. queryset=VLAN.objects.all(),
  468. required=False,
  469. to_field_name='vid',
  470. help_text=_("Service VLAN (for Q-in-Q/802.1ad customer VLANs)")
  471. )
  472. class Meta:
  473. model = VLAN
  474. fields = (
  475. 'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'qinq_role', 'qinq_svlan',
  476. 'comments', 'tags',
  477. )
  478. class VLANTranslationPolicyImportForm(NetBoxModelImportForm):
  479. class Meta:
  480. model = VLANTranslationPolicy
  481. fields = ('name', 'description', 'tags')
  482. class VLANTranslationRuleImportForm(NetBoxModelImportForm):
  483. policy = CSVModelChoiceField(
  484. label=_('Policy'),
  485. queryset=VLANTranslationPolicy.objects.all(),
  486. to_field_name='name',
  487. help_text=_('VLAN translation policy')
  488. )
  489. class Meta:
  490. model = VLANTranslationRule
  491. fields = ('policy', 'local_vid', 'remote_vid')
  492. class ServiceTemplateImportForm(NetBoxModelImportForm):
  493. protocol = CSVChoiceField(
  494. label=_('Protocol'),
  495. choices=ServiceProtocolChoices,
  496. help_text=_('IP protocol')
  497. )
  498. class Meta:
  499. model = ServiceTemplate
  500. fields = ('name', 'protocol', 'ports', 'description', 'comments', 'tags')
  501. class ServiceImportForm(NetBoxModelImportForm):
  502. parent_object_type = CSVContentTypeField(
  503. queryset=ContentType.objects.filter(SERVICE_ASSIGNMENT_MODELS),
  504. required=True,
  505. label=_('Parent type (app & model)')
  506. )
  507. parent = CSVModelChoiceField(
  508. label=_('Parent'),
  509. queryset=Device.objects.all(),
  510. required=False,
  511. to_field_name='name',
  512. help_text=_('Parent object name')
  513. )
  514. parent_object_id = forms.IntegerField(
  515. required=False,
  516. help_text=_('Parent object ID'),
  517. )
  518. protocol = CSVChoiceField(
  519. label=_('Protocol'),
  520. choices=ServiceProtocolChoices,
  521. help_text=_('IP protocol')
  522. )
  523. ipaddresses = CSVModelMultipleChoiceField(
  524. queryset=IPAddress.objects.all(),
  525. required=False,
  526. to_field_name='address',
  527. help_text=_('IP Address'),
  528. )
  529. class Meta:
  530. model = Service
  531. fields = (
  532. 'ipaddresses', 'name', 'protocol', 'ports', 'description', 'comments', 'tags',
  533. )
  534. def __init__(self, data=None, *args, **kwargs):
  535. super().__init__(data, *args, **kwargs)
  536. # Limit parent queryset by assigned parent object type
  537. if data:
  538. match data.get('parent_object_type'):
  539. case 'dcim.device':
  540. self.fields['parent'].queryset = Device.objects.all()
  541. case 'ipam.fhrpgroup':
  542. self.fields['parent'].queryset = FHRPGroup.objects.all()
  543. case 'virtualization.virtualmachine':
  544. self.fields['parent'].queryset = VirtualMachine.objects.all()
  545. def save(self, *args, **kwargs):
  546. if (parent := self.cleaned_data.get('parent')):
  547. self.instance.parent = parent
  548. return super().save(*args, **kwargs)
  549. def clean(self):
  550. super().clean()
  551. if (parent_ct := self.cleaned_data.get('parent_object_type')):
  552. if (parent := self.cleaned_data.get('parent')):
  553. self.cleaned_data['parent_object_id'] = parent.pk
  554. elif (parent_id := self.cleaned_data.get('parent_object_id')):
  555. parent = parent_ct.model_class().objects.filter(id=parent_id).first()
  556. self.cleaned_data['parent'] = parent
  557. else:
  558. # If a parent object type is passed and we've made it here, then raise a validation
  559. # error since an associated parent object or parent object id has not been passed
  560. raise forms.ValidationError(
  561. _("One of parent or parent_object_id must be included with parent_object_type")
  562. )
  563. # making sure parent is defined. In cases where an import is resulting in an update, the
  564. # import data might not include the parent object and so the above logic might not be
  565. # triggered
  566. parent = self.cleaned_data.get('parent')
  567. for ip_address in self.cleaned_data.get('ipaddresses', []):
  568. if not (assigned := ip_address.assigned_object) or ( # no assigned object
  569. (isinstance(parent, FHRPGroup) and assigned != parent) # assigned to FHRPGroup
  570. and getattr(assigned, 'parent_object') != parent # assigned to [VM]Interface
  571. ):
  572. raise forms.ValidationError(
  573. _("{ip} is not assigned to this parent.").format(ip=ip_address)
  574. )
  575. return self.cleaned_data