Skip to content

Commit

Permalink
Merge pull request #44 from hostcc/bug/history-parsing
Browse files Browse the repository at this point in the history
fix: Better processing of history entries
  • Loading branch information
hostcc authored Dec 14, 2024
2 parents ad46b55 + a8e0262 commit 443baef
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 36 deletions.
7 changes: 7 additions & 0 deletions src/pyg90alarm/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,10 @@ class G90NotificationTypes(IntEnum):
Defines types of notifications sent by the alarm panel.
"""
ARM_DISARM = 1
SENSOR_ADDED = 4
SENSOR_ACTIVITY = 5
DOOR_OPEN_WHEN_ARMING = 6
FIRMWARE_UPDATING = 8


class G90ArmDisarmTypes(IntEnum):
Expand Down Expand Up @@ -195,7 +198,10 @@ class G90AlertSources(IntEnum):
"""
DEVICE = 0
SENSOR = 1
REMOTE = 10
RFID = 11
DOORBELL = 12
FINGERPRINT = 15


class G90AlertStates(IntEnum):
Expand All @@ -204,6 +210,7 @@ class G90AlertStates(IntEnum):
"""
DOOR_CLOSE = 0
DOOR_OPEN = 1
SOS = 2
TAMPER = 3
LOW_BATTERY = 4

Expand Down
100 changes: 66 additions & 34 deletions src/pyg90alarm/history.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
"""
History protocol entity.
"""
import logging

from typing import Any, Optional, Dict
from dataclasses import dataclass
from datetime import datetime, timezone
Expand All @@ -33,12 +35,13 @@
)
from .device_notifications import G90DeviceAlert

_LOGGER = logging.getLogger(__name__)


# The state of the incoming history entries are mixed of `G90AlertStates` and
# `G90AlertStateChangeTypes`, depending on entry type - the mapping
# consilidates them into unified `G90HistoryStates`. The latter enum can't be
# just an union of former two, since those have conflicting values
states_mapping = {
# `G90AlertStateChangeTypes`, depending on entry type - hence two separate
# dictionaries, since enums used for keys have conflicting values
states_mapping_alerts = {
G90AlertStates.DOOR_CLOSE:
G90HistoryStates.DOOR_CLOSE,
G90AlertStates.DOOR_OPEN:
Expand All @@ -47,6 +50,9 @@
G90HistoryStates.TAMPER,
G90AlertStates.LOW_BATTERY:
G90HistoryStates.LOW_BATTERY,
}

states_mapping_state_changes = {
G90AlertStateChangeTypes.AC_POWER_FAILURE:
G90HistoryStates.AC_POWER_FAILURE,
G90AlertStateChangeTypes.AC_POWER_RECOVER:
Expand Down Expand Up @@ -87,6 +93,7 @@ class G90History:
Represents a history entry from the alarm panel.
"""
def __init__(self, *args: Any, **kwargs: Any):
self._raw_data = args
self._protocol_data = ProtocolData(*args, **kwargs)

@property
Expand All @@ -99,55 +106,79 @@ def datetime(self) -> datetime:
)

@property
def type(self) -> G90AlertTypes:
def type(self) -> Optional[G90AlertTypes]:
"""
Type of the history entry.
"""
return G90AlertTypes(self._protocol_data.type)
try:
return G90AlertTypes(self._protocol_data.type)
except ValueError:
_LOGGER.warning(
"Can't interpret '%s' as alert type (decoded protocol"
" data '%s', raw data '%s')",
self._protocol_data.type, self._protocol_data, self._raw_data
)
return None

