events.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import logging
  2. from collections import defaultdict
  3. from django.conf import settings
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.utils import timezone
  6. from django.utils.module_loading import import_string
  7. from django.utils.translation import gettext as _
  8. from django_rq import get_queue
  9. from core.events import *
  10. from netbox.config import get_config
  11. from netbox.constants import RQ_QUEUE_DEFAULT
  12. from netbox.registry import registry
  13. from users.models import User
  14. from utilities.api import get_serializer_for_model
  15. from utilities.rqworker import get_rq_retry
  16. from utilities.serialization import serialize_object
  17. from .choices import EventRuleActionChoices
  18. from .models import EventRule
# Module-level logger; flush_events() reports events pipeline failures through it.
logger = logging.getLogger('netbox.events_processor')
  20. def serialize_for_event(instance):
  21. """
  22. Return a serialized representation of the given instance suitable for use in a queued event.
  23. """
  24. serializer_class = get_serializer_for_model(instance.__class__)
  25. serializer_context = {
  26. 'request': None,
  27. }
  28. serializer = serializer_class(instance, context=serializer_context)
  29. return serializer.data
  30. def get_snapshots(instance, event_type):
  31. snapshots = {
  32. 'prechange': getattr(instance, '_prechange_snapshot', None),
  33. 'postchange': None,
  34. }
  35. if event_type != OBJECT_DELETED:
  36. # Use model's serialize_object() method if defined; fall back to serialize_object() utility function
  37. if hasattr(instance, 'serialize_object'):
  38. snapshots['postchange'] = instance.serialize_object()
  39. else:
  40. snapshots['postchange'] = serialize_object(instance)
  41. return snapshots
  42. def enqueue_event(queue, instance, user, request_id, event_type):
  43. """
  44. Enqueue a serialized representation of a created/updated/deleted object for the processing of
  45. events once the request has completed.
  46. """
  47. # Determine whether this type of object supports event rules
  48. app_label = instance._meta.app_label
  49. model_name = instance._meta.model_name
  50. if model_name not in registry['model_features']['event_rules'].get(app_label, []):
  51. return
  52. assert instance.pk is not None
  53. key = f'{app_label}.{model_name}:{instance.pk}'
  54. if key in queue:
  55. queue[key]['data'] = serialize_for_event(instance)
  56. queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
  57. # If the object is being deleted, update any prior "update" event to "delete"
  58. if event_type == OBJECT_DELETED:
  59. queue[key]['event_type'] = event_type
  60. else:
  61. queue[key] = {
  62. 'object_type': ContentType.objects.get_for_model(instance),
  63. 'object_id': instance.pk,
  64. 'event_type': event_type,
  65. 'data': serialize_for_event(instance),
  66. 'snapshots': get_snapshots(instance, event_type),
  67. 'username': user.username,
  68. 'request_id': request_id
  69. }
  70. def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request_id=None):
  71. user = User.objects.get(username=username) if username else None
  72. for event_rule in event_rules:
  73. # Evaluate event rule conditions (if any)
  74. if not event_rule.eval_conditions(data):
  75. continue
  76. # Webhooks
  77. if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
  78. # Select the appropriate RQ queue
  79. queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
  80. rq_queue = get_queue(queue_name)
  81. # Compile the task parameters
  82. params = {
  83. "event_rule": event_rule,
  84. "model_name": object_type.model,
  85. "event_type": event_type,
  86. "data": data,
  87. "snapshots": snapshots,
  88. "timestamp": timezone.now().isoformat(),
  89. "username": username,
  90. "retry": get_rq_retry()
  91. }
  92. if snapshots:
  93. params["snapshots"] = snapshots
  94. if request_id:
  95. params["request_id"] = request_id
  96. # Enqueue the task
  97. rq_queue.enqueue(
  98. "extras.webhooks.send_webhook",
  99. **params
  100. )
  101. # Scripts
  102. elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
  103. # Resolve the script from action parameters
  104. script = event_rule.action_object.python_class()
  105. # Enqueue a Job to record the script's execution
  106. from extras.jobs import ScriptJob
  107. ScriptJob.enqueue(
  108. instance=event_rule.action_object,
  109. name=script.name,
  110. user=user,
  111. data=data
  112. )
  113. # Notification groups
  114. elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
  115. # Bulk-create notifications for all members of the notification group
  116. event_rule.action_object.notify(
  117. object_type=object_type,
  118. object_id=data['id'],
  119. event_type=event_type
  120. )
  121. else:
  122. raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
  123. action_type=event_rule.action_type
  124. ))
  125. def process_event_queue(events):
  126. """
  127. Flush a list of object representation to RQ for EventRule processing.
  128. """
  129. events_cache = defaultdict(dict)
  130. for event in events:
  131. event_type = event['event_type']
  132. object_type = event['object_type']
  133. # Cache applicable Event Rules
  134. if object_type not in events_cache[event_type]:
  135. events_cache[event_type][object_type] = EventRule.objects.filter(
  136. event_types__contains=[event['event_type']],
  137. object_types=object_type,
  138. enabled=True
  139. )
  140. event_rules = events_cache[event_type][object_type]
  141. process_event_rules(
  142. event_rules=event_rules,
  143. object_type=object_type,
  144. event_type=event['event_type'],
  145. data=event['data'],
  146. username=event['username'],
  147. snapshots=event['snapshots'],
  148. request_id=event['request_id']
  149. )
  150. def flush_events(events):
  151. """
  152. Flush a list of object representations to RQ for event processing.
  153. """
  154. if events:
  155. for name in settings.EVENTS_PIPELINE:
  156. try:
  157. func = import_string(name)
  158. func(events)
  159. except Exception as e:
  160. logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))