test_device_config.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583
  1. """Test the config parser"""
  2. from unittest import IsolatedAsyncioTestCase
  3. from unittest.mock import MagicMock
  4. from fuzzywuzzy import fuzz
  5. from homeassistant.components.sensor import SensorDeviceClass
  6. from custom_components.tuya_local.helpers.config import get_device_id
  7. from custom_components.tuya_local.helpers.device_config import (
  8. TuyaDeviceConfig,
  9. TuyaDpsConfig,
  10. TuyaEntityConfig,
  11. _bytes_to_fmt,
  12. _typematch,
  13. available_configs,
  14. get_config,
  15. )
  16. from custom_components.tuya_local.sensor import TuyaLocalSensor
  17. from .const import GPPH_HEATER_PAYLOAD, KOGAN_HEATER_PAYLOAD
  # Table of dp names each entity platform is checked against.
  #
  # Each platform maps to a "required" and an "optional" list of rules.
  # A rule is either a plain dp name, or a single-key dict whose key is
  # "and", "or" or "xor" and whose value is a list of nested rules; these
  # are the forms dispatched on by TestDeviceConfig.dp_match.
  KNOWN_DPS = {
      "alarm_control_panel": {
          "required": ["alarm_state"],
          "optional": ["trigger"],
      },
      "binary_sensor": {
          "required": ["sensor"],
          "optional": [],
      },
      "button": {
          "required": ["button"],
          "optional": [],
      },
      "camera": {
          "required": [],
          "optional": ["switch", "motion_enable", "snapshot", "record"],
      },
      "climate": {
          "required": [],
          "optional": [
              "aux_heat",
              "current_temperature",
              "current_humidity",
              "fan_mode",
              "humidity",
              "hvac_mode",
              "hvac_action",
              "min_temperature",
              "max_temperature",
              "preset_mode",
              "swing_mode",
              # a single target, or a high/low pair, but not both
              {"xor": ["temperature", {"and": ["target_temp_high", "target_temp_low"]}]},
              "temperature_unit",
          ],
      },
      "cover": {
          "required": [{"or": ["control", "position"]}],
          "optional": ["current_position", "action", "open", "reversed"],
      },
      "event": {
          "required": ["event"],
          "optional": [],
      },
      "fan": {
          "required": [{"or": ["preset_mode", "speed"]}],
          "optional": ["switch", "oscillate", "direction"],
      },
      "humidifier": {
          "required": ["switch", "humidity"],
          "optional": ["mode"],
      },
      "lawn_mower": {
          "required": ["activity", "command"],
          "optional": [],
      },
      "light": {
          "required": [{"or": ["switch", "brightness", "effect"]}],
          "optional": ["color_mode", "color_temp", "rgbhsv"],
      },
      "lock": {
          "required": [],
          "optional": [
              "lock",
              # request/approve dps only make sense as pairs
              {"and": ["request_unlock", "approve_unlock"]},
              {"and": ["request_intercom", "approve_intercom"]},
              "unlock_fingerprint",
              "unlock_password",
              "unlock_temp_pwd",
              "unlock_dynamic_pwd",
              "unlock_offline_pwd",
              "unlock_card",
              "unlock_app",
              "unlock_key",
              "unlock_ble",
              "jammed",
          ],
      },
      "number": {
          "required": ["value"],
          "optional": ["unit", "minimum", "maximum"],
      },
      "remote": {
          "required": ["send"],
          "optional": ["receive"],
      },
      "select": {
          "required": ["option"],
          "optional": [],
      },
      "sensor": {
          "required": ["sensor"],
          "optional": ["unit"],
      },
      "siren": {
          "required": [],
          "optional": ["tone", "volume", "duration", "switch"],
      },
      "switch": {
          "required": ["switch"],
          "optional": ["current_power_w"],
      },
      "vacuum": {
          "required": ["status"],
          "optional": [
              "command",
              "locate",
              "power",
              "activate",
              "battery",
              "direction_control",
              "error",
              "fan_speed",
          ],
      },
      "water_heater": {
          "required": [],
          "optional": [
              "current_temperature",
              "operation_mode",
              "temperature",
              "temperature_unit",
              "min_temperature",
              "max_temperature",
              "away_mode",
          ],
      },
  }
  131. class TestDeviceConfig(IsolatedAsyncioTestCase):
  132. """Test the device config parser"""
  def test_can_find_config_files(self):
      """Check that available_configs() yields at least one entry."""
      # any() stops at the first yielded item, like the original break.
      self.assertTrue(any(True for _ in available_configs()))
  140. def dp_match(self, condition, accounted, unaccounted, known, required=False):
  141. if type(condition) is str:
  142. known.add(condition)
  143. if condition in unaccounted:
  144. unaccounted.remove(condition)
  145. accounted.add(condition)
  146. if required:
  147. return condition in accounted
  148. else:
  149. return True
  150. elif "and" in condition:
  151. return self.and_match(
  152. condition["and"], accounted, unaccounted, known, required
  153. )
  154. elif "or" in condition:
  155. return self.or_match(condition["or"], accounted, unaccounted, known)
  156. elif "xor" in condition:
  157. return self.xor_match(
  158. condition["xor"], accounted, unaccounted, known, required
  159. )
  160. else:
  161. self.fail(f"Unrecognized condition {condition}")
  162. def and_match(self, conditions, accounted, unaccounted, known, required):
  163. single_match = False
  164. all_match = True
  165. for cond in conditions:
  166. match = self.dp_match(cond, accounted, unaccounted, known, True)
  167. all_match = all_match and match
  168. single_match = single_match or match
  169. if required:
  170. return all_match
  171. else:
  172. return all_match == single_match
  173. def or_match(self, conditions, accounted, unaccounted, known):
  174. match = False
  175. # loop through all, to ensure they are transferred to accounted list
  176. for cond in conditions:
  177. match = match or self.dp_match(cond, accounted, unaccounted, known, True)
  178. return match
  179. def xor_match(self, conditions, accounted, unaccounted, known, required):
  180. prior_match = False
  181. for cond in conditions:
  182. match = self.dp_match(cond, accounted, unaccounted, known, True)
  183. if match and prior_match:
  184. return False
  185. prior_match = prior_match or match
  186. # If any matched, all should be considered matched
  187. # this bit only handles nesting "and" within "xor"
  188. if prior_match:
  189. for c in conditions:
  190. if type(c) is str:
  191. accounted.add(c)
  192. elif "and" in c:
  193. for c2 in c["and"]:
  194. if type(c2) is str:
  195. accounted.add(c2)
  196. return prior_match or not required
  197. def rule_broken_msg(self, rule):
  198. msg = ""
  199. if type(rule) is str:
  200. return f"{msg} {rule}"
  201. elif "and" in rule:
  202. msg = f"{msg} all of ["
  203. for sub in rule["and"]:
  204. msg = f"{msg} {self.rule_broken_msg(sub)}"
  205. return f"{msg} ]"
  206. elif "or" in rule:
  207. msg = f"{msg} at least one of ["
  208. for sub in rule["or"]:
  209. msg = f"{msg} {self.rule_broken_msg(sub)}"
  210. return f"{msg} ]"
  211. elif "xor" in rule:
  212. msg = f"{msg} only one of ["
  213. for sub in rule["xor"]:
  214. msg = f"{msg} {self.rule_broken_msg(sub)}"
  215. return f"{msg} ]"
  216. return "for reason unknown"
  def check_entity(self, entity, cfg):
      """
      Check that the entity has a dps list and each dps has an id,
      type and name, and any other consistency checks.

      The further checks are: "mapping" and "conditions" are lists;
      every value_redirect / value_mirror target names a dp of the same
      entity; the entity type has an entry in KNOWN_DPS and its required
      and optional rules are satisfied; leftover dp names are not
      near-misses of known names; and sensors have options exactly when
      their device class is enum.

      `cfg` is only interpolated into failure messages.
      """
      self.assertIsNotNone(
          entity._config.get("entity"), f"entity type missing in {cfg}"
      )
      e = entity.config_id
      self.assertIsNotNone(
          entity._config.get("dps"), f"dps missing from {e} in {cfg}"
      )
      functions = set()
      extra = set()
      known = set()
      redirects = set()
      redirect_keys = ("value_redirect", "value_mirror")

      # Basic checks of dps, and initialising of redirects and extras sets
      # for later checking
      for dp in entity.dps():
          for field in ("id", "type", "name"):
              self.assertIsNotNone(
                  dp._config.get(field),
                  f"dp {field} missing from {e} in {cfg}",
              )
          extra.add(dp.name)
          mappings = dp._config.get("mapping", [])
          self.assertIsInstance(
              mappings,
              list,
              f"mapping is not a list in {cfg}; entity {e}, dp {dp.name}",
          )
          for m in mappings:
              conditions = m.get("conditions", [])
              self.assertIsInstance(
                  conditions,
                  list,
                  f"conditions is not a list in {cfg}; entity {e}, dp {dp.name}",
              )
              # redirects may sit on the mapping or on any of its conditions
              for source in (*conditions, m):
                  for key in redirect_keys:
                      target = source.get(key)
                      if target:
                          redirects.add(target)

      # Check redirects all exist
      for redirect in redirects:
          self.assertIn(redirect, extra, f"dp {redirect} missing from {e} in {cfg}")

      # Check dps that are required for this entity type all exist
      expected = KNOWN_DPS.get(entity.entity)
      # Fail with a readable message rather than a TypeError on None below.
      self.assertIsNotNone(
          expected,
          f"unknown entity type {entity.entity} for {e} in {cfg}",
      )
      for rule in expected["required"]:
          self.assertTrue(
              self.dp_match(rule, functions, extra, known, True),
              f"{cfg} missing required {self.rule_broken_msg(rule)} in {e}",
          )
      for rule in expected["optional"]:
          self.assertTrue(
              self.dp_match(rule, functions, extra, known, False),
              f"{cfg} expecting {self.rule_broken_msg(rule)} in {e}",
          )

      # Check for potential typos in extra attributes: whatever dp_match
      # left in `extra` is compared with the known names that went unused.
      known_extra = known - functions
      for attr in extra:
          for dp in known_extra:
              self.assertLess(
                  fuzz.ratio(attr, dp),
                  85,
                  f"Probable typo {attr} is too similar to {dp} in {cfg} {e}",
              )

      # Check that sensors with mapped values are of class enum and vice versa
      if entity.entity == "sensor":
          mock_device = MagicMock()
          sensor = TuyaLocalSensor(mock_device, entity)
          if sensor.options:
              self.assertEqual(
                  entity.device_class,
                  SensorDeviceClass.ENUM,
                  f"{cfg} {e} has mapped values but does not have a device class of enum",
              )
          if entity.device_class == SensorDeviceClass.ENUM:
              self.assertIsNotNone(
                  sensor.options,
                  f"{cfg} {e} has a device class of enum, but has no mapped values",
              )
  def test_config_files_parse(self):
      """
      All configs should be parsable and meet certain criteria

      For each available config: it must parse, have a name and a
      primary_entity, pass check_entity for every entity, have unique
      entity config ids, and, when it has no secondary entities, have no
      top-level key starting with "sec" (a likely misspelling).
      """
      for cfg in available_configs():
          parsed = TuyaDeviceConfig(cfg)
          # Check for error messages or unparsed config
          if isinstance(parsed, str) or isinstance(parsed._config, str):
              self.fail(f"unparsable yaml in {cfg}")

          self.assertIsNotNone(
              parsed._config.get("name"),
              f"name missing from {cfg}",
          )
          self.assertIsNotNone(
              parsed._config.get("primary_entity"),
              f"primary_entity missing from {cfg}",
          )
          self.check_entity(parsed.primary_entity, cfg)
          entities = [parsed.primary_entity.config_id]

          secondary = False
          for entity in parsed.secondary_entities():
              secondary = True
              self.check_entity(entity, cfg)
              entities.append(entity.config_id)

          # check entities are unique; name the file, since this runs
          # once per config and a bare failure would not identify it
          self.assertCountEqual(
              entities,
              set(entities),
              f"duplicate entities in {cfg}",
          )

          # If there are no secondary entities, check that it is intended
          if not secondary:
              for key in parsed._config:
                  self.assertFalse(
                      key.startswith("sec"),
                      f"misspelled secondary_entities in {cfg}",
                  )
  341. # Most of the device_config functionality is exercised during testing of
  342. # the various supported devices. These tests concentrate only on the gaps.
  def test_match_quality(self):
      """Check match_quality is 0 for payloads of two other devices."""
      fan_config = get_config("deta_fan")

      kogan_quality = fan_config.match_quality(
          {**KOGAN_HEATER_PAYLOAD, "updated_at": 0}
      )
      self.assertEqual(kogan_quality, 0)

      gpph_quality = fan_config.match_quality({**GPPH_HEATER_PAYLOAD})
      self.assertEqual(gpph_quality, 0)
  def test_entity_find_unknown_dps_fails(self):
      """Check that find_dps returns None for a name that is not defined."""
      primary = get_config("kogan_switch").primary_entity
      self.assertIsNone(primary.find_dps("missing"))
  async def test_dps_async_set_readonly_value_fails(self):
      """Check that async_set_value on the "error" dps raises TypeError."""
      mock_device = MagicMock()
      heater = get_config("goldair_gpph_heater")
      error_dp = heater.primary_entity.find_dps("error")

      with self.assertRaises(TypeError):
          await error_dp.async_set_value(mock_device, 1)
  def test_dps_values_is_empty_with_no_mapping(self):
      """
      Check that values() returns an empty list for the
      current_temperature dps of the goldair_gpph_heater config.
      """
      mock_device = MagicMock()
      heater = get_config("goldair_gpph_heater")
      temperature_dp = heater.primary_entity.find_dps("current_temperature")

      possible_values = temperature_dp.values(mock_device)
      self.assertEqual(possible_values, [])
  def test_config_returned(self):
      """Check that the config property reports the config file name."""
      switch_config = get_config("kogan_switch")
      filename = switch_config.config
      self.assertEqual(filename, "smartplugv1.yaml")
  def test_float_matches_ints(self):
      """Check that _typematch accepts an int value for the float type."""
      matched = _typematch(float, 1)
      self.assertTrue(matched)
  def test_bytes_to_fmt_returns_string_for_unknown(self):
      """Check that _bytes_to_fmt(5) yields the string format "5s"."""
      fmt = _bytes_to_fmt(5)
      self.assertEqual(fmt, "5s")
  def test_deprecation(self):
      """Check the deprecated flag and message built from the config."""
      device = MagicMock()
      device.name = "Testing"
      entity_config = TuyaEntityConfig(
          device,
          {"entity": "Test", "deprecated": "Passed"},
      )

      self.assertTrue(entity_config.deprecated)

      expected_message = (
          "The use of Test for Testing is deprecated and should be "
          "replaced by Passed."
      )
      self.assertEqual(entity_config.deprecation_message, expected_message)
  def test_format_with_none_defined(self):
      """Check that format is None when the dps config has no format."""
      dps_config = TuyaDpsConfig(
          MagicMock(),
          {"id": "1", "name": "test", "type": "string"},
      )
      self.assertIsNone(dps_config.format)
  def test_decoding_base64(self):
      """Check that decoded_value decodes a base64 dps to bytes."""
      entity = MagicMock()
      dps_settings = {"id": "1", "name": "test", "type": "base64"}
      device = MagicMock()
      device.get_property.return_value = "VGVzdA=="

      dps_config = TuyaDpsConfig(entity, dps_settings)
      decoded = dps_config.decoded_value(device)

      self.assertEqual(decoded, "Test".encode("utf-8"))
  def test_decoding_hex(self):
      """Check that decoded_value decodes a hex dps to bytes."""
      entity = MagicMock()
      dps_settings = {"id": "1", "name": "test", "type": "hex"}
      device = MagicMock()
      device.get_property.return_value = "babe"

      dps_config = TuyaDpsConfig(entity, dps_settings)
      decoded = dps_config.decoded_value(device)

      self.assertEqual(decoded, b"\xba\xbe")
  def test_decoding_unencoded(self):
      """Check that decoded_value passes a string dps through unchanged."""
      entity = MagicMock()
      dps_settings = {"id": "1", "name": "test", "type": "string"}
      device = MagicMock()
      device.get_property.return_value = "VGVzdA=="

      dps_config = TuyaDpsConfig(entity, dps_settings)
      decoded = dps_config.decoded_value(device)

      self.assertEqual(decoded, "VGVzdA==")
  def test_encoding_base64(self):
      """Check that encode_value base64-encodes bytes for a base64 dps."""
      dps_config = TuyaDpsConfig(
          MagicMock(),
          {"id": "1", "name": "test", "type": "base64"},
      )
      encoded = dps_config.encode_value("Test".encode("utf-8"))
      self.assertEqual(encoded, "VGVzdA==")
  def test_encoding_hex(self):
      """Check that encode_value hex-encodes bytes for a hex dps."""
      dps_config = TuyaDpsConfig(
          MagicMock(),
          {"id": "1", "name": "test", "type": "hex"},
      )
      encoded = dps_config.encode_value(b"\xca\xfe")
      self.assertEqual(encoded, "cafe")
  def test_encoding_unencoded(self):
      """Check that encode_value returns a string dps value unchanged."""
      dps_config = TuyaDpsConfig(
          MagicMock(),
          {"id": "1", "name": "test", "type": "string"},
      )
      encoded = dps_config.encode_value("Test")
      self.assertEqual(encoded, "Test")
  def test_match_returns_false_on_errors_with_bitfield(self):
      """Check that _match is false for a non-integer on a bitfield dps."""
      dps_config = TuyaDpsConfig(
          MagicMock(),
          {"id": "1", "name": "test", "type": "bitfield"},
      )
      outcome = dps_config._match(15, "not an integer")
      self.assertFalse(outcome)
  def test_values_with_mirror(self):
      """
      Check that values() substitutes the mirrored dps's mapped values
      for a value_mirror entry, alongside the plain mapped values.
      """
      entity = MagicMock()
      device = MagicMock()
      device.get_property.return_value = "1"

      mirroring_settings = {
          "id": "1",
          "type": "string",
          "name": "test",
          "mapping": [
              {"dps_val": "mirror", "value_mirror": "map_mirror"},
              {"dps_val": "plain", "value": "unmirrored"},
          ],
      }
      mirrored_settings = {
          "id": "2",
          "type": "string",
          "name": "map_mirror",
          "mapping": [
              {"dps_val": "1", "value": "map_one"},
              {"dps_val": "2", "value": "map_two"},
          ],
      }

      mirroring_dp = TuyaDpsConfig(entity, mirroring_settings)
      mirrored_dp = TuyaDpsConfig(entity, mirrored_settings)
      # the mocked entity resolves every find_dps call to the mirrored dp
      entity.find_dps.return_value = mirrored_dp

      self.assertCountEqual(
          mirroring_dp.values(device),
          ["unmirrored", "map_one", "map_two"],
      )
  def test_get_device_id(self):
      """
      Check get_device_id returns device_id when it is the only key,
      device_cid when it is the only key, and device_cid when both
      are present.
      """
      cases = (
          ("my-device-id", {"device_id": "my-device-id"}),
          ("sub-id", {"device_cid": "sub-id"}),
          ("s", {"device_id": "d", "device_cid": "s"}),
      )
      for wanted, config in cases:
          self.assertEqual(wanted, get_device_id(config))
  def test_getting_masked_hex(self):
      """Check that get_value applies a mapping mask to a hex dps."""
      entity = MagicMock()
      device = MagicMock()
      device.get_property.return_value = "babe"
      dps_settings = {
          "id": "1",
          "name": "test",
          "type": "hex",
          "mapping": [{"mask": "ff00"}],
      }

      dps_config = TuyaDpsConfig(entity, dps_settings)
      masked = dps_config.get_value(device)

      # mask ff00 over "babe" leaves the ba byte
      self.assertEqual(masked, 0xBA)
  def test_setting_masked_hex(self):
      """Check that get_values_to_set merges a masked value into a hex dps."""
      entity = MagicMock()
      device = MagicMock()
      device.get_property.return_value = "babe"
      dps_settings = {
          "id": "1",
          "name": "test",
          "type": "hex",
          "mapping": [{"mask": "ff00"}],
      }

      dps_config = TuyaDpsConfig(entity, dps_settings)
      to_send = dps_config.get_values_to_set(device, 0xCA)

      # only the byte under the mask changes: "babe" becomes "cabe"
      self.assertEqual(to_send, {"1": "cabe"})
  def test_default_without_mapping(self):
      """Check that default is None for a dps config with no mapping."""
      dps_config = TuyaDpsConfig(
          MagicMock(),
          {"id": "1", "name": "test", "type": "string"},
      )
      self.assertIsNone(dps_config.default)