Просмотр исходного кода

Support for persistent connections.

  - create a loop for reading from the persistent connection.
  - read from that loop in a separate async task
  - register links back to entities from the device so that we can asynchronously inform HA of updates
  - remove the old poll mechanism
  - make some old synchronous methods asynchronous, remove duplicates where there were both
  - use asyncio.sleep instead of a separate Timer thread to send with delay
  - send things without waiting for replies (as the reply will come through the read loop)
  - keep track of what was sent and avoid sending it again while we are keeping it to reflect in state.
  - call the api asynchronously in the retry method to avoid blocking the event thread, and return the return value.
  - use HomeAssistant lifecycle events to avoid delaying startup and shutdown

    Issue #92, #146, #176, #231, #249, #253, #277, #286, #311, #330
Jason Rumney 3 лет назад
Родитель
Сommit
72d8b8311e

+ 2 - 2
custom_components/tuya_local/__init__.py

@@ -20,7 +20,7 @@ from .const import (
     CONF_TYPE,
     DOMAIN,
 )
-from .device import setup_device, delete_device
+from .device import setup_device, async_delete_device
 from .helpers.device_config import get_config
 
 _LOGGER = logging.getLogger(__name__)
@@ -196,7 +196,7 @@ async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry):
     for e in entities:
         await hass.config_entries.async_forward_entry_unload(entry, e)
 
-    delete_device(hass, config)
+    await async_delete_device(hass, config)
     del hass.data[DOMAIN][config[CONF_DEVICE_ID]]
 
     return True

+ 183 - 56
custom_components/tuya_local/device.py

@@ -2,14 +2,20 @@
 API for Tuya Local devices.
 """
 
+import asyncio
 import json
 import logging
 import tinytuya
-from threading import Lock, Timer
+from threading import Lock
 from time import time
 
 
-from homeassistant.const import CONF_HOST, CONF_NAME
+from homeassistant.const import (
+    CONF_HOST,
+    CONF_NAME,
+    EVENT_HOMEASSISTANT_STARTED,
+    EVENT_HOMEASSISTANT_STOP,
+)
 from homeassistant.core import HomeAssistant
 
 from .const import (
@@ -45,6 +51,10 @@ class TuyaLocalDevice(object):
             protocol_version (str | number): The protocol version.
         """
         self._name = name
+        self._children = []
+        self._running = False
+        self._shutdown_listener = None
+        self._startup_listener = None
         self._api_protocol_version_index = None
         self._api_protocol_working = False
         self._api = tinytuya.Device(dev_id, address, local_key)
@@ -62,8 +72,8 @@ class TuyaLocalDevice(object):
         # The solution is to keep a temporary list of changed properties that
         # we can overlay onto the state while we wait for the board to update
         # its switches.
-        self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT = 10
-        self._CACHE_TIMEOUT = 20
+        self._FAKE_IT_TIMEOUT = 5
+        self._CACHE_TIMEOUT = 120
         # More attempts are needed in auto mode so we can cycle through all
         # the possibilities a couple of times
         self._AUTO_CONNECTION_ATTEMPTS = 9
@@ -93,6 +103,115 @@ class TuyaLocalDevice(object):
         """Return True if the device has returned some state."""
         return len(self._get_cached_state()) > 1
 
