lookups.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. from django.contrib.postgres.fields import ArrayField
  2. from django.contrib.postgres.fields.ranges import RangeField
  3. from django.db.models import CharField, JSONField, Lookup
  4. from django.db.models.fields.json import KeyTextTransform
  5. from .fields import CachedValueField
  6. class RangeContains(Lookup):
  7. """
  8. Filter ArrayField(RangeField) columns where ANY element-range contains the scalar RHS.
  9. Usage (ORM):
  10. Model.objects.filter(<range_array_field>__range_contains=<scalar>)
  11. Works with int4range[], int8range[], daterange[], tstzrange[], etc.
  12. """
  13. lookup_name = 'range_contains'
  14. def as_sql(self, compiler, connection):
  15. # Compile LHS (the array-of-ranges column/expression) and RHS (scalar)
  16. lhs, lhs_params = self.process_lhs(compiler, connection)
  17. rhs, rhs_params = self.process_rhs(compiler, connection)
  18. # Guard: only allow ArrayField whose base_field is a PostgreSQL RangeField
  19. field = getattr(self.lhs, 'output_field', None)
  20. if not (isinstance(field, ArrayField) and isinstance(field.base_field, RangeField)):
  21. raise TypeError('range_contains is only valid for ArrayField(RangeField) columns')
  22. # Range-contains-element using EXISTS + UNNEST keeps the range on the LHS: r @> value
  23. sql = f"EXISTS (SELECT 1 FROM unnest({lhs}) AS r WHERE r @> {rhs})"
  24. params = lhs_params + rhs_params
  25. return sql, params
  26. class Empty(Lookup):
  27. """
  28. Filter on whether a string is empty.
  29. """
  30. lookup_name = 'empty'
  31. prepare_rhs = False
  32. def as_sql(self, compiler, connection):
  33. sql, params = compiler.compile(self.lhs)
  34. if self.rhs:
  35. return f"CAST(LENGTH({sql}) AS BOOLEAN) IS NOT TRUE", params
  36. return f"CAST(LENGTH({sql}) AS BOOLEAN) IS TRUE", params
  37. class JSONEmpty(Lookup):
  38. """
  39. Support "empty" lookups for JSONField keys.
  40. A key is considered empty if it is "", null, or does not exist.
  41. """
  42. lookup_name = 'empty'
  43. def as_sql(self, compiler, connection):
  44. # self.lhs.lhs is the parent expression (could be a JSONField or another KeyTransform)
  45. # Rebuild the expression using KeyTextTransform to guarantee ->> (text)
  46. text_expr = KeyTextTransform(self.lhs.key_name, self.lhs.lhs)
  47. lhs_sql, lhs_params = compiler.compile(text_expr)
  48. value = self.rhs
  49. if value not in (True, False):
  50. raise ValueError("The 'empty' lookup only accepts True or False.")
  51. condition = '' if value else 'NOT '
  52. sql = f"(NULLIF({lhs_sql}, '') IS {condition}NULL)"
  53. return sql, lhs_params
  54. class NetHost(Lookup):
  55. """
  56. Similar to ipam.lookups.NetHost, but casts the field to INET.
  57. """
  58. lookup_name = 'net_host'
  59. def as_sql(self, qn, connection):
  60. lhs, lhs_params = self.process_lhs(qn, connection)
  61. rhs, rhs_params = self.process_rhs(qn, connection)
  62. params = lhs_params + rhs_params
  63. return f'HOST(CAST({lhs} AS INET)) = HOST({rhs})', params
  64. class NetContainsOrEquals(Lookup):
  65. """
  66. Similar to ipam.lookups.NetContainsOrEquals, but casts the field to INET.
  67. """
  68. lookup_name = 'net_contains_or_equals'
  69. def as_sql(self, qn, connection):
  70. lhs, lhs_params = self.process_lhs(qn, connection)
  71. rhs, rhs_params = self.process_rhs(qn, connection)
  72. params = lhs_params + rhs_params
  73. return f'CAST({lhs} AS INET) >>= {rhs}', params
  74. ArrayField.register_lookup(RangeContains)
  75. CharField.register_lookup(Empty)
  76. JSONField.register_lookup(JSONEmpty)
  77. CachedValueField.register_lookup(NetHost)
  78. CachedValueField.register_lookup(NetContainsOrEquals)