test_device_config.py 16 KB


  1. """Test the config parser"""
  2. from fuzzywuzzy import fuzz
  3. from unittest import IsolatedAsyncioTestCase
  4. from unittest.mock import MagicMock
  5. from custom_components.tuya_local.helpers.device_config import (
  6. available_configs,
  7. get_config,
  8. _bytes_to_fmt,
  9. _typematch,
  10. TuyaDeviceConfig,
  11. TuyaDpsConfig,
  12. TuyaEntityConfig,
  13. )
  14. from .const import (
  15. GPPH_HEATER_PAYLOAD,
  16. KOGAN_HEATER_PAYLOAD,
  17. )
  18. KNOWN_DPS = {
  19. "binary_sensor": {"required": ["sensor"], "optional": []},
  20. "button": {"required": ["button"], "optional": []},
  21. "climate": {
  22. "required": [],
  23. "optional": [
  24. "aux_heat",
  25. "current_temperature",
  26. "current_humidity",
  27. "fan_mode",
  28. "humidity",
  29. "hvac_mode",
  30. "hvac_action",
  31. "min_temperature",
  32. "max_temperature",
  33. "preset_mode",
  34. "swing_mode",
  35. {
  36. "xor": [
  37. "temperature",
  38. {"and": ["target_temp_high", "target_temp_low"]},
  39. ]
  40. },
  41. "temperature_unit",
  42. ],
  43. },
  44. "cover": {
  45. "required": [{"or": ["control", "position"]}],
  46. "optional": [
  47. "current_position",
  48. "action",
  49. "open",
  50. "reversed",
  51. ],
  52. },
  53. "fan": {
  54. "required": [{"or": ["preset_mode", "speed"]}],
  55. "optional": ["switch", "oscillate", "direction"],
  56. },
  57. "humidifier": {"required": ["switch", "humidity"], "optional": ["mode"]},
  58. "light": {
  59. "required": [{"or": ["switch", "brightness", "effect"]}],
  60. "optional": ["color_mode", "color_temp", "rgbhsv"],
  61. },
  62. "lock": {
  63. "required": [
  64. {"or": ["lock", {"and": ["request_unlock", "approve_unlock"]}]},
  65. ],
  66. "optional": [
  67. "unlock_fingerprint",
  68. "unlock_password",
  69. "unlock_temp_pwd",
  70. "unlock_dynamic_pwd",
  71. "unlock_card",
  72. "unlock_app",
  73. "unlock_key",
  74. {"and": ["request_intercom", "approve_intercom"]},
  75. "jammed",
  76. ],
  77. },
  78. "number": {
  79. "required": ["value"],
  80. "optional": ["unit", "minimum", "maximum"],
  81. },
  82. "select": {"required": ["option"], "optional": []},
  83. "sensor": {"required": ["sensor"], "optional": ["unit"]},
  84. "siren": {"required": [], "optional": ["tone", "volume", "duration"]},
  85. "switch": {"required": ["switch"], "optional": ["current_power_w"]},
  86. "vacuum": {
  87. "required": ["status"],
  88. "optional": [
  89. "command",
  90. "locate",
  91. "power",
  92. "activate",
  93. "battery",
  94. "direction_control",
  95. "error",
  96. "fan_speed",
  97. ],
  98. },
  99. "water_heater": {
  100. "required": [],
  101. "optional": [
  102. "current_temperature",
  103. "operation_mode",
  104. "temperature",
  105. "temperature_unit",
  106. "min_temperature",
  107. "max_temperature",
  108. ],
  109. },
  110. }
  111. class TestDeviceConfig(IsolatedAsyncioTestCase):
  112. """Test the device config parser"""
  113. def test_can_find_config_files(self):
  114. """Test that the config files can be found by the parser."""
  115. found = False
  116. for cfg in available_configs():
  117. found = True
  118. break
  119. self.assertTrue(found)
  120. def dp_match(self, condition, accounted, unaccounted, known, required=False):
  121. if type(condition) is str:
  122. known.add(condition)
  123. if condition in unaccounted:
  124. unaccounted.remove(condition)
  125. accounted.add(condition)
  126. if required:
  127. return condition in accounted
  128. else:
  129. return True
  130. elif "and" in condition:
  131. return self.and_match(
  132. condition["and"], accounted, unaccounted, known, required
  133. )
  134. elif "or" in condition:
  135. return self.or_match(condition["or"], accounted, unaccounted, known)
  136. elif "xor" in condition:
  137. return self.xor_match(
  138. condition["xor"], accounted, unaccounted, known, required
  139. )
  140. else:
  141. self.assertTrue(False, f"Unrecognized condition {condition}")
  142. def and_match(self, conditions, accounted, unaccounted, known, required):
  143. single_match = False
  144. all_match = True
  145. for cond in conditions:
  146. match = self.dp_match(cond, accounted, unaccounted, known, True)
  147. all_match = all_match and match
  148. single_match = single_match or match
  149. if required:
  150. return all_match
  151. else:
  152. return all_match == single_match
  153. def or_match(self, conditions, accounted, unaccounted, known):
  154. match = False
  155. # loop through all, to ensure they are transferred to accounted list
  156. for cond in conditions:
  157. match = match or self.dp_match(cond, accounted, unaccounted, known, True)
  158. return match
  159. def xor_match(self, conditions, accounted, unaccounted, known, required):
  160. prior_match = False
  161. for cond in conditions:
  162. match = self.dp_match(cond, accounted, unaccounted, known, True)
  163. if match and prior_match:
  164. return False
  165. prior_match = prior_match or match
  166. # If any matched, all should be considered matched
  167. # this bit only handles nesting "and" within "xor"
  168. if prior_match:
  169. for c in conditions:
  170. if type(c) is str:
  171. accounted.add(c)
  172. elif "and" in c:
  173. for c2 in c["and"]:
  174. if type(c2) is str:
  175. accounted.add(c2)
  176. return prior_match or not required
  177. def rule_broken_msg(self, rule):
  178. msg = ""
  179. if type(rule) is str:
  180. return f"{msg} {rule}"
  181. elif "and" in rule:
  182. msg = f"{msg} all of ["
  183. for sub in rule["and"]:
  184. msg = f"{msg} {self.rule_broken_msg(sub)}"
  185. return f"{msg} ]"
  186. elif "or" in rule:
  187. msg = f"{msg} at least one of ["
  188. for sub in rule["or"]:
  189. msg = f"{msg} {self.rule_broken_msg(sub)}"
  190. return f"{msg} ]"
  191. elif "xor" in rule:
  192. msg = f"{msg} only one of ["
  193. for sub in rule["xor"]:
  194. msg = f"{msg} {self.rule_broken_msg(sub)}"
  195. return f"{msg} ]"
  196. return "for reason unknown"
  197. def check_entity(self, entity, cfg):
  198. """
  199. Check that the entity has a dps list and each dps has an id,
  200. type and name.
  201. """
  202. self.assertIsNotNone(
  203. entity._config.get("entity"), f"entity type missing in {cfg}"
  204. )
  205. e = entity.config_id
  206. self.assertIsNotNone(
  207. entity._config.get("dps"), f"dps missing from {e} in {cfg}"
  208. )
  209. functions = set()
  210. extra = set()
  211. known = set()
  212. for dp in entity.dps():
  213. self.assertIsNotNone(
  214. dp._config.get("id"), f"dp id missing from {e} in {cfg}"
  215. )
  216. self.assertIsNotNone(
  217. dp._config.get("type"), f"dp type missing from {e} in {cfg}"
  218. )
  219. self.assertIsNotNone(
  220. dp._config.get("name"), f"dp name missing from {e} in {cfg}"
  221. )
  222. extra.add(dp.name)
  223. expected = KNOWN_DPS.get(entity.entity)
  224. for rule in expected["required"]:
  225. self.assertTrue(
  226. self.dp_match(rule, functions, extra, known, True),
  227. f"{cfg} missing required {self.rule_broken_msg(rule)} in {e}",
  228. )
  229. for rule in expected["optional"]:
  230. self.assertTrue(
  231. self.dp_match(rule, functions, extra, known, False),
  232. f"{cfg} expecting {self.rule_broken_msg(rule)} in {e}",
  233. )
  234. # Check for potential typos in extra attributes
  235. known_extra = known - functions
  236. for attr in extra:
  237. for dp in known_extra:
  238. self.assertLess(
  239. fuzz.ratio(attr, dp),
  240. 85,
  241. f"Probable typo {attr} is too similar to {dp} in {cfg} {e}",
  242. )
  243. def test_config_files_parse(self):
  244. """
  245. All configs should be parsable and meet certain criteria
  246. """
  247. for cfg in available_configs():
  248. parsed = TuyaDeviceConfig(cfg)
  249. # Check for error messages or unparsed config
  250. if isinstance(parsed, str) or isinstance(parsed._config, str):
  251. self.fail(f"unparsable yaml in {cfg}")
  252. self.assertIsNotNone(
  253. parsed._config.get("name"),
  254. f"name missing from {cfg}",
  255. )
  256. self.assertIsNotNone(
  257. parsed._config.get("primary_entity"),
  258. f"primary_entity missing from {cfg}",
  259. )
  260. self.check_entity(parsed.primary_entity, cfg)
  261. for entity in parsed.secondary_entities():
  262. self.check_entity(entity, cfg)
  263. # Most of the device_config functionality is exercised during testing of
  264. # the various supported devices. These tests concentrate only on the gaps.
  265. def test_match_quality(self):
  266. """Test the match_quality function."""
  267. cfg = get_config("deta_fan")
  268. q = cfg.match_quality({**KOGAN_HEATER_PAYLOAD, "updated_at": 0})
  269. self.assertEqual(q, 0)
  270. q = cfg.match_quality({**GPPH_HEATER_PAYLOAD})
  271. self.assertEqual(q, 0)
  272. def test_entity_find_unknown_dps_fails(self):
  273. """Test that finding a dps that doesn't exist fails."""
  274. cfg = get_config("kogan_switch")
  275. non_existing = cfg.primary_entity.find_dps("missing")
  276. self.assertIsNone(non_existing)
  277. async def test_dps_async_set_readonly_value_fails(self):
  278. """Test that setting a readonly dps fails."""
  279. mock_device = MagicMock()
  280. cfg = get_config("kogan_switch")
  281. voltage = cfg.primary_entity.find_dps("voltage_v")
  282. with self.assertRaises(TypeError):
  283. await voltage.async_set_value(mock_device, 230)
  284. def test_dps_values_returns_none_with_no_mapping(self):
  285. """
  286. Test that a dps with no mapping returns None as its possible values
  287. """
  288. mock_device = MagicMock()
  289. cfg = get_config("kogan_switch")
  290. voltage = cfg.primary_entity.find_dps("voltage_v")
  291. self.assertIsNone(voltage.values(mock_device))
  292. def test_config_returned(self):
  293. """Test that config file is returned by config"""
  294. cfg = get_config("kogan_switch")
  295. self.assertEqual(cfg.config, "smartplugv1.yaml")
  296. def test_float_matches_ints(self):
  297. """Test that the _typematch function matches int values to float dps"""
  298. self.assertTrue(_typematch(float, 1))
  299. def test_bytes_to_fmt_returns_string_for_unknown(self):
  300. """
  301. Test that the _bytes_to_fmt function parses unknown number of bytes
  302. as a string format.
  303. """
  304. self.assertEqual(_bytes_to_fmt(5), "5s")
  305. def test_deprecation(self):
  306. """Test that deprecation messages are picked from the config."""
  307. mock_device = MagicMock()
  308. mock_device.name = "Testing"
  309. mock_config = {"entity": "Test", "deprecated": "Passed"}
  310. cfg = TuyaEntityConfig(mock_device, mock_config)
  311. self.assertTrue(cfg.deprecated)
  312. self.assertEqual(
  313. cfg.deprecation_message,
  314. "The use of Test for Testing is deprecated and should be "
  315. "replaced by Passed.",
  316. )
  317. def test_format_with_none_defined(self):
  318. """Test that format returns None when there is none configured."""
  319. mock_entity = MagicMock()
  320. mock_config = {"id": "1", "name": "test", "type": "string"}
  321. cfg = TuyaDpsConfig(mock_entity, mock_config)
  322. self.assertIsNone(cfg.format)
  323. def test_decoding_base64(self):
  324. """Test that decoded_value works with base64 encoding."""
  325. mock_entity = MagicMock()
  326. mock_config = {"id": "1", "name": "test", "type": "base64"}
  327. mock_device = MagicMock()
  328. mock_device.get_property.return_value = "VGVzdA=="
  329. cfg = TuyaDpsConfig(mock_entity, mock_config)
  330. self.assertEqual(
  331. cfg.decoded_value(mock_device),
  332. bytes("Test", "utf-8"),
  333. )
  334. def test_decoding_unencoded(self):
  335. """Test that decoded_value returns the raw value when not encoded."""
  336. mock_entity = MagicMock()
  337. mock_config = {"id": "1", "name": "test", "type": "string"}
  338. mock_device = MagicMock()
  339. mock_device.get_property.return_value = "VGVzdA=="
  340. cfg = TuyaDpsConfig(mock_entity, mock_config)
  341. self.assertEqual(
  342. cfg.decoded_value(mock_device),
  343. "VGVzdA==",
  344. )
  345. def test_encoding_base64(self):
  346. """Test that encode_value works with base64."""
  347. mock_entity = MagicMock()
  348. mock_config = {"id": "1", "name": "test", "type": "base64"}
  349. cfg = TuyaDpsConfig(mock_entity, mock_config)
  350. self.assertEqual(cfg.encode_value(bytes("Test", "utf-8")), "VGVzdA==")
  351. def test_encoding_unencoded(self):
  352. """Test that encode_value works with base64."""
  353. mock_entity = MagicMock()
  354. mock_config = {"id": "1", "name": "test", "type": "string"}
  355. cfg = TuyaDpsConfig(mock_entity, mock_config)
  356. self.assertEqual(cfg.encode_value("Test"), "Test")
  357. def test_match_returns_false_on_errors_with_bitfield(self):
  358. """Test that TypeError and ValueError cause match to return False."""
  359. mock_entity = MagicMock()
  360. mock_config = {"id": "1", "name": "test", "type": "bitfield"}
  361. cfg = TuyaDpsConfig(mock_entity, mock_config)
  362. self.assertFalse(cfg._match(15, "not an integer"))
  363. def test_values_with_mirror(self):
  364. """Test that value_mirror redirects."""
  365. mock_entity = MagicMock()
  366. mock_config = {
  367. "id": "1",
  368. "type": "string",
  369. "name": "test",
  370. "mapping": [
  371. {"dps_val": "mirror", "value_mirror": "map_mirror"},
  372. {"dps_val": "plain", "value": "unmirrored"},
  373. ],
  374. }
  375. mock_map_config = {
  376. "id": "2",
  377. "type": "string",
  378. "name": "map_mirror",
  379. "mapping": [
  380. {"dps_val": "1", "value": "map_one"},
  381. {"dps_val": "2", "value": "map_two"},
  382. ],
  383. }
  384. mock_device = MagicMock()
  385. mock_device.get_property.return_value = "1"
  386. cfg = TuyaDpsConfig(mock_entity, mock_config)
  387. map = TuyaDpsConfig(mock_entity, mock_map_config)
  388. mock_entity.find_dps.return_value = map
  389. self.assertCountEqual(
  390. cfg.values(mock_device),
  391. ["unmirrored", "map_one", "map_two"],
  392. )
  393. # values gets very complex, with things like mappings within conditions
  394. # within mappings. I'd expect something like this was added with purpose,
  395. # but it isn't exercised by any of the existing unit tests.
  396. # value-mirror above is explained by the fact that the device it was
  397. # added for never worked properly, so was removed.
  398. def test_default_without_mapping(self):
  399. """Test that default returns None when there is no mapping"""
  400. mock_entity = MagicMock()
  401. mock_config = {"id": "1", "name": "test", "type": "string"}
  402. cfg = TuyaDpsConfig(mock_entity, mock_config)
  403. self.assertIsNone(cfg.default())