Skip to content

Commit

Permalink
Add support for 2.x.x firmwares w/o mesh data pages
Browse files Browse the repository at this point in the history
  • Loading branch information
mletenay committed Jul 5, 2024
1 parent e6c72c3 commit 28a5388
Showing 1 changed file with 123 additions and 30 deletions.
153 changes: 123 additions & 30 deletions custom_components/tigo/tigo_cca.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,21 @@ def __str__(self):
return result


class _CcaStatusPageParser(HTMLParser):
class _TigoParser(HTMLParser):
def __init__(self, url: str) -> None:
"""Initialize the table parser."""
super().__init__()
self.url: str = url

def parse(self, text: str) -> None:
self.feed(text)
self.close()


class _LmuduiParser(_TigoParser):
def __init__(self, cca: TigoCcaStatus) -> None:
"""Initialize the CCA status page parser."""
super().__init__()
super().__init__("/cgi-bin/lmudui")
self._cca = cca
self._in_td = False
self._was_temp_td = False
Expand Down Expand Up @@ -140,48 +151,120 @@ def handle_endtag(self, tag):
self._in_td = False


class _TableParser(HTMLParser):
def __init__(self) -> None:
class _TableParser(_TigoParser):
def __init__(self, url: str) -> None:
"""Initialize the table parser."""
super().__init__()
super().__init__(url)
self._in_table: bool = False
self._in_row: bool = False
self._td_nr: int = 0
self._td_done: bool = False

def handle_starttag(self, tag, attrs):
def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]):
if tag == "table" and dict(attrs).get("class") == "list_tb":
self._in_table = True
elif tag == "tr" and self._in_table:
elif self._is_tr(tag, attrs) and self._in_table:
self._in_row = True
elif tag == "td" and self._in_row:
self._td_nr += 1
self._td_done = False

def handle_data(self, data):
def handle_data(self, data: str) -> None:
if self._td_nr > 0 and data != "n/a" and not self._td_done:
self._td_done = True
self._on_td(self._td_nr, data)

def handle_endtag(self, tag):
def handle_endtag(self, tag: str) -> None:
if tag == "table" and self._in_table:
self._in_table = False
elif tag == "tr" and self._in_row and self._td_nr > 0:
self._on_tr_end()
self._in_row = False
self._td_nr = 0

def _is_tr(self, tag: str, attrs: list[tuple[str, str | None]]) -> bool:
return tag == "tr"

def _on_td(self, nr: int, data: str) -> None:
pass

def _on_tr_end(self) -> None:
pass


class _PanelsStatusPageParser(_TableParser):
class _MmdstatusInfoParser(_TableParser):
def __init__(self, infos: dict[str, PanelVersionInfo]) -> None:
"""Initialize the panels status/info page parser."""
super().__init__("/cgi-bin/mmdstatus")
self._infos: dict[str, PanelVersionInfo] = infos
self._info: PanelVersionInfo = None

def _on_td(self, nr: int, data: str) -> None:
match self._td_nr:
case 1:
self._info = PanelVersionInfo()
self._info.label = data.strip()
case 3:
self._info.mac = data.strip()
case _:
pass

def _on_tr_end(self) -> None:
if self._info:
self._infos[self._info.label] = self._info


class _MmdstatusParser(_TableParser):
def __init__(self, cca: TigoCcaStatus) -> None:
"""Initialize the panel status page parser."""
super().__init__()
super().__init__("/cgi-bin/mmdstatus")
self._cca: TigoCcaStatus = cca
self._status: PanelStatus = None

def _is_tr(self, tag: str, attrs: list[tuple[str, str | None]]) -> bool:
if tag != "tr":
return False
cls = dict(attrs).get("class")
if cls:
return "title" not in cls
return True

def _on_td(self, nr: int, data: str) -> None:
match nr:
case 1:
self._status = PanelStatus()
self._status.label = data.strip()
case 3:
self._status.mac = data.strip()
case 4:
self._status.voltage_in = float(data)
case 6:
self._status.voltage_out = float(data)
case 8:
self._status.current = float(data)
case 9:
self._status.power = float(data)
case 11:
self._status.temperature = float(data)
case 12:
self._status.rssi = int(data)
case 15:
self._status.pwm = int(data)
case 21:
self._status.status = int(data, 16)
case _:
pass

