signals.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. import random
  2. from datetime import timedelta
  3. from cacheops.signals import cache_invalidated, cache_read
  4. from django.conf import settings
  5. from django.contrib.contenttypes.models import ContentType
  6. from django.db import DEFAULT_DB_ALIAS
  7. from django.db.models.signals import m2m_changed, post_save, pre_delete
  8. from django.utils import timezone
  9. from django_prometheus.models import model_deletes, model_inserts, model_updates
  10. from prometheus_client import Counter
  11. from .choices import ObjectChangeActionChoices
  12. from .models import CustomField, ObjectChange
  13. from .webhooks import enqueue_object, get_snapshots, serialize_for_webhook
  14. #
  15. # Change logging/webhooks
  16. #
  17. def _handle_changed_object(request, webhook_queue, sender, instance, **kwargs):
  18. """
  19. Fires when an object is created or updated.
  20. """
  21. def is_same_object(instance, webhook_data):
  22. return (
  23. ContentType.objects.get_for_model(instance) == webhook_data['content_type'] and
  24. instance.pk == webhook_data['object_id'] and
  25. request.id == webhook_data['request_id']
  26. )
  27. if not hasattr(instance, 'to_objectchange'):
  28. return
  29. m2m_changed = False
  30. # Determine the type of change being made
  31. if kwargs.get('created'):
  32. action = ObjectChangeActionChoices.ACTION_CREATE
  33. elif 'created' in kwargs:
  34. action = ObjectChangeActionChoices.ACTION_UPDATE
  35. elif kwargs.get('action') in ['post_add', 'post_remove'] and kwargs['pk_set']:
  36. # m2m_changed with objects added or removed
  37. m2m_changed = True
  38. action = ObjectChangeActionChoices.ACTION_UPDATE
  39. else:
  40. return
  41. # Record an ObjectChange if applicable
  42. if hasattr(instance, 'to_objectchange'):
  43. if m2m_changed:
  44. ObjectChange.objects.filter(
  45. changed_object_type=ContentType.objects.get_for_model(instance),
  46. changed_object_id=instance.pk,
  47. request_id=request.id
  48. ).update(
  49. postchange_data=instance.to_objectchange(action).postchange_data
  50. )
  51. else:
  52. objectchange = instance.to_objectchange(action)
  53. objectchange.user = request.user
  54. objectchange.request_id = request.id
  55. objectchange.save()
  56. # If this is an M2M change, update the previously queued webhook (from post_save)
  57. if m2m_changed and webhook_queue and is_same_object(instance, webhook_queue[-1]):
  58. instance.refresh_from_db() # Ensure that we're working with fresh M2M assignments
  59. webhook_queue[-1]['data'] = serialize_for_webhook(instance)
  60. webhook_queue[-1]['snapshots']['postchange'] = get_snapshots(instance, action)['postchange']
  61. else:
  62. enqueue_object(webhook_queue, instance, request.user, request.id, action)
  63. # Increment metric counters
  64. if action == ObjectChangeActionChoices.ACTION_CREATE:
  65. model_inserts.labels(instance._meta.model_name).inc()
  66. elif action == ObjectChangeActionChoices.ACTION_UPDATE:
  67. model_updates.labels(instance._meta.model_name).inc()
  68. # Housekeeping: 0.1% chance of clearing out expired ObjectChanges
  69. if settings.CHANGELOG_RETENTION and random.randint(1, 1000) == 1:
  70. cutoff = timezone.now() - timedelta(days=settings.CHANGELOG_RETENTION)
  71. ObjectChange.objects.filter(time__lt=cutoff)._raw_delete(using=DEFAULT_DB_ALIAS)
  72. def _handle_deleted_object(request, webhook_queue, sender, instance, **kwargs):
  73. """
  74. Fires when an object is deleted.
  75. """
  76. if not hasattr(instance, 'to_objectchange'):
  77. return
  78. # Record an ObjectChange if applicable
  79. if hasattr(instance, 'to_objectchange'):
  80. objectchange = instance.to_objectchange(ObjectChangeActionChoices.ACTION_DELETE)
  81. objectchange.user = request.user
  82. objectchange.request_id = request.id
  83. objectchange.save()
  84. # Enqueue webhooks
  85. enqueue_object(webhook_queue, instance, request.user, request.id, ObjectChangeActionChoices.ACTION_DELETE)
  86. # Increment metric counters
  87. model_deletes.labels(instance._meta.model_name).inc()
  88. #
  89. # Custom fields
  90. #
  91. def handle_cf_removed_obj_types(instance, action, pk_set, **kwargs):
  92. """
  93. Handle the cleanup of old custom field data when a CustomField is removed from one or more ContentTypes.
  94. """
  95. if action == 'post_remove':
  96. instance.remove_stale_data(ContentType.objects.filter(pk__in=pk_set))
  97. def handle_cf_renamed(instance, created, **kwargs):
  98. """
  99. Handle the renaming of custom field data on objects when a CustomField is renamed.
  100. """
  101. if not created and instance.name != instance._name:
  102. instance.rename_object_data(old_name=instance._name, new_name=instance.name)
  103. def handle_cf_deleted(instance, **kwargs):
  104. """
  105. Handle the cleanup of old custom field data when a CustomField is deleted.
  106. """
  107. instance.remove_stale_data(instance.content_types.all())
  108. m2m_changed.connect(handle_cf_removed_obj_types, sender=CustomField.content_types.through)
  109. post_save.connect(handle_cf_renamed, sender=CustomField)
  110. pre_delete.connect(handle_cf_deleted, sender=CustomField)
  111. #
  112. # Caching
  113. #
  114. cacheops_cache_hit = Counter('cacheops_cache_hit', 'Number of cache hits')
  115. cacheops_cache_miss = Counter('cacheops_cache_miss', 'Number of cache misses')
  116. cacheops_cache_invalidated = Counter('cacheops_cache_invalidated', 'Number of cache invalidations')
  117. def cache_read_collector(sender, func, hit, **kwargs):
  118. if hit:
  119. cacheops_cache_hit.inc()
  120. else:
  121. cacheops_cache_miss.inc()
  122. def cache_invalidated_collector(sender, obj_dict, **kwargs):
  123. cacheops_cache_invalidated.inc()
  124. cache_read.connect(cache_read_collector)
  125. cache_invalidated.connect(cache_invalidated_collector)