test_device_config.py 17 KB

1
  1. """Test the config parser""" from fuzzywuzzy import fuzz from unittest import IsolatedAsyncioTestCase from unittest.mock import MagicMock from homeassistant.components.sensor import SensorDeviceClass from custom_components.tuya_local.helpers.device_config import ( available_configs, get_config, _bytes_to_fmt, _typematch, TuyaDeviceConfig, TuyaDpsConfig, TuyaEntityConfig, ) from custom_components.tuya_local.sensor import TuyaLocalSensor from .const import ( GPPH_HEATER_PAYLOAD, KOGAN_HEATER_PAYLOAD, ) KNOWN_DPS = { "binary_sensor": {"required": ["sensor"], "optional": []}, "button": {"required": ["button"], "optional": []}, "camera": { "required": [], "optional": ["switch", "motion_enable", "snapshot", "record"], }, "climate": { "required": [], "optional": [ "aux_heat", "current_temperature", "current_humidity", "fan_mode", "humidity", "hvac_mode", "hvac_action", "min_temperature", "max_temperature", "preset_mode", "swing_mode", { "xor": [ "temperature", {"and": ["target_temp_high", "target_temp_low"]}, ] }, "temperature_unit", ], }, "cover": { "required": [{"or": ["control", "position"]}], "optional": [ "current_position", "action", "open", "reversed", ], }, "fan": { "required": [{"or": ["preset_mode", "speed"]}], "optional": ["switch", "oscillate", "direction"], }, "humidifier": {"required": ["switch", "humidity"], "optional": ["mode"]}, "light": { "required": [{"or": ["switch", "brightness", "effect"]}], "optional": ["color_mode", "color_temp", "rgbhsv"], }, "lock": { "required": [ {"or": ["lock", {"and": ["request_unlock", "approve_unlock"]}]}, ], "optional": [ "unlock_fingerprint", "unlock_password", "unlock_temp_pwd", "unlock_dynamic_pwd", "unlock_card", "unlock_app", "unlock_key", {"and": ["request_intercom", "approve_intercom"]}, "jammed", ], }, "number": { "required": ["value"], "optional": ["unit", "minimum", "maximum"], }, "select": {"required": ["option"], "optional": []}, "sensor": {"required": ["sensor"], "optional": ["unit"]}, "siren": {"required": [], "optional": ["tone", "volume", "duration"]}, "switch": {"required": ["switch"], "optional": ["current_power_w"]}, "vacuum": { "required": ["status"], "optional": [ "command", "locate", "power", "activate", "battery", "direction_control", "error", "fan_speed", ], }, "water_heater": { "required": [], "optional": [ "current_temperature", "operation_mode", "temperature", "temperature_unit", "min_temperature", "max_temperature", ], }, } class TestDeviceConfig(IsolatedAsyncioTestCase): """Test the device config parser""" def test_can_find_config_files(self): """Test that the config files can be found by the parser.""" found = False for cfg in available_configs(): found = True break self.assertTrue(found) def dp_match(self, condition, accounted, unaccounted, known, required=False): if type(condition) is str: known.add(condition) if condition in unaccounted: unaccounted.remove(condition) accounted.add(condition) if required: return condition in accounted else: return True elif "and" in condition: return self.and_match( condition["and"], accounted, unaccounted, known, required ) elif "or" in condition: return self.or_match(condition["or"], accounted, unaccounted, known) elif "xor" in condition: return self.xor_match( condition["xor"], accounted, unaccounted, known, required ) else: self.fail(f"Unrecognized condition {condition}") def and_match(self, conditions, accounted, unaccounted, known, required): single_match = False all_match = True for cond in conditions: match = self.dp_match(cond, accounted, unaccounted, known, True) all_match = all_match and match single_match = single_match or match if required: return all_match else: return all_match == single_match def or_match(self, conditions, accounted, unaccounted, known): match = False # loop through all, to ensure they are transferred to accounted list for cond in conditions: match = match or self.dp_match(cond, accounted, unaccounted, known, True) return match def xor_match(self, conditions, accounted, unaccounted, known, required): prior_match = False for cond in conditions: match = self.dp_match(cond, accounted, unaccounted, known, True) if match and prior_match: return False prior_match = prior_match or match # If any matched, all should be considered matched # this bit only handles nesting "and" within "xor" if prior_match: for c in conditions: if type(c) is str: accounted.add(c) elif "and" in c: for c2 in c["and"]: if type(c2) is str: accounted.add(c2) return prior_match or not required def rule_broken_msg(self, rule): msg = "" if type(rule) is str: return f"{msg} {rule}" elif "and" in rule: msg = f"{msg} all of [" for sub in rule["and"]: msg = f"{msg} {self.rule_broken_msg(sub)}" return f"{msg} ]" elif "or" in rule: msg = f"{msg} at least one of [" for sub in rule["or"]: msg = f"{msg} {self.rule_broken_msg(sub)}" return f"{msg} ]" elif "xor" in rule: msg = f"{msg} only one of [" for sub in rule["xor"]: msg = f"{msg} {self.rule_broken_msg(sub)}" return f"{msg} ]" return "for reason unknown" def check_entity(self, entity, cfg): """ Check that the entity has a dps list and each dps has an id, type and name. """ self.assertIsNotNone( entity._config.get("entity"), f"entity type missing in {cfg}" ) e = entity.config_id self.assertIsNotNone( entity._config.get("dps"), f"dps missing from {e} in {cfg}" ) functions = set() extra = set() known = set() for dp in entity.dps(): self.assertIsNotNone( dp._config.get("id"), f"dp id missing from {e} in {cfg}" ) self.assertIsNotNone( dp._config.get("type"), f"dp type missing from {e} in {cfg}" ) self.assertIsNotNone( dp._config.get("name"), f"dp name missing from {e} in {cfg}" ) extra.add(dp.name) expected = KNOWN_DPS.get(entity.entity) for rule in expected["required"]: self.assertTrue( self.dp_match(rule, functions, extra, known, True), f"{cfg} missing required {self.rule_broken_msg(rule)} in {e}", ) for rule in expected["optional"]: self.assertTrue( self.dp_match(rule, functions, extra, known, False), f"{cfg} expecting {self.rule_broken_msg(rule)} in {e}", ) # Check for potential typos in extra attributes known_extra = known - functions for attr in extra: for dp in known_extra: self.assertLess( fuzz.ratio(attr, dp), 85, f"Probable typo {attr} is too similar to {dp} in {cfg} {e}", ) # Check that sensors with mapped values are of class enum and vice versa if entity.entity == "sensor": mock_device = MagicMock() sensor = TuyaLocalSensor(mock_device, entity) if sensor.options: self.assertEqual( entity.device_class, SensorDeviceClass.ENUM, f"{cfg} {e} has mapped values but does not have a device class of enum", ) if entity.device_class == SensorDeviceClass.ENUM: self.assertIsNotNone( sensor.options, f"{cfg} {e} has a device class of enum, but has no mapped values", ) def test_config_files_parse(self): """ All configs should be parsable and meet certain criteria """ for cfg in available_configs(): entities = [] parsed = TuyaDeviceConfig(cfg) # Check for error messages or unparsed config if isinstance(parsed, str) or isinstance(parsed._config, str): self.fail(f"unparsable yaml in {cfg}") self.assertIsNotNone( parsed._config.get("name"), f"name missing from {cfg}", ) self.assertIsNotNone( parsed._config.get("primary_entity"), f"primary_entity missing from {cfg}", ) self.check_entity(parsed.primary_entity, cfg) entities.append(parsed.primary_entity.config_id) for entity in parsed.secondary_entities(): self.check_entity(entity, cfg) entities.append(entity.config_id) self.assertCountEqual(entities, set(entities)) # Most of the device_config functionality is exercised during testing of # the various supported devices. These tests concentrate only on the gaps. def test_match_quality(self): """Test the match_quality function.""" cfg = get_config("deta_fan") q = cfg.match_quality({**KOGAN_HEATER_PAYLOAD, "updated_at": 0}) self.assertEqual(q, 0) q = cfg.match_quality({**GPPH_HEATER_PAYLOAD}) self.assertEqual(q, 0) def test_entity_find_unknown_dps_fails(self): """Test that finding a dps that doesn't exist fails.""" cfg = get_config("kogan_switch") non_existing = cfg.primary_entity.find_dps("missing") self.assertIsNone(non_existing) async def test_dps_async_set_readonly_value_fails(self): """Test that setting a readonly dps fails.""" mock_device = MagicMock() cfg = get_config("goldair_gpph_heater") error_code = cfg.primary_entity.find_dps("error") with self.assertRaises(TypeError): await error_code.async_set_value(mock_device, 1) def test_dps_values_returns_none_with_no_mapping(self): """ Test that a dps with no mapping returns None as its possible values """ mock_device = MagicMock() cfg = get_config("goldair_gpph_heater") temp = cfg.primary_entity.find_dps("current_temperature") self.assertIsNone(temp.values(mock_device)) def test_config_returned(self): """Test that config file is returned by config""" cfg = get_config("kogan_switch") self.assertEqual(cfg.config, "smartplugv1.yaml") def test_float_matches_ints(self): """Test that the _typematch function matches int values to float dps""" self.assertTrue(_typematch(float, 1)) def test_bytes_to_fmt_returns_string_for_unknown(self): """ Test that the _bytes_to_fmt function parses unknown number of bytes as a string format. """ self.assertEqual(_bytes_to_fmt(5), "5s") def test_deprecation(self): """Test that deprecation messages are picked from the config.""" mock_device = MagicMock() mock_device.name = "Testing" mock_config = {"entity": "Test", "deprecated": "Passed"} cfg = TuyaEntityConfig(mock_device, mock_config) self.assertTrue(cfg.deprecated) self.assertEqual( cfg.deprecation_message, "The use of Test for Testing is deprecated and should be " "replaced by Passed.", ) def test_format_with_none_defined(self): """Test that format returns None when there is none configured.""" mock_entity = MagicMock() mock_config = {"id": "1", "name": "test", "type": "string"} cfg = TuyaDpsConfig(mock_entity, mock_config) self.assertIsNone(cfg.format) def test_decoding_base64(self): """Test that decoded_value works with base64 encoding.""" mock_entity = MagicMock() mock_config = {"id": "1", "name": "test", "type": "base64"} mock_device = MagicMock() mock_device.get_property.return_value = "VGVzdA==" cfg = TuyaDpsConfig(mock_entity, mock_config) self.assertEqual( cfg.decoded_value(mock_device), bytes("Test", "utf-8"), ) def test_decoding_unencoded(self): """Test that decoded_value returns the raw value when not encoded.""" mock_entity = MagicMock() mock_config = {"id": "1", "name": "test", "type": "string"} mock_device = MagicMock() mock_device.get_property.return_value = "VGVzdA==" cfg = TuyaDpsConfig(mock_entity, mock_config) self.assertEqual( cfg.decoded_value(mock_device), "VGVzdA==", ) def test_encoding_base64(self): """Test that encode_value works with base64.""" mock_entity = MagicMock() mock_config = {"id": "1", "name": "test", "type": "base64"} cfg = TuyaDpsConfig(mock_entity, mock_config) self.assertEqual(cfg.encode_value(bytes("Test", "utf-8")), "VGVzdA==") def test_encoding_unencoded(self): """Test that encode_value works with base64.""" mock_entity = MagicMock() mock_config = {"id": "1", "name": "test", "type": "string"} cfg = TuyaDpsConfig(mock_entity, mock_config) self.assertEqual(cfg.encode_value("Test"), "Test") def test_match_returns_false_on_errors_with_bitfield(self): """Test that TypeError and ValueError cause match to return False.""" mock_entity = MagicMock() mock_config = {"id": "1", "name": "test", "type": "bitfield"} cfg = TuyaDpsConfig(mock_entity, mock_config) self.assertFalse(cfg._match(15, "not an integer")) def test_values_with_mirror(self): """Test that value_mirror redirects.""" mock_entity = MagicMock() mock_config = { "id": "1", "type": "string", "name": "test", "mapping": [ {"dps_val": "mirror", "value_mirror": "map_mirror"}, {"dps_val": "plain", "value": "unmirrored"}, ], } mock_map_config = { "id": "2", "type": "string", "name": "map_mirror", "mapping": [ {"dps_val": "1", "value": "map_one"}, {"dps_val": "2", "value": "map_two"}, ], } mock_device = MagicMock() mock_device.get_property.return_value = "1" cfg = TuyaDpsConfig(mock_entity, mock_config) map = TuyaDpsConfig(mock_entity, mock_map_config) mock_entity.find_dps.return_value = map self.assertCountEqual( cfg.values(mock_device), ["unmirrored", "map_one", "map_two"], ) # values gets very complex, with things like mappings within conditions # within mappings. I'd expect something like this was added with purpose, # but it isn't exercised by any of the existing unit tests. # value-mirror above is explained by the fact that the device it was # added for never worked properly, so was removed. def test_default_without_mapping(self): """Test that default returns None when there is no mapping""" mock_entity = MagicMock() mock_config = {"id": "1", "name": "test", "type": "string"} cfg = TuyaDpsConfig(mock_entity, mock_config) self.assertIsNone(cfg.default())