Browse Source

Convert switch to generic component.

Jason Rumney 4 years ago
parent
commit
c5971f760e

+ 6 - 0
custom_components/tuya_local/devices/kogan_switch.yaml

@@ -15,11 +15,17 @@ primary_entity:
       type: integer
       name: current_a
       readonly: true
+      mapping:
+        - scale: 1000
     - id: 5
       type: integer
       name: current_power_w
       readonly: true
+      mapping:
+        - scale: 10
     - id: 6
       type: integer
       name: voltage_v
       readonly: true
+      mapping:
+        - scale: 10

+ 120 - 0
custom_components/tuya_local/generic/switch.py

@@ -0,0 +1,120 @@
+"""
+Platform to control Tuya switches.
+Initially based on the Kogan Switch and secondary switch for Purline M100
+heater open window detector toggle.
+"""
+from homeassistant.components.switch import SwitchEntity
+from homeassistant.components.switch import (
+    ATTR_CURRENT_POWER_W,
+    DEVICE_CLASS_OUTLET,
+    DEVICE_CLASS_SWITCH,
+)
+
+from homeassistant.const import STATE_UNAVAILABLE
+
+from ..device import TuyaLocalDevice
+from ..helpers.device_config import TuyaEntityConfig
+
+
+class TuyaLocalSwitch(SwitchEntity):
+    """Representation of a Tuya Switch"""
+
+    def __init__(self, device: TuyaLocalDevice, config: TuyaEntityConfig):
+        """
+        Initialize the switch.
+        Args:
+            device (TuyaLocalDevice): The device API instance.
+        """
+        self._device = device
+        self._config = config
+        self._attr_dps = []
+        for d in config.dps():
+            if d.name == "switch":
+                self._switch_dps = d
+            else:
+                if d.name == "current_power_w":
+                    self._power_dps = d
+                self._attr_dps.append(d)
+
+    @property
+    def should_poll(self):
+        """Return the polling state."""
+        return True
+
+    @property
+    def name(self):
+        """Return the name of the device."""
+        return self._device.name
+
+    @property
+    def friendly_name(self):
+        """Return the friendly name for this entity."""
+        return self._config.name
+
+    @property
+    def unique_id(self):
+        """Return the unique id of the device."""
+        return self._device.unique_id
+
+    @property
+    def device_info(self):
+        """Return device information about the device."""
+        return self._device.device_info
+
+    @property
+    def device_class(self):
+        """Return the class of this device"""
+        return (
+            DEVICE_CLASS_OUTLET
+            if self._config.device_class == "outlet"
+            else DEVICE_CLASS_SWITCH
+        )
+
+    @property
+    def is_on(self):
+        """Return whether the switch is on or not."""
+        is_switched_on = self._switch_dps.map_from_dps(
+            self._device.get_property(self._switch_dps.id)
+        )
+
+        if is_switched_on is None:
+            return STATE_UNAVAILABLE
+        else:
+            return is_switched_on
+
+    @property
+    def current_power_w(self):
+        """Return the current power consumption in Watts."""
+        if self._power_dps is None:
+            return None
+
+        pwr = self._power_dps.map_from_dps(
+            self._device.get_property(self._power_dps.id)
+        )
+        if pwr is None:
+            return STATE_UNAVAILABLE
+
+        return pwr
+
+    @property
+    def device_state_attributes(self):
+        """Get additional attributes that HA doesn't naturally support."""
+        attr = {}
+        for a in self._attr_dps:
+            attr[a.name] = a.map_from_dps(self._device.get_property(a.id))
+        return attr
+
+    async def async_turn_on(self, **kwargs):
+        """Turn the switch on"""
+        await self._device.async_set_property(
+            self._switch_dps.id, self._switch_dps.map_to_dps(True)
+        )
+
+    async def async_turn_off(self, **kwargs):
+        """Turn the switch off"""
+        await self._device.async_set_property(
+            self._switch_dps.id, self._switch_dps.map_to_dps(False)
+        )
+
+    async def async_update(self):
+        await self._device.async_refresh()

+ 24 - 4
custom_components/tuya_local/helpers/device_config.py

@@ -126,6 +126,11 @@ class TuyaEntityConfig:
         """The entity type of this entity."""
         return self._config["entity"]
 
+    @property
+    def device_class(self):
+        """The device class of this entity."""
+        return self._config.get("class", None)
+
     def dps(self):
         """Iterate through the list of dps for this entity."""
         for d in self._config["dps"]:
@@ -165,9 +170,10 @@ class TuyaDpsConfig:
 
     def map_from_dps(self, value):
         result = value
