| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- import importlib
- import logging
- from django.contrib.contenttypes.models import ContentType
- from django.core.exceptions import ImproperlyConfigured, ValidationError
- from django.db.models.fields.reverse_related import ManyToManyRel
- from django.db.models.signals import m2m_changed, post_save, pre_delete
- from django.dispatch import receiver, Signal
- from django.utils.translation import gettext_lazy as _
- from django_prometheus.models import model_deletes, model_inserts, model_updates
- from core.choices import ObjectChangeActionChoices
- from core.events import *
- from core.models import ObjectChange, ObjectType
- from core.signals import job_end, job_start
- from extras.events import process_event_rules
- from extras.models import EventRule, Notification, Subscription
- from netbox.config import get_config
- from netbox.context import current_request, events_queue
- from netbox.models.features import ChangeLoggingMixin
- from netbox.registry import registry
- from netbox.signals import post_clean
- from utilities.exceptions import AbortRequest
- from .events import enqueue_event
- from .models import CustomField, TaggedItem
- from .validators import CustomValidator
def run_validators(instance, validators):
    """
    Run the provided iterable of validators for the instance.

    Each entry in `validators` may be one of:
      * a string holding the dotted path to a validator class, which is imported and
        instantiated with no arguments;
      * a dict ruleset, from which a CustomValidator is constructed on the fly;
      * an instance of CustomValidator (or of a subclass), which is used as-is.

    Every resolved validator is called with the instance and the current request (the request
    is whatever `current_request.get()` returns, which may be None).

    Raises ImproperlyConfigured if an entry is none of the above. Any exception raised by a
    validator itself (e.g. ValidationError) propagates to the caller.
    """
    request = current_request.get()

    for validator in validators:
        # isinstance() is used rather than exact type comparison so that subclasses of
        # str/dict are treated the same as the base types
        if isinstance(validator, str):
            # Loading a validator class by dotted path
            module_path, class_name = validator.rsplit('.', 1)
            validator = getattr(importlib.import_module(module_path), class_name)()
        elif isinstance(validator, dict):
            # Constructing a new instance on the fly from a ruleset
            validator = CustomValidator(validator)
        elif not isinstance(validator, CustomValidator):
            raise ImproperlyConfigured(f"Invalid value for custom validator: {validator}")

        validator(instance, request)
- #
- # Change logging/webhooks
- #
# Define a custom signal that can be sent to clear any queued events (for example, when the
# work that queued them is being abandoned). It carries no arguments beyond the sender.
clear_events = Signal()
@receiver((post_save, m2m_changed))
def handle_changed_object(sender, instance, **kwargs):
    """
    Fires when an object is created or updated.

    Handles both post_save (identified by the presence of the `created` keyword argument) and
    m2m_changed (only the `post_add`/`post_remove` actions with a non-empty `pk_set`). For a
    qualifying change, an ObjectChange is recorded, the object is enqueued for event
    processing, and the relevant metric counter is incremented. Nothing happens if the
    instance does not implement `to_objectchange()` or if there is no current request.
    """
    # Only objects which can describe themselves as an ObjectChange are tracked
    if not hasattr(instance, 'to_objectchange'):
        return

    # Get the current request, or bail if not set
    request = current_request.get()
    if request is None:
        return

    # Determine the type of change being made
    is_m2m_change = False
    if 'created' in kwargs:
        # post_save: `created` distinguishes a new object from an update
        event_type = OBJECT_CREATED if kwargs['created'] else OBJECT_UPDATED
    elif kwargs.get('action') in ['post_add', 'post_remove'] and kwargs['pk_set']:
        # m2m_changed with objects added or removed
        is_m2m_change = True
        event_type = OBJECT_UPDATED
    else:
        return

    # Build an ObjectChange record for this change
    action_by_event = {
        OBJECT_CREATED: ObjectChangeActionChoices.ACTION_CREATE,
        OBJECT_UPDATED: ObjectChangeActionChoices.ACTION_UPDATE,
        OBJECT_DELETED: ObjectChangeActionChoices.ACTION_DELETE,
    }
    objectchange = instance.to_objectchange(action_by_event[event_type])

    # For a many-to-many change, look for an ObjectChange already recorded for this object by
    # this request, so that it can be updated rather than duplicated
    prev_change = None
    if is_m2m_change:
        prev_change = ObjectChange.objects.filter(
            changed_object_type=ContentType.objects.get_for_model(instance),
            changed_object_id=instance.pk,
            request_id=request.id
        ).first()

    if prev_change:
        prev_change.postchange_data = objectchange.postchange_data
        prev_change.save()
    elif objectchange and objectchange.has_changes:
        objectchange.user = request.user
        objectchange.request_id = request.id
        objectchange.save()

    # Ensure that we're working with fresh M2M assignments
    if is_m2m_change:
        instance.refresh_from_db()

    # Enqueue the object for event processing
    queue = events_queue.get()
    enqueue_event(queue, instance, request.user, request.id, event_type)
    events_queue.set(queue)

    # Increment metric counters
    model_label = instance._meta.model_name
    if event_type == OBJECT_CREATED:
        model_inserts.labels(model_label).inc()
    elif event_type == OBJECT_UPDATED:
        model_updates.labels(model_label).inc()
