| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653 |
- from time import time
- import pytest
- # from homeassistant.const import EVENT_HOMEASSISTANT_STARTED, EVENT_HOMEASSISTANT_STOP
- from custom_components.tuya_local.device import TuyaLocalDevice
- from .const import EUROM_600_HEATER_PAYLOAD
- @pytest.fixture
- def mock_api(mocker):
- mock = mocker.patch("tinytuya.Device")
- mock.parent = None
- yield mock
- @pytest.fixture
- def patched_hass(hass, mocker):
- hass.is_running = True
- hass.is_stopping = False
- hass.data = {"tuya_local": {}}
- async def job(func, *args):
- print(f"{args}")
- return func(*args)
- mocker.patch.object(hass, "async_add_executor_job", side_effect=job)
- mocker.patch.object(hass, "async_create_task")
- return hass
- @pytest.fixture
- def subject(patched_hass, mock_api, mocker):
- subject = TuyaLocalDevice(
- "Some name",
- "some_dev_id",
- "some.ip.address",
- "some_local_key",
- "auto",
- None,
- patched_hass,
- )
- # For most tests we want the protocol working
- subject._api_protocol_version_index = 0
- subject._api_protocol_working = True
- subject._protocol_configured = "auto"
- return subject
- def test_name(subject):
- """Returns the name given at instantiation."""
- assert subject.name == "Some name"
- def test_unique_id(subject, mock_api):
- """Returns the unique ID presented by the API class."""
- assert subject.unique_id is mock_api().id
- def test_device_info(subject, mock_api):
- """Returns generic info plus the unique ID for categorisation."""
- assert subject.device_info == {
- "identifiers": {("tuya_local", mock_api().id)},
- "name": "Some name",
- "manufacturer": "Tuya",
- }
- def test_has_returned_state(subject):
- """Returns True if the device has returned its state."""
- subject._cached_state = EUROM_600_HEATER_PAYLOAD
- assert subject.has_returned_state
- subject._cached_state = {"updated_at": 0}
- assert not subject.has_returned_state
- @pytest.mark.asyncio
- async def test_refreshes_state_if_no_cached_state_exists(subject, mocker):
- subject._cached_state = {}
- subject.async_refresh = mocker.AsyncMock()
- await subject.async_inferred_type()
- subject.async_refresh.assert_awaited()
- @pytest.mark.asyncio
- async def test_detection_returns_none_when_device_type_not_detected(subject):
- subject._cached_state = {"192": False, "updated_at": time()}
- assert await subject.async_inferred_type() is None
- @pytest.mark.asyncio
- async def test_refresh_retries_up_to_eleven_times(subject, mock_api):
- subject._api_protocol_working = False
- mock_api().status.side_effect = [
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- {"dps": {"1": False}},
- ]
- await subject.async_refresh()
- assert mock_api().status.call_count == 11
- assert subject._cached_state["1"] is False
- @pytest.mark.asyncio
- async def test_refresh_clears_cache_after_allowed_failures(subject, mock_api):
- subject._cached_state = {"1": True}
- subject._pending_updates = {
- "1": {"value": False, "updated_at": time(), "sent": True}
- }
- mock_api().status.side_effect = [
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- ]
- await subject.async_refresh()
- assert mock_api().status.call_count == 3
- assert subject._cached_state == {"updated_at": 0}
- assert subject._pending_updates == {}
- @pytest.mark.asyncio
- async def test_api_protocol_version_is_rotated_with_each_failure(
- subject, mock_api, mocker
- ):
- subject._api_protocol_version_index = None
- subject._api_protocol_working = False
- mock_api().status.side_effect = [
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- ]
- await subject.async_refresh()
- mock_api().set_version.assert_has_calls(
- [
- mocker.call(3.1),
- mocker.call(3.2),
- mocker.call(3.4),
- mocker.call(3.5),
- mocker.call(3.3),
- mocker.call(3.3),
- mocker.call(3.1),
- ]
- )
- @pytest.mark.asyncio
- async def test_api_protocol_version_is_stable_once_successful(
- subject, mock_api, mocker
- ):
- subject._api_protocol_version_index = None
- subject._api_protocol_working = False
- mock_api().status.side_effect = [
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- {"dps": {"1": False}},
- {"dps": {"1": False}},
- Exception("Error"),
- Exception("Error"),
- {"dps": {"1": False}},
- ]
- await subject.async_refresh()
- assert subject._api_protocol_version_index == 3
- assert subject._api_protocol_working
- await subject.async_refresh()
- assert subject._api_protocol_version_index == 3
- await subject.async_refresh()
- assert subject._api_protocol_version_index == 3
- mock_api().set_version.assert_has_calls(
- [
- mocker.call(3.1),
- mocker.call(3.2),
- mocker.call(3.4),
- ]
- )
- @pytest.mark.asyncio
- async def test_api_protocol_version_is_not_rotated_when_not_auto(subject, mock_api):
- # Set up preconditions for the test
- subject._protocol_configured = 3.4
- subject._api_protocol_version_index = None
- subject._api_protocol_working = False
- mock_api().status.side_effect = [
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- {"dps": {"1": False}},
- {"dps": {"1": False}},
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- Exception("Error"),
- {"dps": {"1": False}},
- ]
- await subject._rotate_api_protocol_version()
- mock_api().set_version.assert_called_once_with(3.4)
- mock_api().set_version.reset_mock()
- await subject.async_refresh()
- assert subject._api_protocol_version_index == 3
- await subject.async_refresh()
- assert subject._api_protocol_version_index == 3
- await subject.async_refresh()
- assert subject._api_protocol_version_index == 3
- def test_reset_cached_state_clears_cached_state_and_pending_updates(subject):
- subject._cached_state = {"1": True, "updated_at": time()}
- subject._pending_updates = {
- "1": {"value": False, "updated_at": time(), "sent": True}
- }
- subject._reset_cached_state()
- assert subject._cached_state == {"updated_at": 0}
- assert subject._pending_updates == {}
- def test_get_property_returns_value_from_cached_state(subject):
- subject._cached_state = {"1": True}
- assert subject.get_property("1") is True
- def test_get_property_returns_pending_update_value(subject):
- subject._pending_updates = {
- "1": {"value": False, "updated_at": time() - 4, "sent": True}
- }
- assert subject.get_property("1") is False
- def test_pending_update_value_overrides_cached_value(subject):
- subject._cached_state = {"1": True}
- subject._pending_updates = {
- "1": {"value": False, "updated_at": time() - 4, "sent": True}
- }
- assert subject.get_property("1") is False
- def test_expired_pending_update_value_does_not_override_cached_value(subject):
- subject._cached_state = {"1": True}
- subject._pending_updates = {
- "1": {"value": False, "updated_at": time() - 5, "sent": True}
- }
- assert subject.get_property("1") is True
- def test_get_property_returns_none_when_value_does_not_exist(subject):
- subject._cached_state = {"1": True}
- assert subject.get_property("2") is None
- @pytest.mark.asyncio
- async def test_async_set_property_sends_to_api(subject, mock_api):
- await subject.async_set_property("1", False)
- mock_api().set_multiple_values.assert_called_once()
- @pytest.mark.asyncio
- async def test_set_property_immediately_stores_pending_updates(subject):
- subject._cached_state = {"1": True}
- await subject.async_set_property("1", False)
- assert not subject.get_property("1")
- @pytest.mark.asyncio
- async def test_set_properties_takes_no_action_when_nothing_provided(subject, mocker):
- mock = mocker.patch("asyncio.sleep")
- await subject.async_set_properties({})
- mock.assert_not_called()
- def test_anticipate_property_value_updates_cached_state(subject):
- subject._cached_state = {"1": True}
- subject.anticipate_property_value("1", False)
- assert subject._cached_state["1"] is False
- def test_get_key_for_value_returns_key_from_object_matching_value(subject):
- obj = {"key1": "value1", "key2": "value2"}
- assert TuyaLocalDevice.get_key_for_value(obj, "value1") == "key1"
- assert TuyaLocalDevice.get_key_for_value(obj, "value2") == "key2"
- def test_get_key_for_value_returns_fallback_when_value_not_found(subject):
- obj = {"key1": "value1", "key2": "value2"}
- assert TuyaLocalDevice.get_key_for_value(obj, "value3", fallback="fb") == "fb"
- def test_refresh_cached_state(subject, mock_api):
- # set up preconditions
- mock_api().status.return_value = {"dps": {"1": "CHANGED"}}
- subject._cached_state = {"1": "UNCHANGED", "updated_at": 123}
- # call the function under test
- subject._refresh_cached_state()
- # Did it call the API as expected?
- mock_api().status.assert_called_once()
- # Did it update the cached state?
- assert subject._cached_state == {"1": "CHANGED"} | subject._cached_state
- # Did it update the timestamp on the cached state?
- assert subject._cached_state["updated_at"] == pytest.approx(time(), abs=2)
- def test_set_values(subject, mock_api):
- # set up preconditions
- subject._pending_updates = {
- "1": {"value": "sample", "updated_at": time() - 2, "sent": False},
- }
- # call the function under test
- subject._set_values({"1": "sample"})
- # did it send what it was asked?
- mock_api().set_multiple_values.assert_called_once_with({"1": "sample"}, nowait=True)
- # did it mark the pending updates as sent?
- assert subject._pending_updates["1"]["sent"]
- # did it update the time on the pending updates?
- assert subject._pending_updates["1"]["updated_at"] == pytest.approx(time(), abs=2)
- # did it lock and unlock when sending
- # subject._lock.acquire.assert_called_once()
- # subject._lock.release.assert_called_once()
- def test_pending_updates_cleared_on_receipt(subject):
- # Set up the preconditions
- now = time()
- subject._pending_updates = {
- "1": {"value": True, "updated_at": now, "sent": True},
- "2": {"value": True, "updated_at": now, "sent": False}, # unsent
- "3": {"value": True, "updated_at": now, "sent": True}, # unmatched
- "4": {"value": True, "updated_at": now, "sent": True}, # not received
- }
- subject._remove_properties_from_pending_updates({"1": True, "2": True, "3": False})
- assert subject._pending_updates == {
- "2": {"value": True, "updated_at": now, "sent": False},
- "3": {"value": True, "updated_at": now, "sent": True},
- "4": {"value": True, "updated_at": now, "sent": True},
- }
- def test_actually_start(subject, mocker, patched_hass):
- # Set up the preconditions
- mocker.patch.object(subject, "receive_loop", return_value="LOOP")
- mocker.patch.object(subject, "_refresh_task", new=mocker.AsyncMock)
- subject._running = False
- mocker.patch.object(patched_hass, "async_create_task")
- # patched_hass.async_create_task = mocker.MagicMock()
- # patched_hass.bus.async_listen_once = mocker.AsyncMock()
- # patched_hass.bus.async_listen_once.return_value = "LISTENER"
- # run the function under test
- subject.actually_start()
- # did it register a listener for EVENT_HOMEASSISTANT_STOP?
- # patched_hass.bus.async_listen_once.assert_called_once_with(
- # EVENT_HOMEASSISTANT_STOP, subject.async_stop
- # )
- # assert subject._shutdown_listener == "LISTENER"
- # did it set the running flag?
- assert subject._running
- # did it schedule the loop?
- # task.assert_called_once()
- def test_start_starts_when_ha_running(subject, patched_hass, mocker):
- # Set up preconditions
- patched_hass.is_running = True
- listener = mocker.MagicMock()
- subject._startup_listener = listener
- subject.actually_start = mocker.MagicMock()
- # Call the function under test
- subject.start()
- # Did it actually start?
- subject.actually_start.assert_called_once()
- # Did it cancel the startup listener?
- assert subject._startup_listener is None
- listener.assert_called_once()
- def test_start_schedules_for_later_when_ha_starting(subject, patched_hass, mocker):
- # Set up preconditions
- patched_hass.is_running = False
- subject.actually_start = mocker.MagicMock()
- # Call the function under test
- subject.start()
- # Did it avoid actually starting?
- subject.actually_start.assert_not_called()
- # Did it register a listener?
- # assert subject._startup_listener == "LISTENER"
- # patched_hass.bus.async_listen_once.assert_called_once_with(
- # EVENT_HOMEASSISTANT_STARTED, subject.actually_start
- # )
- def test_start_does_nothing_when_ha_stopping(subject, patched_hass, mocker):
- # Set up preconditions
- patched_hass.is_running = True
- patched_hass.is_stopping = True
- subject.actually_start = mocker.MagicMock()
- # Call the function under test
- subject.start()
- # Did it avoid actually starting?
- subject.actually_start.assert_not_called()
- # Did it avoid registering a listener?
- # patched_hass.bus.async_listen_once.assert_not_called()
- assert subject._startup_listener is None
- @pytest.mark.asyncio
- async def test_async_stop(subject, mocker):
- # Set up preconditions
- listener = mocker.MagicMock()
- subject._refresh_task = None
- subject._shutdown_listener = listener
- subject._children = [1, 2, 3]
- # Call the function under test
- await subject.async_stop()
- # Shutdown listener doesn't get cancelled as HA does that
- listener.assert_not_called()
- # Were the child entities cleared?
- assert subject._children == []
- # Did it wait for the refresh task to finish then clear it?
- # This doesn't work because AsyncMock only mocks awaitable method calls
- # but we want an awaitable object
- # refresh.assert_awaited_once()
- assert subject._refresh_task is None
- @pytest.mark.asyncio
- async def test_async_stop_when_not_running(subject):
- # Set up preconditions
- _refresh_task = None
- subject._shutdown_listener = None
- subject._children = []
- # Call the function under test
- await subject.async_stop()
- # Was the shutdown listener left empty?
- assert subject._shutdown_listener is None
- # Were the child entities cleared?
- assert subject._children == []
- # Was the refresh task left empty?
- assert subject._refresh_task is None
- def test_register_first_entity_ha_running(subject, mocker):
- # Set up preconditions
- subject._children = []
- subject._running = False
- subject._startup_listener = None
- subject.start = mocker.MagicMock()
- entity = mocker.AsyncMock()
- entity._config = mocker.MagicMock()
- entity._config.dps.return_value = []
- # despite the name, the below HA function is not async and does not need to be awaited
- entity.async_schedule_update_ha_state = mocker.MagicMock()
- # Call the function under test
- subject.register_entity(entity)
- # Was the entity added to the list?
- assert subject._children == [entity]
- # Did we start the loop?
- subject.start.assert_called_once()
- def test_register_subsequent_entity_ha_running(subject, mocker):
- # Set up preconditions
- first = mocker.AsyncMock()
- second = mocker.AsyncMock()
- second._config = mocker.MagicMock()
- second._config.dps.return_value = []
- subject._children = [first]
- subject._running = True
- subject._startup_listener = None
- subject.start = mocker.MagicMock()
- # Call the function under test
- subject.register_entity(second)
- # Was the entity added to the list?
- assert set(subject._children) == set([first, second])
- # Did we avoid restarting the loop?
- subject.start.assert_not_called()
- def test_register_subsequent_entity_ha_starting(subject, mocker):
- # Set up preconditions
- first = mocker.AsyncMock()
- second = mocker.AsyncMock()
- second._config = mocker.MagicMock()
- second._config.dps.return_value = []
- subject._children = [first]
- subject._running = False
- subject._startup_listener = mocker.MagicMock()
- subject.start = mocker.MagicMock()
- # Call the function under test
- subject.register_entity(second)
- # Was the entity added to the list?
- assert set(subject._children) == set([first, second])
- # Did we avoid restarting the loop?
- subject.start.assert_not_called()
- @pytest.mark.asyncio
- async def test_unregister_one_of_many_entities(subject, mocker):
- # Set up preconditions
- subject._children = ["First", "Second"]
- subject.async_stop = mocker.AsyncMock()
- # Call the function under test
- await subject.async_unregister_entity("First")
- # Was the entity removed from the list?
- assert set(subject._children) == set(["Second"])
- # Is the loop still running?
- subject.async_stop.assert_not_called()
- @pytest.mark.asyncio
- async def test_unregister_last_entity(subject, mocker):
- # Set up preconditions
- subject._children = ["Last"]
- subject.async_stop = mocker.AsyncMock()
- # Call the function under test
- await subject.async_unregister_entity("Last")
- # Was the entity removed from the list?
- assert subject._children == []
- # Was the loop stopped?
- subject.async_stop.assert_called_once()
- @pytest.mark.asyncio
- async def test_async_receive(subject, mock_api, mocker):
- # Set up preconditions
- mock_api().status.return_value = {"dps": {"1": "INIT", "2": 2}}
- mock_api().receive.return_value = {"1": "UPDATED"}
- subject._running = True
- subject._cached_state = {"updated_at": 0}
- # Call the function under test
- print("starting test loop...")
- loop = subject.async_receive()
- print("getting first iteration...")
- result = await loop.__anext__()
- # Check that the loop was started, but without persistent connection
- # since there was no state returned yet and it might need to negotiate
- # version.
- mock_api().set_socketPersistent.assert_called_once_with(False)
- # Check that a full poll was done
- mock_api().status.assert_called_once()
- assert result == {"1": "INIT", "2": 2, "full_poll": mocker.ANY}
- # Prepare for next round
- subject._cached_state = subject._cached_state | result
- mock_api().set_socketPersistent.reset_mock()
- mock_api().status.reset_mock()
- subject._cached_state["updated_at"] = time()
- # Call the function under test
- print("getting second iteration...")
- result = await loop.__anext__()
- # Check that a heartbeat poll was done
- mock_api().status.assert_not_called()
- mock_api().heartbeat.assert_called_once()
- mock_api().receive.assert_called_once()
- assert result == {"1": "UPDATED", "full_poll": mocker.ANY}
- # Check that the connection was made persistent now that data has been
- # returned
- mock_api().set_socketPersistent.assert_called_once_with(True)
- # Prepare for next iteration
- subject._running = False
- mock_api().set_socketPersistent.reset_mock()
- # Call the function under test
- print("getting last iteration...")
- try:
- result = await loop.__anext__()
- pytest.fail("Should have raised an exception to quit the loop")
- # Check that the loop terminated
- except StopAsyncIteration:
- pass
- mock_api().set_socketPersistent.assert_called_once_with(False)
- def test_should_poll(subject):
- subject._cached_state = {"1": "sample", "updated_at": time()}
- subject._poll_only = False
- subject._temporary_poll = False
- # Test temporary poll via pause/resume
- assert not subject.should_poll
- subject.pause()
- assert subject.should_poll
- subject.resume()
- assert not subject.should_poll
- # Test configured polling
- subject._poll_only = True
- assert subject.should_poll
- subject._poll_only = False
- # Test initial polling
- subject._cached_state = {}
- assert subject.should_poll
|