bulk_import.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510
  1. from django import forms
  2. from django.contrib.contenttypes.models import ContentType
  3. from django.core.exceptions import ValidationError
  4. from dcim.models import Device, Interface, Site
  5. from ipam.choices import *
  6. from ipam.constants import *
  7. from ipam.models import *
  8. from netbox.forms import NetBoxModelCSVForm
  9. from tenancy.models import Tenant
  10. from utilities.forms import CSVChoiceField, CSVContentTypeField, CSVModelChoiceField, SlugField
  11. from virtualization.models import VirtualMachine, VMInterface
  12. __all__ = (
  13. 'AggregateCSVForm',
  14. 'ASNCSVForm',
  15. 'FHRPGroupCSVForm',
  16. 'IPAddressCSVForm',
  17. 'IPRangeCSVForm',
  18. 'L2VPNCSVForm',
  19. 'L2VPNTerminationCSVForm',
  20. 'PrefixCSVForm',
  21. 'RIRCSVForm',
  22. 'RoleCSVForm',
  23. 'RouteTargetCSVForm',
  24. 'ServiceCSVForm',
  25. 'ServiceTemplateCSVForm',
  26. 'VLANCSVForm',
  27. 'VLANGroupCSVForm',
  28. 'VRFCSVForm',
  29. )
  30. class VRFCSVForm(NetBoxModelCSVForm):
  31. tenant = CSVModelChoiceField(
  32. queryset=Tenant.objects.all(),
  33. required=False,
  34. to_field_name='name',
  35. help_text='Assigned tenant'
  36. )
  37. class Meta:
  38. model = VRF
  39. fields = ('name', 'rd', 'tenant', 'enforce_unique', 'description', 'comments')
  40. class RouteTargetCSVForm(NetBoxModelCSVForm):
  41. tenant = CSVModelChoiceField(
  42. queryset=Tenant.objects.all(),
  43. required=False,
  44. to_field_name='name',
  45. help_text='Assigned tenant'
  46. )
  47. class Meta:
  48. model = RouteTarget
  49. fields = ('name', 'tenant', 'description', 'comments')
  50. class RIRCSVForm(NetBoxModelCSVForm):
  51. slug = SlugField()
  52. class Meta:
  53. model = RIR
  54. fields = ('name', 'slug', 'is_private', 'description')
  55. help_texts = {
  56. 'name': 'RIR name',
  57. }
  58. class AggregateCSVForm(NetBoxModelCSVForm):
  59. rir = CSVModelChoiceField(
  60. queryset=RIR.objects.all(),
  61. to_field_name='name',
  62. help_text='Assigned RIR'
  63. )
  64. tenant = CSVModelChoiceField(
  65. queryset=Tenant.objects.all(),
  66. required=False,
  67. to_field_name='name',
  68. help_text='Assigned tenant'
  69. )
  70. class Meta:
  71. model = Aggregate
  72. fields = ('prefix', 'rir', 'tenant', 'date_added', 'description', 'comments')
  73. class ASNCSVForm(NetBoxModelCSVForm):
  74. rir = CSVModelChoiceField(
  75. queryset=RIR.objects.all(),
  76. to_field_name='name',
  77. help_text='Assigned RIR'
  78. )
  79. tenant = CSVModelChoiceField(
  80. queryset=Tenant.objects.all(),
  81. required=False,
  82. to_field_name='name',
  83. help_text='Assigned tenant'
  84. )
  85. class Meta:
  86. model = ASN
  87. fields = ('asn', 'rir', 'tenant', 'description', 'comments')
  88. help_texts = {}
  89. class RoleCSVForm(NetBoxModelCSVForm):
  90. slug = SlugField()
  91. class Meta:
  92. model = Role
  93. fields = ('name', 'slug', 'weight', 'description')
  94. class PrefixCSVForm(NetBoxModelCSVForm):
  95. vrf = CSVModelChoiceField(
  96. queryset=VRF.objects.all(),
  97. to_field_name='name',
  98. required=False,
  99. help_text='Assigned VRF'
  100. )
  101. tenant = CSVModelChoiceField(
  102. queryset=Tenant.objects.all(),
  103. required=False,
  104. to_field_name='name',
  105. help_text='Assigned tenant'
  106. )
  107. site = CSVModelChoiceField(
  108. queryset=Site.objects.all(),
  109. required=False,
  110. to_field_name='name',
  111. help_text='Assigned site'
  112. )
  113. vlan_group = CSVModelChoiceField(
  114. queryset=VLANGroup.objects.all(),
  115. required=False,
  116. to_field_name='name',
  117. help_text="VLAN's group (if any)"
  118. )
  119. vlan = CSVModelChoiceField(
  120. queryset=VLAN.objects.all(),
  121. required=False,
  122. to_field_name='vid',
  123. help_text="Assigned VLAN"
  124. )
  125. status = CSVChoiceField(
  126. choices=PrefixStatusChoices,
  127. help_text='Operational status'
  128. )
  129. role = CSVModelChoiceField(
  130. queryset=Role.objects.all(),
  131. required=False,
  132. to_field_name='name',
  133. help_text='Functional role'
  134. )
  135. class Meta:
  136. model = Prefix
  137. fields = (
  138. 'prefix', 'vrf', 'tenant', 'site', 'vlan_group', 'vlan', 'status', 'role', 'is_pool', 'mark_utilized',
  139. 'description', 'comments',
  140. )
  141. def __init__(self, data=None, *args, **kwargs):
  142. super().__init__(data, *args, **kwargs)
  143. if data:
  144. # Limit VLAN queryset by assigned site and/or group (if specified)
  145. params = {}
  146. if data.get('site'):
  147. params[f"site__{self.fields['site'].to_field_name}"] = data.get('site')
  148. if data.get('vlan_group'):
  149. params[f"group__{self.fields['vlan_group'].to_field_name}"] = data.get('vlan_group')
  150. if params:
  151. self.fields['vlan'].queryset = self.fields['vlan'].queryset.filter(**params)
  152. class IPRangeCSVForm(NetBoxModelCSVForm):
  153. vrf = CSVModelChoiceField(
  154. queryset=VRF.objects.all(),
  155. to_field_name='name',
  156. required=False,
  157. help_text='Assigned VRF'
  158. )
  159. tenant = CSVModelChoiceField(
  160. queryset=Tenant.objects.all(),
  161. required=False,
  162. to_field_name='name',
  163. help_text='Assigned tenant'
  164. )
  165. status = CSVChoiceField(
  166. choices=IPRangeStatusChoices,
  167. help_text='Operational status'
  168. )
  169. role = CSVModelChoiceField(
  170. queryset=Role.objects.all(),
  171. required=False,
  172. to_field_name='name',
  173. help_text='Functional role'
  174. )
  175. class Meta:
  176. model = IPRange
  177. fields = (
  178. 'start_address', 'end_address', 'vrf', 'tenant', 'status', 'role', 'description', 'comments',
  179. )
  180. class IPAddressCSVForm(NetBoxModelCSVForm):
  181. vrf = CSVModelChoiceField(
  182. queryset=VRF.objects.all(),
  183. to_field_name='name',
  184. required=False,
  185. help_text='Assigned VRF'
  186. )
  187. tenant = CSVModelChoiceField(
  188. queryset=Tenant.objects.all(),
  189. to_field_name='name',
  190. required=False,
  191. help_text='Assigned tenant'
  192. )
  193. status = CSVChoiceField(
  194. choices=IPAddressStatusChoices,
  195. help_text='Operational status'
  196. )
  197. role = CSVChoiceField(
  198. choices=IPAddressRoleChoices,
  199. required=False,
  200. help_text='Functional role'
  201. )
  202. device = CSVModelChoiceField(
  203. queryset=Device.objects.all(),
  204. required=False,
  205. to_field_name='name',
  206. help_text='Parent device of assigned interface (if any)'
  207. )
  208. virtual_machine = CSVModelChoiceField(
  209. queryset=VirtualMachine.objects.all(),
  210. required=False,
  211. to_field_name='name',
  212. help_text='Parent VM of assigned interface (if any)'
  213. )
  214. interface = CSVModelChoiceField(
  215. queryset=Interface.objects.none(), # Can also refer to VMInterface
  216. required=False,
  217. to_field_name='name',
  218. help_text='Assigned interface'
  219. )
  220. is_primary = forms.BooleanField(
  221. help_text='Make this the primary IP for the assigned device',
  222. required=False
  223. )
  224. class Meta:
  225. model = IPAddress
  226. fields = [
  227. 'address', 'vrf', 'tenant', 'status', 'role', 'device', 'virtual_machine', 'interface', 'is_primary',
  228. 'dns_name', 'description', 'comments',
  229. ]
  230. def __init__(self, data=None, *args, **kwargs):
  231. super().__init__(data, *args, **kwargs)
  232. if data:
  233. # Limit interface queryset by assigned device
  234. if data.get('device'):
  235. self.fields['interface'].queryset = Interface.objects.filter(
  236. **{f"device__{self.fields['device'].to_field_name}": data['device']}
  237. )
  238. # Limit interface queryset by assigned device
  239. elif data.get('virtual_machine'):
  240. self.fields['interface'].queryset = VMInterface.objects.filter(
  241. **{f"virtual_machine__{self.fields['virtual_machine'].to_field_name}": data['virtual_machine']}
  242. )
  243. def clean(self):
  244. super().clean()
  245. device = self.cleaned_data.get('device')
  246. virtual_machine = self.cleaned_data.get('virtual_machine')
  247. interface = self.cleaned_data.get('interface')
  248. is_primary = self.cleaned_data.get('is_primary')
  249. # Validate is_primary
  250. if is_primary and not device and not virtual_machine:
  251. raise forms.ValidationError({
  252. "is_primary": "No device or virtual machine specified; cannot set as primary IP"
  253. })
  254. if is_primary and not interface:
  255. raise forms.ValidationError({
  256. "is_primary": "No interface specified; cannot set as primary IP"
  257. })
  258. def save(self, *args, **kwargs):
  259. # Set interface assignment
  260. if self.cleaned_data.get('interface'):
  261. self.instance.assigned_object = self.cleaned_data['interface']
  262. ipaddress = super().save(*args, **kwargs)
  263. # Set as primary for device/VM
  264. if self.cleaned_data.get('is_primary'):
  265. parent = self.cleaned_data['device'] or self.cleaned_data['virtual_machine']
  266. if self.instance.address.version == 4:
  267. parent.primary_ip4 = ipaddress
  268. elif self.instance.address.version == 6:
  269. parent.primary_ip6 = ipaddress
  270. parent.save()
  271. return ipaddress
  272. class FHRPGroupCSVForm(NetBoxModelCSVForm):
  273. protocol = CSVChoiceField(
  274. choices=FHRPGroupProtocolChoices
  275. )
  276. auth_type = CSVChoiceField(
  277. choices=FHRPGroupAuthTypeChoices,
  278. required=False
  279. )
  280. class Meta:
  281. model = FHRPGroup
  282. fields = ('protocol', 'group_id', 'auth_type', 'auth_key', 'name', 'description', 'comments')
  283. class VLANGroupCSVForm(NetBoxModelCSVForm):
  284. slug = SlugField()
  285. scope_type = CSVContentTypeField(
  286. queryset=ContentType.objects.filter(model__in=VLANGROUP_SCOPE_TYPES),
  287. required=False,
  288. label='Scope type (app & model)'
  289. )
  290. min_vid = forms.IntegerField(
  291. min_value=VLAN_VID_MIN,
  292. max_value=VLAN_VID_MAX,
  293. required=False,
  294. label=f'Minimum child VLAN VID (default: {VLAN_VID_MIN})'
  295. )
  296. max_vid = forms.IntegerField(
  297. min_value=VLAN_VID_MIN,
  298. max_value=VLAN_VID_MAX,
  299. required=False,
  300. label=f'Maximum child VLAN VID (default: {VLAN_VID_MIN})'
  301. )
  302. class Meta:
  303. model = VLANGroup
  304. fields = ('name', 'slug', 'scope_type', 'scope_id', 'min_vid', 'max_vid', 'description')
  305. labels = {
  306. 'scope_id': 'Scope ID',
  307. }
  308. class VLANCSVForm(NetBoxModelCSVForm):
  309. site = CSVModelChoiceField(
  310. queryset=Site.objects.all(),
  311. required=False,
  312. to_field_name='name',
  313. help_text='Assigned site'
  314. )
  315. group = CSVModelChoiceField(
  316. queryset=VLANGroup.objects.all(),
  317. required=False,
  318. to_field_name='name',
  319. help_text='Assigned VLAN group'
  320. )
  321. tenant = CSVModelChoiceField(
  322. queryset=Tenant.objects.all(),
  323. to_field_name='name',
  324. required=False,
  325. help_text='Assigned tenant'
  326. )
  327. status = CSVChoiceField(
  328. choices=VLANStatusChoices,
  329. help_text='Operational status'
  330. )
  331. role = CSVModelChoiceField(
  332. queryset=Role.objects.all(),
  333. required=False,
  334. to_field_name='name',
  335. help_text='Functional role'
  336. )
  337. class Meta:
  338. model = VLAN
  339. fields = ('site', 'group', 'vid', 'name', 'tenant', 'status', 'role', 'description', 'comments')
  340. help_texts = {
  341. 'vid': 'Numeric VLAN ID (1-4094)',
  342. 'name': 'VLAN name',
  343. }
  344. class ServiceTemplateCSVForm(NetBoxModelCSVForm):
  345. protocol = CSVChoiceField(
  346. choices=ServiceProtocolChoices,
  347. help_text='IP protocol'
  348. )
  349. class Meta:
  350. model = ServiceTemplate
  351. fields = ('name', 'protocol', 'ports', 'description', 'comments')
  352. class ServiceCSVForm(NetBoxModelCSVForm):
  353. device = CSVModelChoiceField(
  354. queryset=Device.objects.all(),
  355. required=False,
  356. to_field_name='name',
  357. help_text='Required if not assigned to a VM'
  358. )
  359. virtual_machine = CSVModelChoiceField(
  360. queryset=VirtualMachine.objects.all(),
  361. required=False,
  362. to_field_name='name',
  363. help_text='Required if not assigned to a device'
  364. )
  365. protocol = CSVChoiceField(
  366. choices=ServiceProtocolChoices,
  367. help_text='IP protocol'
  368. )
  369. class Meta:
  370. model = Service
  371. fields = ('device', 'virtual_machine', 'name', 'protocol', 'ports', 'description', 'comments')
  372. class L2VPNCSVForm(NetBoxModelCSVForm):
  373. tenant = CSVModelChoiceField(
  374. queryset=Tenant.objects.all(),
  375. required=False,
  376. to_field_name='name',
  377. )
  378. type = CSVChoiceField(
  379. choices=L2VPNTypeChoices,
  380. help_text='L2VPN type'
  381. )
  382. class Meta:
  383. model = L2VPN
  384. fields = ('identifier', 'name', 'slug', 'type', 'description', 'comments')
  385. class L2VPNTerminationCSVForm(NetBoxModelCSVForm):
  386. l2vpn = CSVModelChoiceField(
  387. queryset=L2VPN.objects.all(),
  388. required=True,
  389. to_field_name='name',
  390. label='L2VPN',
  391. )
  392. device = CSVModelChoiceField(
  393. queryset=Device.objects.all(),
  394. required=False,
  395. to_field_name='name',
  396. help_text='Parent device (for interface)'
  397. )
  398. virtual_machine = CSVModelChoiceField(
  399. queryset=VirtualMachine.objects.all(),
  400. required=False,
  401. to_field_name='name',
  402. help_text='Parent virtual machine (for interface)'
  403. )
  404. interface = CSVModelChoiceField(
  405. queryset=Interface.objects.none(), # Can also refer to VMInterface
  406. required=False,
  407. to_field_name='name',
  408. help_text='Assigned interface (device or VM)'
  409. )
  410. vlan = CSVModelChoiceField(
  411. queryset=VLAN.objects.all(),
  412. required=False,
  413. to_field_name='name',
  414. help_text='Assigned VLAN'
  415. )
  416. class Meta:
  417. model = L2VPNTermination
  418. fields = ('l2vpn', 'device', 'virtual_machine', 'interface', 'vlan')
  419. def __init__(self, data=None, *args, **kwargs):
  420. super().__init__(data, *args, **kwargs)
  421. if data:
  422. # Limit interface queryset by device or VM
  423. if data.get('device'):
  424. self.fields['interface'].queryset = Interface.objects.filter(
  425. **{f"device__{self.fields['device'].to_field_name}": data['device']}
  426. )
  427. elif data.get('virtual_machine'):
  428. self.fields['interface'].queryset = VMInterface.objects.filter(
  429. **{f"virtual_machine__{self.fields['virtual_machine'].to_field_name}": data['virtual_machine']}
  430. )
  431. def clean(self):
  432. super().clean()
  433. if self.cleaned_data.get('device') and self.cleaned_data.get('virtual_machine'):
  434. raise ValidationError('Cannot import device and VM interface terminations simultaneously.')
  435. if not (self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')):
  436. raise ValidationError('Each termination must specify either an interface or a VLAN.')
  437. if self.cleaned_data.get('interface') and self.cleaned_data.get('vlan'):
  438. raise ValidationError('Cannot assign both an interface and a VLAN.')
  439. self.instance.assigned_object = self.cleaned_data.get('interface') or self.cleaned_data.get('vlan')