| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- """
- Setup for different kinds of Tuya cover devices
- """
- import logging
- from homeassistant.components.cover import (
- CoverDeviceClass,
- CoverEntity,
- CoverEntityFeature,
- )
- from homeassistant.util.percentage import (
- percentage_to_ranged_value,
- ranged_value_to_percentage,
- )
- from .device import TuyaLocalDevice
- from .entity import TuyaLocalEntity
- from .helpers.config import async_tuya_setup_platform
- from .helpers.device_config import TuyaEntityConfig
- _LOGGER = logging.getLogger(__name__)
async def async_setup_entry(hass, config_entry, async_add_entities):
    """Set up the cover platform for a config entry.

    The entry's options are layered over its data, so a key present in
    both takes its value from the options. The merged mapping is passed
    to the shared platform setup helper together with the "cover"
    platform name and the entity class to instantiate.
    """
    merged_config = dict(config_entry.data)
    merged_config.update(config_entry.options)
    await async_tuya_setup_platform(
        hass, async_add_entities, merged_config, "cover", TuyaLocalCover
    )
class TuyaLocalCover(TuyaLocalEntity, CoverEntity):
    """Representation of a Tuya Cover Entity."""

    def __init__(self, device: TuyaLocalDevice, config: TuyaEntityConfig):
        """
        Initialise the cover device.
        Args:
            device (TuyaLocalDevice): The device API instance
            config (TuyaEntityConfig): The entity config
        """
        super().__init__()
        dps_map = self._init_begin(device, config)
        # Claim the dps this entity understands; whatever is left in the
        # map is handed back to _init_end.
        self._position_dp = dps_map.pop("position", None)
        self._currentpos_dp = dps_map.pop("current_position", None)
        self._tiltpos_dp = dps_map.pop("tilt_position", None)
        self._control_dp = dps_map.pop("control", None)
        self._action_dp = dps_map.pop("action", None)
        self._open_dp = dps_map.pop("open", None)
        self._init_end(dps_map)

        self._support_flags = CoverEntityFeature(0)
        if self._position_dp:
            self._support_flags |= CoverEntityFeature.SET_POSITION
        if self._control_dp:
            # Look the command list up once rather than once per command.
            commands = self._control_dp.values(self._device)
            if "stop" in commands:
                self._support_flags |= CoverEntityFeature.STOP
            if "open" in commands:
                self._support_flags |= CoverEntityFeature.OPEN
            if "close" in commands:
                self._support_flags |= CoverEntityFeature.CLOSE
        if self._tiltpos_dp:
            self._support_flags |= CoverEntityFeature.SET_TILT_POSITION
        # Open/close/stop tilt not yet supported, as no test devices known

    @property
    def device_class(self):
        """Return the class of this device.

        Returns None when the configured class is not a valid
        CoverDeviceClass; a warning is logged if a non-empty value was
        configured.
        """
        dclass = self._config.device_class
        try:
            return CoverDeviceClass(dclass)
        except ValueError:
            if dclass:
                _LOGGER.warning(
                    "%s/%s: Unrecognised cover device class of %s ignored",
                    self._config._device.config,
                    self.name or "cover",
                    dclass,
                )
            return None

    @property
    def supported_features(self):
        """Inform HA of the supported features."""
        return self._support_flags

    def _state_to_percent(self, state):
        """Convert a state to percent open.

        "opened" maps to 100 and "closed" to 0; every other value
        (including None) maps to 50.
        """
        if state == "opened":
            return 100
        if state == "closed":
            return 0
        return 50

    @property
    def current_cover_position(self):
        """Return current position of cover.

        Sources are tried in order: the current position dp, the open
        dp (truthy is 100, falsy is 0), the action dp (converted with
        _state_to_percent), and finally the position dp. Returns None
        if none of them applies.
        """
        if self._currentpos_dp:
            pos = self._currentpos_dp.get_value(self._device)
            if pos is not None:
                return pos

        if self._open_dp:
            state = self._open_dp.get_value(self._device)
            if state is not None:
                return 100 if state else 0

        if self._action_dp:
            # Always returns here, so the position dp below is only
            # consulted when there is no action dp.
            state = self._action_dp.get_value(self._device)
            return self._state_to_percent(state)

        if self._position_dp:
            return self._position_dp.get_value(self._device)

        return None

    @property
    def current_cover_tilt_position(self):
        """Return current tilt position of cover.

        When the tilt dp reports a range, the value is converted to a
        percentage of that range; otherwise the value is returned as is.
        Returns None if there is no tilt dp.
        """
        if not self._tiltpos_dp:
            return None

        r = self._tiltpos_dp.range(self._device)
        val = self._tiltpos_dp.get_value(self._device)
        if r and val is not None:
            return ranged_value_to_percentage(r, val)
        return val

    @property
    def _current_state(self):
        """Return the current state of the cover if it can be determined,
        or None if it is inconclusive.
        """
        if self._action_dp:
            action = self._action_dp.get_value(self._device)
            if action in ["opening", "closing", "opened", "closed"]:
                return action

        pos = self.current_cover_position
        if pos is None:
            return None

        if pos < 5:
            return "closed"
        elif pos > 95:
            return "opened"

        if self._currentpos_dp and self._position_dp:
            setpos = self._position_dp.get_value(self._device)
            if setpos == pos:
                # if the current position is around the set position,
                # which is not closed, then we want is_closed to return
                # false, so HA gets the full state from position.
                return "opened"

        if self._control_dp:
            # Fall back to the last command sent to the cover.
            cmd = self._control_dp.get_value(self._device)
            if cmd == "open":
                return "opening"
            elif cmd == "close":
                return "closing"

        return None

    @property
    def is_opening(self):
        """Return if the cover is opening or not, if it can be determined."""
        state = self._current_state
        if state is None:
            # If is_opening, is_closing and is_closed are all false, HA
            # assumes open. If we don't know, return None.
            return None
        return state == "opening"

    @property
    def is_closing(self):
        """Return if the cover is closing or not, if it can be determined."""
        state = self._current_state
        if state is None:
            # If is_opening, is_closing and is_closed are all false, HA
            # assumes open. If we don't know, return None.
            return None
        return state == "closing"

    @property
    def is_closed(self):
        """Return if the cover is closed or not, if it can be determined."""
        state = self._current_state
        if state is None:
            # If is_opening, is_closing and is_closed are all false, HA
            # assumes open. If we don't know, return None.
            return None
        return state == "closed"

    async def async_open_cover(self, **kwargs):
        """Open the cover.

        Sends "open" on the control dp when it offers that command,
        otherwise sets the position dp to 100.

        Raises:
            NotImplementedError: if neither mechanism is available.
        """
        if self._control_dp and "open" in self._control_dp.values(self._device):
            await self._control_dp.async_set_value(self._device, "open")
        elif self._position_dp:
            await self._position_dp.async_set_value(self._device, 100)
        else:
            raise NotImplementedError()

    async def async_close_cover(self, **kwargs):
        """Close the cover.

        Sends "close" on the control dp when it offers that command,
        otherwise sets the position dp to 0.

        Raises:
            NotImplementedError: if neither mechanism is available.
        """
        if self._control_dp and "close" in self._control_dp.values(self._device):
            await self._control_dp.async_set_value(self._device, "close")
        elif self._position_dp:
            await self._position_dp.async_set_value(self._device, 0)
        else:
            raise NotImplementedError()

    async def async_set_cover_position(self, position, **kwargs):
        """Set the cover to a specific position.

        Raises:
            AttributeError: if position is None.
            NotImplementedError: if there is no position dp.
        """
        if position is None:
            raise AttributeError()
        if self._position_dp:
            await self._position_dp.async_set_value(self._device, position)
        else:
            raise NotImplementedError()

    async def async_set_cover_tilt_position(self, tilt_position, **kwargs):
        """Set the cover tilt position.

        Raises:
            NotImplementedError: if there is no tilt position dp.
        """
        if not self._tiltpos_dp:
            raise NotImplementedError()

        choices = self._tiltpos_dp.values(self._device)
        if choices:
            # If there is a fixed list of values, snap to the closest one
            tilt_position = min(choices, key=lambda x: abs(x - tilt_position))
        else:
            r = self._tiltpos_dp.range(self._device)
            if r:
                tilt_position = percentage_to_ranged_value(r, tilt_position)

        await self._tiltpos_dp.async_set_value(self._device, tilt_position)

    async def async_stop_cover(self, **kwargs):
        """Stop the cover.

        Raises:
            NotImplementedError: if the control dp is missing or does not
                offer a "stop" command.
        """
        if self._control_dp and "stop" in self._control_dp.values(self._device):
            await self._control_dp.async_set_value(self._device, "stop")
        else:
            raise NotImplementedError()
|