def _on_tr_end(self) -> None:
if self._status and self._status.current is not None:
self._cca.panels[self._status.label] = self._status
self._status = None


class _MeshdatapowereParser(_TableParser):
def __init__(self, cca: TigoCcaStatus) -> None:
"""Initialize the panel status page parser."""
super().__init__("/cgi-bin/meshdatapower")
self._cca: TigoCcaStatus = cca
self._status: PanelStatus = None
self._age: str = None
Expand Down Expand Up @@ -221,10 +304,10 @@ def _on_tr_end(self) -> None:
self._status = None


class _PanelsVersionPageParser(_TableParser):
class _MeshnodeverParser(_TableParser):
def __init__(self, infos: dict[str, PanelVersionInfo]) -> None:
"""Initialize the panels versions page parser."""
super().__init__()
super().__init__("/cgi-bin/meshnodever")
self._infos: dict[str, PanelVersionInfo] = infos
self._info: PanelVersionInfo = None

Expand All @@ -246,10 +329,10 @@ def _on_tr_end(self) -> None:
self._infos[self._info.label] = self._info


class _PanelsInfoPageParser(_TableParser):
class _MeshnodeinfoParser(_TableParser):
def __init__(self, infos: dict[str, PanelVersionInfo]) -> None:
"""Initialize the panels versions page parser."""
super().__init__()
super().__init__("/cgi-bin/meshnodeinfo")
self._infos: dict[str, PanelVersionInfo] = infos
self._mac: str = None

Expand All @@ -258,7 +341,9 @@ def _on_td(self, nr: int, data: str) -> None:
case 2:
self._mac = data
case 3:
self._infos[data.strip()].mac = self._mac
info = self._infos.get(data.strip())
if info is not None:
info.mac = self._mac
case _:
pass

Expand All @@ -271,35 +356,43 @@ def __init__(self, ip_address, username, password) -> None:
self.url_root = "http://" + ip_address
self.auth = aiohttp.BasicAuth(username, password) if username else None
self.panels: dict[str, PanelVersionInfo] = {}
self._has_mesh_pages: bool = True

async def _get(self, url):
async def _get(self, url) -> str | None:
async with aiohttp.ClientSession() as session:
try:
async with session.get(self.url_root + url, auth=self.auth) as response:
return await response.text()
if response.ok:
return await response.text()
return None
except ServerDisconnectedError as ex:
if ex.message.code != 303:
raise

async def _parse(self, parser: _TableParser) -> bool:
page = await self._get(parser.url)
if page:
parser.parse(page)
return True
return False

async def read_config(self) -> None:
"""Read the CCA and panels configuration."""
self.panels = {}
parser = _PanelsVersionPageParser(self.panels)
parser.feed(await self._get("/cgi-bin/meshnodever"))
parser.close()
parser = _PanelsInfoPageParser(self.panels)
parser.feed(await self._get("/cgi-bin/meshnodeinfo"))
parser.close()
self._has_mesh_pages = await self._parse(_MeshnodeverParser(self.panels))
if self._has_mesh_pages:
await self._parse(_MeshnodeinfoParser(self.panels))
else:
await self._parse(_MmdstatusInfoParser(self.panels))

async def get_status(self) -> TigoCcaStatus:
"""Get the latest status of the CCA itself."""
status = TigoCcaStatus()
parser = _CcaStatusPageParser(status)
parser.feed(await self._get("/cgi-bin/lmudui"))
parser.close()
parser = _PanelsStatusPageParser(status)
parser.feed(await self._get("/cgi-bin/meshdatapower"))
parser.close()
await self._parse(_LmuduiParser(status))
if self._has_mesh_pages:
await self._parse(_MeshdatapowereParser(status))
else:
await self._parse(_MmdstatusParser(status))
return status

async def turn_modules_off(self) -> None:
Expand Down

0 comments on commit 28a5388

Please sign in to comment.