events.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. import logging
  2. from collections import defaultdict
  3. from django.conf import settings
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.utils import timezone
  6. from django.utils.module_loading import import_string
  7. from django.utils.translation import gettext as _
  8. from django_rq import get_queue
  9. from core.events import *
  10. from netbox.config import get_config
  11. from netbox.constants import RQ_QUEUE_DEFAULT
  12. from netbox.registry import registry
  13. from users.models import User
  14. from utilities.api import get_serializer_for_model
  15. from utilities.rqworker import get_rq_retry
  16. from utilities.serialization import serialize_object
  17. from .choices import EventRuleActionChoices
  18. from .models import EventRule
  19. logger = logging.getLogger('netbox.events_processor')
  20. def serialize_for_event(instance):
  21. """
  22. Return a serialized representation of the given instance suitable for use in a queued event.
  23. """
  24. serializer_class = get_serializer_for_model(instance.__class__)
  25. serializer_context = {
  26. 'request': None,
  27. }
  28. serializer = serializer_class(instance, context=serializer_context)
  29. return serializer.data
  30. def get_snapshots(instance, event_type):
  31. snapshots = {
  32. 'prechange': getattr(instance, '_prechange_snapshot', None),
  33. 'postchange': None,
  34. }
  35. if event_type != OBJECT_DELETED:
  36. # Use model's serialize_object() method if defined; fall back to serialize_object() utility function
  37. if hasattr(instance, 'serialize_object'):
  38. snapshots['postchange'] = instance.serialize_object()
  39. else:
  40. snapshots['postchange'] = serialize_object(instance)
  41. return snapshots
  42. def enqueue_event(queue, instance, user, request_id, event_type):
  43. """
  44. Enqueue a serialized representation of a created/updated/deleted object for the processing of
  45. events once the request has completed.
  46. """
  47. # Determine whether this type of object supports event rules
  48. app_label = instance._meta.app_label
  49. model_name = instance._meta.model_name
  50. if model_name not in registry['model_features']['event_rules'].get(app_label, []):
  51. return
  52. assert instance.pk is not None
  53. key = f'{app_label}.{model_name}:{instance.pk}'
  54. if key in queue:
  55. queue[key]['data'] = serialize_for_event(instance)
  56. queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
  57. # If the object is being deleted, update any prior "update" event to "delete"
  58. if event_type == OBJECT_DELETED:
  59. queue[key]['event_type'] = event_type
  60. else:
  61. queue[key] = {
  62. 'object_type': ContentType.objects.get_for_model(instance),
  63. 'object_id': instance.pk,
  64. 'event_type': event_type,
  65. 'data': serialize_for_event(instance),
  66. 'snapshots': get_snapshots(instance, event_type),
  67. 'username': user.username,
  68. 'request_id': request_id
  69. }
  70. def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request_id=None):
  71. user = User.objects.get(username=username) if username else None
  72. for event_rule in event_rules:
  73. # Evaluate event rule conditions (if any)
  74. if not event_rule.eval_conditions(data):
  75. continue
  76. # Compile event data
  77. event_data = event_rule.action_data or {}
  78. event_data.update(data)
  79. # Webhooks
  80. if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
  81. # Select the appropriate RQ queue
  82. queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
  83. rq_queue = get_queue(queue_name)
  84. # Compile the task parameters
  85. params = {
  86. "event_rule": event_rule,
  87. "model_name": object_type.model,
  88. "event_type": event_type,
  89. "data": event_data,
  90. "snapshots": snapshots,
  91. "timestamp": timezone.now().isoformat(),
  92. "username": username,
  93. "retry": get_rq_retry()
  94. }
  95. if snapshots:
  96. params["snapshots"] = snapshots
  97. if request_id:
  98. params["request_id"] = request_id
  99. # Enqueue the task
  100. rq_queue.enqueue(
  101. "extras.webhooks.send_webhook",
  102. **params
  103. )
  104. # Scripts
  105. elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
  106. # Resolve the script from action parameters
  107. script = event_rule.action_object.python_class()
  108. # Enqueue a Job to record the script's execution
  109. from extras.jobs import ScriptJob
  110. ScriptJob.enqueue(
  111. instance=event_rule.action_object,
  112. name=script.name,
  113. user=user,
  114. data=event_data
  115. )
  116. # Notification groups
  117. elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
  118. # Bulk-create notifications for all members of the notification group
  119. event_rule.action_object.notify(
  120. object_type=object_type,
  121. object_id=event_data['id'],
  122. object_repr=event_data.get('display'),
  123. event_type=event_type
  124. )
  125. else:
  126. raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
  127. action_type=event_rule.action_type
  128. ))
  129. def process_event_queue(events):
  130. """
  131. Flush a list of object representation to RQ for EventRule processing.
  132. """
  133. events_cache = defaultdict(dict)
  134. for event in events:
  135. event_type = event['event_type']
  136. object_type = event['object_type']
  137. # Cache applicable Event Rules
  138. if object_type not in events_cache[event_type]:
  139. events_cache[event_type][object_type] = EventRule.objects.filter(
  140. event_types__contains=[event['event_type']],
  141. object_types=object_type,
  142. enabled=True
  143. )
  144. event_rules = events_cache[event_type][object_type]
  145. process_event_rules(
  146. event_rules=event_rules,
  147. object_type=object_type,
  148. event_type=event['event_type'],
  149. data=event['data'],
  150. username=event['username'],
  151. snapshots=event['snapshots'],
  152. request_id=event['request_id']
  153. )
  154. def flush_events(events):
  155. """
  156. Flush a list of object representations to RQ for event processing.
  157. """
  158. if events:
  159. for name in settings.EVENTS_PIPELINE:
  160. try:
  161. func = import_string(name)
  162. func(events)
  163. except Exception as e:
  164. logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))