+    def actually_start(self, event=None):
+        _LOGGER.debug(f"Starting monitor loop for {self.name}")
+        self._running = True
+        self._shutdown_listener = self._hass.bus.async_listen_once(
+            EVENT_HOMEASSISTANT_STOP, self.async_stop
+        )
+        self._refresh_task = self._hass.async_create_task(self.receive_loop())
+
+    def start(self):
+        if self._hass.is_running and not self._hass.is_stopping:
+            if self._startup_listener:
+                self._startup_listener()
+                self._startup_listener = None
+            self.actually_start()
+        else:
+            self._startup_listener = self._hass.bus.async_listen_once(
+                EVENT_HOMEASSISTANT_STARTED, self.actually_start
+            )
+
+    async def async_stop(self, event=None):
+        _LOGGER.debug(f"Stopping monitor loop for {self.name}")
+        self._running = False
+        if self._shutdown_listener:
+            self._shutdown_listener()
+            self._shutdown_listener = None
+        self._children.clear()
+        if self._refresh_task:
+            await self._refresh_task
+        _LOGGER.debug(f"Monitor loop for {self.name} stopped")
+        self._refresh_task = None
+
+    def register_entity(self, entity):
+        self._children.append(entity)
+        if not self._running and not self._startup_listener:
+            self.start()
+
+    async def async_unregister_entity(self, entity):
+        self._children.remove(entity)
+        if not self._children:
+            await self.async_stop()
+
+    async def receive_loop(self):
+        """Coroutine wrapper for async_receive generator."""
+        try:
+            async for poll in self.async_receive():
+                if type(poll) is dict:
+                    _LOGGER.debug(f"{self.name} received {poll}")
+                    self._cached_state = self._cached_state | poll
+                    self._cached_state["updated_at"] = time()
+                    for entity in self._children:
+                        entity.async_schedule_update_ha_state()
+                else:
+                    _LOGGER.debug(f"{self.name} received non data {poll}")
+            _LOGGER.warning(f"{self.name} receive loop has terminated")
+
+        except BaseException as t:
+            _LOGGER.exception(
+                f"{self.name} receive loop terminated by exception {t}",
+            )
+
+    async def async_receive(self):
+        """Receive messages from a persistent connection asynchronously."""
+        self._api.set_socketPersistent(self._running)
+        while self._running:
+            try:
+                last_cache = self._cached_state["updated_at"]
+                now = time()
+                if now - last_cache > self._CACHE_TIMEOUT:
+                    poll = await self._retry_on_failed_connection(
+                        lambda: self._api.status(),
+                        f"Failed to refresh device state for {self.name}",
+                    )
+                else:
+                    await self._hass.async_add_executor_job(
+                        self._api.heartbeat,
+                        True,
+                    )
+                    poll = await self._hass.async_add_executor_job(
+                        self._api.receive,
+                    )
+
+                if poll:
+                    if "Error" in poll:
+                        _LOGGER.warning(
+                            f"{self.name} error reading: {poll['Error']}",
+                        )
+                        if "Payload" in poll and poll["Payload"]:
+                            _LOGGER.info(
+                                f"{self.name} err payload: {poll['Payload']}",
+                            )
+                    else:
+                        if "dps" in poll:
+                            poll = poll["dps"]
+                        yield poll
+
+                await asyncio.sleep(0.1)
+
+            except asyncio.CancelledError:
+                self._running = False
+                # Close the persistent connection when exiting the loop
+                self._api.set_socketPersistent(False)
+                raise
+            except BaseException as t:
+                _LOGGER.exception(
+                    f"{self.name} receive loop error {type(t)}:{t}",
+                )
+        # Close the persistent connection when exiting the loop
+        self._api.set_socketPersistent(False)
+
     async def async_possible_types(self):
         cached_state = self._get_cached_state()
         if len(cached_state) <= 1:
@@ -117,27 +236,16 @@ class TuyaLocalDevice(object):
                 best_match = config
 
         if best_match is None:
-            _LOGGER.warning(f"Detection for {self.name} with dps {cached_state} failed")
+            _LOGGER.warning(
+                f"Detection for {self.name} with dps {cached_state} failed",
+            )
             return None
 
         return best_match.config_type
 
     async def async_refresh(self):
-        cache = self._get_cached_state()
-        if "updated_at" in cache:
-            last_updated = self._get_cached_state()["updated_at"]
-        else:
-            last_updated = 0
-
-        if self._refresh_task is None or time() - last_updated >= self._CACHE_TIMEOUT:
-            self._cached_state["updated_at"] = time()
-            self._refresh_task = self._hass.async_add_executor_job(self.refresh)
-
-        await self._refresh_task
-
-    def refresh(self):
         _LOGGER.debug(f"Refreshing device state for {self.name}.")
