test_device.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334
  1. import threading
  2. from datetime import datetime, timedelta
  3. from time import sleep, time
  4. from unittest import IsolatedAsyncioTestCase
  5. from unittest.mock import AsyncMock, call, patch
  6. from homeassistant.const import TEMP_CELSIUS
  7. from custom_components.tuya_local.const import (
  8. CONF_TYPE_DEHUMIDIFIER,
  9. CONF_TYPE_FAN,
  10. CONF_TYPE_GECO_HEATER,
  11. CONF_TYPE_GPCV_HEATER,
  12. CONF_TYPE_GPPH_HEATER,
  13. CONF_TYPE_KOGAN_HEATER,
  14. CONF_TYPE_KOGAN_SWITCH,
  15. )
  16. from custom_components.tuya_local.device import TuyaLocalDevice
  17. from .const import (
  18. DEHUMIDIFIER_PAYLOAD,
  19. FAN_PAYLOAD,
  20. GECO_HEATER_PAYLOAD,
  21. GPCV_HEATER_PAYLOAD,
  22. GPPH_HEATER_PAYLOAD,
  23. KOGAN_HEATER_PAYLOAD,
  24. KOGAN_SOCKET_PAYLOAD,
  25. )
  26. class TestDevice(IsolatedAsyncioTestCase):
  27. def setUp(self):
  28. device_patcher = patch("pytuya.Device")
  29. self.addCleanup(device_patcher.stop)
  30. self.mock_api = device_patcher.start()
  31. hass_patcher = patch("homeassistant.core.HomeAssistant")
  32. self.addCleanup(hass_patcher.stop)
  33. self.hass = hass_patcher.start()
  34. self.subject = TuyaLocalDevice(
  35. "Some name", "some_dev_id", "some.ip.address", "some_local_key", self.hass()
  36. )
  37. def test_configures_pytuya_correctly(self):
  38. self.mock_api.assert_called_once_with(
  39. "some_dev_id", "some.ip.address", "some_local_key", "device"
  40. )
  41. self.assertIs(self.subject._api, self.mock_api())
  42. def test_name(self):
  43. """Returns the name given at instantiation."""
  44. self.assertEqual(self.subject.name, "Some name")
  45. def test_unique_id(self):
  46. """Returns the unique ID presented by the API class."""
  47. self.assertIs(self.subject.unique_id, self.mock_api().id)
  48. def test_device_info(self):
  49. """Returns generic info plus the unique ID for categorisation."""
  50. self.assertEqual(
  51. self.subject.device_info,
  52. {
  53. "identifiers": {("tuya_local", self.mock_api().id)},
  54. "name": "Some name",
  55. "manufacturer": "Tuya",
  56. },
  57. )
  58. def test_temperature_unit(self):
  59. self.assertEqual(self.subject.temperature_unit, TEMP_CELSIUS)
  60. async def test_refreshes_state_if_no_cached_state_exists(self):
  61. self.subject._cached_state = {}
  62. self.subject.async_refresh = AsyncMock()
  63. await self.subject.async_inferred_type()
  64. self.subject.async_refresh.assert_awaited()
  65. async def test_detects_geco_heater_payload(self):
  66. self.subject._cached_state = GECO_HEATER_PAYLOAD
  67. self.assertEqual(
  68. await self.subject.async_inferred_type(), CONF_TYPE_GECO_HEATER
  69. )
  70. async def test_detects_gpcv_heater_payload(self):
  71. self.subject._cached_state = GPCV_HEATER_PAYLOAD
  72. self.assertEqual(
  73. await self.subject.async_inferred_type(), CONF_TYPE_GPCV_HEATER
  74. )
  75. async def test_detects_gpph_heater_payload(self):
  76. self.subject._cached_state = GPPH_HEATER_PAYLOAD
  77. self.assertEqual(
  78. await self.subject.async_inferred_type(), CONF_TYPE_GPPH_HEATER
  79. )
  80. async def test_detects_dehumidifier_payload(self):
  81. self.subject._cached_state = DEHUMIDIFIER_PAYLOAD
  82. self.assertEqual(
  83. await self.subject.async_inferred_type(), CONF_TYPE_DEHUMIDIFIER
  84. )
  85. async def test_detects_fan_payload(self):
  86. self.subject._cached_state = FAN_PAYLOAD
  87. self.assertEqual(await self.subject.async_inferred_type(), CONF_TYPE_FAN)
  88. async def test_detects_kogan_heater_payload(self):
  89. self.subject._cached_state = KOGAN_HEATER_PAYLOAD
  90. self.assertEqual(
  91. await self.subject.async_inferred_type(), CONF_TYPE_KOGAN_HEATER
  92. )
  93. async def test_detects_kogan_socket_payload(self):
  94. self.subject._cached_state = KOGAN_SOCKET_PAYLOAD
  95. self.assertEqual(
  96. await self.subject.async_inferred_type(), CONF_TYPE_KOGAN_SWITCH
  97. )
  98. async def test_detection_returns_none_when_device_type_could_not_be_detected(self):
  99. self.subject._cached_state = {"1": False}
  100. self.assertEqual(await self.subject.async_inferred_type(), None)
  101. async def test_does_not_refresh_more_often_than_cache_timeout(self):
  102. refresh_task = AsyncMock()
  103. self.subject._cached_state = {"updated_at": time() - 19}
  104. self.subject._refresh_task = awaitable = refresh_task()
  105. await self.subject.async_refresh()
  106. refresh_task.assert_awaited()
  107. self.assertIs(self.subject._refresh_task, awaitable)
  108. async def test_refreshes_when_there_is_no_pending_reset(self):
  109. async_job = AsyncMock()
  110. self.subject._cached_state = {"updated_at": time() - 19}
  111. self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
  112. await self.subject.async_refresh()
  113. self.subject._hass.async_add_executor_job.assert_called_once_with(
  114. self.subject.refresh
  115. )
  116. self.assertIs(self.subject._refresh_task, awaitable)
  117. async_job.assert_awaited()
  118. async def test_refreshes_when_there_is_expired_pending_reset(self):
  119. async_job = AsyncMock()
  120. self.subject._cached_state = {"updated_at": time() - 20}
  121. self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
  122. self.subject._refresh_task = {}
  123. await self.subject.async_refresh()
  124. self.subject._hass.async_add_executor_job.assert_called_once_with(
  125. self.subject.refresh
  126. )
  127. self.assertIs(self.subject._refresh_task, awaitable)
  128. async_job.assert_awaited()
  129. def test_refresh_reloads_status_from_device(self):
  130. self.subject._api.status.return_value = {"dps": {"1": False}}
  131. self.subject._cached_state = {"1": True}
  132. self.subject.refresh()
  133. self.subject._api.status.assert_called_once()
  134. self.assertEqual(self.subject._cached_state["1"], False)
  135. self.assertTrue(
  136. time() - 1 <= self.subject._cached_state["updated_at"] <= time()
  137. )
  138. def test_refresh_retries_up_to_four_times(self):
  139. self.subject._api.status.side_effect = [
  140. Exception("Error"),
  141. Exception("Error"),
  142. Exception("Error"),
  143. {"dps": {"1": False}},
  144. ]
  145. self.subject.refresh()
  146. self.assertEqual(self.subject._api.status.call_count, 4)
  147. self.assertEqual(self.subject._cached_state["1"], False)
  148. def test_refresh_clears_cached_state_and_pending_updates_after_failing_four_times(
  149. self,
  150. ):
  151. self.subject._cached_state = {"1": True}
  152. self.subject._pending_updates = {"1": False}
  153. self.subject._api.status.side_effect = [
  154. Exception("Error"),
  155. Exception("Error"),
  156. Exception("Error"),
  157. Exception("Error"),
  158. ]
  159. self.subject.refresh()
  160. self.assertEqual(self.subject._api.status.call_count, 4)
  161. self.assertEqual(self.subject._cached_state, {"updated_at": 0})
  162. self.assertEqual(self.subject._pending_updates, {})
  163. def test_api_protocol_version_is_rotated_with_each_failure(self):
  164. self.subject._api.set_version.assert_called_once_with(3.3)
  165. self.subject._api.set_version.reset_mock()
  166. self.subject._api.status.side_effect = [
  167. Exception("Error"),
  168. Exception("Error"),
  169. Exception("Error"),
  170. Exception("Error"),
  171. ]
  172. self.subject.refresh()
  173. self.subject._api.set_version.assert_has_calls(
  174. [call(3.1), call(3.3), call(3.1)]
  175. )
  176. def test_api_protocol_version_is_stable_once_successful(self):
  177. self.subject._api.set_version.assert_called_once_with(3.3)
  178. self.subject._api.set_version.reset_mock()
  179. self.subject._api.status.side_effect = [
  180. {"dps": {"1": False}},
  181. Exception("Error"),
  182. Exception("Error"),
  183. Exception("Error"),
  184. Exception("Error"),
  185. Exception("Error"),
  186. Exception("Error"),
  187. ]
  188. self.subject.refresh()
  189. self.subject.refresh()
  190. self.subject.refresh()
  191. self.subject._api.set_version.assert_has_calls([call(3.1), call(3.3)])
  192. def test_reset_cached_state_clears_cached_state_and_pending_updates(self):
  193. self.subject._cached_state = {"1": True, "updated_at": time()}
  194. self.subject._pending_updates = {"1": False}
  195. self.subject._reset_cached_state()
  196. self.assertEqual(self.subject._cached_state, {"updated_at": 0})
  197. self.assertEqual(self.subject._pending_updates, {})
  198. def test_get_property_returns_value_from_cached_state(self):
  199. self.subject._cached_state = {"1": True}
  200. self.assertEqual(self.subject.get_property("1"), True)
  201. def test_get_property_returns_pending_update_value(self):
  202. self.subject._pending_updates = {
  203. "1": {"value": False, "updated_at": time() - 9}
  204. }
  205. self.assertEqual(self.subject.get_property("1"), False)
  206. def test_pending_update_value_overrides_cached_value(self):
  207. self.subject._cached_state = {"1": True}
  208. self.subject._pending_updates = {
  209. "1": {"value": False, "updated_at": time() - 9}
  210. }
  211. self.assertEqual(self.subject.get_property("1"), False)
  212. def test_expired_pending_update_value_does_not_override_cached_value(self):
  213. self.subject._cached_state = {"1": True}
  214. self.subject._pending_updates = {
  215. "1": {"value": False, "updated_at": time() - 10}
  216. }
  217. self.assertEqual(self.subject.get_property("1"), True)
  218. def test_get_property_returns_none_when_value_does_not_exist(self):
  219. self.subject._cached_state = {"1": True}
  220. self.assertIs(self.subject.get_property("2"), None)
  221. async def test_async_set_property_schedules_job(self):
  222. async_job = AsyncMock()
  223. self.subject._hass.async_add_executor_job.return_value = awaitable = async_job()
  224. await self.subject.async_set_property("1", False)
  225. self.subject._hass.async_add_executor_job.assert_called_once_with(
  226. self.subject.set_property, "1", False
  227. )
  228. async_job.assert_awaited()
  229. def test_set_property_immediately_stores_new_value_to_pending_updates(self):
  230. self.subject.set_property("1", False)
  231. self.subject._cached_state = {"1": True}
  232. self.assertEqual(self.subject.get_property("1"), False)
  233. def test_debounces_multiple_set_calls_into_one_api_call(self):
  234. with patch("custom_components.tuya_local.device.Timer") as mock:
  235. self.subject.set_property("1", True)
  236. mock.assert_called_once_with(1, self.subject._send_pending_updates)
  237. debounce = self.subject._debounce
  238. mock.reset_mock()
  239. self.subject.set_property("2", False)
  240. debounce.cancel.assert_called_once()
  241. mock.assert_called_once_with(1, self.subject._send_pending_updates)
  242. self.subject._api.generate_payload.return_value = "payload"
  243. self.subject._send_pending_updates()
  244. self.subject._api.generate_payload.assert_called_once_with(
  245. "set", {"1": True, "2": False}
  246. )
  247. self.subject._api._send_receive.assert_called_once_with("payload")
  248. def test_set_properties_takes_no_action_when_no_properties_are_provided(self):
  249. with patch("custom_components.tuya_local.device.Timer") as mock:
  250. self.subject._set_properties({})
  251. mock.assert_not_called()
  252. def test_anticipate_property_value_updates_cached_state(self):
  253. self.subject._cached_state = {"1": True}
  254. self.subject.anticipate_property_value("1", False)
  255. self.assertEqual(self.subject._cached_state["1"], False)
  256. def test_get_key_for_value_returns_key_from_object_matching_value(self):
  257. obj = {"key1": "value1", "key2": "value2"}
  258. self.assertEqual(TuyaLocalDevice.get_key_for_value(obj, "value1"), "key1")
  259. self.assertEqual(TuyaLocalDevice.get_key_for_value(obj, "value2"), "key2")
  260. def test_get_key_for_value_returns_fallback_when_value_not_found(self):
  261. obj = {"key1": "value1", "key2": "value2"}
  262. self.assertEqual(
  263. TuyaLocalDevice.get_key_for_value(obj, "value3", fallback="fb"), "fb"
  264. )