device.py 28 KB


  1. """
  2. API for Tuya Local devices.
  3. """
  4. import asyncio
  5. import logging
  6. from asyncio.exceptions import CancelledError
  7. from threading import Lock
  8. from time import time
  9. import tinytuya
  10. from homeassistant.const import (
  11. CONF_HOST,
  12. CONF_NAME,
  13. EVENT_HOMEASSISTANT_STARTED,
  14. EVENT_HOMEASSISTANT_STOP,
  15. )
  16. from homeassistant.core import HomeAssistant, callback
  17. from .const import (
  18. API_PROTOCOL_VERSIONS,
  19. CONF_DEVICE_CID,
  20. CONF_DEVICE_ID,
  21. CONF_LOCAL_KEY,
  22. CONF_POLL_ONLY,
  23. CONF_PROTOCOL_VERSION,
  24. DOMAIN,
  25. )
  26. from .helpers.config import get_device_id
  27. from .helpers.device_config import possible_matches
  28. from .helpers.log import log_json
  29. _LOGGER = logging.getLogger(__name__)
  30. def _collect_possible_matches(cached_state, product_ids):
  31. """Collect possible matches from generator into an array."""
  32. return list(possible_matches(cached_state, product_ids))
  33. class TuyaLocalDevice(object):
  34. def __init__(
  35. self,
  36. name,
  37. dev_id,
  38. address,
  39. local_key,
  40. protocol_version,
  41. dev_cid,
  42. hass: HomeAssistant,
  43. poll_only=False,
  44. ):
  45. """
  46. Represents a Tuya-based device.
  47. Args:
  48. name (str): The device name.
  49. dev_id (str): The device id.
  50. address (str): The network address.
  51. local_key (str): The encryption key.
  52. protocol_version (str | number): The protocol version.
  53. dev_cid (str): The sub device id.
  54. hass (HomeAssistant): The Home Assistant instance.
  55. poll_only (bool): True if the device should be polled only
  56. """
  57. self._name = name
  58. self._children = []
  59. self._force_dps = []
  60. self._product_ids = []
  61. self._running = False
  62. self._shutdown_listener = None
  63. self._startup_listener = None
  64. self._api_protocol_version_index = None
  65. self._api_protocol_working = False
  66. self._api_working_protocol_failures = 0
  67. self.dev_cid = dev_cid
  68. self.address = address
  69. self.dev_id = dev_id
  70. self.local_key = local_key
  71. self._api = None
  72. if hass.data[DOMAIN].get(dev_id) and name != "Test":
  73. self._api = hass.data[DOMAIN][dev_id]["tuyadevice"]
  74. self._refresh_task = None
  75. self._protocol_configured = protocol_version
  76. self._poll_only = poll_only
  77. self._temporary_poll = False
  78. self._reset_cached_state()
  79. self._hass = hass
  80. # API calls to update Tuya devices are asynchronous and non-blocking.
  81. # This means you can send a change and immediately request an updated
  82. # state (like HA does), but because it has not yet finished processing
  83. # you will be returned the old state.
  84. # The solution is to keep a temporary list of changed properties that
  85. # we can overlay onto the state while we wait for the board to update
  86. # its switches.
  87. self._FAKE_IT_TIMEOUT = 5
  88. self._CACHE_TIMEOUT = 30
  89. # More attempts are needed in auto mode so we can cycle through all
  90. # the possibilities a couple of times
  91. self._AUTO_CONNECTION_ATTEMPTS = len(API_PROTOCOL_VERSIONS) * 2 + 1
  92. self._SINGLE_PROTO_CONNECTION_ATTEMPTS = 3
  93. # The number of failures from a working protocol before retrying other protocols.
  94. self._AUTO_FAILURE_RESET_COUNT = 10
  95. self._lock = Lock()
  96. @property
  97. def name(self):
  98. return self._name
  99. @property
  100. def unique_id(self):
  101. """Return the unique id for this device (the dev_id or dev_cid)."""
  102. return self.dev_cid or self.dev_id
  103. @property
  104. def device_info(self):
  105. """Return the device information for this device."""
  106. return {
  107. "identifiers": {(DOMAIN, self.unique_id)},
  108. "name": self.name,
  109. "manufacturer": "Tuya",
  110. }
  111. @property
  112. def has_returned_state(self):
  113. """Return True if the device has returned some state."""
  114. cached = self._get_cached_state()
  115. return len(cached) > 1 or cached.get("updated_at", 0) > 0
  116. @callback
  117. def actually_start(self, event=None):
  118. _LOGGER.debug("Starting monitor loop for %s", self.name)
  119. self._running = True
  120. self._shutdown_listener = self._hass.bus.async_listen_once(
  121. EVENT_HOMEASSISTANT_STOP, self.async_stop
  122. )
  123. if not self._refresh_task:
  124. self._refresh_task = self._hass.async_create_task(self.receive_loop())
  125. def start(self):
  126. if self._hass.is_stopping:
  127. return
  128. elif self._hass.is_running:
  129. if self._startup_listener:
  130. self._startup_listener()
  131. self._startup_listener = None
  132. self.actually_start()
  133. else:
  134. self._startup_listener = self._hass.bus.async_listen_once(
  135. EVENT_HOMEASSISTANT_STARTED, self.actually_start
  136. )
  137. async def async_stop(self, event=None):
  138. _LOGGER.debug("Stopping monitor loop for %s", self.name)
  139. self._running = False
  140. self._children.clear()
  141. self._force_dps.clear()
  142. if self._refresh_task:
  143. self._api.set_socketPersistent(False)
  144. if self._api.parent:
  145. self._api.parent.set_socketPersistent(False)
  146. await self._refresh_task
  147. _LOGGER.debug("Monitor loop for %s stopped", self.name)
  148. self._refresh_task = None
  149. if self._api:
  150. await self._api.close()
  151. self._api = None
  152. def register_entity(self, entity):
  153. # If this is the first child entity to register, and HA is still
  154. # starting, refresh the device state so it shows as available without
  155. # waiting for startup to complete.
  156. should_poll = len(self._children) == 0 and not self._hass.is_running
  157. self._children.append(entity)
  158. for dp in entity._config.dps():
  159. if dp.force and dp.id not in self._force_dps:
  160. self._force_dps.append(int(dp.id))
  161. if not self._running and not self._startup_listener:
  162. self.start()
  163. if self.has_returned_state:
  164. entity.async_schedule_update_ha_state()
  165. elif should_poll:
  166. entity.async_schedule_update_ha_state(True)
  167. async def async_unregister_entity(self, entity):
  168. self._children.remove(entity)
  169. if not self._children:
  170. try:
  171. await self.async_stop()
  172. except CancelledError:
  173. pass
  174. async def receive_loop(self):
  175. """Coroutine wrapper for async_receive generator."""
  176. try:
  177. async for poll in self.async_receive():
  178. if isinstance(poll, dict):
  179. _LOGGER.debug(
  180. "%s received %s",
  181. self.name,
  182. log_json(poll),
  183. )
  184. full_poll = poll.pop("full_poll", False)
  185. self._cached_state = self._cached_state | poll
  186. self._cached_state["updated_at"] = time()
  187. self._remove_properties_from_pending_updates(poll)
  188. for entity in self._children:
  189. # let entities trigger off poll contents directly
  190. entity.on_receive(poll, full_poll)
  191. # clear non-persistant dps that were not in a full poll
  192. if full_poll:
  193. for dp in entity._config.dps():
  194. if not dp.persist and dp.id not in poll:
  195. self._cached_state.pop(dp.id, None)
  196. entity.schedule_update_ha_state()
  197. else:
  198. _LOGGER.debug(
  199. "%s received non data %s",
  200. self.name,
  201. log_json(poll),
  202. )
  203. _LOGGER.warning("%s receive loop has terminated", self.name)
  204. except Exception as t:
  205. _LOGGER.exception(
  206. "%s receive loop terminated by exception %s", self.name, t
  207. )
  208. self._api.set_socketPersistent(False)
  209. if self._api.parent:
  210. self._api.parent.set_socketPersistent(False)
  211. @property
  212. def should_poll(self):
  213. return self._poll_only or self._temporary_poll or not self.has_returned_state
  214. def pause(self):
  215. self._temporary_poll = True
  216. if self._api:
  217. self._api.set_socketPersistent(False)
  218. if self._api.parent:
  219. self._api.parent.set_socketPersistent(False)
  220. def resume(self):
  221. self._temporary_poll = False
    async def async_ensure_connection(self):
        """
        Ensure an API object exists for this device.

        Does nothing when ``self._api`` is already set. Otherwise an API
        object is looked up in ``hass.data[DOMAIN]`` or created with
        ``tinytuya.DeviceAsync.create``; the shared lookup/store is skipped
        for the device named "Test".

        Raises:
            Exception: Whatever the lookup or creation raised, after logging.
        """
        if self._api is None:
            try:
                if self.dev_cid:
                    # Sub device: it is created against the parent's API
                    # object, if one is already stored for the device id.
                    if self._hass.data[DOMAIN].get(self.dev_id) and self.name != "Test":
                        parent = self._hass.data[DOMAIN][self.dev_id]["tuyadevice"]
                    else:
                        # NOTE(review): no parent API object is created here,
                        # so the sub device is built with parent=None and a
                        # None placeholder is stored — confirm this is intended.
                        parent = None
                        if self.name != "Test":
                            self._hass.data[DOMAIN][self.dev_id] = {
                                "tuyadevice": parent
                            }
                    self._api = await tinytuya.DeviceAsync.create(
                        self.dev_cid,
                        cid=self.dev_cid,
                        parent=parent,
                    )
                else:
                    # Stand-alone device: reuse the stored API object or
                    # create one and store it for later reuse.
                    if self._hass.data[DOMAIN].get(self.dev_id) and self.name != "Test":
                        self._api = self._hass.data[DOMAIN][self.dev_id]["tuyadevice"]
                    else:
                        self._api = await tinytuya.DeviceAsync.create(
                            self.dev_id, self.address, self.local_key
                        )
                        if self.name != "Test":
                            self._hass.data[DOMAIN][self.dev_id] = {
                                "tuyadevice": self._api
                            }
            except Exception as e:
                _LOGGER.error(
                    "%s: %s while initialising device %s",
                    type(e).__name__,
                    e,
                    self.dev_id,
                )
                raise e
    async def async_receive(self):
        """
        Receive messages from a persistent connection asynchronously.

        Async generator that runs while ``self._running`` is true. Each
        iteration either performs a poll (when the cache is older than
        ``_CACHE_TIMEOUT``), waits for a pushed message on a persistent
        connection, or just sleeps. Successful results are yielded as the
        dps dict with an extra ``"full_poll"`` key; error results are only
        logged and counted.
        """
        # If we didn't yet get any state from the device, we may need to
        # negotiate the protocol before making the connection persistent
        persist = not self.should_poll
        # flag to alternate updatedps and status calls to ensure we get
        # all dps updated
        dps_updated = False

        await self.async_ensure_connection()
        # we handle retries at a higher level so we can rotate protocol version
        self._api.set_socketRetryLimit(1)
        if self._api.parent:
            # Retries cause problems for other children of the parent device
            self._api.parent.set_socketRetryLimit(1)

        self._api.set_socketPersistent(persist)
        if self._api.parent:
            self._api.parent.set_socketPersistent(persist)

        while self._running:
            # Remember the failure count so an error in this iteration is
            # only counted once (the retry helper may already have counted it).
            error_count = self._api_working_protocol_failures
            try:
                last_cache = self._cached_state.get("updated_at", 0)
                now = time()
                full_poll = False
                if persist == self.should_poll:
                    # use persistent connections after initial communication
                    # has been established.  Until then, we need to rotate
                    # the protocol version, which seems to require a fresh
                    # connection.
                    persist = not self.should_poll
                    _LOGGER.debug(
                        "%s persistant connection set to %s", self.name, persist
                    )
                    self._api.set_socketPersistent(persist)
                    if self._api.parent:
                        self._api.parent.set_socketPersistent(persist)

                if now - last_cache > self._CACHE_TIMEOUT:
                    # Cache is stale: alternate between refreshing the forced
                    # dps and fetching the full status.
                    if (
                        self._force_dps
                        and not dps_updated
                        and self._api_protocol_working
                    ):
                        poll = await self._retry_on_failed_connection(
                            lambda: self._api.updatedps(self._force_dps),
                            f"Failed to update device dps for {self.name}",
                        )
                        dps_updated = True
                    else:
                        poll = await self._retry_on_failed_connection(
                            lambda: self._api.status(),
                            f"Failed to fetch device status for {self.name}",
                        )
                        dps_updated = False
                        full_poll = True
                elif persist:
                    # Cache is fresh: send a heartbeat and wait for a message.
                    await self._api.heartbeat(True)
                    poll = await self._api.receive()
                else:
                    # Polling mode with a fresh cache: nothing to do yet.
                    await asyncio.sleep(5)
                    poll = None

                if poll:
                    if "Error" in poll:
                        # increment the error count if not done already
                        if error_count == self._api_working_protocol_failures:
                            self._api_working_protocol_failures += 1
                        # Only the first failure is logged as a warning.
                        if self._api_working_protocol_failures == 1:
                            _LOGGER.warning(
                                "%s error reading: %s", self.name, poll["Error"]
                            )
                        else:
                            _LOGGER.debug(
                                "%s error reading: %s", self.name, poll["Error"]
                            )
                        if "Payload" in poll and poll["Payload"]:
                            _LOGGER.debug(
                                "%s err payload: %s",
                                self.name,
                                poll["Payload"],
                            )
                    else:
                        # Unwrap the dps and tag the result so the consumer
                        # knows whether it came from a full status poll.
                        if "dps" in poll:
                            poll = poll["dps"]
                        poll["full_poll"] = full_poll
                        yield poll

                await asyncio.sleep(0.1 if self.has_returned_state else 5)

            except CancelledError:
                self._running = False
                # Close the persistent connection when exiting the loop
                self._api.set_socketPersistent(False)
                if self._api.parent:
                    self._api.parent.set_socketPersistent(False)
                raise
            except Exception as t:
                _LOGGER.exception(
                    "%s receive loop error %s:%s",
                    self.name,
                    type(t).__name__,
                    t,
                )
                self._api.set_socketPersistent(False)
                if self._api.parent:
                    self._api.parent.set_socketPersistent(False)
                # Back off before the next attempt.
                await asyncio.sleep(5)

        # Close the persistent connection when exiting the loop
        self._api.set_socketPersistent(False)
        if self._api.parent:
            self._api.parent.set_socketPersistent(False)
  365. def set_detected_product_id(self, product_id):
  366. self._product_ids.append(product_id)
  367. async def async_possible_types(self):
  368. await self.async_ensure_connection()
  369. cached_state = self._get_cached_state()
  370. if len(cached_state) <= 1:
  371. # in case of device22 devices, we need to poll them with a dp
  372. # that exists on the device to get anything back. Most switch-like
  373. # devices have dp 1. Lights generally start from 20. 101 is where
  374. # vendor specific dps start. Between them, these three should cover
  375. # most devices. 148 covers a doorbell device that didn't have these
  376. # 201 covers remote controllers and 2 and 9 cover others without 1
  377. self._api.set_dpsUsed(
  378. {
  379. "1": None,
  380. "2": None,
  381. "9": None,
  382. "20": None,
  383. "60": None,
  384. "101": None,
  385. "148": None,
  386. "201": None,
  387. }
  388. )
  389. await self.async_refresh()
  390. cached_state = self._get_cached_state()
  391. return await self._hass.async_add_executor_job(
  392. _collect_possible_matches,
  393. cached_state,
  394. self._product_ids,
  395. )
  396. async def async_inferred_type(self):
  397. best_match = None
  398. best_quality = 0
  399. cached_state = self._get_cached_state()
  400. possible = await self.async_possible_types()
  401. for config in possible:
  402. quality = config.match_quality(cached_state, self._product_ids)
  403. _LOGGER.info(
  404. "%s considering %s with quality %s",
  405. self.name,
  406. config.name,
  407. quality,
  408. )
  409. if quality > best_quality:
  410. best_quality = quality
  411. best_match = config
  412. if best_match:
  413. return best_match.config_type
  414. _LOGGER.warning(
  415. "Detection for %s with dps %s failed",
  416. self.name,
  417. log_json(cached_state),
  418. )
  419. async def async_refresh(self):
  420. _LOGGER.debug("Refreshing device state for %s", self.name)
  421. if not self._running:
  422. await self._retry_on_failed_connection(
  423. lambda: self._refresh_cached_state(),
  424. f"Failed to refresh device state for {self.name}.",
  425. )
  426. def get_property(self, dps_id):
  427. cached_state = self._get_cached_state()
  428. return cached_state.get(dps_id)
  429. async def async_set_property(self, dps_id, value):
  430. await self.async_set_properties({dps_id: value})
  431. def anticipate_property_value(self, dps_id, value):
  432. """
  433. Update a value in the cached state only. This is good for when you
  434. know the device will reflect a new state in the next update, but
  435. don't want to wait for that update for the device to represent
  436. this state.
  437. The anticipated value will be cleared with the next update.
  438. """
  439. self._cached_state[dps_id] = value
  440. def _reset_cached_state(self):
  441. self._cached_state = {"updated_at": 0}
  442. self._pending_updates = {}
  443. self._last_connection = 0
  444. async def _refresh_cached_state(self):
  445. await self.async_ensure_connection()
  446. new_state = await self._api.status()
  447. if new_state and "Err" not in new_state:
  448. self._cached_state = self._cached_state | new_state.get("dps", {})
  449. self._cached_state["updated_at"] = time()
  450. for entity in self._children:
  451. for dp in entity._config.dps():
  452. # Clear non-persistant dps that were not in the poll
  453. if not dp.persist and dp.id not in new_state.get("dps", {}):
  454. self._cached_state.pop(dp.id, None)
  455. entity.schedule_update_ha_state()
  456. _LOGGER.debug(
  457. "%s refreshed device state: %s",
  458. self.name,
  459. log_json(new_state),
  460. )
  461. if "Err" in new_state:
  462. if self._api_working_protocol_failures == 1:
  463. _LOGGER.warning(
  464. "%s protocol error %s: %s",
  465. self.name,
  466. new_state.get("Err"),
  467. new_state.get("Error", "message not provided"),
  468. )
  469. else:
  470. _LOGGER.debug(
  471. "%s protocol error %s: %s",
  472. self.name,
  473. new_state.get("Err"),
  474. new_state.get("Error", "message not provided"),
  475. )
  476. _LOGGER.debug(
  477. "new state (incl pending): %s",
  478. log_json(self._get_cached_state()),
  479. )
  480. return new_state
  481. async def async_set_properties(self, properties):
  482. if len(properties) == 0:
  483. return
  484. self._add_properties_to_pending_updates(properties)
  485. await self._debounce_sending_updates()
  486. def _add_properties_to_pending_updates(self, properties):
  487. now = time()
  488. pending_updates = self._get_pending_updates()
  489. for key, value in properties.items():
  490. pending_updates[key] = {
  491. "value": value,
  492. "updated_at": now,
  493. "sent": False,
  494. }
  495. _LOGGER.debug(
  496. "%s new pending updates: %s",
  497. self.name,
  498. log_json(pending_updates),
  499. )
  500. def _remove_properties_from_pending_updates(self, data):
  501. self._pending_updates = {
  502. key: value
  503. for key, value in self._pending_updates.items()
  504. if key not in data or not value["sent"] or data[key] != value["value"]
  505. }
  506. async def _debounce_sending_updates(self):
  507. now = time()
  508. since = now - self._last_connection
  509. # set this now to avoid a race condition, it will be updated later
  510. # when the data is actally sent
  511. self._last_connection = now
  512. # Only delay a second if there was recently another command.
  513. # Otherwise delay 1ms, to keep things simple by reusing the
  514. # same send mechanism.
  515. waittime = 1 if since < 1.1 and self.should_poll else 0.001
  516. await asyncio.sleep(waittime)
  517. await self._send_pending_updates()
  518. async def _send_pending_updates(self):
  519. pending_properties = self._get_unsent_properties()
  520. _LOGGER.debug(
  521. "%s sending dps update: %s",
  522. self.name,
  523. log_json(pending_properties),
  524. )
  525. await self._retry_on_failed_connection(
  526. lambda: self._set_values(pending_properties),
  527. "Failed to update device state.",
  528. )
  529. async def _set_values(self, properties):
  530. await self.async_ensure_connection()
  531. try:
  532. self._lock.acquire()
  533. await self._api.set_multiple_values(properties, nowait=True)
  534. self._cached_state["updated_at"] = 0
  535. now = time()
  536. self._last_connection = now
  537. pending_updates = self._get_pending_updates()
  538. for key in properties.keys():
  539. pending_updates[key]["updated_at"] = now
  540. pending_updates[key]["sent"] = True
  541. finally:
  542. self._lock.release()
    async def _retry_on_failed_connection(self, func, error_message):
        """
        Call ``func`` until it succeeds or the attempts are used up.

        Args:
            func: Zero-argument callable returning an awaitable.
            error_message (str): Logged once all attempts have failed.

        Returns:
            The result of ``func`` on success. None is returned implicitly
            when every attempt failed, or when Home Assistant is stopping
            (``func`` is then never called).
        """
        await self.async_ensure_connection()
        if self._api_protocol_version_index is None:
            self._rotate_api_protocol_version()
        # Auto detection is only in effect until some protocol has worked.
        auto = (self._protocol_configured == "auto") and (
            not self._api_protocol_working
        )
        connections = (
            self._AUTO_CONNECTION_ATTEMPTS
            if auto
            else self._SINGLE_PROTO_CONNECTION_ATTEMPTS
        )

        for i in range(connections):
            try:
                if not self._hass.is_stopping:
                    retval = await func()
                    # An error dict is treated like an exception so it takes
                    # the same retry path.
                    if isinstance(retval, dict) and "Error" in retval:
                        raise AttributeError(retval["Error"])
                    self._api_protocol_working = True
                    self._api_working_protocol_failures = 0
                    return retval
            except Exception as e:
                _LOGGER.debug(
                    "Retrying after exception %s %s (%d/%d)",
                    type(e).__name__,
                    e,
                    i,
                    connections,
                )
                if i + 1 == connections:
                    # Last attempt failed: drop the cached state and let the
                    # entities update.
                    self._reset_cached_state()
                    self._api_working_protocol_failures += 1
                    if (
                        self._api_working_protocol_failures
                        > self._AUTO_FAILURE_RESET_COUNT
                    ):
                        self._api_protocol_working = False
                    for entity in self._children:
                        entity.async_schedule_update_ha_state()
                    # Only the first failure is logged as an error.
                    if self._api_working_protocol_failures == 1:
                        _LOGGER.error(error_message)
                    else:
                        _LOGGER.debug(error_message)

                if not self._api_protocol_working:
                    self._rotate_api_protocol_version()
  588. def _get_cached_state(self):
  589. cached_state = self._cached_state.copy()
  590. return {**cached_state, **self._get_pending_properties()}
  591. def _get_pending_properties(self):
  592. return {
  593. key: property["value"]
  594. for key, property in self._get_pending_updates().items()
  595. }
  596. def _get_unsent_properties(self):
  597. return {
  598. key: info["value"]
  599. for key, info in self._get_pending_updates().items()
  600. if not info["sent"]
  601. }
  602. def _get_pending_updates(self):
  603. now = time()
  604. # sort pending updates according to their API identifier
  605. pending_updates_sorted = sorted(
  606. self._pending_updates.items(), key=lambda x: int(x[0])
  607. )
  608. self._pending_updates = {
  609. key: value
  610. for key, value in pending_updates_sorted
  611. if not value["sent"]
  612. or now - value.get("updated_at", 0) < self._FAKE_IT_TIMEOUT
  613. }
  614. return self._pending_updates
  615. def _rotate_api_protocol_version(self):
  616. if self._api_protocol_version_index is None:
  617. try:
  618. self._api_protocol_version_index = API_PROTOCOL_VERSIONS.index(
  619. self._protocol_configured
  620. )
  621. except ValueError:
  622. self._api_protocol_version_index = 0
  623. # only rotate if configured as auto
  624. elif self._protocol_configured == "auto":
  625. self._api_protocol_version_index += 1
  626. if self._api_protocol_version_index >= len(API_PROTOCOL_VERSIONS):
  627. self._api_protocol_version_index = 0
  628. new_version = API_PROTOCOL_VERSIONS[self._api_protocol_version_index]
  629. _LOGGER.debug(
  630. "Setting protocol version for %s to %0.1f",
  631. self.name,
  632. new_version,
  633. )
  634. # If we don't have a connection, don't set the version yet
  635. if not self._api:
  636. return
  637. # Only enable tinytuya's auto-detect when using 3.22
  638. if new_version == 3.22:
  639. new_version = 3.3
  640. self._api.disabledetect = False
  641. else:
  642. self._api.disabledetect = True
  643. self._api.set_version(new_version)
  644. if self._api.parent:
  645. self._api.parent.set_version(new_version)
  646. @staticmethod
  647. def get_key_for_value(obj, value, fallback=None):
  648. keys = list(obj.keys())
  649. values = list(obj.values())
  650. return keys[values.index(value)] if value in values else fallback
  651. def setup_device(hass: HomeAssistant, config: dict):
  652. """Setup a tuya device based on passed in config."""
  653. _LOGGER.info("Creating device: %s", get_device_id(config))
  654. hass.data[DOMAIN] = hass.data.get(DOMAIN, {})
  655. device = TuyaLocalDevice(
  656. config[CONF_NAME],
  657. config[CONF_DEVICE_ID],
  658. config[CONF_HOST],
  659. config[CONF_LOCAL_KEY],
  660. config[CONF_PROTOCOL_VERSION],
  661. config.get(CONF_DEVICE_CID),
  662. hass,
  663. config[CONF_POLL_ONLY],
  664. )
  665. hass.data[DOMAIN][get_device_id(config)] = {
  666. "device": device,
  667. "tuyadevice": device._api,
  668. }
  669. return device
  670. async def async_delete_device(hass: HomeAssistant, config: dict):
  671. device_id = get_device_id(config)
  672. _LOGGER.info("Deleting device: %s", device_id)
  673. await hass.data[DOMAIN][device_id]["device"].async_stop()
  674. del hass.data[DOMAIN][device_id]["device"]
  675. del hass.data[DOMAIN][device_id]["tuyadevice"]