+        scale = 1
         if "mapping" in self._config.keys():
             for map in self._config["mapping"]:
-                if map["dps_val"] == value and "value" in map:
+                if "value" in map and ("dps_val" not in map or map["dps_val"] == value):
                     result = map["value"]
                     _LOGGER.debug(
                         "%s: Mapped dps %d value from %s to %s",
@@ -176,13 +182,21 @@ class TuyaDpsConfig:
                         value,
                         result,
                     )
-        return result
+                if "scale" in map and "value" not in map:
+                    scale = map["scale"]
+        return (
+            result
+            if scale == 1 or not isinstance(result, (int, float))
+            else result / scale
+        )
 
     def map_to_dps(self, value):
         result = value
+        scale = 1
         if "mapping" in self._config.keys():
             for map in self._config["mapping"]:
-                if "value" in map and map["value"] == value:
+
+                if "value" in map and "dps_val" in map and map["value"] == value:
                     result = map["dps_val"]
                     _LOGGER.debug(
                         "%s: Mapped dps %d to %s from %s",
@@ -191,7 +205,13 @@ class TuyaDpsConfig:
                         result,
                         value,
                     )
-        return result
+                if "scale" in map and "value" not in map:
+                    scale = map["scale"]
+        return (
+            result
+            if scale == 1 or not isinstance(result, (int, float))
+            else result * scale
+        )
 
 
 def available_configs():

+ 1 - 1
custom_components/tuya_local/manifest.json

@@ -2,7 +2,7 @@
     "domain": "tuya_local",
     "iot_class": "local_polling",
     "name": "Tuya based devices local control",
-    "version": "0.4.1", 
+    "version": "0.4.2", 
     "documentation": "https://github.com/make-all/tuya-local",
     "issue_tracker": "https://github.com/make-all/tuya-local/issues",
     "dependencies": [],

+ 2 - 4
custom_components/tuya_local/switch.py

@@ -10,6 +10,7 @@ from .const import (
     CONF_TYPE,
     CONF_TYPE_AUTO,
 )
+from .generic.switch import TuyaLocalSwitch
 from .helpers.device_config import config_for_legacy_use
 
 _LOGGER = logging.getLogger(__name__)
