conditions.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import functools
  2. import re
  3. __all__ = (
  4. 'Condition',
  5. 'ConditionSet',
  6. )
  7. AND = 'and'
  8. OR = 'or'
  9. def is_ruleset(data):
  10. """
  11. Determine whether the given dictionary looks like a rule set.
  12. """
  13. return type(data) is dict and len(data) == 1 and list(data.keys())[0] in (AND, OR)
  14. class Condition:
  15. """
  16. An individual conditional rule that evaluates a single attribute and its value.
  17. :param attr: The name of the attribute being evaluated
  18. :param value: The value being compared
  19. :param op: The logical operation to use when evaluating the value (default: 'eq')
  20. """
  21. EQ = 'eq'
  22. GT = 'gt'
  23. GTE = 'gte'
  24. LT = 'lt'
  25. LTE = 'lte'
  26. IN = 'in'
  27. CONTAINS = 'contains'
  28. REGEX = 'regex'
  29. OPERATORS = (
  30. EQ, GT, GTE, LT, LTE, IN, CONTAINS, REGEX
  31. )
  32. TYPES = {
  33. str: (EQ, CONTAINS, REGEX),
  34. bool: (EQ, CONTAINS),
  35. int: (EQ, GT, GTE, LT, LTE, CONTAINS),
  36. float: (EQ, GT, GTE, LT, LTE, CONTAINS),
  37. list: (EQ, IN, CONTAINS)
  38. }
  39. def __init__(self, attr, value, op=EQ, negate=False):
  40. if op not in self.OPERATORS:
  41. raise ValueError(f"Unknown operator: {op}. Must be one of: {', '.join(self.OPERATORS)}")
  42. if type(value) not in self.TYPES:
  43. raise ValueError(f"Unsupported value type: {type(value)}")
  44. if op not in self.TYPES[type(value)]:
  45. raise ValueError(f"Invalid type for {op} operation: {type(value)}")
  46. self.attr = attr
  47. self.value = value
  48. self.eval_func = getattr(self, f'eval_{op}')
  49. self.negate = negate
  50. def eval(self, data):
  51. """
  52. Evaluate the provided data to determine whether it matches the condition.
  53. """
  54. value = functools.reduce(dict.get, self.attr.split('.'), data)
  55. result = self.eval_func(value)
  56. if self.negate:
  57. return not result
  58. return result
  59. # Equivalency
  60. def eval_eq(self, value):
  61. return value == self.value
  62. def eval_neq(self, value):
  63. return value != self.value
  64. # Numeric comparisons
  65. def eval_gt(self, value):
  66. return value > self.value
  67. def eval_gte(self, value):
  68. return value >= self.value
  69. def eval_lt(self, value):
  70. return value < self.value
  71. def eval_lte(self, value):
  72. return value <= self.value
  73. # Membership
  74. def eval_in(self, value):
  75. return value in self.value
  76. def eval_contains(self, value):
  77. return self.value in value
  78. # Regular expressions
  79. def eval_regex(self, value):
  80. return re.match(self.value, value) is not None
  81. class ConditionSet:
  82. """
  83. A set of one or more Condition to be evaluated per the prescribed logic (AND or OR). Example:
  84. {"and": [
  85. {"attr": "foo", "op": "eq", "value": 1},
  86. {"attr": "bar", "op": "eq", "value": 2, "negate": true}
  87. ]}
  88. :param ruleset: A dictionary mapping a logical operator to a list of conditional rules
  89. """
  90. def __init__(self, ruleset):
  91. if type(ruleset) is not dict:
  92. raise ValueError(f"Ruleset must be a dictionary, not {type(ruleset)}.")
  93. if len(ruleset) != 1:
  94. raise ValueError(f"Ruleset must have exactly one logical operator (found {len(ruleset)})")
  95. # Determine the logic type
  96. logic = list(ruleset.keys())[0]
  97. if type(logic) is not str or logic.lower() not in (AND, OR):
  98. raise ValueError(f"Invalid logic type: {logic} (must be '{AND}' or '{OR}')")
  99. self.logic = logic.lower()
  100. # Compile the set of Conditions
  101. self.conditions = [
  102. ConditionSet(rule) if is_ruleset(rule) else Condition(**rule)
  103. for rule in ruleset[self.logic]
  104. ]
  105. def eval(self, data):
  106. """
  107. Evaluate the provided data to determine whether it matches this set of conditions.
  108. """
  109. func = any if self.logic == 'or' else all
  110. return func(d.eval(data) for d in self.conditions)