counters.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134
  1. from django.apps import apps
  2. from django.db.models import F, Count, OuterRef, Subquery
  3. from django.db.models.signals import post_delete, post_save, pre_delete
  4. from netbox.registry import registry
  5. from .fields import CounterCacheField
  def get_counters_for_model(model):
      """
      Look up the counters registered for the given model and return them as
      (key, value) pairs. As populated by connect_counters(), each key is a
      '<to_field_name>_id' attribute name on the model and each value is the
      name of the counter field it feeds.
      """
      registered = registry['counter_fields'][model]
      return registered.items()
  def update_counter(model, pk, counter_name, value):
      """
      Apply a relative change to a counter field on the object of the given model
      identified by its primary key (PK). A positive value increments the counter;
      a negative value decrements it.
      """
      # Express the change as an F() expression so it is relative to the stored value
      adjusted = F(counter_name) + value
      target = model.objects.filter(pk=pk)
      target.update(**{counter_name: adjusted})
  def update_counts(model, field_name, related_query):
      """
      Recalculate a counter field for every object of the given model in a single
      update. For example,

          update_counts(Device, '_interface_count', 'interfaces')

      will effectively set

          Device.objects.update(_interface_count=Count('interfaces'))

      Returns whatever the queryset update() call returns.
      """
      # Correlate each row with itself and count its related objects
      correlated = model.objects.filter(pk=OuterRef('pk'))
      counted = correlated.annotate(_count=Count(related_query))
      new_values = {field_name: Subquery(counted.values('_count'))}

      return model.objects.update(**new_values)
  32. #
  33. # Signal handlers
  34. #
  def post_save_receiver(sender, instance, created, **kwargs):
      """
      Adjust counter fields on related (parent) objects after a TrackingModelMixin
      subclass instance has been created or modified.

      For each counter registered for the sender, the previous parent (if the
      tracker recorded one) is decremented, and the current parent is incremented
      when the field was recorded by the tracker or the instance was just created.
      """
      for fk_attr, counter in get_counters_for_model(sender):
          related = sender._meta.get_field(fk_attr).related_model
          current_pk = getattr(instance, fk_attr, None)

          # The tracker only holds an entry for fields it has recorded
          was_tracked = fk_attr in instance.tracker
          if was_tracked:
              previous_pk = instance.tracker.get(fk_attr)
          else:
              previous_pk = None

          # Take one away from the former parent, if there was one
          if previous_pk is not None:
              update_counter(related, previous_pk, counter, -1)

          # Add one to the current parent, but only for a tracked field or a new object
          if current_pk is None:
              continue
          if was_tracked or created:
              update_counter(related, current_pk, counter, 1)
  def pre_delete_receiver(sender, instance, origin, **kwargs):
      """
      Flag an instance whose database row no longer exists at pre-delete time.

      When no object of the instance's model has the instance's PK, the attribute
      `_previously_removed` is set on the instance; post_delete_receiver() checks
      for that attribute and skips the counter decrement.
      """
      row_exists = instance._meta.model.objects.filter(pk=instance.pk).exists()
      if row_exists:
          return
      instance._previously_removed = True
  def post_delete_receiver(sender, instance, origin, **kwargs):
      """
      Adjust counter fields on related (parent) objects after a TrackingModelMixin
      subclass instance has been deleted.
      """
      for fk_attr, counter in get_counters_for_model(sender):
          related = sender._meta.get_field(fk_attr).related_model
          parent_pk = getattr(instance, fk_attr, None)

          # Nothing to decrement without a parent, or when the instance was flagged
          # as already removed before this delete
          if parent_pk is None or hasattr(instance, '_previously_removed'):
              continue

          update_counter(related, parent_pk, counter, -1)
  63. #
  64. # Registration
  65. #
  def connect_counters(*models):
      """
      Register the counter fields found on the given models and attach the signal
      handlers to their child models.

      Each child model (sender) is connected only once per call, even when several
      counters point at the same child (e.g., Device).
      """
      wired = set()  # child models whose signals have been connected in this call

      for model in models:
          for field in model._meta.get_fields():
              # Only CounterCacheFields are of interest
              if not isinstance(field, CounterCacheField):
                  continue

              to_model = apps.get_model(field.to_model_name)

              # Record which attribute on the child feeds which counter on the parent
              registry['counter_fields'][to_model][f'{field.to_field_name}_id'] = field.name

              # Signals are connected once per child model
              if to_model in wired:
                  continue

              # The dispatch_uid is derived from the child model (sender), not the field
              uid_base = f'countercache.{to_model._meta.label_lower}'
              wiring = (
                  (post_save, post_save_receiver, f'{uid_base}.post_save'),
                  (pre_delete, pre_delete_receiver, f'{uid_base}.pre_delete'),
                  (post_delete, post_delete_receiver, f'{uid_base}.post_delete'),
              )

              # Connect the post_save, pre_delete and post_delete handlers, in that order
              for signal, receiver, uid in wiring:
                  signal.connect(
                      receiver,
                      sender=to_model,
                      weak=False,
                      dispatch_uid=uid,
                  )

              wired.add(to_model)