Bladeren bron

Add a test for async_receive generator.

This should improve test coverage to above the quality threshold for recently
added code, as it will exercise most/all of the biggest function added.
The wrapper async task that calls this is still untested, but that is
relatively small. There are also a couple of optional listener callbacks
untested, as they have the side effect of starting that task.
Jason Rumney 3 jaren geleden
bovenliggende
commit
5603147bac
1 gewijzigde bestanden met toevoegingen van 50 en 0 verwijderingen
  1. 50 0
      tests/test_device.py

+ 50 - 0
tests/test_device.py

@@ -538,3 +538,53 @@ class TestDevice(IsolatedAsyncioTestCase):
         self.assertEqual(self.subject._children, [])
         # Was the loop stopped?
         self.subject.async_stop.assert_called_once()
+
+    async def test_async_receive(self):
+        # Set up preconditions
+        status = AsyncMock(
+            name="status",
+            return_value={"dps": {"1": "INIT", "2": 2}},
+        )
+        self.mock_api().status = status
+        heartbeat = AsyncMock(name="heartbeat")
+        self.mock_api().heartbeat = heartbeat
+        receive = AsyncMock(name="receive", return_value={"1": "UPDATE"})
+        self.mock_api().receive = receive
+        self.subject._running = True
+        self.subject._cached_state = {"updated_at": 0}
+        async_add_executor_job = AsyncMock(
+            name="async_add_executor_job",
+            side_effect=lambda func, *args: func(*args),
+        )
+        self.hass().async_add_executor_job = async_add_executor_job
+
+        # Call the function under test
+        loop = self.subject.async_receive()
+        result = await loop.__anext__()
+
+        # Check that the loop was started
+        self.mock_api().set_socketPersistent.assert_called_once_with(True)
+        # Check that a full poll was done
+        async_add_executor_job.assert_called_once_with(status)
+        self.assertDictEqual(result, {"1": "INIT", "2": 2})
+        # Prepare for next round
+        self.mock_api().set_socketPersistent.reset_mock()
+        self.subject._cached_state["updated_at"] = time()
+
+        # Call the function under test
+        result = await loop.__anext__()
+        # Check that the loop was not restarted
+        self.mock_api().set_socketPersistent.assert_not_called()
+        # Check that a heartbeat poll was done
+        async_add_executor_job.assert_called_with(heartbeat)
+        async_add_executor_job.assert_called_with(receive)
+        self.assertDictEqual(result, {"1": "UPDATED"})
+        # Prepare for next iteration
+        async_add_executor_job.reset_mock()
+        self.subject._running = False
+
+        # Call the function under test
+        result = await loop.__anext__()
+        # Check that the loop terminated
+        self.mock_api().set_socketPersistent.assert_called_once_with(False)
+        self.assertIsNone(result)