device_config.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. """
  2. Config parser for Tuya Local devices.
  3. """
  4. from fnmatch import fnmatch
  5. import logging
  6. from os import walk
  7. from os.path import join, dirname
  8. from pydoc import locate
  9. from homeassistant.util.yaml import load_yaml
  10. import custom_components.tuya_local.devices as config_dir
  11. _LOGGER = logging.getLogger(__name__)
  12. def _typematch(type, value):
  13. # Workaround annoying legacy of bool being a subclass of int in Python
  14. if type is int and isinstance(value, bool):
  15. return False
  16. if isinstance(value, type):
  17. return True
  18. # Allow values embedded in strings if they can be converted
  19. # But not for bool, as everything can be converted to bool
  20. elif isinstance(value, str) and type is not bool:
  21. try:
  22. type(value)
  23. return True
  24. except ValueError:
  25. return False
  26. return False
  27. class TuyaDeviceConfig:
  28. """Representation of a device config for Tuya Local devices."""
  29. def __init__(self, fname):
  30. """Initialize the device config.
  31. Args:
  32. fname (string): The filename of the yaml config to load."""
  33. _CONFIG_DIR = dirname(config_dir.__file__)
  34. self._fname = fname
  35. filename = join(_CONFIG_DIR, fname)
  36. self._config = load_yaml(filename)
  37. _LOGGER.debug("Loaded device config %s", fname)
  38. @property
  39. def name(self):
  40. """Return the friendly name for this device."""
  41. return self._config["name"]
  42. @property
  43. def config(self):
  44. """Return the config file associated with this device."""
  45. return self._fname
  46. @property
  47. def legacy_type(self):
  48. """Return the legacy conf_type associated with this device."""
  49. return self._config.get("legacy_type", None)
  50. @property
  51. def primary_entity(self):
  52. """Return the primary type of entity for this device."""
  53. return TuyaEntityConfig(self, self._config["primary_entity"])
  54. def secondary_entities(self):
  55. """Iterate through entites for any secondary entites supported."""
  56. if "secondary_entities" in self._config.keys():
  57. for conf in self._config["secondary_entities"]:
  58. yield TuyaEntityConfig(self, conf)
  59. def matches(self, dps):
  60. """Determine if this device matches the provided dps map."""
  61. for d in self.primary_entity.dps():
  62. if d.id not in dps.keys() or not _typematch(d.type, dps[d.id]):
  63. return False
  64. for dev in self.secondary_entities():
  65. for d in dev.dps():
  66. if d.id not in dps.keys() or not _typematch(d.type, dps[d.id]):
  67. return False
  68. _LOGGER.debug("Matched config for %s", self.name)
  69. return True
  70. def _entity_match_analyse(self, entity, keys, matched, dps):
  71. """
  72. Determine whether this entity can be a match for the dps
  73. Args:
  74. entity - the TuyaEntityConfig to check against
  75. keys - the unmatched keys for the device
  76. matched - the matched keys for the device
  77. dps - the dps values to be matched
  78. Side Effects:
  79. Moves items from keys to matched if they match dps
  80. Return Value:
  81. True if all dps in entity could be matched to dps, False otherwise
  82. """
  83. for d in entity.dps():
  84. if (d.id not in keys and d.id not in matched) or not _typematch(
  85. d.type, dps[d.id]
  86. ):
  87. return False
  88. if d.id in keys:
  89. matched.append(d.id)
  90. keys.remove(d.id)
  91. return True
  92. def match_quality(self, dps):
  93. """Determine the match quality for the provided dps map."""
  94. keys = list(dps.keys())
  95. matched = []
  96. if "updated_at" in keys:
  97. keys.remove("updated_at")
  98. total = len(keys)
  99. if not self._entity_match_analyse(self.primary_entity, keys, matched, dps):
  100. return 0
  101. for e in self.secondary_entities():
  102. if not self._entity_match_analyse(e, keys, matched, dps):
  103. return 0
  104. return round((total - len(keys)) * 100 / total)
  105. class TuyaEntityConfig:
  106. """Representation of an entity config for a supported entity."""
  107. def __init__(self, device, config):
  108. self._device = device
  109. self._config = config
  110. @property
  111. def name(self):
  112. """The friendly name for this entity."""
  113. return self._config.get("name", self._device.name)
  114. @property
  115. def legacy_class(self):
  116. """Return the legacy device corresponding to this config."""
  117. name = self._config.get("legacy_class", None)
  118. if name is None:
  119. return None
  120. return locate("custom_components.tuya_local" + name)
  121. @property
  122. def entity(self):
  123. """The entity type of this entity."""
  124. return self._config["entity"]
  125. @property
  126. def device_class(self):
  127. """The device class of this entity."""
  128. return self._config.get("class", None)
  129. def dps(self):
  130. """Iterate through the list of dps for this entity."""
  131. for d in self._config["dps"]:
  132. yield TuyaDpsConfig(self, d)
  133. def find_dps(self, name):
  134. """Find a dps with the specified name."""
  135. for d in self.dps():
  136. if d.name == name:
  137. return d
  138. return None
  139. class TuyaDpsConfig:
  140. """Representation of a dps config."""
  141. def __init__(self, entity, config):
  142. self._entity = entity
  143. self._config = config
  144. @property
  145. def id(self):
  146. return str(self._config["id"])
  147. @property
  148. def type(self):
  149. t = self._config["type"]
  150. types = {
  151. "boolean": bool,
  152. "integer": int,
  153. "string": str,
  154. "float": float,
  155. "bitfield": int,
  156. }
  157. return types.get(t, None)
  158. @property
  159. def name(self):
  160. return self._config["name"]
  161. def get_value(self, device):
  162. """Return the value of the dps from the given device."""
  163. return self._map_from_dps(device.get_property(self.id), device)
  164. async def async_set_value(self, device, value):
  165. """Set the value of the dps in the given device to given value."""
  166. if self.readonly:
  167. raise TypeError(f"{self.name} is read only")
  168. await device.async_set_property(self.id, self._map_to_dps(value, device))
  169. @property
  170. def values(self):
  171. """Return the possible values a dps can take."""
  172. if "mapping" not in self._config.keys():
  173. return None
  174. val = []
  175. for m in self._config["mapping"]:
  176. if "value" in m:
  177. val.append(m["value"])
  178. if "conditions" in m:
  179. for c in m["conditions"]:
  180. if "value" in c:
  181. val.append(c["value"])
  182. return list(set(val)) if len(val) > 0 else None
  183. @property
  184. def range(self):
  185. """Return the range for this dps if configured."""
  186. if (
  187. "range" in self._config.keys()
  188. and "min" in self._config["range"].keys()
  189. and "max" in self._config["range"].keys()
  190. ):
  191. return self._config["range"]
  192. else:
  193. return None
  194. @property
  195. def readonly(self):
  196. return "readonly" in self._config.keys() and self._config["readonly"] is True
  197. @property
  198. def hidden(self):
  199. return "hidden" in self._config.keys() and self._config["hidden"] is True
  200. def _find_map_for_dps(self, value):
  201. if "mapping" not in self._config.keys():
  202. return None
  203. default = None
  204. for m in self._config["mapping"]:
  205. if "dps_val" not in m:
  206. default = m
  207. elif str(m["dps_val"]) == str(value):
  208. return m
  209. return default
  210. def _map_from_dps(self, value, device):
  211. result = value
  212. mapping = self._find_map_for_dps(value)
  213. if mapping is not None:
  214. scale = mapping.get("scale", 1)
  215. if not isinstance(scale, (int, float)):
  216. scale = 1
  217. replaced = "value" in mapping
  218. result = mapping.get("value", result)
  219. if "conditions" in mapping:
  220. cond_dps = (
  221. self
  222. if "constraint" not in mapping
  223. else self._entity.find_dps(mapping["constraint"])
  224. )
  225. for c in mapping["conditions"]:
  226. if (
  227. "dps_val" in c
  228. and c["dps_val"] == device.get_property(cond_dps.id)
  229. and "value" in c
  230. ):
  231. result = c["value"]
  232. replaced = True
  233. if scale != 1 and isinstance(result, (int, float)):
  234. result = result / scale
  235. replaced = True
  236. if replaced:
  237. _LOGGER.debug(
  238. "%s: Mapped dps %s value from %s to %s",
  239. self._entity._device.name,
  240. self.id,
  241. value,
  242. result,
  243. )
  244. return result
  245. def _find_map_for_value(self, value):
  246. if "mapping" not in self._config.keys():
  247. return None
  248. default = None
  249. for m in self._config["mapping"]:
  250. if "dps_val" not in m:
  251. default = m
  252. if "value" in m and str(m["value"]) == str(value):
  253. return m
  254. if "conditions" in m:
  255. for c in m["conditions"]:
  256. if "value" in c and c["value"] == value:
  257. return m
  258. return default
  259. def _map_to_dps(self, value, device):
  260. result = value
  261. mapping = self._find_map_for_value(value)
  262. if mapping is not None:
  263. replaced = False
  264. scale = mapping.get("scale", 1)
  265. if not isinstance(scale, (int, float)):
  266. scale = 1
  267. step = mapping.get("step", None)
  268. if not isinstance(step, (int, float)):
  269. step = None
  270. if "dps_val" in mapping:
  271. result = mapping["dps_val"]
  272. replaced = True
  273. # Conditions may have side effect of setting another value.
  274. if "conditions" in mapping and "constraint" in mapping:
  275. c_dps = self._entity.find_dps(mapping["constraint"])
  276. for c in mapping["conditions"]:
  277. if "value" in c and c["value"] == value:
  278. device.set_property(c_dps.id, c["dps_val"])
  279. if scale != 1 and isinstance(result, (int, float)):
  280. result = result / scale
  281. replaced = True
  282. if step is not None and isinstance(result, (int, float)):
  283. result = step * round(float(result) / step)
  284. replaced = True
  285. if replaced:
  286. _LOGGER.debug(
  287. "%s: Mapped dps %s to %s from %s",
  288. self._entity._device.name,
  289. self.id,
  290. result,
  291. value,
  292. )
  293. if self.range is not None:
  294. minimum = self.range["min"]
  295. maximum = self.range["max"]
  296. if result < minimum or result > maximum:
  297. raise ValueError(
  298. f"Target {self.name} ({value}) must be between "
  299. f"{minimum} and {maximum}"
  300. )
  301. return result
  302. def available_configs():
  303. """List the available config files."""
  304. _CONFIG_DIR = dirname(config_dir.__file__)
  305. for (path, dirs, files) in walk(_CONFIG_DIR):
  306. for basename in sorted(files):
  307. if fnmatch(basename, "*.yaml"):
  308. yield basename
  309. def possible_matches(dps):
  310. """Return possible matching configs for a given set of dps values."""
  311. for cfg in available_configs():
  312. parsed = TuyaDeviceConfig(cfg)
  313. if parsed.matches(dps):
  314. yield parsed
  315. def config_for_legacy_use(conf_type):
  316. """
  317. Return a config to use with config_type for legacy transition.
  318. Note: as there are two variants for Kogan Socket, this is not guaranteed
  319. to be the correct config for the device, so only use it for looking up
  320. the legacy class during the transition period.
  321. """
  322. for cfg in available_configs():
  323. parsed = TuyaDeviceConfig(cfg)
  324. if parsed.legacy_type == conf_type:
  325. return parsed
  326. return None