From dd1b7d256f08ea6b8ddb72f34cd202518e53f933 Mon Sep 17 00:00:00 2001 From: - <-> Date: Tue, 24 Aug 2021 09:33:19 +0200 Subject: [PATCH] 0.7: fixed sml time and added a workaround for devices that report texts with a scaler --- src/smllib/__version__.py | 2 +- src/smllib/sml/time.py | 33 +++++++++++++++------------------ src/smllib/sml_fields.py | 16 +++++++++++----- tests/test_complete.py | 5 ++++- tests/test_sml_fields.py | 19 ++++++++++++++++++- tests/test_sml_time.py | 13 +++++++++++++ 6 files changed, 62 insertions(+), 26 deletions(-) create mode 100644 tests/test_sml_time.py diff --git a/src/smllib/__version__.py b/src/smllib/__version__.py index 27fda16..e220fa9 100644 --- a/src/smllib/__version__.py +++ b/src/smllib/__version__.py @@ -1 +1 @@ -__version__ = '0.6' +__version__ = '0.7' diff --git a/src/smllib/sml/time.py b/src/smllib/sml/time.py index 38bb75b..c20ea2c 100644 --- a/src/smllib/sml/time.py +++ b/src/smllib/sml/time.py @@ -1,25 +1,20 @@ -from datetime import datetime, timedelta, tzinfo -from typing import Optional +from datetime import datetime, timedelta +from typing import Union -class SmlTzOffset(tzinfo): - def __init__(self, utc_offset: int, dst_offset: int): - self._utc_offset = timedelta(minutes=utc_offset) - self._dst_offset = timedelta(minutes=dst_offset) - - def tzname(self, dt: Optional[datetime]) -> Optional[str]: - return 'SmlTzOffset' - - def utcoffset(self, dt: Optional[datetime]) -> Optional[timedelta]: - return self._utc_offset +class SmlTime: + HINT = Union[None, int, datetime] - def dst(self, dt: Optional[datetime]) -> Optional[timedelta]: - return self._dst_offset + @staticmethod + def from_list(_in): + if _in is None: + return _in + # This is a workaround for times that are not reported according to specification + # Instead of a choice list these devices report just the timestamp - however I am unsure about it. + if isinstance(_in, int): + return _in -class SmlTime: - @classmethod - def from_list(cls, _in): _t = _in[0] _v = _in[1] if _t == 1: @@ -27,4 +22,6 @@ def from_list(cls, _in): if _t == 2: return datetime.fromtimestamp(_v) if _t == 3: - return datetime.fromtimestamp(_v[0], tz=SmlTzOffset(_v[1], _v[2])) + return datetime.fromtimestamp(_v[0]) + timedelta(minutes=_v[1]) + timedelta(minutes=_v[2]) + + raise ValueError(f'Can not build SmlTime from {_in}') diff --git a/src/smllib/sml_fields.py b/src/smllib/sml_fields.py index e89d845..6e069fc 100644 --- a/src/smllib/sml_fields.py +++ b/src/smllib/sml_fields.py @@ -145,6 +145,10 @@ def from_list(cls, _in): if _in[5] is None: raise ValueError('value is required!') + # Convert Time + if _in[2] is not None: + _in[2] = SmlTime.from_list(_in[2]) + # Maybe it's ascii so we try to decode it if isinstance(_in[5], str): v = a2b_hex(_in[5]).decode(errors='ignore') @@ -153,11 +157,12 @@ def from_list(cls, _in): return cls(*_in) - def __init__(self, obis: str, status: Optional[int], val_time, unit: Optional[int], scaler: Optional[int] = None, - value: Union[None, str, int, float] = None, value_signature: Optional[str] = None): + def __init__(self, obis: str, status: Optional[int], val_time: SmlTime.HINT, unit: Optional[int], + scaler: Optional[int] = None, value: Union[None, str, int, float] = None, + value_signature: Optional[str] = None): self.obis = obis self.status: Optional[int] = status - self.val_time = val_time + self.val_time: SmlTime.HINT = val_time self.unit: Optional[int] = unit self.scaler: Optional[int] = scaler self.value = value @@ -191,8 +196,9 @@ def format_msg(self, indent: int = 0): r += f'{INDENT*indent}-> {summary:s}\n' return r - def get_value(self) -> float: - if self.scaler is None: + def get_value(self) -> Union[float, str]: + # Some devices report the texts with scaler 0 + if self.scaler is None or isinstance(self.value, str): return self.value return round(self.value * 10**self.scaler, abs(self.scaler) + 3) diff --git a/tests/test_complete.py b/tests/test_complete.py index 54082b9..4c2bccf 100644 --- a/tests/test_complete.py +++ b/tests/test_complete.py @@ -26,7 +26,7 @@ b'162085200630138017707010051071aff01016208520063011101770701000e0700ff0101622c52ff6301f40177070100000200' b'000101010109312e30322e3030370177070100605a02010101010105413031410177070100600500ff0101010165001c8104010' b'10163fc1e00760400000362006200726500000201710163e8230000001b1b1b1b1a0222ed', - id='Frame1' + id='Frame2' ), ) ) @@ -43,3 +43,6 @@ def test_frames(frame): obis_values = frame.get_obis() assert len(obis_values) >= 4, obis_values + + for obis in obis_values: + obis.get_value() diff --git a/tests/test_sml_fields.py b/tests/test_sml_fields.py index 77824f1..3938e2b 100644 --- a/tests/test_sml_fields.py +++ b/tests/test_sml_fields.py @@ -4,7 +4,7 @@ from smllib.sml_frame import SmlFrame -def test_sml_fileds(): +def test_sml_fields(): f = SmlFrame(a2b_hex('77078181c78203ff010101010449534b0177070100000009ff010101010b')) val_list = f.get_value(0) for i, _ in enumerate(val_list): @@ -25,3 +25,20 @@ def test_sml_fileds(): assert o.scaler == -1 assert o.value == 17965876 assert o.get_value() == 1796587.6 + + +def test_val_time(): + # Frame where time is None + f = SmlFrame(a2b_hex('77070100600100ff010101010b0a01484c5902000424a001')) + val_list = f.get_value(0) + f._parse_msg(val_list) + o = SmlListEntry.from_list(val_list) + assert o.val_time is None + + # Frame where secIndex == 1 and time == 0 + # -> 7262016200 + f = SmlFrame(a2b_hex('77070100600100ff017262016200620052000b0a01445a47000282c0b001')) + val_list = f.get_value(0) + f._parse_msg(val_list) + o = SmlListEntry.from_list(val_list) + assert o.val_time == 0 diff --git a/tests/test_sml_time.py b/tests/test_sml_time.py new file mode 100644 index 0000000..73c1a53 --- /dev/null +++ b/tests/test_sml_time.py @@ -0,0 +1,13 @@ +from datetime import datetime + +from smllib.sml import SmlTime + + +def test_sml_time(): + assert SmlTime.from_list(None) is None + assert SmlTime.from_list([1, 253]) == 253 + assert SmlTime.from_list([2, 1609466461]) == datetime(2021, 1, 1, 3, 1, 1) + + assert SmlTime.from_list([3, [1609466461, 120, 0]]) == datetime(2021, 1, 1, 5, 1, 1) + assert SmlTime.from_list([3, [1622509261, 120, 60]]) == datetime(2021, 6, 1, 6, 1, 1) + assert SmlTime.from_list([3, [1622509261, 120, 30]]) == datetime(2021, 6, 1, 5, 31, 1)