|
|
@@ -1,11 +1,12 @@
|
|
|
"""
|
|
|
Config parser for Tuya Local devices.
|
|
|
"""
|
|
|
+from base64 import b64decode, b64encode
|
|
|
+
|
|
|
from fnmatch import fnmatch
|
|
|
import logging
|
|
|
from os import walk
|
|
|
from os.path import join, dirname, splitext, exists
|
|
|
-from pydoc import locate
|
|
|
|
|
|
from homeassistant.util import slugify
|
|
|
from homeassistant.util.yaml import load_yaml
|
|
|
@@ -40,6 +41,29 @@ def _scale_range(r, s):
|
|
|
return {"min": r["min"] / s, "max": r["max"] / s}
|
|
|
|
|
|
|
|
|
+_unsignedFmts = {
|
|
|
+ 1: "B",
|
|
|
+ 2: "H",
|
|
|
+ 4: "I",
|
|
|
+}
|
|
|
+
|
|
|
+_signedFmts = {
|
|
|
+ 1: "b",
|
|
|
+ 2: "h",
|
|
|
+ 4: "i",
|
|
|
+}
|
|
|
+
|
|
|
+
|
|
|
+def _bytesToFmt(bytes, signed=False):
|
|
|
+ "Convert a byte count to an unpack format."
|
|
|
+ knownFmts = _signedFmts if signed else _unsignedFmts
|
|
|
+
|
|
|
+ if bytes in knownFmts:
|
|
|
+ return _unsignedFmts[bytes]
|
|
|
+ else:
|
|
|
+ return f"{bytes}s"
|
|
|
+
|
|
|
+
|
|
|
class TuyaDeviceConfig:
|
|
|
"""Representation of a device config for Tuya Local devices."""
|
|
|
|
|
|
@@ -264,10 +288,53 @@ class TuyaDpsConfig:
|
|
|
def name(self):
|
|
|
return self._config["name"]
|
|
|
|
|
|
+ @property
|
|
|
+ def format(self):
|
|
|
+ fmt = self._config.get("format")
|
|
|
+ if fmt:
|
|
|
+ unpack_fmt = ">"
|
|
|
+ ranges = []
|
|
|
+ names = []
|
|
|
+ for f in fmt:
|
|
|
+ name = f.get("name")
|
|
|
+ bytes = f.get("bytes", 1)
|
|
|
+ range = f.get("range")
|
|
|
+ if range:
|
|
|
+ min = range.get("min")
|
|
|
+ max = range.get("max")
|
|
|
+ else:
|
|
|
+ min = 0
|
|
|
+ max = 256 ** bytes - 1
|
|
|
+
|
|
|
+ unpack_fmt = unpack_fmt + _bytesToFmt(bytes, min < 0)
|
|
|
+ ranges.append({"min": min, "max": max})
|
|
|
+ names.append(name)
|
|
|
+ _LOGGER.debug(f"format of {unpack_fmt} found")
|
|
|
+ return {"format": unpack_fmt, "ranges": ranges, "names": names}
|
|
|
+
|
|
|
+ return None
|
|
|
+
|
|
|
def get_value(self, device):
|
|
|
"""Return the value of the dps from the given device."""
|
|
|
return self._map_from_dps(device.get_property(self.id), device)
|
|
|
|
|
|
+ def decoded_value(self, device):
|
|
|
+ v = self.get_value(device)
|
|
|
+ if self.rawtype == "hex":
|
|
|
+ return bytes.fromhex(v)
|
|
|
+ elif self.rawtype == "base64":
|
|
|
+ return b64decode(v)
|
|
|
+ else:
|
|
|
+ return v
|
|
|
+
|
|
|
+ def encode_value(self, v):
|
|
|
+ if self.rawtype == "hex":
|
|
|
+ return v.hex()
|
|
|
+ elif self.rawtype == "base64":
|
|
|
+ return b64encode(v).decode("utf-8")
|
|
|
+ else:
|
|
|
+ return v
|
|
|
+
|
|
|
def _match(self, matchdata, value):
|
|
|
"""Return true val1 matches val2"""
|
|
|
if self.rawtype == "bitfield" and matchdata:
|
|
|
@@ -419,9 +486,12 @@ class TuyaDpsConfig:
|
|
|
self.stringify = False
|
|
|
|
|
|
result = value
|
|
|
+
|
|
|
mapping = self._find_map_for_dps(value)
|
|
|
if mapping:
|
|
|
scale = mapping.get("scale", 1)
|
|
|
+ invert = mapping.get("invert", False)
|
|
|
+
|
|
|
if not isinstance(scale, (int, float)):
|
|
|
scale = 1
|
|
|
redirect = mapping.get("value_redirect")
|
|
|
@@ -454,6 +524,12 @@ class TuyaDpsConfig:
|
|
|
result = result / scale
|
|
|
replaced = True
|
|
|
|
|
|
+ if invert:
|
|
|
+ range = self._config.get("range")
|
|
|
+ if range and "min" in range and "max" in range:
|
|
|
+ result = -1 * result + range["min"] + range["max"]
|
|
|
+ replaced = True
|
|
|
+
|
|
|
if replaced:
|
|
|
_LOGGER.debug(
|
|
|
"%s: Mapped dps %s value from %s to %s",
|
|
|
@@ -512,6 +588,8 @@ class TuyaDpsConfig:
|
|
|
replaced = False
|
|
|
scale = mapping.get("scale", 1)
|
|
|
redirect = mapping.get("value_redirect")
|
|
|
+ invert = mapping.get("invert", False)
|
|
|
+
|
|
|
if not isinstance(scale, (int, float)):
|
|
|
scale = 1
|
|
|
step = mapping.get("step")
|
|
|
@@ -551,6 +629,12 @@ class TuyaDpsConfig:
|
|
|
r_dps = self._entity.find_dps(redirect)
|
|
|
return r_dps.get_values_to_set(device, value)
|
|
|
|
|
|
+ if invert:
|
|
|
+ range = self._config.get("range")
|
|
|
+ if range and "min" in range and "max" in range:
|
|
|
+ result = -1 * result + range["min"] + range["max"]
|
|
|
+ replaced = True
|
|
|
+
|
|
|
if scale != 1 and isinstance(result, (int, float)):
|
|
|
_LOGGER.debug(f"Scaling {result} by {scale}")
|
|
|
result = result * scale
|