
Closes #21260: Defer object serialization for events pipeline (#21286)

Jeremy Stretch 2 weeks ago
commit a8c997ff29
2 changed files with 61 additions and 57 deletions
  1. netbox/extras/events.py  +54 -46
  2. netbox/extras/signals.py  +7 -11

netbox/extras/events.py  +54 -46

@@ -1,5 +1,5 @@
 import logging
-from collections import defaultdict
+from collections import UserDict, defaultdict
 
 from django.conf import settings
 from django.utils import timezone
@@ -12,7 +12,6 @@ from core.models import ObjectType
 from netbox.config import get_config
 from netbox.constants import RQ_QUEUE_DEFAULT
 from netbox.models.features import has_feature
-from users.models import User
 from utilities.api import get_serializer_for_model
 from utilities.request import copy_safe_request
 from utilities.rqworker import get_rq_retry
@@ -23,6 +22,21 @@ from .models import EventRule
 logger = logging.getLogger('netbox.events_processor')
 
 
+class EventContext(UserDict):
+    """
+    A custom dictionary that automatically serializes its associated object on demand.
+    """
+
+    # We're emulating a dictionary here (rather than using a custom class) because prior to NetBox v4.5.2, events were
+    # queued as dictionaries for processing by handlers in EVENTS_PIPELINE. We need to avoid introducing any breaking
+    # changes until a suitable minor release.
+    def __getitem__(self, item):
+        if item == 'data' and 'data' not in self:
+            data = serialize_for_event(self['object'])
+            self.__setitem__('data', data)
+        return super().__getitem__(item)
+
+
 def serialize_for_event(instance):
     """
     Return a serialized representation of the given instance suitable for use in a queued event.
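
A minimal, self-contained sketch of the same on-demand technique, with a hypothetical fake_serialize() standing in for serialize_for_event(). One subtlety worth noting: the 'data' not in self membership test inside __getitem__ does not recurse, because UserDict.__contains__ consults the backing self.data dict directly rather than the overridden __getitem__.

    from collections import UserDict

    def fake_serialize(obj):
        # Hypothetical stand-in for serialize_for_event()
        print('serializing...')
        return {'repr': repr(obj)}

    class LazyContext(UserDict):
        """Compute the 'data' key only when it is first read."""

        def __getitem__(self, item):
            # UserDict.__contains__ checks the backing dict, so this
            # membership test does not re-enter __getitem__.
            if item == 'data' and 'data' not in self:
                self['data'] = fake_serialize(self['object'])
            return super().__getitem__(item)

    ctx = LazyContext(object=42, event_type='object_updated')
    print('data' in ctx)  # False: nothing serialized yet
    print(ctx['data'])    # 'serializing...' runs now, exactly once
    print(ctx['data'])    # cached in the backing dict; no second call
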
@@ -66,37 +80,42 @@ def enqueue_event(queue, instance, request, event_type):
     assert instance.pk is not None
     key = f'{app_label}.{model_name}:{instance.pk}'
     if key in queue:
-        queue[key]['data'] = serialize_for_event(instance)
         queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
         # If the object is being deleted, update any prior "update" event to "delete"
         if event_type == OBJECT_DELETED:
             queue[key]['event_type'] = event_type
     else:
-        queue[key] = {
-            'object_type': ObjectType.objects.get_for_model(instance),
-            'object_id': instance.pk,
-            'event_type': event_type,
-            'data': serialize_for_event(instance),
-            'snapshots': get_snapshots(instance, event_type),
-            'request': request,
+        queue[key] = EventContext(
+            object_type=ObjectType.objects.get_for_model(instance),
+            object_id=instance.pk,
+            object=instance,
+            event_type=event_type,
+            snapshots=get_snapshots(instance, event_type),
+            request=request,
+            user=request.user,
             # Legacy request attributes for backward compatibility
-            'username': request.user.username,
-            'request_id': request.id,
-        }
+            username=request.user.username,
+            request_id=request.id,
+        )
+    # Force serialization of objects prior to them actually being deleted
+    if event_type == OBJECT_DELETED:
+        queue[key]['data'] = serialize_for_event(instance)
 
 
-def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request=None):
-    user = None  # To be resolved from the username if needed
+def process_event_rules(event_rules, object_type, event):
+    """
+    Process a list of EventRules against an event.
+    """
 
     for event_rule in event_rules:
 
         # Evaluate event rule conditions (if any)
-        if not event_rule.eval_conditions(data):
+        if not event_rule.eval_conditions(event['data']):
             continue
 
         # Compile event data
         event_data = event_rule.action_data or {}
-        event_data.update(data)
+        event_data.update(event['data'])
 
         # Webhooks
         if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
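
The delete special case in the enqueue_event hunk above is the one place where deferral cannot apply: once the row is gone, there is nothing left to serialize. A toy illustration, reusing the hypothetical LazyContext and fake_serialize from the sketch above (pretend_enqueue is likewise hypothetical and omits the snapshots and request handling of the real function):

    OBJECT_DELETED = 'object_deleted'

    def pretend_enqueue(queue, key, instance, event_type):
        if key not in queue:
            queue[key] = LazyContext(object=instance, event_type=event_type)
        # Mirror the real code: deletions serialize eagerly, because the
        # row will no longer exist when the queue is flushed.
        if event_type == OBJECT_DELETED:
            queue[key]['data'] = fake_serialize(instance)

    queue = {}
    pretend_enqueue(queue, 'app.widget:1', 'widget-1', 'object_updated')
    print('data' in queue['app.widget:1'])  # False: deferred

    pretend_enqueue(queue, 'app.widget:2', 'widget-2', OBJECT_DELETED)
    print('data' in queue['app.widget:2'])  # True: forced at enqueue time
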
@@ -109,50 +128,41 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
             params = {
                 "event_rule": event_rule,
                 "object_type": object_type,
-                "event_type": event_type,
+                "event_type": event['event_type'],
                 "data": event_data,
-                "snapshots": snapshots,
+                "snapshots": event.get('snapshots'),
                 "timestamp": timezone.now().isoformat(),
-                "username": username,
+                "username": event['username'],
                 "retry": get_rq_retry()
             }
-            if snapshots:
-                params["snapshots"] = snapshots
-            if request:
+            if 'request' in event:
                 # Exclude FILES - webhooks don't need uploaded files,
                 # which can cause pickle errors with Pillow.
-                params["request"] = copy_safe_request(request, include_files=False)
+                params['request'] = copy_safe_request(event['request'], include_files=False)
 
             # Enqueue the task
-            rq_queue.enqueue(
-                "extras.webhooks.send_webhook",
-                **params
-            )
+            rq_queue.enqueue('extras.webhooks.send_webhook', **params)
 
         # Scripts
         elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
             # Resolve the script from action parameters
             script = event_rule.action_object.python_class()
 
-            # Retrieve the User if not already resolved
-            if user is None:
-                user = User.objects.get(username=username)
-
             # Enqueue a Job to record the script's execution
             from extras.jobs import ScriptJob
             params = {
                 "instance": event_rule.action_object,
                 "name": script.name,
-                "user": user,
+                "user": event['user'],
                 "data": event_data
             }
-            if snapshots:
-                params["snapshots"] = snapshots
-            if request:
-                params["request"] = copy_safe_request(request)
-            ScriptJob.enqueue(
-                **params
-            )
+            if 'snapshots' in event:
+                params['snapshots'] = event['snapshots']
+            if 'request' in event:
+                params['request'] = copy_safe_request(event['request'])
+
+            # Enqueue the job
+            ScriptJob.enqueue(**params)
 
         # Notification groups
         elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
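
The FILES exclusion in the webhook branch above guards against a general constraint of RQ: every argument passed to enqueue() is pickled, and objects holding OS-level resources (open files, in-memory uploads such as Pillow images) typically are not picklable. A NetBox-independent illustration of the failure mode:

    import pickle

    # Anything handed to RQ must survive pickling; a dict that still
    # carries an open file object does not.
    payload = {'request_meta': {'path': '/api/'}, 'upload': open(__file__)}
    try:
        pickle.dumps(payload)
    except TypeError as exc:
        print(f'cannot enqueue: {exc}')
    finally:
        payload['upload'].close()
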
@@ -161,7 +171,7 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
                 object_type=object_type,
                 object_id=event_data['id'],
                 object_repr=event_data.get('display'),
-                event_type=event_type
+                event_type=event['event_type']
             )
 
         else:
@@ -173,6 +183,8 @@ def process_event_rules(event_rules, object_type, event_type, data, username=Non
 def process_event_queue(events):
     """
     Flush a list of object representations to RQ for EventRule processing.
+
+    This is the default processor listed in EVENTS_PIPELINE.
     """
     events_cache = defaultdict(dict)
 
@@ -192,11 +204,7 @@ def process_event_queue(events):
         process_event_rules(
             event_rules=event_rules,
             object_type=object_type,
-            event_type=event['event_type'],
-            data=event['data'],
-            username=event['username'],
-            snapshots=event['snapshots'],
-            request=event['request'],
+            event=event,
         )
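
Taken together, the changes to events.py shift when serialization happens. Previously, enqueue_event() called serialize_for_event() on every save, so N updates to one object within a request meant N serializations; now the same N updates cost at most one, performed when a pipeline handler first reads event['data']. Continuing the hypothetical sketches above:

    queue = {}
    for _ in range(3):
        pretend_enqueue(queue, 'app.widget:1', 'widget-1', 'object_updated')

    event = queue['app.widget:1']
    _ = event['data']  # 'serializing...' prints once here, not three times
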
 
 

netbox/extras/signals.py  +7 -11

@@ -4,7 +4,7 @@ from django.dispatch import receiver
 
 from core.events import *
 from core.signals import job_end, job_start
-from extras.events import process_event_rules
+from extras.events import EventContext, process_event_rules
 from extras.models import EventRule, Notification, Subscription
 from netbox.config import get_config
 from netbox.models.features import has_feature
@@ -102,14 +102,12 @@ def process_job_start_event_rules(sender, **kwargs):
         enabled=True,
         object_types=sender.object_type
     )
-    username = sender.user.username if sender.user else None
-    process_event_rules(
-        event_rules=event_rules,
-        object_type=sender.object_type,
+    event = EventContext(
         event_type=JOB_STARTED,
         data=sender.data,
-        username=username
+        user=sender.user,
     )
+    process_event_rules(event_rules, sender.object_type, event)
 
 
 @receiver(job_end)
@@ -122,14 +120,12 @@ def process_job_end_event_rules(sender, **kwargs):
         enabled=True,
         object_types=sender.object_type
     )
-    username = sender.user.username if sender.user else None
-    process_event_rules(
-        event_rules=event_rules,
-        object_type=sender.object_type,
+    event = EventContext(
         event_type=JOB_COMPLETED,
         data=sender.data,
-        username=username
+        user=sender.user,
     )
+    process_event_rules(event_rules, sender.object_type, event)
 
 
 #
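
One detail of the job handlers above: these EventContext instances are built with an explicit data key (from sender.data) and no object key, so the lazy branch in __getitem__ never fires; the guard is already satisfied and the stored payload is returned as-is. Verifiable with the hypothetical LazyContext from earlier:

    ctx = LazyContext(data={'result': 'ok'}, event_type='job_completed')
    print(ctx['data'])  # {'result': 'ok'} returned directly; nothing serialized
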