diff --git a/custom_components/tigo/tigo_cca.py b/custom_components/tigo/tigo_cca.py index 90d2266..b1e5261 100644 --- a/custom_components/tigo/tigo_cca.py +++ b/custom_components/tigo/tigo_cca.py @@ -101,10 +101,21 @@ def __str__(self): return result -class _CcaStatusPageParser(HTMLParser): +class _TigoParser(HTMLParser): + def __init__(self, url: str) -> None: + """Initialize the table parser.""" + super().__init__() + self.url: str = url + + def parse(self, text: str) -> None: + self.feed(text) + self.close() + + +class _LmuduiParser(_TigoParser): def __init__(self, cca: TigoCcaStatus) -> None: """Initialize the CCA status page parser.""" - super().__init__() + super().__init__("/cgi-bin/lmudui") self._cca = cca self._in_td = False self._was_temp_td = False @@ -140,30 +151,30 @@ def handle_endtag(self, tag): self._in_td = False -class _TableParser(HTMLParser): - def __init__(self) -> None: +class _TableParser(_TigoParser): + def __init__(self, url: str) -> None: """Initialize the table parser.""" - super().__init__() + super().__init__(url) self._in_table: bool = False self._in_row: bool = False self._td_nr: int = 0 self._td_done: bool = False - def handle_starttag(self, tag, attrs): + def handle_starttag(self, tag: str, attrs: list[tuple[str, str | None]]): if tag == "table" and dict(attrs).get("class") == "list_tb": self._in_table = True - elif tag == "tr" and self._in_table: + elif self._is_tr(tag, attrs) and self._in_table: self._in_row = True elif tag == "td" and self._in_row: self._td_nr += 1 self._td_done = False - def handle_data(self, data): + def handle_data(self, data: str) -> None: if self._td_nr > 0 and data != "n/a" and not self._td_done: self._td_done = True self._on_td(self._td_nr, data) - def handle_endtag(self, tag): + def handle_endtag(self, tag: str) -> None: if tag == "table" and self._in_table: self._in_table = False elif tag == "tr" and self._in_row and self._td_nr > 0: @@ -171,6 +182,9 @@ def handle_endtag(self, tag): self._in_row = False self._td_nr = 0 + def _is_tr(self, tag: str, attrs: list[tuple[str, str | None]]) -> bool: + return tag == "tr" + def _on_td(self, nr: int, data: str) -> None: pass @@ -178,10 +192,79 @@ def _on_tr_end(self) -> None: pass -class _PanelsStatusPageParser(_TableParser): +class _MmdstatusInfoParser(_TableParser): + def __init__(self, infos: dict[str, PanelVersionInfo]) -> None: + """Initialize the panels status/info page parser.""" + super().__init__("/cgi-bin/mmdstatus") + self._infos: dict[str, PanelVersionInfo] = infos + self._info: PanelVersionInfo = None + + def _on_td(self, nr: int, data: str) -> None: + match self._td_nr: + case 1: + self._info = PanelVersionInfo() + self._info.label = data.strip() + case 3: + self._info.mac = data.strip() + case _: + pass + + def _on_tr_end(self) -> None: + if self._info: + self._infos[self._info.label] = self._info + + +class _MmdstatusParser(_TableParser): def __init__(self, cca: TigoCcaStatus) -> None: """Initialize the panel status page parser.""" - super().__init__() + super().__init__("/cgi-bin/mmdstatus") + self._cca: TigoCcaStatus = cca + self._status: PanelStatus = None + + def _is_tr(self, tag: str, attrs: list[tuple[str, str | None]]) -> bool: + if tag != "tr": + return False + cls = dict(attrs).get("class") + if cls: + return "title" not in cls + return True + + def _on_td(self, nr: int, data: str) -> None: + match nr: + case 1: + self._status = PanelStatus() + self._status.label = data.strip() + case 3: + self._status.mac = data.strip() + case 4: + self._status.voltage_in = float(data) + case 6: + self._status.voltage_out = float(data) + case 8: + self._status.current = float(data) + case 9: + self._status.power = float(data) + case 11: + self._status.temperature = float(data) + case 12: + self._status.rssi = int(data) + case 15: + self._status.pwm = int(data) + case 21: + self._status.status = int(data, 16) + case _: + pass + + def _on_tr_end(self) -> None: + if self._status and self._status.current is not None: + self._cca.panels[self._status.label] = self._status + self._status = None + + +class _MeshdatapowereParser(_TableParser): + def __init__(self, cca: TigoCcaStatus) -> None: + """Initialize the panel status page parser.""" + super().__init__("/cgi-bin/meshdatapower") self._cca: TigoCcaStatus = cca self._status: PanelStatus = None self._age: str = None @@ -221,10 +304,10 @@ def _on_tr_end(self) -> None: self._status = None -class _PanelsVersionPageParser(_TableParser): +class _MeshnodeverParser(_TableParser): def __init__(self, infos: dict[str, PanelVersionInfo]) -> None: """Initialize the panels versions page parser.""" - super().__init__() + super().__init__("/cgi-bin/meshnodever") self._infos: dict[str, PanelVersionInfo] = infos self._info: PanelVersionInfo = None @@ -246,10 +329,10 @@ def _on_tr_end(self) -> None: self._infos[self._info.label] = self._info -class _PanelsInfoPageParser(_TableParser): +class _MeshnodeinfoParser(_TableParser): def __init__(self, infos: dict[str, PanelVersionInfo]) -> None: """Initialize the panels versions page parser.""" - super().__init__() + super().__init__("/cgi-bin/meshnodeinfo") self._infos: dict[str, PanelVersionInfo] = infos self._mac: str = None @@ -258,7 +341,9 @@ def _on_td(self, nr: int, data: str) -> None: case 2: self._mac = data case 3: - self._infos[data.strip()].mac = self._mac + info = self._infos.get(data.strip()) + if info is not None: + info.mac = self._mac case _: pass @@ -271,35 +356,43 @@ def __init__(self, ip_address, username, password) -> None: self.url_root = "http://" + ip_address self.auth = aiohttp.BasicAuth(username, password) if username else None self.panels: dict[str, PanelVersionInfo] = {} + self._has_mesh_pages: bool = True - async def _get(self, url): + async def _get(self, url) -> str | None: async with aiohttp.ClientSession() as session: try: async with session.get(self.url_root + url, auth=self.auth) as response: - return await response.text() + if response.ok: + return await response.text() + return None except ServerDisconnectedError as ex: if ex.message.code != 303: raise + async def _parse(self, parser: _TableParser) -> bool: + page = await self._get(parser.url) + if page: + parser.parse(page) + return True + return False + async def read_config(self) -> None: """Read the CCA and panels configuration.""" self.panels = {} - parser = _PanelsVersionPageParser(self.panels) - parser.feed(await self._get("/cgi-bin/meshnodever")) - parser.close() - parser = _PanelsInfoPageParser(self.panels) - parser.feed(await self._get("/cgi-bin/meshnodeinfo")) - parser.close() + self._has_mesh_pages = await self._parse(_MeshnodeverParser(self.panels)) + if self._has_mesh_pages: + await self._parse(_MeshnodeinfoParser(self.panels)) + else: + await self._parse(_MmdstatusInfoParser(self.panels)) async def get_status(self) -> TigoCcaStatus: """Get the latest status of the CCA itself.""" status = TigoCcaStatus() - parser = _CcaStatusPageParser(status) - parser.feed(await self._get("/cgi-bin/lmudui")) - parser.close() - parser = _PanelsStatusPageParser(status) - parser.feed(await self._get("/cgi-bin/meshdatapower")) - parser.close() + await self._parse(_LmuduiParser(status)) + if self._has_mesh_pages: + await self._parse(_MeshdatapowereParser(status)) + else: + await self._parse(_MmdstatusParser(status)) return status async def turn_modules_off(self) -> None: