config_flow.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. import asyncio
  2. import logging
  3. import voluptuous as vol
  4. from homeassistant import config_entries
  5. from homeassistant.const import CONF_HOST, CONF_NAME
  6. from homeassistant.core import HomeAssistant, callback
  7. from . import DOMAIN
  8. from .device import TuyaLocalDevice
  9. from .const import (
  10. API_PROTOCOL_VERSIONS,
  11. CONF_DEVICE_ID,
  12. CONF_LOCAL_KEY,
  13. CONF_POLL_ONLY,
  14. CONF_PROTOCOL_VERSION,
  15. CONF_TYPE,
  16. )
  17. from .helpers.device_config import get_config
  18. from .helpers.log import log_json
  19. _LOGGER = logging.getLogger(__name__)
  20. class ConfigFlowHandler(config_entries.ConfigFlow, domain=DOMAIN):
  21. VERSION = 12
  22. CONNECTION_CLASS = config_entries.CONN_CLASS_LOCAL_PUSH
  23. device = None
  24. data = {}
  25. async def async_step_user(self, user_input=None):
  26. errors = {}
  27. devid_opts = {}
  28. host_opts = {"default": "Auto"}
  29. key_opts = {}
  30. proto_opts = {"default": "auto"}
  31. polling_opts = {"default": False}
  32. if user_input is not None:
  33. await self.async_set_unique_id(user_input[CONF_DEVICE_ID])
  34. self._abort_if_unique_id_configured()
  35. self.device = await async_test_connection(user_input, self.hass)
  36. if self.device:
  37. self.data = user_input
  38. return await self.async_step_select_type()
  39. else:
  40. errors["base"] = "connection"
  41. devid_opts["default"] = user_input[CONF_DEVICE_ID]
  42. host_opts["default"] = user_input[CONF_HOST]
  43. key_opts["default"] = user_input[CONF_LOCAL_KEY]
  44. proto_opts["default"] = user_input[CONF_PROTOCOL_VERSION]
  45. polling_opts["default"] = user_input[CONF_POLL_ONLY]
  46. return self.async_show_form(
  47. step_id="user",
  48. data_schema=vol.Schema(
  49. {
  50. vol.Required(CONF_DEVICE_ID, **devid_opts): str,
  51. vol.Required(CONF_HOST, **host_opts): str,
  52. vol.Required(CONF_LOCAL_KEY, **key_opts): str,
  53. vol.Required(
  54. CONF_PROTOCOL_VERSION,
  55. **proto_opts,
  56. ): vol.In(["auto"] + API_PROTOCOL_VERSIONS),
  57. vol.Required(CONF_POLL_ONLY, **polling_opts): bool,
  58. }
  59. ),
  60. errors=errors,
  61. )
  62. async def async_step_select_type(self, user_input=None):
  63. if user_input is not None:
  64. self.data[CONF_TYPE] = user_input[CONF_TYPE]
  65. return await self.async_step_choose_entities()
  66. types = []
  67. best_match = 0
  68. best_matching_type = None
  69. async for type in self.device.async_possible_types():
  70. types.append(type.config_type)
  71. q = type.match_quality(self.device._get_cached_state())
  72. if q > best_match:
  73. best_match = q
  74. best_matching_type = type.config_type
  75. if best_match < 100:
  76. best_match = int(best_match)
  77. dps = self.device._get_cached_state()
  78. _LOGGER.warning(
  79. "Device matches %s with quality of %d%%. DPS: %s",
  80. best_matching_type,
  81. best_match,
  82. log_json(dps),
  83. )
  84. _LOGGER.warning(
  85. "Report this to https://github.com/make-all/tuya-local/issues/"
  86. )
  87. if types:
  88. return self.async_show_form(
  89. step_id="select_type",
  90. data_schema=vol.Schema(
  91. {
  92. vol.Required(
  93. CONF_TYPE,
  94. default=best_matching_type,
  95. ): vol.In(types),
  96. }
  97. ),
  98. )
  99. else:
  100. return self.async_abort(reason="not_supported")
  101. async def async_step_choose_entities(self, user_input=None):
  102. if user_input is not None:
  103. title = user_input[CONF_NAME]
  104. del user_input[CONF_NAME]
  105. return self.async_create_entry(
  106. title=title, data={**self.data, **user_input}
  107. )
  108. config = get_config(self.data[CONF_TYPE])
  109. schema = {vol.Required(CONF_NAME, default=config.name): str}
  110. return self.async_show_form(
  111. step_id="choose_entities",
  112. data_schema=vol.Schema(schema),
  113. )
  114. @staticmethod
  115. @callback
  116. def async_get_options_flow(config_entry):
  117. return OptionsFlowHandler(config_entry)
  118. class OptionsFlowHandler(config_entries.OptionsFlow):
  119. def __init__(self, config_entry):
  120. """Initialize options flow."""
  121. self.config_entry = config_entry
  122. async def async_step_init(self, user_input=None):
  123. return await self.async_step_user(user_input)
  124. async def async_step_user(self, user_input=None):
  125. """Manage the options."""
  126. errors = {}
  127. config = {**self.config_entry.data, **self.config_entry.options}
  128. if user_input is not None:
  129. config = {**config, **user_input}
  130. device = await async_test_connection(config, self.hass)
  131. if device:
  132. return self.async_create_entry(title="", data=user_input)
  133. else:
  134. errors["base"] = "connection"
  135. schema = {
  136. vol.Required(
  137. CONF_LOCAL_KEY,
  138. default=config.get(CONF_LOCAL_KEY, ""),
  139. ): str,
  140. vol.Required(CONF_HOST, default=config.get(CONF_HOST, "")): str,
  141. vol.Required(
  142. CONF_PROTOCOL_VERSION,
  143. default=config.get(CONF_PROTOCOL_VERSION, "auto"),
  144. ): vol.In(["auto"] + API_PROTOCOL_VERSIONS),
  145. vol.Required(
  146. CONF_POLL_ONLY, default=config.get(CONF_POLL_ONLY, False)
  147. ): bool,
  148. }
  149. cfg = get_config(config[CONF_TYPE])
  150. if cfg is None:
  151. return self.async_abort(reason="not_supported")
  152. return self.async_show_form(
  153. step_id="user",
  154. data_schema=vol.Schema(schema),
  155. errors=errors,
  156. )
  157. async def async_test_connection(config: dict, hass: HomeAssistant):
  158. domain_data = hass.data.get(DOMAIN)
  159. existing = domain_data.get(config[CONF_DEVICE_ID]) if domain_data else None
  160. if existing:
  161. existing["device"].pause()
  162. await asyncio.sleep(5)
  163. try:
  164. device = TuyaLocalDevice(
  165. "Test",
  166. config[CONF_DEVICE_ID],
  167. config[CONF_HOST],
  168. config[CONF_LOCAL_KEY],
  169. config[CONF_PROTOCOL_VERSION],
  170. hass,
  171. True,
  172. )
  173. await device.async_refresh()
  174. retval = device if device.has_returned_state else None
  175. except Exception as e:
  176. _LOGGER.warning("Connection test failed with %s %s", type(e), e)
  177. retval = None
  178. if existing:
  179. existing["device"].resume()
  180. return retval