2
0

events.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. import logging
  2. from collections import UserDict, defaultdict
  3. from django.conf import settings
  4. from django.utils import timezone
  5. from django.utils.module_loading import import_string
  6. from django.utils.translation import gettext as _
  7. from django_rq import get_queue
  8. from core.events import *
  9. from core.models import ObjectType
  10. from netbox.config import get_config
  11. from netbox.constants import RQ_QUEUE_DEFAULT
  12. from netbox.models.features import has_feature
  13. from utilities.api import get_serializer_for_model
  14. from utilities.request import copy_safe_request
  15. from utilities.rqworker import get_rq_retry
  16. from utilities.serialization import serialize_object
  17. from .choices import EventRuleActionChoices
  18. from .models import EventRule
  19. logger = logging.getLogger('netbox.events_processor')
  20. class EventContext(UserDict):
  21. """
  22. A custom dictionary that automatically serializes its associated object on demand.
  23. """
  24. # We're emulating a dictionary here (rather than using a custom class) because prior to NetBox v4.5.2, events were
  25. # queued as dictionaries for processing by handles in EVENTS_PIPELINE. We need to avoid introducing any breaking
  26. # changes until a suitable minor release.
  27. def __getitem__(self, item):
  28. if item == 'data' and 'data' not in self:
  29. data = serialize_for_event(self['object'])
  30. self.__setitem__('data', data)
  31. return super().__getitem__(item)
  32. def serialize_for_event(instance):
  33. """
  34. Return a serialized representation of the given instance suitable for use in a queued event.
  35. """
  36. serializer_class = get_serializer_for_model(instance.__class__)
  37. serializer_context = {
  38. 'request': None,
  39. }
  40. serializer = serializer_class(instance, context=serializer_context)
  41. return serializer.data
  42. def get_snapshots(instance, event_type):
  43. """
  44. Return a dictionary of pre- and post-change snapshots for the given instance.
  45. """
  46. if event_type == OBJECT_DELETED:
  47. # Post-change snapshot must be empty for deleted objects
  48. postchange_snapshot = None
  49. elif hasattr(instance, '_postchange_snapshot'):
  50. # Use the cached post-change snapshot if one is available
  51. postchange_snapshot = instance._postchange_snapshot
  52. elif hasattr(instance, 'serialize_object'):
  53. # Use model's serialize_object() method if defined
  54. postchange_snapshot = instance.serialize_object()
  55. else:
  56. # Fall back to the serialize_object() utility function
  57. postchange_snapshot = serialize_object(instance)
  58. return {
  59. 'prechange': getattr(instance, '_prechange_snapshot', None),
  60. 'postchange': postchange_snapshot,
  61. }
  62. def enqueue_event(queue, instance, request, event_type):
  63. """
  64. Enqueue a serialized representation of a created/updated/deleted object for the processing of
  65. events once the request has completed.
  66. """
  67. # Bail if this type of object does not support event rules
  68. if not has_feature(instance, 'event_rules'):
  69. return
  70. app_label = instance._meta.app_label
  71. model_name = instance._meta.model_name
  72. assert instance.pk is not None
  73. key = f'{app_label}.{model_name}:{instance.pk}'
  74. if key in queue:
  75. queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
  76. # If the object is being deleted, update any prior "update" event to "delete"
  77. if event_type == OBJECT_DELETED:
  78. queue[key]['event_type'] = event_type
  79. else:
  80. queue[key] = EventContext(
  81. object_type=ObjectType.objects.get_for_model(instance),
  82. object_id=instance.pk,
  83. object=instance,
  84. event_type=event_type,
  85. snapshots=get_snapshots(instance, event_type),
  86. request=request,
  87. user=request.user,
  88. # Legacy request attributes for backward compatibility
  89. username=request.user.username,
  90. request_id=request.id,
  91. )
  92. # Force serialization of objects prior to them actually being deleted
  93. if event_type == OBJECT_DELETED:
  94. queue[key]['data'] = serialize_for_event(instance)
  95. def process_event_rules(event_rules, object_type, event):
  96. """
  97. Process a list of EventRules against an event.
  98. Notes on event sources:
  99. - Object change events (created/updated/deleted) are enqueued via
  100. enqueue_event() during an HTTP request.
  101. These events include a request object and legacy request
  102. attributes (e.g. username, request_id) for backward compatibility.
  103. - Job lifecycle events (JOB_STARTED/JOB_COMPLETED) are emitted by
  104. job_start/job_end signal handlers and may not include a request
  105. context.
  106. Consumers must not assume that fields like `username` are always
  107. present.
  108. """
  109. for event_rule in event_rules:
  110. # Evaluate event rule conditions (if any)
  111. if not event_rule.eval_conditions(event['data']):
  112. continue
  113. # Compile event data
  114. event_data = event_rule.action_data or {}
  115. event_data.update(event['data'])
  116. # Webhooks
  117. if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
  118. # Select the appropriate RQ queue
  119. queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
  120. rq_queue = get_queue(queue_name)
  121. # For job lifecycle events, `username` may be absent because
  122. # there is no request context.
  123. # Prefer the associated user object when present, falling
  124. # back to the legacy username attribute.
  125. username = getattr(event.get('user'), 'username', None) or event.get('username')
  126. # Compile the task parameters
  127. params = {
  128. 'event_rule': event_rule,
  129. 'object_type': object_type,
  130. 'event_type': event['event_type'],
  131. 'data': event_data,
  132. 'snapshots': event.get('snapshots'),
  133. 'timestamp': timezone.now().isoformat(),
  134. 'username': username,
  135. 'retry': get_rq_retry(),
  136. }
  137. if 'request' in event:
  138. # Exclude FILES - webhooks don't need uploaded files,
  139. # which can cause pickle errors with Pillow.
  140. params['request'] = copy_safe_request(event['request'], include_files=False)
  141. # Enqueue the task
  142. rq_queue.enqueue('extras.webhooks.send_webhook', **params)
  143. # Scripts
  144. elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
  145. # Resolve the script from action parameters
  146. script = event_rule.action_object.python_class()
  147. # Enqueue a Job to record the script's execution
  148. from extras.jobs import ScriptJob
  149. params = {
  150. 'instance': event_rule.action_object,
  151. 'name': script.name,
  152. 'user': event['user'],
  153. 'data': event_data,
  154. }
  155. if 'snapshots' in event:
  156. params['snapshots'] = event['snapshots']
  157. if 'request' in event:
  158. params['request'] = copy_safe_request(event['request'])
  159. # Enqueue the job
  160. ScriptJob.enqueue(**params)
  161. # Notification groups
  162. elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
  163. # Bulk-create notifications for all members of the notification group
  164. event_rule.action_object.notify(
  165. object_type=object_type,
  166. object_id=event_data['id'],
  167. object_repr=event_data.get('display'),
  168. event_type=event['event_type'],
  169. )
  170. else:
  171. raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
  172. action_type=event_rule.action_type
  173. ))
  174. def process_event_queue(events):
  175. """
  176. Flush a list of object representation to RQ for EventRule processing.
  177. This is the default processor listed in EVENTS_PIPELINE.
  178. """
  179. events_cache = defaultdict(dict)
  180. for event in events:
  181. event_type = event['event_type']
  182. object_type = event['object_type']
  183. # Cache applicable Event Rules
  184. if object_type not in events_cache[event_type]:
  185. events_cache[event_type][object_type] = EventRule.objects.filter(
  186. event_types__contains=[event['event_type']],
  187. object_types=object_type,
  188. enabled=True
  189. )
  190. event_rules = events_cache[event_type][object_type]
  191. process_event_rules(
  192. event_rules=event_rules,
  193. object_type=object_type,
  194. event=event,
  195. )
  196. def flush_events(events):
  197. """
  198. Flush a list of object representations to RQ for event processing.
  199. """
  200. if events:
  201. for name in settings.EVENTS_PIPELINE:
  202. try:
  203. func = import_string(name)
  204. func(events)
  205. except ImportError as e:
  206. logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))