device.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. """
  2. API for Tuya Local devices.
  3. """
  4. import json
  5. import logging
  6. import tinytuya
  7. from threading import Lock, Timer
  8. from time import time
  9. from homeassistant.const import CONF_HOST, CONF_NAME, UnitOfTemperature
  10. from homeassistant.core import HomeAssistant
  11. from .const import (
  12. API_PROTOCOL_VERSIONS,
  13. CONF_DEVICE_ID,
  14. CONF_LOCAL_KEY,
  15. CONF_PROTOCOL_VERSION,
  16. DOMAIN,
  17. )
  18. from .helpers.device_config import possible_matches
  19. _LOGGER = logging.getLogger(__name__)
  20. class TuyaLocalDevice(object):
  21. def __init__(
  22. self,
  23. name,
  24. dev_id,
  25. address,
  26. local_key,
  27. protocol_version,
  28. hass: HomeAssistant,
  29. ):
  30. """
  31. Represents a Tuya-based device.
  32. Args:
  33. dev_id (str): The device id.
  34. address (str): The network address.
  35. local_key (str): The encryption key.
  36. protocol_version (str | number): The protocol version.
  37. """
  38. self._name = name
  39. self._api_protocol_version_index = None
  40. self._api_protocol_working = False
  41. self._api = tinytuya.Device(dev_id, address, local_key)
  42. self._refresh_task = None
  43. self._protocol_configured = protocol_version
  44. self._reset_cached_state()
  45. self._TEMPERATURE_UNIT = UnitOfTemperature.CELSIUS
  46. self._hass = hass
  47. # API calls to update Tuya devices are asynchronous and non-blocking.
  48. # This means you can send a change and immediately request an updated
  49. # state (like HA does), but because it has not yet finished processing
  50. # you will be returned the old state.
  51. # The solution is to keep a temporary list of changed properties that
  52. # we can overlay onto the state while we wait for the board to update
  53. # its switches.
  54. self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT = 10
  55. self._CACHE_TIMEOUT = 20
  56. # More attempts are needed in auto mode so we can cycle through all
  57. # the possibilities a couple of times
  58. self._AUTO_CONNECTION_ATTEMPTS = 9
  59. self._SINGLE_PROTO_CONNECTION_ATTEMPTS = 3
  60. self._lock = Lock()
  61. @property
  62. def name(self):
  63. return self._name
  64. @property
  65. def unique_id(self):
  66. """Return the unique id for this device (the dev_id)."""
  67. return self._api.id
  68. @property
  69. def device_info(self):
  70. """Return the device information for this device."""
  71. return {
  72. "identifiers": {(DOMAIN, self.unique_id)},
  73. "name": self.name,
  74. "manufacturer": "Tuya",
  75. }
  76. @property
  77. def has_returned_state(self):
  78. """Return True if the device has returned some state."""
  79. return len(self._get_cached_state()) > 1
  80. @property
  81. def temperature_unit(self):
  82. return self._TEMPERATURE_UNIT
  83. async def async_possible_types(self):
  84. cached_state = self._get_cached_state()
  85. if len(cached_state) <= 1:
  86. await self.async_refresh()
  87. cached_state = self._get_cached_state()
  88. for match in possible_matches(cached_state):
  89. yield match
  90. async def async_inferred_type(self):
  91. best_match = None
  92. best_quality = 0
  93. cached_state = {}
  94. async for config in self.async_possible_types():
  95. cached_state = self._get_cached_state()
  96. quality = config.match_quality(cached_state)
  97. _LOGGER.info(
  98. f"{self.name} considering {config.name} with quality {quality}"
  99. )
  100. if quality > best_quality:
  101. best_quality = quality
  102. best_match = config
  103. if best_match is None:
  104. _LOGGER.warning(f"Detection for {self.name} with dps {cached_state} failed")
  105. return None
  106. return best_match.config_type
  107. async def async_refresh(self):
  108. cache = self._get_cached_state()
  109. if "updated_at" in cache:
  110. last_updated = self._get_cached_state()["updated_at"]
  111. else:
  112. last_updated = 0
  113. if self._refresh_task is None or time() - last_updated >= self._CACHE_TIMEOUT:
  114. self._cached_state["updated_at"] = time()
  115. self._refresh_task = self._hass.async_add_executor_job(self.refresh)
  116. await self._refresh_task
  117. def refresh(self):
  118. _LOGGER.debug(f"Refreshing device state for {self.name}.")
  119. self._retry_on_failed_connection(
  120. lambda: self._refresh_cached_state(),
  121. f"Failed to refresh device state for {self.name}.",
  122. )
  123. def get_property(self, dps_id):
  124. cached_state = self._get_cached_state()
  125. if dps_id in cached_state:
  126. return cached_state[dps_id]
  127. else:
  128. return None
  129. def set_property(self, dps_id, value):
  130. self._set_properties({dps_id: value})
  131. async def async_set_property(self, dps_id, value):
  132. await self._hass.async_add_executor_job(self.set_property, dps_id, value)
  133. async def async_set_properties(self, dps_map):
  134. await self._hass.async_add_executor_job(self._set_properties, dps_map)
  135. def anticipate_property_value(self, dps_id, value):
  136. """
  137. Update a value in the cached state only. This is good for when you know the device will reflect a new state in
  138. the next update, but don't want to wait for that update for the device to represent this state.
  139. The anticipated value will be cleared with the next update.
  140. """
  141. self._cached_state[dps_id] = value
  142. def _reset_cached_state(self):
  143. self._cached_state = {"updated_at": 0}
  144. self._pending_updates = {}
  145. self._last_connection = 0
  146. def _refresh_cached_state(self):
  147. new_state = self._api.status()
  148. self._cached_state = self._cached_state | new_state["dps"]
  149. self._cached_state["updated_at"] = time()
  150. _LOGGER.debug(f"{self.name} refreshed device state: {json.dumps(new_state)}")
  151. _LOGGER.debug(
  152. f"new cache state (including pending properties): {json.dumps(self._get_cached_state())}"
  153. )
  154. def _set_properties(self, properties):
  155. if len(properties) == 0:
  156. return
  157. self._add_properties_to_pending_updates(properties)
  158. self._debounce_sending_updates()
  159. def _add_properties_to_pending_updates(self, properties):
  160. now = time()
  161. pending_updates = self._get_pending_updates()
  162. for key, value in properties.items():
  163. pending_updates[key] = {"value": value, "updated_at": now}
  164. _LOGGER.debug(
  165. f"{self.name} new pending updates: {json.dumps(self._pending_updates)}"
  166. )
  167. def _debounce_sending_updates(self):
  168. now = time()
  169. since = now - self._last_connection
  170. # set this now to avoid a race condition, it will be updated later
  171. # when the data is actally sent
  172. self._last_connection = now
  173. # Only delay a second if there was recently another command.
  174. # Otherwise delay 1ms, to keep things simple by reusing the
  175. # same send mechanism.
  176. waittime = 1 if since < 1.1 else 0.001
  177. try:
  178. self._debounce.cancel()
  179. except AttributeError:
  180. pass
  181. self._debounce = Timer(waittime, self._send_pending_updates)
  182. self._debounce.start()
  183. def _send_pending_updates(self):
  184. pending_properties = self._get_pending_properties()
  185. payload = self._api.generate_payload(tinytuya.CONTROL, pending_properties)
  186. _LOGGER.debug(
  187. f"{self.name} sending dps update: {json.dumps(pending_properties)}"
  188. )
  189. self._retry_on_failed_connection(
  190. lambda: self._send_payload(payload), "Failed to update device state."
  191. )
  192. def _send_payload(self, payload):
  193. try:
  194. self._lock.acquire()
  195. self._api._send_receive(payload)
  196. self._cached_state["updated_at"] = 0
  197. now = time()
  198. self._last_connection = now
  199. pending_updates = self._get_pending_updates()
  200. for key, value in pending_updates.items():
  201. pending_updates[key]["updated_at"] = now
  202. finally:
  203. self._lock.release()
  204. def _retry_on_failed_connection(self, func, error_message):
  205. if self._api_protocol_version_index is None:
  206. self._rotate_api_protocol_version()
  207. connections = (
  208. self._AUTO_CONNECTION_ATTEMPTS
  209. if (self._protocol_configured == "auto" and not self._api_protocol_working)
  210. else self._SINGLE_PROTO_CONNECTION_ATTEMPTS
  211. )
  212. for i in range(connections):
  213. try:
  214. func()
  215. self._api_protocol_working = True
  216. break
  217. except Exception as e:
  218. _LOGGER.debug(f"Retrying after exception {e}")
  219. if i + 1 == connections:
  220. self._reset_cached_state()
  221. self._api_protocol_working = False
  222. _LOGGER.error(error_message)
  223. if not self._api_protocol_working:
  224. self._rotate_api_protocol_version()
  225. def _get_cached_state(self):
  226. cached_state = self._cached_state.copy()
  227. return {**cached_state, **self._get_pending_properties()}
  228. def _get_pending_properties(self):
  229. return {key: info["value"] for key, info in self._get_pending_updates().items()}
  230. def _get_pending_updates(self):
  231. now = time()
  232. self._pending_updates = {
  233. key: value
  234. for key, value in self._pending_updates.items()
  235. if now - value["updated_at"] < self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT
  236. }
  237. return self._pending_updates
  238. def _rotate_api_protocol_version(self):
  239. if self._api_protocol_version_index is None:
  240. try:
  241. self._api_protocol_version_index = API_PROTOCOL_VERSIONS.index(
  242. self._protocol_configured
  243. )
  244. except ValueError:
  245. self._api_protocol_version_index = 0
  246. # only rotate if configured as auto
  247. elif self._protocol_configured == "auto":
  248. self._api_protocol_version_index += 1
  249. if self._api_protocol_version_index >= len(API_PROTOCOL_VERSIONS):
  250. self._api_protocol_version_index = 0
  251. new_version = API_PROTOCOL_VERSIONS[self._api_protocol_version_index]
  252. _LOGGER.info(f"Setting protocol version for {self.name} to {new_version}.")
  253. self._api.set_version(new_version)
  254. @staticmethod
  255. def get_key_for_value(obj, value, fallback=None):
  256. keys = list(obj.keys())
  257. values = list(obj.values())
  258. return keys[values.index(value)] if value in values else fallback
  259. def setup_device(hass: HomeAssistant, config: dict):
  260. """Setup a tuya device based on passed in config."""
  261. _LOGGER.info(f"Creating device: {config[CONF_DEVICE_ID]}")
  262. hass.data[DOMAIN] = hass.data.get(DOMAIN, {})
  263. device = TuyaLocalDevice(
  264. config[CONF_NAME],
  265. config[CONF_DEVICE_ID],
  266. config[CONF_HOST],
  267. config[CONF_LOCAL_KEY],
  268. config[CONF_PROTOCOL_VERSION],
  269. hass,
  270. )
  271. hass.data[DOMAIN][config[CONF_DEVICE_ID]] = {"device": device}
  272. return device
  273. def delete_device(hass: HomeAssistant, config: dict):
  274. _LOGGER.info(f"Deleting device: {config[CONF_DEVICE_ID]}")
  275. del hass.data[DOMAIN][config[CONF_DEVICE_ID]]["device"]