Skip to content

Commit

Permalink
Migrated from gql and python-keycloak
Browse files Browse the repository at this point in the history
* Fixed authentication (reauth when needed)
* Error handling where needed.
  • Loading branch information
perara committed Aug 3, 2022
1 parent c5cbe1b commit 9535d65
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 86 deletions.
2 changes: 0 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ classifiers = [
]
dependencies = [
"aiohttp==3.8.1",
"gql[all]==3.4.0",
"python-keycloak==2.1.1",
"beautifulsoup4==4.11.1",
"httpx==0.23.0",
"pydantic==1.9.1",
Expand Down
82 changes: 51 additions & 31 deletions systemair/saveconnect/auth.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import time
import httpx
from keycloak import KeycloakOpenID
from bs4 import BeautifulSoup


Expand All @@ -12,53 +12,74 @@ def __init__(self, loop):
self._oidc_token: dict = {}
self.loop = loop

async def auth_openid(self):
return KeycloakOpenID(server_url="https://sso.systemair.com/auth/",
client_id="iot-application",
realm_name="iot")
self._token_expiry = time.time()

async def auth(self, email, password):
# Configure client
keycloak_openid = await self.auth_openid()

# Get Code With Oauth Authorization Request
auth_url = await self.loop.run_in_executor(None, lambda : keycloak_openid.auth_url(
redirect_uri="https://homesolutions.systemair.com",
scope="openid",
state="xyzABC123"))

r1 = await self._client.get(auth_url, follow_redirects=True)

soup = BeautifulSoup(r1.content, features="html.parser")

login_form = soup.find("form", {
auth_url = (
"{authorization-endpoint}?client_id={client-id}&response_type={response_type}&redirect_uri={redirect-uri}"
"&scope={scope}&state={state}"
).format(**{
"authorization-endpoint": "https://sso.systemair.com/auth/realms/iot/protocol/openid-connect/auth",
"client-id": "iot-application",
"response_type": "code",
"redirect-uri": "https://homesolutions.systemair.com",
"scope": "openid email profile",
"state": "xyzABC123"
})

response = await self._client.get(auth_url, follow_redirects=True)

soup = BeautifulSoup(response.content, features="html.parser")

login_form_action = soup.find("form", {
"id": "kc-form-login"
})["action"]

r2 = await self._client.post(login_form, data=dict(
response = await self._client.post(login_form_action, data=dict(
username=email,
password=password,
rememberMe="on",
credentialId=""
), follow_redirects=True)
code = response.url.params.get("code")

# Get Access Token With Code
self._oidc_token = await self.loop.run_in_executor(None, lambda: keycloak_openid.token(
grant_type='authorization_code',
code=r2.url.params["code"],
redirect_uri="https://homesolutions.systemair.com"))
response = await self._client.post(
url="https://sso.systemair.com/auth/realms/iot/protocol/openid-connect/token",
headers={
"content-type": "application/x-www-form-urlencoded"
},
data={
"grant_type": "authorization_code",
"client_id": "iot-application",
"code": code,
"redirect_uri": "https://homesolutions.systemair.com"
}
)

self._oidc_token = response.json()

return True if self._oidc_token else False

async def refresh_token(self):
keycloak_openid = await self.auth_openid()

self._oidc_token = await self.loop.run_in_executor(None, lambda: keycloak_openid.refresh_token(
grant_type='refresh_token',
refresh_token=self._oidc_token["refresh_token"]))
response = await self._client.post(
url="https://sso.systemair.com/auth/realms/iot/protocol/openid-connect/token",
headers={
"content-type": "application/x-www-form-urlencoded"
},
data={
"grant_type": "refresh_token",
"client_id": "iot-application",
"refresh_token": self._oidc_token["refresh_token"],
"redirect_uri": "https://homesolutions.systemair.com"
}
)
self._oidc_token = response.json()

self._token_expiry = time.time() + self._oidc_token["expires_in"] - (self._oidc_token["expires_in"] * .20)

def is_auth(self):
return len(self._oidc_token) > 0
return self._token_expiry < time.time() and len(self._oidc_token) > 0

@property
def token(self):
Expand All @@ -67,4 +88,3 @@ def token(self):
@token.setter
def token(self, token):
self._oidc_token = token

9 changes: 9 additions & 0 deletions systemair/saveconnect/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,12 @@ def get(self, device_id, key, value=None):
return register_value

return register_data

def set_availability(self, device_id, available):
if device_id not in self.devices:
return

self.devices[device_id].connectionStatus = "ONLINE" if available else "OFFLINE"

def get_device(self, device_id):
return self.devices[device_id]
102 changes: 59 additions & 43 deletions systemair/saveconnect/graphql.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
import json
import logging
import typing
from json import JSONDecodeError

from aiohttp import ClientError
from gql import gql, Client
from gql.transport.aiohttp import AIOHTTPTransport
from gql.transport.exceptions import TransportAlreadyConnected
import httpx

from systemair.saveconnect.data import SaveConnectData
from systemair.saveconnect.models import SaveConnectDevice
from systemair.saveconnect.registry import RegisterWrite
from .const import APIRoutes
Expand All @@ -17,28 +14,25 @@

class SaveConnectGraphQL:

def __init__(self, data: SaveConnectData):
self.data: SaveConnectData = data
transport = AIOHTTPTransport(url="https://homesolutions.systemair.com/gateway/api")
self.client = Client(
transport=transport,
fetch_schema_from_transport=True,
execute_timeout=120
)
def __init__(self, api: "SaveConnect"):
self.api = api
self.client: httpx.AsyncClient = httpx.AsyncClient()
self.headers = {
"content-type": "application/json",
"x-access-token": None
}
self.api_url = "https://homesolutions.systemair.com/gateway/api"

def set_access_token(self, _oidc_token):
self.client.transport.headers = {
"x-access-token": _oidc_token["access_token"]
}
self.headers["x-access-token"] = _oidc_token["access_token"]

async def queryWriteDeviceValues(self, device_id, register_pair: RegisterWrite, is_import=False):
query = gql(
"""

query = """
mutation ($input: WriteDeviceValuesInputType!) {
WriteDeviceValues(input: $input)
}
"""
)

data = dict(
input={
Expand All @@ -50,16 +44,17 @@ async def queryWriteDeviceValues(self, device_id, register_pair: RegisterWrite,
}
)

response = await self.client.execute_async(
query,
variable_values=data
response_data = await self.post_request(
url=self.api_url,
data=dict(query=query, variables=data),
headers=self.headers
)

return self.data.update(device_id, response)
return self.api.data.update(device_id, response_data)

async def queryDeviceView(self, device_id, route):
query = gql(
"""

query = """
mutation ($input: GetDeviceViewInput!) {
GetDeviceView(input: $input) {
route
Expand All @@ -69,25 +64,21 @@ async def queryDeviceView(self, device_id, route):
translationVariables
}
}
""",
)

"""
data = dict(
input=dict(
deviceId=device_id,
route=route
)
)

try:
response = await self.client.execute_async(
query,
variable_values=data
)
except TransportAlreadyConnected as e:
raise ClientError(e)
response_data = await self.post_request(
url=self.api_url,
data=dict(query=query, variables=data),
headers=self.headers
)

return self.data.update(device_id, response)
return self.api.data.update(device_id, response_data)

async def queryGetDeviceData(self, device_id, change_mode=False):
success = await self.queryDeviceView(
Expand All @@ -97,7 +88,7 @@ async def queryGetDeviceData(self, device_id, change_mode=False):
return success

async def queryGetAccount(self) -> typing.List['SaveConnectDevice']:
query = gql("""
query = """
{
GetAccount {
email
Expand Down Expand Up @@ -152,13 +143,18 @@ async def queryGetAccount(self) -> typing.List['SaveConnectDevice']:
}
}
}
""")
response = await self.client.execute_async(query)
"""

response_data = await self.post_request(
url=self.api_url,
data=dict(query=query, variables={}),
headers=self.headers
)

for device_data in response["GetAccount"]["devices"]:
self.data.update_device(device_data=device_data)
for device_data in response_data["GetAccount"]["devices"]:
self.api.data.update_device(device_data=device_data)

return list(self.data.devices.values())
return list(self.api.data.devices.values())

async def queryDeviceInfo(self, device: SaveConnectDevice):
statuses = []
Expand All @@ -172,9 +168,29 @@ async def queryDeviceInfo(self, device: SaveConnectDevice):
]:
status = await self.queryDeviceView(device.identifier, route)


if not status:
_LOGGER.error(f"queryDeviceInfo failed for route={route}")
statuses.append(status)

return all(statuses)

async def post_request(self, url, data, headers, retry=False):

response = await self.client.post(
url=url,
json=data,
headers=headers
)

try:
response_data = response.json()["data"]
return response_data
except JSONDecodeError as e:

if not retry and "UnauthorizedError" in response.text:
_LOGGER.error("Response indicates token expiry. Refreshing token and retry")
await self.api.refresh_token()
return await self.post_request(url, data, headers, retry=True)

_LOGGER.error(f"Could not parse JSON. Content: {response.content}")
raise e
38 changes: 29 additions & 9 deletions systemair/saveconnect/systemair.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,12 @@ def __init__(self,
@param refresh_token_interval: Refresh interval of the access_token
"""
self.data = SaveConnectData()
self.graphql = SaveConnectGraphQL(data=self.data)
self.graphql = SaveConnectGraphQL(self)
self.auth = SaveConnectAuth(loop=loop)
self.user_mode = SaveConnectUserMode(self)
self.temperature = SaveConnectTemperature(self)

self._ws = WSClient(url=wss_url, callback=self.on_ws_data, loop=loop)
self._ws = WSClient(self, url=wss_url, callback=self.on_ws_data, loop=loop)

"""URL for the savecair API."""
self.url = url
Expand Down Expand Up @@ -214,15 +214,32 @@ async def on_ws_data(self, data) -> bool:
data_json = json.loads(data)
payload = data_json["payload"]
device_id = payload["deviceId"]

if "dataItems" not in payload:
_LOGGER.error("Could not retrieve dataItems from websocket API.")
message_type = data_json["type"]

if message_type == "DEVICE_CONNECTED":
self.data.set_availability(device_id, available=True)
elif message_type == "DEVICE_DISCONNECTED":
self.data.set_availability(device_id, available=False)
elif message_type == "DEVICE_PUSH_EVENT":
if "dataItems" not in payload:
_LOGGER.error("Could not retrieve dataItems from websocket API.")
return False

data_items = payload["dataItems"]
print(data_json)
self.data.update(device_id, data_items)

# Finally poll for updates

try:
device = self.data.get_device(device_id=device_id)
await self.read_data(device=device)
except KeyError:
_LOGGER.debug(f"Could not find device with ID={device_id} when polling data in WS.")
else:
_LOGGER.error(f"Unhandled message type for WS connection: {message_type}")
return False

data_items = payload["dataItems"]

self.data.update(device_id, data_items)

return True

async def read_data(self, device: SaveConnectDevice) -> bool:
Expand All @@ -231,6 +248,9 @@ async def read_data(self, device: SaveConnectDevice) -> bool:
@param device: SaveConnectDevice object
@return: the data that was retrieved from the API
"""
if not self.auth.is_auth():
await self.refresh_token()

status = await self.graphql.queryGetDeviceData(device.identifier)

return status
Expand Down
Loading

0 comments on commit 9535d65

Please sign in to comment.