Просмотр исходного кода

Use context vars instead of thread-local storage for change logging

jeremystretch 3 лет назад
Родитель
Сommit
cd8943144b

+ 7 - 8
netbox/extras/context_managers.py

@@ -3,8 +3,7 @@ from contextlib import contextmanager
 from django.db.models.signals import m2m_changed, pre_delete, post_save
 
 from extras.signals import clear_webhooks, clear_webhook_queue, handle_changed_object, handle_deleted_object
-from netbox import thread_locals
-from netbox.request_context import set_request
+from netbox.context import current_request, webhooks_queue
 from .webhooks import flush_webhooks
 
 
@@ -16,8 +15,8 @@ def change_logging(request):
 
     :param request: WSGIRequest object with a unique `id` set
     """
-    set_request(request)
-    thread_locals.webhook_queue = []
+    current_request.set(request)
+    webhooks_queue.set([])
 
     # Connect our receivers to the post_save and post_delete signals.
     post_save.connect(handle_changed_object, dispatch_uid='handle_changed_object')
@@ -35,8 +34,8 @@ def change_logging(request):
     clear_webhooks.disconnect(clear_webhook_queue, dispatch_uid='clear_webhook_queue')
 
     # Flush queued webhooks to RQ
-    flush_webhooks(thread_locals.webhook_queue)
-    del thread_locals.webhook_queue
+    flush_webhooks(webhooks_queue.get())
 
-    # Clear the request from thread-local storage
-    set_request(None)
+    # Clear context vars
+    current_request.set(None)
+    webhooks_queue.set([])

+ 14 - 15
netbox/extras/signals.py

@@ -7,9 +7,8 @@ from django.dispatch import receiver, Signal
 from django_prometheus.models import model_deletes, model_inserts, model_updates
 
 from extras.validators import CustomValidator
-from netbox import thread_locals
 from netbox.config import get_config
-from netbox.request_context import get_request
+from netbox.context import current_request, webhooks_queue
 from netbox.signals import post_clean
 from .choices import ObjectChangeActionChoices
 from .models import ConfigRevision, CustomField, ObjectChange
@@ -30,7 +29,7 @@ def handle_changed_object(sender, instance, **kwargs):
     if not hasattr(instance, 'to_objectchange'):
         return
 
-    request = get_request()
+    request = current_request.get()
     m2m_changed = False
 
     def is_same_object(instance, webhook_data):
@@ -69,13 +68,14 @@ def handle_changed_object(sender, instance, **kwargs):
             objectchange.save()
 
     # If this is an M2M change, update the previously queued webhook (from post_save)
-    webhook_queue = thread_locals.webhook_queue
-    if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
+    queue = webhooks_queue.get()
+    if m2m_changed and queue and is_same_object(instance, queue[-1]):
         instance.refresh_from_db()  # Ensure that we're working with fresh M2M assignments
-        webhook_queue[-1]['data'] = serialize_for_webhook(instance)
-        webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
+        queue[-1]['data'] = serialize_for_webhook(instance)
+        queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
     else:
-        enqueue_object(webhook_queue, instance, request.user, request.id, action)
+        enqueue_object(queue, instance, request.user, request.id, action)
+    webhooks_queue.set(queue)
 
     # Increment metric counters
     if action == ObjectChangeActionChoices.ACTION_CREATE:
@@ -91,7 +91,7 @@ def handle_deleted_object(sender, instance, **kwargs):
     if not hasattr(instance, 'to_objectchange'):
         return
 
-    request = get_request()
+    request = current_request.get()
 
     # Record an ObjectChange if applicable
     if hasattr(instance, 'to_objectchange'):
@@ -101,8 +101,9 @@ def handle_deleted_object(sender, instance, **kwargs):
         objectchange.save()
 
     # Enqueue webhooks
-    webhook_queue = thread_locals.webhook_queue
-    enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
+    queue = webhooks_queue.get()
+    enqueue_object(queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
+    webhooks_queue.set(queue)
 
     # Increment metric counters
     model_deletes.labels(instance._meta.model_name).inc()
@@ -113,10 +114,8 @@ def clear_webhook_queue(sender, **kwargs):
     Delete any queued webhooks (e.g. because of an aborted bulk transaction)
     """
     logger = logging.getLogger('webhooks')
-    webhook_queue = thread_locals.webhook_queue
-
-    logger.info(f"Clearing {len(webhook_queue)} queued webhooks ({sender})")
-    webhook_queue.clear()
+    logger.info(f"Clearing {len(webhooks_queue.get())} queued webhooks ({sender})")
+    webhooks_queue.set([])
 
 
 #

+ 0 - 3
netbox/netbox/__init__.py

@@ -1,3 +0,0 @@
-import threading
-
-thread_locals = threading.local()

+ 10 - 0
netbox/netbox/context.py

@@ -0,0 +1,10 @@
+from contextvars import ContextVar
+
+__all__ = (
+    'current_request',
+    'webhooks_queue',
+)
+
+
+current_request = ContextVar('current_request')
+webhooks_queue = ContextVar('webhooks_queue')

+ 0 - 9
netbox/netbox/request_context.py

@@ -1,9 +0,0 @@
-from netbox import thread_locals
-
-
-def set_request(request):
-    thread_locals.request = request
-
-
-def get_request():
-    return getattr(thread_locals, 'request', None)