events.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. import logging
  2. from django.conf import settings
  3. from django.contrib.auth import get_user_model
  4. from django.contrib.contenttypes.models import ContentType
  5. from django.core.exceptions import ObjectDoesNotExist
  6. from django.utils import timezone
  7. from django.utils.module_loading import import_string
  8. from django_rq import get_queue
  9. from core.models import Job
  10. from netbox.config import get_config
  11. from netbox.constants import RQ_QUEUE_DEFAULT
  12. from netbox.registry import registry
  13. from utilities.api import get_serializer_for_model
  14. from utilities.rqworker import get_rq_retry
  15. from utilities.utils import serialize_object
  16. from .choices import *
  17. from .models import EventRule, ScriptModule
  18. logger = logging.getLogger('netbox.events_processor')
  19. def serialize_for_event(instance):
  20. """
  21. Return a serialized representation of the given instance suitable for use in a queued event.
  22. """
  23. serializer_class = get_serializer_for_model(instance.__class__)
  24. serializer_context = {
  25. 'request': None,
  26. }
  27. serializer = serializer_class(instance, context=serializer_context)
  28. return serializer.data
  29. def get_snapshots(instance, action):
  30. snapshots = {
  31. 'prechange': getattr(instance, '_prechange_snapshot', None),
  32. 'postchange': None,
  33. }
  34. if action != ObjectChangeActionChoices.ACTION_DELETE:
  35. # Use model's serialize_object() method if defined; fall back to serialize_object() utility function
  36. if hasattr(instance, 'serialize_object'):
  37. snapshots['postchange'] = instance.serialize_object()
  38. else:
  39. snapshots['postchange'] = serialize_object(instance)
  40. return snapshots
  41. def enqueue_object(queue, instance, user, request_id, action):
  42. """
  43. Enqueue a serialized representation of a created/updated/deleted object for the processing of
  44. events once the request has completed.
  45. """
  46. # Determine whether this type of object supports event rules
  47. app_label = instance._meta.app_label
  48. model_name = instance._meta.model_name
  49. if model_name not in registry['model_features']['event_rules'].get(app_label, []):
  50. return
  51. queue.append({
  52. 'content_type': ContentType.objects.get_for_model(instance),
  53. 'object_id': instance.pk,
  54. 'event': action,
  55. 'data': serialize_for_event(instance),
  56. 'snapshots': get_snapshots(instance, action),
  57. 'username': user.username,
  58. 'request_id': request_id
  59. })
  60. def process_event_rules(event_rules, model_name, event, data, username=None, snapshots=None, request_id=None):
  61. if username:
  62. user = get_user_model().objects.get(username=username)
  63. else:
  64. user = None
  65. for event_rule in event_rules:
  66. # Evaluate event rule conditions (if any)
  67. if not event_rule.eval_conditions(data):
  68. continue
  69. # Webhooks
  70. if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
  71. # Select the appropriate RQ queue
  72. queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
  73. rq_queue = get_queue(queue_name)
  74. # Compile the task parameters
  75. params = {
  76. "event_rule": event_rule,
  77. "model_name": model_name,
  78. "event": event,
  79. "data": data,
  80. "snapshots": snapshots,
  81. "timestamp": timezone.now().isoformat(),
  82. "username": username,
  83. "retry": get_rq_retry()
  84. }
  85. if snapshots:
  86. params["snapshots"] = snapshots
  87. if request_id:
  88. params["request_id"] = request_id
  89. # Enqueue the task
  90. rq_queue.enqueue(
  91. "extras.webhooks.send_webhook",
  92. **params
  93. )
  94. # Scripts
  95. elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
  96. # Resolve the script from action parameters
  97. script_module = event_rule.action_object
  98. script_name = event_rule.action_parameters['script_name']
  99. script = script_module.scripts[script_name]()
  100. # Enqueue a Job to record the script's execution
  101. Job.enqueue(
  102. "extras.scripts.run_script",
  103. instance=script_module,
  104. name=script.class_name,
  105. user=user,
  106. data=data
  107. )
  108. else:
  109. raise ValueError(f"Unknown action type for an event rule: {event_rule.action_type}")
  110. def process_event_queue(events):
  111. """
  112. Flush a list of object representation to RQ for EventRule processing.
  113. """
  114. events_cache = {
  115. 'type_create': {},
  116. 'type_update': {},
  117. 'type_delete': {},
  118. }
  119. for data in events:
  120. action_flag = {
  121. ObjectChangeActionChoices.ACTION_CREATE: 'type_create',
  122. ObjectChangeActionChoices.ACTION_UPDATE: 'type_update',
  123. ObjectChangeActionChoices.ACTION_DELETE: 'type_delete',
  124. }[data['event']]
  125. content_type = data['content_type']
  126. # Cache applicable Event Rules
  127. if content_type not in events_cache[action_flag]:
  128. events_cache[action_flag][content_type] = EventRule.objects.filter(
  129. **{action_flag: True},
  130. content_types=content_type,
  131. enabled=True
  132. )
  133. event_rules = events_cache[action_flag][content_type]
  134. process_event_rules(
  135. event_rules, content_type.model, data['event'], data['data'], data['username'],
  136. snapshots=data['snapshots'], request_id=data['request_id']
  137. )
  138. def flush_events(queue):
  139. """
  140. Flush a list of object representation to RQ for webhook processing.
  141. """
  142. if queue:
  143. for name in settings.EVENTS_PIPELINE:
  144. try:
  145. func = import_string(name)
  146. func(queue)
  147. except Exception as e:
  148. logger.error(f"Cannot import events pipeline {name} error: {e}")