Skip to content

Commit

Permalink
Rework OAuth in Tesla Fleet (home-assistant#123324)
Browse files Browse the repository at this point in the history
* Rework Oauth

* Improve docstrings

* Update homeassistant/components/tesla_fleet/oauth.py

Co-authored-by: Martin Hjelmare <[email protected]>

* review feedback

* Add tests for user creds

---------

Co-authored-by: Martin Hjelmare <[email protected]>
  • Loading branch information
Bre77 and MartinHjelmare authored Aug 9, 2024
1 parent 00c1a3f commit f8e1c2c
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 70 deletions.
7 changes: 3 additions & 4 deletions homeassistant/components/tesla_fleet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
TeslaFleetError,
)

from homeassistant.components.application_credentials import ClientCredential
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import CONF_ACCESS_TOKEN, CONF_TOKEN, Platform
from homeassistant.core import HomeAssistant
Expand All @@ -27,15 +26,15 @@
import homeassistant.helpers.config_validation as cv
from homeassistant.helpers.device_registry import DeviceInfo

from .application_credentials import TeslaOAuth2Implementation
from .config_flow import OAuth2FlowHandler
from .const import CLIENT_ID, DOMAIN, LOGGER, MODELS, NAME
from .const import DOMAIN, LOGGER, MODELS
from .coordinator import (
TeslaFleetEnergySiteInfoCoordinator,
TeslaFleetEnergySiteLiveCoordinator,
TeslaFleetVehicleDataCoordinator,
)
from .models import TeslaFleetData, TeslaFleetEnergyData, TeslaFleetVehicleData
from .oauth import TeslaSystemImplementation

PLATFORMS: Final = [Platform.BINARY_SENSOR, Platform.DEVICE_TRACKER, Platform.SENSOR]

Expand All @@ -56,7 +55,7 @@ async def async_setup_entry(hass: HomeAssistant, entry: TeslaFleetConfigEntry) -

OAuth2FlowHandler.async_register_implementation(
hass,
TeslaOAuth2Implementation(hass, DOMAIN, ClientCredential(CLIENT_ID, "", NAME)),
TeslaSystemImplementation(hass),
)

implementation = await async_get_config_entry_implementation(hass, entry)
Expand Down
62 changes: 4 additions & 58 deletions homeassistant/components/tesla_fleet/application_credentials.py
Original file line number Diff line number Diff line change
@@ -1,72 +1,18 @@
"""Application Credentials platform the Tesla Fleet integration."""

import base64
import hashlib
import secrets
from typing import Any

from homeassistant.components.application_credentials import (
AuthImplementation,
AuthorizationServer,
ClientCredential,
)
from homeassistant.components.application_credentials import ClientCredential
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_entry_oauth2_flow

from .const import AUTHORIZE_URL, DOMAIN, SCOPES, TOKEN_URL

AUTH_SERVER = AuthorizationServer(AUTHORIZE_URL, TOKEN_URL)
from .oauth import TeslaUserImplementation