@receiver(pre_delete)
def handle_deleted_object(sender, instance, **kwargs):
    """
    Fires when an object is deleted.

    Runs the protection rules configured for the sender's model first; a ValidationError from
    any of them is re-raised as AbortRequest. If there is a current request, the deletion is
    then recorded as an ObjectChange (where the instance supports it), reverse many-to-many
    associations are explicitly removed, the object is enqueued for event processing, and the
    deletion metric counter is incremented.
    """
    # Run any deletion protection rules for the object. Note that this must occur prior
    # to queueing any events for the object being deleted, in case a validation error is
    # raised, causing the deletion to fail.
    opts = sender._meta
    model_name = f'{opts.app_label}.{opts.model_name}'
    protection_rules = get_config().PROTECTION_RULES.get(model_name, [])
    try:
        run_validators(instance, protection_rules)
    except ValidationError as e:
        raise AbortRequest(
            _("Deletion is prevented by a protection rule: {message}").format(message=e)
        )

    # Get the current request, or bail if not set
    request = current_request.get()
    if request is None:
        return

    # Record an ObjectChange if applicable
    if hasattr(instance, 'to_objectchange'):
        # Capture the pre-change state unless a snapshot has already been taken
        needs_snapshot = hasattr(instance, 'snapshot') and not getattr(instance, '_prechange_snapshot', None)
        if needs_snapshot:
            instance.snapshot()
        deletion_record = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
        deletion_record.user = request.user
        deletion_record.request_id = request.id
        deletion_record.save()

    # Django does not automatically send an m2m_changed signal for the reverse direction of a
    # many-to-many relationship (see https://code.djangoproject.com/ticket/17688), so we need to
    # trigger one manually. We do this by checking for any reverse M2M relationships on the
    # instance being deleted, and explicitly call .remove() on the remote M2M field to delete
    # the association. This triggers an m2m_changed signal with the `post_remove` action type
    # for the forward direction of the relationship, ensuring that the change is recorded.
    for relation in instance._meta.related_objects:
        # We only care about reverse M2M relations to models which support change logging
        if type(relation) is ManyToManyRel and issubclass(relation.related_model, ChangeLoggingMixin):
            m2m_field_name = relation.remote_field.name
            for related_obj in relation.related_model.objects.filter(**{m2m_field_name: instance.pk}):
                related_obj.snapshot()  # Ensure the change record includes the "before" state
                getattr(related_obj, m2m_field_name).remove(instance)

    # Enqueue the object for event processing
    queue = events_queue.get()
    enqueue_event(queue, instance, request.user, request.id, OBJECT_DELETED)
    events_queue.set(queue)

    # Increment metric counters
    model_deletes.labels(instance._meta.model_name).inc()
@receiver(clear_events)
def clear_events_queue(sender, **kwargs):
    """
    Discard every queued event (e.g. because of an aborted bulk transaction).

    The number of events being discarded and the sender are logged at INFO level on the
    'events' logger before the queue is replaced with an empty dict.
    """
    pending = events_queue.get()
    logging.getLogger('events').info(f"Clearing {len(pending)} queued events ({sender})")
    events_queue.set({})
- #
- # Custom fields
- #
def handle_cf_added_obj_types(instance, action, pk_set, **kwargs):
    """
    Populate default/null values when a CustomField is added to one or more ContentTypes.

    Acts only on the `post_add` action; every other action is ignored. `pk_set` holds the
    primary keys of the ContentTypes that were added.
    """
    if action != 'post_add':
        return

    added_types = ContentType.objects.filter(pk__in=pk_set)
    instance.populate_initial_data(added_types)
def handle_cf_removed_obj_types(instance, action, pk_set, **kwargs):
    """
    Clean up old custom field data when a CustomField is removed from one or more ContentTypes.

    Acts only on the `post_remove` action; every other action is ignored. `pk_set` holds the
    primary keys of the ContentTypes that were removed.
    """
    if action != 'post_remove':
        return

    removed_types = ContentType.objects.filter(pk__in=pk_set)
    instance.remove_stale_data(removed_types)
