فهرست منبع

More unit tests for the GoldairTuyaDevice class

Nik Rolls 5 سال پیش
والد
کامیت
e03c08f80d
2فایلهای تغییر یافته به همراه166 افزوده شده و 6 حذف شده
  1. 6 0
      .vscode/tasks.json
  2. 160 6
      tests/test_device.py

+ 6 - 0
.vscode/tasks.json

@@ -30,6 +30,12 @@
       "type": "shell",
       "command": "container set-version",
       "problemMatcher": []
+    },
+    {
+      "label": "Unit tests",
+      "type": "shell",
+      "command": "pytest",
+      "problemMatcher": []
     }
   ]
 }

+ 160 - 6
tests/test_device.py

@@ -1,5 +1,9 @@
+from datetime import datetime, timedelta
+from time import time
 from unittest import IsolatedAsyncioTestCase
-from unittest.mock import patch
+from unittest.mock import AsyncMock, call, patch
+
+import pytest
 
 from custom_components.goldair_climate.const import (
     CONF_TYPE_DEHUMIDIFIER,
@@ -21,11 +25,16 @@ from .const import (
 
 class TestDevice(IsolatedAsyncioTestCase):
     def setUp(self):
-        patcher = patch("pytuya.Device")
-        self.addCleanup(patcher.stop)
-        self.mock_api = patcher.start()
+        device_patcher = patch("pytuya.Device")
+        self.addCleanup(device_patcher.stop)
+        self.mock_api = device_patcher.start()
+
+        hass_patcher = patch("homeassistant.core.HomeAssistant")
+        self.addCleanup(hass_patcher.stop)
+        self.hass = hass_patcher.start()
+
         self.subject = GoldairTuyaDevice(
-            "Some name", "some_dev_id", "some.ip.address", "some_local_key", None
+            "Some name", "some_dev_id", "some.ip.address", "some_local_key", self.hass()
         )
 
     def test_configures_pytuya_correctly(self):
@@ -36,7 +45,7 @@ class TestDevice(IsolatedAsyncioTestCase):
 
     def test_name(self):
         """Returns the name given at instantiation."""
-        self.assertEqual("Some name", self.subject.name)
+        self.assertEqual(self.subject.name, "Some name")
 
     def test_unique_id(self):
         """Returns the unique ID presented by the API class."""
@@ -80,3 +89,148 @@ class TestDevice(IsolatedAsyncioTestCase):
     async def test_detects_fan_payload(self):
         self.subject._cached_state = FAN_PAYLOAD
         self.assertEqual(await self.subject.async_inferred_type(), CONF_TYPE_FAN)
+
+    async def test_does_not_refresh_more_often_than_cache_timeout(self):
+        refresh_task = AsyncMock()
+        self.subject._cached_state = {"updated_at": time() - 19}
+        self.subject._refresh_task = awaitable = refresh_task()
+
+        await self.subject.async_refresh()
+
+        refresh_task.assert_awaited()
+        self.assertIs(self.subject._refresh_task, awaitable)
+
+    async def test_refreshes_when_there_is_no_pending_reset(self):
+        async_job = AsyncMock()
+        self.subject._cached_state = {"updated_at": time() - 19}
+        self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
+
+        await self.subject.async_refresh()
+
+        self.subject._hass.async_add_executor_job.assert_called_once_with(
+            self.subject.refresh
+        )
+        self.assertIs(self.subject._refresh_task, awaitable)
+        async_job.assert_awaited()
+
+    async def test_refreshes_when_there_is_expired_pending_reset(self):
+        async_job = AsyncMock()
+        self.subject._cached_state = {"updated_at": time() - 20}
+        self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
+        self.subject._refresh_task = {}
+
+        await self.subject.async_refresh()
+
+        self.subject._hass.async_add_executor_job.assert_called_once_with(
+            self.subject.refresh
+        )
+        self.assertIs(self.subject._refresh_task, awaitable)
+        async_job.assert_awaited()
+
+    def test_refresh_reloads_status_from_device(self):
+        self.subject._api.status.return_value = {"dps": {1: False}}
+        self.subject._cached_state = {1: True}
+
+        self.subject.refresh()
+
+        self.subject._api.status.assert_called_once()
+        self.assertEqual(self.subject._cached_state[1], False)
+        self.assertTrue(
+            time() - 1 <= self.subject._cached_state["updated_at"] <= time()
+        )
+
+    def test_refresh_retries_up_to_four_times(self):
+        self.subject._api.status.side_effect = [
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
+            {"dps": {1: False}},
+        ]
+
+        self.subject.refresh()
+
+        self.assertEqual(self.subject._api.status.call_count, 4)
+        self.assertEqual(self.subject._cached_state[1], False)
+
+    def test_refresh_clears_cached_state_and_pending_updates_after_failing_four_times(
+        self,
+    ):
+        self.subject._cached_state = {1: True}
+        self.subject._pending_updates = {1: False}
+        self.subject._api.status.side_effect = [
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
+        ]
+
+        self.subject.refresh()
+
+        self.assertEqual(self.subject._api.status.call_count, 4)
+        self.assertEqual(self.subject._cached_state, {"updated_at": 0})
+        self.assertEqual(self.subject._pending_updates, {})
+
+    def test_api_protocol_version_is_rotated_with_each_failure(self):
+        self.subject._api.set_version.assert_called_once_with(3.3)
+        self.subject._api.set_version.reset_mock()
+
+        self.subject._api.status.side_effect = [
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
+            Exception("Error"),
+        ]
+        self.subject.refresh()
+
+        self.subject._api.set_version.assert_has_calls(
+            [call(3.1), call(3.3), call(3.1)]
+        )
+
+    def test_reset_cached_state_clears_cached_state_and_pending_updates(self):
+        self.subject._cached_state = {1: True, "updated_at": time()}
+        self.subject._pending_updates = {1: False}
+
+        self.subject._reset_cached_state()
+
+        self.assertEqual(self.subject._cached_state, {"updated_at": 0})
+        self.assertEqual(self.subject._pending_updates, {})
+
+    def test_get_property_returns_value_from_cached_state(self):
+        self.subject._cached_state = {1: True}
+        self.assertEqual(self.subject.get_property(1), True)
+
+    def test_get_property_returns_pending_update_value(self):
+        self.subject._pending_updates = {1: {"value": False, "updated_at": time() - 9}}
+        self.assertEqual(self.subject.get_property(1), False)
+
+    def test_pending_update_value_overrides_cached_value(self):
+        self.subject._cached_state = {1: True}
+        self.subject._pending_updates = {1: {"value": False, "updated_at": time() - 9}}
+
+        self.assertEqual(self.subject.get_property(1), False)
+
+    def test_expired_pending_update_value_does_not_override_cached_value(self):
+        self.subject._cached_state = {1: True}
+        self.subject._pending_updates = {1: {"value": False, "updated_at": time() - 10}}
+
+        self.assertEqual(self.subject.get_property(1), True)
+
+    def test_get_property_returns_none_when_value_does_not_exist(self):
+        self.subject._cached_state = {1: True}
+        self.assertIs(self.subject.get_property(2), None)
+
+    async def test_async_set_property_schedules_job(self):
+        async_job = AsyncMock()
+        self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
+
+        await self.subject.async_set_property(1, False)
+
+        self.subject._hass.async_add_executor_job.assert_called_once_with(
+            self.subject.set_property, 1, False
+        )
+        async_job.assert_awaited()
+
+    def test_set_property_immediately_stores_new_value_to_pending_updates(self):
+        self.subject.set_property(1, False)
+        self.subject._cached_state = {1: True}
+        self.assertEqual(self.subject.get_property(1), False)