4
0

vacuum.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. """
  2. Platform to control Tuya robot vacuums.
  3. """
  4. from homeassistant.components.vacuum import (
  5. SERVICE_CLEAN_SPOT,
  6. SERVICE_RETURN_TO_BASE,
  7. STATE_CLEANING,
  8. STATE_DOCKED,
  9. STATE_RETURNING,
  10. STATE_ERROR,
  11. SUPPORT_BATTERY,
  12. SUPPORT_FAN_SPEED,
  13. SUPPORT_CLEAN_SPOT,
  14. SUPPORT_LOCATE,
  15. SUPPORT_PAUSE,
  16. SUPPORT_RETURN_HOME,
  17. SUPPORT_SEND_COMMAND,
  18. SUPPORT_START,
  19. SUPPORT_STATE,
  20. SUPPORT_STATUS,
  21. SUPPORT_TURN_ON,
  22. SUPPORT_TURN_OFF,
  23. StateVacuumEntity,
  24. )
  25. from ..device import TuyaLocalDevice
  26. from ..helpers.device_config import TuyaEntityConfig
  27. from ..helpers.mixin import TuyaLocalEntity
  28. class TuyaLocalVacuum(TuyaLocalEntity, StateVacuumEntity):
  29. """Representation of a Tuya Vacuum Cleaner"""
  30. def __init__(self, device: TuyaLocalDevice, config: TuyaEntityConfig):
  31. """
  32. Initialise the sensor.
  33. Args:
  34. device (TuyaLocalDevice): the device API instance.
  35. config (TuyaEntityConfig): the configuration for this entity
  36. """
  37. dps_map = self._init_begin(device, config)
  38. self._status_dps = dps_map.get("status")
  39. self._locate_dps = dps_map.get("locate")
  40. self._power_dps = dps_map.get("power")
  41. self._active_dps = dps_map.get("activate")
  42. self._battery_dps = dps_map.pop("battery", None)
  43. self._direction_dps = dps_map.get("direction_control")
  44. self._error_dps = dps_map.get("error")
  45. self._fan_dps = dps_map.pop("fan_speed", None)
  46. if self._status_dps is None:
  47. raise AttributeError(f"{config.name} is missing a status dps")
  48. self._init_end(dps_map)
  49. @property
  50. def supported_features(self):
  51. """Return the features supported by this vacuum cleaner."""
  52. support = SUPPORT_STATE | SUPPORT_STATUS | SUPPORT_SEND_COMMAND
  53. if self._battery_dps:
  54. support |= SUPPORT_BATTERY
  55. if self._fan_dps:
  56. support |= SUPPORT_FAN_SPEED
  57. if self._power_dps:
  58. support |= SUPPORT_TURN_ON | SUPPORT_TURN_OFF
  59. if self._active_dps:
  60. support |= SUPPORT_START | SUPPORT_PAUSE
  61. if self._locate_dps:
  62. support |= SUPPORT_LOCATE
  63. status_support = self._status_dps.values(self._device)
  64. if SERVICE_RETURN_TO_BASE in status_support:
  65. support |= SUPPORT_RETURN_HOME
  66. if SERVICE_CLEAN_SPOT in status_support:
  67. support |= SUPPORT_CLEAN_SPOT
  68. return support
  69. @property
  70. def battery_level(self):
  71. """Return the battery level of the vacuum cleaner."""
  72. if self._battery_dps:
  73. return self._battery_dps.get_value(self._device)
  74. @property
  75. def status(self):
  76. """Return the status of the vacuum cleaner."""
  77. return self._status_dps.get_value(self._device)
  78. @property
  79. def state(self):
  80. """Return the state of the vacuum cleaner."""
  81. status = self._status_dps.get_value(self._device)
  82. if self._error_dps and self._error_dps.get_value(self._device) != 0:
  83. return STATE_ERROR
  84. elif status == SERVICE_RETURN_TO_BASE:
  85. return STATE_RETURNING
  86. elif status == "standby":
  87. return STATE_DOCKED
  88. elif self._power_dps and not self._power_dps.get_value(self._device):
  89. return STATE_DOCKED
  90. elif self._active_dps and not self._active_dps.get_value(self._device):
  91. return STATE_DOCKED
  92. else:
  93. return STATE_CLEANING
  94. async def async_turn_on(self, **kwargs):
  95. """Turn on the vacuum cleaner."""
  96. if self._power_dps:
  97. await self._power_dps.async_set_value(self._device, True)
  98. async def async_turn_off(self, **kwargs):
  99. """Turn off the vacuum cleaner."""
  100. if self._power_dps:
  101. await self._power_dps.async_set_value(self._device, False)
  102. async def async_toggle(self, **kwargs):
  103. """Toggle the vacuum cleaner."""
  104. dps = self._power_dps
  105. if not dps:
  106. dps = self._activate_dps
  107. if dps:
  108. switch_to = not dps.get_value(self._device)
  109. await dps.async_set_value(self._device, switch_to)
  110. async def async_start(self):
  111. if self._active_dps:
  112. await self._active_dps.async_set_value(self._device, True)
  113. async def async_pause(self):
  114. """Pause the vacuum cleaner."""
  115. if self._active_dps:
  116. await self._active_dps.async_set_value(self._device, False)
  117. async def async_return_to_base(self, **kwargs):
  118. """Tell the vacuum cleaner to return to its base."""
  119. if self._status_dps and SERVICE_RETURN_TO_BASE in self._status_dps.values(
  120. self._device
  121. ):
  122. await self._status_dps.async_set_value(self._device, SERVICE_RETURN_TO_BASE)
  123. async def async_clean_spot(self, **kwargs):
  124. """Tell the vacuum cleaner do a spot clean."""
  125. if self._status_dps and SERVICE_CLEAN_SPOT in self._status_dps.values(
  126. self._device
  127. ):
  128. await self._status_dps.async_set_value(self._device, SERVICE_CLEAN_SPOT)
  129. async def async_locate(self, **kwargs):
  130. """Locate the vacuum cleaner."""
  131. if self._locate_dps:
  132. await self._locate_dps.async_set_value(self._device, True)
  133. async def async_send_command(self, command, params=None, **kwargs):
  134. """Send a command to the vacuum cleaner."""
  135. if command in self._status_dps.values(self._device):
  136. await self._status_dps.async_set_value(self._device, command)
  137. elif self._direction_dps and command in self._direction_dps.values(
  138. self._device
  139. ):
  140. await self._direction_dps.async_set_value(self._device, command)
  141. @property
  142. def fan_speed_list(self):
  143. """Return the list of fan speeds supported"""
  144. if self._fan_dps:
  145. return self._fan_dps.values(self._device)
  146. @property
  147. def fan_speed(self):
  148. """Return the current fan speed"""
  149. if self._fan_dps:
  150. return self._fan_dps.get_value(self._device)
  151. async def async_set_fan_speed(self, fan_speed, **kwargs):
  152. """Set the fan speed of the vacuum."""
  153. if self._fan_dps:
  154. await self._fan_dps.async_set_value(self._device, fan_speed)