-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
3 changed files
with
245 additions
and
53 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,211 @@ | ||
"""Charge-Amps Local API Client""" | ||
|
||
from abc import abstractmethod | ||
from datetime import datetime | ||
from typing import Self | ||
|
||
from pydantic import BaseModel | ||
|
||
|
||
class WebsocketMessage(BaseModel): | ||
message: str | ||
|
||
@classmethod | ||
@abstractmethod | ||
def from_message(cls, message: str) -> Self: | ||
pass | ||
|
||
|
||
class HaloConnectorSettings(WebsocketMessage): | ||
connector_one_enabled: bool | ||
connector_one_rfid_lock: bool | ||
connector_one_max_current: int | ||
connector_one_charge_mode: int | ||
|
||
connector_two_enabled: bool | ||
|
||
dimmer: int | ||
downlight_enabled: bool | ||
|
||
@classmethod | ||
def from_message(cls, message: str) -> Self: | ||
parameters = message.split(",") | ||
assert len(parameters) == 8 | ||
assert int(parameters[0]) == 1 | ||
return cls( | ||
message=message, | ||
connector_one_enabled=bool(int(parameters[1])), | ||
connector_two_enabled=bool(int(parameters[2])), | ||
connector_one_rfid_lock=bool(int(parameters[3])), | ||
downlight_enabled=bool(int(parameters[4])), | ||
connector_one_max_current=int(parameters[5]) // 10, | ||
dimmer=int(parameters[6]), | ||
connector_one_charge_mode=int(parameters[7]), | ||
) | ||
|
||
|
||
class AuraChargePointSettings(WebsocketMessage): | ||
charge_point_total_max_current: int | ||
three_phase_installation: bool | ||
min_current: int | ||
max_current: int | ||
phase_order: int | ||
|
||
connector_one_max_current: int | ||
connector_one_l1_voltage: int | ||
connector_one_l2_voltage: int | ||
connector_one_l3_voltage: int | ||
|
||
connector_two_max_current: int | ||
connector_two_l1_voltage: int | ||
connector_two_l2_voltage: int | ||
connector_two_l3_voltage: int | ||
|
||
@classmethod | ||
def from_message(cls, message: str) -> Self: | ||
parameters = message.split(",") | ||
assert len(parameters) == 15 | ||
assert int(parameters[0]) == 104 | ||
return cls( | ||
message=message, | ||
charge_point_total_max_current=int(parameters[1]), | ||
three_phase_installation=bool(int(parameters[2])), | ||
min_current=int(parameters[3]), | ||
max_current=int(parameters[4]), | ||
phase_order=int(parameters[6]), | ||
connector_one_max_current=int(parameters[7]), | ||
connector_one_l1_voltage=int(parameters[8]), | ||
connector_one_l2_voltage=int(parameters[9]), | ||
connector_one_l3_voltage=int(parameters[10]), | ||
connector_two_max_current=int(parameters[11]), | ||
connector_two_l1_voltage=int(parameters[12]), | ||
connector_two_l2_voltage=int(parameters[13]), | ||
connector_two_l3_voltage=int(parameters[14]), | ||
) | ||
|
||
|
||
class AuraConnectorSettings(WebsocketMessage): | ||
connector_one_enabled: bool | ||
connector_one_cable_lock: bool | ||
connector_one_rfid_lock: bool | ||
connector_one_max_current: int | ||
connector_one_charge_mode: int | ||
|
||
connector_two_enabled: bool | ||
connector_two_cable_lock: bool | ||
connector_two_rfid_lock: bool | ||
connector_two_max_current: int | ||
connector_two_charge_mode: int | ||
|
||
dimmer: int | ||
|
||
@classmethod | ||
def from_message(cls, message: str) -> Self: | ||
parameters = message.split(",") | ||
assert len(parameters) == 12 | ||
assert int(parameters[0]) == 101 | ||
return cls( | ||
message=message, | ||
connector_one_enabled=bool(int(parameters[1])), | ||
connector_one_cable_lock=bool(int(parameters[2])), | ||
connector_one_rfid_lock=bool(int(parameters[3])), | ||
connector_one_max_current=int(parameters[4]) // 10, | ||
connector_one_charge_mode=int(parameters[5]), | ||
connector_two_enabled=bool(int(parameters[6])), | ||
connector_two_cable_lock=bool(int(parameters[7])), | ||
connector_two_rfid_lock=bool(int(parameters[8])), | ||
connector_two_max_current=int(parameters[9]) // 10, | ||
connector_two_charge_mode=int(parameters[10]), | ||
dimmer=int(parameters[11]), | ||
) | ||
|
||
|
||
class ChargePointStatus(WebsocketMessage): | ||
charge_point_id: str | ||
last_update: datetime | ||
|
||
@classmethod | ||
def from_message(cls, message: str) -> Self: | ||
parameters = message.split(",") | ||
assert len(parameters) == 3 | ||
assert int(parameters[0]) == 6 | ||
return cls( | ||
message=message, charge_point_id=parameters[1], last_update=parameters[2] | ||
) | ||
|
||
|
||
class WifiConnectionStatus(WebsocketMessage): | ||
ssid: str | ||
rssi: int | ||
connected: bool | ||
|
||
@classmethod | ||
def from_message(cls, message: str) -> Self: | ||
parameters = message.split(",") | ||
assert len(parameters) == 4 | ||
assert int(parameters[0]) == 7 | ||
return cls( | ||
message=message, | ||
ssid=parameters[1], | ||
rssi=int(parameters[2]), | ||
connected=bool(int(parameters[3])), | ||
) | ||
|
||
|
||
class OcppStatus(WebsocketMessage): | ||
connected: bool | ||
endpoint: str | ||
|
||
@classmethod | ||
def from_message(cls, message: str) -> Self: | ||
parameters = message.split(",") | ||
assert len(parameters) == 3 | ||
assert int(parameters[0]) == 14 | ||
return cls( | ||
message=message, connected=bool(int(parameters[1])), endpoint=parameters[2] | ||
) | ||
|
||
|
||
class ConnectorStatus(WebsocketMessage): | ||
power: int | None = None | ||
current_l1: float | None = None | ||
current_l2: float | None = None | ||
current_l3: float | None = None | ||
charge_session_energy: int | None = None | ||
status: str | None = None | ||
|
||
@classmethod | ||
def from_message(cls, message: str) -> Self: | ||
parameters = message.split(",") | ||
assert len(parameters) == 7 | ||
assert int(parameters[0]) in [8, 108] | ||
return cls( | ||
message=message, | ||
power=int(parameters[1]), | ||
current_l1=int(parameters[2]) / 1000, | ||
current_l2=int(parameters[3]) / 1000, | ||
current_l3=int(parameters[4]) / 1000, | ||
charge_session_energy=int(parameters[5]), | ||
status=parameters[6], | ||
) | ||
|
||
|
||
WEBSOCKET_MESSAGE_CLASSES = { | ||
1: HaloConnectorSettings, | ||
104: AuraChargePointSettings, | ||
101: AuraConnectorSettings, | ||
6: ChargePointStatus, | ||
7: WifiConnectionStatus, | ||
8: ConnectorStatus, | ||
14: OcppStatus, | ||
108: ConnectorStatus, | ||
} | ||
|
||
|
||
def parse_websocket_message(message: str) -> WebsocketMessage: | ||
parameters = message.split(",") | ||
preamble = int(parameters[0]) | ||
try: | ||
return WEBSOCKET_MESSAGE_CLASSES[preamble].from_message(message) | ||
except KeyError as exc: | ||
raise ValueError("Unknown message type") from exc |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters