cloud.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. import json
  2. import logging
  3. from typing import Any
  4. from homeassistant.core import HomeAssistant
  5. from tuya_sharing import (
  6. CustomerDevice,
  7. LoginControl,
  8. Manager,
  9. SharingDeviceListener,
  10. SharingTokenListener,
  11. )
  12. from .const import (
  13. CONF_DEVICE_CID,
  14. CONF_ENDPOINT,
  15. CONF_LOCAL_KEY,
  16. CONF_TERMINAL_ID,
  17. DOMAIN,
  18. TUYA_CLIENT_ID,
  19. TUYA_RESPONSE_CODE,
  20. TUYA_RESPONSE_MSG,
  21. TUYA_RESPONSE_QR_CODE,
  22. TUYA_RESPONSE_RESULT,
  23. TUYA_RESPONSE_SUCCESS,
  24. TUYA_SCHEMA,
  25. )
  26. _LOGGER = logging.getLogger(__name__)
  27. HUB_CATEGORIES = [
  28. "wgsxj", # Gateway camera
  29. "lyqwg", # Router
  30. "bywg", # IoT edge gateway
  31. "zigbee", # Gateway
  32. "wg2", # Gateway
  33. "dgnzk", # Multi-function controller
  34. "videohub", # Videohub
  35. "xnwg", # Virtual gateway
  36. "qtyycp", # Voice gateway composite solution
  37. "alexa_yywg", # Gateway with Alexa
  38. "gywg", # Industrial gateway
  39. "cnwg", # Energy gateway
  40. "wnykq", # Smart IR
  41. ]
  42. class Cloud:
  43. """Optional Tuya cloud interface for getting device information."""
  44. def __init__(self, hass: HomeAssistant):
  45. self.__login_control = LoginControl()
  46. self.__authentication = {}
  47. self.__user_code = None
  48. self.__qr_code = None
  49. self.__hass = hass
  50. self.__error_code = None
  51. self.__error_msg = None
  52. # Restore cached authentication
  53. if cached := self.__hass.data[DOMAIN].get("auth_cache"):
  54. self.__authentication = cached
  55. async def async_get_qr_code(self, user_code: str | None = None) -> bool:
  56. """Get QR code from Tuya server for user code authentication."""
  57. if not user_code:
  58. user_code = self.__user_code
  59. if not user_code:
  60. _LOGGER.error("Cannot get QR code without a user code")
  61. return False, {TUYA_RESPONSE_MSG: "QR code requires a user code"}
  62. response = await self.__hass.async_add_executor_job(
  63. self.__login_control.qr_code,
  64. TUYA_CLIENT_ID,
  65. TUYA_SCHEMA,
  66. user_code,
  67. )
  68. if response.get(TUYA_RESPONSE_SUCCESS, False):
  69. self.__user_code = user_code
  70. self.__qr_code = response[TUYA_RESPONSE_RESULT][TUYA_RESPONSE_QR_CODE]
  71. return self.__qr_code
  72. self.__error_code = response.get(TUYA_RESPONSE_CODE, {})
  73. self.__error_msg = response.get(TUYA_RESPONSE_MSG, "Unknown error")
  74. return False
  75. async def async_login(self) -> bool:
  76. """Login to the Tuya cloud."""
  77. if not self.__user_code or not self.__qr_code:
  78. _LOGGER.warn("Login attempted without successful QR scan")
  79. return False, {}
  80. success, info = await self.__hass.async_add_executor_job(
  81. self.__login_control.login_result,
  82. self.__qr_code,
  83. TUYA_CLIENT_ID,
  84. self.__user_code,
  85. )
  86. if success:
  87. self.__authentication = {
  88. "user_code": self.__user_code,
  89. "terminal_id": info[CONF_TERMINAL_ID],
  90. "endpoint": info[CONF_ENDPOINT],
  91. "token_info": {
  92. "t": info["t"],
  93. "uid": info["uid"],
  94. "expire_time": info["expire_time"],
  95. "access_token": info["access_token"],
  96. "refresh_token": info["refresh_token"],
  97. },
  98. }
  99. self.__hass.data[DOMAIN]["auth_cache"] = self.__authentication
  100. else:
  101. self.__error_code = info.get(TUYA_RESPONSE_CODE, {})
  102. self.__error_msg = info.get(TUYA_RESPONSE_MSG, "Unknown error")
  103. return success
  104. async def async_get_devices(self) -> dict[str, Any]:
  105. """Get all devices associated with the account."""
  106. token_listener = TokenListener(self.__hass)
  107. manager = Manager(
  108. TUYA_CLIENT_ID,
  109. self.__authentication["user_code"],
  110. self.__authentication["terminal_id"],
  111. self.__authentication["endpoint"],
  112. self.__authentication["token_info"],
  113. token_listener,
  114. )
  115. listener = DeviceListener(self.__hass, manager)
  116. manager.add_device_listener(listener)
  117. # Get all devices from Tuya cloud
  118. await self.__hass.async_add_executor_job(manager.update_device_cache)
  119. # Register known device IDs
  120. cloud_devices = {}
  121. domain_data = self.__hass.data.get(DOMAIN)
  122. for device in manager.device_map.values():
  123. cloud_device = {
  124. "category": device.category,
  125. "id": device.id,
  126. "ip": device.ip,
  127. CONF_LOCAL_KEY: device.local_key
  128. if hasattr(device, CONF_LOCAL_KEY)
  129. else "",
  130. "name": device.name,
  131. "node_id": device.node_id if hasattr(device, "node_id") else "",
  132. "online": device.online,
  133. "product_id": device.product_id,
  134. "product_name": device.product_name,
  135. "uid": device.uid,
  136. "uuid": device.uuid,
  137. "support_local": device.support_local,
  138. CONF_DEVICE_CID: None,
  139. "version": None,
  140. "is_hub": device.category in HUB_CATEGORIES or device.local_key == "",
  141. }
  142. _LOGGER.debug("Found device: {cloud_device}")
  143. existing_id = domain_data.get(cloud_device["id"]) if domain_data else None
  144. existing_uuid = (
  145. domain_data.get(cloud_device["uuid"]) if domain_data else None
  146. )
  147. existing = existing_id or existing_uuid
  148. cloud_device["exists"] = existing and existing.get("device")
  149. cloud_devices[cloud_device["id"]] = cloud_device
  150. return cloud_devices
  151. async def async_get_datamodel(self, device_id) -> dict[str, Any] | None:
  152. """Get the data model for the specified device (QueryThingsDataModel)."""
  153. token_listener = TokenListener(self.__hass)
  154. manager = Manager(
  155. TUYA_CLIENT_ID,
  156. self.__authentication["user_code"],
  157. self.__authentication["terminal_id"],
  158. self.__authentication["endpoint"],
  159. self.__authentication["token_info"],
  160. token_listener,
  161. )
  162. response = await self.__hass.async_add_executor_job(
  163. manager.customer_api.get,
  164. f"/v2.0/cloud/things/{device_id}/model",
  165. )
  166. if response.get("result"):
  167. response = response["result"]
  168. if response.get("model"):
  169. return json.loads(response["model"])
  170. return response
  171. @property
  172. def is_authenticated(self) -> bool:
  173. """Is the cloud account authenticated?"""
  174. return True if self.__authentication else False
  175. @property
  176. def last_error(self) -> dict[str, Any] | None:
  177. """The last cloud error code and message, if any."""
  178. if self.__error_code is not None:
  179. return {
  180. TUYA_RESPONSE_MSG: self.__error_msg,
  181. TUYA_RESPONSE_CODE: self.__error_code,
  182. }
  183. class DeviceListener(SharingDeviceListener):
  184. """Device update listener."""
  185. def __init__(
  186. self,
  187. hass: HomeAssistant,
  188. manager: Manager,
  189. ):
  190. self.__hass = hass
  191. self._manager = manager
  192. def update_device(self, device: CustomerDevice) -> None:
  193. """Device status has updated."""
  194. _LOGGER.debug(
  195. "Received update for device %s: %s",
  196. device.id,
  197. self._manager.device_map[device.id].status,
  198. )
  199. def add_device(self, device: CustomerDevice) -> None:
  200. """A new device has been added."""
  201. _LOGGER.device(
  202. "Received add device %s: %s",
  203. device.id,
  204. self._manager.device_map[device.id].status,
  205. )
  206. def remove_device(self, device_id: str) -> None:
  207. """A device has been removed."""
  208. _LOGGER.debug(
  209. "Received remove device %s: %s",
  210. device_id,
  211. self._manager.device_map[device_id].status,
  212. )
  213. class TokenListener(SharingTokenListener):
  214. """Listener for upstream token updates.
  215. This is only needed to get some debug output when tokens are refreshed."""
  216. def __init__(self, hass: HomeAssistant):
  217. self.__hass = hass
  218. def update_token(self, token_info: dict[str, Any]) -> None:
  219. """Update the token information."""
  220. _LOGGER.debug("Token updated")