# extras/events.py — event queueing & EventRule processing
  1. import logging
  2. from collections import defaultdict
  3. from django.conf import settings
  4. from django.utils import timezone
  5. from django.utils.module_loading import import_string
  6. from django.utils.translation import gettext as _
  7. from django_rq import get_queue
  8. from core.events import *
  9. from core.models import ObjectType
  10. from netbox.config import get_config
  11. from netbox.constants import RQ_QUEUE_DEFAULT
  12. from netbox.models.features import has_feature
  13. from users.models import User
  14. from utilities.api import get_serializer_for_model
  15. from utilities.request import copy_safe_request
  16. from utilities.rqworker import get_rq_retry
  17. from utilities.serialization import serialize_object
  18. from .choices import EventRuleActionChoices
  19. from .models import EventRule
# Module-level logger for the event processing pipeline
logger = logging.getLogger('netbox.events_processor')
  21. def serialize_for_event(instance):
  22. """
  23. Return a serialized representation of the given instance suitable for use in a queued event.
  24. """
  25. serializer_class = get_serializer_for_model(instance.__class__)
  26. serializer_context = {
  27. 'request': None,
  28. }
  29. serializer = serializer_class(instance, context=serializer_context)
  30. return serializer.data
  31. def get_snapshots(instance, event_type):
  32. snapshots = {
  33. 'prechange': getattr(instance, '_prechange_snapshot', None),
  34. 'postchange': None,
  35. }
  36. if event_type != OBJECT_DELETED:
  37. # Use model's serialize_object() method if defined; fall back to serialize_object() utility function
  38. if hasattr(instance, 'serialize_object'):
  39. snapshots['postchange'] = instance.serialize_object()
  40. else:
  41. snapshots['postchange'] = serialize_object(instance)
  42. return snapshots
  43. def enqueue_event(queue, instance, request, event_type):
  44. """
  45. Enqueue a serialized representation of a created/updated/deleted object for the processing of
  46. events once the request has completed.
  47. """
  48. # Bail if this type of object does not support event rules
  49. if not has_feature(instance, 'event_rules'):
  50. return
  51. app_label = instance._meta.app_label
  52. model_name = instance._meta.model_name
  53. assert instance.pk is not None
  54. key = f'{app_label}.{model_name}:{instance.pk}'
  55. if key in queue:
  56. queue[key]['data'] = serialize_for_event(instance)
  57. queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
  58. # If the object is being deleted, update any prior "update" event to "delete"
  59. if event_type == OBJECT_DELETED:
  60. queue[key]['event_type'] = event_type
  61. else:
  62. queue[key] = {
  63. 'object_type': ObjectType.objects.get_for_model(instance),
  64. 'object_id': instance.pk,
  65. 'event_type': event_type,
  66. 'data': serialize_for_event(instance),
  67. 'snapshots': get_snapshots(instance, event_type),
  68. 'request': request,
  69. # Legacy request attributes for backward compatibility
  70. 'username': request.user.username,
  71. 'request_id': request.id,
  72. }
  73. def process_event_rules(event_rules, object_type, event_type, data, username=None, snapshots=None, request=None):
  74. user = User.objects.get(username=username) if username else None
  75. for event_rule in event_rules:
  76. # Evaluate event rule conditions (if any)
  77. if not event_rule.eval_conditions(data):
  78. continue
  79. # Compile event data
  80. event_data = event_rule.action_data or {}
  81. event_data.update(data)
  82. # Webhooks
  83. if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
  84. # Select the appropriate RQ queue
  85. queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
  86. rq_queue = get_queue(queue_name)
  87. # Compile the task parameters
  88. params = {
  89. "event_rule": event_rule,
  90. "object_type": object_type,
  91. "event_type": event_type,
  92. "data": event_data,
  93. "snapshots": snapshots,
  94. "timestamp": timezone.now().isoformat(),
  95. "username": username,
  96. "retry": get_rq_retry()
  97. }
  98. if snapshots:
  99. params["snapshots"] = snapshots
  100. if request:
  101. params["request"] = copy_safe_request(request)
  102. # Enqueue the task
  103. rq_queue.enqueue(
  104. "extras.webhooks.send_webhook",
  105. **params
  106. )
  107. # Scripts
  108. elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
  109. # Resolve the script from action parameters
  110. script = event_rule.action_object.python_class()
  111. # Enqueue a Job to record the script's execution
  112. from extras.jobs import ScriptJob
  113. params = {
  114. "instance": event_rule.action_object,
  115. "name": script.name,
  116. "user": user,
  117. "data": event_data
  118. }
  119. if snapshots:
  120. params["snapshots"] = snapshots
  121. if request:
  122. params["request"] = copy_safe_request(request)
  123. ScriptJob.enqueue(
  124. **params
  125. )
  126. # Notification groups
  127. elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
  128. # Bulk-create notifications for all members of the notification group
  129. event_rule.action_object.notify(
  130. object_type=object_type,
  131. object_id=event_data['id'],
  132. object_repr=event_data.get('display'),
  133. event_type=event_type
  134. )
  135. else:
  136. raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
  137. action_type=event_rule.action_type
  138. ))
  139. def process_event_queue(events):
  140. """
  141. Flush a list of object representation to RQ for EventRule processing.
  142. """
  143. events_cache = defaultdict(dict)
  144. for event in events:
  145. event_type = event['event_type']
  146. object_type = event['object_type']
  147. # Cache applicable Event Rules
  148. if object_type not in events_cache[event_type]:
  149. events_cache[event_type][object_type] = EventRule.objects.filter(
  150. event_types__contains=[event['event_type']],
  151. object_types=object_type,
  152. enabled=True
  153. )
  154. event_rules = events_cache[event_type][object_type]
  155. process_event_rules(
  156. event_rules=event_rules,
  157. object_type=object_type,
  158. event_type=event['event_type'],
  159. data=event['data'],
  160. username=event['username'],
  161. snapshots=event['snapshots'],
  162. request=event['request'],
  163. )
  164. def flush_events(events):
  165. """
  166. Flush a list of object representations to RQ for event processing.
  167. """
  168. if events:
  169. for name in settings.EVENTS_PIPELINE:
  170. try:
  171. func = import_string(name)
  172. func(events)
  173. except ImportError as e:
  174. logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))