device.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. """
  2. API for Tuya Local devices.
  3. """
  4. import json
  5. import logging
  6. from threading import Lock, Timer
  7. from time import time
  8. from tinytuya import CONTROL, Device as TuyaDevice
  9. from homeassistant.const import TEMP_CELSIUS
  10. from homeassistant.core import HomeAssistant
  11. from .const import (
  12. API_PROTOCOL_VERSIONS,
  13. CONF_TYPE_DEHUMIDIFIER,
  14. CONF_TYPE_EUROM_600_HEATER,
  15. CONF_TYPE_FAN,
  16. CONF_TYPE_GECO_HEATER,
  17. CONF_TYPE_GPCV_HEATER,
  18. CONF_TYPE_GPPH_HEATER,
  19. CONF_TYPE_GSH_HEATER,
  20. CONF_TYPE_GARDENPAC_HEATPUMP,
  21. CONF_TYPE_KOGAN_HEATER,
  22. CONF_TYPE_KOGAN_SWITCH,
  23. CONF_TYPE_PURLINE_M100_HEATER,
  24. DOMAIN,
  25. )
  26. _LOGGER = logging.getLogger(__name__)
  27. class TuyaLocalDevice(object):
  28. def __init__(self, name, dev_id, address, local_key, hass: HomeAssistant):
  29. """
  30. Represents a Tuya-based device.
  31. Args:
  32. dev_id (str): The device id.
  33. address (str): The network address.
  34. local_key (str): The encryption key.
  35. """
  36. self._name = name
  37. self._api_protocol_version_index = None
  38. self._api_protocol_working = False
  39. self._api = TuyaDevice(dev_id, address, local_key)
  40. self._refresh_task = None
  41. self._rotate_api_protocol_version()
  42. self._reset_cached_state()
  43. self._TEMPERATURE_UNIT = TEMP_CELSIUS
  44. self._hass = hass
  45. # API calls to update Tuya devices are asynchronous and non-blocking. This means
  46. # you can send a change and immediately request an updated state (like HA does),
  47. # but because it has not yet finished processing you will be returned the old state.
  48. # The solution is to keep a temporary list of changed properties that we can overlay
  49. # onto the state while we wait for the board to update its switches.
  50. self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT = 10
  51. self._CACHE_TIMEOUT = 20
  52. self._CONNECTION_ATTEMPTS = 4
  53. self._lock = Lock()
  54. @property
  55. def name(self):
  56. return self._name
  57. @property
  58. def unique_id(self):
  59. """Return the unique id for this device (the dev_id)."""
  60. return self._api.id
  61. @property
  62. def device_info(self):
  63. """Return the device information for this device."""
  64. return {
  65. "identifiers": {(DOMAIN, self.unique_id)},
  66. "name": self.name,
  67. "manufacturer": "Tuya",
  68. }
  69. @property
  70. def temperature_unit(self):
  71. return self._TEMPERATURE_UNIT
  72. async def async_inferred_type(self):
  73. cached_state = self._get_cached_state()
  74. if "1" not in cached_state and "3" not in cached_state:
  75. await self.async_refresh()
  76. cached_state = self._get_cached_state()
  77. _LOGGER.debug(f"Inferring device type from cached state: {cached_state}")
  78. if "1" not in cached_state and "3" in cached_state:
  79. _LOGGER.info(f"Detecting {self.name} as Kogan Heater")
  80. return CONF_TYPE_KOGAN_HEATER
  81. if "5" in cached_state and "3" not in cached_state and "103" in cached_state:
  82. _LOGGER.info(f"Detecting {self.name} as Goldair Dehumidifier")
  83. return CONF_TYPE_DEHUMIDIFIER
  84. if "8" in cached_state:
  85. _LOGGER.info(f"Detecting {self.name} as Goldair Fan")
  86. return CONF_TYPE_FAN
  87. if "10" in cached_state and "101" in cached_state:
  88. _LOGGER.info(f"Detecting {self.name} as Pur Line Hoti M100 heater")
  89. return CONF_TYPE_PURLINE_M100_HEATER
  90. if "5" in cached_state and "2" in cached_state and "4" not in cached_state:
  91. _LOGGER.info(f"Detecting {self.name} as Eurom Mon Soleil 600 Heater")
  92. return CONF_TYPE_EUROM_600_HEATER
  93. if "5" in cached_state and "3" not in cached_state:
  94. _LOGGER.info(f"Detecting {self.name} as Kogan Switch")
  95. return CONF_TYPE_KOGAN_SWITCH
  96. if "18" in cached_state:
  97. _LOGGER.info(f"Detecting {self.name} as newer type of Kogan Switch")
  98. return CONF_TYPE_KOGAN_SWITCH
  99. if "106" in cached_state and "2" not in cached_state:
  100. _LOGGER.info(f"Detecting {self.name} as GardenPAC Pool Heatpump")
  101. return CONF_TYPE_GARDENPAC_HEATPUMP
  102. if "106" in cached_state:
  103. _LOGGER.info(f"Detecting {self.name} as Goldair GPPH Heater")
  104. return CONF_TYPE_GPPH_HEATER
  105. if "7" in cached_state:
  106. _LOGGER.info(f"Detecting {self.name} as Goldair GPCV Heater")
  107. return CONF_TYPE_GPCV_HEATER
  108. if "12" in cached_state:
  109. _LOGGER.info(f"Detecting {self.name} as Andersson GSH Heter")
  110. return CONF_TYPE_GSH_HEATER
  111. if "3" in cached_state and "6" in cached_state:
  112. _LOGGER.info(f"Detecting {self.name} as Goldair GECO Heater")
  113. return CONF_TYPE_GECO_HEATER
  114. _LOGGER.warning(f"Detection for {self.name} failed")
  115. return None
  116. async def async_refresh(self):
  117. last_updated = self._get_cached_state()["updated_at"]
  118. if self._refresh_task is None or time() - last_updated >= self._CACHE_TIMEOUT:
  119. self._cached_state["updated_at"] = time()
  120. self._refresh_task = self._hass.async_add_executor_job(self.refresh)
  121. await self._refresh_task
  122. def refresh(self):
  123. _LOGGER.debug(f"Refreshing device state for {self.name}.")
  124. self._retry_on_failed_connection(
  125. lambda: self._refresh_cached_state(),
  126. f"Failed to refresh device state for {self.name}.",
  127. )
  128. def get_property(self, dps_id):
  129. cached_state = self._get_cached_state()
  130. if dps_id in cached_state:
  131. return cached_state[dps_id]
  132. else:
  133. return None
  134. def set_property(self, dps_id, value):
  135. self._set_properties({dps_id: value})
  136. async def async_set_property(self, dps_id, value):
  137. await self._hass.async_add_executor_job(self.set_property, dps_id, value)
  138. def anticipate_property_value(self, dps_id, value):
  139. """
  140. Update a value in the cached state only. This is good for when you know the device will reflect a new state in
  141. the next update, but don't want to wait for that update for the device to represent this state.
  142. The anticipated value will be cleared with the next update.
  143. """
  144. self._cached_state[dps_id] = value
  145. def _reset_cached_state(self):
  146. self._cached_state = {"updated_at": 0}
  147. self._pending_updates = {}
  148. def _refresh_cached_state(self):
  149. new_state = self._api.status()
  150. self._cached_state = new_state["dps"]
  151. self._cached_state["updated_at"] = time()
  152. _LOGGER.debug(f"refreshed device state: {json.dumps(new_state)}")
  153. _LOGGER.debug(
  154. f"new cache state (including pending properties): {json.dumps(self._get_cached_state())}"
  155. )
  156. def _set_properties(self, properties):
  157. if len(properties) == 0:
  158. return
  159. self._add_properties_to_pending_updates(properties)
  160. self._debounce_sending_updates()
  161. def _add_properties_to_pending_updates(self, properties):
  162. now = time()
  163. pending_updates = self._get_pending_updates()
  164. for key, value in properties.items():
  165. pending_updates[key] = {"value": value, "updated_at": now}
  166. _LOGGER.debug(f"new pending updates: {json.dumps(self._pending_updates)}")
  167. def _debounce_sending_updates(self):
  168. try:
  169. self._debounce.cancel()
  170. except AttributeError:
  171. pass
  172. self._debounce = Timer(1, self._send_pending_updates)
  173. self._debounce.start()
  174. def _send_pending_updates(self):
  175. pending_properties = self._get_pending_properties()
  176. payload = self._api.generate_payload(CONTROL, pending_properties)
  177. _LOGGER.debug(f"sending dps update: {json.dumps(pending_properties)}")
  178. self._retry_on_failed_connection(
  179. lambda: self._send_payload(payload), "Failed to update device state."
  180. )
  181. def _send_payload(self, payload):
  182. try:
  183. self._lock.acquire()
  184. self._api._send_receive(payload)
  185. self._cached_state["updated_at"] = 0
  186. now = time()
  187. pending_updates = self._get_pending_updates()
  188. for key, value in pending_updates.items():
  189. pending_updates[key]["updated_at"] = now
  190. finally:
  191. self._lock.release()
  192. def _retry_on_failed_connection(self, func, error_message):
  193. for i in range(self._CONNECTION_ATTEMPTS):
  194. try:
  195. func()
  196. self._api_protocol_working = True
  197. break
  198. except Exception as e:
  199. _LOGGER.info(f"Retrying after exception {e}")
  200. if i + 1 == self._CONNECTION_ATTEMPTS:
  201. self._reset_cached_state()
  202. self._api_protocol_working = False
  203. _LOGGER.error(error_message)
  204. elif self._api_protocol_working is False:
  205. self._rotate_api_protocol_version()
  206. def _get_cached_state(self):
  207. cached_state = self._cached_state.copy()
  208. return {**cached_state, **self._get_pending_properties()}
  209. def _get_pending_properties(self):
  210. return {key: info["value"] for key, info in self._get_pending_updates().items()}
  211. def _get_pending_updates(self):
  212. now = time()
  213. self._pending_updates = {
  214. key: value
  215. for key, value in self._pending_updates.items()
  216. if now - value["updated_at"] < self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT
  217. }
  218. return self._pending_updates
  219. def _rotate_api_protocol_version(self):
  220. if self._api_protocol_version_index is None:
  221. self._api_protocol_version_index = 0
  222. else:
  223. self._api_protocol_version_index += 1
  224. if self._api_protocol_version_index >= len(API_PROTOCOL_VERSIONS):
  225. self._api_protocol_version_index = 0
  226. new_version = API_PROTOCOL_VERSIONS[self._api_protocol_version_index]
  227. _LOGGER.info(f"Setting protocol version for {self.name} to {new_version}.")
  228. self._api.set_version(new_version)
  229. @staticmethod
  230. def get_key_for_value(obj, value, fallback=None):
  231. keys = list(obj.keys())
  232. values = list(obj.values())
  233. return keys[values.index(value)] if value in values else fallback