events.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313
  1. import logging
  2. from collections import UserDict, defaultdict
  3. from django.conf import settings
  4. from django.utils import timezone
  5. from django.utils.module_loading import import_string
  6. from django.utils.translation import gettext as _
  7. from django_rq import get_queue
  8. from core.events import *
  9. from core.models import ObjectType
  10. from netbox.config import get_config
  11. from netbox.constants import RQ_QUEUE_DEFAULT
  12. from netbox.models.features import has_feature
  13. from utilities.api import get_serializer_for_model
  14. from utilities.request import copy_safe_request
  15. from utilities.rqworker import get_rq_retry
  16. from utilities.serialization import serialize_object
  17. from .choices import EventRuleActionChoices
  18. from .models import EventRule
  19. logger = logging.getLogger('netbox.events_processor')
  20. class EventContext(UserDict):
  21. """
  22. Dictionary-compatible wrapper for queued events that lazily serializes
  23. ``event['data']`` on first access.
  24. Backward-compatible with the plain-dict interface expected by existing
  25. EVENTS_PIPELINE consumers. When the same object is enqueued more than once
  26. in a single request, the serialization source is updated so consumers see
  27. the latest state.
  28. """
  29. def __init__(self, *args, **kwargs):
  30. super().__init__(*args, **kwargs)
  31. # Track which model instance should be serialized if/when `data` is
  32. # requested. This may be refreshed on duplicate enqueue, while leaving
  33. # the public `object` entry untouched for compatibility.
  34. self._serialization_source = None
  35. if 'object' in self:
  36. self._serialization_source = super().__getitem__('object')
  37. def refresh_serialization_source(self, instance):
  38. """
  39. Point lazy serialization at a fresher instance, invalidating any
  40. already-materialized ``data``.
  41. """
  42. self._serialization_source = instance
  43. # UserDict.__contains__ checks the backing dict directly, so `in`
  44. # does not trigger __getitem__'s lazy serialization.
  45. if 'data' in self:
  46. del self['data']
  47. def freeze_data(self, instance):
  48. """
  49. Eagerly serialize and cache the payload for delete events, where the
  50. object may become inaccessible after deletion.
  51. """
  52. super().__setitem__('data', serialize_for_event(instance))
  53. self._serialization_source = None
  54. def __getitem__(self, item):
  55. if item == 'data' and 'data' not in self:
  56. # Materialize the payload only when an event consumer asks for it.
  57. #
  58. # On coalesced events, use the latest explicitly queued instance so
  59. # webhooks/scripts/notifications observe the final queued state for
  60. # that object within the request.
  61. source = self._serialization_source or super().__getitem__('object')
  62. super().__setitem__('data', serialize_for_event(source))
  63. return super().__getitem__(item)
  64. def serialize_for_event(instance):
  65. """
  66. Return a serialized representation of the given instance suitable for use in a queued event.
  67. """
  68. serializer_class = get_serializer_for_model(instance.__class__)
  69. serializer_context = {
  70. 'request': None,
  71. }
  72. serializer = serializer_class(instance, context=serializer_context)
  73. return serializer.data
  74. def get_snapshots(instance, event_type):
  75. """
  76. Return a dictionary of pre- and post-change snapshots for the given instance.
  77. """
  78. if event_type == OBJECT_DELETED:
  79. # Post-change snapshot must be empty for deleted objects
  80. postchange_snapshot = None
  81. elif hasattr(instance, '_postchange_snapshot'):
  82. # Use the cached post-change snapshot if one is available
  83. postchange_snapshot = instance._postchange_snapshot
  84. elif hasattr(instance, 'serialize_object'):
  85. # Use model's serialize_object() method if defined
  86. postchange_snapshot = instance.serialize_object()
  87. else:
  88. # Fall back to the serialize_object() utility function
  89. postchange_snapshot = serialize_object(instance)
  90. return {
  91. 'prechange': getattr(instance, '_prechange_snapshot', None),
  92. 'postchange': postchange_snapshot,
  93. }
  94. def enqueue_event(queue, instance, request, event_type):
  95. """
  96. Enqueue (or coalesce) an event for a created/updated/deleted object.
  97. Events are processed after the request completes.
  98. """
  99. # Bail if this type of object does not support event rules
  100. if not has_feature(instance, 'event_rules'):
  101. return
  102. app_label = instance._meta.app_label
  103. model_name = instance._meta.model_name
  104. assert instance.pk is not None
  105. key = f'{app_label}.{model_name}:{instance.pk}'
  106. if key in queue:
  107. queue[key]['snapshots']['postchange'] = get_snapshots(instance, event_type)['postchange']
  108. # If the object is being deleted, convert any prior update event into a
  109. # delete event and freeze the payload before the object (or related
  110. # rows) become inaccessible.
  111. if event_type == OBJECT_DELETED:
  112. queue[key]['event_type'] = event_type
  113. else:
  114. # Keep the public `object` entry stable for compatibility.
  115. queue[key].refresh_serialization_source(instance)
  116. else:
  117. queue[key] = EventContext(
  118. object_type=ObjectType.objects.get_for_model(instance),
  119. object_id=instance.pk,
  120. object=instance,
  121. event_type=event_type,
  122. snapshots=get_snapshots(instance, event_type),
  123. request=request,
  124. user=request.user,
  125. # Legacy request attributes for backward compatibility
  126. username=request.user.username, # DEPRECATED, will be removed in NetBox v4.7.0
  127. request_id=request.id, # DEPRECATED, will be removed in NetBox v4.7.0
  128. )
  129. # For delete events, eagerly serialize the payload before the row is gone.
  130. # This covers both first-time enqueues and coalesced update→delete promotions.
  131. if event_type == OBJECT_DELETED:
  132. queue[key].freeze_data(instance)
  133. def process_event_rules(event_rules, object_type, event):
  134. """
  135. Process a list of EventRules against an event.
  136. Notes on event sources:
  137. - Object change events (created/updated/deleted) are enqueued via
  138. enqueue_event() during an HTTP request.
  139. These events include a request object and legacy request
  140. attributes (e.g. username, request_id) for backward compatibility.
  141. - Job lifecycle events (JOB_STARTED/JOB_COMPLETED) are emitted by
  142. job_start/job_end signal handlers and may not include a request
  143. context.
  144. Consumers must not assume that fields like `username` are always
  145. present.
  146. """
  147. for event_rule in event_rules:
  148. # Evaluate event rule conditions (if any)
  149. if not event_rule.eval_conditions(event['data']):
  150. continue
  151. # Guard against action_data that is valid JSON but not a dict
  152. # (e.g. a bare string or number). Existing rows with bad data are
  153. # tolerated at runtime; validation on EventRule.clean() prevents
  154. # new ones.
  155. if event_rule.action_data is None:
  156. action_data = {}
  157. elif isinstance(event_rule.action_data, dict):
  158. action_data = event_rule.action_data
  159. else:
  160. logger.warning(
  161. _('Ignoring invalid action_data on event rule "{rule}" (got {data_type})').format(
  162. rule=event_rule,
  163. data_type=type(event_rule.action_data).__name__,
  164. )
  165. )
  166. action_data = {}
  167. # Merge rule-specific action_data with the event payload.
  168. # Copy to avoid mutating the rule's stored action_data dict.
  169. event_data = {**action_data, **event['data']}
  170. # Webhooks
  171. if event_rule.action_type == EventRuleActionChoices.WEBHOOK:
  172. # Select the appropriate RQ queue
  173. queue_name = get_config().QUEUE_MAPPINGS.get('webhook', RQ_QUEUE_DEFAULT)
  174. rq_queue = get_queue(queue_name)
  175. # For job lifecycle events, `username` may be absent because
  176. # there is no request context.
  177. # Prefer the associated user object when present, falling
  178. # back to the legacy username attribute.
  179. username = getattr(event.get('user'), 'username', None) or event.get('username')
  180. # Compile the task parameters
  181. params = {
  182. 'event_rule': event_rule,
  183. 'object_type': object_type,
  184. 'event_type': event['event_type'],
  185. 'data': event_data,
  186. 'snapshots': event.get('snapshots'),
  187. 'timestamp': timezone.now().isoformat(),
  188. 'username': username,
  189. 'retry': get_rq_retry(),
  190. }
  191. if 'request' in event:
  192. # Exclude FILES - webhooks don't need uploaded files,
  193. # which can cause pickle errors with Pillow.
  194. params['request'] = copy_safe_request(event['request'], include_files=False)
  195. # Enqueue the task
  196. rq_queue.enqueue('extras.webhooks.send_webhook', **params)
  197. # Scripts
  198. elif event_rule.action_type == EventRuleActionChoices.SCRIPT:
  199. # Resolve the script from action parameters
  200. script = event_rule.action_object.python_class()
  201. # Enqueue a Job to record the script's execution
  202. from extras.jobs import ScriptJob
  203. params = {
  204. 'instance': event_rule.action_object,
  205. 'name': script.name,
  206. 'user': event['user'],
  207. 'data': event_data,
  208. }
  209. if 'snapshots' in event:
  210. params['snapshots'] = event['snapshots']
  211. if 'request' in event:
  212. params['request'] = copy_safe_request(event['request'])
  213. # Enqueue the job
  214. ScriptJob.enqueue(**params)
  215. # Notification groups
  216. elif event_rule.action_type == EventRuleActionChoices.NOTIFICATION:
  217. # Bulk-create notifications for all members of the notification group
  218. event_rule.action_object.notify(
  219. object_type=object_type,
  220. object_id=event_data['id'],
  221. object_repr=event_data.get('display'),
  222. event_type=event['event_type'],
  223. )
  224. else:
  225. raise ValueError(_("Unknown action type for an event rule: {action_type}").format(
  226. action_type=event_rule.action_type
  227. ))
  228. def process_event_queue(events):
  229. """
  230. Flush a list of object representation to RQ for EventRule processing.
  231. This is the default processor listed in EVENTS_PIPELINE.
  232. """
  233. events_cache = defaultdict(dict)
  234. for event in events:
  235. event_type = event['event_type']
  236. object_type = event['object_type']
  237. # Cache applicable Event Rules
  238. if object_type not in events_cache[event_type]:
  239. events_cache[event_type][object_type] = EventRule.objects.filter(
  240. event_types__contains=[event['event_type']],
  241. object_types=object_type,
  242. enabled=True
  243. )
  244. event_rules = events_cache[event_type][object_type]
  245. process_event_rules(
  246. event_rules=event_rules,
  247. object_type=object_type,
  248. event=event,
  249. )
  250. def flush_events(events):
  251. """
  252. Flush a list of object representations to RQ for event processing.
  253. """
  254. if events:
  255. for name in settings.EVENTS_PIPELINE:
  256. try:
  257. func = import_string(name)
  258. func(events)
  259. except ImportError as e:
  260. logger.error(_("Cannot import events pipeline {name} error: {error}").format(name=name, error=e))