device_config.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409
  1. """
  2. Config parser for Tuya Local devices.
  3. """
  4. from fnmatch import fnmatch
  5. import logging
  6. from os import walk
  7. from os.path import join, dirname
  8. from pydoc import locate
  9. from homeassistant.util.yaml import load_yaml
  10. import custom_components.tuya_local.devices as config_dir
  11. _LOGGER = logging.getLogger(__name__)
  12. def _typematch(type, value):
  13. # Workaround annoying legacy of bool being a subclass of int in Python
  14. if type is int and isinstance(value, bool):
  15. return False
  16. if isinstance(value, type):
  17. return True
  18. # Allow values embedded in strings if they can be converted
  19. # But not for bool, as everything can be converted to bool
  20. elif isinstance(value, str) and type is not bool:
  21. try:
  22. type(value)
  23. return True
  24. except ValueError:
  25. return False
  26. return False
  27. class TuyaDeviceConfig:
  28. """Representation of a device config for Tuya Local devices."""
  29. def __init__(self, fname):
  30. """Initialize the device config.
  31. Args:
  32. fname (string): The filename of the yaml config to load."""
  33. _CONFIG_DIR = dirname(config_dir.__file__)
  34. self._fname = fname
  35. filename = join(_CONFIG_DIR, fname)
  36. self._config = load_yaml(filename)
  37. _LOGGER.debug("Loaded device config %s", fname)
  38. @property
  39. def name(self):
  40. """Return the friendly name for this device."""
  41. return self._config["name"]
  42. @property
  43. def config(self):
  44. """Return the config file associated with this device."""
  45. return self._fname
  46. @property
  47. def legacy_type(self):
  48. """Return the legacy conf_type associated with this device."""
  49. return self._config.get("legacy_type", None)
  50. @property
  51. def primary_entity(self):
  52. """Return the primary type of entity for this device."""
  53. return TuyaEntityConfig(self, self._config["primary_entity"])
  54. def secondary_entities(self):
  55. """Iterate through entites for any secondary entites supported."""
  56. if "secondary_entities" in self._config.keys():
  57. for conf in self._config["secondary_entities"]:
  58. yield TuyaEntityConfig(self, conf)
  59. def matches(self, dps):
  60. """Determine if this device matches the provided dps map."""
  61. for d in self.primary_entity.dps():
  62. if d.id not in dps.keys() or not _typematch(d.type, dps[d.id]):
  63. return False
  64. for dev in self.secondary_entities():
  65. for d in dev.dps():
  66. if d.id not in dps.keys() or not _typematch(d.type, dps[d.id]):
  67. return False
  68. _LOGGER.debug("Matched config for %s", self.name)
  69. return True
  70. def _entity_match_analyse(self, entity, keys, matched, dps):
  71. """
  72. Determine whether this entity can be a match for the dps
  73. Args:
  74. entity - the TuyaEntityConfig to check against
  75. keys - the unmatched keys for the device
  76. matched - the matched keys for the device
  77. dps - the dps values to be matched
  78. Side Effects:
  79. Moves items from keys to matched if they match dps
  80. Return Value:
  81. True if all dps in entity could be matched to dps, False otherwise
  82. """
  83. for d in entity.dps():
  84. if (d.id not in keys and d.id not in matched) or not _typematch(
  85. d.type, dps[d.id]
  86. ):
  87. return False
  88. if d.id in keys:
  89. matched.append(d.id)
  90. keys.remove(d.id)
  91. return True
  92. def match_quality(self, dps):
  93. """Determine the match quality for the provided dps map."""
  94. keys = list(dps.keys())
  95. matched = []
  96. if "updated_at" in keys:
  97. keys.remove("updated_at")
  98. total = len(keys)
  99. if not self._entity_match_analyse(self.primary_entity, keys, matched, dps):
  100. return 0
  101. for e in self.secondary_entities():
  102. if not self._entity_match_analyse(e, keys, matched, dps):
  103. return 0
  104. return round((total - len(keys)) * 100 / total)
  105. class TuyaEntityConfig:
  106. """Representation of an entity config for a supported entity."""
  107. def __init__(self, device, config):
  108. self._device = device
  109. self._config = config
  110. @property
  111. def name(self):
  112. """The friendly name for this entity."""
  113. return self._config.get("name", self._device.name)
  114. @property
  115. def legacy_class(self):
  116. """Return the legacy device corresponding to this config."""
  117. name = self._config.get("legacy_class", None)
  118. if name is None:
  119. return None
  120. return locate("custom_components.tuya_local" + name)
  121. @property
  122. def entity(self):
  123. """The entity type of this entity."""
  124. return self._config["entity"]
  125. @property
  126. def device_class(self):
  127. """The device class of this entity."""
  128. return self._config.get("class", None)
  129. def dps(self):
  130. """Iterate through the list of dps for this entity."""
  131. for d in self._config["dps"]:
  132. yield TuyaDpsConfig(self, d)
  133. def find_dps(self, name):
  134. """Find a dps with the specified name."""
  135. for d in self.dps():
  136. if d.name == name:
  137. return d
  138. return None
  139. class TuyaDpsConfig:
  140. """Representation of a dps config."""
  141. def __init__(self, entity, config):
  142. self._entity = entity
  143. self._config = config
  144. @property
  145. def id(self):
  146. return str(self._config["id"])
  147. @property
  148. def type(self):
  149. t = self._config["type"]
  150. types = {
  151. "boolean": bool,
  152. "integer": int,
  153. "string": str,
  154. "float": float,
  155. "bitfield": int,
  156. }
  157. return types.get(t, None)
  158. @property
  159. def name(self):
  160. return self._config["name"]
  161. def get_value(self, device):
  162. """Return the value of the dps from the given device."""
  163. return self._map_from_dps(device.get_property(self.id), device)
  164. async def async_set_value(self, device, value):
  165. """Set the value of the dps in the given device to given value."""
  166. if self.readonly:
  167. raise TypeError(f"{self.name} is read only")
  168. await device.async_set_property(self.id, self._map_to_dps(value, device))
  169. @property
  170. def values(self):
  171. """Return the possible values a dps can take."""
  172. if "mapping" not in self._config.keys():
  173. return None
  174. val = []
  175. for m in self._config["mapping"]:
  176. if "value" in m:
  177. val.append(m["value"])
  178. if "conditions" in m:
  179. for c in m["conditions"]:
  180. if "value" in c:
  181. val.append(c["value"])
  182. return list(set(val)) if len(val) > 0 else None
  183. @property
  184. def range(self):
  185. """Return the range for this dps if configured."""
  186. if (
  187. "range" in self._config.keys()
  188. and "min" in self._config["range"].keys()
  189. and "max" in self._config["range"].keys()
  190. ):
  191. return self._config["range"]
  192. else:
  193. return None
  194. def step(self, device):
  195. step = 1
  196. scale = 1
  197. mapping = self._find_map_for_dps(device.get_property(self.id))
  198. if mapping is not None:
  199. step = mapping.get("step", 1)
  200. scale = mapping.get("scale", 1)
  201. return step / scale
  202. @property
  203. def readonly(self):
  204. return "readonly" in self._config.keys() and self._config["readonly"] is True
  205. @property
  206. def hidden(self):
  207. return "hidden" in self._config.keys() and self._config["hidden"] is True
  208. def _find_map_for_dps(self, value):
  209. if "mapping" not in self._config.keys():
  210. return None
  211. default = None
  212. for m in self._config["mapping"]:
  213. if "dps_val" not in m:
  214. default = m
  215. elif str(m["dps_val"]) == str(value):
  216. return m
  217. return default
  218. def _map_from_dps(self, value, device):
  219. result = value
  220. mapping = self._find_map_for_dps(value)
  221. if mapping is not None:
  222. scale = mapping.get("scale", 1)
  223. if not isinstance(scale, (int, float)):
  224. scale = 1
  225. replaced = "value" in mapping
  226. result = mapping.get("value", result)
  227. if "conditions" in mapping:
  228. cond_dps = (
  229. self
  230. if "constraint" not in mapping
  231. else self._entity.find_dps(mapping["constraint"])
  232. )
  233. for c in mapping["conditions"]:
  234. if (
  235. "dps_val" in c
  236. and c["dps_val"] == device.get_property(cond_dps.id)
  237. and "value" in c
  238. ):
  239. result = c["value"]
  240. replaced = True
  241. if scale != 1 and isinstance(result, (int, float)):
  242. result = result / scale
  243. replaced = True
  244. if replaced:
  245. _LOGGER.debug(
  246. "%s: Mapped dps %s value from %s to %s",
  247. self._entity._device.name,
  248. self.id,
  249. value,
  250. result,
  251. )
  252. return result
  253. def _find_map_for_value(self, value):
  254. if "mapping" not in self._config.keys():
  255. return None
  256. default = None
  257. for m in self._config["mapping"]:
  258. if "dps_val" not in m:
  259. default = m
  260. if "value" in m and str(m["value"]) == str(value):
  261. return m
  262. if "conditions" in m:
  263. for c in m["conditions"]:
  264. if "value" in c and c["value"] == value:
  265. return m
  266. return default
  267. def _map_to_dps(self, value, device):
  268. result = value
  269. mapping = self._find_map_for_value(value)
  270. if mapping is not None:
  271. replaced = False
  272. scale = mapping.get("scale", 1)
  273. if not isinstance(scale, (int, float)):
  274. scale = 1
  275. step = mapping.get("step", None)
  276. if not isinstance(step, (int, float)):
  277. step = None
  278. if "dps_val" in mapping:
  279. result = mapping["dps_val"]
  280. replaced = True
  281. # Conditions may have side effect of setting another value.
  282. if "conditions" in mapping and "constraint" in mapping:
  283. c_dps = self._entity.find_dps(mapping["constraint"])
  284. for c in mapping["conditions"]:
  285. if "value" in c and c["value"] == value:
  286. device.set_property(c_dps.id, c["dps_val"])
  287. if scale != 1 and isinstance(result, (int, float)):
  288. _LOGGER.debug(f"Scaling {result} by {scale}")
  289. result = result * scale
  290. replaced = True
  291. if step is not None and isinstance(result, (int, float)):
  292. _LOGGER.debug(f"Stepping {result} to {step}")
  293. result = step * round(float(result) / step)
  294. replaced = True
  295. if replaced:
  296. _LOGGER.debug(
  297. "%s: Mapped dps %s to %s from %s",
  298. self._entity._device.name,
  299. self.id,
  300. result,
  301. value,
  302. )
  303. if self.range is not None:
  304. minimum = self.range["min"]
  305. maximum = self.range["max"]
  306. if result < minimum or result > maximum:
  307. raise ValueError(
  308. f"Target {self.name} ({value}) must be between "
  309. f"{minimum} and {maximum}"
  310. )
  311. if self.type is int:
  312. _LOGGER.debug(f"Rounding {self.name}")
  313. result = int(round(result))
  314. elif self.type is bool:
  315. result = True if result else False
  316. elif self.type is float:
  317. result = float(result)
  318. elif self.type is str:
  319. result = str(result)
  320. return result
  321. def available_configs():
  322. """List the available config files."""
  323. _CONFIG_DIR = dirname(config_dir.__file__)
  324. for (path, dirs, files) in walk(_CONFIG_DIR):
  325. for basename in sorted(files):
  326. if fnmatch(basename, "*.yaml"):
  327. yield basename
  328. def possible_matches(dps):
  329. """Return possible matching configs for a given set of dps values."""
  330. for cfg in available_configs():
  331. parsed = TuyaDeviceConfig(cfg)
  332. if parsed.matches(dps):
  333. yield parsed
  334. def config_for_legacy_use(conf_type):
  335. """
  336. Return a config to use with config_type for legacy transition.
  337. Note: as there are two variants for Kogan Socket, this is not guaranteed
  338. to be the correct config for the device, so only use it for looking up
  339. the legacy class during the transition period.
  340. """
  341. for cfg in available_configs():
  342. parsed = TuyaDeviceConfig(cfg)
  343. if parsed.legacy_type == conf_type:
  344. return parsed
  345. return None