@@ -35,10 +36,7 @@ async def async_setup_platform(hass, config, async_add_entities, discovery_info=
         if ecfg.entity != "switch":
             raise ValueError(f"{device.name} does not support use as a switch device.")
 
-    legacy_class = ecfg.legacy_class
-    # Instantiate it: Sonarcloud thinks this is a blocker bug, and legacy_class
-    # is not callable, but the unit tests show the object is created...
-    data[CONF_SWITCH] = legacy_class(device)
+    data[CONF_SWITCH] = TuyaLocalSwitch(device, ecfg)
     async_add_entities([data[CONF_SWITCH]])
     _LOGGER.debug(f"Adding switch for {discovery_info[CONF_TYPE]}")
 

+ 124 - 3
tests/test_switch.py

@@ -1,7 +1,10 @@
 """Tests for the switch entity."""
-import pytest
 from pytest_homeassistant_custom_component.common import MockConfigEntry
-from unittest.mock import AsyncMock, Mock
+from unittest import IsolatedAsyncioTestCase
+from unittest.mock import AsyncMock, Mock, patch
+
+from homeassistant.components.switch import ATTR_CURRENT_POWER_W, DEVICE_CLASS_OUTLET
+from homeassistant.const import STATE_UNAVAILABLE
 
 from custom_components.tuya_local.const import (
     CONF_DEVICE_ID,
@@ -11,9 +14,20 @@ from custom_components.tuya_local.const import (
     CONF_TYPE_KOGAN_SWITCH,
     DOMAIN,
 )
+from custom_components.tuya_local.generic.switch import TuyaLocalSwitch
+from custom_components.tuya_local.helpers.device_config import config_for_legacy_use
 from custom_components.tuya_local.kogan_socket.switch import KoganSocketSwitch
 from custom_components.tuya_local.switch import async_setup_entry
 
+from .const import KOGAN_SOCKET_PAYLOAD
+from .helpers import assert_device_properties_set
+
+KOGAN_SWITCH_DPS = "1"
+KOGAN_TIMER_DPS = "2"
+KOGAN_CURRENT_DPS = "4"
+KOGAN_POWER_DPS = "5"
+KOGAN_VOLTAGE_DPS = "6"
+
 
 async def test_init_entry(hass):
     """Test the initialisation."""
@@ -33,5 +47,112 @@ async def test_init_entry(hass):
     hass.data[DOMAIN]["dummy"]["device"] = m_device
 
     await async_setup_entry(hass, entry, m_add_entities)
-    assert type(hass.data[DOMAIN]["dummy"][CONF_SWITCH]) == KoganSocketSwitch
+    assert type(hass.data[DOMAIN]["dummy"][CONF_SWITCH]) == TuyaLocalSwitch
     m_add_entities.assert_called_once()
+
+
+class TestTuyaLocalSwitch(IsolatedAsyncioTestCase):
+    def setUp(self):
+        device_patcher = patch("custom_components.tuya_local.device.TuyaLocalDevice")
+        self.addCleanup(device_patcher.stop)
+        self.mock_device = device_patcher.start()
+        kogan_switch_config = config_for_legacy_use(CONF_TYPE_KOGAN_SWITCH)
+        switch = kogan_switch_config.primary_entity
+
+        self.subject = TuyaLocalSwitch(self.mock_device(), switch)
+        self.dps = KOGAN_SOCKET_PAYLOAD.copy()
+
+        self.subject._device.get_property.side_effect = lambda id: self.dps[id]
+
+    def test_should_poll(self):
+        self.assertTrue(self.subject.should_poll)
+
+    def test_name_returns_device_name(self):
+        self.assertEqual(self.subject.name, self.subject._device.name)
+
+    def test_unique_id_returns_device_unique_id(self):
+        self.assertEqual(self.subject.unique_id, self.subject._device.unique_id)
+
+    def test_device_class_is_outlet(self):
+        self.assertEqual(self.subject.device_class, DEVICE_CLASS_OUTLET)
+
+    def test_is_on(self):
+        self.dps[KOGAN_SWITCH_DPS] - True
+        self.assertTrue(self.subject.is_on)
+
+        self.dps[KOGAN_SWITCH_DPS] = False
+        self.assertFalse(self.subject.is_on)
+
+    def test_is_on_when_unavailable(self):
+        self.dps[KOGAN_SWITCH_DPS] = None
+        self.assertEqual(self.subject.is_on, STATE_UNAVAILABLE)
+
+    async def test_turn_on(self):
+        async with assert_device_properties_set(
+            self.subject._device, {KOGAN_SWITCH_DPS: True}
+        ):
+            await self.subject.async_turn_on()
+
+    async def test_turn_off(self):
+        async with assert_device_properties_set(
+            self.subject._device, {KOGAN_SWITCH_DPS: False}
+        ):
+            await self.subject.async_turn_off()
+
+    async def test_toggle_turns_the_switch_on_when_it_was_off(self):
+        self.dps[KOGAN_SWITCH_DPS] = False
+
+        async with assert_device_properties_set(
+            self.subject._device, {KOGAN_SWITCH_DPS: True}
+        ):
+            await self.subject.async_toggle()
+
+    async def test_toggle_turns_the_switch_off_when_it_was_on(self):
+        self.dps[KOGAN_SWITCH_DPS] = True
+
+        async with assert_device_properties_set(
+            self.subject._device, {KOGAN_SWITCH_DPS: False}
+        ):
+            await self.subject.async_toggle()
+
+    def test_current_power_w(self):
+        self.dps[KOGAN_POWER_DPS] = 1234
+        self.assertEqual(self.subject.current_power_w, 123.4)
+
+    def test_device_state_attributes_set(self):
+        self.dps[KOGAN_TIMER_DPS] = 1
+        self.dps[KOGAN_VOLTAGE_DPS] = 2350
+        self.dps[KOGAN_CURRENT_DPS] = 1234
+        self.dps[KOGAN_POWER_DPS] = 5678
+        self.assertEqual(
+            self.subject.device_state_attributes,
+            {
+                "timer": 1,
+                "current_a": 1.234,
+                "voltage_v": 235.0,
+                "current_power_w": 567.8,
+            },
+        )
+
+        self.dps[KOGAN_TIMER_DPS] = 0
+        self.dps[KOGAN_CURRENT_DPS] = None
+        self.dps[KOGAN_VOLTAGE_DPS] = None
+        self.dps[KOGAN_POWER_DPS] = None
+        self.assertEqual(
+            self.subject.device_state_attributes,
+            {
+                "timer": 0,
+                "current_a": None,
+                "voltage_v": None,
+                "current_power_w": None,
+            },
+        )
+
+    async def test_update(self):
+        result = AsyncMock()
+        self.subject._device.async_refresh.return_value = result()
+
+        await self.subject.async_update()
+
+        self.subject._device.async_refresh.assert_called_once()
+        result.assert_awaited()