cloud.py 9.8 KB


  1. import logging
  2. from typing import Any
  3. from homeassistant.core import HomeAssistant
  4. from tuya_sharing import (
  5. CustomerDevice,
  6. LoginControl,
  7. Manager,
  8. SharingDeviceListener,
  9. SharingTokenListener,
  10. )
  11. from .const import (
  12. CONF_DEVICE_CID,
  13. CONF_ENDPOINT,
  14. CONF_LOCAL_KEY,
  15. CONF_TERMINAL_ID,
  16. DOMAIN,
  17. TUYA_CLIENT_ID,
  18. TUYA_RESPONSE_CODE,
  19. TUYA_RESPONSE_MSG,
  20. TUYA_RESPONSE_QR_CODE,
  21. TUYA_RESPONSE_RESULT,
  22. TUYA_RESPONSE_SUCCESS,
  23. TUYA_SCHEMA,
  24. )
  25. _LOGGER = logging.getLogger(__name__)
  26. HUB_CATEGORIES = [
  27. "wgsxj", # Gateway camera
  28. "lyqwg", # Router
  29. "bywg", # IoT edge gateway
  30. "zigbee", # Gateway
  31. "wg2", # Gateway
  32. "dgnzk", # Multi-function controller
  33. "videohub", # Videohub
  34. "xnwg", # Virtual gateway
  35. "qtyycp", # Voice gateway composite solution
  36. "alexa_yywg", # Gateway with Alexa
  37. "gywg", # Industrial gateway
  38. "cnwg", # Energy gateway
  39. "wnykq", # Smart IR
  40. ]
  41. class Cloud:
  42. """Optional Tuya cloud interface for getting device information."""
  43. def __init__(self, hass: HomeAssistant):
  44. self.__login_control = LoginControl()
  45. self.__authentication = {}
  46. self.__user_code = None
  47. self.__qr_code = None
  48. self.__hass = hass
  49. self.__error_code = None
  50. self.__error_msg = None
  51. # Restore cached authentication
  52. if cached := self.__hass.data[DOMAIN].get("auth_cache"):
  53. self.__authentication = cached
  54. async def async_get_qr_code(self, user_code: str | None = None) -> bool:
  55. """Get QR code from Tuya server for user code authentication."""
  56. if not user_code:
  57. user_code = self.__user_code
  58. if not user_code:
  59. _LOGGER.error("Cannot get QR code without a user code")
  60. return False, {TUYA_RESPONSE_MSG: "QR code requires a user code"}
  61. response = await self.__hass.async_add_executor_job(
  62. self.__login_control.qr_code,
  63. TUYA_CLIENT_ID,
  64. TUYA_SCHEMA,
  65. user_code,
  66. )
  67. if response.get(TUYA_RESPONSE_SUCCESS, False):
  68. self.__user_code = user_code
  69. self.__qr_code = response[TUYA_RESPONSE_RESULT][TUYA_RESPONSE_QR_CODE]
  70. return self.__qr_code
  71. _LOGGER.error("Failed to get QR code: %s", response)
  72. self.__error_code = response.get(TUYA_RESPONSE_CODE, {})
  73. self.__error_msg = response.get(TUYA_RESPONSE_MSG, "Unknown error")
  74. return False
  75. async def async_login(self) -> bool:
  76. """Login to the Tuya cloud."""
  77. if not self.__user_code or not self.__qr_code:
  78. _LOGGER.warning("Login attempted without successful QR scan")
  79. return False, {}
  80. success, info = await self.__hass.async_add_executor_job(
  81. self.__login_control.login_result,
  82. self.__qr_code,
  83. TUYA_CLIENT_ID,
  84. self.__user_code,
  85. )
  86. if success:
  87. self.__authentication = {
  88. "user_code": self.__user_code,
  89. "terminal_id": info[CONF_TERMINAL_ID],
  90. "endpoint": info[CONF_ENDPOINT],
  91. "token_info": {
  92. "t": info["t"],
  93. "uid": info["uid"],
  94. "expire_time": info["expire_time"],
  95. "access_token": info["access_token"],
  96. "refresh_token": info["refresh_token"],
  97. },
  98. }
  99. self.__hass.data[DOMAIN]["auth_cache"] = self.__authentication
  100. else:
  101. _LOGGER.warning("Login failed: %s", info)
  102. self.__error_code = info.get(TUYA_RESPONSE_CODE, {})
  103. self.__error_msg = info.get(TUYA_RESPONSE_MSG, "Unknown error")
  104. # Ensure expired authentication is cleared on next attempt
  105. self.__hass.data[DOMAIN]["auth_cache"] = None
  106. self.__authentication = {}
  107. return success
  108. async def async_get_devices(self) -> dict[str, Any]:
  109. """Get all devices associated with the account."""
  110. token_listener = TokenListener(self.__hass)
  111. manager = Manager(
  112. TUYA_CLIENT_ID,
  113. self.__authentication["user_code"],
  114. self.__authentication["terminal_id"],
  115. self.__authentication["endpoint"],
  116. self.__authentication["token_info"],
  117. token_listener,
  118. )
  119. listener = DeviceListener(self.__hass, manager)
  120. manager.add_device_listener(listener)
  121. # Get all devices from Tuya cloud
  122. await self.__hass.async_add_executor_job(manager.update_device_cache)
  123. # Register known device IDs
  124. cloud_devices = {}
  125. domain_data = self.__hass.data.get(DOMAIN)
  126. for device in manager.device_map.values():
  127. cloud_device = {
  128. "category": device.category,
  129. "id": device.id,
  130. "ip": device.ip,
  131. CONF_LOCAL_KEY: device.local_key
  132. if hasattr(device, CONF_LOCAL_KEY)
  133. else "",
  134. "name": device.name,
  135. "node_id": device.node_id if hasattr(device, "node_id") else "",
  136. "online": device.online,
  137. "product_id": device.product_id,
  138. "product_name": device.product_name,
  139. "uid": device.uid,
  140. "uuid": device.uuid,
  141. "support_local": device.support_local,
  142. CONF_DEVICE_CID: None,
  143. "version": None,
  144. "is_hub": (
  145. device.category in HUB_CATEGORIES
  146. or not hasattr(device, "local_key")
  147. ),
  148. }
  149. _LOGGER.debug("Found device: %s", cloud_device["product_name"])
  150. existing_id = domain_data.get(cloud_device["id"]) if domain_data else None
  151. existing_uuid = (
  152. domain_data.get(cloud_device["uuid"]) if domain_data else None
  153. )
  154. existing = existing_id or existing_uuid
  155. cloud_device["exists"] = existing and existing.get("device")
  156. if hasattr(device, "node_id"):
  157. index = "/".join(
  158. [
  159. cloud_device["id"],
  160. cloud_device["node_id"],
  161. ]
  162. )
  163. else:
  164. index = cloud_device["id"]
  165. cloud_devices[index] = cloud_device
  166. return cloud_devices
  167. async def async_get_datamodel(self, device_id) -> dict[str, Any] | None:
  168. """Get the data model for the specified device (QueryThingsDataModel)."""
  169. token_listener = TokenListener(self.__hass)
  170. manager = Manager(
  171. TUYA_CLIENT_ID,
  172. self.__authentication["user_code"],
  173. self.__authentication["terminal_id"],
  174. self.__authentication["endpoint"],
  175. self.__authentication["token_info"],
  176. token_listener,
  177. )
  178. response = await self.__hass.async_add_executor_job(
  179. manager.customer_api.get,
  180. f"/v1.0/m/life/devices/{device_id}/status",
  181. )
  182. _LOGGER.debug("Datamodel response: %s", response)
  183. if response.get("result"):
  184. response = response["result"]
  185. transform = []
  186. for entry in response.get("dpStatusRelationDTOS"):
  187. if entry["supportLocal"]:
  188. transform.append(
  189. {
  190. "id": entry["dpId"],
  191. "name": entry["dpCode"],
  192. "type": entry["valueType"],
  193. "format": entry["valueDesc"],
  194. "enumMap": entry["enumMappingMap"],
  195. }
  196. )
  197. return transform
  198. def logout(self) -> None:
  199. """Logout from the Tuya cloud."""
  200. _LOGGER.debug("Logging out from Tuya cloud")
  201. # Clear authentication cache
  202. self.__hass.data[DOMAIN]["auth_cache"] = None
  203. self.__authentication = {}
  204. @property
  205. def is_authenticated(self) -> bool:
  206. """Is the cloud account authenticated?"""
  207. return True if self.__authentication else False
  208. @property
  209. def last_error(self) -> dict[str, Any] | None:
  210. """The last cloud error code and message, if any."""
  211. if self.__error_code is not None:
  212. return {
  213. TUYA_RESPONSE_MSG: self.__error_msg,
  214. TUYA_RESPONSE_CODE: self.__error_code,
  215. }
  216. class DeviceListener(SharingDeviceListener):
  217. """Device update listener."""
  218. def __init__(
  219. self,
  220. hass: HomeAssistant,
  221. manager: Manager,
  222. ):
  223. self.__hass = hass
  224. self._manager = manager
  225. def update_device(
  226. self,
  227. device: CustomerDevice,
  228. updated_status_properties: list[str] | None,
  229. ) -> None:
  230. """Device status has updated."""
  231. _LOGGER.debug(
  232. "Received update for device %s: %s (properties %s)",
  233. device.id,
  234. self._manager.device_map[device.id].status,
  235. updated_status_properties,
  236. )
  237. def add_device(self, device: CustomerDevice) -> None:
  238. """A new device has been added."""
  239. _LOGGER.debug(
  240. "Received add device %s: %s",
  241. device.id,
  242. self._manager.device_map[device.id].status,
  243. )
  244. def remove_device(self, device_id: str) -> None:
  245. """A device has been removed."""
  246. _LOGGER.debug(
  247. "Received remove device %s: %s",
  248. device_id,
  249. self._manager.device_map[device_id].status,
  250. )
  251. class TokenListener(SharingTokenListener):
  252. """Listener for upstream token updates.
  253. This is only needed to get some debug output when tokens are refreshed."""
  254. def __init__(self, hass: HomeAssistant):
  255. self.__hass = hass
  256. def update_token(self, token_info: dict[str, Any]) -> None:
  257. """Update the token information."""
  258. _LOGGER.debug("Token updated")