async def async_get_auth_implementation(
hass: HomeAssistant, auth_domain: str, credential: ClientCredential
) -> config_entry_oauth2_flow.AbstractOAuth2Implementation:
"""Return auth implementation."""
return TeslaOAuth2Implementation(
return TeslaUserImplementation(
hass,
DOMAIN,
auth_domain,
credential,
)


class TeslaOAuth2Implementation(AuthImplementation):
"""Tesla Fleet API Open Source Oauth2 implementation."""

def __init__(
self, hass: HomeAssistant, domain: str, credential: ClientCredential
) -> None:
"""Initialize local auth implementation."""
self.hass = hass
self._domain = domain

# Setup PKCE
self.code_verifier = secrets.token_urlsafe(32)
hashed_verifier = hashlib.sha256(self.code_verifier.encode()).digest()
self.code_challenge = (
base64.urlsafe_b64encode(hashed_verifier).decode().replace("=", "")
)
super().__init__(
hass,
domain,
credential,
AUTH_SERVER,
)

@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data that needs to be appended to the authorize url."""
return {
"scope": " ".join(SCOPES),
"code_challenge": self.code_challenge, # PKCE
}

async def async_resolve_external_data(self, external_data: Any) -> dict:
"""Resolve the authorization code to tokens."""
return await self._token_request(
{
"grant_type": "authorization_code",
"code": external_data["code"],
"redirect_uri": external_data["state"]["redirect_uri"],
"code_verifier": self.code_verifier, # PKCE
}
)
9 changes: 3 additions & 6 deletions homeassistant/components/tesla_fleet/config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@

import jwt

from homeassistant.components.application_credentials import ClientCredential
from homeassistant.config_entries import ConfigEntry, ConfigFlowResult
from homeassistant.helpers import config_entry_oauth2_flow

from .application_credentials import TeslaOAuth2Implementation
from .const import CLIENT_ID, DOMAIN, LOGGER, NAME
from .const import DOMAIN, LOGGER
from .oauth import TeslaSystemImplementation


class OAuth2FlowHandler(
Expand All @@ -35,9 +34,7 @@ async def async_step_user(
"""Handle a flow start."""
self.async_register_implementation(
self.hass,
TeslaOAuth2Implementation(
self.hass, DOMAIN, ClientCredential(CLIENT_ID, "", NAME)
),
TeslaSystemImplementation(self.hass),
)

return await super().async_step_user()
Expand Down
1 change: 0 additions & 1 deletion homeassistant/components/tesla_fleet/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

LOGGER = logging.getLogger(__package__)

NAME = "Home Assistant"
CLIENT_ID = "71b813eb-4a2e-483a-b831-4dec5cb9bf0d"
AUTHORIZE_URL = "https://auth.tesla.com/oauth2/v3/authorize"
TOKEN_URL = "https://auth.tesla.com/oauth2/v3/token"
Expand Down
86 changes: 86 additions & 0 deletions homeassistant/components/tesla_fleet/oauth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
"""Provide oauth implementations for the Tesla Fleet integration."""

import base64
import hashlib
import secrets
from typing import Any

from homeassistant.components.application_credentials import (
AuthImplementation,
AuthorizationServer,
ClientCredential,
)
from homeassistant.core import HomeAssistant
from homeassistant.helpers import config_entry_oauth2_flow

from .const import AUTHORIZE_URL, CLIENT_ID, DOMAIN, SCOPES, TOKEN_URL


class TeslaSystemImplementation(config_entry_oauth2_flow.LocalOAuth2Implementation):
"""Tesla Fleet API open source Oauth2 implementation."""

code_verifier: str
code_challenge: str

def __init__(self, hass: HomeAssistant) -> None:
"""Initialize open source Oauth2 implementation."""

# Setup PKCE
self.code_verifier = secrets.token_urlsafe(32)
hashed_verifier = hashlib.sha256(self.code_verifier.encode()).digest()
self.code_challenge = (
base64.urlsafe_b64encode(hashed_verifier).decode().replace("=", "")
)
super().__init__(
hass,
DOMAIN,
CLIENT_ID,
"",
AUTHORIZE_URL,
TOKEN_URL,
)

@property
def name(self) -> str:
"""Name of the implementation."""
return "Built-in open source client ID"

@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data that needs to be appended to the authorize url."""
return {
"scope": " ".join(SCOPES),
"code_challenge": self.code_challenge, # PKCE
}

async def async_resolve_external_data(self, external_data: Any) -> dict:
"""Resolve the authorization code to tokens."""
return await self._token_request(
{
"grant_type": "authorization_code",
"code": external_data["code"],
"redirect_uri": external_data["state"]["redirect_uri"],
"code_verifier": self.code_verifier, # PKCE
}
)


class TeslaUserImplementation(AuthImplementation):
"""Tesla Fleet API user Oauth2 implementation."""

def __init__(
self, hass: HomeAssistant, auth_domain: str, credential: ClientCredential
) -> None:
"""Initialize user Oauth2 implementation."""

super().__init__(
hass,
auth_domain,
credential,
AuthorizationServer(AUTHORIZE_URL, TOKEN_URL),
)

@property
def extra_authorize_data(self) -> dict[str, Any]:
"""Extra data that needs to be appended to the authorize url."""
return {"scope": " ".join(SCOPES)}
86 changes: 85 additions & 1 deletion tests/components/tesla_fleet/test_config_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

import pytest

from homeassistant.components.application_credentials import (
ClientCredential,
async_import_client_credential,
)
from homeassistant.components.tesla_fleet.const import (
AUTHORIZE_URL,
CLIENT_ID,
Expand All @@ -16,6 +20,7 @@
from homeassistant.core import HomeAssistant
from homeassistant.data_entry_flow import FlowResultType
from homeassistant.helpers import config_entry_oauth2_flow
from homeassistant.setup import async_setup_component

from tests.common import MockConfigEntry
from tests.test_util.aiohttp import AiohttpClientMocker
Expand All @@ -26,7 +31,7 @@


@pytest.fixture
async def access_token(hass: HomeAssistant) -> dict[str, str | list[str]]:
async def access_token(hass: HomeAssistant) -> str:
"""Return a valid access token."""
return config_entry_oauth2_flow._encode_jwt(
hass,
Expand Down Expand Up @@ -111,6 +116,85 @@ async def test_full_flow(
assert result["result"].data["token"]["refresh_token"] == "mock-refresh-token"


@pytest.mark.usefixtures("current_request_with_host")
async def test_full_flow_user_cred(
hass: HomeAssistant,
hass_client_no_auth: ClientSessionGenerator,
aioclient_mock: AiohttpClientMocker,
access_token,
) -> None:
"""Check full flow."""

# Create user application credential
assert await async_setup_component(hass, "application_credentials", {})
await async_import_client_credential(
hass,
DOMAIN,
ClientCredential("user_client_id", "user_client_secret"),
"user_cred",
)

result = await hass.config_entries.flow.async_init(
DOMAIN, context={"source": SOURCE_USER}
)

assert result["type"] is FlowResultType.FORM

result = await hass.config_entries.flow.async_configure(
result["flow_id"], {"implementation": "user_cred"}
)
assert result["type"] is FlowResultType.EXTERNAL_STEP

state = config_entry_oauth2_flow._encode_jwt(
hass,
{
"flow_id": result["flow_id"],
"redirect_uri": REDIRECT,
},
)

assert result["url"].startswith(AUTHORIZE_URL)
parsed_url = urlparse(result["url"])
parsed_query = parse_qs(parsed_url.query)
assert parsed_query["response_type"][0] == "code"
assert parsed_query["client_id"][0] == "user_client_id"
assert parsed_query["redirect_uri"][0] == REDIRECT
assert parsed_query["state"][0] == state
assert parsed_query["scope"][0] == " ".join(SCOPES)
assert "code_challenge" not in parsed_query # Ensure not a PKCE flow

client = await hass_client_no_auth()
resp = await client.get(f"/auth/external/callback?code=abcd&state={state}")
assert resp.status == 200
assert resp.headers["content-type"] == "text/html; charset=utf-8"

aioclient_mock.clear_requests()
aioclient_mock.post(
TOKEN_URL,
json={
"refresh_token": "mock-refresh-token",
"access_token": access_token,
"type": "Bearer",
"expires_in": 60,
},
)
with patch(
"homeassistant.components.tesla_fleet.async_setup_entry", return_value=True
) as mock_setup:
result = await hass.config_entries.flow.async_configure(result["flow_id"])

assert len(hass.config_entries.async_entries(DOMAIN)) == 1
assert len(mock_setup.mock_calls) == 1

assert result["type"] is FlowResultType.CREATE_ENTRY
assert result["title"] == UNIQUE_ID
assert "result" in result
assert result["result"].unique_id == UNIQUE_ID
assert "token" in result["result"].data
assert result["result"].data["token"]["access_token"] == access_token
assert result["result"].data["token"]["refresh_token"] == "mock-refresh-token"


@pytest.mark.usefixtures("current_request_with_host")
async def test_reauthentication(
hass: HomeAssistant,
Expand Down

0 comments on commit f8e1c2c

Please sign in to comment.