conditions.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import functools
  2. import operator
  3. import re
  4. from django.utils.translation import gettext as _
  5. __all__ = (
  6. 'Condition',
  7. 'ConditionSet',
  8. 'InvalidCondition',
  9. )
  10. AND = 'and'
  11. OR = 'or'
  12. def is_ruleset(data):
  13. """
  14. Determine whether the given dictionary looks like a rule set.
  15. """
  16. return type(data) is dict and len(data) == 1 and list(data.keys())[0] in (AND, OR)
  17. class InvalidCondition(Exception):
  18. pass
  19. class Condition:
  20. """
  21. An individual conditional rule that evaluates a single attribute and its value.
  22. :param attr: The name of the attribute being evaluated
  23. :param value: The value being compared
  24. :param op: The logical operation to use when evaluating the value (default: 'eq')
  25. """
  26. EQ = 'eq'
  27. GT = 'gt'
  28. GTE = 'gte'
  29. LT = 'lt'
  30. LTE = 'lte'
  31. IN = 'in'
  32. CONTAINS = 'contains'
  33. REGEX = 'regex'
  34. OPERATORS = (
  35. EQ, GT, GTE, LT, LTE, IN, CONTAINS, REGEX
  36. )
  37. TYPES = {
  38. str: (EQ, CONTAINS, REGEX),
  39. bool: (EQ, CONTAINS),
  40. int: (EQ, GT, GTE, LT, LTE, CONTAINS),
  41. float: (EQ, GT, GTE, LT, LTE, CONTAINS),
  42. list: (EQ, IN, CONTAINS),
  43. type(None): (EQ,)
  44. }
  45. def __init__(self, attr, value, op=EQ, negate=False):
  46. if op not in self.OPERATORS:
  47. raise ValueError(_("Unknown operator: {op}. Must be one of: {operators}").format(
  48. op=op, operators=', '.join(self.OPERATORS)
  49. ))
  50. if type(value) not in self.TYPES:
  51. raise ValueError(_("Unsupported value type: {value}").format(value=type(value)))
  52. if op not in self.TYPES[type(value)]:
  53. raise ValueError(_("Invalid type for {op} operation: {value}").format(op=op, value=type(value)))
  54. self.attr = attr
  55. self.value = value
  56. self.op = op
  57. self.eval_func = getattr(self, f'eval_{op}')
  58. self.negate = negate
  59. def eval(self, data):
  60. """
  61. Evaluate the provided data to determine whether it matches the condition.
  62. """
  63. def _get(obj, key):
  64. if isinstance(obj, list):
  65. return [operator.getitem(item or {}, key) for item in obj]
  66. return operator.getitem(obj or {}, key)
  67. try:
  68. value = functools.reduce(_get, self.attr.split('.'), data)
  69. except KeyError:
  70. raise InvalidCondition(f"Invalid key path: {self.attr}")
  71. try:
  72. result = self.eval_func(value)
  73. except TypeError as e:
  74. raise InvalidCondition(f"Invalid data type at '{self.attr}' for '{self.op}' evaluation: {e}")
  75. if self.negate:
  76. return not result
  77. return result
  78. # Equivalency
  79. def eval_eq(self, value):
  80. return value == self.value
  81. def eval_neq(self, value):
  82. return value != self.value
  83. # Numeric comparisons
  84. def eval_gt(self, value):
  85. return value > self.value
  86. def eval_gte(self, value):
  87. return value >= self.value
  88. def eval_lt(self, value):
  89. return value < self.value
  90. def eval_lte(self, value):
  91. return value <= self.value
  92. # Membership
  93. def eval_in(self, value):
  94. return value in self.value
  95. def eval_contains(self, value):
  96. return self.value in value
  97. # Regular expressions
  98. def eval_regex(self, value):
  99. return re.match(self.value, value) is not None
  100. class ConditionSet:
  101. """
  102. A set of one or more Condition to be evaluated per the prescribed logic (AND or OR). Example:
  103. {"and": [
  104. {"attr": "foo", "op": "eq", "value": 1},
  105. {"attr": "bar", "op": "eq", "value": 2, "negate": true}
  106. ]}
  107. :param ruleset: A dictionary mapping a logical operator to a list of conditional rules
  108. """
  109. def __init__(self, ruleset):
  110. if type(ruleset) is not dict:
  111. raise ValueError(_("Ruleset must be a dictionary, not {ruleset}.").format(ruleset=type(ruleset)))
  112. if len(ruleset) == 1:
  113. self.logic = (list(ruleset.keys())[0]).lower()
  114. if self.logic not in (AND, OR):
  115. raise ValueError(_("Invalid logic type: must be 'AND' or 'OR'. Please check documentation."))
  116. # Compile the set of Conditions
  117. self.conditions = [
  118. ConditionSet(rule) if is_ruleset(rule) else Condition(**rule)
  119. for rule in ruleset[self.logic]
  120. ]
  121. else:
  122. try:
  123. self.logic = None
  124. self.conditions = [Condition(**ruleset)]
  125. except TypeError:
  126. raise ValueError(_("Incorrect key(s) informed. Please check documentation."))
  127. def eval(self, data):
  128. """
  129. Evaluate the provided data to determine whether it matches this set of conditions.
  130. """
  131. func = any if self.logic == 'or' else all
  132. return func(d.eval(data) for d in self.conditions)