bulk_import.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644
  1. from django import forms
  2. from django.contrib.contenttypes.models import ContentType
  3. from django.utils.translation import gettext_lazy as _
  4. from dcim.models import Device, Interface, Site
  5. from dcim.forms.mixins import ScopedImportForm
  6. from ipam.choices import *
  7. from ipam.constants import *
  8. from ipam.models import *
  9. from netbox.forms import NetBoxModelImportForm
  10. from tenancy.models import Tenant
  11. from utilities.forms.fields import (
  12. CSVChoiceField, CSVContentTypeField, CSVModelChoiceField, CSVModelMultipleChoiceField, SlugField,
  13. NumericRangeArrayField,
  14. )
  15. from virtualization.models import VirtualMachine, VMInterface
  # Names exported by `from <this module> import *`.
  __all__ = (
      'AggregateImportForm', 'ASNImportForm', 'ASNRangeImportForm',
      'FHRPGroupImportForm',
      'IPAddressImportForm', 'IPRangeImportForm',
      'PrefixImportForm',
      'RIRImportForm', 'RoleImportForm', 'RouteTargetImportForm',
      'ServiceImportForm', 'ServiceTemplateImportForm',
      'VLANImportForm', 'VLANGroupImportForm',
      'VLANTranslationPolicyImportForm', 'VLANTranslationRuleImportForm',
      'VRFImportForm',
  )
  class VRFImportForm(NetBoxModelImportForm):
      """Bulk-import form for VRFs; the tenant and route targets are looked up by name."""

      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )
      import_targets = CSVModelMultipleChoiceField(
          queryset=RouteTarget.objects.all(),
          to_field_name='name',
          required=False,
          help_text=_('Import route targets'),
      )
      export_targets = CSVModelMultipleChoiceField(
          queryset=RouteTarget.objects.all(),
          to_field_name='name',
          required=False,
          help_text=_('Export route targets'),
      )

      class Meta:
          model = VRF
          fields = (
              'name', 'rd', 'tenant', 'enforce_unique', 'description',
              'import_targets', 'export_targets', 'comments', 'tags',
          )
  class RouteTargetImportForm(NetBoxModelImportForm):
      """Bulk-import form for route targets; the optional tenant is looked up by name."""

      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )

      class Meta:
          model = RouteTarget
          fields = (
              'name',
              'tenant',
              'description',
              'comments',
              'tags',
          )
  class RIRImportForm(NetBoxModelImportForm):
      """Bulk-import form for RIRs, with an explicitly declared slug field."""

      slug = SlugField()

      class Meta:
          model = RIR
          fields = (
              'name',
              'slug',
              'is_private',
              'description',
              'tags',
          )
  class AggregateImportForm(NetBoxModelImportForm):
      """Bulk-import form for aggregates; the RIR and optional tenant are looked up by name."""

      rir = CSVModelChoiceField(
          queryset=RIR.objects.all(),
          to_field_name='name',
          label=_('RIR'),
          help_text=_('Assigned RIR'),
      )
      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )

      class Meta:
          model = Aggregate
          fields = (
              'prefix', 'rir', 'tenant', 'date_added',
              'description', 'comments', 'tags',
          )
  class ASNRangeImportForm(NetBoxModelImportForm):
      """Bulk-import form for ASN ranges; the RIR and optional tenant are looked up by name."""

      rir = CSVModelChoiceField(
          queryset=RIR.objects.all(),
          to_field_name='name',
          label=_('RIR'),
          help_text=_('Assigned RIR'),
      )
      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )

      class Meta:
          model = ASNRange
          fields = (
              'name', 'slug', 'rir', 'start', 'end',
              'tenant', 'description', 'tags',
          )
  class ASNImportForm(NetBoxModelImportForm):
      """Bulk-import form for ASNs; the RIR and optional tenant are looked up by name."""

      rir = CSVModelChoiceField(
          queryset=RIR.objects.all(),
          to_field_name='name',
          label=_('RIR'),
          help_text=_('Assigned RIR'),
      )
      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )

      class Meta:
          model = ASN
          fields = (
              'asn', 'rir', 'tenant',
              'description', 'comments', 'tags',
          )
  class RoleImportForm(NetBoxModelImportForm):
      """Bulk-import form for IPAM roles, with an explicitly declared slug field."""

      slug = SlugField()

      class Meta:
          model = Role
          fields = (
              'name',
              'slug',
              'weight',
              'description',
              'tags',
          )
  class PrefixImportForm(ScopedImportForm, NetBoxModelImportForm):
      """Bulk-import form for prefixes.

      The `vlan` column is resolved by VID; when the submitted data also carries
      `vlan_site` and/or `vlan_group`, the candidate VLANs are narrowed
      accordingly (see `__init__`).
      """

      vrf = CSVModelChoiceField(
          queryset=VRF.objects.all(),
          to_field_name='name',
          required=False,
          label=_('VRF'),
          help_text=_('Assigned VRF'),
      )
      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )
      vlan_group = CSVModelChoiceField(
          queryset=VLANGroup.objects.all(),
          to_field_name='name',
          required=False,
          label=_('VLAN group'),
          help_text=_("VLAN's group (if any)"),
      )
      vlan_site = CSVModelChoiceField(
          queryset=Site.objects.all(),
          to_field_name='name',
          required=False,
          label=_('VLAN Site'),
          help_text=_("VLAN's site (if any)"),
      )
      vlan = CSVModelChoiceField(
          queryset=VLAN.objects.all(),
          to_field_name='vid',
          required=False,
          label=_('VLAN'),
          help_text=_("Assigned VLAN"),
      )
      status = CSVChoiceField(
          choices=PrefixStatusChoices,
          label=_('Status'),
          help_text=_('Operational status'),
      )
      role = CSVModelChoiceField(
          queryset=Role.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Role'),
          help_text=_('Functional role'),
      )

      class Meta:
          model = Prefix
          fields = (
              'prefix', 'vrf', 'tenant', 'vlan_group', 'vlan_site', 'vlan', 'status', 'role',
              'scope_type', 'scope_id', 'is_pool', 'mark_utilized', 'description', 'comments',
              'tags',
          )
          labels = {
              'scope_id': _('Scope ID'),
          }

      def __init__(self, data=None, *args, **kwargs):
          """Build the form and, when data is bound, restrict the VLAN queryset.

          With `vlan_site` given, VLANs of that site or without a site qualify;
          with `vlan_group` given, the VLAN must additionally be in that group.
          """
          super().__init__(data, *args, **kwargs)

          if data:
              site_value = data.get('vlan_site')
              group_value = data.get('vlan_group')
              vlan_field = self.fields['vlan']

              vlan_filter = Q()
              if site_value:
                  site_lookup = f"site__{self.fields['vlan_site'].to_field_name}"
                  # Match the named site, and also keep VLANs that have no site at all
                  vlan_filter = (
                      vlan_filter
                      | Q(**{site_lookup: site_value})
                      | Q(**{f"{site_lookup}__isnull": True})
                  )
              if group_value:
                  group_lookup = f"group__{self.fields['vlan_group'].to_field_name}"
                  vlan_filter = vlan_filter & Q(**{group_lookup: group_value})

              vlan_field.queryset = vlan_field.queryset.filter(vlan_filter)
  class IPRangeImportForm(NetBoxModelImportForm):
      """Bulk-import form for IP ranges; VRF, tenant and role are looked up by name."""

      vrf = CSVModelChoiceField(
          queryset=VRF.objects.all(),
          to_field_name='name',
          required=False,
          label=_('VRF'),
          help_text=_('Assigned VRF'),
      )
      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )
      status = CSVChoiceField(
          choices=IPRangeStatusChoices,
          label=_('Status'),
          help_text=_('Operational status'),
      )
      role = CSVModelChoiceField(
          queryset=Role.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Role'),
          help_text=_('Functional role'),
      )

      class Meta:
          model = IPRange
          fields = (
              'start_address', 'end_address', 'vrf', 'tenant', 'status', 'role',
              'mark_populated', 'mark_utilized', 'description', 'comments', 'tags',
          )
  class IPAddressImportForm(NetBoxModelImportForm):
      """Bulk-import form for IP addresses.

      Besides the model fields, a row may name a device or virtual machine plus
      an interface (or an FHRP group) to assign the address to, and may flag the
      address as the parent's primary and/or out-of-band IP.
      """

      vrf = CSVModelChoiceField(
          queryset=VRF.objects.all(),
          to_field_name='name',
          required=False,
          label=_('VRF'),
          help_text=_('Assigned VRF'),
      )
      tenant = CSVModelChoiceField(
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Tenant'),
          help_text=_('Assigned tenant'),
      )
      status = CSVChoiceField(
          choices=IPAddressStatusChoices,
          label=_('Status'),
          help_text=_('Operational status'),
      )
      role = CSVChoiceField(
          choices=IPAddressRoleChoices,
          required=False,
          label=_('Role'),
          help_text=_('Functional role'),
      )
      device = CSVModelChoiceField(
          queryset=Device.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Device'),
          help_text=_('Parent device of assigned interface (if any)'),
      )
      virtual_machine = CSVModelChoiceField(
          queryset=VirtualMachine.objects.all(),
          to_field_name='name',
          required=False,
          label=_('Virtual machine'),
          help_text=_('Parent VM of assigned interface (if any)'),
      )
      interface = CSVModelChoiceField(
          # Starts empty; __init__ swaps in Interface or VMInterface rows for the named parent
          queryset=Interface.objects.none(),
          to_field_name='name',
          required=False,
          label=_('Interface'),
          help_text=_('Assigned interface'),
      )
      fhrp_group = CSVModelChoiceField(
          queryset=FHRPGroup.objects.all(),
          to_field_name='name',
          required=False,
          label=_('FHRP Group'),
          help_text=_('Assigned FHRP Group name'),
      )
      is_primary = forms.BooleanField(
          required=False,
          label=_('Is primary'),
          help_text=_('Make this the primary IP for the assigned device'),
      )
      is_oob = forms.BooleanField(
          required=False,
          label=_('Is out-of-band'),
          help_text=_('Designate this as the out-of-band IP address for the assigned device'),
      )

      class Meta:
          model = IPAddress
          fields = [
              'address', 'vrf', 'tenant', 'status', 'role', 'device', 'virtual_machine',
              'interface', 'fhrp_group', 'is_primary', 'is_oob', 'dns_name', 'description',
              'comments', 'tags',
          ]

      def __init__(self, data=None, *args, **kwargs):
          """Build the form and scope the `interface` choices to the row's device or VM.

          A device takes precedence; the virtual machine is only consulted when
          no device value is present.
          """
          super().__init__(data, *args, **kwargs)

          if not data:
              return

          interface_field = self.fields['interface']
          if data.get('device'):
              device_lookup = f"device__{self.fields['device'].to_field_name}"
              interface_field.queryset = Interface.objects.filter(**{device_lookup: data['device']})
          elif data.get('virtual_machine'):
              vm_lookup = f"virtual_machine__{self.fields['virtual_machine'].to_field_name}"
              interface_field.queryset = VMInterface.objects.filter(**{vm_lookup: data['virtual_machine']})

      def clean(self):
          """Check that `is_primary` / `is_oob` are accompanied by a suitable parent and interface.

          Raises:
              forms.ValidationError: keyed on the offending flag, for the first rule violated.
          """
          super().clean()

          cleaned = self.cleaned_data
          device = cleaned.get('device')
          virtual_machine = cleaned.get('virtual_machine')
          interface = cleaned.get('interface')
          is_primary = cleaned.get('is_primary')
          is_oob = cleaned.get('is_oob')

          # (violated?, field the error is attached to, message) -- evaluated in order
          rules = (
              (
                  is_primary and not device and not virtual_machine,
                  "is_primary",
                  _("No device or virtual machine specified; cannot set as primary IP"),
              ),
              (
                  is_oob and not device,
                  "is_oob",
                  _("No device specified; cannot set as out-of-band IP"),
              ),
              (
                  is_oob and virtual_machine,
                  "is_oob",
                  _("Cannot set out-of-band IP for virtual machines"),
              ),
              (
                  is_primary and not interface,
                  "is_primary",
                  _("No interface specified; cannot set as primary IP"),
              ),
              (
                  is_oob and not interface,
                  "is_oob",
                  _("No interface specified; cannot set as out-of-band IP"),
              ),
          )
          for violated, field_name, message in rules:
              if violated:
                  raise forms.ValidationError({field_name: message})

      def save(self, *args, **kwargs):
          """Save the address, then update the parent's primary / OOB IP when requested.

          Returns:
              The saved IP address instance returned by the parent class's `save()`.
          """
          cleaned = self.cleaned_data

          # Assign to the interface first; an FHRP group, when given, overrides it
          for source in ('interface', 'fhrp_group'):
              if cleaned.get(source):
                  self.instance.assigned_object = cleaned[source]

          ipaddress = super().save(*args, **kwargs)

          # Set as primary for device/VM
          if cleaned.get('is_primary'):
              parent = cleaned.get('device') or cleaned.get('virtual_machine')
              primary_attr = {4: 'primary_ip4', 6: 'primary_ip6'}.get(self.instance.address.version)
              if primary_attr is not None:
                  setattr(parent, primary_attr, ipaddress)
              parent.save()

          # Set as OOB for device
          if cleaned.get('is_oob'):
              oob_parent = cleaned.get('device')
              oob_parent.oob_ip = ipaddress
              oob_parent.save()

          return ipaddress
  class FHRPGroupImportForm(NetBoxModelImportForm):
      """Bulk-import form for FHRP groups; protocol is required, auth type is optional."""

      protocol = CSVChoiceField(
          choices=FHRPGroupProtocolChoices,
          label=_('Protocol'),
      )
      auth_type = CSVChoiceField(
          choices=FHRPGroupAuthTypeChoices,
          required=False,
          label=_('Auth type'),
      )

      class Meta:
          model = FHRPGroup
          fields = (
              'protocol', 'group_id', 'auth_type', 'auth_key',
              'name', 'description', 'comments', 'tags',
          )
  class VLANGroupImportForm(NetBoxModelImportForm):
      """Bulk-import form for VLAN groups.

      The scope is given as a content type (limited to the models named in
      VLANGROUP_SCOPE_TYPES) plus a `scope_id`; the optional tenant is looked
      up by name.
      """

      slug = SlugField()
      scope_type = CSVContentTypeField(
          queryset=ContentType.objects.filter(model__in=VLANGROUP_SCOPE_TYPES),
          required=False,
          label=_('Scope type (app & model)')
      )
      vid_ranges = NumericRangeArrayField(
          required=False
      )
      tenant = CSVModelChoiceField(
          label=_('Tenant'),
          queryset=Tenant.objects.all(),
          required=False,
          to_field_name='name',
          help_text=_('Assigned tenant')
      )

      class Meta:
          model = VLANGroup
          fields = ('name', 'slug', 'scope_type', 'scope_id', 'vid_ranges', 'tenant', 'description', 'tags')
          labels = {
              # Marked for translation, matching the same label on PrefixImportForm
              'scope_id': _('Scope ID'),
          }
  class VLANImportForm(NetBoxModelImportForm):
      """Bulk-import form for VLANs.

      Site, group, tenant and role are looked up by name; the Q-in-Q service
      VLAN (`qinq_svlan`) is looked up by VID.
      """

      site = CSVModelChoiceField(
          label=_('Site'),
          queryset=Site.objects.all(),
          required=False,
          to_field_name='name',
          help_text=_('Assigned site')
      )
      group = CSVModelChoiceField(
          label=_('Group'),
          queryset=VLANGroup.objects.all(),
          required=False,
          to_field_name='name',
          help_text=_('Assigned VLAN group')
      )
      tenant = CSVModelChoiceField(
          label=_('Tenant'),
          queryset=Tenant.objects.all(),
          to_field_name='name',
          required=False,
          help_text=_('Assigned tenant')
      )
      status = CSVChoiceField(
          label=_('Status'),
          choices=VLANStatusChoices,
          help_text=_('Operational status')
      )
      role = CSVModelChoiceField(
          label=_('Role'),
          queryset=Role.objects.all(),
          required=False,
          to_field_name='name',
          help_text=_('Functional role')
      )
      qinq_role = CSVChoiceField(
          label=_('Q-in-Q role'),
          choices=VLANQinQRoleChoices,
          required=False,
          # Previously read 'Operational status' (copied from `status`), which mislabelled this column
          help_text=_('Q-in-Q role')
      )
      qinq_svlan = CSVModelChoiceField(
          label=_('Q-in-Q SVLAN'),
          queryset=VLAN.objects.all(),
          required=False,
          to_field_name='vid',
          help_text=_("Service VLAN (for Q-in-Q/802.1ad customer VLANs)")
      )

      class Meta:
          model = VLAN
          fields = (
              'site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'qinq_role', 'qinq_svlan',
              'comments', 'tags',
          )
  class VLANTranslationPolicyImportForm(NetBoxModelImportForm):
      """Bulk-import form for VLAN translation policies (model fields only)."""

      class Meta:
          model = VLANTranslationPolicy
          fields = (
              'name',
              'description',
              'tags',
          )
  class VLANTranslationRuleImportForm(NetBoxModelImportForm):
      """Bulk-import form for VLAN translation rules; the owning policy is looked up by name."""

      policy = CSVModelChoiceField(
          queryset=VLANTranslationPolicy.objects.all(),
          to_field_name='name',
          label=_('Policy'),
          help_text=_('VLAN translation policy'),
      )

      class Meta:
          model = VLANTranslationRule
          fields = (
              'policy',
              'local_vid',
              'remote_vid',
          )
  class ServiceTemplateImportForm(NetBoxModelImportForm):
      """Bulk-import form for service templates; protocol is a required choice column."""

      protocol = CSVChoiceField(
          choices=ServiceProtocolChoices,
          label=_('Protocol'),
          help_text=_('IP protocol'),
      )

      class Meta:
          model = ServiceTemplate
          fields = (
              'name', 'protocol', 'ports',
              'description', 'comments', 'tags',
          )
  489. class ServiceImportForm(NetBoxModelImportForm):
  490. parent_object_type = CSVContentTypeField(
  491. queryset=ContentType.objects.filter(SERVICE_ASSIGNMENT_MODELS),
  492. required=True,
  493. label=_('Parent type (app & model)')
  494. )
  495. parent = CSVModelChoiceField(
  496. label=_('Parent'),
  497. queryset=Device.objects.all(),
  498. required=False,
  499. to_field_name='name',
  500. help_text=_('Parent object name')
  501. )
  502. parent_object_id = forms.IntegerField(
  503. required=False,
  504. help_text=_('Parent object ID'),
  505. )
  506. protocol = CSVChoiceField(
  507. label=_('Protocol'),
  508. choices=ServiceProtocolChoices,
  509. help_text=_('IP protocol')
  510. )
  511. ipaddresses = CSVModelMultipleChoiceField(
  512. queryset=IPAddress.objects.all(),
  513. required=False,
  514. to_field_name='address',
  515. help_text=_('IP Address'),
  516. )
  517. class Meta:
  518. model = Service
  519. fields = (
  520. 'ipaddresses', 'name', 'protocol', 'ports', 'description', 'comments', 'tags',
  521. )
  522. def __init__(self, data=None, *args, **kwargs):
  523. super().__init__(data, *args, **kwargs)
  524. # Limit parent queryset by assigned parent object type
  525. if data:
  526. match data.get('parent_object_type'):
  527. case 'dcim.device':
  528. self.fields['parent'].queryset = Device.objects.all()
  529. case 'ipam.fhrpgroup':
  530. self.fields['parent'].queryset = FHRPGroup.objects.all()
  531. case 'virtualization.virtualmachine':
  532. self.fields['parent'].queryset = VirtualMachine.objects.all()
  533. def save(self, *args, **kwargs):
  534. if (parent := self.cleaned_data.get('parent')):
  535. self.instance.parent = parent
  536. return super().save(*args, **kwargs)
  537. def clean(self):
  538. super().clean()
  539. if (parent_ct := self.cleaned_data.get('parent_object_type')):
  540. if (parent := self.cleaned_data.get('parent')):
  541. self.cleaned_data['parent_object_id'] = parent.pk
  542. elif (parent_id := self.cleaned_data.get('parent_object_id')):
  543. parent = parent_ct.model_class().objects.filter(id=parent_id).first()
  544. self.cleaned_data['parent'] = parent
  545. else:
  546. # If a parent object type is passed and we've made it here, then raise a validation
  547. # error since an associated parent object or parent object id has not been passed
  548. raise forms.ValidationError(
  549. _("One of parent or parent_object_id must be included with parent_object_type")
  550. )
  551. # making sure parent is defined. In cases where an import is resulting in an update, the
  552. # import data might not include the parent object and so the above logic might not be
  553. # triggered
  554. parent = self.cleaned_data.get('parent')
  555. for ip_address in self.cleaned_data.get('ipaddresses', []):
  556. if not (assigned := ip_address.assigned_object) or ( # no assigned object
  557. (isinstance(parent, FHRPGroup) and assigned != parent) # assigned to FHRPGroup
  558. and getattr(assigned, 'parent_object') != parent # assigned to [VM]Interface
  559. ):
  560. raise forms.ValidationError(
  561. _("{ip} is not assigned to this parent.").format(ip=ip_address)
  562. )
  563. return self.cleaned_data