device.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825
  1. """
  2. API for Tuya Local devices.
  3. """
  4. import asyncio
  5. import logging
  6. from asyncio.exceptions import CancelledError
  7. from threading import Lock
  8. from time import time
  9. import tinytuya
  10. from homeassistant.const import (
  11. CONF_HOST,
  12. CONF_NAME,
  13. EVENT_HOMEASSISTANT_STARTED,
  14. EVENT_HOMEASSISTANT_STOP,
  15. )
  16. from homeassistant.core import HomeAssistant, callback
  17. from .const import (
  18. API_PROTOCOL_VERSIONS,
  19. CONF_DEVICE_CID,
  20. CONF_DEVICE_ID,
  21. CONF_LOCAL_KEY,
  22. CONF_MANUFACTURER,
  23. CONF_MODEL,
  24. CONF_POLL_ONLY,
  25. CONF_PROTOCOL_VERSION,
  26. DOMAIN,
  27. )
  28. from .helpers.config import get_device_id
  29. from .helpers.device_config import possible_matches
  30. from .helpers.log import log_json
  31. _LOGGER = logging.getLogger(__name__)
  32. def _collect_possible_matches(cached_state, product_ids):
  33. """Collect possible matches from generator into an array."""
  34. return list(possible_matches(cached_state, product_ids))
  35. class TuyaLocalDevice(object):
  36. def __init__(
  37. self,
  38. name,
  39. dev_id,
  40. address,
  41. local_key,
  42. protocol_version,
  43. dev_cid,
  44. hass: HomeAssistant,
  45. poll_only=False,
  46. manufacturer=None,
  47. model=None,
  48. ):
  49. """
  50. Represents a Tuya-based device.
  51. Args:
  52. name (str): The device name.
  53. dev_id (str): The device id.
  54. address (str): The network address.
  55. local_key (str): The encryption key.
  56. protocol_version (str | number): The protocol version.
  57. dev_cid (str): The sub device id.
  58. hass (HomeAssistant): The Home Assistant instance.
  59. poll_only (bool): True if the device should be polled only.
  60. manufacturer (str | None): The device manufacturer, if known.
  61. model (str | None): The device model, if known.
  62. """
  63. self._name = name
  64. self._manufacturer = manufacturer
  65. self._model = model
  66. self._children = []
  67. self._force_dps = []
  68. self._product_ids = []
  69. self._running = False
  70. self._shutdown_listener = None
  71. self._startup_listener = None
  72. self._api_protocol_version_index = None
  73. self._api_protocol_working = False
  74. self._api_working_protocol_failures = 0
  75. self.dev_cid = dev_cid
  76. try:
  77. if dev_cid:
  78. if hass.data[DOMAIN].get(dev_id) and name != "Test":
  79. parent = hass.data[DOMAIN][dev_id]["tuyadevice"]
  80. parent_lock = hass.data[DOMAIN][dev_id].get(
  81. "tuyadevicelock", asyncio.Lock()
  82. )
  83. else:
  84. parent = tinytuya.Device(dev_id, address, local_key)
  85. parent_lock = asyncio.Lock()
  86. if name != "Test":
  87. hass.data[DOMAIN][dev_id] = {
  88. "tuyadevice": parent,
  89. "tuyadevicelock": parent_lock,
  90. }
  91. self._api = tinytuya.Device(
  92. dev_cid,
  93. cid=dev_cid,
  94. parent=parent,
  95. )
  96. self._api_lock = parent_lock
  97. else:
  98. if hass.data[DOMAIN].get(dev_id) and name != "Test":
  99. self._api = hass.data[DOMAIN][dev_id]["tuyadevice"]
  100. self._api_lock = hass.data[DOMAIN][dev_id].get(
  101. "tuyadevicelock", asyncio.Lock()
  102. )
  103. else:
  104. self._api = tinytuya.Device(dev_id, address, local_key)
  105. self._api_lock = asyncio.Lock()
  106. if name != "Test":
  107. hass.data[DOMAIN][dev_id] = {
  108. "tuyadevice": self._api,
  109. "tuyadevicelock": self._api_lock,
  110. }
  111. except Exception as e:
  112. _LOGGER.error(
  113. "%s: %s while initialising device %s",
  114. type(e).__name__,
  115. e,
  116. dev_id,
  117. )
  118. raise e
  119. # we handle retries at a higher level so we can rotate protocol version
  120. self._api.set_socketRetryLimit(1)
  121. if self._api.parent:
  122. # Retries cause problems for other children of the parent device
  123. self._api.parent.set_socketRetryLimit(1)
  124. self._refresh_task = None
  125. self._protocol_configured = protocol_version
  126. self._poll_only = poll_only
  127. self._temporary_poll = False
  128. self._reset_cached_state()
  129. self._hass = hass
  130. # API calls to update Tuya devices are asynchronous and non-blocking.
  131. # This means you can send a change and immediately request an updated
  132. # state (like HA does), but because it has not yet finished processing
  133. # you will be returned the old state.
  134. # The solution is to keep a temporary list of changed properties that
  135. # we can overlay onto the state while we wait for the board to update
  136. # its switches.
  137. self._FAKE_IT_TIMEOUT = 5
  138. self._CACHE_TIMEOUT = 30
  139. self._HEARTBEAT_INTERVAL = 10
  140. # More attempts are needed in auto mode so we can cycle through all
  141. # the possibilities a couple of times
  142. self._AUTO_CONNECTION_ATTEMPTS = len(API_PROTOCOL_VERSIONS) * 2 + 1
  143. self._SINGLE_PROTO_CONNECTION_ATTEMPTS = 3
  144. # The number of failures from a working protocol before retrying other protocols.
  145. self._AUTO_FAILURE_RESET_COUNT = 10
  146. self._lock = Lock()
  147. # Serialises the read-modify-write window for entities that share a
  148. # packed dp (e.g. multiple sub-entity lights writing to dp 51 with
  149. # different masks). Without this, two concurrent async_turn_on calls
  150. # both read the same baseline before either updates pending, then the
  151. # second's pending update clobbers the first.
  152. self.set_lock = asyncio.Lock()
  153. @property
  154. def name(self):
  155. return self._name
  156. @property
  157. def unique_id(self):
  158. """Return the unique id for this device (the dev_id or dev_cid)."""
  159. return self.dev_cid or self._api.id
  160. @property
  161. def device_info(self):
  162. """Return the device information for this device."""
  163. info = {
  164. "identifiers": {(DOMAIN, self.unique_id)},
  165. "name": self.name,
  166. "manufacturer": self._manufacturer or "Tuya",
  167. }
  168. if self._model:
  169. info["model"] = self._model
  170. return info
  171. @property
  172. def has_returned_state(self):
  173. """Return True if the device has returned some state."""
  174. cached = self._get_cached_state()
  175. return len(cached) > 1 or cached.get("updated_at", 0) > 0
  176. @callback
  177. def actually_start(self, event=None):
  178. _LOGGER.debug("Starting monitor loop for %s", self.name)
  179. self._running = True
  180. self._shutdown_listener = self._hass.bus.async_listen_once(
  181. EVENT_HOMEASSISTANT_STOP, self.async_stop
  182. )
  183. if not self._refresh_task:
  184. self._refresh_task = self._hass.async_create_task(self.receive_loop())
  185. def start(self):
  186. if self._hass.is_stopping:
  187. return
  188. elif self._hass.is_running:
  189. if self._startup_listener:
  190. self._startup_listener()
  191. self._startup_listener = None
  192. self.actually_start()
  193. else:
  194. self._startup_listener = self._hass.bus.async_listen_once(
  195. EVENT_HOMEASSISTANT_STARTED, self.actually_start
  196. )
  197. async def async_stop(self, event=None):
  198. _LOGGER.debug("Stopping monitor loop for %s", self.name)
  199. self._running = False
  200. self._children.clear()
  201. self._force_dps.clear()
  202. if self._refresh_task:
  203. self._api.set_socketPersistent(False)
  204. if self._api.parent:
  205. self._api.parent.set_socketPersistent(False)
  206. await self._refresh_task
  207. _LOGGER.debug("Monitor loop for %s stopped", self.name)
  208. self._refresh_task = None
  209. def register_entity(self, entity):
  210. # If this is the first child entity to register, and HA is still
  211. # starting, refresh the device state so it shows as available without
  212. # waiting for startup to complete.
  213. should_poll = len(self._children) == 0 and not self._hass.is_running
  214. self._children.append(entity)
  215. for dp in entity._config.dps():
  216. if dp.force and dp.id not in self._force_dps:
  217. self._force_dps.append(int(dp.id))
  218. if not self._running and not self._startup_listener:
  219. self.start()
  220. if self.has_returned_state:
  221. entity.async_schedule_update_ha_state()
  222. elif should_poll:
  223. entity.async_schedule_update_ha_state(True)
  224. async def async_unregister_entity(self, entity):
  225. self._children.remove(entity)
  226. if not self._children:
  227. try:
  228. await self.async_stop()
  229. except CancelledError:
  230. pass
  231. async def receive_loop(self):
  232. """Coroutine wrapper for async_receive generator."""
  233. try:
  234. async for poll in self.async_receive():
  235. if isinstance(poll, dict):
  236. _LOGGER.debug(
  237. "%s received %s",
  238. self.name,
  239. log_json(poll),
  240. )
  241. full_poll = poll.pop("full_poll", False)
  242. self._cached_state = self._cached_state | poll
  243. self._cached_state["updated_at"] = time()
  244. self._remove_properties_from_pending_updates(poll)
  245. for entity in self._children:
  246. # let entities trigger off poll contents directly
  247. try:
  248. entity.on_receive(poll, full_poll)
  249. except Exception as e:
  250. # Don't let exceptions thrown by the entities interrupt the communication loop
  251. # Just log them and move on.
  252. _LOGGER.exception(
  253. "%s on_receive error for entity %s: %s",
  254. self.name,
  255. entity.entity_id,
  256. e,
  257. )
  258. # clear non-persistant dps that were not in a full poll
  259. if full_poll:
  260. for dp in entity._config.dps():
  261. if not dp.persist and dp.id not in poll:
  262. self._cached_state.pop(dp.id, None)
  263. entity.schedule_update_ha_state()
  264. else:
  265. _LOGGER.debug(
  266. "%s received non data %s",
  267. self.name,
  268. log_json(poll),
  269. )
  270. _LOGGER.warning("%s receive loop has terminated", self.name)
  271. except Exception as t:
  272. _LOGGER.exception(
  273. "%s receive loop terminated by exception %s", self.name, t
  274. )
  275. self._api.set_socketPersistent(False)
  276. if self._api.parent:
  277. self._api.parent.set_socketPersistent(False)
  278. @property
  279. def should_poll(self):
  280. return self._poll_only or self._temporary_poll or not self.has_returned_state
  281. def pause(self):
  282. self._temporary_poll = True
  283. self._api.set_socketPersistent(False)
  284. if self._api.parent:
  285. self._api.parent.set_socketPersistent(False)
  286. def resume(self):
  287. self._temporary_poll = False
  288. async def async_receive(self):
  289. """Receive messages from a persistent connection asynchronously."""
  290. # If we didn't yet get any state from the device, we may need to
  291. # negotiate the protocol before making the connection persistent
  292. persist = not self.should_poll
  293. # flag to alternate updatedps and status calls to ensure we get
  294. # all dps updated
  295. dps_updated = False
  296. self._api.set_socketPersistent(persist)
  297. if self._api.parent:
  298. self._api.parent.set_socketPersistent(persist)
  299. last_heartbeat = self._cached_state.get("updated_at", 0)
  300. while self._running:
  301. error_count = self._api_working_protocol_failures
  302. force_backoff = False
  303. try:
  304. await self._api_lock.acquire()
  305. last_cache = self._cached_state.get("updated_at", 0)
  306. now = time()
  307. full_poll = False
  308. if persist == self.should_poll:
  309. # use persistent connections after initial communication
  310. # has been established. Until then, we need to rotate
  311. # the protocol version, which seems to require a fresh
  312. # connection.
  313. persist = not self.should_poll
  314. _LOGGER.debug(
  315. "%s persistant connection set to %s", self.name, persist
  316. )
  317. self._api.set_socketPersistent(persist)
  318. if self._api.parent:
  319. self._api.parent.set_socketPersistent(persist)
  320. self._last_full_poll = 0 # ensure we start with a full poll
  321. needs_full_poll = now - self._last_full_poll > self._CACHE_TIMEOUT
  322. if now - last_cache > self._CACHE_TIMEOUT or (
  323. persist and needs_full_poll
  324. ):
  325. if (
  326. self._force_dps
  327. and not dps_updated
  328. and self._api_protocol_working
  329. ):
  330. poll = await self._retry_on_failed_connection(
  331. lambda: self._api.updatedps(self._force_dps),
  332. f"Failed to update device dps for {self.name}",
  333. )
  334. dps_updated = True
  335. else:
  336. poll = await self._retry_on_failed_connection(
  337. lambda: self._api.status(),
  338. f"Failed to fetch device status for {self.name}",
  339. )
  340. dps_updated = False
  341. full_poll = True
  342. self._last_full_poll = now
  343. last_heartbeat = now # reset heartbeat timer on full poll
  344. elif persist:
  345. if now - last_heartbeat > self._HEARTBEAT_INTERVAL:
  346. await self._hass.async_add_executor_job(
  347. self._api.heartbeat,
  348. True,
  349. )
  350. last_heartbeat = now
  351. poll = await self._hass.async_add_executor_job(
  352. self._api.receive,
  353. )
  354. # Ignore Payload error 904, as 3.4 protocol devices seem to return
  355. # this when there is no new data, instead of just returning nothing.
  356. if poll and "Err" in poll and poll["Err"] == "904":
  357. poll = None
  358. else:
  359. force_backoff = True
  360. poll = None
  361. if poll:
  362. if "Error" in poll:
  363. # increment the error count if not done already
  364. if error_count == self._api_working_protocol_failures:
  365. self._api_working_protocol_failures += 1
  366. if self._api_working_protocol_failures == 1:
  367. _LOGGER.warning(
  368. "%s error reading: %s", self.name, poll["Error"]
  369. )
  370. else:
  371. _LOGGER.debug(
  372. "%s error reading: %s", self.name, poll["Error"]
  373. )
  374. if "Payload" in poll and poll["Payload"]:
  375. _LOGGER.debug(
  376. "%s err payload: %s",
  377. self.name,
  378. poll["Payload"],
  379. )
  380. else:
  381. if "dps" in poll:
  382. poll = poll["dps"]
  383. if isinstance(poll, dict):
  384. poll["full_poll"] = full_poll
  385. yield poll
  386. except CancelledError:
  387. self._running = False
  388. # Close the persistent connection when exiting the loop
  389. persist = False
  390. self._api.set_socketPersistent(False)
  391. if self._api.parent:
  392. self._api.parent.set_socketPersistent(False)
  393. raise
  394. except Exception as t:
  395. _LOGGER.exception(
  396. "%s receive loop error %s:%s",
  397. self.name,
  398. type(t).__name__,
  399. t,
  400. )
  401. persist = False
  402. self._api.set_socketPersistent(False)
  403. if self._api.parent:
  404. self._api.parent.set_socketPersistent(False)
  405. force_backoff = True
  406. finally:
  407. if self._api_lock.locked():
  408. self._api_lock.release()
  409. if not self.has_returned_state:
  410. force_backoff = True
  411. await asyncio.sleep(5 if force_backoff else 0.1)
  412. # Close the persistent connection when exiting the loop
  413. self._api.set_socketPersistent(False)
  414. if self._api.parent:
  415. self._api.parent.set_socketPersistent(False)
  416. def set_detected_product_id(self, product_id):
  417. self._product_ids.append(product_id)
  418. async def async_possible_types(self):
  419. cached_state = self._get_cached_state()
  420. if len(cached_state) <= 1:
  421. # in case of device22 devices, we need to poll them with a dp
  422. # that exists on the device to get anything back. Most switch-like
  423. # devices have dp 1. Lights generally start from 20. 101 is where
  424. # vendor specific dps start. Between them, these three should cover
  425. # most devices. 148 covers a doorbell device that didn't have these
  426. # 201 covers remote controllers and 2 and 9 cover others without 1
  427. self._api.set_dpsUsed(
  428. {
  429. "1": None,
  430. "2": None,
  431. "9": None,
  432. "20": None,
  433. "60": None,
  434. "101": None,
  435. "148": None,
  436. "201": None,
  437. }
  438. )
  439. await self.async_refresh()
  440. cached_state = self._get_cached_state()
  441. return await self._hass.async_add_executor_job(
  442. _collect_possible_matches,
  443. cached_state,
  444. self._product_ids,
  445. )
  446. async def async_inferred_type(self):
  447. best_match = None
  448. best_quality = 0
  449. cached_state = self._get_cached_state()
  450. possible = await self.async_possible_types()
  451. for config in possible:
  452. quality = config.match_quality(cached_state, self._product_ids)
  453. _LOGGER.info(
  454. "%s considering %s with quality %s",
  455. self.name,
  456. config.name,
  457. quality,
  458. )
  459. if quality > best_quality:
  460. best_quality = quality
  461. best_match = config
  462. if best_match:
  463. return best_match.config_type
  464. _LOGGER.warning(
  465. "Detection for %s with dps %s failed",
  466. self.name,
  467. log_json(cached_state),
  468. )
  469. async def async_refresh(self):
  470. _LOGGER.debug("Refreshing device state for %s", self.name)
  471. if not self._running:
  472. await self._retry_on_failed_connection(
  473. lambda: self._refresh_cached_state(),
  474. f"Failed to refresh device state for {self.name}.",
  475. )
  476. def get_property(self, dps_id):
  477. cached_state = self._get_cached_state()
  478. return cached_state.get(dps_id)
  479. async def async_set_property(self, dps_id, value):
  480. await self.async_set_properties({dps_id: value})
  481. def anticipate_property_value(self, dps_id, value):
  482. """
  483. Update a value in the cached state only. This is good for when you
  484. know the device will reflect a new state in the next update, but
  485. don't want to wait for that update for the device to represent
  486. this state.
  487. The anticipated value will be cleared with the next update.
  488. """
  489. self._cached_state[dps_id] = value
  490. def _reset_cached_state(self):
  491. self._cached_state = {"updated_at": 0}
  492. self._pending_updates = {}
  493. self._last_connection = 0
  494. self._last_full_poll = 0
  495. def _refresh_cached_state(self):
  496. new_state = self._api.status()
  497. if new_state:
  498. if "Err" not in new_state:
  499. self._cached_state = self._cached_state | new_state.get("dps", {})
  500. self._cached_state["updated_at"] = time()
  501. for entity in self._children:
  502. for dp in entity._config.dps():
  503. # Clear non-persistant dps that were not in the poll
  504. if not dp.persist and dp.id not in new_state.get("dps", {}):
  505. self._cached_state.pop(dp.id, None)
  506. entity.schedule_update_ha_state()
  507. elif self._api_working_protocol_failures == 1:
  508. _LOGGER.warning(
  509. "%s protocol error %s: %s",
  510. self.name,
  511. new_state.get("Err"),
  512. new_state.get("Error", "message not provided"),
  513. )
  514. else:
  515. _LOGGER.debug(
  516. "%s protocol error %s: %s",
  517. self.name,
  518. new_state.get("Err"),
  519. new_state.get("Error", "message not provided"),
  520. )
  521. _LOGGER.debug(
  522. "%s refreshed device state: %s",
  523. self.name,
  524. log_json(new_state),
  525. )
  526. _LOGGER.debug(
  527. "new state (incl pending): %s",
  528. log_json(self._get_cached_state()),
  529. )
  530. return new_state
  531. async def async_set_properties(self, properties):
  532. if len(properties) == 0:
  533. return
  534. self._add_properties_to_pending_updates(properties)
  535. await self._debounce_sending_updates()
  536. def _add_properties_to_pending_updates(self, properties):
  537. now = time()
  538. pending_updates = self._get_pending_updates()
  539. for key, value in properties.items():
  540. pending_updates[key] = {
  541. "value": value,
  542. "updated_at": now,
  543. "sent": False,
  544. }
  545. _LOGGER.debug(
  546. "%s new pending updates: %s",
  547. self.name,
  548. log_json(pending_updates),
  549. )
  550. def _remove_properties_from_pending_updates(self, data):
  551. self._pending_updates = {
  552. key: value
  553. for key, value in self._pending_updates.items()
  554. if key not in data or not value["sent"] or data[key] != value["value"]
  555. }
  556. async def _debounce_sending_updates(self):
  557. now = time()
  558. since = now - self._last_connection
  559. # set this now to avoid a race condition, it will be updated later
  560. # when the data is actally sent
  561. self._last_connection = now
  562. # Only delay a second if there was recently another command.
  563. # Otherwise delay 1ms, to keep things simple by reusing the
  564. # same send mechanism.
  565. waittime = 1 if since < 1.1 and self.should_poll else 0.001
  566. await asyncio.sleep(waittime)
  567. await self._send_pending_updates()
  568. async def _send_pending_updates(self):
  569. pending_properties = self._get_unsent_properties()
  570. _LOGGER.debug(
  571. "%s sending dps update: %s",
  572. self.name,
  573. log_json(pending_properties),
  574. )
  575. await self._retry_on_failed_connection(
  576. lambda: self._set_values(pending_properties),
  577. "Failed to update device state.",
  578. )
  579. def _set_values(self, properties):
  580. try:
  581. self._lock.acquire()
  582. self._api.set_multiple_values(properties, nowait=True)
  583. self._cached_state["updated_at"] = 0
  584. now = time()
  585. self._last_connection = now
  586. pending_updates = self._get_pending_updates()
  587. for key in properties.keys():
  588. pending_updates[key]["updated_at"] = now
  589. pending_updates[key]["sent"] = True
  590. finally:
  591. self._lock.release()
  592. async def _retry_on_failed_connection(self, func, error_message):
  593. if self._api_protocol_version_index is None:
  594. await self._rotate_api_protocol_version()
  595. auto = (self._protocol_configured == "auto") and (
  596. not self._api_protocol_working
  597. )
  598. connections = (
  599. self._AUTO_CONNECTION_ATTEMPTS
  600. if auto
  601. else self._SINGLE_PROTO_CONNECTION_ATTEMPTS
  602. )
  603. last_err_code = None
  604. for i in range(connections):
  605. try:
  606. if not self._hass.is_stopping:
  607. retval = await self._hass.async_add_executor_job(func)
  608. if isinstance(retval, dict) and "Error" in retval:
  609. last_err_code = retval.get("Err")
  610. if last_err_code == "900":
  611. # Some devices (e.g. IR/RF remotes) never return
  612. # status data; error 900 is their normal response
  613. # to a status query. Treat as reachable with no
  614. # data so commands can still be sent.
  615. self._cached_state["updated_at"] = time()
  616. retval = None
  617. else:
  618. raise AttributeError(retval["Error"])
  619. self._api_protocol_working = True
  620. self._api_working_protocol_failures = 0
  621. return retval
  622. except Exception as e:
  623. _LOGGER.debug(
  624. "Retrying after exception %s %s (%d/%d)",
  625. type(e).__name__,
  626. e,
  627. i,
  628. connections,
  629. )
  630. # Ensure we have a fresh connection for the next attempt
  631. self._api.set_socketPersistent(False)
  632. if self._api.parent:
  633. self._api.parent.set_socketPersistent(False)
  634. if i + 1 == connections:
  635. self._reset_cached_state()
  636. self._api_working_protocol_failures += 1
  637. if (
  638. self._api_working_protocol_failures
  639. > self._AUTO_FAILURE_RESET_COUNT
  640. ):
  641. self._api_protocol_working = False
  642. for entity in self._children:
  643. entity.async_schedule_update_ha_state()
  644. if self._api_working_protocol_failures == 1 and not (
  645. last_err_code == "914" and self._protocol_configured == "auto"
  646. ):
  647. _LOGGER.error(error_message)
  648. else:
  649. _LOGGER.debug(error_message)
  650. if not self._api_protocol_working:
  651. await self._rotate_api_protocol_version()
  652. def _get_cached_state(self):
  653. cached_state = self._cached_state.copy()
  654. return {**cached_state, **self._get_pending_properties()}
  655. def _get_pending_properties(self):
  656. return {key: prop["value"] for key, prop in self._get_pending_updates().items()}
  657. def _get_unsent_properties(self):
  658. return {
  659. key: info["value"]
  660. for key, info in self._get_pending_updates().items()
  661. if not info["sent"]
  662. }
  663. def _get_pending_updates(self):
  664. now = time()
  665. # sort pending updates according to their API identifier
  666. pending_updates_sorted = sorted(
  667. self._pending_updates.items(), key=lambda x: int(x[0])
  668. )
  669. self._pending_updates = {
  670. key: value
  671. for key, value in pending_updates_sorted
  672. if not value["sent"]
  673. or now - value.get("updated_at", 0) < self._FAKE_IT_TIMEOUT
  674. }
  675. return self._pending_updates
  676. async def _rotate_api_protocol_version(self):
  677. if self._api_protocol_version_index is None:
  678. try:
  679. self._api_protocol_version_index = API_PROTOCOL_VERSIONS.index(
  680. self._protocol_configured
  681. )
  682. except ValueError:
  683. self._api_protocol_version_index = 0
  684. # only rotate if configured as auto
  685. elif self._protocol_configured == "auto":
  686. self._api_protocol_version_index += 1
  687. if self._api_protocol_version_index >= len(API_PROTOCOL_VERSIONS):
  688. self._api_protocol_version_index = 0
  689. new_version = API_PROTOCOL_VERSIONS[self._api_protocol_version_index]
  690. _LOGGER.debug(
  691. "Setting protocol version for %s to %s",
  692. self.name,
  693. new_version,
  694. )
  695. # Only enable tinytuya's auto-detect when using 3.22
  696. if new_version == 3.22:
  697. new_version = 3.3
  698. self._api.disabledetect = False
  699. else:
  700. self._api.disabledetect = True
  701. await self._hass.async_add_executor_job(
  702. self._api.set_version,
  703. new_version,
  704. )
  705. if self._api.parent:
  706. await self._hass.async_add_executor_job(
  707. self._api.parent.set_version,
  708. new_version,
  709. )
  710. @staticmethod
  711. def get_key_for_value(obj, value, fallback=None):
  712. keys = list(obj.keys())
  713. values = list(obj.values())
  714. return keys[values.index(value)] if value in values else fallback
  715. def setup_device(hass: HomeAssistant, config: dict):
  716. """Setup a tuya device based on passed in config."""
  717. _LOGGER.info("Creating device: %s", get_device_id(config))
  718. hass.data[DOMAIN] = hass.data.get(DOMAIN, {})
  719. device = TuyaLocalDevice(
  720. config[CONF_NAME],
  721. config[CONF_DEVICE_ID],
  722. config[CONF_HOST],
  723. config[CONF_LOCAL_KEY],
  724. config[CONF_PROTOCOL_VERSION],
  725. config.get(CONF_DEVICE_CID),
  726. hass,
  727. config[CONF_POLL_ONLY],
  728. manufacturer=config.get(CONF_MANUFACTURER),
  729. model=config.get(CONF_MODEL),
  730. )
  731. hass.data[DOMAIN][get_device_id(config)] = {
  732. "device": device,
  733. "tuyadevice": device._api,
  734. "tuyadevicelock": device._api_lock,
  735. }
  736. return device
  737. async def async_delete_device(hass: HomeAssistant, config: dict):
  738. device_id = get_device_id(config)
  739. _LOGGER.info("Deleting device: %s", device_id)
  740. await hass.data[DOMAIN][device_id]["device"].async_stop()
  741. del hass.data[DOMAIN][device_id]["device"]
  742. del hass.data[DOMAIN][device_id]["tuyadevice"]
  743. del hass.data[DOMAIN][device_id]["tuyadevicelock"]