-        self._retry_on_failed_connection(
+        await self._retry_on_failed_connection(
             lambda: self._refresh_cached_state(),
             f"Failed to refresh device state for {self.name}.",
         )
@@ -149,19 +257,15 @@ class TuyaLocalDevice(object):
         else:
             return None
 
-    def set_property(self, dps_id, value):
-        self._set_properties({dps_id: value})
-
     async def async_set_property(self, dps_id, value):
-        await self._hass.async_add_executor_job(self.set_property, dps_id, value)
-
-    async def async_set_properties(self, dps_map):
-        await self._hass.async_add_executor_job(self._set_properties, dps_map)
+        await self.async_set_properties({dps_id: value})
 
     def anticipate_property_value(self, dps_id, value):
         """
-        Update a value in the cached state only. This is good for when you know the device will reflect a new state in
-        the next update, but don't want to wait for that update for the device to represent this state.
+        Update a value in the cached state only. This is good for when you
+        know the device will reflect a new state in the next update, but
+        don't want to wait for that update for the device to represent
+        this state.
 
         The anticipated value will be cleared with the next update.
         """
@@ -176,30 +280,36 @@ class TuyaLocalDevice(object):
         new_state = self._api.status()
         self._cached_state = self._cached_state | new_state["dps"]
         self._cached_state["updated_at"] = time()
-        _LOGGER.debug(f"{self.name} refreshed device state: {json.dumps(new_state)}")
         _LOGGER.debug(
-            f"new cache state (including pending properties): {json.dumps(self._get_cached_state())}"
+            f"{self.name} refreshed device state: {json.dumps(new_state)}",
+        )
+        _LOGGER.debug(
+            f"new state (incl pending): {json.dumps(self._get_cached_state())}"
         )
 
-    def _set_properties(self, properties):
+    async def async_set_properties(self, properties):
         if len(properties) == 0:
             return
 
         self._add_properties_to_pending_updates(properties)
-        self._debounce_sending_updates()
+        await self._debounce_sending_updates()
 
     def _add_properties_to_pending_updates(self, properties):
         now = time()
 
         pending_updates = self._get_pending_updates()
         for key, value in properties.items():
-            pending_updates[key] = {"value": value, "updated_at": now}
+            pending_updates[key] = {
+                "value": value,
+                "updated_at": now,
+                "sent": False,
+            }
 
         _LOGGER.debug(
-            f"{self.name} new pending updates: {json.dumps(self._pending_updates)}"
+            f"{self.name} new pending updates: {json.dumps(pending_updates)}",
         )
 
-    def _debounce_sending_updates(self):
+    async def _debounce_sending_updates(self):
         now = time()
         since = now - self._last_connection
         # set this now to avoid a race condition, it will be updated later
@@ -210,52 +320,56 @@ class TuyaLocalDevice(object):
         # same send mechanism.
         waittime = 1 if since < 1.1 else 0.001
 
-        try:
-            self._debounce.cancel()
-        except AttributeError:
-            pass
-        self._debounce = Timer(waittime, self._send_pending_updates)
-        self._debounce.start()
+        await asyncio.sleep(waittime)
+        await self._send_pending_updates()
 
-    def _send_pending_updates(self):
-        pending_properties = self._get_pending_properties()
-        payload = self._api.generate_payload(tinytuya.CONTROL, pending_properties)
+    async def _send_pending_updates(self):
+        pending_properties = self._get_unsent_properties()
+        payload = self._api.generate_payload(
+            tinytuya.CONTROL,
+            pending_properties,
+        )
 
         _LOGGER.debug(
             f"{self.name} sending dps update: {json.dumps(pending_properties)}"
         )
 
-        self._retry_on_failed_connection(
-            lambda: self._send_payload(payload), "Failed to update device state."
+        await self._retry_on_failed_connection(
+            lambda: self._send_payload(payload),
+            "Failed to update device state.",
         )
 
     def _send_payload(self, payload):
         try:
             self._lock.acquire()
-            self._api._send_receive(payload)
+            self._api.send(payload)
             self._cached_state["updated_at"] = 0
             now = time()
             self._last_connection = now
             pending_updates = self._get_pending_updates()
-            for key, value in pending_updates.items():
+            for key in list(pending_updates):
                 pending_updates[key]["updated_at"] = now
+                pending_updates[key]["sent"] = True
         finally:
             self._lock.release()
 
-    def _retry_on_failed_connection(self, func, error_message):
+    async def _retry_on_failed_connection(self, func, error_message):
         if self._api_protocol_version_index is None:
             self._rotate_api_protocol_version()
+        auto = (self._protocol_configured == "auto") and (
+            not self._api_protocol_working
+        )
         connections = (
             self._AUTO_CONNECTION_ATTEMPTS
-            if (self._protocol_configured == "auto" and not self._api_protocol_working)
+            if auto
             else self._SINGLE_PROTO_CONNECTION_ATTEMPTS
         )
 
         for i in range(connections):
             try:
-                func()
+                retval = await self._hass.async_add_executor_job(func)
                 self._api_protocol_working = True
-                break
+                return retval
             except Exception as e:
                 _LOGGER.debug(f"Retrying after exception {e}")
                 if i + 1 == connections:
@@ -270,14 +384,24 @@ class TuyaLocalDevice(object):
         return {**cached_state, **self._get_pending_properties()}
 
     def _get_pending_properties(self):
-        return {key: info["value"] for key, info in self._get_pending_updates().items()}
+        return {
+            key: property["value"]
+            for key, property in self._get_pending_updates().items()
+        }
+
+    def _get_unsent_properties(self):
+        return {
+            key: info["value"]
+            for key, info in self._get_pending_updates().items()
+            if not info["sent"]
+        }
 
     def _get_pending_updates(self):
         now = time()
         self._pending_updates = {
             key: value
             for key, value in self._pending_updates.items()
-            if now - value["updated_at"] < self._FAKE_IT_TIL_YOU_MAKE_IT_TIMEOUT
+            if now - value["updated_at"] < self._FAKE_IT_TIMEOUT
         }
         return self._pending_updates
 
@@ -298,7 +422,9 @@ class TuyaLocalDevice(object):
             self._api_protocol_version_index = 0
 
         new_version = API_PROTOCOL_VERSIONS[self._api_protocol_version_index]
-        _LOGGER.info(f"Setting protocol version for {self.name} to {new_version}.")
+        _LOGGER.info(
+            f"Setting protocol version for {self.name} to {new_version}.",
+        )
         self._api.set_version(new_version)
 
     @staticmethod
@@ -326,6 +452,7 @@ def setup_device(hass: HomeAssistant, config: dict):
     return device
 
 
-def delete_device(hass: HomeAssistant, config: dict):
+async def async_delete_device(hass: HomeAssistant, config: dict):
     _LOGGER.info(f"Deleting device: {config[CONF_DEVICE_ID]}")
+    await hass.data[DOMAIN][config[CONF_DEVICE_ID]]["device"].async_stop()
     del hass.data[DOMAIN][config[CONF_DEVICE_ID]]["device"]

+ 1 - 0
custom_components/tuya_local/diagnostics.py

@@ -64,6 +64,7 @@ def _async_device_as_dict(
         "status": device._api.dps_cache,
         "cached_state": device._cached_state,
         "pending_state": device._pending_updates,
+        "connected": device._running,
     }
 
     device_registry = dr.async_get(hass)

+ 7 - 1
custom_components/tuya_local/helpers/mixin.py

@@ -28,7 +28,7 @@ class TuyaLocalEntity:
 
     @property
     def should_poll(self):
-        return True
+        return False
 
     @property
     def available(self):
@@ -84,6 +84,12 @@ class TuyaLocalEntity:
     async def async_update(self):
         await self._device.async_refresh()
 
+    async def async_added_to_hass(self):
+        self._device.register_entity(self)
+
+    async def async_will_remove_from_hass(self):
+        await self._device.async_unregister_entity(self)
+
 
 UNIT_ASCII_MAP = {
     "C": UnitOfTemperature.CELSIUS,

+ 1 - 1
custom_components/tuya_local/manifest.json

@@ -1,6 +1,6 @@
 {
     "domain": "tuya_local",
-    "iot_class": "local_polling",
+    "iot_class": "local_push",
     "integration_type": "device",
     "name": "Tuya Local",
     "version": "2022.1.1",

+ 1 - 1
tests/devices/base_device_tests.py

@@ -93,7 +93,7 @@ class TuyaDeviceTestCase(IsolatedAsyncioTestCase):
 
     def test_should_poll(self):
         for e in self.entities.values():
-            self.assertTrue(e.should_poll)
+            self.assertFalse(e.should_poll)
 
     def test_available(self):
         for e in self.entities.values():

+ 8 - 0
tests/test_config_flow.py

@@ -26,6 +26,14 @@ def auto_enable_custom_integrations(enable_custom_integrations):
     yield
 
 
+@pytest.fixture(autouse=True)
+def prevent_task_creation():
+    with patch(
+        "custom_components.tuya_local.device.TuyaLocalDevice.register_entity",
+    ):
+        yield
+
+
 @pytest.fixture
 def bypass_setup():
     """Prevent actual setup of the integration after config flow."""

+ 59 - 91
tests/test_device.py

@@ -1,6 +1,6 @@
 import tinytuya
 from datetime import datetime
-from time import sleep, time
+from time import time
 from unittest import IsolatedAsyncioTestCase
 from unittest.mock import AsyncMock, call, patch
 
@@ -75,74 +75,60 @@ class TestDevice(IsolatedAsyncioTestCase):
         self.subject._cached_state = {"2": False, "updated_at": datetime.now()}
         self.assertEqual(await self.subject.async_inferred_type(), None)
 
-    async def test_does_not_refresh_more_often_than_cache_timeout(self):
-        refresh_task = AsyncMock()
-        self.subject._cached_state = {"updated_at": time() - 19}
-        self.subject._refresh_task = awaitable = refresh_task()
-
-        await self.subject.async_refresh()
-
-        refresh_task.assert_awaited()
-        self.assertIs(self.subject._refresh_task, awaitable)
-
     async def test_refreshes_when_there_is_no_pending_reset(self):
         async_job = AsyncMock()
         self.subject._cached_state = {"updated_at": time() - 19}
         self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
-
         await self.subject.async_refresh()
 
-        self.subject._hass.async_add_executor_job.assert_called_once_with(
-            self.subject.refresh
-        )
-        self.assertIs(self.subject._refresh_task, awaitable)
         async_job.assert_awaited()
 
     async def test_refreshes_when_there_is_expired_pending_reset(self):
         async_job = AsyncMock()
         self.subject._cached_state = {"updated_at": time() - 20}
         self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
-        self.subject._refresh_task = {}
-
         await self.subject.async_refresh()
 
-        self.subject._hass.async_add_executor_job.assert_called_once_with(
-            self.subject.refresh
-        )
-        self.assertIs(self.subject._refresh_task, awaitable)
         async_job.assert_awaited()
 
-    def test_refresh_reloads_status_from_device(self):
-        self.subject._api.status.return_value = {"dps": {"1": False}}
-        self.subject._cached_state = {"1": True}
+    async def test_refresh_reloads_status_from_device(self):
+        self.subject._hass.async_add_executor_job = AsyncMock()
+        self.subject._hass.async_add_executor_job.return_value = awaitable = {
+            "dps": {"1": False}
+        }
 
-        self.subject.refresh()
+        await self.subject.async_refresh()
 
-        self.subject._api.status.assert_called_once()
-        self.assertEqual(self.subject._cached_state["1"], False)
-        self.assertTrue(
-            time() - 1 <= self.subject._cached_state["updated_at"] <= time()
-        )
+        self.subject._hass.async_add_executor_job.assert_called_once()
 
-    def test_refresh_retries_up_to_four_times(self):
-        self.subject._api.status.side_effect = [
+    async def test_refresh_retries_up_to_nine_times(self):
+        self.subject._hass.async_add_executor_job = AsyncMock()
+        self.subject._hass.async_add_executor_job.side_effect = [
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
             Exception("Error"),
             Exception("Error"),
             Exception("Error"),
             {"dps": {"1": False}},
         ]
 
-        self.subject.refresh()
+        await self.subject.async_refresh()
 
-        self.assertEqual(self.subject._api.status.call_count, 4)
-        self.assertEqual(self.subject._cached_state["1"], False)
+        self.assertEqual(self.subject._hass.async_add_executor_job.call_count, 9)
+        # self.assertEqual(self.subject._cached_state["1"], False)
 
-    def test_refresh_clears_cached_state_and_pending_updates_after_failing_nine_times(
+    async def test_refresh_clears_cached_state_and_pending_updates_after_failing_nine_times(
         self,
     ):
         self.subject._cached_state = {"1": True}
-        self.subject._pending_updates = {"1": False}
-        self.subject._api.status.side_effect = [
+        self.subject._pending_updates = {
+            "1": {"value": False, "updated_at": datetime.now(), "sent": True}
+        }
+        self.subject._hass.async_add_executor_job = AsyncMock()
+        self.subject._hass.async_add_executor_job.side_effect = [
             Exception("Error"),
             Exception("Error"),
             Exception("Error"),
@@ -154,16 +140,16 @@ class TestDevice(IsolatedAsyncioTestCase):
             Exception("Error"),
         ]
 
-        self.subject.refresh()
+        await self.subject.async_refresh()
 
-        self.assertEqual(self.subject._api.status.call_count, 9)
+        self.assertEqual(self.subject._hass.async_add_executor_job.call_count, 9)
         self.assertEqual(self.subject._cached_state, {"updated_at": 0})
         self.assertEqual(self.subject._pending_updates, {})
 
-    def test_api_protocol_version_is_rotated_with_each_failure(self):
+    async def test_api_protocol_version_is_rotated_with_each_failure(self):
         self.subject._api.set_version.reset_mock()
-
-        self.subject._api.status.side_effect = [
+        self.subject._hass.async_add_executor_job = AsyncMock()
+        self.subject._hass.async_add_executor_job.side_effect = [
             Exception("Error"),
             Exception("Error"),
             Exception("Error"),
@@ -171,16 +157,16 @@ class TestDevice(IsolatedAsyncioTestCase):
             Exception("Error"),
             Exception("Error"),
         ]
-        self.subject.refresh()
+        await self.subject.async_refresh()
 
         self.subject._api.set_version.assert_has_calls(
             [call(3.1), call(3.2), call(3.4), call(3.3), call(3.1)]
         )
 
-    def test_api_protocol_version_is_stable_once_successful(self):
+    async def test_api_protocol_version_is_stable_once_successful(self):
         self.subject._api.set_version.reset_mock()
-
-        self.subject._api.status.side_effect = [
+        self.subject._hass.async_add_executor_job = AsyncMock()
+        self.subject._hass.async_add_executor_job.side_effect = [
             Exception("Error"),
             Exception("Error"),
             Exception("Error"),
@@ -190,18 +176,20 @@ class TestDevice(IsolatedAsyncioTestCase):
             Exception("Error"),
             {"dps": {"1": False}},
         ]
-        self.subject.refresh()
+
+        await self.subject.async_refresh()
         self.assertEqual(self.subject._api_protocol_version_index, 3)
-        self.subject.refresh()
+        self.assertTrue(self.subject._api_protocol_working)
+        await self.subject.async_refresh()
         self.assertEqual(self.subject._api_protocol_version_index, 3)
-        self.subject.refresh()
+        await self.subject.async_refresh()
         self.assertEqual(self.subject._api_protocol_version_index, 3)
 
         self.subject._api.set_version.assert_has_calls(
             [call(3.1), call(3.2), call(3.4)]
         )
 
-    def test_api_protocol_version_is_not_rotated_when_not_auto(self):
+    async def test_api_protocol_version_is_not_rotated_when_not_auto(self):
         self.subject._protocol_configured = 3.4
         self.subject._api_protocol_version_index = None
         self.subject._api.set_version.reset_mock()
@@ -209,7 +197,8 @@ class TestDevice(IsolatedAsyncioTestCase):
         self.subject._api.set_version.assert_called_once_with(3.4)
         self.subject._api.set_version.reset_mock()
 
-        self.subject._api.status.side_effect = [
+        self.subject._hass.async_add_executor_job = AsyncMock()
+        self.subject._hass.async_add_executor_job.side_effect = [
             Exception("Error"),
             Exception("Error"),
             Exception("Error"),
@@ -224,16 +213,18 @@ class TestDevice(IsolatedAsyncioTestCase):
             Exception("Error"),
             {"dps": {"1": False}},
         ]
-        self.subject.refresh()
+        await self.subject.async_refresh()
         self.assertEqual(self.subject._api_protocol_version_index, 3)
-        self.subject.refresh()
+        await self.subject.async_refresh()
         self.assertEqual(self.subject._api_protocol_version_index, 3)
-        self.subject.refresh()
+        await self.subject.async_refresh()
         self.assertEqual(self.subject._api_protocol_version_index, 3)
 
     def test_reset_cached_state_clears_cached_state_and_pending_updates(self):
         self.subject._cached_state = {"1": True, "updated_at": time()}
-        self.subject._pending_updates = {"1": False}
+        self.subject._pending_updates = {
+            "1": {"value": False, "updated_at": datetime.now(), "sent": True}
+        }
 
         self.subject._reset_cached_state()
 
@@ -246,14 +237,14 @@ class TestDevice(IsolatedAsyncioTestCase):
 
     def test_get_property_returns_pending_update_value(self):
         self.subject._pending_updates = {
-            "1": {"value": False, "updated_at": time() - 9}
+            "1": {"value": False, "updated_at": time() - 4, "sent": True}
         }
         self.assertEqual(self.subject.get_property("1"), False)
 
     def test_pending_update_value_overrides_cached_value(self):
         self.subject._cached_state = {"1": True}
         self.subject._pending_updates = {
-            "1": {"value": False, "updated_at": time() - 9}
+            "1": {"value": False, "updated_at": time() - 4, "sent": True}
         }
 
         self.assertEqual(self.subject.get_property("1"), False)
@@ -261,7 +252,7 @@ class TestDevice(IsolatedAsyncioTestCase):
     def test_expired_pending_update_value_does_not_override_cached_value(self):
         self.subject._cached_state = {"1": True}
         self.subject._pending_updates = {
-            "1": {"value": False, "updated_at": time() - 10}
+            "1": {"value": False, "updated_at": time() - 5, "sent": True}
         }
 
         self.assertEqual(self.subject.get_property("1"), True)
@@ -276,40 +267,17 @@ class TestDevice(IsolatedAsyncioTestCase):
 
         await self.subject.async_set_property("1", False)
 
-        self.subject._hass.async_add_executor_job.assert_called_once_with(
-            self.subject.set_property, "1", False
-        )
+        self.subject._hass.async_add_executor_job.assert_called_once()
         async_job.assert_awaited()
 
-    def test_set_property_immediately_stores_new_value_to_pending_updates(self):
-        self.subject.set_property("1", False)
+    async def test_set_property_immediately_stores_new_value_to_pending_updates(self):
         self.subject._cached_state = {"1": True}
-        self.assertEqual(self.subject.get_property("1"), False)
-        # wait for the debounce timer to avoid a teardown error
-        sleep(2)
-
-    def test_debounces_multiple_set_calls_into_one_api_call(self):
-        with patch("custom_components.tuya_local.device.Timer") as mock:
-            self.subject.set_property("1", True)
-            mock.assert_called_once_with(0.001, self.subject._send_pending_updates)
-
-            debounce = self.subject._debounce
-            mock.reset_mock()
-
-            self.subject.set_property("2", False)
-            debounce.cancel.assert_called_once()
-            mock.assert_called_once_with(1, self.subject._send_pending_updates)
-
-            self.subject._api.generate_payload.return_value = "payload"
-            self.subject._send_pending_updates()
-            self.subject._api.generate_payload.assert_called_once_with(
-                tinytuya.CONTROL, {"1": True, "2": False}
-            )
-            self.subject._api._send_receive.assert_called_once_with("payload")
-
-    def test_set_properties_takes_no_action_when_no_properties_are_provided(self):
-        with patch("custom_components.tuya_local.device.Timer") as mock:
-            self.subject._set_properties({})
+        await self.subject.async_set_property("1", False)
+        self.assertFalse(self.subject.get_property("1"))
+
+    async def test_set_properties_takes_no_action_when_no_properties_are_provided(self):
+        with patch("asyncio.sleep") as mock:
+            await self.subject.async_set_properties({})
             mock.assert_not_called()
 
     def test_anticipate_property_value_updates_cached_state(self):