Skip to content

Commit

Permalink
0.7: fixed sml time and added a workaround for devices that report te…
Browse files Browse the repository at this point in the history
…xts with a scaler
  • Loading branch information
- committed Aug 24, 2021
1 parent cf33071 commit dd1b7d2
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 26 deletions.
2 changes: 1 addition & 1 deletion src/smllib/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.6'
__version__ = '0.7'
33 changes: 15 additions & 18 deletions src/smllib/sml/time.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,27 @@
from datetime import datetime, timedelta, tzinfo
from typing import Optional
from datetime import datetime, timedelta
from typing import Union


class SmlTzOffset(tzinfo):
def __init__(self, utc_offset: int, dst_offset: int):
self._utc_offset = timedelta(minutes=utc_offset)
self._dst_offset = timedelta(minutes=dst_offset)

def tzname(self, dt: Optional[datetime]) -> Optional[str]:
return 'SmlTzOffset'

def utcoffset(self, dt: Optional[datetime]) -> Optional[timedelta]:
return self._utc_offset
class SmlTime:
HINT = Union[None, int, datetime]

def dst(self, dt: Optional[datetime]) -> Optional[timedelta]:
return self._dst_offset
@staticmethod
def from_list(_in):
if _in is None:
return _in

# This is a workaround for times that are not reported according to specification
# Instead of a choice list these devices report just the timestamp - however I am unsure about it.
if isinstance(_in, int):
return _in

class SmlTime:
@classmethod
def from_list(cls, _in):
_t = _in[0]
_v = _in[1]
if _t == 1:
return _v
if _t == 2:
return datetime.fromtimestamp(_v)
if _t == 3:
return datetime.fromtimestamp(_v[0], tz=SmlTzOffset(_v[1], _v[2]))
return datetime.fromtimestamp(_v[0]) + timedelta(minutes=_v[1]) + timedelta(minutes=_v[2])

raise ValueError(f'Can not build SmlTime from {_in}')
16 changes: 11 additions & 5 deletions src/smllib/sml_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ def from_list(cls, _in):
if _in[5] is None:
raise ValueError('value is required!')

# Convert Time
if _in[2] is not None:
_in[2] = SmlTime.from_list(_in[2])

# Maybe it's ascii so we try to decode it
if isinstance(_in[5], str):
v = a2b_hex(_in[5]).decode(errors='ignore')
Expand All @@ -153,11 +157,12 @@ def from_list(cls, _in):

return cls(*_in)

def __init__(self, obis: str, status: Optional[int], val_time, unit: Optional[int], scaler: Optional[int] = None,
value: Union[None, str, int, float] = None, value_signature: Optional[str] = None):
def __init__(self, obis: str, status: Optional[int], val_time: SmlTime.HINT, unit: Optional[int],
scaler: Optional[int] = None, value: Union[None, str, int, float] = None,
value_signature: Optional[str] = None):
self.obis = obis
self.status: Optional[int] = status
self.val_time = val_time
self.val_time: SmlTime.HINT = val_time
self.unit: Optional[int] = unit
self.scaler: Optional[int] = scaler
self.value = value
Expand Down Expand Up @@ -191,8 +196,9 @@ def format_msg(self, indent: int = 0):
r += f'{INDENT*indent}-> {summary:s}\n'
return r

def get_value(self) -> float:
if self.scaler is None:
def get_value(self) -> Union[float, str]:
# Some devices report the texts with scaler 0
if self.scaler is None or isinstance(self.value, str):
return self.value

return round(self.value * 10**self.scaler, abs(self.scaler) + 3)
Expand Down
5 changes: 4 additions & 1 deletion tests/test_complete.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
b'162085200630138017707010051071aff01016208520063011101770701000e0700ff0101622c52ff6301f40177070100000200'
b'000101010109312e30322e3030370177070100605a02010101010105413031410177070100600500ff0101010165001c8104010'
b'10163fc1e00760400000362006200726500000201710163e8230000001b1b1b1b1a0222ed',
id='Frame1'
id='Frame2'
),
)
)
Expand All @@ -43,3 +43,6 @@ def test_frames(frame):

obis_values = frame.get_obis()
assert len(obis_values) >= 4, obis_values

for obis in obis_values:
obis.get_value()
19 changes: 18 additions & 1 deletion tests/test_sml_fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from smllib.sml_frame import SmlFrame


def test_sml_fileds():
def test_sml_fields():
f = SmlFrame(a2b_hex('77078181c78203ff010101010449534b0177070100000009ff010101010b'))
val_list = f.get_value(0)
for i, _ in enumerate(val_list):
Expand All @@ -25,3 +25,20 @@ def test_sml_fileds():
assert o.scaler == -1
assert o.value == 17965876
assert o.get_value() == 1796587.6


def test_val_time():
# Frame where time is None
f = SmlFrame(a2b_hex('77070100600100ff010101010b0a01484c5902000424a001'))
val_list = f.get_value(0)
f._parse_msg(val_list)
o = SmlListEntry.from_list(val_list)
assert o.val_time is None

# Frame where secIndex == 1 and time == 0
# -> 7262016200
f = SmlFrame(a2b_hex('77070100600100ff017262016200620052000b0a01445a47000282c0b001'))
val_list = f.get_value(0)
f._parse_msg(val_list)
o = SmlListEntry.from_list(val_list)
assert o.val_time == 0
13 changes: 13 additions & 0 deletions tests/test_sml_time.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from datetime import datetime

from smllib.sml import SmlTime


def test_sml_time():
assert SmlTime.from_list(None) is None
assert SmlTime.from_list([1, 253]) == 253
assert SmlTime.from_list([2, 1609466461]) == datetime(2021, 1, 1, 3, 1, 1)

assert SmlTime.from_list([3, [1609466461, 120, 0]]) == datetime(2021, 1, 1, 5, 1, 1)
assert SmlTime.from_list([3, [1622509261, 120, 60]]) == datetime(2021, 6, 1, 6, 1, 1)
assert SmlTime.from_list([3, [1622509261, 120, 30]]) == datetime(2021, 6, 1, 5, 31, 1)

0 comments on commit dd1b7d2

Please sign in to comment.