cloud.py 9.4 KB


  1. import logging
  2. from typing import Any
  3. from homeassistant.core import HomeAssistant
  4. from tuya_sharing import (
  5. CustomerDevice,
  6. LoginControl,
  7. Manager,
  8. SharingDeviceListener,
  9. SharingTokenListener,
  10. )
  11. from .const import (
  12. CONF_DEVICE_CID,
  13. CONF_ENDPOINT,
  14. CONF_LOCAL_KEY,
  15. CONF_TERMINAL_ID,
  16. DOMAIN,
  17. TUYA_CLIENT_ID,
  18. TUYA_RESPONSE_CODE,
  19. TUYA_RESPONSE_MSG,
  20. TUYA_RESPONSE_QR_CODE,
  21. TUYA_RESPONSE_RESULT,
  22. TUYA_RESPONSE_SUCCESS,
  23. TUYA_SCHEMA,
  24. )
  25. _LOGGER = logging.getLogger(__name__)
  26. HUB_CATEGORIES = [
  27. "wgsxj", # Gateway camera
  28. "lyqwg", # Router
  29. "bywg", # IoT edge gateway
  30. "zigbee", # Gateway
  31. "wg2", # Gateway
  32. "dgnzk", # Multi-function controller
  33. "videohub", # Videohub
  34. "xnwg", # Virtual gateway
  35. "qtyycp", # Voice gateway composite solution
  36. "alexa_yywg", # Gateway with Alexa
  37. "gywg", # Industrial gateway
  38. "cnwg", # Energy gateway
  39. "wnykq", # Smart IR
  40. ]
  41. class Cloud:
  42. """Optional Tuya cloud interface for getting device information."""
  43. def __init__(self, hass: HomeAssistant):
  44. self.__login_control = LoginControl()
  45. self.__authentication = {}
  46. self.__user_code = None
  47. self.__qr_code = None
  48. self.__hass = hass
  49. self.__error_code = None
  50. self.__error_msg = None
  51. # Restore cached authentication
  52. if cached := self.__hass.data[DOMAIN].get("auth_cache"):
  53. self.__authentication = cached
  54. async def async_get_qr_code(self, user_code: str | None = None) -> bool:
  55. """Get QR code from Tuya server for user code authentication."""
  56. if not user_code:
  57. user_code = self.__user_code
  58. if not user_code:
  59. _LOGGER.error("Cannot get QR code without a user code")
  60. return False, {TUYA_RESPONSE_MSG: "QR code requires a user code"}
  61. response = await self.__hass.async_add_executor_job(
  62. self.__login_control.qr_code,
  63. TUYA_CLIENT_ID,
  64. TUYA_SCHEMA,
  65. user_code,
  66. )
  67. if response.get(TUYA_RESPONSE_SUCCESS, False):
  68. self.__user_code = user_code
  69. self.__qr_code = response[TUYA_RESPONSE_RESULT][TUYA_RESPONSE_QR_CODE]
  70. return self.__qr_code
  71. _LOGGER.error("Failed to get QR code: %s", response)
  72. self.__error_code = response.get(TUYA_RESPONSE_CODE, {})
  73. self.__error_msg = response.get(TUYA_RESPONSE_MSG, "Unknown error")
  74. return False
  75. async def async_login(self) -> bool:
  76. """Login to the Tuya cloud."""
  77. if not self.__user_code or not self.__qr_code:
  78. _LOGGER.warning("Login attempted without successful QR scan")
  79. return False, {}
  80. success, info = await self.__hass.async_add_executor_job(
  81. self.__login_control.login_result,
  82. self.__qr_code,
  83. TUYA_CLIENT_ID,
  84. self.__user_code,
  85. )
  86. if success:
  87. self.__authentication = {
  88. "user_code": self.__user_code,
  89. "terminal_id": info[CONF_TERMINAL_ID],
  90. "endpoint": info[CONF_ENDPOINT],
  91. "token_info": {
  92. "t": info["t"],
  93. "uid": info["uid"],
  94. "expire_time": info["expire_time"],
  95. "access_token": info["access_token"],
  96. "refresh_token": info["refresh_token"],
  97. },
  98. }
  99. self.__hass.data[DOMAIN]["auth_cache"] = self.__authentication
  100. else:
  101. _LOGGER.warning("Login failed: %s", info)
  102. self.__error_code = info.get(TUYA_RESPONSE_CODE, {})
  103. self.__error_msg = info.get(TUYA_RESPONSE_MSG, "Unknown error")
  104. return success
  105. async def async_get_devices(self) -> dict[str, Any]:
  106. """Get all devices associated with the account."""
  107. token_listener = TokenListener(self.__hass)
  108. manager = Manager(
  109. TUYA_CLIENT_ID,
  110. self.__authentication["user_code"],
  111. self.__authentication["terminal_id"],
  112. self.__authentication["endpoint"],
  113. self.__authentication["token_info"],
  114. token_listener,
  115. )
  116. listener = DeviceListener(self.__hass, manager)
  117. manager.add_device_listener(listener)
  118. # Get all devices from Tuya cloud
  119. await self.__hass.async_add_executor_job(manager.update_device_cache)
  120. # Register known device IDs
  121. cloud_devices = {}
  122. domain_data = self.__hass.data.get(DOMAIN)
  123. for device in manager.device_map.values():
  124. cloud_device = {
  125. "category": device.category,
  126. "id": device.id,
  127. "ip": device.ip,
  128. CONF_LOCAL_KEY: device.local_key
  129. if hasattr(device, CONF_LOCAL_KEY)
  130. else "",
  131. "name": device.name,
  132. "node_id": device.node_id if hasattr(device, "node_id") else "",
  133. "online": device.online,
  134. "product_id": device.product_id,
  135. "product_name": device.product_name,
  136. "uid": device.uid,
  137. "uuid": device.uuid,
  138. "support_local": device.support_local,
  139. CONF_DEVICE_CID: None,
  140. "version": None,
  141. "is_hub": (
  142. device.category in HUB_CATEGORIES
  143. or not hasattr(device, "local_key")
  144. ),
  145. }
  146. _LOGGER.debug("Found device: %s", cloud_device["product_name"])
  147. existing_id = domain_data.get(cloud_device["id"]) if domain_data else None
  148. existing_uuid = (
  149. domain_data.get(cloud_device["uuid"]) if domain_data else None
  150. )
  151. existing = existing_id or existing_uuid
  152. cloud_device["exists"] = existing and existing.get("device")
  153. if hasattr(device, "node_id"):
  154. index = "/".join(
  155. [
  156. cloud_device["id"],
  157. cloud_device["node_id"],
  158. ]
  159. )
  160. else:
  161. index = cloud_device["id"]
  162. cloud_devices[index] = cloud_device
  163. return cloud_devices
  164. async def async_get_datamodel(self, device_id) -> dict[str, Any] | None:
  165. """Get the data model for the specified device (QueryThingsDataModel)."""
  166. token_listener = TokenListener(self.__hass)
  167. manager = Manager(
  168. TUYA_CLIENT_ID,
  169. self.__authentication["user_code"],
  170. self.__authentication["terminal_id"],
  171. self.__authentication["endpoint"],
  172. self.__authentication["token_info"],
  173. token_listener,
  174. )
  175. response = await self.__hass.async_add_executor_job(
  176. manager.customer_api.get,
  177. f"/v1.0/m/life/devices/{device_id}/status",
  178. )
  179. _LOGGER.debug("Datamodel response: %s", response)
  180. if response.get("result"):
  181. response = response["result"]
  182. transform = []
  183. for entry in response.get("dpStatusRelationDTOS"):
  184. if entry["supportLocal"]:
  185. transform.append(
  186. {
  187. "id": entry["dpId"],
  188. "name": entry["dpCode"],
  189. "type": entry["valueType"],
  190. "format": entry["valueDesc"],
  191. "enumMap": entry["enumMappingMap"],
  192. }
  193. )
  194. return transform
  195. @property
  196. def is_authenticated(self) -> bool:
  197. """Is the cloud account authenticated?"""
  198. return True if self.__authentication else False
  199. @property
  200. def last_error(self) -> dict[str, Any] | None:
  201. """The last cloud error code and message, if any."""
  202. if self.__error_code is not None:
  203. return {
  204. TUYA_RESPONSE_MSG: self.__error_msg,
  205. TUYA_RESPONSE_CODE: self.__error_code,
  206. }
  207. class DeviceListener(SharingDeviceListener):
  208. """Device update listener."""
  209. def __init__(
  210. self,
  211. hass: HomeAssistant,
  212. manager: Manager,
  213. ):
  214. self.__hass = hass
  215. self._manager = manager
  216. def update_device(
  217. self,
  218. device: CustomerDevice,
  219. updated_status_properties: list[str] | None,
  220. ) -> None:
  221. """Device status has updated."""
  222. _LOGGER.debug(
  223. "Received update for device %s: %s (properties %s)",
  224. device.id,
  225. self._manager.device_map[device.id].status,
  226. updated_status_properties,
  227. )
  228. def add_device(self, device: CustomerDevice) -> None:
  229. """A new device has been added."""
  230. _LOGGER.debug(
  231. "Received add device %s: %s",
  232. device.id,
  233. self._manager.device_map[device.id].status,
  234. )
  235. def remove_device(self, device_id: str) -> None:
  236. """A device has been removed."""
  237. _LOGGER.debug(
  238. "Received remove device %s: %s",
  239. device_id,
  240. self._manager.device_map[device_id].status,
  241. )
  242. class TokenListener(SharingTokenListener):
  243. """Listener for upstream token updates.
  244. This is only needed to get some debug output when tokens are refreshed."""
  245. def __init__(self, hass: HomeAssistant):
  246. self.__hass = hass
  247. def update_token(self, token_info: dict[str, Any]) -> None:
  248. """Update the token information."""
  249. _LOGGER.debug("Token updated")