device.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. """
  2. API for Goldair Tuya devices.
  3. """
  4. import json
  5. import logging
  6. from threading import Lock, Timer
  7. from time import time
  8. from homeassistant.const import TEMP_CELSIUS
  9. from .const import DOMAIN, API_PROTOCOL_VERSIONS, CONF_TYPE_DEHUMIDIFIER, CONF_TYPE_FAN, CONF_TYPE_GPCV_HEATER, CONF_TYPE_HEATER
  10. _LOGGER = logging.getLogger(__name__)
  11. class GoldairTuyaDevice(object):
  12. def __init__(self, name, dev_id, address, local_key, hass):
  13. """
  14. Represents a Goldair Tuya-based device.
  15. Args:
  16. dev_id (str): The device id.
  17. address (str): The network address.
  18. local_key (str): The encryption key.
  19. """
  20. import pytuya
  21. self._name = name
  22. self._api_protocol_version_index = None
  23. self._api = pytuya.Device(dev_id, address, local_key, "device")
  24. self._rotate_api_protocol_version()
  25. self._fixed_properties = {}
  26. self._reset_cached_state()
  27. self._TEMPERATURE_UNIT = TEMP_CELSIUS
  28. self._hass = hass
  29. # API calls to update Goldair heaters are asynchronous and non-blocking. This means
  30. # you can send a change and immediately request an updated state (like HA does),
  31. # but because it has not yet finished processing you will be returned the old state.
  32. # The solution is to keep a temporary list of changed properties that we can overlay
  33. # onto the state while we wait for the board to update its switches.
  34. self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT = 10
  35. self._CACHE_TIMEOUT = 20
  36. self._CONNECTION_ATTEMPTS = 4
  37. self._lock = Lock()
  38. @property
  39. def name(self):
  40. return self._name
  41. @property
  42. def unique_id(self):
  43. """Return the unique id for this device (the dev_id)."""
  44. return self._api.id
  45. @property
  46. def device_info(self):
  47. """Return the device information for this device."""
  48. return {
  49. "identifiers": {(DOMAIN, self.unique_id)},
  50. "name": self.name,
  51. "manufacturer": "Goldair"
  52. }
  53. @property
  54. def temperature_unit(self):
  55. return self._TEMPERATURE_UNIT
  56. async def async_inferred_type(self):
  57. cached_state = self._get_cached_state()
  58. if not "1" in cached_state:
  59. await self.async_refresh()
  60. return await self.async_inferred_type()
  61. _LOGGER.debug(f"Inferring device type from cached state: {cached_state}")
  62. if "5" in cached_state:
  63. if "3" in cached_state:
  64. return CONF_TYPE_GPCV_HEATER
  65. else:
  66. return CONF_TYPE_DEHUMIDIFIER
  67. if "8" in cached_state:
  68. return CONF_TYPE_FAN
  69. if "106" in cached_state:
  70. return CONF_TYPE_HEATER
  71. return None
  72. def set_fixed_properties(self, fixed_properties):
  73. self._fixed_properties = fixed_properties
  74. set_fixed_properties = Timer(
  75. 10, lambda: self._set_properties(self._fixed_properties)
  76. )
  77. set_fixed_properties.start()
  78. def refresh(self):
  79. now = time()
  80. cached_state = self._get_cached_state()
  81. if now - cached_state["updated_at"] >= self._CACHE_TIMEOUT:
  82. self._cached_state["updated_at"] = time()
  83. self._retry_on_failed_connection(
  84. lambda: self._refresh_cached_state(),
  85. f"Failed to refresh device state for {self.name}.",
  86. )
  87. async def async_refresh(self):
  88. await self._hass.async_add_executor_job(self.refresh)
  89. def get_property(self, dps_id):
  90. cached_state = self._get_cached_state()
  91. if dps_id in cached_state:
  92. return cached_state[dps_id]
  93. else:
  94. return None
  95. def set_property(self, dps_id, value):
  96. self._set_properties({dps_id: value})
  97. async def async_set_property(self, dps_id, value):
  98. await self._hass.async_add_executor_job(self.set_property, dps_id, value)
  99. def anticipate_property_value(self, dps_id, value):
  100. """
  101. Update a value in the cached state only. This is good for when you know the device will reflect a new state in
  102. the next update, but don't want to wait for that update for the device to represent this state.
  103. The anticipated value will be cleared with the next update.
  104. """
  105. self._cached_state[dps_id] = value
  106. def _reset_cached_state(self):
  107. self._cached_state = {"updated_at": 0}
  108. self._pending_updates = {}
  109. def _refresh_cached_state(self):
  110. new_state = self._api.status()
  111. self._cached_state = new_state["dps"]
  112. self._cached_state["updated_at"] = time()
  113. _LOGGER.info(f"refreshed device state: {json.dumps(new_state)}")
  114. _LOGGER.debug(
  115. f"new cache state (including pending properties): {json.dumps(self._get_cached_state())}"
  116. )
  117. def _set_properties(self, properties):
  118. if len(properties) == 0:
  119. return
  120. self._add_properties_to_pending_updates(properties)
  121. self._debounce_sending_updates()
  122. def _add_properties_to_pending_updates(self, properties):
  123. now = time()
  124. properties = {**properties, **self._fixed_properties}
  125. pending_updates = self._get_pending_updates()
  126. for key, value in properties.items():
  127. pending_updates[key] = {"value": value, "updated_at": now}
  128. _LOGGER.debug(f"new pending updates: {json.dumps(self._pending_updates)}")
  129. def _debounce_sending_updates(self):
  130. try:
  131. self._debounce.cancel()
  132. except AttributeError:
  133. pass
  134. self._debounce = Timer(1, self._send_pending_updates)
  135. self._debounce.start()
  136. def _send_pending_updates(self):
  137. pending_properties = self._get_pending_properties()
  138. payload = self._api.generate_payload("set", pending_properties)
  139. _LOGGER.info(f"sending dps update: {json.dumps(pending_properties)}")
  140. self._retry_on_failed_connection(
  141. lambda: self._send_payload(payload), "Failed to update device state."
  142. )
  143. def _send_payload(self, payload):
  144. try:
  145. self._lock.acquire()
  146. self._api._send_receive(payload)
  147. self._cached_state["updated_at"] = 0
  148. now = time()
  149. pending_updates = self._get_pending_updates()
  150. for key, value in pending_updates.items():
  151. pending_updates[key]["updated_at"] = now
  152. finally:
  153. self._lock.release()
  154. def _retry_on_failed_connection(self, func, error_message):
  155. for i in range(self._CONNECTION_ATTEMPTS):
  156. try:
  157. func()
  158. except:
  159. if i + 1 == self._CONNECTION_ATTEMPTS:
  160. self._reset_cached_state()
  161. _LOGGER.error(error_message)
  162. else:
  163. self._rotate_api_protocol_version()
  164. def _get_cached_state(self):
  165. cached_state = self._cached_state.copy()
  166. _LOGGER.debug(f"pending updates: {json.dumps(self._get_pending_updates())}")
  167. return {**cached_state, **self._get_pending_properties()}
  168. def _get_pending_properties(self):
  169. return {key: info["value"] for key, info in self._get_pending_updates().items()}
  170. def _get_pending_updates(self):
  171. now = time()
  172. self._pending_updates = {
  173. key: value
  174. for key, value in self._pending_updates.items()
  175. if now - value["updated_at"] < self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT
  176. }
  177. return self._pending_updates
  178. def _rotate_api_protocol_version(self):
  179. if self._api_protocol_version_index is None:
  180. self._api_protocol_version_index = 0
  181. else:
  182. self._api_protocol_version_index += 1
  183. if self._api_protocol_version_index >= len(API_PROTOCOL_VERSIONS):
  184. self._api_protocol_version_index = 0
  185. new_version = API_PROTOCOL_VERSIONS[self._api_protocol_version_index]
  186. _LOGGER.info(f"Setting protocol version for {self.name} to {new_version}.")
  187. self._api.set_version(new_version)
  188. @staticmethod
  189. def get_key_for_value(obj, value, fallback=None):
  190. keys = list(obj.keys())
  191. values = list(obj.values())
  192. return keys[values.index(value)] or fallback