Skip to content

Commit

Permalink
sync
Browse files Browse the repository at this point in the history
  • Loading branch information
jschlyter committed Jun 28, 2024
1 parent eb8ae5a commit 69e01b4
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 64 deletions.
161 changes: 106 additions & 55 deletions chargeamps/local.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
"""Charge-Amps Local API Client"""

import logging
import uuid
from datetime import datetime
from typing import Self
from urllib.parse import urlparse

from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, Field

from .models import (
ChargePointConnectorMode,
Expand Down Expand Up @@ -41,18 +43,47 @@ def from_url(
)


class LocalChargePointConnectorStatus(BaseModel):
power: int | None = None
energy: int | None = None
status_text: str | None = None


class LocalChargePointStatus(BaseModel):
chargepoint: LocalChargePoint
connector_settings: list[ChargePointConnectorSettings]
connector_status: list[LocalChargePointConnectorStatus]

charge_point_id: str | None = None
connector_settings: list[ChargePointConnectorSettings] | None = None
dimmer: int = 0
light: bool = False
ssid: str | None = None
signal_strength: int | None = None
status_text: str | None = None
last_updated: datetime | None = None

def process_message(self, message: str):
parameters = message.split(",")

match preamble := int(parameters[0]):
case 6:
assert len(parameters) == 3
self.charge_point_id = parameters[1]
self.last_updated = parameters[2]
case 7:
assert len(parameters) == 4
self.ssid = parameters[1]
self.signal_strength = int(parameters[2])
pass
case 8:
assert len(parameters) == 7
self.status_text = parameters[6]
case _:
logging.warning("Unknown preamble %d", preamble)


class LocalChargePointStatusHalo(LocalChargePointStatus):
@staticmethod
def get_default_connectors() -> list[ChargePointConnectorSettings]:
return [
connector_settings: list[ChargePointConnectorSettings] = Field(
default=[
ChargePointConnectorSettings(
charge_point_id="",
connector_id=0,
Expand All @@ -70,26 +101,41 @@ def get_default_connectors() -> list[ChargePointConnectorSettings]:
max_current=None,
),
]
)
connector_status: list[LocalChargePointConnectorStatus] = Field(
default=[
LocalChargePointConnectorStatus(),
LocalChargePointConnectorStatus(),
]
)

def process_message(self, message: str):
parameters = message.split(",")

match int(parameters[0]):
case 1:
assert len(parameters) == 8
values = [int(x) for x in parameters.split(",")]

def update_settings(self, message: str):
parameters = [int(x) for x in message.split(",")]
self.connector_settings[0].mode = "On" if values[1] else "Off"
self.connector_settings[1].mode = "On" if values[2] else "Off"
self.connector_settings[0].rfid_lock = bool(values[3])
self.light = bool(values[4])
self.connector_settings[0].max_current = values[5] // 10
self.dimmer = bool(values[6])
assert values[7] == 4

assert parameters[0] == 1
assert len(parameters) == 8
case 108:
assert len(parameters) == 7
# "108,5698,11853,12212,11936,2,Charging",

self.connector_settings[0].mode = "On" if parameters[1] else "Off"
self.connector_settings[1].mode = "On" if parameters[2] else "Off"
self.connector_settings[0].rfid_lock = bool(parameters[3])
self.light = bool(parameters[4])
self.connector_settings[0].max_current = parameters[5] // 10
self.dimmer = bool(parameters[6])
assert parameters[7] == 4
case _:
return super().process_message(message)


class LocalChargePointStatusAura(LocalChargePointStatus):
@staticmethod
def get_default_connectors() -> list[ChargePointConnectorSettings]:
return [
connector_settings: list[ChargePointConnectorSettings] = Field(
default=[
ChargePointConnectorSettings(
charge_point_id="",
connector_id=0,
Expand All @@ -107,44 +153,49 @@ def get_default_connectors() -> list[ChargePointConnectorSettings]:
max_current=None,
),
]
)
connector_status: list[LocalChargePointConnectorStatus] = Field(
default=[
LocalChargePointConnectorStatus(),
LocalChargePointConnectorStatus(),
]
)

def process_message(self, message: str):
parameters = message.split(",")

match int(parameters[0]):
case 1:
assert len(parameters) == 12
values = [int(x) for x in parameters.split(",")]

def update_settings(self, message: str):
parameters = [int(x) for x in message.split(",")]
self.connector_settings[0].mode = "On" if values[1] else "Off"
self.connector_settings[0].cable_lock = bool(values[2])
self.connector_settings[0].rfid_lock = bool(values[3])
self.connector_settings[0].max_current = values[4] // 10
assert values[5] == 4
self.connector_settings[1].mode = "On" if values[6] else "Off"
self.connector_settings[1].cable_lock = bool(values[7])
self.connector_settings[1].rfid_lock = bool(values[8])
self.connector_settings[1].max_current = values[9] // 10
assert values[10] == 4
self.dimmer = values[11]

assert parameters[0] == 101
assert len(parameters) == 12
case 108:
assert len(parameters) == 7
# "108,5698,11853,12212,11936,2,Charging",

self.connector_settings[0].mode = "On" if parameters[1] else "Off"
self.connector_settings[0].cable_lock = bool(parameters[2])
self.connector_settings[0].rfid_lock = bool(parameters[3])
self.connector_settings[0].max_current = parameters[4] // 10
assert parameters[5] == 4
self.connector_settings[1].mode = "On" if parameters[6] else "Off"
self.connector_settings[1].cable_lock = bool(parameters[7])
self.connector_settings[1].rfid_lock = bool(parameters[8])
self.connector_settings[1].max_current = parameters[9] // 10
assert parameters[10] == 4
self.dimmer = parameters[11]
case _:
return super().process_message(message)


class ChargeAmpsLocalClient: # (ChargeAmpsClient):
def __init__(self, chargepoints: list[LocalChargePoint]) -> None:
self.chargepoints: list[LocalChargePoint] = chargepoints
self.status: list[LocalChargePointStatus] = []

for cp in self.chargepoints:
match cp.type:
case ChargePointType.HALO:
self.status.append(
LocalChargePointStatusHalo(
chargepoint=cp,
connector_settings=LocalChargePointStatusHalo.get_default_connectors(),
)
)
case ChargePointType.AURA:
self.status.append(
LocalChargePointStatusAura(
chargepoint=cp,
connector_settings=LocalChargePointStatusAura.get_default_connectors(),
)
)
def __init__(self, chargepoint: LocalChargePoint) -> None:
self.chargepoint = chargepoint

match self.chargepoint.type:
case ChargePointType.HALO:
self.status = LocalChargePointStatusHalo()

case ChargePointType.AURA:
self.status = LocalChargePointStatusAura()
38 changes: 29 additions & 9 deletions tests/test_local.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,37 @@
from chargeamps.local import ChargeAmpsLocalClient, ChargePointType, LocalChargePoint

SETTINGS_AURA = "101,1,1,0,160,4,1,1,0,160,4,100"
SETTINGS_HALO = "1,1,1,0,0,160,0,4"
MESSAGES_HALO = [
"1,1,1,0,0,160,0,4",
"108,5698,11853,12212,11936,2,Charging",
"108,8269,12104,12218,12157,24,Charging",
"108,8330,12187,12275,12157,47,Charging",
"108,8308,12313,12287,12204,71,Charging",
"108,8334,12265,12271,12175,94,Charging",
"108,8290,12299,12087,12018,117,Charging",
"108,8308,12247,12126,12257,142,Charging",
]
MESSAGES_AURA = ["101,1,1,0,160,4,1,1,0,160,4,100"]


def test_local():
aura = LocalChargePoint.from_url(
"ws://127.0.0.1:8001", pin="12345678", type=ChargePointType.AURA
)
def test_local_halo():
halo = LocalChargePoint.from_url(
"ws://127.0.0.1:8002", pin="12345678", type=ChargePointType.HALO
)
client = ChargeAmpsLocalClient(chargepoints=[aura, halo])
client = ChargeAmpsLocalClient(chargepoint=halo)

for message in MESSAGES_AURA:
client.status.process_message(message)

print(client.status)


def test_local_aura():
aura = LocalChargePoint.from_url(
"ws://127.0.0.1:8001", pin="12345678", type=ChargePointType.AURA
)
client = ChargeAmpsLocalClient(chargepoint=aura)

for message in MESSAGES_AURA:
client.status.process_message(message)

client.status[0].update_settings(SETTINGS_AURA)
client.status[1].update_settings(SETTINGS_HALO)
print(client.status)

0 comments on commit 69e01b4

Please sign in to comment.