@property
def state(self) -> G90HistoryStates:
def state(self) -> Optional[G90HistoryStates]:
"""
State for the history entry.
"""
# Door open/close type, mapped against `G90AlertStates` using `state`
# incoming field
if self.type == G90AlertTypes.DOOR_OPEN_CLOSE:
return G90HistoryStates(
states_mapping[G90AlertStates(self._protocol_data.state)]
try:
# Door open/close or alert types, mapped against `G90AlertStates`
# using `state` incoming field
if self.type in [
G90AlertTypes.DOOR_OPEN_CLOSE, G90AlertTypes.ALARM
]:
return G90HistoryStates(
states_mapping_alerts[
G90AlertStates(self._protocol_data.state)
]
)
except ValueError:
_LOGGER.warning(
"Can't interpret '%s' as alert state (decoded protocol"
" data '%s', raw data '%s')",
self._protocol_data.state, self._protocol_data, self._raw_data
)
return None

# Device state change, mapped against `G90AlertStateChangeTypes` using
# `event_id` incoming field
if self.type == G90AlertTypes.STATE_CHANGE:
try:
# Other types are mapped against `G90AlertStateChangeTypes`
return G90HistoryStates(
states_mapping[
states_mapping_state_changes[
G90AlertStateChangeTypes(self._protocol_data.event_id)
]
)

# Alarm gets mapped to its counterpart in `G90HistoryStates`
if self.type == G90AlertTypes.ALARM:
return G90HistoryStates.ALARM

# Other types are mapped against `G90AlertStateChangeTypes`
return G90HistoryStates(
states_mapping[
G90AlertStateChangeTypes(self._protocol_data.event_id)
]
)
except ValueError:
_LOGGER.warning(
"Can't interpret '%s' as state change (decoded protocol"
" data '%s', raw data '%s')",
self._protocol_data.event_id, self._protocol_data,
self._raw_data
)
return None

@property
def source(self) -> G90AlertSources:
def source(self) -> Optional[G90AlertSources]:
"""
Source of the history entry.
"""
# Device state changes or open/close events are mapped against
# `G90AlertSources` using `source` incoming field
if self.type in [
G90AlertTypes.STATE_CHANGE, G90AlertTypes.DOOR_OPEN_CLOSE
]:
return G90AlertSources(self._protocol_data.source)
try:
# Device state changes or open/close events are mapped against
# `G90AlertSources` using `source` incoming field
if self.type in [
G90AlertTypes.STATE_CHANGE, G90AlertTypes.DOOR_OPEN_CLOSE
]:
return G90AlertSources(self._protocol_data.source)
except ValueError:
_LOGGER.warning(
"Can't interpret '%s' as alert source (decoded protocol"
" data '%s', raw data '%s')",
self._protocol_data.source, self._protocol_data, self._raw_data
)
return None

# Alarm will have `SENSOR` as the source, since that is likely what
# triggered it
Expand Down Expand Up @@ -182,6 +213,7 @@ def as_device_alert(self) -> G90DeviceAlert:
Returns the history entry represented as device alert structure,
suitable for :meth:`G90DeviceNotifications._handle_alert`.
"""

return G90DeviceAlert(
type=self._protocol_data.type,
event_id=self._protocol_data.event_id,
Expand Down
31 changes: 29 additions & 2 deletions tests/test_alarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ async def test_disarm(mock_device: DeviceMock) -> None:

@pytest.mark.g90device(sent_data=[
b'ISTART[200,[[50,1,5],'
b'[3,33,7,254,"Sensor 1",1630147285,""],'
b'[3,33,7,1,"Sensor 1",1630147285,""],'
b'[2,3,0,0,"",1630142877,""],'
b'[2,5,0,0,"",1630142871,""],'
b'[2,4,0,0,"",1630142757,""],'
Expand All @@ -595,6 +595,33 @@ async def test_history(mock_device: DeviceMock) -> None:
assert isinstance(history[0]._asdict(), dict)


@pytest.mark.g90device(sent_data=[
b'ISTART[200,[[3,1,3],'
# Wrong state
b'[3,33,7,254,"Sensor 1",1630147285,""],'
# Wrong source
b'[2,33,254,1,"Sensor 1",1630147285,""],'
# Wrong type
b'[254,33,1,1,"Sensor 1",1630147285,""]'
b']]IEND\0',
])
async def test_history_parsing_error(mock_device: DeviceMock) -> None:
"""
Tests for processing history from the device, when the parsing error
occurs.
"""
g90 = G90Alarm(host=mock_device.host, port=mock_device.port)
history = await g90.history(count=5)
assert len(history) == 3
assert isinstance(history[0], G90History)
assert isinstance(history[0]._asdict(), dict)
# Wrong entry element should result in corresponding key having 'None'
# value
assert history[0]._asdict()['state'] is None
assert history[1]._asdict()['source'] is None
assert history[2]._asdict()['type'] is None


@pytest.mark.g90device(sent_data=[
# Simulate empty history initially
b'ISTART[200,[[0,0,0]]]IEND\0',
Expand All @@ -606,7 +633,7 @@ async def test_history(mock_device: DeviceMock) -> None:
# The records will be used to simulate the device alerts, but only for
# those newer that one above
b'ISTART[200,[[3,1,3],'
b'[3,33,7,254,"Sensor 1",1630147285,""],'
b'[3,33,7,1,"Sensor 1",1630147285,""],'
b'[2,3,0,0,"",1630142877,""],'
b'[2,5,0,0,"",1630142871,""]'
b']]IEND\0',
Expand Down

0 comments on commit 443baef

Please sign in to comment.