Browse Source

Closes #21783: Fix support for bulk import of cables connected to power feeds

Jeremy Stretch 12 hours ago
parent
commit
6f135699e6
2 changed files with 92 additions and 23 deletions
  1. 69 22
      netbox/dcim/forms/bulk_import.py
  2. 23 1
      netbox/dcim/tests/test_views.py

+ 69 - 22
netbox/dcim/forms/bulk_import.py

@@ -1409,8 +1409,16 @@ class CableImportForm(PrimaryModelImportForm):
     side_a_device = CSVModelChoiceField(
         label=_('Side A device'),
         queryset=Device.objects.all(),
+        required=False,
+        to_field_name='name',
+        help_text=_('Device name (for device component terminations)')
+    )
+    side_a_power_panel = CSVModelChoiceField(
+        label=_('Side A power panel'),
+        queryset=PowerPanel.objects.all(),
+        required=False,
         to_field_name='name',
-        help_text=_('Device name')
+        help_text=_('Power panel name (for power feed terminations)')
     )
     side_a_type = CSVContentTypeField(
         label=_('Side A type'),
@@ -1434,8 +1442,16 @@ class CableImportForm(PrimaryModelImportForm):
     side_b_device = CSVModelChoiceField(
         label=_('Side B device'),
         queryset=Device.objects.all(),
+        required=False,
         to_field_name='name',
-        help_text=_('Device name')
+        help_text=_('Device name (for device component terminations)')
+    )
+    side_b_power_panel = CSVModelChoiceField(
+        label=_('Side B power panel'),
+        queryset=PowerPanel.objects.all(),
+        required=False,
+        to_field_name='name',
+        help_text=_('Power panel name (for power feed terminations)')
     )
     side_b_type = CSVContentTypeField(
         label=_('Side B type'),
@@ -1490,8 +1506,9 @@ class CableImportForm(PrimaryModelImportForm):
     class Meta:
         model = Cable
         fields = [
-            'side_a_site', 'side_a_device', 'side_a_type', 'side_a_name', 'side_b_site', 'side_b_device', 'side_b_type',
-            'side_b_name', 'type', 'status', 'profile', 'tenant', 'label', 'color', 'length', 'length_unit',
+            'side_a_site', 'side_a_device', 'side_a_power_panel', 'side_a_type', 'side_a_name',
+            'side_b_site', 'side_b_device', 'side_b_power_panel', 'side_b_type', 'side_b_name',
+            'type', 'status', 'profile', 'tenant', 'label', 'color', 'length', 'length_unit',
             'description', 'owner', 'comments', 'tags',
         ]
 
@@ -1505,6 +1522,9 @@ class CableImportForm(PrimaryModelImportForm):
                 self.fields['side_a_device'].queryset = self.fields['side_a_device'].queryset.filter(
                     **side_a_device_params
                 )
+                self.fields['side_a_power_panel'].queryset = self.fields['side_a_power_panel'].queryset.filter(
+                    **side_a_device_params
+                )
 
             # Limit choices for side_b_device to the assigned side_b_site
             if side_b_site := data.get('side_b_site'):
@@ -1512,6 +1532,9 @@ class CableImportForm(PrimaryModelImportForm):
                 self.fields['side_b_device'].queryset = self.fields['side_b_device'].queryset.filter(
                     **side_b_device_params
                 )
+                self.fields['side_b_power_panel'].queryset = self.fields['side_b_power_panel'].queryset.filter(
+                    **side_b_device_params
+                )
 
     def _clean_side(self, side):
         """
@@ -1522,33 +1545,57 @@ class CableImportForm(PrimaryModelImportForm):
         assert side in 'ab', f"Invalid side designation: {side}"
 
         device = self.cleaned_data.get(f'side_{side}_device')
+        power_panel = self.cleaned_data.get(f'side_{side}_power_panel')
         content_type = self.cleaned_data.get(f'side_{side}_type')
         name = self.cleaned_data.get(f'side_{side}_name')
-        if not device or not content_type or not name:
+        if not content_type or not name:
             return None
 
         model = content_type.model_class()
-        try:
-            if (
-                device.virtual_chassis and
-                device.virtual_chassis.master == device and
-                not model.objects.filter(device=device, name=name).exists()
-            ):
-                termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name)
-            else:
-                termination_object = model.objects.get(device=device, name=name)
-            if termination_object.cable is not None and termination_object.cable != self.instance:
+
+        # PowerFeed terminations reference a PowerPanel, not a Device
+        if content_type.model == 'powerfeed':
+            if not power_panel:
+                return None
+            try:
+                termination_object = model.objects.get(power_panel=power_panel, name=name)
+                if termination_object.cable is not None and termination_object.cable != self.instance:
+                    raise forms.ValidationError(
+                        _("Side {side_upper}: {power_panel} {termination_object} is already connected").format(
+                            side_upper=side.upper(), power_panel=power_panel, termination_object=termination_object
+                        )
+                    )
+            except ObjectDoesNotExist:
                 raise forms.ValidationError(
-                    _("Side {side_upper}: {device} {termination_object} is already connected").format(
-                        side_upper=side.upper(), device=device, termination_object=termination_object
+                    _("{side_upper} side termination not found: {power_panel} {name}").format(
+                        side_upper=side.upper(), power_panel=power_panel, name=name
                     )
                 )
-        except ObjectDoesNotExist:
-            raise forms.ValidationError(
-                _("{side_upper} side termination not found: {device} {name}").format(
-                    side_upper=side.upper(), device=device, name=name
+        else:
+            if not device:
+                return None
+            try:
+                if (
+                    device.virtual_chassis and
+                    device.virtual_chassis.master == device and
+                    not model.objects.filter(device=device, name=name).exists()
+                ):
+                    termination_object = model.objects.get(device__in=device.virtual_chassis.members.all(), name=name)
+                else:
+                    termination_object = model.objects.get(device=device, name=name)
+                if termination_object.cable is not None and termination_object.cable != self.instance:
+                    raise forms.ValidationError(
+                        _("Side {side_upper}: {device} {termination_object} is already connected").format(
+                            side_upper=side.upper(), device=device, termination_object=termination_object
+                        )
+                    )
+            except ObjectDoesNotExist:
+                raise forms.ValidationError(
+                    _("{side_upper} side termination not found: {device} {name}").format(
+                        side_upper=side.upper(), device=device, name=name
+                    )
                 )
-            )
+
         setattr(self.instance, f'{side}_terminations', [termination_object])
         return termination_object
 

+ 23 - 1
netbox/dcim/tests/test_views.py

@@ -3603,6 +3603,21 @@ class CableTestCase(
         cable3 = Cable(a_terminations=[interfaces[2]], b_terminations=[interfaces[5]], type=CableTypeChoices.TYPE_CAT6)
         cable3.save()
 
+        # Power panel, power feeds, and power ports for powerfeed-to-powerport cable import tests
+        power_panel = PowerPanel.objects.create(site=sites[0], name='Power Panel 1')
+        power_feeds = (
+            PowerFeed(name='Power Feed 1', power_panel=power_panel),
+            PowerFeed(name='Power Feed 2', power_panel=power_panel),
+            PowerFeed(name='Power Feed 3', power_panel=power_panel),
+        )
+        PowerFeed.objects.bulk_create(power_feeds)
+        power_ports = (
+            PowerPort(device=devices[3], name='Power Port 1'),
+            PowerPort(device=devices[3], name='Power Port 2'),
+            PowerPort(device=devices[3], name='Power Port 3'),
+        )
+        PowerPort.objects.bulk_create(power_ports)
+
         tags = create_tags('Alpha', 'Bravo', 'Charlie')
 
         cls.form_data = {
@@ -3640,7 +3655,14 @@ class CableTestCase(
                 "Site 1,Device 3,dcim.interface,Interface 3,Site 2,Device 1,dcim.interface,Interface 3",
                 "Site 1,Device 1,dcim.interface,Device 2 Interface,Site 2,Device 1,dcim.interface,Interface 4",
                 "Site 1,Device 1,dcim.interface,Device 3 Interface,Site 2,Device 1,dcim.interface,Interface 5",
-            )
+            ),
+            'powerfeed-to-powerport': (
+                # Ensure that powerfeed-to-powerport cables can be imported via CSV using side_a_power_panel
+                "side_a_power_panel,side_a_type,side_a_name,side_b_device,side_b_type,side_b_name",
+                "Power Panel 1,dcim.powerfeed,Power Feed 1,Device 4,dcim.powerport,Power Port 1",
+                "Power Panel 1,dcim.powerfeed,Power Feed 2,Device 4,dcim.powerport,Power Port 2",
+                "Power Panel 1,dcim.powerfeed,Power Feed 3,Device 4,dcim.powerport,Power Port 3",
+            ),
         }
 
         cls.csv_update_data = (