2
0
Эх сурвалжийг харах

Closes #17598: Add bulk creation for VLANs (#22377)

Martin Hauser 2 долоо хоног өмнө
parent
commit
d7de863681

+ 6 - 0
docs/models/ipam/vlan.md

@@ -2,6 +2,12 @@
 
 A Virtual LAN (VLAN) represents an isolated layer two domain, identified by a name and a numeric ID (1-4094) as defined in [IEEE 802.1Q](https://en.wikipedia.org/wiki/IEEE_802.1Q). VLANs are arranged into [VLAN groups](./vlangroup.md) to define scope and to enforce uniqueness.
 
+## Bulk Creation
+
+Multiple VLANs can be created at once by selecting the "Bulk Create" tab on the VLAN creation form. Enter the desired VLAN IDs and/or ID ranges as a comma-separated list (e.g. `100,200-210,4000-4010`). The string `{vid}` may be embedded in the name field as a placeholder for each VLAN's ID; for example, `VLAN-{vid}` yields `VLAN-100`, `VLAN-200`, and so on. All other attributes (status, role, tenant, etc.) are applied to every new VLAN.
+
+The operation is atomic: if any VLAN fails validation (for example, a VLAN ID falling outside the assigned group's permitted ranges), no VLANs are created.
+
 ## Fields
 
 ### ID

+ 17 - 1
netbox/ipam/forms/bulk_create.py

@@ -1,10 +1,12 @@
 from django import forms
 from django.utils.translation import gettext_lazy as _
 
-from utilities.forms.fields import ExpandableIPNetworkField
+from ipam.constants import VLAN_VID_MAX, VLAN_VID_MIN
+from utilities.forms.fields import ExpandableIPNetworkField, NumericArrayField
 
 __all__ = (
     'IPNetworkBulkCreateForm',
+    'VLANIDBulkCreateForm',
 )
 
 
@@ -15,3 +17,17 @@ class IPNetworkBulkCreateForm(forms.Form):
     pattern = ExpandableIPNetworkField(
         label=_('Pattern')
     )
+
+
+class VLANIDBulkCreateForm(forms.Form):
+    pattern = NumericArrayField(
+        base_field=forms.IntegerField(
+            min_value=VLAN_VID_MIN,
+            max_value=VLAN_VID_MAX
+        ),
+        label=_('VLAN IDs'),
+        help_text=_(
+            'Enter VLAN IDs and ranges separated by commas. '
+            'Example: 100,200-210,3100-3299'
+        )
+    )

+ 21 - 0
netbox/ipam/forms/model_forms.py

@@ -45,6 +45,7 @@ __all__ = (
     'ServiceCreateForm',
     'ServiceForm',
     'ServiceTemplateForm',
+    'VLANBulkAddForm',
     'VLANForm',
     'VLANGroupForm',
     'VLANTranslationPolicyForm',
@@ -727,6 +728,26 @@ class VLANForm(TenancyForm, PrimaryModelForm):
         ]
 
 
+class VLANBulkAddForm(VLANForm):
+    """
+    Subclass of VLANForm for bulk creation.
+
+    The VID field is inherited but excluded from the visible fieldsets, as it is
+    populated programmatically by BulkCreateView from the expanded pattern.
+    """
+    fieldsets = (
+        FieldSet('group', 'site', 'name', 'status', 'role', 'description', 'tags', name=_('VLAN')),
+        FieldSet('qinq_role', 'qinq_svlan', name=_('Q-in-Q/802.1ad')),
+        FieldSet('tenant_group', 'tenant', name=_('Tenancy')),
+    )
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.fields['name'].help_text = _(
+            'Use {vid} as a placeholder for the VLAN ID. Example: VLAN-{vid}.'
+        )
+
+
 class VLANTranslationPolicyForm(PrimaryModelForm):
 
     fieldsets = (

+ 1 - 1
netbox/ipam/models/vlans.py

@@ -330,7 +330,7 @@ class VLAN(PrimaryModel):
                 )
 
         # Check that the VLAN ID is permitted in the assigned group (if any)
-        if self.group:
+        if self.group and self.vid is not None:
             if not any([self.vid in r for r in self.group.vid_ranges]):
                 raise ValidationError({
                     'vid': _(

+ 29 - 1
netbox/ipam/tests/test_forms.py

@@ -3,7 +3,7 @@ from django.test import TestCase
 
 from dcim.constants import InterfaceTypeChoices
 from dcim.models import Device, DeviceRole, DeviceType, Interface, Location, Manufacturer, Region, Site, SiteGroup
-from ipam.forms import PrefixForm
+from ipam.forms import PrefixForm, VLANIDBulkCreateForm
 from ipam.forms.bulk_import import IPAddressImportForm
 
 
@@ -96,3 +96,31 @@ class IPAddressImportFormTestCase(TestCase):
 
         self.device.refresh_from_db()
         self.assertEqual(self.device.oob_ip, ip1, "OOB IP was incorrectly cleared by a row with is_oob=False")
+
+
+class VLANFormTestCase(TestCase):
+
+    def test_bulk_create_valid_patterns(self):
+        """Single values, ranges, and combinations expand to sorted, deduplicated VLAN IDs."""
+        cases = (
+            ('100', [100]),
+            ('5,10,20', [5, 10, 20]),
+            ('10-20', list(range(10, 21))),
+            ('1,10-20,300-305', [1, *range(10, 21), *range(300, 306)]),
+            (' 5 , 7 - 9 ', [5, 7, 8, 9]),
+            ('5,5,4-6', [4, 5, 6]),
+        )
+        for pattern, expected in cases:
+            with self.subTest(pattern=pattern):
+                form = VLANIDBulkCreateForm({'pattern': pattern})
+                self.assertTrue(form.is_valid(), form.errors)
+                self.assertEqual(form.cleaned_data['pattern'], expected)
+
+    def test_bulk_create_invalid_patterns(self):
+        """Malformed, descending, or out-of-range patterns are rejected with an error on the pattern field."""
+        cases = ('', 'abc', '10,abc', '20-10', '10-', '5,', '-5', '0', '4095')
+        for pattern in cases:
+            with self.subTest(pattern=pattern):
+                form = VLANIDBulkCreateForm({'pattern': pattern})
+                self.assertFalse(form.is_valid())
+                self.assertIn('pattern', form.errors)

+ 7 - 0
netbox/ipam/tests/test_models.py

@@ -1808,6 +1808,13 @@ class VLANTestCase(TestCase):
         with self.assertRaises(ValidationError):
             vlan.full_clean()
 
+    def test_vlan_group_vid_validation_with_null_vid(self):
+        """A missing VID on a grouped VLAN raises a ValidationError, not a TypeError."""
+        group = VLANGroup.objects.create(name='VLAN Group 1', slug='vlan-group-1')
+        vlan = VLAN(name='VLAN X', vid=None, group=group)
+        with self.assertRaises(ValidationError):
+            vlan.full_clean()
+
 
 class PrefixGetChildIPsTestCase(TestCase):
     @classmethod

+ 173 - 0
netbox/ipam/tests/test_views.py

@@ -1,6 +1,7 @@
 import datetime
 
 from django.contrib.contenttypes.models import ContentType
+from django.db.backends.postgresql.psycopg_any import NumericRange
 from django.test import RequestFactory
 from django.urls import reverse
 from netaddr import IPNetwork
@@ -1501,6 +1502,178 @@ class VLANTestCase(ViewTestCases.PrimaryObjectViewTestCase):
             'description': 'New description',
         }
 
+    def test_bulk_add_vlans(self):
+        self.add_permissions('ipam.add_vlan')
+
+        group = VLANGroup.objects.get(name='VLAN Group 1')
+        initial_count = VLAN.objects.count()
+        expected_vids = (110, 120, 121, 122)
+
+        form_data = {
+            'pattern': '110,120-122',
+            'group': group.pk,
+            'name': 'Pool-{vid}',
+            'status': VLANStatusChoices.STATUS_RESERVED,
+        }
+
+        response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+
+        self.assertHttpStatus(response, 302)
+        self.assertEqual(VLAN.objects.count(), initial_count + len(expected_vids))
+
+        for vid in expected_vids:
+            self.assertTrue(
+                VLAN.objects.filter(
+                    group=group,
+                    vid=vid,
+                    name=f'Pool-{vid}'
+                ).exists()
+            )
+
+    def test_bulk_add_vlans_rolls_back_on_duplicate_name(self):
+        self.add_permissions('ipam.add_vlan')
+
+        group = VLANGroup.objects.get(name='VLAN Group 1')
+        initial_count = VLAN.objects.count()
+
+        form_data = {
+            'pattern': '110-112',
+            'group': group.pk,
+            'name': 'Duplicate name',
+            'status': VLANStatusChoices.STATUS_RESERVED,
+        }
+
+        response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(VLAN.objects.count(), initial_count)
+        self.assertFalse(VLAN.objects.filter(group=group, vid=110).exists())
+
+    def test_bulk_add_vlans_rolls_back_when_any_id_outside_group_range(self):
+        self.add_permissions('ipam.add_vlan')
+
+        group = VLANGroup.objects.create(
+            name='Restricted VLAN Group',
+            slug='restricted-vlan-group',
+            vid_ranges=[NumericRange(200, 204)]  # Valid VIDs: 200-203
+        )
+        initial_count = VLAN.objects.count()
+
+        form_data = {
+            'pattern': '200-203,500',
+            'group': group.pk,
+            'name': 'Restricted-{vid}',
+            'status': VLANStatusChoices.STATUS_RESERVED,
+        }
+
+        response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(VLAN.objects.count(), initial_count)
+        self.assertFalse(VLAN.objects.filter(group=group, vid=200).exists())
+        self.assertFalse(VLAN.objects.filter(group=group, vid=203).exists())
+        self.assertFalse(VLAN.objects.filter(group=group, vid=500).exists())
+
+    def test_bulk_add_vlans_pattern_shapes(self):
+        """Single values, multiple values, ranges, and combinations create the expected VLANs."""
+        self.add_permissions('ipam.add_vlan')
+        # The combination runs against a second group: subTests share one transaction, and VIDs
+        # 10 & 20 would otherwise collide with the multiple-values case via the (group, vid) constraint.
+        cases = (
+            ('500', (500,), 'VLAN Group 1'),
+            ('5,10,20', (5, 10, 20), 'VLAN Group 1'),
+            ('600-605', tuple(range(600, 606)), 'VLAN Group 1'),
+            ('1,10-20,300-305', (1, *range(10, 21), *range(300, 306)), 'VLAN Group 2'),
+        )
+        for pattern, expected_vids, group_name in cases:
+            with self.subTest(pattern=pattern):
+                group = VLANGroup.objects.get(name=group_name)
+                initial_count = VLAN.objects.count()
+                form_data = {
+                    'pattern': pattern,
+                    'group': group.pk,
+                    'name': 'Pool-{vid}',
+                    'status': VLANStatusChoices.STATUS_ACTIVE,
+                }
+                response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+                self.assertHttpStatus(response, 302)
+                self.assertEqual(VLAN.objects.count(), initial_count + len(expected_vids))
+                for vid in expected_vids:
+                    self.assertTrue(VLAN.objects.filter(group=group, vid=vid, name=f'Pool-{vid}').exists())
+
+    def test_bulk_add_vlans_invalid_pattern(self):
+        """An invalid pattern re-renders the form with a pattern error and creates nothing."""
+        self.add_permissions('ipam.add_vlan')
+        initial_count = VLAN.objects.count()
+
+        for pattern in ('abc', '20-10', '0', '4095', '10-'):
+            with self.subTest(pattern=pattern):
+                form_data = {
+                    'pattern': pattern,
+                    'name': 'Pool-{vid}',
+                    'status': VLANStatusChoices.STATUS_ACTIVE,
+                }
+                response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+                self.assertHttpStatus(response, 200)
+                self.assertIn('pattern', response.context['form'].errors)
+                self.assertEqual(VLAN.objects.count(), initial_count)
+
+    def test_bulk_add_vlans_static_name_without_group(self):
+        """A static name (no {vid} placeholder) is permitted across VLANs not assigned to a group."""
+        self.add_permissions('ipam.add_vlan')
+        initial_count = VLAN.objects.count()
+
+        form_data = {
+            'pattern': '710-712',
+            'name': 'Same name',
+            'status': VLANStatusChoices.STATUS_ACTIVE,
+        }
+        response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+
+        self.assertHttpStatus(response, 302)
+        self.assertEqual(VLAN.objects.count(), initial_count + 3)
+        self.assertEqual(VLAN.objects.filter(name='Same name').count(), 3)
+
+    def test_bulk_add_vlans_rolls_back_on_constrained_permission(self):
+        """Bulk creation rolls back when a generated VLAN falls outside the user's add constraints."""
+        obj_perm = ObjectPermission(
+            name='Test permission',
+            actions=['add'],
+            constraints={'vid__lt': 120}
+        )
+        obj_perm.save()
+        obj_perm.users.add(self.user)
+        obj_perm.object_types.add(ObjectType.objects.get_for_model(VLAN))
+
+        initial_count = VLAN.objects.count()
+        form_data = {
+            'pattern': '110,120-122',
+            'name': 'Pool-{vid}',
+            'status': VLANStatusChoices.STATUS_ACTIVE,
+        }
+        response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(VLAN.objects.count(), initial_count)
+        self.assertTrue(response.context['form'].non_field_errors())
+
+    def test_bulk_add_vlans_propagates_field_errors(self):
+        """A per-object validation error on a non-pattern field is reported on the bulk-create form."""
+        self.add_permissions('ipam.add_vlan')
+        initial_count = VLAN.objects.count()
+
+        form_data = {
+            'pattern': '800',
+            'name': 'Pool-{vid}',
+            'status': VLANStatusChoices.STATUS_ACTIVE,
+            'qinq_role': VLANQinQRoleChoices.ROLE_CUSTOMER,  # Requires an SVLAN
+        }
+        response = self.client.post(reverse('ipam:vlan_bulk_add'), form_data)
+
+        self.assertHttpStatus(response, 200)
+        self.assertEqual(VLAN.objects.count(), initial_count)
+        self.assertTrue(response.context['form'].non_field_errors())
+
 
 class VLANTranslationPolicyTestCase(ViewTestCases.PrimaryObjectViewTestCase):
     model = VLANTranslationPolicy

+ 10 - 0
netbox/ipam/views.py

@@ -1793,6 +1793,16 @@ class VLANDeleteView(generic.ObjectDeleteView):
     queryset = VLAN.objects.all()
 
 
+@register_model_view(VLAN, 'bulk_add', path='bulk-add', detail=False)
+class VLANBulkCreateView(generic.BulkCreateView):
+    queryset = VLAN.objects.all()
+    form = forms.VLANIDBulkCreateForm
+    model_form = forms.VLANBulkAddForm
+    pattern_target = 'vid'
+    pattern_template_fields = ('name',)
+    template_name = 'ipam/vlan_bulk_add.html'
+
+
 @register_model_view(VLAN, 'bulk_import', path='import', detail=False)
 class VLANBulkImportView(generic.BulkImportView):
     queryset = VLAN.objects.all()

+ 92 - 10
netbox/netbox/views/generic/bulk_views.py

@@ -27,7 +27,7 @@ from netbox.forms.bulk_rename import NetBoxModelBulkRenameForm
 from netbox.models.features import ChangeLoggingMixin
 from netbox.object_actions import AddObject, BulkDelete, BulkEdit, BulkExport, BulkImport, BulkRename
 from utilities.error_handlers import handle_protectederror
-from utilities.exceptions import AbortRequest, PermissionsViolation
+from utilities.exceptions import AbortRequest, AbortTransaction, PermissionsViolation
 from utilities.export import TableExport, stream_table_csv_response
 from utilities.forms import BulkDeleteForm, BulkRenameForm, restrict_form_fields
 from utilities.forms.bulk_import import BulkImportForm
@@ -245,11 +245,96 @@ class BulkCreateView(GetReturnURLMixin, BaseMultiObjectView):
     form = None
     model_form = None
     pattern_target = ''
+    pattern_template_fields = ()
     htmx_template_name = 'htmx/bulk_add_form.html'
 
     def get_required_permission(self):
         return get_permission_for_model(self.queryset.model, 'add')
 
+    def get_pattern_context(self, value):
+        """
+        Return a context mapping for substituting the generated pattern value into
+        model form fields.
+
+        By default, the field named by ``pattern_target`` is supported as a
+        placeholder, e.g. ``{vid}``.
+        """
+        if not self.pattern_target:
+            return {}
+
+        return {
+            self.pattern_target: str(value),
+        }
+
+    def render_pattern_template(self, template, value):
+        """
+        Replace pattern placeholders in a single form field value.
+        """
+        rendered = str(template)
+
+        for key, replacement in self.get_pattern_context(value).items():
+            rendered = rendered.replace(f'{{{key}}}', replacement)
+
+        return rendered
+
+    def apply_pattern_template_fields(self, data, value):
+        """
+        Apply the generated pattern value to any configured template fields.
+        """
+        for field_name in self.pattern_template_fields:
+            if field_name not in data:
+                continue
+
+            # QueryDict values may be multi-valued; preserve that behavior.
+            if hasattr(data, 'getlist') and hasattr(data, 'setlist'):
+                data.setlist(field_name, [
+                    self.render_pattern_template(field_value, value)
+                    for field_value in data.getlist(field_name)
+                ])
+            else:
+                data[field_name] = self.render_pattern_template(data[field_name], value)
+
+        return data
+
+    def get_model_form_data(self, form, request, value):
+        """
+        Return the submitted data to use when instantiating the model form for a
+        single generated pattern value.
+        """
+        data = request.POST.copy()
+        data[self.pattern_target] = value
+
+        return self.apply_pattern_template_fields(data, value)
+
+    def add_model_form_errors(self, form, model_form, value):
+        """
+        Copy validation errors from the generated object's model form back onto
+        the pattern form for display.
+        """
+        errors = model_form.errors.as_data()
+
+        if errors.get(self.pattern_target):
+            form.add_error('pattern', errors.pop(self.pattern_target))
+
+        for field_name, field_errors in errors.items():
+            if field_name == '__all__':
+                field_label = _('General')
+            elif field_name in model_form.fields:
+                field_label = model_form.fields[field_name].label
+            else:
+                field_label = field_name
+
+            for error in field_errors:
+                for message in error.messages:
+                    form.add_error(
+                        None,
+                        _('{value}: {field}: {error}').format(
+                            value=value,
+                            field=field_label,
+                            error=message,
+                        )
+                    )
+
     def _create_objects(self, form, request):
         new_objects = []
 
@@ -258,8 +343,7 @@ class BulkCreateView(GetReturnURLMixin, BaseMultiObjectView):
 
             # Reinstantiate the model form each time to avoid overwriting the same instance. Use a mutable
             # copy of the POST QueryDict so that we can update the target field value.
-            model_form = self.model_form(request.POST.copy())
-            model_form.data[self.pattern_target] = value
+            model_form = self.model_form(self.get_model_form_data(form, request, value))
 
             # Validate each new object independently.
             if model_form.is_valid():
@@ -267,12 +351,10 @@ class BulkCreateView(GetReturnURLMixin, BaseMultiObjectView):
                 obj = model_form.save()
                 new_objects.append(obj)
             else:
-                # Copy any errors on the pattern target field to the pattern form.
-                errors = model_form.errors.as_data()
-                if errors.get(self.pattern_target):
-                    form.add_error('pattern', errors[self.pattern_target])
-                # Raise an IntegrityError to break the for loop and abort the transaction.
-                raise IntegrityError()
+                self.add_model_form_errors(form, model_form, value)
+
+                # Abort the transaction and break out of the loop.
+                raise AbortTransaction()
 
         return new_objects
 
@@ -343,7 +425,7 @@ class BulkCreateView(GetReturnURLMixin, BaseMultiObjectView):
                     return redirect(request.path)
                 return redirect(self.get_return_url(request))
 
-            except IntegrityError:
+            except (AbortTransaction, IntegrityError):
                 pass
 
             except (AbortRequest, PermissionsViolation) as e:

+ 23 - 0
netbox/templates/ipam/inc/vlan_edit_header.html

@@ -0,0 +1,23 @@
+{% load helpers %}
+{% load i18n %}
+
+<ul class="nav nav-tabs">
+  <li class="nav-item">
+    <a href="{% if object.pk %}{% url 'ipam:vlan_edit' pk=object.pk %}{% else %}{% url 'ipam:vlan_add' %}{% querystring request %}{% endif %}"
+       class="nav-link{% if active_tab == 'add' %} active{% endif %}">
+      {% if object.pk %}
+        {% trans "VLAN" %}
+      {% else %}
+        {% trans "Create" %}
+      {% endif %}
+    </a>
+  </li>
+  {% if not object.pk %}
+    <li class="nav-item">
+      <a href="{% url 'ipam:vlan_bulk_add' %}{% querystring request %}"
+         class="nav-link{% if active_tab == 'bulk_add' %} active{% endif %}">
+        {% trans "Bulk Create" %}
+      </a>
+    </li>
+  {% endif %}
+</ul>

+ 5 - 0
netbox/templates/ipam/vlan_bulk_add.html

@@ -0,0 +1,5 @@
+{% extends 'generic/bulk_add.html' %}
+
+{% block tabs %}
+  {% include 'ipam/inc/vlan_edit_header.html' with active_tab='bulk_add' %}
+{% endblock tabs %}

+ 4 - 0
netbox/templates/ipam/vlan_edit.html

@@ -4,6 +4,10 @@
 {% load helpers %}
 {% load i18n %}
 
+{% block tabs %}
+  {% include 'ipam/inc/vlan_edit_header.html' with active_tab='add' %}
+{% endblock tabs %}
+
 {% block form %}
   {% for field in form.hidden_fields %}
     {{ field }}