test_device_config.py 17 KB


  1. """Test the config parser"""
  2. from fuzzywuzzy import fuzz
  3. from unittest import IsolatedAsyncioTestCase
  4. from unittest.mock import MagicMock
  5. from homeassistant.components.sensor import SensorDeviceClass
  6. from custom_components.tuya_local.helpers.config import get_device_id
  7. from custom_components.tuya_local.helpers.device_config import (
  8. available_configs,
  9. get_config,
  10. _bytes_to_fmt,
  11. _typematch,
  12. TuyaDeviceConfig,
  13. TuyaDpsConfig,
  14. TuyaEntityConfig,
  15. )
  16. from custom_components.tuya_local.sensor import TuyaLocalSensor
  17. from .const import (
  18. GPPH_HEATER_PAYLOAD,
  19. KOGAN_HEATER_PAYLOAD,
  20. )
  21. KNOWN_DPS = {
  22. "binary_sensor": {"required": ["sensor"], "optional": []},
  23. "button": {"required": ["button"], "optional": []},
  24. "camera": {
  25. "required": [],
  26. "optional": ["switch", "motion_enable", "snapshot", "record"],
  27. },
  28. "climate": {
  29. "required": [],
  30. "optional": [
  31. "aux_heat",
  32. "current_temperature",
  33. "current_humidity",
  34. "fan_mode",
  35. "humidity",
  36. "hvac_mode",
  37. "hvac_action",
  38. "min_temperature",
  39. "max_temperature",
  40. "preset_mode",
  41. "swing_mode",
  42. {
  43. "xor": [
  44. "temperature",
  45. {"and": ["target_temp_high", "target_temp_low"]},
  46. ]
  47. },
  48. "temperature_unit",
  49. ],
  50. },
  51. "cover": {
  52. "required": [{"or": ["control", "position"]}],
  53. "optional": [
  54. "current_position",
  55. "action",
  56. "open",
  57. "reversed",
  58. ],
  59. },
  60. "fan": {
  61. "required": [{"or": ["preset_mode", "speed"]}],
  62. "optional": ["switch", "oscillate", "direction"],
  63. },
  64. "humidifier": {"required": ["switch", "humidity"], "optional": ["mode"]},
  65. "light": {
  66. "required": [{"or": ["switch", "brightness", "effect"]}],
  67. "optional": ["color_mode", "color_temp", "rgbhsv"],
  68. },
  69. "lock": {
  70. "required": [
  71. {"or": ["lock", {"and": ["request_unlock", "approve_unlock"]}]},
  72. ],
  73. "optional": [
  74. "unlock_fingerprint",
  75. "unlock_password",
  76. "unlock_temp_pwd",
  77. "unlock_dynamic_pwd",
  78. "unlock_card",
  79. "unlock_app",
  80. "unlock_key",
  81. {"and": ["request_intercom", "approve_intercom"]},
  82. "jammed",
  83. ],
  84. },
  85. "number": {
  86. "required": ["value"],
  87. "optional": ["unit", "minimum", "maximum"],
  88. },
  89. "select": {"required": ["option"], "optional": []},
  90. "sensor": {"required": ["sensor"], "optional": ["unit"]},
  91. "siren": {"required": [], "optional": ["tone", "volume", "duration"]},
  92. "switch": {"required": ["switch"], "optional": ["current_power_w"]},
  93. "vacuum": {
  94. "required": ["status"],
  95. "optional": [
  96. "command",
  97. "locate",
  98. "power",
  99. "activate",
  100. "battery",
  101. "direction_control",
  102. "error",
  103. "fan_speed",
  104. ],
  105. },
  106. "water_heater": {
  107. "required": [],
  108. "optional": [
  109. "current_temperature",
  110. "operation_mode",
  111. "temperature",
  112. "temperature_unit",
  113. "min_temperature",
  114. "max_temperature",
  115. ],
  116. },
  117. }
  118. class TestDeviceConfig(IsolatedAsyncioTestCase):
  119. """Test the device config parser"""
  120. def test_can_find_config_files(self):
  121. """Test that the config files can be found by the parser."""
  122. found = False
  123. for cfg in available_configs():
  124. found = True
  125. break
  126. self.assertTrue(found)
  127. def dp_match(self, condition, accounted, unaccounted, known, required=False):
  128. if type(condition) is str:
  129. known.add(condition)
  130. if condition in unaccounted:
  131. unaccounted.remove(condition)
  132. accounted.add(condition)
  133. if required:
  134. return condition in accounted
  135. else:
  136. return True
  137. elif "and" in condition:
  138. return self.and_match(
  139. condition["and"], accounted, unaccounted, known, required
  140. )
  141. elif "or" in condition:
  142. return self.or_match(condition["or"], accounted, unaccounted, known)
  143. elif "xor" in condition:
  144. return self.xor_match(
  145. condition["xor"], accounted, unaccounted, known, required
  146. )
  147. else:
  148. self.fail(f"Unrecognized condition {condition}")
  149. def and_match(self, conditions, accounted, unaccounted, known, required):
  150. single_match = False
  151. all_match = True
  152. for cond in conditions:
  153. match = self.dp_match(cond, accounted, unaccounted, known, True)
  154. all_match = all_match and match
  155. single_match = single_match or match
  156. if required:
  157. return all_match
  158. else:
  159. return all_match == single_match
  160. def or_match(self, conditions, accounted, unaccounted, known):
  161. match = False
  162. # loop through all, to ensure they are transferred to accounted list
  163. for cond in conditions:
  164. match = match or self.dp_match(cond, accounted, unaccounted, known, True)
  165. return match
  166. def xor_match(self, conditions, accounted, unaccounted, known, required):
  167. prior_match = False
  168. for cond in conditions:
  169. match = self.dp_match(cond, accounted, unaccounted, known, True)
  170. if match and prior_match:
  171. return False
  172. prior_match = prior_match or match
  173. # If any matched, all should be considered matched
  174. # this bit only handles nesting "and" within "xor"
  175. if prior_match:
  176. for c in conditions:
  177. if type(c) is str:
  178. accounted.add(c)
  179. elif "and" in c:
  180. for c2 in c["and"]:
  181. if type(c2) is str:
  182. accounted.add(c2)
  183. return prior_match or not required
  184. def rule_broken_msg(self, rule):
  185. msg = ""
  186. if type(rule) is str:
  187. return f"{msg} {rule}"
  188. elif "and" in rule:
  189. msg = f"{msg} all of ["
  190. for sub in rule["and"]:
  191. msg = f"{msg} {self.rule_broken_msg(sub)}"
  192. return f"{msg} ]"
  193. elif "or" in rule:
  194. msg = f"{msg} at least one of ["
  195. for sub in rule["or"]:
  196. msg = f"{msg} {self.rule_broken_msg(sub)}"
  197. return f"{msg} ]"
  198. elif "xor" in rule:
  199. msg = f"{msg} only one of ["
  200. for sub in rule["xor"]:
  201. msg = f"{msg} {self.rule_broken_msg(sub)}"
  202. return f"{msg} ]"
  203. return "for reason unknown"
  204. def check_entity(self, entity, cfg):
  205. """
  206. Check that the entity has a dps list and each dps has an id,
  207. type and name.
  208. """
  209. self.assertIsNotNone(
  210. entity._config.get("entity"), f"entity type missing in {cfg}"
  211. )
  212. e = entity.config_id
  213. self.assertIsNotNone(
  214. entity._config.get("dps"), f"dps missing from {e} in {cfg}"
  215. )
  216. functions = set()
  217. extra = set()
  218. known = set()
  219. for dp in entity.dps():
  220. self.assertIsNotNone(
  221. dp._config.get("id"), f"dp id missing from {e} in {cfg}"
  222. )
  223. self.assertIsNotNone(
  224. dp._config.get("type"), f"dp type missing from {e} in {cfg}"
  225. )
  226. self.assertIsNotNone(
  227. dp._config.get("name"), f"dp name missing from {e} in {cfg}"
  228. )
  229. extra.add(dp.name)
  230. expected = KNOWN_DPS.get(entity.entity)
  231. for rule in expected["required"]:
  232. self.assertTrue(
  233. self.dp_match(rule, functions, extra, known, True),
  234. f"{cfg} missing required {self.rule_broken_msg(rule)} in {e}",
  235. )
  236. for rule in expected["optional"]:
  237. self.assertTrue(
  238. self.dp_match(rule, functions, extra, known, False),
  239. f"{cfg} expecting {self.rule_broken_msg(rule)} in {e}",
  240. )
  241. # Check for potential typos in extra attributes
  242. known_extra = known - functions
  243. for attr in extra:
  244. for dp in known_extra:
  245. self.assertLess(
  246. fuzz.ratio(attr, dp),
  247. 85,
  248. f"Probable typo {attr} is too similar to {dp} in {cfg} {e}",
  249. )
  250. # Check that sensors with mapped values are of class enum and vice versa
  251. if entity.entity == "sensor":
  252. mock_device = MagicMock()
  253. sensor = TuyaLocalSensor(mock_device, entity)
  254. if sensor.options:
  255. self.assertEqual(
  256. entity.device_class,
  257. SensorDeviceClass.ENUM,
  258. f"{cfg} {e} has mapped values but does not have a device class of enum",
  259. )
  260. if entity.device_class == SensorDeviceClass.ENUM:
  261. self.assertIsNotNone(
  262. sensor.options,
  263. f"{cfg} {e} has a device class of enum, but has no mapped values",
  264. )
  265. def test_config_files_parse(self):
  266. """
  267. All configs should be parsable and meet certain criteria
  268. """
  269. for cfg in available_configs():
  270. entities = []
  271. parsed = TuyaDeviceConfig(cfg)
  272. # Check for error messages or unparsed config
  273. if isinstance(parsed, str) or isinstance(parsed._config, str):
  274. self.fail(f"unparsable yaml in {cfg}")
  275. self.assertIsNotNone(
  276. parsed._config.get("name"),
  277. f"name missing from {cfg}",
  278. )
  279. self.assertIsNotNone(
  280. parsed._config.get("primary_entity"),
  281. f"primary_entity missing from {cfg}",
  282. )
  283. self.check_entity(parsed.primary_entity, cfg)
  284. entities.append(parsed.primary_entity.config_id)
  285. for entity in parsed.secondary_entities():
  286. self.check_entity(entity, cfg)
  287. entities.append(entity.config_id)
  288. self.assertCountEqual(entities, set(entities))
  289. # Most of the device_config functionality is exercised during testing of
  290. # the various supported devices. These tests concentrate only on the gaps.
  291. def test_match_quality(self):
  292. """Test the match_quality function."""
  293. cfg = get_config("deta_fan")
  294. q = cfg.match_quality({**KOGAN_HEATER_PAYLOAD, "updated_at": 0})
  295. self.assertEqual(q, 0)
  296. q = cfg.match_quality({**GPPH_HEATER_PAYLOAD})
  297. self.assertEqual(q, 0)
  298. def test_entity_find_unknown_dps_fails(self):
  299. """Test that finding a dps that doesn't exist fails."""
  300. cfg = get_config("kogan_switch")
  301. non_existing = cfg.primary_entity.find_dps("missing")
  302. self.assertIsNone(non_existing)
  303. async def test_dps_async_set_readonly_value_fails(self):
  304. """Test that setting a readonly dps fails."""
  305. mock_device = MagicMock()
  306. cfg = get_config("goldair_gpph_heater")
  307. error_code = cfg.primary_entity.find_dps("error")
  308. with self.assertRaises(TypeError):
  309. await error_code.async_set_value(mock_device, 1)
  310. def test_dps_values_returns_none_with_no_mapping(self):
  311. """
  312. Test that a dps with no mapping returns None as its possible values
  313. """
  314. mock_device = MagicMock()
  315. cfg = get_config("goldair_gpph_heater")
  316. temp = cfg.primary_entity.find_dps("current_temperature")
  317. self.assertIsNone(temp.values(mock_device))
  318. def test_config_returned(self):
  319. """Test that config file is returned by config"""
  320. cfg = get_config("kogan_switch")
  321. self.assertEqual(cfg.config, "smartplugv1.yaml")
  322. def test_float_matches_ints(self):
  323. """Test that the _typematch function matches int values to float dps"""
  324. self.assertTrue(_typematch(float, 1))
  325. def test_bytes_to_fmt_returns_string_for_unknown(self):
  326. """
  327. Test that the _bytes_to_fmt function parses unknown number of bytes
  328. as a string format.
  329. """
  330. self.assertEqual(_bytes_to_fmt(5), "5s")
  331. def test_deprecation(self):
  332. """Test that deprecation messages are picked from the config."""
  333. mock_device = MagicMock()
  334. mock_device.name = "Testing"
  335. mock_config = {"entity": "Test", "deprecated": "Passed"}
  336. cfg = TuyaEntityConfig(mock_device, mock_config)
  337. self.assertTrue(cfg.deprecated)
  338. self.assertEqual(
  339. cfg.deprecation_message,
  340. "The use of Test for Testing is deprecated and should be "
  341. "replaced by Passed.",
  342. )
  343. def test_format_with_none_defined(self):
  344. """Test that format returns None when there is none configured."""
  345. mock_entity = MagicMock()
  346. mock_config = {"id": "1", "name": "test", "type": "string"}
  347. cfg = TuyaDpsConfig(mock_entity, mock_config)
  348. self.assertIsNone(cfg.format)
  349. def test_decoding_base64(self):
  350. """Test that decoded_value works with base64 encoding."""
  351. mock_entity = MagicMock()
  352. mock_config = {"id": "1", "name": "test", "type": "base64"}
  353. mock_device = MagicMock()
  354. mock_device.get_property.return_value = "VGVzdA=="
  355. cfg = TuyaDpsConfig(mock_entity, mock_config)
  356. self.assertEqual(
  357. cfg.decoded_value(mock_device),
  358. bytes("Test", "utf-8"),
  359. )
  360. def test_decoding_unencoded(self):
  361. """Test that decoded_value returns the raw value when not encoded."""
  362. mock_entity = MagicMock()
  363. mock_config = {"id": "1", "name": "test", "type": "string"}
  364. mock_device = MagicMock()
  365. mock_device.get_property.return_value = "VGVzdA=="
  366. cfg = TuyaDpsConfig(mock_entity, mock_config)
  367. self.assertEqual(
  368. cfg.decoded_value(mock_device),
  369. "VGVzdA==",
  370. )
  371. def test_encoding_base64(self):
  372. """Test that encode_value works with base64."""
  373. mock_entity = MagicMock()
  374. mock_config = {"id": "1", "name": "test", "type": "base64"}
  375. cfg = TuyaDpsConfig(mock_entity, mock_config)
  376. self.assertEqual(cfg.encode_value(bytes("Test", "utf-8")), "VGVzdA==")
  377. def test_encoding_unencoded(self):
  378. """Test that encode_value works with base64."""
  379. mock_entity = MagicMock()
  380. mock_config = {"id": "1", "name": "test", "type": "string"}
  381. cfg = TuyaDpsConfig(mock_entity, mock_config)
  382. self.assertEqual(cfg.encode_value("Test"), "Test")
  383. def test_match_returns_false_on_errors_with_bitfield(self):
  384. """Test that TypeError and ValueError cause match to return False."""
  385. mock_entity = MagicMock()
  386. mock_config = {"id": "1", "name": "test", "type": "bitfield"}
  387. cfg = TuyaDpsConfig(mock_entity, mock_config)
  388. self.assertFalse(cfg._match(15, "not an integer"))
  389. def test_values_with_mirror(self):
  390. """Test that value_mirror redirects."""
  391. mock_entity = MagicMock()
  392. mock_config = {
  393. "id": "1",
  394. "type": "string",
  395. "name": "test",
  396. "mapping": [
  397. {"dps_val": "mirror", "value_mirror": "map_mirror"},
  398. {"dps_val": "plain", "value": "unmirrored"},
  399. ],
  400. }
  401. mock_map_config = {
  402. "id": "2",
  403. "type": "string",
  404. "name": "map_mirror",
  405. "mapping": [
  406. {"dps_val": "1", "value": "map_one"},
  407. {"dps_val": "2", "value": "map_two"},
  408. ],
  409. }
  410. mock_device = MagicMock()
  411. mock_device.get_property.return_value = "1"
  412. cfg = TuyaDpsConfig(mock_entity, mock_config)
  413. map = TuyaDpsConfig(mock_entity, mock_map_config)
  414. mock_entity.find_dps.return_value = map
  415. self.assertCountEqual(
  416. cfg.values(mock_device),
  417. ["unmirrored", "map_one", "map_two"],
  418. )
  419. def test_get_device_id(self):
  420. """Test that check if device id is correct"""
  421. self.assertEqual("my-device-id", get_device_id({"device_id": "my-device-id"}))
  422. self.assertEqual("sub-id", get_device_id({"device_cid": "sub-id"}))
  423. self.assertEqual("s", get_device_id({"device_id": "d", "device_cid": "s"}))
  424. # values gets very complex, with things like mappings within conditions
  425. # within mappings. I'd expect something like this was added with purpose,
  426. # but it isn't exercised by any of the existing unit tests.
  427. # value-mirror above is explained by the fact that the device it was
  428. # added for never worked properly, so was removed.
  429. def test_default_without_mapping(self):
  430. """Test that default returns None when there is no mapping"""
  431. mock_entity = MagicMock()
  432. mock_config = {"id": "1", "name": "test", "type": "string"}
  433. cfg = TuyaDpsConfig(mock_entity, mock_config)
  434. self.assertIsNone(cfg.default())