def handle_cf_renamed(instance, created, **kwargs):
    """
    Rename stored custom field data on objects when an existing CustomField is renamed.

    Newly created fields are skipped. For an existing field, the current name is compared
    with `instance._name`, which is used here as the previous name; if they differ, the
    object data is renamed accordingly.
    """
    if created:
        return

    previous_name = instance._name
    if instance.name != previous_name:
        instance.rename_object_data(old_name=previous_name, new_name=instance.name)
def handle_cf_deleted(instance, **kwargs):
    """
    Clean up old custom field data when a CustomField is deleted.

    The stale data is removed for every object type currently assigned to the field.
    """
    assigned_types = instance.object_types.all()
    instance.remove_stale_data(assigned_types)
- post_save.connect(handle_cf_renamed, sender=CustomField)
- pre_delete.connect(handle_cf_deleted, sender=CustomField)
- m2m_changed.connect(handle_cf_added_obj_types, sender=CustomField.object_types.through)
- m2m_changed.connect(handle_cf_removed_obj_types, sender=CustomField.object_types.through)
- #
- # Custom validation
- #
@receiver(post_clean)
def run_save_validators(sender, instance, **kwargs):
    """
    Run any custom validation rules for the model prior to calling save().

    The rules are looked up in the CUSTOM_VALIDATORS configuration parameter under the key
    "<app_label>.<model_name>" of the sending model; a model with no entry has nothing to run.
    """
    opts = sender._meta
    config_key = f'{opts.app_label}.{opts.model_name}'
    run_validators(instance, get_config().CUSTOM_VALIDATORS.get(config_key, []))
- #
- # Tags
- #
@receiver(m2m_changed, sender=TaggedItem)
def validate_assigned_tags(sender, instance, action, model, pk_set, **kwargs):
    """
    Validate that any Tags being assigned to the instance are not restricted to non-applicable object types.

    Acts only on the `pre_add` action. Raises AbortRequest for the first tag found which is
    restricted to a set of object types that does not include the instance's object type.
    """
    if action != 'pre_add':
        return

    object_type = ObjectType.objects.get_for_model(instance)

    # Retrieve any applied Tags that are restricted to certain object types
    restricted_tags = model.objects.filter(
        pk__in=pk_set,
        object_types__isnull=False
    ).prefetch_related('object_types')

    for tag in restricted_tags:
        if object_type in tag.object_types.all():
            continue
        raise AbortRequest(f"Tag {tag} cannot be assigned to {object_type.model} objects.")
- #
- # Event rules
- #
def _process_job_event_rules(job, event_type):
    """
    Process all enabled event rules matching the given job's object type and event type.

    `job` is the sender of the job signal; its `object_type`, `user` and `data` attributes are
    read. The username passed along is None when the job has no associated user.
    """
    event_rules = EventRule.objects.filter(
        event_types__contains=[event_type],
        enabled=True,
        object_types=job.object_type
    )
    username = job.user.username if job.user else None
    process_event_rules(
        event_rules=event_rules,
        object_type=job.object_type,
        event_type=event_type,
        data=job.data,
        username=username
    )


@receiver(job_start)
def process_job_start_event_rules(sender, **kwargs):
    """
    Process event rules for jobs starting.
    """
    _process_job_event_rules(sender, JOB_STARTED)


@receiver(job_end)
def process_job_end_event_rules(sender, **kwargs):
    """
    Process event rules for jobs terminating.
    """
    _process_job_event_rules(sender, JOB_COMPLETED)
- #
- # Notifications
- #
@receiver(post_save)
def notify_object_changed(sender, instance, created, raw, **kwargs):
    """
    Notify subscribed users when an existing object is updated.

    Nothing is done for newly created objects, for raw saves, for models which are not
    registered for the 'notifications' feature, or when the object has no subscribers.
    Otherwise each subscriber's existing notifications for the object are deleted and a new
    OBJECT_UPDATED notification is created for each of them.
    """
    if created or raw:
        return

    # Skip unsupported object types
    content_type = ContentType.objects.get_for_model(instance)
    supported_models = registry['model_features']['notifications'].get(content_type.app_label, [])
    if content_type.model not in supported_models:
        return

    # Find all subscribed Users
    subscriber_ids = Subscription.objects.filter(
        object_type=content_type,
        object_id=instance.pk
    ).values_list('user', flat=True)
    if not subscriber_ids:
        return

    # Delete any existing Notifications for the object
    Notification.objects.filter(
        object_type=content_type,
        object_id=instance.pk,
        user__in=subscriber_ids
    ).delete()

    # Create Notifications for Subscribers
    new_notifications = [
        Notification(user_id=user_id, object=instance, event_type=OBJECT_UPDATED)
        for user_id in subscriber_ids
    ]
    Notification.objects.bulk_create(new_notifications)
|