diff --git a/pyproject.toml b/pyproject.toml index 90f4c93..03edf83 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,8 +13,6 @@ classifiers = [ ] dependencies = [ "aiohttp==3.8.1", - "gql[all]==3.4.0", - "python-keycloak==2.1.1", "beautifulsoup4==4.11.1", "httpx==0.23.0", "pydantic==1.9.1", diff --git a/systemair/saveconnect/auth.py b/systemair/saveconnect/auth.py index 112220f..2dd56d8 100644 --- a/systemair/saveconnect/auth.py +++ b/systemair/saveconnect/auth.py @@ -1,5 +1,5 @@ +import time import httpx -from keycloak import KeycloakOpenID from bs4 import BeautifulSoup @@ -12,53 +12,74 @@ def __init__(self, loop): self._oidc_token: dict = {} self.loop = loop - async def auth_openid(self): - return KeycloakOpenID(server_url="https://sso.systemair.com/auth/", - client_id="iot-application", - realm_name="iot") + self._token_expiry = time.time() async def auth(self, email, password): - # Configure client - keycloak_openid = await self.auth_openid() - - # Get Code With Oauth Authorization Request - auth_url = await self.loop.run_in_executor(None, lambda : keycloak_openid.auth_url( - redirect_uri="https://homesolutions.systemair.com", - scope="openid", - state="xyzABC123")) - - r1 = await self._client.get(auth_url, follow_redirects=True) - - soup = BeautifulSoup(r1.content, features="html.parser") - - login_form = soup.find("form", { + auth_url = ( + "{authorization-endpoint}?client_id={client-id}&response_type={response_type}&redirect_uri={redirect-uri}" + "&scope={scope}&state={state}" + ).format(**{ + "authorization-endpoint": "https://sso.systemair.com/auth/realms/iot/protocol/openid-connect/auth", + "client-id": "iot-application", + "response_type": "code", + "redirect-uri": "https://homesolutions.systemair.com", + "scope": "openid email profile", + "state": "xyzABC123" + }) + + response = await self._client.get(auth_url, follow_redirects=True) + + soup = BeautifulSoup(response.content, features="html.parser") + + login_form_action = soup.find("form", { "id": "kc-form-login" })["action"] - r2 = await self._client.post(login_form, data=dict( + response = await self._client.post(login_form_action, data=dict( username=email, password=password, rememberMe="on", credentialId="" ), follow_redirects=True) + code = response.url.params.get("code") # Get Access Token With Code - self._oidc_token = await self.loop.run_in_executor(None, lambda: keycloak_openid.token( - grant_type='authorization_code', - code=r2.url.params["code"], - redirect_uri="https://homesolutions.systemair.com")) + response = await self._client.post( + url="https://sso.systemair.com/auth/realms/iot/protocol/openid-connect/token", + headers={ + "content-type": "application/x-www-form-urlencoded" + }, + data={ + "grant_type": "authorization_code", + "client_id": "iot-application", + "code": code, + "redirect_uri": "https://homesolutions.systemair.com" + } + ) + + self._oidc_token = response.json() return True if self._oidc_token else False async def refresh_token(self): - keycloak_openid = await self.auth_openid() - - self._oidc_token = await self.loop.run_in_executor(None, lambda: keycloak_openid.refresh_token( - grant_type='refresh_token', - refresh_token=self._oidc_token["refresh_token"])) + response = await self._client.post( + url="https://sso.systemair.com/auth/realms/iot/protocol/openid-connect/token", + headers={ + "content-type": "application/x-www-form-urlencoded" + }, + data={ + "grant_type": "refresh_token", + "client_id": "iot-application", + "refresh_token": self._oidc_token["refresh_token"], + "redirect_uri": "https://homesolutions.systemair.com" + } + ) + self._oidc_token = response.json() + + self._token_expiry = time.time() + self._oidc_token["expires_in"] - (self._oidc_token["expires_in"] * .20) def is_auth(self): - return len(self._oidc_token) > 0 + return self._token_expiry < time.time() and len(self._oidc_token) > 0 @property def token(self): @@ -67,4 +88,3 @@ def token(self): @token.setter def token(self, token): self._oidc_token = token - diff --git a/systemair/saveconnect/data.py b/systemair/saveconnect/data.py index be8b6c6..46d465f 100755 --- a/systemair/saveconnect/data.py +++ b/systemair/saveconnect/data.py @@ -70,3 +70,12 @@ def get(self, device_id, key, value=None): return register_value return register_data + + def set_availability(self, device_id, available): + if device_id not in self.devices: + return + + self.devices[device_id].connectionStatus = "ONLINE" if available else "OFFLINE" + + def get_device(self, device_id): + return self.devices[device_id] diff --git a/systemair/saveconnect/graphql.py b/systemair/saveconnect/graphql.py index c6747de..b4bd013 100755 --- a/systemair/saveconnect/graphql.py +++ b/systemair/saveconnect/graphql.py @@ -1,13 +1,10 @@ import json import logging import typing +from json import JSONDecodeError -from aiohttp import ClientError -from gql import gql, Client -from gql.transport.aiohttp import AIOHTTPTransport -from gql.transport.exceptions import TransportAlreadyConnected +import httpx -from systemair.saveconnect.data import SaveConnectData from systemair.saveconnect.models import SaveConnectDevice from systemair.saveconnect.registry import RegisterWrite from .const import APIRoutes @@ -17,28 +14,25 @@ class SaveConnectGraphQL: - def __init__(self, data: SaveConnectData): - self.data: SaveConnectData = data - transport = AIOHTTPTransport(url="https://homesolutions.systemair.com/gateway/api") - self.client = Client( - transport=transport, - fetch_schema_from_transport=True, - execute_timeout=120 - ) + def __init__(self, api: "SaveConnect"): + self.api = api + self.client: httpx.AsyncClient = httpx.AsyncClient() + self.headers = { + "content-type": "application/json", + "x-access-token": None + } + self.api_url = "https://homesolutions.systemair.com/gateway/api" def set_access_token(self, _oidc_token): - self.client.transport.headers = { - "x-access-token": _oidc_token["access_token"] - } + self.headers["x-access-token"] = _oidc_token["access_token"] async def queryWriteDeviceValues(self, device_id, register_pair: RegisterWrite, is_import=False): - query = gql( - """ + + query = """ mutation ($input: WriteDeviceValuesInputType!) { WriteDeviceValues(input: $input) } """ - ) data = dict( input={ @@ -50,16 +44,17 @@ async def queryWriteDeviceValues(self, device_id, register_pair: RegisterWrite, } ) - response = await self.client.execute_async( - query, - variable_values=data + response_data = await self.post_request( + url=self.api_url, + data=dict(query=query, variables=data), + headers=self.headers ) - return self.data.update(device_id, response) + return self.api.data.update(device_id, response_data) async def queryDeviceView(self, device_id, route): - query = gql( - """ + + query = """ mutation ($input: GetDeviceViewInput!) { GetDeviceView(input: $input) { route @@ -69,9 +64,7 @@ async def queryDeviceView(self, device_id, route): translationVariables } } - """, - ) - + """ data = dict( input=dict( deviceId=device_id, @@ -79,15 +72,13 @@ async def queryDeviceView(self, device_id, route): ) ) - try: - response = await self.client.execute_async( - query, - variable_values=data - ) - except TransportAlreadyConnected as e: - raise ClientError(e) + response_data = await self.post_request( + url=self.api_url, + data=dict(query=query, variables=data), + headers=self.headers + ) - return self.data.update(device_id, response) + return self.api.data.update(device_id, response_data) async def queryGetDeviceData(self, device_id, change_mode=False): success = await self.queryDeviceView( @@ -97,7 +88,7 @@ async def queryGetDeviceData(self, device_id, change_mode=False): return success async def queryGetAccount(self) -> typing.List['SaveConnectDevice']: - query = gql(""" + query = """ { GetAccount { email @@ -152,13 +143,18 @@ async def queryGetAccount(self) -> typing.List['SaveConnectDevice']: } } } - """) - response = await self.client.execute_async(query) + """ + + response_data = await self.post_request( + url=self.api_url, + data=dict(query=query, variables={}), + headers=self.headers + ) - for device_data in response["GetAccount"]["devices"]: - self.data.update_device(device_data=device_data) + for device_data in response_data["GetAccount"]["devices"]: + self.api.data.update_device(device_data=device_data) - return list(self.data.devices.values()) + return list(self.api.data.devices.values()) async def queryDeviceInfo(self, device: SaveConnectDevice): statuses = [] @@ -172,9 +168,29 @@ async def queryDeviceInfo(self, device: SaveConnectDevice): ]: status = await self.queryDeviceView(device.identifier, route) - if not status: _LOGGER.error(f"queryDeviceInfo failed for route={route}") statuses.append(status) return all(statuses) + + async def post_request(self, url, data, headers, retry=False): + + response = await self.client.post( + url=url, + json=data, + headers=headers + ) + + try: + response_data = response.json()["data"] + return response_data + except JSONDecodeError as e: + + if not retry and "UnauthorizedError" in response.text: + _LOGGER.error("Response indicates token expiry. Refreshing token and retry") + await self.api.refresh_token() + return await self.post_request(url, data, headers, retry=True) + + _LOGGER.error(f"Could not parse JSON. Content: {response.content}") + raise e diff --git a/systemair/saveconnect/systemair.py b/systemair/saveconnect/systemair.py index f886784..8b9c0c1 100755 --- a/systemair/saveconnect/systemair.py +++ b/systemair/saveconnect/systemair.py @@ -131,12 +131,12 @@ def __init__(self, @param refresh_token_interval: Refresh interval of the access_token """ self.data = SaveConnectData() - self.graphql = SaveConnectGraphQL(data=self.data) + self.graphql = SaveConnectGraphQL(self) self.auth = SaveConnectAuth(loop=loop) self.user_mode = SaveConnectUserMode(self) self.temperature = SaveConnectTemperature(self) - self._ws = WSClient(url=wss_url, callback=self.on_ws_data, loop=loop) + self._ws = WSClient(self, url=wss_url, callback=self.on_ws_data, loop=loop) """URL for the savecair API.""" self.url = url @@ -214,15 +214,32 @@ async def on_ws_data(self, data) -> bool: data_json = json.loads(data) payload = data_json["payload"] device_id = payload["deviceId"] - - if "dataItems" not in payload: - _LOGGER.error("Could not retrieve dataItems from websocket API.") + message_type = data_json["type"] + + if message_type == "DEVICE_CONNECTED": + self.data.set_availability(device_id, available=True) + elif message_type == "DEVICE_DISCONNECTED": + self.data.set_availability(device_id, available=False) + elif message_type == "DEVICE_PUSH_EVENT": + if "dataItems" not in payload: + _LOGGER.error("Could not retrieve dataItems from websocket API.") + return False + + data_items = payload["dataItems"] + print(data_json) + self.data.update(device_id, data_items) + + # Finally poll for updates + + try: + device = self.data.get_device(device_id=device_id) + await self.read_data(device=device) + except KeyError: + _LOGGER.debug(f"Could not find device with ID={device_id} when polling data in WS.") + else: + _LOGGER.error(f"Unhandled message type for WS connection: {message_type}") return False - data_items = payload["dataItems"] - - self.data.update(device_id, data_items) - return True async def read_data(self, device: SaveConnectDevice) -> bool: @@ -231,6 +248,9 @@ async def read_data(self, device: SaveConnectDevice) -> bool: @param device: SaveConnectDevice object @return: the data that was retrieved from the API """ + if not self.auth.is_auth(): + await self.refresh_token() + status = await self.graphql.queryGetDeviceData(device.identifier) return status diff --git a/systemair/saveconnect/websocket.py b/systemair/saveconnect/websocket.py index b09d627..ecd5663 100755 --- a/systemair/saveconnect/websocket.py +++ b/systemair/saveconnect/websocket.py @@ -3,13 +3,16 @@ import websockets import logging +from websockets.exceptions import InvalidStatusCode + logger = logging.getLogger(__name__) class WSClient: - def __init__(self, url, loop=asyncio.get_event_loop(), **kwargs): + def __init__(self, saveconnect, url, loop=asyncio.get_event_loop(), **kwargs): self.url = url + self.saveconnect = saveconnect self.ws = None self._access_token = None self.loop = loop @@ -19,6 +22,9 @@ def __init__(self, url, loop=asyncio.get_event_loop(), **kwargs): self.sleep_time = kwargs.get('sleep_time') or 5 self.callback = kwargs.get('callback') + def set_callback(self, cb): + self.callback = cb + async def connect(self): self.loop.create_task(self.listen_forever()) @@ -72,3 +78,12 @@ async def listen_forever(self): logger.debug('Retrying connection in {} sec...'.format(self.sleep_time)) await asyncio.sleep(self.sleep_time) continue + except InvalidStatusCode as e: + self.ws = None + logger.error(f"Could not connect to the websocket API. Go code: {e.status_code}") + + if e.status_code == 401: + # Unauthorized + await self.saveconnect.refresh_token() + + await asyncio.sleep(self.sleep_time)