context_managers.py 682 B

1234567891011121314151617181920212223242526
  1. from contextlib import contextmanager
  2. from netbox.context import current_request, events_queue
  3. from .events import flush_events
  @contextmanager
  def event_tracking(request):
      """
      Queue interesting events in memory while processing a request, then flush that queue for processing by the
      events pipeline before returning the response.

      The context variables are always reset on exit, including when the wrapped code (or the flush itself)
      raises, so a failed request cannot leave its request object or queued events behind in the context.
      Events queued by a request that raised are discarded rather than flushed.

      :param request: WSGIRequest object with a unique `id` set
      """
      current_request.set(request)
      events_queue.set({})

      try:
          yield

          # Flush queued webhooks to RQ. This line is only reached when the wrapped block finished
          # without raising; an exception propagates out of the `yield` above and skips the flush.
          if events := list(events_queue.get().values()):
              flush_events(events)
      finally:
          # Clear context vars, whether or not the wrapped block (or the flush) succeeded
          current_request.set(None)
          events_queue.set({})