cloud.py 8.8 KB


  1. import logging
  2. from typing import Any
  3. from homeassistant.core import HomeAssistant
  4. from tuya_sharing import (
  5. CustomerDevice,
  6. LoginControl,
  7. Manager,
  8. SharingDeviceListener,
  9. SharingTokenListener,
  10. )
  11. from .const import (
  12. CONF_DEVICE_CID,
  13. CONF_ENDPOINT,
  14. CONF_LOCAL_KEY,
  15. CONF_TERMINAL_ID,
  16. DOMAIN,
  17. TUYA_CLIENT_ID,
  18. TUYA_RESPONSE_CODE,
  19. TUYA_RESPONSE_MSG,
  20. TUYA_RESPONSE_QR_CODE,
  21. TUYA_RESPONSE_RESULT,
  22. TUYA_RESPONSE_SUCCESS,
  23. TUYA_SCHEMA,
  24. )
  # Module-level logger shared by every class in this file.
  _LOGGER = logging.getLogger(__name__)

  # Tuya product category codes that are treated as hubs/gateways when the
  # device list is built (see the "is_hub" flag on each cloud device entry).
  HUB_CATEGORIES = [
      # Gateway camera
      "wgsxj",
      # Router
      "lyqwg",
      # IoT edge gateway
      "bywg",
      # Gateway
      "zigbee",
      # Gateway
      "wg2",
      # Multi-function controller
      "dgnzk",
      # Videohub
      "videohub",
      # Virtual gateway
      "xnwg",
      # Voice gateway composite solution
      "qtyycp",
      # Gateway with Alexa
      "alexa_yywg",
      # Industrial gateway
      "gywg",
      # Energy gateway
      "cnwg",
      # Smart IR
      "wnykq",
  ]
  class Cloud:
      """Optional Tuya cloud interface for getting device information.

      Wraps the QR-code login flow of ``tuya_sharing.LoginControl`` and uses a
      ``tuya_sharing.Manager`` to list devices and to query a device's data
      model. Blocking SDK calls are dispatched through
      ``hass.async_add_executor_job``.

      The most recent failure reported by the cloud is available from
      ``last_error``.
      """

      def __init__(self, hass: HomeAssistant):
          """Create the interface, restoring cached authentication if present.

          Args:
              hass: The Home Assistant instance; ``hass.data[DOMAIN]`` is used
                  as the authentication cache.
          """
          self.__login_control = LoginControl()
          self.__authentication = {}
          self.__user_code = None
          self.__qr_code = None
          self.__hass = hass
          self.__error_code = None
          self.__error_msg = None

          # Restore cached authentication. The domain entry may not exist yet,
          # so look it up defensively instead of indexing.
          if cached := hass.data.get(DOMAIN, {}).get("auth_cache"):
              self.__authentication = cached

      def __create_manager(self) -> Manager:
          """Build a Manager from the stored authentication details.

          Raises:
              KeyError: if no authentication has been stored yet.
          """
          auth = self.__authentication
          return Manager(
              TUYA_CLIENT_ID,
              auth["user_code"],
              auth["terminal_id"],
              auth["endpoint"],
              auth["token_info"],
              TokenListener(self.__hass),
          )

      async def async_get_qr_code(self, user_code: str | None = None) -> str | bool:
          """Get QR code from Tuya server for user code authentication.

          Args:
              user_code: The user code to authenticate with. When omitted, the
                  user code from the last successful request is reused.

          Returns:
              The QR code token on success, otherwise ``False``. On failure
              the details are available from ``last_error``.
          """
          user_code = user_code or self.__user_code
          if not user_code:
              _LOGGER.error("Cannot get QR code without a user code")
              # Report the problem through last_error and return a falsy
              # value; a (False, {...}) tuple would be truthy and therefore
              # look like a QR code to the caller.
              self.__error_code = {}
              self.__error_msg = "QR code requires a user code"
              return False

          response = await self.__hass.async_add_executor_job(
              self.__login_control.qr_code,
              TUYA_CLIENT_ID,
              TUYA_SCHEMA,
              user_code,
          )
          if response.get(TUYA_RESPONSE_SUCCESS, False):
              self.__user_code = user_code
              self.__qr_code = response[TUYA_RESPONSE_RESULT][TUYA_RESPONSE_QR_CODE]
              return self.__qr_code

          self.__error_code = response.get(TUYA_RESPONSE_CODE, {})
          self.__error_msg = response.get(TUYA_RESPONSE_MSG, "Unknown error")
          return False

      async def async_login(self) -> bool:
          """Login to the Tuya cloud.

          Requires a prior successful ``async_get_qr_code`` call. On success
          the authentication details are stored on this object and cached in
          ``hass.data[DOMAIN]["auth_cache"]``.

          Returns:
              The success flag reported by the login request, or ``False``
              when no QR code has been obtained yet.
          """
          if not self.__user_code or not self.__qr_code:
              _LOGGER.warning("Login attempted without successful QR scan")
              # Plain False, not a tuple: a non-empty tuple is always truthy.
              return False

          success, info = await self.__hass.async_add_executor_job(
              self.__login_control.login_result,
              self.__qr_code,
              TUYA_CLIENT_ID,
              self.__user_code,
          )

          if success:
              self.__authentication = {
                  "user_code": self.__user_code,
                  "terminal_id": info[CONF_TERMINAL_ID],
                  "endpoint": info[CONF_ENDPOINT],
                  "token_info": {
                      "t": info["t"],
                      "uid": info["uid"],
                      "expire_time": info["expire_time"],
                      "access_token": info["access_token"],
                      "refresh_token": info["refresh_token"],
                  },
              }
              # Create the domain entry if needed rather than assuming it exists.
              self.__hass.data.setdefault(DOMAIN, {})["auth_cache"] = (
                  self.__authentication
              )
          else:
              self.__error_code = info.get(TUYA_RESPONSE_CODE, {})
              self.__error_msg = info.get(TUYA_RESPONSE_MSG, "Unknown error")

          return success

      async def async_get_devices(self) -> dict[str, Any]:
          """Get all devices associated with the account.

          Returns:
              A dict keyed by device id. Each value describes one device and
              carries an ``"exists"`` entry that is truthy when
              ``hass.data[DOMAIN]`` already holds a ``"device"`` under the
              device's id or uuid.

          Raises:
              KeyError: if called before authentication is available.
          """
          manager = self.__create_manager()
          manager.add_device_listener(DeviceListener(self.__hass, manager))

          # Get all devices from Tuya cloud
          await self.__hass.async_add_executor_job(manager.update_device_cache)

          # Register known device IDs
          domain_data = self.__hass.data.get(DOMAIN) or {}
          debug_enabled = _LOGGER.isEnabledFor(logging.DEBUG)
          cloud_devices = {}
          for device in manager.device_map.values():
              cloud_device = {
                  "category": device.category,
                  "id": device.id,
                  "ip": device.ip,
                  # The attribute read is ``local_key``, so that is the name
                  # whose presence is checked.
                  CONF_LOCAL_KEY: getattr(device, "local_key", ""),
                  "name": device.name,
                  "node_id": getattr(device, "node_id", ""),
                  "online": device.online,
                  "product_id": device.product_id,
                  "product_name": device.product_name,
                  "uid": device.uid,
                  "uuid": device.uuid,
                  "support_local": device.support_local,
                  CONF_DEVICE_CID: None,
                  "version": None,
                  "is_hub": (
                      device.category in HUB_CATEGORIES
                      or not hasattr(device, "local_key")
                  ),
              }
              if debug_enabled:
                  # Never write the local key to the log; it is a device secret.
                  _LOGGER.debug(
                      "Found device: %s",
                      {**cloud_device, CONF_LOCAL_KEY: "**REDACTED**"},
                  )

              existing = domain_data.get(device.id) or domain_data.get(device.uuid)
              cloud_device["exists"] = existing and existing.get("device")
              cloud_devices[device.id] = cloud_device

          return cloud_devices

      async def async_get_datamodel(self, device_id) -> list[dict[str, Any]] | None:
          """Get the data model for the specified device (QueryThingsDataModel).

          Args:
              device_id: The Tuya device id to query.

          Returns:
              One dict per data point that is flagged ``supportLocal``, with
              the keys ``id``, ``name``, ``type``, ``format`` and ``enumMap``;
              ``None`` when the response carries no ``result``.

          Raises:
              KeyError: if called before authentication is available.
          """
          manager = self.__create_manager()
          # ``customer_api.get`` is already a bound method, so only the request
          # path is passed on to it.
          response = await self.__hass.async_add_executor_job(
              manager.customer_api.get,
              f"/v1.0/m/life/devices/{device_id}/status",
          )

          result = response.get("result")
          if not result:
              return None

          # Collect one dict per entry; a missing or null list yields no
          # entries instead of raising.
          return [
              {
                  "id": entry["dpId"],
                  "name": entry["dpCode"],
                  "type": entry["valueType"],
                  "format": entry["valueDesc"],
                  "enumMap": entry["enumMappingMap"],
              }
              for entry in result.get("dpStatusRelationDTOS") or []
              if entry["supportLocal"]
          ]

      @property
      def is_authenticated(self) -> bool:
          """Is the cloud account authenticated?"""
          return bool(self.__authentication)

      @property
      def last_error(self) -> dict[str, Any] | None:
          """The last cloud error code and message, if any."""
          if self.__error_code is None:
              return None
          return {
              TUYA_RESPONSE_MSG: self.__error_msg,
              TUYA_RESPONSE_CODE: self.__error_code,
          }
  class DeviceListener(SharingDeviceListener):
      """Device update listener.

      Every callback only writes a debug log entry describing the event.
      """

      def __init__(
          self,
          hass: HomeAssistant,
          manager: Manager,
      ):
          """Store the Home Assistant instance and the manager to look up in.

          Args:
              hass: The Home Assistant instance.
              manager: The manager whose ``device_map`` is consulted for the
                  status that gets logged.
          """
          self.__hass = hass
          self._manager = manager

      def _status_of(self, device_id: str) -> Any:
          """Return the manager's status for a device, or None if unknown.

          A tolerant lookup keeps a logging-only callback from raising when
          the device is not (or no longer) present in ``device_map``.
          """
          known = self._manager.device_map.get(device_id)
          return getattr(known, "status", None)

      def update_device(self, device: CustomerDevice) -> None:
          """Device status has updated."""
          _LOGGER.debug(
              "Received update for device %s: %s",
              device.id,
              self._status_of(device.id),
          )

      def add_device(self, device: CustomerDevice) -> None:
          """A new device has been added."""
          # Logger has no ``device`` method; log at debug level like the
          # other callbacks.
          _LOGGER.debug(
              "Received add device %s: %s",
              device.id,
              self._status_of(device.id),
          )

      def remove_device(self, device_id: str) -> None:
          """A device has been removed."""
          _LOGGER.debug(
              "Received remove device %s: %s",
              device_id,
              self._status_of(device_id),
          )
  class TokenListener(SharingTokenListener):
      """Listener for upstream token updates.

      Its only purpose is to produce a debug log entry whenever the tokens
      are refreshed.
      """

      def __init__(self, hass: HomeAssistant):
          """Keep a reference to the Home Assistant instance."""
          self.__hass = hass

      def update_token(self, token_info: dict[str, Any]) -> None:
          """Log that the token information was updated.

          The new ``token_info`` itself is not stored or logged.
          """
          _LOGGER.debug("Token updated")