test_device_config.py 21 KB


  1. """Test the config parser"""
  2. from unittest import IsolatedAsyncioTestCase
  3. from unittest.mock import MagicMock
  4. from fuzzywuzzy import fuzz
  5. from homeassistant.components.sensor import SensorDeviceClass
  6. from custom_components.tuya_local.helpers.config import get_device_id
  7. from custom_components.tuya_local.helpers.device_config import (
  8. TuyaDeviceConfig,
  9. TuyaDpsConfig,
  10. TuyaEntityConfig,
  11. _bytes_to_fmt,
  12. _typematch,
  13. available_configs,
  14. get_config,
  15. )
  16. from custom_components.tuya_local.sensor import TuyaLocalSensor
  17. from .const import GPPH_HEATER_PAYLOAD, KOGAN_HEATER_PAYLOAD
  18. KNOWN_DPS = {
  19. "alarm_control_panel": {
  20. "required": ["alarm_state"],
  21. "optional": ["trigger"],
  22. },
  23. "binary_sensor": {"required": ["sensor"], "optional": []},
  24. "button": {"required": ["button"], "optional": []},
  25. "camera": {
  26. "required": [],
  27. "optional": ["switch", "motion_enable", "snapshot", "record"],
  28. },
  29. "climate": {
  30. "required": [],
  31. "optional": [
  32. "aux_heat",
  33. "current_temperature",
  34. "current_humidity",
  35. "fan_mode",
  36. "humidity",
  37. "hvac_mode",
  38. "hvac_action",
  39. "min_temperature",
  40. "max_temperature",
  41. "preset_mode",
  42. "swing_mode",
  43. {
  44. "xor": [
  45. "temperature",
  46. {"and": ["target_temp_high", "target_temp_low"]},
  47. ]
  48. },
  49. "temperature_unit",
  50. ],
  51. },
  52. "cover": {
  53. "required": [{"or": ["control", "position"]}],
  54. "optional": [
  55. "current_position",
  56. "action",
  57. "open",
  58. "reversed",
  59. ],
  60. },
  61. "event": {"required": ["event"], "optional": []},
  62. "fan": {
  63. "required": [{"or": ["preset_mode", "speed"]}],
  64. "optional": ["switch", "oscillate", "direction"],
  65. },
  66. "humidifier": {
  67. "required": ["humidity"],
  68. "optional": ["switch", "mode", "current_humidity"],
  69. },
  70. "lawn_mower": {"required": ["activity", "command"], "optional": []},
  71. "light": {
  72. "required": [{"or": ["switch", "brightness", "effect"]}],
  73. "optional": ["color_mode", "color_temp", "rgbhsv"],
  74. },
  75. "lock": {
  76. "required": [],
  77. "optional": [
  78. "lock",
  79. {"and": ["request_unlock", "approve_unlock"]},
  80. {"and": ["request_intercom", "approve_intercom"]},
  81. "unlock_fingerprint",
  82. "unlock_password",
  83. "unlock_temp_pwd",
  84. "unlock_dynamic_pwd",
  85. "unlock_offline_pwd",
  86. "unlock_card",
  87. "unlock_app",
  88. "unlock_key",
  89. "unlock_ble",
  90. "jammed",
  91. ],
  92. },
  93. "number": {
  94. "required": ["value"],
  95. "optional": ["unit", "minimum", "maximum"],
  96. },
  97. "remote": {
  98. "required": ["send"],
  99. "optional": ["receive"],
  100. },
  101. "select": {"required": ["option"], "optional": []},
  102. "sensor": {"required": ["sensor"], "optional": ["unit"]},
  103. "siren": {
  104. "required": [],
  105. "optional": ["tone", "volume", "duration", "switch"],
  106. },
  107. "switch": {"required": ["switch"], "optional": ["current_power_w"]},
  108. "vacuum": {
  109. "required": ["status"],
  110. "optional": [
  111. "command",
  112. "locate",
  113. "power",
  114. "activate",
  115. "battery",
  116. "direction_control",
  117. "error",
  118. "fan_speed",
  119. ],
  120. },
  121. "water_heater": {
  122. "required": [],
  123. "optional": [
  124. "current_temperature",
  125. "operation_mode",
  126. "temperature",
  127. "temperature_unit",
  128. "min_temperature",
  129. "max_temperature",
  130. "away_mode",
  131. ],
  132. },
  133. }
  134. class TestDeviceConfig(IsolatedAsyncioTestCase):
  135. """Test the device config parser"""
  136. def test_can_find_config_files(self):
  137. """Test that the config files can be found by the parser."""
  138. found = False
  139. for cfg in available_configs():
  140. found = True
  141. break
  142. self.assertTrue(found)
  143. def dp_match(self, condition, accounted, unaccounted, known, required=False):
  144. if type(condition) is str:
  145. known.add(condition)
  146. if condition in unaccounted:
  147. unaccounted.remove(condition)
  148. accounted.add(condition)
  149. if required:
  150. return condition in accounted
  151. else:
  152. return True
  153. elif "and" in condition:
  154. return self.and_match(
  155. condition["and"], accounted, unaccounted, known, required
  156. )
  157. elif "or" in condition:
  158. return self.or_match(condition["or"], accounted, unaccounted, known)
  159. elif "xor" in condition:
  160. return self.xor_match(
  161. condition["xor"], accounted, unaccounted, known, required
  162. )
  163. else:
  164. self.fail(f"Unrecognized condition {condition}")
  165. def and_match(self, conditions, accounted, unaccounted, known, required):
  166. single_match = False
  167. all_match = True
  168. for cond in conditions:
  169. match = self.dp_match(cond, accounted, unaccounted, known, True)
  170. all_match = all_match and match
  171. single_match = single_match or match
  172. if required:
  173. return all_match
  174. else:
  175. return all_match == single_match
  176. def or_match(self, conditions, accounted, unaccounted, known):
  177. match = False
  178. # loop through all, to ensure they are transferred to accounted list
  179. for cond in conditions:
  180. match = match or self.dp_match(cond, accounted, unaccounted, known, True)
  181. return match
  182. def xor_match(self, conditions, accounted, unaccounted, known, required):
  183. prior_match = False
  184. for cond in conditions:
  185. match = self.dp_match(cond, accounted, unaccounted, known, True)
  186. if match and prior_match:
  187. return False
  188. prior_match = prior_match or match
  189. # If any matched, all should be considered matched
  190. # this bit only handles nesting "and" within "xor"
  191. if prior_match:
  192. for c in conditions:
  193. if type(c) is str:
  194. accounted.add(c)
  195. elif "and" in c:
  196. for c2 in c["and"]:
  197. if type(c2) is str:
  198. accounted.add(c2)
  199. return prior_match or not required
  200. def rule_broken_msg(self, rule):
  201. msg = ""
  202. if type(rule) is str:
  203. return f"{msg} {rule}"
  204. elif "and" in rule:
  205. msg = f"{msg} all of ["
  206. for sub in rule["and"]:
  207. msg = f"{msg} {self.rule_broken_msg(sub)}"
  208. return f"{msg} ]"
  209. elif "or" in rule:
  210. msg = f"{msg} at least one of ["
  211. for sub in rule["or"]:
  212. msg = f"{msg} {self.rule_broken_msg(sub)}"
  213. return f"{msg} ]"
  214. elif "xor" in rule:
  215. msg = f"{msg} only one of ["
  216. for sub in rule["xor"]:
  217. msg = f"{msg} {self.rule_broken_msg(sub)}"
  218. return f"{msg} ]"
  219. return "for reason unknown"
  220. def check_entity(self, entity, cfg):
  221. """
  222. Check that the entity has a dps list and each dps has an id,
  223. type and name, and any other consistency checks.
  224. """
  225. self.assertIsNotNone(
  226. entity._config.get("entity"), f"entity type missing in {cfg}"
  227. )
  228. e = entity.config_id
  229. self.assertIsNotNone(
  230. entity._config.get("dps"), f"dps missing from {e} in {cfg}"
  231. )
  232. functions = set()
  233. extra = set()
  234. known = set()
  235. redirects = set()
  236. # Basic checks of dps, and initialising of redirects and extras sets
  237. # for later checking
  238. for dp in entity.dps():
  239. self.assertIsNotNone(
  240. dp._config.get("id"), f"dp id missing from {e} in {cfg}"
  241. )
  242. self.assertIsNotNone(
  243. dp._config.get("type"), f"dp type missing from {e} in {cfg}"
  244. )
  245. self.assertIsNotNone(
  246. dp._config.get("name"), f"dp name missing from {e} in {cfg}"
  247. )
  248. extra.add(dp.name)
  249. mappings = dp._config.get("mapping", [])
  250. self.assertIsInstance(
  251. mappings,
  252. list,
  253. f"mapping is not a list in {cfg}; entity {e}, dp {dp.name}",
  254. )
  255. for m in mappings:
  256. conditions = m.get("conditions", [])
  257. self.assertIsInstance(
  258. conditions,
  259. list,
  260. f"conditions is not a list in {cfg}; entity {e}, dp {dp.name}",
  261. )
  262. for c in conditions:
  263. if c.get("value_redirect"):
  264. redirects.add(c.get("value_redirect"))
  265. if c.get("value_mirror"):
  266. redirects.add(c.get("value_mirror"))
  267. if m.get("value_redirect"):
  268. redirects.add(m.get("value_redirect"))
  269. if m.get("value_mirror"):
  270. redirects.add(m.get("value_mirror"))
  271. # Check redirects all exist
  272. for redirect in redirects:
  273. self.assertIn(redirect, extra, f"dp {redirect} missing from {e} in {cfg}")
  274. # Check dps that are required for this entity type all exist
  275. expected = KNOWN_DPS.get(entity.entity)
  276. for rule in expected["required"]:
  277. self.assertTrue(
  278. self.dp_match(rule, functions, extra, known, True),
  279. f"{cfg} missing required {self.rule_broken_msg(rule)} in {e}",
  280. )
  281. for rule in expected["optional"]:
  282. self.assertTrue(
  283. self.dp_match(rule, functions, extra, known, False),
  284. f"{cfg} expecting {self.rule_broken_msg(rule)} in {e}",
  285. )
  286. # Check for potential typos in extra attributes
  287. known_extra = known - functions
  288. for attr in extra:
  289. for dp in known_extra:
  290. self.assertLess(
  291. fuzz.ratio(attr, dp),
  292. 85,
  293. f"Probable typo {attr} is too similar to {dp} in {cfg} {e}",
  294. )
  295. # Check that sensors with mapped values are of class enum and vice versa
  296. if entity.entity == "sensor":
  297. mock_device = MagicMock()
  298. sensor = TuyaLocalSensor(mock_device, entity)
  299. if sensor.options:
  300. self.assertEqual(
  301. entity.device_class,
  302. SensorDeviceClass.ENUM,
  303. f"{cfg} {e} has mapped values but does not have a device class of enum",
  304. )
  305. if entity.device_class == SensorDeviceClass.ENUM:
  306. self.assertIsNotNone(
  307. sensor.options,
  308. f"{cfg} {e} has a device class of enum, but has no mapped values",
  309. )
  310. def test_config_files_parse(self):
  311. """
  312. All configs should be parsable and meet certain criteria
  313. """
  314. for cfg in available_configs():
  315. entities = []
  316. parsed = TuyaDeviceConfig(cfg)
  317. # Check for error messages or unparsed config
  318. if isinstance(parsed, str) or isinstance(parsed._config, str):
  319. self.fail(f"unparsable yaml in {cfg}")
  320. self.assertIsNotNone(
  321. parsed._config.get("name"),
  322. f"name missing from {cfg}",
  323. )
  324. self.assertIsNotNone(
  325. parsed._config.get("primary_entity"),
  326. f"primary_entity missing from {cfg}",
  327. )
  328. self.check_entity(parsed.primary_entity, cfg)
  329. entities.append(parsed.primary_entity.config_id)
  330. secondary = False
  331. for entity in parsed.secondary_entities():
  332. secondary = True
  333. self.check_entity(entity, cfg)
  334. entities.append(entity.config_id)
  335. # check entities are unique
  336. self.assertCountEqual(entities, set(entities))
  337. # If there are no secondary entities, check that it is intended
  338. if not secondary:
  339. for key in parsed._config.keys():
  340. self.assertFalse(
  341. key.startswith("sec"),
  342. f"misspelled secondary_entities in {cfg}",
  343. )
  344. # Most of the device_config functionality is exercised during testing of
  345. # the various supported devices. These tests concentrate only on the gaps.
  346. def test_match_quality(self):
  347. """Test the match_quality function."""
  348. cfg = get_config("deta_fan")
  349. q = cfg.match_quality({**KOGAN_HEATER_PAYLOAD, "updated_at": 0})
  350. self.assertEqual(q, 0)
  351. q = cfg.match_quality({**GPPH_HEATER_PAYLOAD})
  352. self.assertEqual(q, 0)
  353. def test_entity_find_unknown_dps_fails(self):
  354. """Test that finding a dps that doesn't exist fails."""
  355. cfg = get_config("kogan_switch")
  356. non_existing = cfg.primary_entity.find_dps("missing")
  357. self.assertIsNone(non_existing)
  358. async def test_dps_async_set_readonly_value_fails(self):
  359. """Test that setting a readonly dps fails."""
  360. mock_device = MagicMock()
  361. cfg = get_config("goldair_gpph_heater")
  362. error_code = cfg.primary_entity.find_dps("error")
  363. with self.assertRaises(TypeError):
  364. await error_code.async_set_value(mock_device, 1)
  365. def test_dps_values_is_empty_with_no_mapping(self):
  366. """
  367. Test that a dps with no mapping returns None as its possible values
  368. """
  369. mock_device = MagicMock()
  370. cfg = get_config("goldair_gpph_heater")
  371. temp = cfg.primary_entity.find_dps("current_temperature")
  372. self.assertEqual(temp.values(mock_device), [])
  373. def test_config_returned(self):
  374. """Test that config file is returned by config"""
  375. cfg = get_config("kogan_switch")
  376. self.assertEqual(cfg.config, "smartplugv1.yaml")
  377. def test_float_matches_ints(self):
  378. """Test that the _typematch function matches int values to float dps"""
  379. self.assertTrue(_typematch(float, 1))
  380. def test_bytes_to_fmt_returns_string_for_unknown(self):
  381. """
  382. Test that the _bytes_to_fmt function parses unknown number of bytes
  383. as a string format.
  384. """
  385. self.assertEqual(_bytes_to_fmt(5), "5s")
  386. def test_deprecation(self):
  387. """Test that deprecation messages are picked from the config."""
  388. mock_device = MagicMock()
  389. mock_device.name = "Testing"
  390. mock_config = {"entity": "Test", "deprecated": "Passed"}
  391. cfg = TuyaEntityConfig(mock_device, mock_config)
  392. self.assertTrue(cfg.deprecated)
  393. self.assertEqual(
  394. cfg.deprecation_message,
  395. "The use of Test for Testing is deprecated and should be "
  396. "replaced by Passed.",
  397. )
  398. def test_format_with_none_defined(self):
  399. """Test that format returns None when there is none configured."""
  400. mock_entity = MagicMock()
  401. mock_config = {"id": "1", "name": "test", "type": "string"}
  402. cfg = TuyaDpsConfig(mock_entity, mock_config)
  403. self.assertIsNone(cfg.format)
  404. def test_decoding_base64(self):
  405. """Test that decoded_value works with base64 encoding."""
  406. mock_entity = MagicMock()
  407. mock_config = {"id": "1", "name": "test", "type": "base64"}
  408. mock_device = MagicMock()
  409. mock_device.get_property.return_value = "VGVzdA=="
  410. cfg = TuyaDpsConfig(mock_entity, mock_config)
  411. self.assertEqual(
  412. cfg.decoded_value(mock_device),
  413. bytes("Test", "utf-8"),
  414. )
  415. def test_decoding_hex(self):
  416. """Test that decoded_value works with hex encoding."""
  417. mock_entity = MagicMock()
  418. mock_config = {"id": "1", "name": "test", "type": "hex"}
  419. mock_device = MagicMock()
  420. mock_device.get_property.return_value = "babe"
  421. cfg = TuyaDpsConfig(mock_entity, mock_config)
  422. self.assertEqual(
  423. cfg.decoded_value(mock_device),
  424. b"\xba\xbe",
  425. )
  426. def test_decoding_unencoded(self):
  427. """Test that decoded_value returns the raw value when not encoded."""
  428. mock_entity = MagicMock()
  429. mock_config = {"id": "1", "name": "test", "type": "string"}
  430. mock_device = MagicMock()
  431. mock_device.get_property.return_value = "VGVzdA=="
  432. cfg = TuyaDpsConfig(mock_entity, mock_config)
  433. self.assertEqual(
  434. cfg.decoded_value(mock_device),
  435. "VGVzdA==",
  436. )
  437. def test_encoding_base64(self):
  438. """Test that encode_value works with base64."""
  439. mock_entity = MagicMock()
  440. mock_config = {"id": "1", "name": "test", "type": "base64"}
  441. cfg = TuyaDpsConfig(mock_entity, mock_config)
  442. self.assertEqual(cfg.encode_value(bytes("Test", "utf-8")), "VGVzdA==")
  443. def test_encoding_hex(self):
  444. """Test that encode_value works with base64."""
  445. mock_entity = MagicMock()
  446. mock_config = {"id": "1", "name": "test", "type": "hex"}
  447. cfg = TuyaDpsConfig(mock_entity, mock_config)
  448. self.assertEqual(cfg.encode_value(b"\xca\xfe"), "cafe")
  449. def test_encoding_unencoded(self):
  450. """Test that encode_value works with base64."""
  451. mock_entity = MagicMock()
  452. mock_config = {"id": "1", "name": "test", "type": "string"}
  453. cfg = TuyaDpsConfig(mock_entity, mock_config)
  454. self.assertEqual(cfg.encode_value("Test"), "Test")
  455. def test_match_returns_false_on_errors_with_bitfield(self):
  456. """Test that TypeError and ValueError cause match to return False."""
  457. mock_entity = MagicMock()
  458. mock_config = {"id": "1", "name": "test", "type": "bitfield"}
  459. cfg = TuyaDpsConfig(mock_entity, mock_config)
  460. self.assertFalse(cfg._match(15, "not an integer"))
  461. def test_values_with_mirror(self):
  462. """Test that value_mirror redirects."""
  463. mock_entity = MagicMock()
  464. mock_config = {
  465. "id": "1",
  466. "type": "string",
  467. "name": "test",
  468. "mapping": [
  469. {"dps_val": "mirror", "value_mirror": "map_mirror"},
  470. {"dps_val": "plain", "value": "unmirrored"},
  471. ],
  472. }
  473. mock_map_config = {
  474. "id": "2",
  475. "type": "string",
  476. "name": "map_mirror",
  477. "mapping": [
  478. {"dps_val": "1", "value": "map_one"},
  479. {"dps_val": "2", "value": "map_two"},
  480. ],
  481. }
  482. mock_device = MagicMock()
  483. mock_device.get_property.return_value = "1"
  484. cfg = TuyaDpsConfig(mock_entity, mock_config)
  485. map = TuyaDpsConfig(mock_entity, mock_map_config)
  486. mock_entity.find_dps.return_value = map
  487. self.assertCountEqual(
  488. cfg.values(mock_device),
  489. ["unmirrored", "map_one", "map_two"],
  490. )
  491. def test_get_device_id(self):
  492. """Test that check if device id is correct"""
  493. self.assertEqual("my-device-id", get_device_id({"device_id": "my-device-id"}))
  494. self.assertEqual("sub-id", get_device_id({"device_cid": "sub-id"}))
  495. self.assertEqual("s", get_device_id({"device_id": "d", "device_cid": "s"}))
  496. def test_getting_masked_hex(self):
  497. """Test that get_value works with masked hex encoding."""
  498. mock_entity = MagicMock()
  499. mock_config = {
  500. "id": "1",
  501. "name": "test",
  502. "type": "hex",
  503. "mapping": [
  504. {"mask": "ff00"},
  505. ],
  506. }
  507. mock_device = MagicMock()
  508. mock_device.get_property.return_value = "babe"
  509. cfg = TuyaDpsConfig(mock_entity, mock_config)
  510. self.assertEqual(
  511. cfg.get_value(mock_device),
  512. 0xBA,
  513. )
  514. def test_setting_masked_hex(self):
  515. """Test that get_values_to_set works with masked hex encoding."""
  516. mock_entity = MagicMock()
  517. mock_config = {
  518. "id": "1",
  519. "name": "test",
  520. "type": "hex",
  521. "mapping": [
  522. {"mask": "ff00"},
  523. ],
  524. }
  525. mock_device = MagicMock()
  526. mock_device.get_property.return_value = "babe"
  527. cfg = TuyaDpsConfig(mock_entity, mock_config)
  528. self.assertEqual(
  529. cfg.get_values_to_set(mock_device, 0xCA),
  530. {"1": "cabe"},
  531. )
  532. def test_default_without_mapping(self):
  533. """Test that default returns None when there is no mapping"""
  534. mock_entity = MagicMock()
  535. mock_config = {"id": "1", "name": "test", "type": "string"}
  536. cfg = TuyaDpsConfig(mock_entity, mock_config)
  537. self.assertIsNone(cfg.default)