test_device_config.py 21 KB


  1. """Test the config parser"""
  2. from unittest import IsolatedAsyncioTestCase
  3. from unittest.mock import MagicMock
  4. from fuzzywuzzy import fuzz
  5. from homeassistant.components.sensor import SensorDeviceClass
  6. from custom_components.tuya_local.helpers.config import get_device_id
  7. from custom_components.tuya_local.helpers.device_config import (
  8. TuyaDeviceConfig,
  9. TuyaDpsConfig,
  10. TuyaEntityConfig,
  11. _bytes_to_fmt,
  12. _typematch,
  13. available_configs,
  14. get_config,
  15. )
  16. from custom_components.tuya_local.sensor import TuyaLocalSensor
  17. from .const import GPPH_HEATER_PAYLOAD, KOGAN_HEATER_PAYLOAD
  18. KNOWN_DPS = {
  19. "alarm_control_panel": {
  20. "required": ["alarm_state"],
  21. "optional": ["trigger"],
  22. },
  23. "binary_sensor": {"required": ["sensor"], "optional": []},
  24. "button": {"required": ["button"], "optional": []},
  25. "camera": {
  26. "required": [],
  27. "optional": ["switch", "motion_enable", "snapshot", "record"],
  28. },
  29. "climate": {
  30. "required": [],
  31. "optional": [
  32. "aux_heat",
  33. "current_temperature",
  34. "current_humidity",
  35. "fan_mode",
  36. "humidity",
  37. "hvac_mode",
  38. "hvac_action",
  39. "min_temperature",
  40. "max_temperature",
  41. "preset_mode",
  42. "swing_mode",
  43. {
  44. "xor": [
  45. "temperature",
  46. {"and": ["target_temp_high", "target_temp_low"]},
  47. ]
  48. },
  49. "temperature_unit",
  50. ],
  51. },
  52. "cover": {
  53. "required": [{"or": ["control", "position"]}],
  54. "optional": [
  55. "current_position",
  56. "action",
  57. "open",
  58. "reversed",
  59. ],
  60. },
  61. "fan": {
  62. "required": [{"or": ["preset_mode", "speed"]}],
  63. "optional": ["switch", "oscillate", "direction"],
  64. },
  65. "humidifier": {"required": ["switch", "humidity"], "optional": ["mode"]},
  66. "lawn_mower": {"required": ["activity", "command"], "optional": []},
  67. "light": {
  68. "required": [{"or": ["switch", "brightness", "effect"]}],
  69. "optional": ["color_mode", "color_temp", "rgbhsv"],
  70. },
  71. "lock": {
  72. "required": [],
  73. "optional": [
  74. "lock",
  75. {"and": ["request_unlock", "approve_unlock"]},
  76. {"and": ["request_intercom", "approve_intercom"]},
  77. "unlock_fingerprint",
  78. "unlock_password",
  79. "unlock_temp_pwd",
  80. "unlock_dynamic_pwd",
  81. "unlock_offline_pwd",
  82. "unlock_card",
  83. "unlock_app",
  84. "unlock_key",
  85. "unlock_ble",
  86. "jammed",
  87. ],
  88. },
  89. "number": {
  90. "required": ["value"],
  91. "optional": ["unit", "minimum", "maximum"],
  92. },
  93. "remote": {
  94. "required": ["send"],
  95. "optional": ["receive"],
  96. },
  97. "select": {"required": ["option"], "optional": []},
  98. "sensor": {"required": ["sensor"], "optional": ["unit"]},
  99. "siren": {
  100. "required": [],
  101. "optional": ["tone", "volume", "duration", "switch"],
  102. },
  103. "switch": {"required": ["switch"], "optional": ["current_power_w"]},
  104. "vacuum": {
  105. "required": ["status"],
  106. "optional": [
  107. "command",
  108. "locate",
  109. "power",
  110. "activate",
  111. "battery",
  112. "direction_control",
  113. "error",
  114. "fan_speed",
  115. ],
  116. },
  117. "water_heater": {
  118. "required": [],
  119. "optional": [
  120. "current_temperature",
  121. "operation_mode",
  122. "temperature",
  123. "temperature_unit",
  124. "min_temperature",
  125. "max_temperature",
  126. "away_mode",
  127. ],
  128. },
  129. }
  130. class TestDeviceConfig(IsolatedAsyncioTestCase):
  131. """Test the device config parser"""
  132. def test_can_find_config_files(self):
  133. """Test that the config files can be found by the parser."""
  134. found = False
  135. for cfg in available_configs():
  136. found = True
  137. break
  138. self.assertTrue(found)
  139. def dp_match(self, condition, accounted, unaccounted, known, required=False):
  140. if type(condition) is str:
  141. known.add(condition)
  142. if condition in unaccounted:
  143. unaccounted.remove(condition)
  144. accounted.add(condition)
  145. if required:
  146. return condition in accounted
  147. else:
  148. return True
  149. elif "and" in condition:
  150. return self.and_match(
  151. condition["and"], accounted, unaccounted, known, required
  152. )
  153. elif "or" in condition:
  154. return self.or_match(condition["or"], accounted, unaccounted, known)
  155. elif "xor" in condition:
  156. return self.xor_match(
  157. condition["xor"], accounted, unaccounted, known, required
  158. )
  159. else:
  160. self.fail(f"Unrecognized condition {condition}")
  161. def and_match(self, conditions, accounted, unaccounted, known, required):
  162. single_match = False
  163. all_match = True
  164. for cond in conditions:
  165. match = self.dp_match(cond, accounted, unaccounted, known, True)
  166. all_match = all_match and match
  167. single_match = single_match or match
  168. if required:
  169. return all_match
  170. else:
  171. return all_match == single_match
  172. def or_match(self, conditions, accounted, unaccounted, known):
  173. match = False
  174. # loop through all, to ensure they are transferred to accounted list
  175. for cond in conditions:
  176. match = match or self.dp_match(cond, accounted, unaccounted, known, True)
  177. return match
  178. def xor_match(self, conditions, accounted, unaccounted, known, required):
  179. prior_match = False
  180. for cond in conditions:
  181. match = self.dp_match(cond, accounted, unaccounted, known, True)
  182. if match and prior_match:
  183. return False
  184. prior_match = prior_match or match
  185. # If any matched, all should be considered matched
  186. # this bit only handles nesting "and" within "xor"
  187. if prior_match:
  188. for c in conditions:
  189. if type(c) is str:
  190. accounted.add(c)
  191. elif "and" in c:
  192. for c2 in c["and"]:
  193. if type(c2) is str:
  194. accounted.add(c2)
  195. return prior_match or not required
  196. def rule_broken_msg(self, rule):
  197. msg = ""
  198. if type(rule) is str:
  199. return f"{msg} {rule}"
  200. elif "and" in rule:
  201. msg = f"{msg} all of ["
  202. for sub in rule["and"]:
  203. msg = f"{msg} {self.rule_broken_msg(sub)}"
  204. return f"{msg} ]"
  205. elif "or" in rule:
  206. msg = f"{msg} at least one of ["
  207. for sub in rule["or"]:
  208. msg = f"{msg} {self.rule_broken_msg(sub)}"
  209. return f"{msg} ]"
  210. elif "xor" in rule:
  211. msg = f"{msg} only one of ["
  212. for sub in rule["xor"]:
  213. msg = f"{msg} {self.rule_broken_msg(sub)}"
  214. return f"{msg} ]"
  215. return "for reason unknown"
  216. def check_entity(self, entity, cfg):
  217. """
  218. Check that the entity has a dps list and each dps has an id,
  219. type and name, and any other consistency checks.
  220. """
  221. self.assertIsNotNone(
  222. entity._config.get("entity"), f"entity type missing in {cfg}"
  223. )
  224. e = entity.config_id
  225. self.assertIsNotNone(
  226. entity._config.get("dps"), f"dps missing from {e} in {cfg}"
  227. )
  228. functions = set()
  229. extra = set()
  230. known = set()
  231. redirects = set()
  232. # Basic checks of dps, and initialising of redirects and extras sets
  233. # for later checking
  234. for dp in entity.dps():
  235. self.assertIsNotNone(
  236. dp._config.get("id"), f"dp id missing from {e} in {cfg}"
  237. )
  238. self.assertIsNotNone(
  239. dp._config.get("type"), f"dp type missing from {e} in {cfg}"
  240. )
  241. self.assertIsNotNone(
  242. dp._config.get("name"), f"dp name missing from {e} in {cfg}"
  243. )
  244. extra.add(dp.name)
  245. mappings = dp._config.get("mapping", [])
  246. self.assertIsInstance(
  247. mappings,
  248. list,
  249. f"mapping is not a list in {cfg}; entity {e}, dp {dp.name}",
  250. )
  251. for m in mappings:
  252. conditions = m.get("conditions", [])
  253. self.assertIsInstance(
  254. conditions,
  255. list,
  256. f"conditions is not a list in {cfg}; entity {e}, dp {dp.name}",
  257. )
  258. for c in conditions:
  259. if c.get("value_redirect"):
  260. redirects.add(c.get("value_redirect"))
  261. if c.get("value_mirror"):
  262. redirects.add(c.get("value_mirror"))
  263. if m.get("value_redirect"):
  264. redirects.add(m.get("value_redirect"))
  265. if m.get("value_mirror"):
  266. redirects.add(m.get("value_mirror"))
  267. # Check redirects all exist
  268. for redirect in redirects:
  269. self.assertIn(redirect, extra, f"dp {redirect} missing from {e} in {cfg}")
  270. # Check dps that are required for this entity type all exist
  271. expected = KNOWN_DPS.get(entity.entity)
  272. for rule in expected["required"]:
  273. self.assertTrue(
  274. self.dp_match(rule, functions, extra, known, True),
  275. f"{cfg} missing required {self.rule_broken_msg(rule)} in {e}",
  276. )
  277. for rule in expected["optional"]:
  278. self.assertTrue(
  279. self.dp_match(rule, functions, extra, known, False),
  280. f"{cfg} expecting {self.rule_broken_msg(rule)} in {e}",
  281. )
  282. # Check for potential typos in extra attributes
  283. known_extra = known - functions
  284. for attr in extra:
  285. for dp in known_extra:
  286. self.assertLess(
  287. fuzz.ratio(attr, dp),
  288. 85,
  289. f"Probable typo {attr} is too similar to {dp} in {cfg} {e}",
  290. )
  291. # Check that sensors with mapped values are of class enum and vice versa
  292. if entity.entity == "sensor":
  293. mock_device = MagicMock()
  294. sensor = TuyaLocalSensor(mock_device, entity)
  295. if sensor.options:
  296. self.assertEqual(
  297. entity.device_class,
  298. SensorDeviceClass.ENUM,
  299. f"{cfg} {e} has mapped values but does not have a device class of enum",
  300. )
  301. if entity.device_class == SensorDeviceClass.ENUM:
  302. self.assertIsNotNone(
  303. sensor.options,
  304. f"{cfg} {e} has a device class of enum, but has no mapped values",
  305. )
  306. def test_config_files_parse(self):
  307. """
  308. All configs should be parsable and meet certain criteria
  309. """
  310. for cfg in available_configs():
  311. entities = []
  312. parsed = TuyaDeviceConfig(cfg)
  313. # Check for error messages or unparsed config
  314. if isinstance(parsed, str) or isinstance(parsed._config, str):
  315. self.fail(f"unparsable yaml in {cfg}")
  316. self.assertIsNotNone(
  317. parsed._config.get("name"),
  318. f"name missing from {cfg}",
  319. )
  320. self.assertIsNotNone(
  321. parsed._config.get("primary_entity"),
  322. f"primary_entity missing from {cfg}",
  323. )
  324. self.check_entity(parsed.primary_entity, cfg)
  325. entities.append(parsed.primary_entity.config_id)
  326. secondary = False
  327. for entity in parsed.secondary_entities():
  328. secondary = True
  329. self.check_entity(entity, cfg)
  330. entities.append(entity.config_id)
  331. # check entities are unique
  332. self.assertCountEqual(entities, set(entities))
  333. # If there are no secondary entities, check that it is intended
  334. if not secondary:
  335. for key in parsed._config.keys():
  336. self.assertFalse(
  337. key.startswith("sec"),
  338. f"misspelled secondary_entities in {cfg}",
  339. )
  340. # Most of the device_config functionality is exercised during testing of
  341. # the various supported devices. These tests concentrate only on the gaps.
  342. def test_match_quality(self):
  343. """Test the match_quality function."""
  344. cfg = get_config("deta_fan")
  345. q = cfg.match_quality({**KOGAN_HEATER_PAYLOAD, "updated_at": 0})
  346. self.assertEqual(q, 0)
  347. q = cfg.match_quality({**GPPH_HEATER_PAYLOAD})
  348. self.assertEqual(q, 0)
  349. def test_entity_find_unknown_dps_fails(self):
  350. """Test that finding a dps that doesn't exist fails."""
  351. cfg = get_config("kogan_switch")
  352. non_existing = cfg.primary_entity.find_dps("missing")
  353. self.assertIsNone(non_existing)
  354. async def test_dps_async_set_readonly_value_fails(self):
  355. """Test that setting a readonly dps fails."""
  356. mock_device = MagicMock()
  357. cfg = get_config("goldair_gpph_heater")
  358. error_code = cfg.primary_entity.find_dps("error")
  359. with self.assertRaises(TypeError):
  360. await error_code.async_set_value(mock_device, 1)
  361. def test_dps_values_is_empty_with_no_mapping(self):
  362. """
  363. Test that a dps with no mapping returns None as its possible values
  364. """
  365. mock_device = MagicMock()
  366. cfg = get_config("goldair_gpph_heater")
  367. temp = cfg.primary_entity.find_dps("current_temperature")
  368. self.assertEqual(temp.values(mock_device), [])
  369. def test_config_returned(self):
  370. """Test that config file is returned by config"""
  371. cfg = get_config("kogan_switch")
  372. self.assertEqual(cfg.config, "smartplugv1.yaml")
  373. def test_float_matches_ints(self):
  374. """Test that the _typematch function matches int values to float dps"""
  375. self.assertTrue(_typematch(float, 1))
  376. def test_bytes_to_fmt_returns_string_for_unknown(self):
  377. """
  378. Test that the _bytes_to_fmt function parses unknown number of bytes
  379. as a string format.
  380. """
  381. self.assertEqual(_bytes_to_fmt(5), "5s")
  382. def test_deprecation(self):
  383. """Test that deprecation messages are picked from the config."""
  384. mock_device = MagicMock()
  385. mock_device.name = "Testing"
  386. mock_config = {"entity": "Test", "deprecated": "Passed"}
  387. cfg = TuyaEntityConfig(mock_device, mock_config)
  388. self.assertTrue(cfg.deprecated)
  389. self.assertEqual(
  390. cfg.deprecation_message,
  391. "The use of Test for Testing is deprecated and should be "
  392. "replaced by Passed.",
  393. )
  394. def test_format_with_none_defined(self):
  395. """Test that format returns None when there is none configured."""
  396. mock_entity = MagicMock()
  397. mock_config = {"id": "1", "name": "test", "type": "string"}
  398. cfg = TuyaDpsConfig(mock_entity, mock_config)
  399. self.assertIsNone(cfg.format)
  400. def test_decoding_base64(self):
  401. """Test that decoded_value works with base64 encoding."""
  402. mock_entity = MagicMock()
  403. mock_config = {"id": "1", "name": "test", "type": "base64"}
  404. mock_device = MagicMock()
  405. mock_device.get_property.return_value = "VGVzdA=="
  406. cfg = TuyaDpsConfig(mock_entity, mock_config)
  407. self.assertEqual(
  408. cfg.decoded_value(mock_device),
  409. bytes("Test", "utf-8"),
  410. )
  411. def test_decoding_hex(self):
  412. """Test that decoded_value works with hex encoding."""
  413. mock_entity = MagicMock()
  414. mock_config = {"id": "1", "name": "test", "type": "hex"}
  415. mock_device = MagicMock()
  416. mock_device.get_property.return_value = "babe"
  417. cfg = TuyaDpsConfig(mock_entity, mock_config)
  418. self.assertEqual(
  419. cfg.decoded_value(mock_device),
  420. b"\xba\xbe",
  421. )
  422. def test_decoding_unencoded(self):
  423. """Test that decoded_value returns the raw value when not encoded."""
  424. mock_entity = MagicMock()
  425. mock_config = {"id": "1", "name": "test", "type": "string"}
  426. mock_device = MagicMock()
  427. mock_device.get_property.return_value = "VGVzdA=="
  428. cfg = TuyaDpsConfig(mock_entity, mock_config)
  429. self.assertEqual(
  430. cfg.decoded_value(mock_device),
  431. "VGVzdA==",
  432. )
  433. def test_encoding_base64(self):
  434. """Test that encode_value works with base64."""
  435. mock_entity = MagicMock()
  436. mock_config = {"id": "1", "name": "test", "type": "base64"}
  437. cfg = TuyaDpsConfig(mock_entity, mock_config)
  438. self.assertEqual(cfg.encode_value(bytes("Test", "utf-8")), "VGVzdA==")
  439. def test_encoding_hex(self):
  440. """Test that encode_value works with base64."""
  441. mock_entity = MagicMock()
  442. mock_config = {"id": "1", "name": "test", "type": "hex"}
  443. cfg = TuyaDpsConfig(mock_entity, mock_config)
  444. self.assertEqual(cfg.encode_value(b"\xca\xfe"), "cafe")
  445. def test_encoding_unencoded(self):
  446. """Test that encode_value works with base64."""
  447. mock_entity = MagicMock()
  448. mock_config = {"id": "1", "name": "test", "type": "string"}
  449. cfg = TuyaDpsConfig(mock_entity, mock_config)
  450. self.assertEqual(cfg.encode_value("Test"), "Test")
  451. def test_match_returns_false_on_errors_with_bitfield(self):
  452. """Test that TypeError and ValueError cause match to return False."""
  453. mock_entity = MagicMock()
  454. mock_config = {"id": "1", "name": "test", "type": "bitfield"}
  455. cfg = TuyaDpsConfig(mock_entity, mock_config)
  456. self.assertFalse(cfg._match(15, "not an integer"))
  457. def test_values_with_mirror(self):
  458. """Test that value_mirror redirects."""
  459. mock_entity = MagicMock()
  460. mock_config = {
  461. "id": "1",
  462. "type": "string",
  463. "name": "test",
  464. "mapping": [
  465. {"dps_val": "mirror", "value_mirror": "map_mirror"},
  466. {"dps_val": "plain", "value": "unmirrored"},
  467. ],
  468. }
  469. mock_map_config = {
  470. "id": "2",
  471. "type": "string",
  472. "name": "map_mirror",
  473. "mapping": [
  474. {"dps_val": "1", "value": "map_one"},
  475. {"dps_val": "2", "value": "map_two"},
  476. ],
  477. }
  478. mock_device = MagicMock()
  479. mock_device.get_property.return_value = "1"
  480. cfg = TuyaDpsConfig(mock_entity, mock_config)
  481. map = TuyaDpsConfig(mock_entity, mock_map_config)
  482. mock_entity.find_dps.return_value = map
  483. self.assertCountEqual(
  484. cfg.values(mock_device),
  485. ["unmirrored", "map_one", "map_two"],
  486. )
  487. def test_get_device_id(self):
  488. """Test that check if device id is correct"""
  489. self.assertEqual("my-device-id", get_device_id({"device_id": "my-device-id"}))
  490. self.assertEqual("sub-id", get_device_id({"device_cid": "sub-id"}))
  491. self.assertEqual("s", get_device_id({"device_id": "d", "device_cid": "s"}))
  492. def test_getting_masked_hex(self):
  493. """Test that get_value works with masked hex encoding."""
  494. mock_entity = MagicMock()
  495. mock_config = {
  496. "id": "1",
  497. "name": "test",
  498. "type": "hex",
  499. "mapping": [
  500. {"mask": "ff00"},
  501. ],
  502. }
  503. mock_device = MagicMock()
  504. mock_device.get_property.return_value = "babe"
  505. cfg = TuyaDpsConfig(mock_entity, mock_config)
  506. self.assertEqual(
  507. cfg.get_value(mock_device),
  508. 0xBA,
  509. )
  510. def test_setting_masked_hex(self):
  511. """Test that get_values_to_set works with masked hex encoding."""
  512. mock_entity = MagicMock()
  513. mock_config = {
  514. "id": "1",
  515. "name": "test",
  516. "type": "hex",
  517. "mapping": [
  518. {"mask": "ff00"},
  519. ],
  520. }
  521. mock_device = MagicMock()
  522. mock_device.get_property.return_value = "babe"
  523. cfg = TuyaDpsConfig(mock_entity, mock_config)
  524. self.assertEqual(
  525. cfg.get_values_to_set(mock_device, 0xCA),
  526. {"1": "cabe"},
  527. )
  528. def test_default_without_mapping(self):
  529. """Test that default returns None when there is no mapping"""
  530. mock_entity = MagicMock()
  531. mock_config = {"id": "1", "name": "test", "type": "string"}
  532. cfg = TuyaDpsConfig(mock_entity, mock_config)
  533. self.assertIsNone(cfg.default)