cloud.py 9.1 KB


  1. import logging
  2. from typing import Any
  3. from homeassistant.core import HomeAssistant
  4. from tuya_sharing import (
  5. CustomerDevice,
  6. LoginControl,
  7. Manager,
  8. SharingDeviceListener,
  9. SharingTokenListener,
  10. )
  11. from .const import (
  12. CONF_DEVICE_CID,
  13. CONF_ENDPOINT,
  14. CONF_LOCAL_KEY,
  15. CONF_TERMINAL_ID,
  16. DOMAIN,
  17. TUYA_CLIENT_ID,
  18. TUYA_RESPONSE_CODE,
  19. TUYA_RESPONSE_MSG,
  20. TUYA_RESPONSE_QR_CODE,
  21. TUYA_RESPONSE_RESULT,
  22. TUYA_RESPONSE_SUCCESS,
  23. TUYA_SCHEMA,
  24. )
  25. _LOGGER = logging.getLogger(__name__)
  26. HUB_CATEGORIES = [
  27. "wgsxj", # Gateway camera
  28. "lyqwg", # Router
  29. "bywg", # IoT edge gateway
  30. "zigbee", # Gateway
  31. "wg2", # Gateway
  32. "dgnzk", # Multi-function controller
  33. "videohub", # Videohub
  34. "xnwg", # Virtual gateway
  35. "qtyycp", # Voice gateway composite solution
  36. "alexa_yywg", # Gateway with Alexa
  37. "gywg", # Industrial gateway
  38. "cnwg", # Energy gateway
  39. "wnykq", # Smart IR
  40. ]
  41. class Cloud:
  42. """Optional Tuya cloud interface for getting device information."""
  43. def __init__(self, hass: HomeAssistant):
  44. self.__login_control = LoginControl()
  45. self.__authentication = {}
  46. self.__user_code = None
  47. self.__qr_code = None
  48. self.__hass = hass
  49. self.__error_code = None
  50. self.__error_msg = None
  51. # Restore cached authentication
  52. if cached := self.__hass.data[DOMAIN].get("auth_cache"):
  53. self.__authentication = cached
  54. async def async_get_qr_code(self, user_code: str | None = None) -> bool:
  55. """Get QR code from Tuya server for user code authentication."""
  56. if not user_code:
  57. user_code = self.__user_code
  58. if not user_code:
  59. _LOGGER.error("Cannot get QR code without a user code")
  60. return False, {TUYA_RESPONSE_MSG: "QR code requires a user code"}
  61. response = await self.__hass.async_add_executor_job(
  62. self.__login_control.qr_code,
  63. TUYA_CLIENT_ID,
  64. TUYA_SCHEMA,
  65. user_code,
  66. )
  67. if response.get(TUYA_RESPONSE_SUCCESS, False):
  68. self.__user_code = user_code
  69. self.__qr_code = response[TUYA_RESPONSE_RESULT][TUYA_RESPONSE_QR_CODE]
  70. return self.__qr_code
  71. _LOGGER.error("Failed to get QR code: %s", response)
  72. self.__error_code = response.get(TUYA_RESPONSE_CODE, {})
  73. self.__error_msg = response.get(TUYA_RESPONSE_MSG, "Unknown error")
  74. return False
  75. async def async_login(self) -> bool:
  76. """Login to the Tuya cloud."""
  77. if not self.__user_code or not self.__qr_code:
  78. _LOGGER.warning("Login attempted without successful QR scan")
  79. return False, {}
  80. success, info = await self.__hass.async_add_executor_job(
  81. self.__login_control.login_result,
  82. self.__qr_code,
  83. TUYA_CLIENT_ID,
  84. self.__user_code,
  85. )
  86. if success:
  87. self.__authentication = {
  88. "user_code": self.__user_code,
  89. "terminal_id": info[CONF_TERMINAL_ID],
  90. "endpoint": info[CONF_ENDPOINT],
  91. "token_info": {
  92. "t": info["t"],
  93. "uid": info["uid"],
  94. "expire_time": info["expire_time"],
  95. "access_token": info["access_token"],
  96. "refresh_token": info["refresh_token"],
  97. },
  98. }
  99. self.__hass.data[DOMAIN]["auth_cache"] = self.__authentication
  100. else:
  101. _LOGGER.warning("Login failed: %s", info)
  102. self.__error_code = info.get(TUYA_RESPONSE_CODE, {})
  103. self.__error_msg = info.get(TUYA_RESPONSE_MSG, "Unknown error")
  104. return success
  105. async def async_get_devices(self) -> dict[str, Any]:
  106. """Get all devices associated with the account."""
  107. token_listener = TokenListener(self.__hass)
  108. manager = Manager(
  109. TUYA_CLIENT_ID,
  110. self.__authentication["user_code"],
  111. self.__authentication["terminal_id"],
  112. self.__authentication["endpoint"],
  113. self.__authentication["token_info"],
  114. token_listener,
  115. )
  116. listener = DeviceListener(self.__hass, manager)
  117. manager.add_device_listener(listener)
  118. # Get all devices from Tuya cloud
  119. await self.__hass.async_add_executor_job(manager.update_device_cache)
  120. # Register known device IDs
  121. cloud_devices = {}
  122. domain_data = self.__hass.data.get(DOMAIN)
  123. for device in manager.device_map.values():
  124. cloud_device = {
  125. "category": device.category,
  126. "id": device.id,
  127. "ip": device.ip,
  128. CONF_LOCAL_KEY: device.local_key
  129. if hasattr(device, CONF_LOCAL_KEY)
  130. else "",
  131. "name": device.name,
  132. "node_id": device.node_id if hasattr(device, "node_id") else "",
  133. "online": device.online,
  134. "product_id": device.product_id,
  135. "product_name": device.product_name,
  136. "uid": device.uid,
  137. "uuid": device.uuid,
  138. "support_local": device.support_local,
  139. CONF_DEVICE_CID: None,
  140. "version": None,
  141. "is_hub": (
  142. device.category in HUB_CATEGORIES
  143. or not hasattr(device, "local_key")
  144. ),
  145. }
  146. _LOGGER.debug("Found device: %s", cloud_device["product_name"])
  147. existing_id = domain_data.get(cloud_device["id"]) if domain_data else None
  148. existing_uuid = (
  149. domain_data.get(cloud_device["uuid"]) if domain_data else None
  150. )
  151. existing = existing_id or existing_uuid
  152. cloud_device["exists"] = existing and existing.get("device")
  153. cloud_devices[cloud_device["id"]] = cloud_device
  154. return cloud_devices
  155. async def async_get_datamodel(self, device_id) -> dict[str, Any] | None:
  156. """Get the data model for the specified device (QueryThingsDataModel)."""
  157. token_listener = TokenListener(self.__hass)
  158. manager = Manager(
  159. TUYA_CLIENT_ID,
  160. self.__authentication["user_code"],
  161. self.__authentication["terminal_id"],
  162. self.__authentication["endpoint"],
  163. self.__authentication["token_info"],
  164. token_listener,
  165. )
  166. response = await self.__hass.async_add_executor_job(
  167. manager.customer_api.get,
  168. f"/v1.0/m/life/devices/{device_id}/status",
  169. )
  170. _LOGGER.debug("Datamodel response: %s", response)
  171. if response.get("result"):
  172. response = response["result"]
  173. transform = []
  174. for entry in response.get("dpStatusRelationDTOS"):
  175. if entry["supportLocal"]:
  176. transform.append(
  177. {
  178. "id": entry["dpId"],
  179. "name": entry["dpCode"],
  180. "type": entry["valueType"],
  181. "format": entry["valueDesc"],
  182. "enumMap": entry["enumMappingMap"],
  183. }
  184. )
  185. return transform
  186. @property
  187. def is_authenticated(self) -> bool:
  188. """Is the cloud account authenticated?"""
  189. return True if self.__authentication else False
  190. @property
  191. def last_error(self) -> dict[str, Any] | None:
  192. """The last cloud error code and message, if any."""
  193. if self.__error_code is not None:
  194. return {
  195. TUYA_RESPONSE_MSG: self.__error_msg,
  196. TUYA_RESPONSE_CODE: self.__error_code,
  197. }
  198. class DeviceListener(SharingDeviceListener):
  199. """Device update listener."""
  200. def __init__(
  201. self,
  202. hass: HomeAssistant,
  203. manager: Manager,
  204. ):
  205. self.__hass = hass
  206. self._manager = manager
  207. def update_device(
  208. self,
  209. device: CustomerDevice,
  210. updated_status_properties: list[str] | None,
  211. ) -> None:
  212. """Device status has updated."""
  213. _LOGGER.debug(
  214. "Received update for device %s: %s (properties %s)",
  215. device.id,
  216. self._manager.device_map[device.id].status,
  217. updated_status_properties,
  218. )
  219. def add_device(self, device: CustomerDevice) -> None:
  220. """A new device has been added."""
  221. _LOGGER.debug(
  222. "Received add device %s: %s",
  223. device.id,
  224. self._manager.device_map[device.id].status,
  225. )
  226. def remove_device(self, device_id: str) -> None:
  227. """A device has been removed."""
  228. _LOGGER.debug(
  229. "Received remove device %s: %s",
  230. device_id,
  231. self._manager.device_map[device_id].status,
  232. )
  233. class TokenListener(SharingTokenListener):
  234. """Listener for upstream token updates.
  235. This is only needed to get some debug output when tokens are refreshed."""
  236. def __init__(self, hass: HomeAssistant):
  237. self.__hass = hass
  238. def update_token(self, token_info: dict[str, Any]) -> None:
  239. """Update the token information."""
  240. _LOGGER.debug("Token updated")