infrared.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. """
  2. Implementation of Tuya infrared control devices
  3. """
import asyncio
import json
import logging

from homeassistant.components.infrared import InfraredCommand, InfraredEntity
from tinytuya.Contrib.IRRemoteControlDevice import IRRemoteControlDevice as IR

from .device import TuyaLocalDevice
from .entity import TuyaLocalEntity
from .helpers.config import async_tuya_setup_platform
from .helpers.device_config import TuyaEntityConfig
  12. _LOGGER = logging.getLogger(__name__)
  13. async def async_setup_entry(hass, entry, async_add_entities):
  14. """Set up the Tuya Local infrared control platform."""
  15. config = {**entry.data, **entry.options}
  16. await async_tuya_setup_platform(
  17. hass,
  18. async_add_entities,
  19. config,
  20. "infrared",
  21. TuyaLocalInfrared,
  22. )
  23. class TuyaLocalInfrared(TuyaLocalEntity, InfraredEntity):
  24. """Representation of a Tuya Local infrared control device."""
  25. def __init__(self, device: TuyaLocalDevice, config: TuyaEntityConfig):
  26. """Initialize the infrared control device."""
  27. super().__init__()
  28. dps_map = self._init_begin(device, config)
  29. self._send_dp = dps_map.pop("send", None)
  30. self._command_dp = dps_map.pop("control", None)
  31. self._type_dp = dps_map.pop("code_type", None)
  32. self._init_end(dps_map)
  33. async def async_send_command(self, command: InfraredCommand) -> None:
  34. """Handle sending an infrared command."""
  35. timings = command.get_raw_timings()
  36. split = {}
  37. raw = []
  38. i = 0
  39. for timing in timings:
  40. if timing.high_us > 65535:
  41. split[i] = timing.high_us - 65535
  42. raw.append(65535)
  43. raw.append(timing.low_us)
  44. elif timing.low_us > 65535:
  45. raw.append(timing.high_us)
  46. raw.append(65535)
  47. split[i + 2] = timing.low_us - 65535
  48. else:
  49. raw.append(timing.high_us)
  50. raw.append(timing.low_us)
  51. i += 2
  52. start = 0
  53. for s, t in split.items():
  54. tuya_command = IR.pulses_to_base64(raw[start:s])
  55. _LOGGER.info(
  56. "%s sending infrared command: %s", self._config.config_id, tuya_command
  57. )
  58. start = s
  59. await self._ir_send(tuya_command)
  60. await asyncio.sleep(t / 1000000)
  61. if start < len(raw):
  62. tuya_command = IR.pulses_to_base64(raw[start:])
  63. _LOGGER.info(
  64. "%s sending infrared command: %s", self._config.config_id, tuya_command
  65. )
  66. await self._ir_send(tuya_command)
  67. async def _ir_send(self, tuya_command: str):
  68. """Send the infrared command to the device."""
  69. if self._send_dp:
  70. if self._command_dp:
  71. await self._device.async_set_properties(
  72. self._package_multi_dp_send(tuya_command)
  73. )
  74. else:
  75. await self._send_dp.async_set_value(
  76. self._device,
  77. self._package_single_dp_send(tuya_command),
  78. )
  79. def _package_single_dp_send(self, command: str) -> str:
  80. """Package the command for a single DP (usually dp id 201) send."""
  81. json_command = {
  82. "control": "send_ir",
  83. "type": 0,
  84. "head": "",
  85. "key1": "1" + command,
  86. }
  87. return json_command
  88. def _package_multi_dp_send(self, command: str) -> dict:
  89. """Package the command for a multi DP send"""
  90. return {
  91. f"{self._command_dp.id}": "send_ir",
  92. f"{self._type_dp.id}": 0,
  93. f"{self._send_dp.id}": command,
  94. }