cloud.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. import json
  2. import logging
  3. from typing import Any
  4. from homeassistant.core import HomeAssistant
  5. from tuya_sharing import (
  6. CustomerDevice,
  7. LoginControl,
  8. Manager,
  9. SharingDeviceListener,
  10. SharingTokenListener,
  11. )
  12. from .const import (
  13. CONF_DEVICE_CID,
  14. CONF_ENDPOINT,
  15. CONF_LOCAL_KEY,
  16. CONF_TERMINAL_ID,
  17. DOMAIN,
  18. TUYA_CLIENT_ID,
  19. TUYA_RESPONSE_CODE,
  20. TUYA_RESPONSE_MSG,
  21. TUYA_RESPONSE_QR_CODE,
  22. TUYA_RESPONSE_RESULT,
  23. TUYA_RESPONSE_SUCCESS,
  24. TUYA_SCHEMA,
  25. )
  26. _LOGGER = logging.getLogger(__name__)
  27. HUB_CATEGORIES = [
  28. "wgsxj", # Gateway camera
  29. "lyqwg", # Router
  30. "bywg", # IoT edge gateway
  31. "zigbee", # Gateway
  32. "wg2", # Gateway
  33. "dgnzk", # Multi-function controller
  34. "videohub", # Videohub
  35. "xnwg", # Virtual gateway
  36. "qtyycp", # Voice gateway composite solution
  37. "alexa_yywg", # Gateway with Alexa
  38. "gywg", # Industrial gateway
  39. "cnwg", # Energy gateway
  40. "wnykq", # Smart IR
  41. ]
  42. class Cloud:
  43. """Optional Tuya cloud interface for getting device information."""
  44. def __init__(self, hass: HomeAssistant):
  45. self.__login_control = LoginControl()
  46. self.__authentication = {}
  47. self.__user_code = None
  48. self.__qr_code = None
  49. self.__hass = hass
  50. self.__error_code = None
  51. self.__error_msg = None
  52. # Restore cached authentication
  53. if cached := self.__hass.data[DOMAIN].get("auth_cache"):
  54. self.__authentication = cached
  55. async def async_get_qr_code(self, user_code: str | None = None) -> bool:
  56. """Get QR code from Tuya server for user code authentication."""
  57. if not user_code:
  58. user_code = self.__user_code
  59. if not user_code:
  60. _LOGGER.error("Cannot get QR code without a user code")
  61. return False, {TUYA_RESPONSE_MSG: "QR code requires a user code"}
  62. response = await self.__hass.async_add_executor_job(
  63. self.__login_control.qr_code,
  64. TUYA_CLIENT_ID,
  65. TUYA_SCHEMA,
  66. user_code,
  67. )
  68. if response.get(TUYA_RESPONSE_SUCCESS, False):
  69. self.__user_code = user_code
  70. self.__qr_code = response[TUYA_RESPONSE_RESULT][TUYA_RESPONSE_QR_CODE]
  71. return self.__qr_code
  72. self.__error_code = response.get(TUYA_RESPONSE_CODE, {})
  73. self.__error_msg = response.get(TUYA_RESPONSE_MSG, "Unknown error")
  74. return False
  75. async def async_login(self) -> bool:
  76. """Login to the Tuya cloud."""
  77. if not self.__user_code or not self.__qr_code:
  78. _LOGGER.warn("Login attempted without successful QR scan")
  79. return False, {}
  80. success, info = await self.__hass.async_add_executor_job(
  81. self.__login_control.login_result,
  82. self.__qr_code,
  83. TUYA_CLIENT_ID,
  84. self.__user_code,
  85. )
  86. if success:
  87. self.__authentication = {
  88. "user_code": self.__user_code,
  89. "terminal_id": info[CONF_TERMINAL_ID],
  90. "endpoint": info[CONF_ENDPOINT],
  91. "token_info": {
  92. "t": info["t"],
  93. "uid": info["uid"],
  94. "expire_time": info["expire_time"],
  95. "access_token": info["access_token"],
  96. "refresh_token": info["refresh_token"],
  97. },
  98. }
  99. self.__hass.data[DOMAIN]["auth_cache"] = self.__authentication
  100. else:
  101. self.__error_code = info.get(TUYA_RESPONSE_CODE, {})
  102. self.__error_msg = info.get(TUYA_RESPONSE_MSG, "Unknown error")
  103. return success
  104. async def async_get_devices(self) -> dict[str, Any]:
  105. """Get all devices associated with the account."""
  106. token_listener = TokenListener(self.__hass)
  107. manager = Manager(
  108. TUYA_CLIENT_ID,
  109. self.__authentication["user_code"],
  110. self.__authentication["terminal_id"],
  111. self.__authentication["endpoint"],
  112. self.__authentication["token_info"],
  113. token_listener,
  114. )
  115. listener = DeviceListener(self.__hass, manager)
  116. manager.add_device_listener(listener)
  117. # Get all devices from Tuya cloud
  118. await self.__hass.async_add_executor_job(manager.update_device_cache)
  119. # Register known device IDs
  120. cloud_devices = {}
  121. domain_data = self.__hass.data.get(DOMAIN)
  122. for device in manager.device_map.values():
  123. cloud_device = {
  124. "category": device.category,
  125. "id": device.id,
  126. "ip": device.ip,
  127. CONF_LOCAL_KEY: device.local_key
  128. if hasattr(device, CONF_LOCAL_KEY)
  129. else "",
  130. "name": device.name,
  131. "node_id": device.node_id if hasattr(device, "node_id") else "",
  132. "online": device.online,
  133. "product_id": device.product_id,
  134. "product_name": device.product_name,
  135. "uid": device.uid,
  136. "uuid": device.uuid,
  137. "support_local": device.support_local,
  138. CONF_DEVICE_CID: None,
  139. "version": None,
  140. "is_hub": (
  141. device.category in HUB_CATEGORIES
  142. or not hasattr(device, "local_key")
  143. ),
  144. }
  145. _LOGGER.debug("Found device: {cloud_device}")
  146. existing_id = domain_data.get(cloud_device["id"]) if domain_data else None
  147. existing_uuid = (
  148. domain_data.get(cloud_device["uuid"]) if domain_data else None
  149. )
  150. existing = existing_id or existing_uuid
  151. cloud_device["exists"] = existing and existing.get("device")
  152. cloud_devices[cloud_device["id"]] = cloud_device
  153. return cloud_devices
  154. async def async_get_datamodel(self, device_id) -> dict[str, Any] | None:
  155. """Get the data model for the specified device (QueryThingsDataModel)."""
  156. token_listener = TokenListener(self.__hass)
  157. manager = Manager(
  158. TUYA_CLIENT_ID,
  159. self.__authentication["user_code"],
  160. self.__authentication["terminal_id"],
  161. self.__authentication["endpoint"],
  162. self.__authentication["token_info"],
  163. token_listener,
  164. )
  165. response = await self.__hass.async_add_executor_job(
  166. manager.customer_api.get,
  167. f"/v2.0/cloud/things/{device_id}/model",
  168. )
  169. if response.get("result"):
  170. response = response["result"]
  171. if response.get("model"):
  172. return json.loads(response["model"])
  173. return response
  174. @property
  175. def is_authenticated(self) -> bool:
  176. """Is the cloud account authenticated?"""
  177. return True if self.__authentication else False
  178. @property
  179. def last_error(self) -> dict[str, Any] | None:
  180. """The last cloud error code and message, if any."""
  181. if self.__error_code is not None:
  182. return {
  183. TUYA_RESPONSE_MSG: self.__error_msg,
  184. TUYA_RESPONSE_CODE: self.__error_code,
  185. }
  186. class DeviceListener(SharingDeviceListener):
  187. """Device update listener."""
  188. def __init__(
  189. self,
  190. hass: HomeAssistant,
  191. manager: Manager,
  192. ):
  193. self.__hass = hass
  194. self._manager = manager
  195. def update_device(self, device: CustomerDevice) -> None:
  196. """Device status has updated."""
  197. _LOGGER.debug(
  198. "Received update for device %s: %s",
  199. device.id,
  200. self._manager.device_map[device.id].status,
  201. )
  202. def add_device(self, device: CustomerDevice) -> None:
  203. """A new device has been added."""
  204. _LOGGER.device(
  205. "Received add device %s: %s",
  206. device.id,
  207. self._manager.device_map[device.id].status,
  208. )
  209. def remove_device(self, device_id: str) -> None:
  210. """A device has been removed."""
  211. _LOGGER.debug(
  212. "Received remove device %s: %s",
  213. device_id,
  214. self._manager.device_map[device_id].status,
  215. )
  216. class TokenListener(SharingTokenListener):
  217. """Listener for upstream token updates.
  218. This is only needed to get some debug output when tokens are refreshed."""
  219. def __init__(self, hass: HomeAssistant):
  220. self.__hass = hass
  221. def update_token(self, token_info: dict[str, Any]) -> None:
  222. """Update the token information."""
  223. _LOGGER.debug("Token updated")