From a009ae107043297bf577fd3a3335ac89d17ff055 Mon Sep 17 00:00:00 2001 From: Samuel Gulliksson Date: Sun, 16 Sep 2018 12:48:15 +0200 Subject: [PATCH] Refactor implementation and API (#30) * Refactor library and public API, preparing for multi-provider support. * Update example app, and tests for it. * Rename permanent session config value and change the default to True. With a permanent session by the default, the user data will be persisted even after browser restarts. * Update documentation. --- .gitignore | 1 + .travis.yml | 2 +- README.md | 147 ++++--- example/__init__.py | 0 example/app.py | 34 +- example/test_example_app.py | 86 ++++ setup.py | 5 +- src/flask_pyoidc/flask_pyoidc.py | 225 +++------- src/flask_pyoidc/provider_configuration.py | 169 +++++++ src/flask_pyoidc/pyoidc_facade.py | 164 +++++++ src/flask_pyoidc/user_session.py | 69 +++ tests/requirements.txt | 3 +- tests/test_flask_pyoidc.py | 489 ++++++++++----------- tests/test_provider_configuration.py | 104 +++++ tests/test_pyoidc_facade.py | 202 +++++++++ tests/test_user_session.py | 59 +++ tox.ini | 2 +- 17 files changed, 1267 insertions(+), 494 deletions(-) create mode 100644 example/__init__.py create mode 100644 example/test_example_app.py create mode 100644 src/flask_pyoidc/provider_configuration.py create mode 100644 src/flask_pyoidc/pyoidc_facade.py create mode 100644 src/flask_pyoidc/user_session.py create mode 100644 tests/test_provider_configuration.py create mode 100644 tests/test_pyoidc_facade.py create mode 100644 tests/test_user_session.py diff --git a/.gitignore b/.gitignore index 668cbcd..a6dbce3 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ dist/ .cache/ .coverage coverage.xml +.pytest_cache diff --git a/.travis.yml b/.travis.yml index 9980311..37e1537 100644 --- a/.travis.yml +++ b/.travis.yml @@ -22,5 +22,5 @@ script: - tox after_success: - - py.test tests/ --cov=./ + - py.test tests/ example/ --cov=./ - codecov diff --git a/README.md b/README.md index 92c7291..38312a9 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,15 @@ [![codecov.io](https://codecov.io/github/zamzterz/Flask-pyoidc/coverage.svg?branch=master)](https://codecov.io/github/its-dirg/Flask-pyoidc?branch=master) [![Build Status](https://travis-ci.org/zamzterz/Flask-pyoidc.svg?branch=master)](https://travis-ci.org/zamzterz/Flask-pyoidc) -This Flask extension provides simple OpenID Connect authentication, by using [pyoidc](https://github.com/rohe/pyoidc). -Currently only ["Code Flow"](http://openid.net/specs/openid-connect-core-1_0.html#CodeFlowAuth)) is supported. +This Flask extension provides simple OpenID Connect authentication, backed by [pyoidc](https://github.com/rohe/pyoidc). -## Usage +*Currently only ["Authorization Code Flow"](http://openid.net/specs/openid-connect-core-1_0.html#CodeFlowAuth) is supported.* + +## Example + +Have a look at the [example Flask app](example/app.py) for a full example of how to use this extension. + +## Configuration ### Provider and client configuration @@ -19,87 +24,112 @@ of the client registration modes. To use a provider which supports dynamic discovery it suffices to specify the issuer URL: ```python -auth = OIDCAuthentication(issuer='https://op.example.com') +from flask_pyoidc.provider_configuration import ProviderConfiguration +from flask_pyoidc.flask_pyoidc import OIDCAuthentication + +auth = OIDCAuthentication(ProviderConfiguration(issuer='https://op.example.com', [client configuration])) ``` #### Static provider configuration -To use a provider not supporting dynamic discovery, the static provider configuration can be specified: +To use a provider not supporting dynamic discovery, the static provider metadata can be specified: ```python -provider_config = { - 'issuer': 'https://op.example.com', - 'authorization_endpoint': 'https://op.example.com/authorize', - 'token_endpoint': 'https://op.example.com/token', - 'userinfo_endpoint': 'https://op.example.com/userinfo' -} -auth = OIDCAuthentication(provider_configuration_info=provider_config) +from flask_pyoidc.provider_configuration import ProviderConfiguration, ProviderMetadata +from flask_pyoidc.flask_pyoidc import OIDCAuthentication + +provider_metadata = ProviderMetadata(issuer='https://op.example.com', + authorization_endpoint='https://op.example.com/auth', + jwks_uri='https://op.example.com/jwks') +auth = OIDCAuthentication(ProviderConfiguration(provider_metadata=provider_metadata, [client configuration])) ``` See the OpenID Connect specification for more information about the [provider metadata](https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata). - #### Static client registration -If you have already registered a client with the provider all client registration information can be specified: +If you have already registered a client with the provider, specify the client credentials directly: ```python -client_info = { - 'client_id': 'cl41ekfb9j', - 'client_secret': 'm1C659wLipXfUUR50jlZ', +from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientMetadata +from flask_pyoidc.flask_pyoidc import OIDCAuthentication -} -auth = OIDCAuthentication(client_registration_info=client_info) +client_metadata = ClientMetadata(client_id='cl41ekfb9j', client_secret='m1C659wLipXfUUR50jlZ') +auth = OIDCAuthentication(ProviderConfiguration([provider configuration], client_metadata=client_metadata)) ``` **Note: The redirect URIs registered with the provider MUST include `/redirect_uri`, -where `` is the URL for the Flask application.** +where `` is the URL of the Flask application.** -#### Session refresh +#### Dynamic client registration + +To dynamically register a new client for your application, the required client registration info can be specified: -If your OpenID Connect provider supports the `prompt=none` parameter, the library can automatically support session refresh on your behalf. -This ensures that the user session attributes (OIDC claims, user being active, etc.) are valid and up-to-date without having to log the user out and back in. -To use the feature simply pass the parameter requesting the session refresh interval as such: ```python -client_info = { - 'client_id': 'cl41ekfb9j', - 'client_secret': 'm1C659wLipXfUUR50jlZ', - 'session_refresh_interval_seconds': 900 +from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientRegistrationInfo +from flask_pyoidc.flask_pyoidc import OIDCAuthentication -} -auth = OIDCAuthentication(client_registration_info=client_info) +client_registration_info = ClientRegistrationInfo(client_name='Test App', contacts=['dev@rp.example.com']) +auth = OIDCAuthentication(ProviderConfiguration([provider configuration], client_registration_info=client_registration_info)) ``` -**Note: The client will still be logged out at whichever expiration time you set for the Flask session. +### Flask configuration -#### Dynamic client registration +The application using this extension **MUST** set the following +[builtin configuration values of Flask](http://flask.pocoo.org/docs/config/#builtin-configuration-values): + +* `SERVER_NAME`: **MUST** be the same as `` if using static client registration. +* `SECRET_KEY`: This extension relies on [Flask sessions](http://flask.pocoo.org/docs/quickstart/#sessions), which + requires `SECRET_KEY`. -If no `client_id` is specified in the `client_registration_info` constructor parameter, the library will try to -dynamically register a client with the specified provider. +You may also configure the way the user sessions created by this extension are handled: -### Protect an endpoint by authentication +* `OIDC_SESSION_PERMANENT`: If set to `True` (which is the default) the user session will live until the ID Token + expiration time. If set to `False` the session will be deleted when the user closes the browser. + +### Session refresh + +If your provider supports the `prompt=none` authentication request parameter, this extension can automatically refresh +user sessions. This ensures that the user attributes (OIDC claims, user being active, etc.) are kept up-to-date without +having to log the user out and back in. To enable and configure the feature, specify the interval (in seconds) between +refreshes: +```python +from flask_pyoidc.provider_configuration import ProviderConfiguration +from flask_pyoidc.flask_pyoidc import OIDCAuthentication + +auth = OIDCAuthentication(ProviderConfiguration(session_refresh_interval_seconds=1800, [provider/client config]) +``` + +**Note: The user will still be logged out when the session expires (as described above).** + +## Protect an endpoint by authentication To add authentication to one of your endpoints use the `oidc_auth` decorator: ```python import flask from flask import Flask, jsonify +from flask_pyoidc.user_session import UserSession + app = Flask(__name__) -@app.route('/') +@app.route('/login') @auth.oidc_auth def index(): - return jsonify(id_token=flask.session['id_token'], access_token=flask.session['access_token'], - userinfo=flask.session['userinfo']) + user_session = UserSession(flask.session) + return jsonify(access_token=user_session.access_token, + id_token=user_session.id_token, + userinfo=user_session.userinfo) ``` -This extension will place three things in the session if they are received from the provider: +After a successful login, this extension will place three things in the user session (if they are received from the +provider): * [ID Token](http://openid.net/specs/openid-connect-core-1_0.html#IDToken) -* [access token](http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse) -* [userinfo response](http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse) +* [Access Token](http://openid.net/specs/openid-connect-core-1_0.html#TokenResponse) +* [Userinfo Response](http://openid.net/specs/openid-connect-core-1_0.html#UserInfoResponse) -### User logout +## User logout -To support user logout use the `oidc_logout` decorator: +To support user logout, use the `oidc_logout` decorator: ```python @app.route('/logout') @auth.oidc_logout @@ -108,12 +138,13 @@ def logout(): ``` This extension also supports [RP-Initiated Logout](http://openid.net/specs/openid-connect-session-1_0.html#RPLogout), -if the provider allows it. +if the provider allows it. Make sure the `end_session_endpoint` is defined in the provider metadata to enable notifying +the provider when the user logs out. -### Specify the error view +## Specify the error view If an OAuth error response is received, either in the authentication or token response, it will be passed to the -specified error view. An error view is specified by using the `error_view` decorator: +"error view", specified using the `error_view` decorator: ```python from flask import jsonify @@ -124,25 +155,7 @@ def error(error=None, error_description=None): ``` The function specified as the error view MUST accept two parameters, `error` and `error_description`, which corresponds -to the [OIDC/OAuth error parameters](http://openid.net/specs/openid-connect-core-1_0.html#AuthError). - -If no error view is specified a generic error message will be displayed to the user. - - -## Configuration - -The application using this extension MUST set the following [builtin configuration values of Flask](http://flask.pocoo.org/docs/0.10/config/#builtin-configuration-values): - -* `SERVER_NAME` (MUST be the same as `` if using static client registration) -* `SECRET_KEY` (this extension relies on [Flask sessions](http://flask.pocoo.org/docs/0.11/quickstart/#sessions), which requires `SECRET_KEY`) - -You may also configure the way Flask sessions handles the user session: - -* `PERMANENT_SESSION` (added by this extension; makes the session cookie expire after a configurable length of time instead of being tied to the browser session) -* `PERMANENT_SESSION_LIFETIME` (the lifetime of a permanent session) - -See the [Flask documentation](http://flask.pocoo.org/docs/0.11/config/#builtin-configuration-values) for an exhaustive list of configuration options. - -## Example +to the [OIDC error parameters](http://openid.net/specs/openid-connect-core-1_0.html#AuthError), and return the content +that should be displayed to the user. -Have a look at the example Flask app in [app.py](example/app.py) for an idea of how to use it. +If no error view is specified, a generic error message will be displayed to the user. diff --git a/example/__init__.py b/example/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/example/app.py b/example/app.py index 0ce7c34..6cfd24f 100644 --- a/example/app.py +++ b/example/app.py @@ -1,33 +1,39 @@ import flask +import logging from flask import Flask, jsonify from flask_pyoidc.flask_pyoidc import OIDCAuthentication +from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientMetadata +from flask_pyoidc.user_session import UserSession -PORT = 5000 app = Flask(__name__) - - # See http://flask.pocoo.org/docs/0.12/config/ -app.config.update({'SERVER_NAME': 'example.com', - 'SECRET_KEY': 'dev_key', - 'PREFERRED_URL_SCHEME': 'https', - 'SESSION_PERMANENT': True, # turn on flask session support - 'PERMANENT_SESSION_LIFETIME': 2592000, # session time in seconds (30 days) +app.config.update({'SERVER_NAME': 'localhost:5000', + 'SECRET_KEY': 'dev_key', # make sure to change this!! + 'PREFERRED_URL_SCHEME': 'http', 'DEBUG': True}) -auth = OIDCAuthentication(app, issuer="auth.example.net") +ISSUER = 'https://provider.example.com' +CLIENT_ID = 'client1' +CLIENT_SECRET = 'very_secret' +provider_configuration = ProviderConfiguration(issuer=ISSUER, + client_metadata=ClientMetadata(CLIENT_ID, CLIENT_SECRET)) +auth = OIDCAuthentication(provider_configuration) + @app.route('/') @auth.oidc_auth def index(): - return jsonify(id_token=flask.session['id_token'], access_token=flask.session['access_token'], - userinfo=flask.session['userinfo']) + user_session = UserSession(flask.session) + return jsonify(access_token=user_session.access_token, + id_token=user_session.id_token, + userinfo=user_session.userinfo) @app.route('/logout') @auth.oidc_logout def logout(): - return 'You\'ve been successfully logged out!' + return "You've been successfully logged out!" @auth.error_view @@ -36,4 +42,6 @@ def error(error=None, error_description=None): if __name__ == '__main__': - app.run(port=PORT) + logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') + auth.init_app(app) + app.run() diff --git a/example/test_example_app.py b/example/test_example_app.py new file mode 100644 index 0000000..7915db2 --- /dev/null +++ b/example/test_example_app.py @@ -0,0 +1,86 @@ +import time + +import json +import pytest +import responses +from oic.oic import IdToken +from six.moves.urllib.parse import parse_qsl, urlencode, urlparse + +from .app import app, auth, CLIENT_ID, ISSUER + + +class TestExampleApp(object): + PROVIDER_METADATA = { + 'issuer': ISSUER, + 'authorization_endpoint': ISSUER + '/auth', + 'jwks_uri': ISSUER + '/jwks', + 'token_endpoint': ISSUER + '/token', + 'userinfo_endpoint': ISSUER + '/userinfo' + } + USER_ID = 'user1' + + @pytest.fixture('session', autouse=True) + def setup(self): + app.testing = True + + with responses.RequestsMock() as r: + # mock provider discovery + r.add(responses.GET, ISSUER + '/.well-known/openid-configuration', json=self.PROVIDER_METADATA) + auth.init_app(app) + + @responses.activate + def perform_authentication(self, client): + # index page should make auth request + auth_redirect = client.get('/') + parsed_auth_request = dict(parse_qsl(urlparse(auth_redirect.location).query)) + + now = int(time.time()) + # mock token response + id_token = IdToken(iss=ISSUER, + aud=CLIENT_ID, + sub=self.USER_ID, + exp=now + 10, + iat=now, + nonce=parsed_auth_request['nonce']) + token_response = {'access_token': 'test_access_token', 'token_type': 'Bearer', 'id_token': id_token.to_jwt()} + responses.add(responses.POST, self.PROVIDER_METADATA['token_endpoint'], json=token_response) + + # mock userinfo response + userinfo = {'sub': self.USER_ID, 'name': 'Test User'} + responses.add(responses.GET, self.PROVIDER_METADATA['userinfo_endpoint'], json=userinfo) + + # fake auth response sent to redirect URI + fake_auth_response = 'code=fake_auth_code&state={}'.format(parsed_auth_request['state']) + logged_in_page = client.get('/redirect_uri?{}'.format(fake_auth_response), follow_redirects=True) + result = json.loads(logged_in_page.data.decode('utf-8')) + + assert result['access_token'] == 'test_access_token' + assert result['id_token'] == id_token.to_dict() + assert result['userinfo'] == {'sub': self.USER_ID, 'name': 'Test User'} + + def test_login_logout(self): + client = app.test_client() + + self.perform_authentication(client) + + response = client.get('/logout') + assert response.data.decode('utf-8') == "You've been successfully logged out!" + + def test_error_view(self): + client = app.test_client() + + auth_redirect = client.get('/') + parsed_auth_request = dict(parse_qsl(urlparse(auth_redirect.location).query)) + + # fake auth error response sent to redirect_uri + error_auth_response = { + 'error': 'invalid_request', + 'error_description': 'test error', + 'state': parsed_auth_request['state'] + } + error_page = client.get('/redirect_uri?{}'.format(urlencode(error_auth_response)), follow_redirects=True) + + assert json.loads(error_page.data.decode('utf-8')) == { + 'error': error_auth_response['error'], + 'message': error_auth_response['error_description'] + } diff --git a/setup.py b/setup.py index abf3aba..cf33e0e 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,8 @@ author_email='samuel.gulliksson@gmail.com', description='Flask extension for OpenID Connect authentication.', install_requires=[ - 'oic==0.11.0.1', - 'Flask' + 'oic==0.12', + 'Flask', + 'requests' ] ) diff --git a/src/flask_pyoidc/flask_pyoidc.py b/src/flask_pyoidc/flask_pyoidc.py index e03b18d..071faa7 100644 --- a/src/flask_pyoidc/flask_pyoidc.py +++ b/src/flask_pyoidc/flask_pyoidc.py @@ -1,5 +1,5 @@ """ - Copyright 2017 Samuel Gulliksson + Copyright 2018 Samuel Gulliksson Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,157 +14,89 @@ limitations under the License. """ -import functools -import logging import time import flask +import functools +import logging from flask import current_app from flask.helpers import url_for from oic import rndstr -from oic.oic import Client -from oic.oic.message import AuthorizationResponse from oic.oic.message import EndSessionRequest -from oic.oic.message import ProviderConfigurationResponse -from oic.oic.message import RegistrationRequest -from oic.utils.authn.client import CLIENT_AUTHN_METHOD from werkzeug.utils import redirect -logger = logging.getLogger(__name__) +from .pyoidc_facade import PyoidcFacade +from .user_session import UserSession +logger = logging.getLogger(__name__) -class _Session(object): - """Session object for user login state. - Wraps comparison of times necessary for session handling. +class OIDCAuthentication(object): + """ + OIDCAuthentication object for Flask extension. """ - def __init__(self, flask_session, session_refresh_interval_seconds=None): - self.flask_session = flask_session - self.session_refresh_interval_seconds = session_refresh_interval_seconds + REDIRECT_URI_ENDPOINT = 'redirect_uri' - def is_authenticated(self): + def __init__(self, provider_configuration, app=None): """ - flask_session is empty when the session hasn't been initialised or has expired. - Thus checking for existence of any item is enough to determine if we're authenticated. + Args: + provider_configuration (flask_pyoidc.provider_configuration.ProviderConfiguration): provider configuration + app (flask.app.Flask): optional Flask app """ + self._provider_configuration = provider_configuration - return self.flask_session.get('last_authenticated') is not None - - def should_refresh(self): - return self._supports_refresh() and self._needs_refresh() - - def _refresh_time(self): - last = self.flask_session.get('last_authenticated') - refresh = self.session_refresh_interval_seconds - return last + refresh - - def _supports_refresh(self): - return self.session_refresh_interval_seconds is not None - - def _needs_refresh(self): - return self._refresh_time() < time.time() - - -class OIDCAuthentication(object): - """OIDCAuthentication object for Flask extension. - - Takes a Flask app object, client, registration info, - provider configuration, and supports optional extra request args to the - OIDC identity provider. - """ - - def __init__(self, app=None, client_registration_info=None, - issuer=None, provider_configuration_info=None, - userinfo_endpoint_method='POST', - extra_request_args=None): - self.userinfo_endpoint_method = userinfo_endpoint_method - self.extra_request_args = extra_request_args or {} - - self.client = Client(client_authn_method=CLIENT_AUTHN_METHOD) - - # Raise exception if oic auth will fail based on lack of data. - if not issuer and not provider_configuration_info: - raise ValueError( - 'Either \'issuer\' (for dynamic discovery) or provider_configuration_info' - ' for static configuration must be specified.' - ) - # If only issuer provided assume discovery and initalize anyway. - if issuer and not provider_configuration_info: - self.client.provider_config(issuer) - else: - # Otherwise assume non-discovery for oidc - self.client.handle_provider_config( - ProviderConfigurationResponse(**provider_configuration_info), - provider_configuration_info['issuer'] - ) - - self.client_registration_info = client_registration_info or {} - - self.logout_view = None + self.client = None + self._logout_view = None self._error_view = None + if app: self.init_app(app) def init_app(self, app): # setup redirect_uri as a flask route - app.add_url_rule('/redirect_uri', 'redirect_uri', self._handle_authentication_response) + app.add_url_rule('/redirect_uri', self.REDIRECT_URI_ENDPOINT, self._handle_authentication_response) # dynamically add the Flask redirect uri to the client info with app.app_context(): - self.client_registration_info['redirect_uris'] = url_for('redirect_uri') + self.client = PyoidcFacade(self._provider_configuration, url_for(self.REDIRECT_URI_ENDPOINT)) - # if non-discovery client add the provided info from the constructor - if 'client_id' in self.client_registration_info: - # static client info provided - self.client.store_registration_info(RegistrationRequest(**self.client_registration_info)) + def _get_post_logout_redirect_uri(self): + if self._logout_view: + return url_for(self._logout_view.__name__, _external=True) + return None + + def _register_client(self): + client_registration_args = {} + post_logout_redirect_uri = self._get_post_logout_redirect_uri() + if post_logout_redirect_uri: + logger.debug('registering with post_logout_redirect_uri=%s', post_logout_redirect_uri) + client_registration_args['post_logout_redirect_uris'] = [post_logout_redirect_uri] + self.client.register(client_registration_args) def _authenticate(self, interactive=True): - if 'client_id' not in self.client_registration_info: - logger.debug('performing dynamic client registration') - # do dynamic registration - if self.logout_view: - # handle support for logout - with current_app.app_context(): - post_logout_redirect_uri = url_for(self.logout_view.__name__, _external=True) - logger.debug('built post_logout_redirect_uri=%s', post_logout_redirect_uri) - self.client_registration_info['post_logout_redirect_uris'] = [post_logout_redirect_uri] - - registration_response = self.client.register( - self.client.provider_info['registration_endpoint'], - **self.client_registration_info - ) - logger.debug('client registration response: %s', registration_response.to_json()) + if not self.client.is_registered(): + self._register_client() flask.session['destination'] = flask.request.url flask.session['state'] = rndstr() flask.session['nonce'] = rndstr() - args = { - 'client_id': self.client.client_id, - 'response_type': 'code', - 'scope': ['openid'], - 'redirect_uri': self.client.registration_response['redirect_uris'][0], - 'state': flask.session['state'], - 'nonce': flask.session['nonce'], - } - # Use silent authentication for session refresh # This will not show login prompt to the user + extra_auth_params = {} if not interactive: - args['prompt'] = 'none' - - args.update(self.extra_request_args) - auth_req = self.client.construct_AuthorizationRequest(request_args=args) - logger.debug('sending authentication request: %s', auth_req.to_json()) + extra_auth_params['prompt'] = 'none' - login_url = auth_req.request(self.client.authorization_endpoint) + login_url = self.client.authentication_request(flask.session['state'], + flask.session['nonce'], + extra_auth_params) return redirect(login_url) def _handle_authentication_response(self): # parse authentication response query_string = flask.request.query_string.decode('utf-8') - authn_resp = self.client.parse_response(AuthorizationResponse, info=query_string, sformat='urlencoded') + authn_resp = self.client.parse_authentication_response(query_string) logger.debug('received authentication response: %s', authn_resp.to_json()) if authn_resp['state'] != flask.session.pop('state'): @@ -173,26 +105,15 @@ def _handle_authentication_response(self): if 'error' in authn_resp: return self._handle_error_response(authn_resp) - # do token request - args = { - 'code': authn_resp['code'], - 'redirect_uri': self.client.registration_response['redirect_uris'][0] - } - - logger.debug('making token request') - token_resp = self.client.do_access_token_request( - state=authn_resp['state'], - request_args=args, - authn_method=self.client.registration_response.get('token_endpoint_auth_method', 'client_secret_basic') - ) - logger.debug('received token response: %s', token_resp.to_json()) + token_resp = self.client.token_request(authn_resp['code']) if 'error' in token_resp: return self._handle_error_response(token_resp) - flask.session['access_token'] = token_resp['access_token'] + access_token = token_resp['access_token'] - id_token = None + id_token_claims = None + id_token_jwt = None if 'id_token' in token_resp: id_token = token_resp['id_token'] logger.debug('received id token: %s', id_token.to_json()) @@ -200,53 +121,40 @@ def _handle_authentication_response(self): if id_token['nonce'] != flask.session.pop('nonce'): raise ValueError('The \'nonce\' parameter does not match.') - flask.session['id_token'] = id_token.to_dict() - flask.session['id_token_jwt'] = id_token.to_jwt() + id_token_claims = id_token.to_dict() + id_token_jwt = id_token.to_jwt() # set the session as requested by the OP if we have no default - if current_app.config.get('SESSION_PERMANENT'): + if current_app.config.get('OIDC_SESSION_PERMANENT', True): flask.session.permanent = True - flask.session.permanent_session_lifetime = id_token.get('exp') - time.time() + flask.session.permanent_session_lifetime = id_token['exp'] - time.time() # do userinfo request - userinfo = self._do_userinfo_request(authn_resp['state'], self.userinfo_endpoint_method) + userinfo = self.client.userinfo_request(access_token) + userinfo_claims = None + if userinfo: + userinfo_claims = userinfo.to_dict() - if id_token and userinfo and userinfo['sub'] != id_token['sub']: + if id_token_claims and userinfo_claims and userinfo_claims['sub'] != id_token_claims['sub']: raise ValueError('The \'sub\' of userinfo does not match \'sub\' of ID Token.') - # store the current user session - if userinfo: - flask.session['userinfo'] = userinfo.to_dict() + UserSession(flask.session).update(time.time(), access_token, id_token_claims, id_token_jwt, userinfo_claims) - flask.session['last_authenticated'] = time.time() destination = flask.session.pop('destination') - return redirect(destination) - def _do_userinfo_request(self, state, userinfo_endpoint_method): - if userinfo_endpoint_method is None: - return None - - logger.debug('making userinfo request') - userinfo_response = self.client.do_user_info_request(method=userinfo_endpoint_method, state=state) - logger.debug('received userinfo response: %s', userinfo_response.to_json()) - - return userinfo_response - def _handle_error_response(self, error_response): if self._error_view: error = {k: error_response[k] for k in ['error', 'error_description'] if k in error_response} return self._error_view(**error) - return "Something went wrong with the authentication, please try to login again." + return 'Something went wrong with the authentication, please try to login again.' def oidc_auth(self, view_func): @functools.wraps(view_func) def wrapper(*args, **kwargs): - session = _Session( - flask_session=flask.session, - session_refresh_interval_seconds=self.client_registration_info.get('session_refresh_interval_seconds')) + session = UserSession(flask.session) - if session.should_refresh(): + if session.should_refresh(self.client.session_refresh_interval_seconds): logger.debug('user auth will be refreshed "silently"') return self._authenticate(interactive=False) elif session.is_authenticated(): @@ -260,25 +168,24 @@ def wrapper(*args, **kwargs): def _logout(self): logger.debug('user logout') - id_token_jwt = flask.session['id_token_jwt'] - flask.session.clear() + session = UserSession(flask.session) + id_token_jwt = session.id_token_jwt + session.clear() - if 'end_session_endpoint' in self.client.provider_info: + if self.client.provider_end_session_endpoint: flask.session['end_session_state'] = rndstr() - end_session_request = EndSessionRequest( - id_token_hint=id_token_jwt, - post_logout_redirect_uri=self.client_registration_info['post_logout_redirect_uris'][0], - state=flask.session['end_session_state'] - ) + end_session_request = EndSessionRequest(id_token_hint=id_token_jwt, + post_logout_redirect_uri=self._get_post_logout_redirect_uri(), + state=flask.session['end_session_state']) logger.debug('send endsession request: %s', end_session_request.to_json()) - return redirect(end_session_request.request(self.client.provider_info['end_session_endpoint']), 303) + return redirect(end_session_request.request(self.client.provider_end_session_endpoint), 303) return None def oidc_logout(self, view_func): - self.logout_view = view_func + self._logout_view = view_func @functools.wraps(view_func) def wrapper(*args, **kwargs): diff --git a/src/flask_pyoidc/provider_configuration.py b/src/flask_pyoidc/provider_configuration.py new file mode 100644 index 0000000..66b3952 --- /dev/null +++ b/src/flask_pyoidc/provider_configuration.py @@ -0,0 +1,169 @@ +import collections +import logging + +import requests + +logger = logging.getLogger(__name__) + + +class OIDCData(collections.MutableMapping): + """ + Basic OIDC data representation providing validation of required fields. + """ + + def __init__(self, *args, **kwargs): + """ + Args: + args (List[Tuple[String, String]]): key-value pairs to store + kwargs (Dict[string, string]): key-value pairs to store + """ + self.store = dict() + self.update(dict(*args, **kwargs)) + + def __getitem__(self, key): + return self.store[key] + + def __setitem__(self, key, value): + self.store[key] = value + + def __delitem__(self, key): + del self.store[key] + + def __iter__(self): + return iter(self.store) + + def __len__(self): + return len(self.store) + + def __str__(self): + data = self.store.copy() + if 'client_secret' in data: + data['client_secret'] = '' + return str(data) + + def __repr__(self): + return str(self.store) + + def __nonzero__(self): + return True + + __bool__ = __nonzero__ # for Python 3 + + def copy(self, **kwargs): + values = self.to_dict() + values.update(kwargs) + return self.__class__(**values) + + def to_dict(self): + return self.store.copy() + + +class ProviderMetadata(OIDCData): + def __init__(self, issuer=None, authorization_endpoint=None, jwks_uri=None, **kwargs): + super(ProviderMetadata, self).__init__(issuer=issuer, authorization_endpoint=authorization_endpoint, jwks_uri=jwks_uri, **kwargs) + + +class ClientRegistrationInfo(OIDCData): + pass + + +class ClientMetadata(OIDCData): + def __init__(self, client_id=None, client_secret=None, **kwargs): + super(ClientMetadata, self).__init__(client_id=client_id, client_secret=client_secret, **kwargs) + + +class ProviderConfiguration(object): + """ + Metadata for communicating with a OpenID Connect Provider (OP). + + Attributes: + auth_request_params (dict): Extra parameters, as key-value pairs, to include in the query parameters + of the authentication request + registered_client_metadata (ClientMetadata): The client metadata registered with the provider. + requests_session (requests.Session): Requests object to use when communicating with the provider. + session_refresh_interval_seconds (int): Number of seconds between updates of user data (tokens, user data, etc.) + fetched from the provider. If `None` is specified, no silent updates should be made user data will be made. + userinfo_endpoint_method (str): HTTP method ("GET" or "POST") to use when making the UserInfo Request. If + `None` is specifed, no UserInfo Request will be made. + """ + + DEFAULT_REQUEST_TIMEOUT = 5 + + def __init__(self, + issuer=None, + provider_metadata=None, + userinfo_http_method='GET', + client_registration_info=None, + client_metadata=None, + auth_request_params=None, + session_refresh_interval_seconds=None, + requests_session=None): + """ + Args: + issuer (str): OP Issuer Identifier. If this is specified discovery will be used to fetch the provider + metadata, otherwise `provider_metadata` must be specified. + provider_metadata (ProviderMetadata): OP metadata, + userinfo_http_method (Optional[str]): HTTP method (GET or POST) to use when sending the UserInfo Request. + If `none` is specified, no userinfo request will be sent. + client_registration_info (ClientRegistrationInfo): Client metadata to register your app + dynamically with the provider. Either this or `registered_client_metadata` must be specified. + client_metadata (ClientMetadata): Client metadata if your app is statically + registered with the provider. Either this or `client_registration_info` must be specified. + auth_request_params (dict): Extra parameters that should be included in the authentication request. + session_refresh_interval_seconds (int): Length of interval (in seconds) between attempted user data + refreshes. + requests_session (requests.Session): custom requests object to allow for example retry handling, etc. + """ + + if not issuer and not provider_metadata: + raise ValueError("Specify either 'issuer' or 'provider_metadata'.") + + if not client_registration_info and not client_metadata: + raise ValueError("Specify either 'client_registration_info' or 'client_metadata'.") + + self._issuer = issuer + self._provider_metadata = provider_metadata + + self._client_registration_info = client_registration_info + self._client_metadata = client_metadata + + self.userinfo_endpoint_method = userinfo_http_method + self.auth_request_params = auth_request_params or {} + self.session_refresh_interval_seconds = session_refresh_interval_seconds + + self.requests_session = requests_session or requests.Session() + + def ensure_provider_metadata(self): + if not self._provider_metadata: + resp = self.requests_session \ + .get(self._issuer + '/.well-known/openid-configuration', + timeout=self.DEFAULT_REQUEST_TIMEOUT) + logger.debug('Received discovery response: ' + resp.text) + + self._provider_metadata = ProviderMetadata(**resp.json()) + + return self._provider_metadata + + @property + def registered_client_metadata(self): + return self._client_metadata + + def register_client(self, redirect_uris, extra_parameters=None): + if not self._client_metadata: + if 'registration_endpoint' not in self._provider_metadata: + raise ValueError("Can't use dynamic client registration, provider metadata is missing " + "'registration_endpoint'.") + + registration_request = self._client_registration_info.to_dict() + registration_request['redirect_uris'] = redirect_uris + if extra_parameters: + registration_request.update(extra_parameters) + + resp = self.requests_session \ + .post(self._provider_metadata['registration_endpoint'], + json=registration_request, + timeout=self.DEFAULT_REQUEST_TIMEOUT) + self._client_metadata = ClientMetadata(redirect_uris=redirect_uris, **resp.json()) + logger.debug('Received registration response: client_id=' + self._client_metadata['client_id']) + + return self._client_metadata diff --git a/src/flask_pyoidc/pyoidc_facade.py b/src/flask_pyoidc/pyoidc_facade.py new file mode 100644 index 0000000..0269dd6 --- /dev/null +++ b/src/flask_pyoidc/pyoidc_facade.py @@ -0,0 +1,164 @@ +import base64 + +import logging +from oic.oic import Client, ProviderConfigurationResponse, RegistrationResponse, AuthorizationResponse, \ + AccessTokenResponse, TokenErrorResponse +from oic.utils.authn.client import CLIENT_AUTHN_METHOD + +logger = logging.getLogger(__name__) + + +class _ClientAuthentication(object): + def __init__(self, client_id, client_secret): + self._client_id = client_id + self._client_secret = client_secret + + def __call__(self, method, request): + """ + Args: + method (str): Client Authentication Method. Only 'client_secret_basic' and 'client_secret_post' is + supported. + request (MutableMapping[str, str]): Token request parameters. This may be modified, i.e. if + 'client_secret_post' is used the client credentials will be added. + + Returns: + (Mapping[str, str]): HTTP headers to be included in the token request, or `None` if no extra HTTPS headers + are required for the token request. + """ + if method == 'client_secret_post': + request['client_id'] = self._client_id + request['client_secret'] = self._client_secret + return None # authentication is in the request body, so no Authorization header is returned + + # default to 'client_secret_basic' + credentials = '{}:{}'.format(self._client_id, self._client_secret) + basic_auth = 'Basic {}'.format(base64.urlsafe_b64encode(credentials.encode('utf-8')).decode('utf-8')) + return {'Authorization': basic_auth} + + +class PyoidcFacade(object): + """ + Wrapper around pyoidc library, coupled with config for a simplified API for flask-pyoidc. + """ + + def __init__(self, provider_configuration, redirect_uri): + """ + Args: + provider_configuration (flask_pyoidc.provider_configuration.ProviderConfiguration) + """ + self._provider_configuration = provider_configuration + self._client = Client(client_authn_method=CLIENT_AUTHN_METHOD) + + provider_metadata = provider_configuration.ensure_provider_metadata() + self._client.handle_provider_config(ProviderConfigurationResponse(**provider_metadata.to_dict()), + provider_metadata['issuer']) + + if self._provider_configuration.registered_client_metadata: + client_metadata = self._provider_configuration.registered_client_metadata.to_dict() + registration_response = RegistrationResponse(**client_metadata) + self._client.store_registration_info(registration_response) + + self._redirect_uri = redirect_uri + + def is_registered(self): + return bool(self._provider_configuration.registered_client_metadata) + + def register(self, extra_registration_params=None): + client_metadata = self._provider_configuration.register_client([self._redirect_uri], extra_registration_params) + logger.debug('client registration response: %s', client_metadata) + self._client.store_registration_info(RegistrationResponse(**client_metadata.to_dict())) + + def authentication_request(self, state, nonce, extra_auth_params): + """ + + :param state: + :param nonce: + :param extra_auth_params: + Returns: + str: Authentication request as a URL to redirect the user to the provider. + """ + args = { + 'client_id': self._client.client_id, + 'response_type': 'code', + 'scope': ['openid'], + 'redirect_uri': self._redirect_uri, + 'state': state, + 'nonce': nonce, + } + + args.update(self._provider_configuration.auth_request_params) + args.update(extra_auth_params) + auth_request = self._client.construct_AuthorizationRequest(request_args=args) + logger.debug('sending authentication request: %s', auth_request.to_json()) + + return auth_request.request(self._client.authorization_endpoint) + + def parse_authentication_response(self, query_string): + return self._client.parse_response(AuthorizationResponse, info=query_string, sformat='urlencoded') + + def token_request(self, authorization_code): + """ + Makes a token request. If the 'token_endpoint' is not configured in the provider metadata, no request will + be made. + + Args: + authorization_code (str): authorization code issued to client after user authorization + + Returns: + Union[AccessTokenResponse, TokenErrorResponse, None]: The parsed token response, or None if no token + request was performed. + """ + if not self._client.token_endpoint: + return None + + request = { + 'grant_type': 'authorization_code', + 'code': authorization_code, + 'redirect_uri': self._redirect_uri + } + + logger.debug('making token request: %s', request) + client_auth_method = self._client.registration_response.get('token_endpoint_auth_method', 'client_secret_basic') + auth_header = _ClientAuthentication(self._client.client_id, self._client.client_secret)(client_auth_method, + request) + resp = self._provider_configuration.requests_session \ + .post(self._client.token_endpoint, + data=request, + headers=auth_header) \ + .json() + + if 'error' in resp: + token_resp = TokenErrorResponse(**resp) + else: + token_resp = AccessTokenResponse(**resp) + token_resp.verify(keyjar=self._client.keyjar) + + logger.debug('received token response: %s', token_resp.to_json()) + return token_resp + + def userinfo_request(self, access_token): + """ + Args: + access_token (str): Bearer access token to use when fetching userinfo + + Returns: + oic.oic.message.OpenIDSchema: UserInfo Response + """ + http_method = self._provider_configuration.userinfo_endpoint_method + if http_method is None or not self._client.userinfo_endpoint: + return None + + logger.debug('making userinfo request') + userinfo_response = self._client.do_user_info_request(method=http_method, token=access_token) + logger.debug('received userinfo response: %s', userinfo_response.to_json()) + + return userinfo_response + + @property + def session_refresh_interval_seconds(self): + return self._provider_configuration.session_refresh_interval_seconds + + @property + def provider_end_session_endpoint(self): + provider_metadata = self._provider_configuration.ensure_provider_metadata() + return provider_metadata.get('end_session_endpoint') diff --git a/src/flask_pyoidc/user_session.py b/src/flask_pyoidc/user_session.py new file mode 100644 index 0000000..706b844 --- /dev/null +++ b/src/flask_pyoidc/user_session.py @@ -0,0 +1,69 @@ +import time + + +class UserSession(object): + """Session object for user login state. + + Wraps comparison of times necessary for session handling. + """ + + KEYS = ['access_token', 'id_token', 'id_token_jwt', 'last_authenticated', 'userinfo'] + + def __init__(self, session_storage): + self._session_storage = session_storage + + def is_authenticated(self): + """ + flask_session is empty when the session hasn't been initialised or has expired. + Thus checking for existence of any item is enough to determine if we're authenticated. + """ + + return self._session_storage.get('last_authenticated') is not None + + def should_refresh(self, refresh_interval_seconds=None): + return refresh_interval_seconds is not None and \ + self._refresh_time(refresh_interval_seconds) < time.time() + + def _refresh_time(self, refresh_interval_seconds): + last = self._session_storage.get('last_authenticated', 0) + return last + refresh_interval_seconds + + def update(self, last_authenticated, access_token=None, id_token=None, id_token_jwt=None, userinfo=None): + """ + Args: + last_authenticated (float) + access_token (str) + id_token (Mapping[str, str]) + id_token_jwt (str) + userinfo (Mapping[str, str]) + """ + + def set_if_defined(session_key, value): + if value: + self._session_storage[session_key] = value + + self._session_storage['last_authenticated'] = last_authenticated + set_if_defined('access_token', access_token) + set_if_defined('id_token', id_token) + set_if_defined('id_token_jwt', id_token_jwt) + set_if_defined('userinfo', userinfo) + + def clear(self): + for key in self.KEYS: + self._session_storage.pop(key, None) + + @property + def access_token(self): + return self._session_storage['access_token'] + + @property + def id_token(self): + return self._session_storage['id_token'] + + @property + def id_token_jwt(self): + return self._session_storage['id_token_jwt'] + + @property + def userinfo(self): + return self._session_storage['userinfo'] diff --git a/tests/requirements.txt b/tests/requirements.txt index 7e93328..69c0d35 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -1,5 +1,4 @@ pytest mock -responses==0.5.1 +responses six -oic diff --git a/tests/test_flask_pyoidc.py b/tests/test_flask_pyoidc.py index 80161c8..75c3917 100644 --- a/tests/test_flask_pyoidc.py +++ b/tests/test_flask_pyoidc.py @@ -1,321 +1,312 @@ -import json import time -from datetime import datetime import flask +import json import pytest import responses +from datetime import datetime from flask import Flask -from mock import MagicMock, patch, Mock -from oic.oic.message import IdToken, OpenIDSchema, AccessTokenResponse +from mock import MagicMock, patch +from oic.oic.message import IdToken from six.moves.urllib.parse import parse_qsl, urlparse, urlencode from flask_pyoidc.flask_pyoidc import OIDCAuthentication -from flask_pyoidc.flask_pyoidc import _Session +from flask_pyoidc.provider_configuration import ProviderConfiguration, ProviderMetadata, ClientMetadata, \ + ClientRegistrationInfo +from flask_pyoidc.user_session import UserSession -ISSUER = 'https://op.example.com' +class TestOIDCAuthentication(object): + PROVIDER_BASEURL = 'https://op.example.com' + CLIENT_ID = 'client1' + CLIENT_DOMAIN = 'client.example.com' + CALLBACK_RETURN_VALUE = 'callback called successfully' -class Test_Session(object): - def test_unauthenticated_session(self): - session = _Session(flask_session={}, session_refresh_interval_seconds=None) - assert not session.is_authenticated() + @pytest.fixture(autouse=True) + def create_flask_app(self): + self.app = Flask(__name__) + self.app.config.update({'SERVER_NAME': self.CLIENT_DOMAIN, 'SECRET_KEY': 'test_key'}) + + def get_authn_instance(self, provider_metadata_extras=None, client_metadata_extras=None, **kwargs): + required_provider_metadata = { + 'issuer': self.PROVIDER_BASEURL, + 'authorization_endpoint': self.PROVIDER_BASEURL + '/auth', + 'jwks_uri': self.PROVIDER_BASEURL + '/jwks' + } + if provider_metadata_extras: + required_provider_metadata.update(provider_metadata_extras) + provider_metadata = ProviderMetadata(**required_provider_metadata) + + required_client_metadata = { + 'client_id': self.CLIENT_ID, + 'client_secret': 'secret1' + } + if client_metadata_extras: + required_client_metadata.update(client_metadata_extras) + client_metadata = ClientMetadata(**required_client_metadata) + + authn = OIDCAuthentication(ProviderConfiguration(provider_metadata=provider_metadata, + client_metadata=client_metadata, + **kwargs)) + authn.init_app(self.app) + return authn - def test_authenticated_session(self): - session = _Session(flask_session={'last_authenticated': 1234}, session_refresh_interval_seconds=None) - assert session.is_authenticated() + def get_view_mock(self): + mock = MagicMock() + mock.__name__ = 'test_callback' # required for Python 2 + mock.return_value = self.CALLBACK_RETURN_VALUE + return mock - def test_should_not_refresh_if_not_supported(self): - session = _Session(flask_session={}, session_refresh_interval_seconds=None) - assert not session.should_refresh() + def assert_auth_redirect(self, auth_redirect): + assert auth_redirect.status_code == 302 + assert auth_redirect.location.startswith(self.PROVIDER_BASEURL) - def test_should_not_refresh_if_authenticated_within_refresh_interval(self): - session = _Session(flask_session={'last_authenticated': time.time() + 5}, session_refresh_interval_seconds=10) - assert not session.should_refresh() + def assert_view_mock(self, callback_mock, result): + assert callback_mock.called + assert result == self.CALLBACK_RETURN_VALUE - def test_should_refresh_if_supported_and_necessary(self): - session = _Session(flask_session={'last_authenticated': time.time() - 11}, # authenticated too far in the past - session_refresh_interval_seconds=10) - assert session.should_refresh() + def test_should_authenticate_if_no_session(self): + authn = self.get_authn_instance() + view_mock = self.get_view_mock() + with self.app.test_request_context('/'): + auth_redirect = authn.oidc_auth(view_mock)() + self.assert_auth_redirect(auth_redirect) + assert not view_mock.called -class TestOIDCAuthentication(object): - mock_time = Mock() - mock_time_int = Mock() - mock_time.return_value = time.mktime(datetime(2017, 1, 1).timetuple()) - mock_time_int.return_value = int(time.mktime(datetime(2017, 1, 1).timetuple())) + def test_should_not_authenticate_if_session_exists(self): + authn = self.get_authn_instance() + view_mock = self.get_view_mock() + with self.app.test_request_context('/'): + UserSession(flask.session).update(time.time()) + result = authn.oidc_auth(view_mock)() + self.assert_view_mock(view_mock, result) - @pytest.fixture(autouse=True) - def create_flask_app(self): - self.app = Flask(__name__) - self.app.config.update({'SERVER_NAME': 'localhost', 'SECRET_KEY': 'test_key'}) + def test_reauthenticate_silent_if_session_expired(self): + authn = self.get_authn_instance(session_refresh_interval_seconds=1) + view_mock = self.get_view_mock() + with self.app.test_request_context('/'): + UserSession(flask.session).update(time.time() - 1) # authenticated in the past + auth_redirect = authn.oidc_auth(view_mock)() + + self.assert_auth_redirect(auth_redirect) + assert 'prompt=none' in auth_redirect.location # ensure silent auth is used + assert not view_mock.called - def get_instance(self, kwargs): - authn = OIDCAuthentication(**kwargs) + def test_dont_reauthenticate_silent_if_session_not_expired(self): + authn = self.get_authn_instance(session_refresh_interval_seconds=999) + view_mock = self.get_view_mock() + with self.app.test_request_context('/'): + UserSession(flask.session).update(time.time()) # freshly authenticated + result = authn.oidc_auth(view_mock)() + self.assert_view_mock(view_mock, result) + + @responses.activate + def test_should_register_client_if_not_registered_before(self): + registration_endpoint = self.PROVIDER_BASEURL + '/register' + provider_metadata = ProviderMetadata(self.PROVIDER_BASEURL, + self.PROVIDER_BASEURL + '/auth', + self.PROVIDER_BASEURL + '/jwks', + registration_endpoint=registration_endpoint) + authn = OIDCAuthentication(ProviderConfiguration(provider_metadata=provider_metadata, + client_registration_info=ClientRegistrationInfo())) authn.init_app(self.app) - return authn + # register logout view to force 'post_logout_redirect_uris' to be included in registration request + logout_view_mock = self.get_view_mock() + self.app.add_url_rule('/logout', view_func=logout_view_mock) + authn.oidc_logout(logout_view_mock) + + responses.add(responses.POST, registration_endpoint, json={'client_id': 'client1', 'client_secret': 'secret1'}) + view_mock = self.get_view_mock() + with self.app.test_request_context('/'): + auth_redirect = authn.oidc_auth(view_mock)() + + self.assert_auth_redirect(auth_redirect) + registration_request = json.loads(responses.calls[0].request.body.decode('utf-8')) + expected_registration_request = { + 'redirect_uris': ['http://{}/redirect_uri'.format(self.CLIENT_DOMAIN)], + 'post_logout_redirect_uris': ['http://{}/logout'.format(self.CLIENT_DOMAIN)] + } + assert registration_request == expected_registration_request + + @patch('time.time') + @patch('oic.utils.time_util.utc_time_sans_frac') # used internally by pyoidc when verifying ID Token @responses.activate - def test_store_internal_redirect_uri_on_static_client_reg(self): - responses.add(responses.GET, ISSUER + '/.well-known/openid-configuration', - body=json.dumps(dict(issuer=ISSUER, token_endpoint=ISSUER + '/token')), - content_type='application/json') - - authn = self.get_instance(dict(issuer=ISSUER, - client_registration_info=dict(client_id='abc', client_secret='foo'))) - assert len(authn.client.registration_response['redirect_uris']) == 1 - assert authn.client.registration_response['redirect_uris'][0] == 'http://localhost/redirect_uri' - - @pytest.mark.parametrize('method', [ - 'GET', - 'POST' - ]) - def test_configurable_userinfo_endpoint_method_is_used(self, method): - state = 'state' - nonce = 'nonce' - sub = 'foobar' - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER, 'token_endpoint': '/token'}, - client_registration_info={'client_id': 'foo'}, - userinfo_endpoint_method=method)) - authn.client.do_access_token_request = MagicMock( - return_value=AccessTokenResponse(**{'id_token': IdToken(**{'sub': sub, 'nonce': nonce}), - 'access_token': 'access_token'}) - ) - userinfo_request_mock = MagicMock(return_value=OpenIDSchema(**{'sub': sub})) - authn.client.do_user_info_request = userinfo_request_mock - with self.app.test_request_context('/redirect_uri?code=foo&state=' + state): + def test_handle_authentication_response(self, time_mock, utc_time_sans_frac_mock): + # freeze time since ID Token validation includes expiration timestamps + timestamp = time.mktime(datetime(2017, 1, 1).timetuple()) + time_mock.return_value = timestamp + utc_time_sans_frac_mock.return_value = int(timestamp) + + # mock token response + user_id = 'user1' + exp_time = 10 + nonce = 'test_nonce' + id_token = IdToken(iss=self.PROVIDER_BASEURL, + aud=self.CLIENT_ID, + sub=user_id, + exp=int(timestamp) + exp_time, + iat=int(timestamp), + nonce=nonce) + access_token = 'test_access_token' + token_response = {'access_token': access_token, 'token_type': 'Bearer', 'id_token': id_token.to_jwt()} + token_endpoint = self.PROVIDER_BASEURL + '/token' + responses.add(responses.POST, token_endpoint, json=token_response) + + # mock userinfo response + userinfo = {'sub': user_id, 'name': 'Test User'} + userinfo_endpoint = self.PROVIDER_BASEURL + '/userinfo' + responses.add(responses.GET, userinfo_endpoint, json=userinfo) + + authn = self.get_authn_instance(provider_metadata_extras={'token_endpoint': token_endpoint, + 'userinfo_endpoint': userinfo_endpoint}) + state = 'test_state' + with self.app.test_request_context('/redirect_uri?state={}&code=test'.format(state)): + flask.session['destination'] = '/' flask.session['state'] = state flask.session['nonce'] = nonce - flask.session['destination'] = '/' authn._handle_authentication_response() - userinfo_request_mock.assert_called_with(method=method, state=state) - - def test_no_userinfo_request_is_done_if_no_userinfo_endpoint_method_is_specified(self): - state = 'state' - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER}, - client_registration_info={'client_id': 'foo'}, - userinfo_endpoint_method=None)) - userinfo_request_mock = MagicMock() - authn.client.do_user_info_request = userinfo_request_mock - authn._do_userinfo_request(state, None) - assert not userinfo_request_mock.called - - def test_authenticatate_with_extra_request_parameters(self): - extra_params = {"foo": "bar", "abc": "xyz"} - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER}, - client_registration_info={'client_id': 'foo'}, - extra_request_args=extra_params)) - - with self.app.test_request_context('/'): - a = authn._authenticate() - request_params = dict(parse_qsl(urlparse(a.location).query)) - assert set(extra_params.items()).issubset(set(request_params.items())) - - def test_reauthenticate_if_no_session(self): - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER}, - client_registration_info={'client_id': 'foo'})) - client_mock = MagicMock() - callback_mock = MagicMock() - callback_mock.__name__ = 'test_callback' # required for Python 2 - authn.client = client_mock - with self.app.test_request_context('/'): - authn.oidc_auth(callback_mock)() - assert client_mock.construct_AuthorizationRequest.called - assert not callback_mock.called - - def test_reauthenticate_silent_if_refresh_expired(self): - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER}, - client_registration_info={'client_id': 'foo', - 'session_refresh_interval_seconds': 1})) - client_mock = MagicMock() - callback_mock = MagicMock() - callback_mock.__name__ = 'test_callback' # required for Python 2 - authn.client = client_mock - with self.app.test_request_context('/'): - flask.session['last_authenticated'] = time.time() - 1 # authenticated in the past - authn.oidc_auth(callback_mock)() - assert client_mock.construct_AuthorizationRequest.called - assert client_mock.construct_AuthorizationRequest.call_args[1]['request_args']['prompt'] == 'none' - assert not callback_mock.called - - def test_dont_reauthenticate_silent_if_authentication_not_expired(self): - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER}, - client_registration_info={'client_id': 'foo', - 'session_refresh_interval_seconds': 999})) - client_mock = MagicMock() - callback_mock = MagicMock() - callback_mock.__name__ = 'test_callback' # required for Python 2 - authn.client = client_mock - with self.app.test_request_context('/'): - flask.session['last_authenticated'] = time.time() # freshly authenticated - authn.oidc_auth(callback_mock)() - assert not client_mock.construct_AuthorizationRequest.called - assert callback_mock.called - - @patch('time.time', mock_time) - @patch('oic.utils.time_util.utc_time_sans_frac', mock_time_int) + session = UserSession(flask.session) + assert session.access_token == access_token + assert session.id_token == id_token.to_dict() + assert IdToken().from_jwt(session.id_token_jwt) == id_token + assert session.userinfo == userinfo + + @patch('time.time') + @patch('oic.utils.time_util.utc_time_sans_frac') # used internally by pyoidc when verifying ID Token @responses.activate - def test_session_expiration_set_to_id_token_exp(self): - token_endpoint = ISSUER + '/token' - userinfo_endpoint = ISSUER + '/userinfo' + def test_session_expiration_set_to_id_token_exp(self, time_mock, utc_time_sans_frac_mock): + timestamp = time.mktime(datetime(2017, 1, 1).timetuple()) + time_mock.return_value = timestamp + utc_time_sans_frac_mock.return_value = int(timestamp) + exp_time = 10 - epoch_int = int(time.mktime(datetime(2017, 1, 1).timetuple())) - id_token = IdToken(**{'sub': 'sub1', 'iat': epoch_int, 'iss': ISSUER, 'aud': 'foo', 'nonce': 'test', - 'exp': epoch_int + exp_time}) + state = 'test_state' + nonce = 'test_nonce' + id_token = IdToken(iss=self.PROVIDER_BASEURL, + aud=self.CLIENT_ID, + sub='sub1', + exp=int(timestamp) + exp_time, + iat=int(timestamp), + nonce=nonce) token_response = {'access_token': 'test', 'token_type': 'Bearer', 'id_token': id_token.to_jwt()} - userinfo_response = {'sub': 'sub1'} - responses.add(responses.POST, token_endpoint, - body=json.dumps(token_response), - content_type='application/json') - responses.add(responses.POST, userinfo_endpoint, - body=json.dumps(userinfo_response), - content_type='application/json') - authn = self.get_instance(dict( - provider_configuration_info={'issuer': ISSUER, - 'token_endpoint': token_endpoint, - 'userinfo_endpoint': userinfo_endpoint}, - client_registration_info={'client_id': 'foo', 'client_secret': 'foo'})) - - self.app.config.update({'SESSION_PERMANENT': True}) - with self.app.test_request_context('/redirect_uri?state=test&code=test'): + token_endpoint = self.PROVIDER_BASEURL + '/token' + responses.add(responses.POST, token_endpoint, json=token_response) + + authn = self.get_authn_instance(provider_metadata_extras={'token_endpoint': token_endpoint}) + with self.app.test_request_context('/redirect_uri?state={}&code=test'.format(state)): flask.session['destination'] = '/' - flask.session['state'] = 'test' - flask.session['nonce'] = 'test' + flask.session['state'] = state + flask.session['nonce'] = nonce authn._handle_authentication_response() assert flask.session.permanent assert int(flask.session.permanent_session_lifetime) == exp_time - def test_logout(self): + def test_logout_redirects_to_provider_if_end_session_endpoint_is_configured(self): end_session_endpoint = 'https://provider.example.com/end_session' - post_logout_uri = 'https://client.example.com/post_logout' - authn = self.get_instance(dict( - provider_configuration_info={'issuer': ISSUER, - 'end_session_endpoint': end_session_endpoint}, - client_registration_info={'client_id': 'foo', - 'post_logout_redirect_uris': [post_logout_uri]})) + authn = self.get_authn_instance(provider_metadata_extras={'end_session_endpoint': end_session_endpoint}) + logout_view_mock = self.get_view_mock() id_token = IdToken(**{'sub': 'sub1', 'nonce': 'nonce'}) - with self.app.test_request_context('/logout'): - flask.session['access_token'] = 'abcde' - flask.session['userinfo'] = {'foo': 'bar', 'abc': 'xyz'} - flask.session['id_token'] = id_token.to_dict() - flask.session['id_token_jwt'] = id_token.to_jwt() - end_session_redirect = authn._logout() - assert all(k not in flask.session for k in ['access_token', 'userinfo', 'id_token', 'id_token_jwt']) + # register logout view + self.app.add_url_rule('/logout', view_func=authn.oidc_logout(logout_view_mock)) - assert end_session_redirect.status_code == 303 - assert end_session_redirect.headers['Location'].startswith(end_session_endpoint) + with self.app.test_request_context('/logout'): + UserSession(flask.session).update(time.time(), + 'test_access_token', + id_token.to_dict(), + id_token.to_jwt(), + {'sub': 'user1'}) + + end_session_redirect = authn.oidc_logout(logout_view_mock)() + # ensure user session has been cleared + assert all(k not in flask.session for k in UserSession.KEYS) parsed_request = dict(parse_qsl(urlparse(end_session_redirect.headers['Location']).query)) assert parsed_request['state'] == flask.session['end_session_state'] - assert parsed_request['id_token_hint'] == id_token.to_jwt() - assert parsed_request['post_logout_redirect_uri'] == post_logout_uri + + assert end_session_redirect.status_code == 303 + assert end_session_redirect.location.startswith(end_session_endpoint) + assert IdToken().from_jwt(parsed_request['id_token_hint']) == id_token + assert parsed_request['post_logout_redirect_uri'] == 'http://{}/logout'.format(self.CLIENT_DOMAIN) + assert not logout_view_mock.called def test_logout_handles_provider_without_end_session_endpoint(self): - post_logout_uri = 'https://client.example.com/post_logout' - authn = self.get_instance(dict( - provider_configuration_info={'issuer': ISSUER}, - client_registration_info={'client_id': 'foo', - 'post_logout_redirect_uris': [post_logout_uri]})) + authn = self.get_authn_instance() id_token = IdToken(**{'sub': 'sub1', 'nonce': 'nonce'}) + logout_view_mock = self.get_view_mock() with self.app.test_request_context('/logout'): - flask.session['access_token'] = 'abcde' - flask.session['userinfo'] = {'foo': 'bar', 'abc': 'xyz'} - flask.session['id_token'] = id_token.to_dict() - flask.session['id_token_jwt'] = id_token.to_jwt() + UserSession(flask.session).update(time.time(), + 'test_access_token', + id_token.to_dict(), + id_token.to_jwt(), + {'sub': 'user1'}) - end_session_redirect = authn._logout() - assert all(k not in flask.session for k in ['access_token', 'userinfo', 'id_token', 'id_token_jwt']) - assert end_session_redirect is None + logout_result = authn.oidc_logout(logout_view_mock)() + assert all(k not in flask.session for k in UserSession.KEYS) - def test_oidc_logout_redirects_to_provider(self): - end_session_endpoint = 'https://provider.example.com/end_session' - post_logout_uri = 'https://client.example.com/post_logout' - authn = self.get_instance(dict( - provider_configuration_info={'issuer': ISSUER, - 'end_session_endpoint': end_session_endpoint}, - client_registration_info={'client_id': 'foo', - 'post_logout_redirect_uris': [post_logout_uri]})) - callback_mock = MagicMock() - callback_mock.__name__ = 'test_callback' # required for Python 2 - id_token = IdToken(**{'sub': 'sub1', 'nonce': 'nonce'}) - with self.app.test_request_context('/logout'): - flask.session['id_token_jwt'] = id_token.to_jwt() - resp = authn.oidc_logout(callback_mock)() - assert resp.status_code == 303 - assert not callback_mock.called + self.assert_view_mock(logout_view_mock, logout_result) - def test_oidc_logout_handles_redirects_from_provider(self): - end_session_endpoint = 'https://provider.example.com/end_session' - post_logout_uri = 'https://client.example.com/post_logout' - authn = self.get_instance(dict( - provider_configuration_info={'issuer': ISSUER, - 'end_session_endpoint': end_session_endpoint}, - client_registration_info={'client_id': 'foo', - 'post_logout_redirect_uris': [post_logout_uri]})) - callback_mock = MagicMock() - callback_mock.__name__ = 'test_callback' # required for Python 2 + def test_logout_handles_redirect_back_from_provider(self): + authn = self.get_authn_instance() + logout_view_mock = self.get_view_mock() state = 'end_session_123' - with self.app.test_request_context('/logout?state=' + state): + with self.app.test_request_context('/logout?state={}'.format(state)): flask.session['end_session_state'] = state - authn.oidc_logout(callback_mock)() + result = authn.oidc_logout(logout_view_mock)() assert 'end_session_state' not in flask.session - assert callback_mock.called - def test_authentication_error_reponse_calls_to_error_view_if_set(self): + self.assert_view_mock(logout_view_mock, result) + + def test_authentication_error_response_calls_to_error_view_if_set(self): state = 'test_tate' error_response = {'error': 'invalid_request', 'error_description': 'test error'} - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER}, - client_registration_info=dict(client_id='abc', client_secret='foo'))) - error_view_mock = MagicMock() - authn._error_view = error_view_mock - with self.app.test_request_context('/redirect_uri?{error}&state={state}'.format( - error=urlencode(error_response), state=state)): + authn = self.get_authn_instance() + error_view_mock = self.get_view_mock() + authn.error_view(error_view_mock) + with self.app.test_request_context('/redirect_uri?{error}&state={state}'.format(error=urlencode(error_response), + state=state)): flask.session['state'] = state - authn._handle_authentication_response() + result = authn._handle_authentication_response() + + self.assert_view_mock(error_view_mock, result) error_view_mock.assert_called_with(**error_response) - def test_authentication_error_reponse_returns_default_error_if_no_error_view_set(self): + def test_authentication_error_response_returns_default_error_if_no_error_view_set(self): state = 'test_tate' - error_response = {'error': 'invalid_request', 'error_description': 'test error'} - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER}, - client_registration_info=dict(client_id='abc', client_secret='foo'))) - with self.app.test_request_context('/redirect_uri?{error}&state={state}'.format( - error=urlencode(error_response), state=state)): + error_response = {'error': 'invalid_request', 'error_description': 'test error', 'state': state} + authn = self.get_authn_instance(dict(provider_configuration_info={'issuer': self.PROVIDER_BASEURL}, + client_registration_info=dict(client_id='abc', client_secret='foo'))) + with self.app.test_request_context('/redirect_uri?{}'.format(urlencode(error_response))): flask.session['state'] = state response = authn._handle_authentication_response() assert response == "Something went wrong with the authentication, please try to login again." @responses.activate - def test_token_error_reponse_calls_to_error_view_if_set(self): - token_endpoint = ISSUER + '/token' + def test_token_error_response_calls_to_error_view_if_set(self): + token_endpoint = self.PROVIDER_BASEURL + '/token' error_response = {'error': 'invalid_request', 'error_description': 'test error'} - responses.add(responses.POST, token_endpoint, - body=json.dumps(error_response), - content_type='application/json') - - authn = self.get_instance(dict( - provider_configuration_info={'issuer': ISSUER, 'token_endpoint': token_endpoint}, - client_registration_info=dict(client_id='abc', client_secret='foo'))) - error_view_mock = MagicMock() - authn._error_view = error_view_mock + responses.add(responses.POST, token_endpoint, json=error_response) + + authn = self.get_authn_instance(provider_metadata_extras={'token_endpoint': token_endpoint}) + error_view_mock = self.get_view_mock() + authn.error_view(error_view_mock) state = 'test_tate' - with self.app.test_request_context('/redirect_uri?code=foo&state=' + state): + with self.app.test_request_context('/redirect_uri?code=foo&state={}'.format(state)): flask.session['state'] = state - authn._handle_authentication_response() + result = authn._handle_authentication_response() + + self.assert_view_mock(error_view_mock, result) error_view_mock.assert_called_with(**error_response) @responses.activate - def test_token_error_reponse_returns_default_error_if_no_error_view_set(self): - token_endpoint = ISSUER + '/token' - error_response = {'error': 'invalid_request', 'error_description': 'test error'} - responses.add(responses.POST, token_endpoint, - body=json.dumps(error_response), - content_type='application/json') - - authn = self.get_instance(dict(provider_configuration_info={'issuer': ISSUER, - 'token_endpoint': token_endpoint}, - client_registration_info=dict(client_id='abc', client_secret='foo'))) + def test_token_error_response_returns_default_error_if_no_error_view_set(self): + token_endpoint = self.PROVIDER_BASEURL + '/token' state = 'test_tate' + error_response = {'error': 'invalid_request', 'error_description': 'test error', 'state': state} + responses.add(responses.POST, token_endpoint, json=error_response) + + authn = self.get_authn_instance(provider_metadata_extras={'token_endpoint': token_endpoint}) with self.app.test_request_context('/redirect_uri?code=foo&state=' + state): flask.session['state'] = state response = authn._handle_authentication_response() diff --git a/tests/test_provider_configuration.py b/tests/test_provider_configuration.py new file mode 100644 index 0000000..2f39e67 --- /dev/null +++ b/tests/test_provider_configuration.py @@ -0,0 +1,104 @@ +import json +import pytest +import responses + +from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientRegistrationInfo, ProviderMetadata, \ + ClientMetadata, OIDCData + + +class TestProviderConfiguration(object): + PROVIDER_BASEURL = 'https://op.example.com' + + def provider_metadata(self, **kwargs): + return ProviderMetadata(issuer='', authorization_endpoint='', jwks_uri='', **kwargs) + + def test_missing_provider_metadata_raises_exception(self): + with pytest.raises(ValueError) as exc_info: + ProviderConfiguration(client_registration_info=ClientRegistrationInfo()) + + exc_message = str(exc_info.value) + assert 'issuer' in exc_message + assert 'provider_metadata' in exc_message + + def test_missing_client_metadata_raises_exception(self): + with pytest.raises(ValueError) as exc_info: + ProviderConfiguration(issuer=self.PROVIDER_BASEURL) + + exc_message = str(exc_info.value) + assert 'client_registration_info' in exc_message + assert 'client_metadata' in exc_message + + @responses.activate + def test_should_fetch_provider_metadata_if_not_given(self): + provider_metadata = { + 'issuer': self.PROVIDER_BASEURL, + 'authorization_endpoint': self.PROVIDER_BASEURL + '/auth', + 'jwks_uri': self.PROVIDER_BASEURL + '/jwks' + } + responses.add(responses.GET, + self.PROVIDER_BASEURL + '/.well-known/openid-configuration', + json=provider_metadata) + + provider_config = ProviderConfiguration(issuer=self.PROVIDER_BASEURL, + client_registration_info=ClientRegistrationInfo()) + provider_config.ensure_provider_metadata() + assert provider_config._provider_metadata['issuer'] == self.PROVIDER_BASEURL + assert provider_config._provider_metadata['authorization_endpoint'] == self.PROVIDER_BASEURL + '/auth' + assert provider_config._provider_metadata['jwks_uri'] == self.PROVIDER_BASEURL + '/jwks' + + def test_should_not_fetch_provider_metadata_if_given(self): + provider_metadata = self.provider_metadata() + provider_config = ProviderConfiguration(provider_metadata=provider_metadata, + client_registration_info=ClientRegistrationInfo()) + + provider_config.ensure_provider_metadata() + assert provider_config._provider_metadata == provider_metadata + + @responses.activate + def test_should_register_dynamic_client_if_client_registration_info_is_given(self): + registration_endpoint = self.PROVIDER_BASEURL + '/register' + responses.add(responses.POST, registration_endpoint, json={'client_id': 'client1', 'client_secret': 'secret1'}) + + provider_config = ProviderConfiguration( + provider_metadata=self.provider_metadata(registration_endpoint=registration_endpoint), + client_registration_info=ClientRegistrationInfo()) + + extra_args = {'extra_args': 'should be passed'} + redirect_uris = ['https://client.example.com/redirect'] + provider_config.register_client(redirect_uris, extra_args) + assert provider_config._client_metadata['client_id'] == 'client1' + assert provider_config._client_metadata['client_secret'] == 'secret1' + assert provider_config._client_metadata['redirect_uris'] == redirect_uris + + expected_registration_request = {'redirect_uris': redirect_uris} + expected_registration_request.update(extra_args) + assert json.loads(responses.calls[0].request.body.decode('utf-8')) == expected_registration_request + + def test_should_not_register_dynamic_client_if_client_metadata_is_given(self): + client_metadata = ClientMetadata(client_id='client1', + client_secret='secret1', + redirect_uris=['https://client.example.com/redirect']) + provider_config = ProviderConfiguration(provider_metadata=self.provider_metadata(), + client_metadata=client_metadata) + provider_config.register_client([]) + assert provider_config._client_metadata == client_metadata + + def test_should_raise_exception_for_non_registered_client_when_missing_registration_endpoint(self): + provider_config = ProviderConfiguration(provider_metadata=self.provider_metadata(), + client_registration_info=ClientRegistrationInfo()) + with pytest.raises(ValueError) as exc_info: + provider_config.register_client([]) + assert 'registration_endpoint' in str(exc_info.value) + + +class TestOIDCData(object): + def test_client_secret_should_not_be_in_string_representation(self): + client_secret = 'secret123456' + client_metadata = OIDCData(client_id='client1', client_secret=client_secret) + assert client_secret not in str(client_metadata) + assert client_secret in repr(client_metadata) + + def test_copy_should_overwrite_existing_value(self): + data = OIDCData(abc='xyz') + copy_data = data.copy(qwe='rty', abc='123') + assert copy_data == {'abc': '123', 'qwe': 'rty'} diff --git a/tests/test_pyoidc_facade.py b/tests/test_pyoidc_facade.py new file mode 100644 index 0000000..1961bce --- /dev/null +++ b/tests/test_pyoidc_facade.py @@ -0,0 +1,202 @@ +import time + +import base64 +import json +import pytest +import responses +from oic.oic import AuthorizationResponse, AccessTokenResponse, IdToken, TokenErrorResponse, OpenIDSchema +from six.moves.urllib.parse import parse_qsl, urlparse + +from flask_pyoidc.provider_configuration import ProviderConfiguration, ClientMetadata, ProviderMetadata, \ + ClientRegistrationInfo +from flask_pyoidc.pyoidc_facade import PyoidcFacade, _ClientAuthentication + + +class TestPyoidcFacade(object): + PROVIDER_BASEURL = 'https://op.example.com' + PROVIDER_METADATA = ProviderMetadata(PROVIDER_BASEURL, + PROVIDER_BASEURL + '/auth', + PROVIDER_BASEURL + '/jwks') + CLIENT_METADATA = ClientMetadata('client1', 'secret1') + REDIRECT_URI = 'https://rp.example.com/redirect_uri' + + def test_registered_client_metadata_is_forwarded_to_pyoidc(self): + config = ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, client_metadata=self.CLIENT_METADATA) + facade = PyoidcFacade(config, self.REDIRECT_URI) + assert facade._client.registration_response + + def test_no_registered_client_metadata_is_handled(self): + config = ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_registration_info=ClientRegistrationInfo()) + facade = PyoidcFacade(config, self.REDIRECT_URI) + assert not facade._client.registration_response + + def test_is_registered(self): + unregistered = ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_registration_info=ClientRegistrationInfo()) + registered = ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_metadata=self.CLIENT_METADATA) + assert PyoidcFacade(unregistered, self.REDIRECT_URI).is_registered() is False + assert PyoidcFacade(registered, self.REDIRECT_URI).is_registered() is True + + @responses.activate + def test_register(self): + registration_endpoint = self.PROVIDER_BASEURL + '/register' + responses.add(responses.POST, registration_endpoint, json=self.CLIENT_METADATA.to_dict()) + + provider_metadata = self.PROVIDER_METADATA.copy(registration_endpoint=registration_endpoint) + unregistered = ProviderConfiguration(provider_metadata=provider_metadata, + client_registration_info=ClientRegistrationInfo()) + facade = PyoidcFacade(unregistered, self.REDIRECT_URI) + facade.register() + assert facade.is_registered() is True + + def test_authentication_request(self): + extra_user_auth_params = {'foo': 'bar', 'abc': 'xyz'} + config = ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_metadata=self.CLIENT_METADATA, + auth_request_params=extra_user_auth_params) + + state = 'test_state' + nonce = 'test_nonce' + + facade = PyoidcFacade(config, self.REDIRECT_URI) + extra_lib_auth_params = {'foo': 'baz', 'qwe': 'rty'} + auth_request = facade.authentication_request(state, nonce, extra_lib_auth_params) + assert auth_request.startswith(self.PROVIDER_METADATA['authorization_endpoint']) + + auth_request_params = dict(parse_qsl(urlparse(auth_request).query)) + expected_auth_params = { + 'scope': 'openid', + 'response_type': 'code', + 'client_id': self.CLIENT_METADATA['client_id'], + 'redirect_uri': self.REDIRECT_URI, + 'state': state, + 'nonce': nonce + } + expected_auth_params.update(extra_user_auth_params) + expected_auth_params.update(extra_lib_auth_params) + assert auth_request_params == expected_auth_params + + def test_parse_authentication_response(self): + facade = PyoidcFacade(ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_metadata=self.CLIENT_METADATA), + self.REDIRECT_URI) + auth_code = 'auth_code-1234' + state = 'state-1234' + parsed_auth_response = facade.parse_authentication_response('code={}&state={}'.format(auth_code, state)) + assert isinstance(parsed_auth_response, AuthorizationResponse) + assert parsed_auth_response['code'] == auth_code + assert parsed_auth_response['state'] == state + + @responses.activate + def test_token_request(self): + token_endpoint = self.PROVIDER_BASEURL + '/token' + id_token = IdToken(iss=self.PROVIDER_METADATA['issuer'], + sub='test_user', + aud=self.CLIENT_METADATA['client_id'], + exp=time.time() + 1, + iat=time.time(), + nonce='test_nonce') + token_response = AccessTokenResponse(access_token='test_access_token', + token_type='Bearer', + id_token=id_token.to_jwt()) + responses.add(responses.POST, token_endpoint, json=token_response.to_dict()) + + provider_metadata = self.PROVIDER_METADATA.copy(token_endpoint=token_endpoint) + facade = PyoidcFacade(ProviderConfiguration(provider_metadata=provider_metadata, + client_metadata=self.CLIENT_METADATA), + self.REDIRECT_URI) + + auth_code = 'auth_code-1234' + token_response = facade.token_request(auth_code) + + expected_token_response = token_response.to_dict() + expected_token_response['id_token'] = id_token.to_dict() + assert token_response.to_dict() == expected_token_response + + token_request = dict(parse_qsl(responses.calls[0].request.body)) + expected_token_request = { + 'grant_type': 'authorization_code', + 'code': auth_code, + 'redirect_uri': self.REDIRECT_URI + + } + assert token_request == expected_token_request + + @responses.activate + def test_token_request_handles_error_response(self): + token_endpoint = self.PROVIDER_BASEURL + '/token' + token_response = TokenErrorResponse(error='invalid_request', error_description='test error description') + responses.add(responses.POST, token_endpoint, json=token_response.to_dict(), status=400) + + provider_metadata = self.PROVIDER_METADATA.copy(token_endpoint=token_endpoint) + facade = PyoidcFacade(ProviderConfiguration(provider_metadata=provider_metadata, + client_metadata=self.CLIENT_METADATA), + self.REDIRECT_URI) + assert facade.token_request('1234') == token_response + + def test_token_request_handles_missing_provider_token_endpoint(self): + facade = PyoidcFacade(ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_metadata=self.CLIENT_METADATA), + self.REDIRECT_URI) + assert facade.token_request('1234') is None + + @pytest.mark.parametrize('userinfo_http_method', [ + 'GET', + 'POST' + ]) + @responses.activate + def test_configurable_userinfo_endpoint_method_is_used(self, userinfo_http_method): + userinfo_endpoint = self.PROVIDER_BASEURL + '/userinfo' + userinfo_response = OpenIDSchema(sub='user1') + responses.add(userinfo_http_method, userinfo_endpoint, json=userinfo_response.to_dict()) + + provider_metadata = self.PROVIDER_METADATA.copy(userinfo_endpoint=userinfo_endpoint) + facade = PyoidcFacade(ProviderConfiguration(provider_metadata=provider_metadata, + client_metadata=self.CLIENT_METADATA, + userinfo_http_method=userinfo_http_method), + self.REDIRECT_URI) + assert facade.userinfo_request('test_token') == userinfo_response + + def test_no_userinfo_request_is_made_if_no_userinfo_http_method_is_configured(self): + facade = PyoidcFacade(ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_metadata=self.CLIENT_METADATA, + userinfo_http_method=None), + self.REDIRECT_URI) + assert facade.userinfo_request('test_token') is None + + def test_no_userinfo_request_is_made_if_no_userinfo_endpoint_is_configured(self): + facade = PyoidcFacade(ProviderConfiguration(provider_metadata=self.PROVIDER_METADATA, + client_metadata=self.CLIENT_METADATA), + self.REDIRECT_URI) + assert facade.userinfo_request('test_token') is None + + +class TestClientAuthentication(object): + CLIENT_ID = 'client1' + CLIENT_SECRET = 'secret1' + + @property + def basic_auth(self): + credentials = '{}:{}'.format(self.CLIENT_ID, self.CLIENT_SECRET) + return 'Basic {}'.format(base64.urlsafe_b64encode(credentials.encode('utf-8')).decode('utf-8')) + + @pytest.fixture(autouse=True) + def setup(self): + self.client_auth = _ClientAuthentication(self.CLIENT_ID, self.CLIENT_SECRET) + + def test_client_secret_basic(self): + request = {} + headers = self.client_auth('client_secret_basic', request) + assert headers == {'Authorization': self.basic_auth} + assert request == {} + + def test_client_secret_post(self): + request = {} + headers = self.client_auth('client_secret_post', request) + assert headers is None + assert request == {'client_id': self.CLIENT_ID, 'client_secret': self.CLIENT_SECRET} + + def test_defaults_to_client_secret_basic(self): + assert self.client_auth('invalid_client_auth_method', {}) == self.client_auth('client_secret_basic', {}) diff --git a/tests/test_user_session.py b/tests/test_user_session.py new file mode 100644 index 0000000..8e5c92b --- /dev/null +++ b/tests/test_user_session.py @@ -0,0 +1,59 @@ +import time + +import pytest + +from flask_pyoidc.user_session import UserSession + + +class TestUserSession(object): + def test_unauthenticated_session(self): + session = UserSession({}) + assert not session.is_authenticated() + + def test_authenticated_session(self): + session = UserSession({'last_authenticated': 1234}) + assert session.is_authenticated() + + def test_should_not_refresh_if_not_supported(self): + session = UserSession(session_storage={}) + assert not session.should_refresh() + + def test_should_not_refresh_if_authenticated_within_refresh_interval(self): + refresh_interval = 10 + session = UserSession(session_storage={'last_authenticated': time.time() + (refresh_interval - 1)}) + assert not session.should_refresh(refresh_interval) + + def test_should_refresh_if_supported_and_necessary(self): + refresh_interval = 10 + session = UserSession({'last_authenticated': time.time() - (refresh_interval + 1)}) + assert session.should_refresh(refresh_interval) + + def test_should_refresh_if_supported_and_not_previously_authenticated(self): + session = UserSession({}) + assert session.should_refresh(10) + + @pytest.mark.parametrize('data', [ + {'access_token': 'test_access_token'}, + {'id_token': {'iss': 'issuer1', 'sub': 'user1', 'aud': 'client1', 'exp': 1235, 'iat': 1234}}, + {'id_token_jwt': 'eyJh.eyJz.SflK'}, + {'userinfo': {'sub': 'user1', 'name': 'Test User'}}, + ]) + def test_update(self, data): + storage = {} + auth_time = 1234 + + UserSession(storage).update(auth_time, **data) + + expected_session_data = {'last_authenticated': auth_time} + expected_session_data.update(**data) + assert storage == expected_session_data + + def test_clear(self): + expected_data = {'initial data': 'should remain'} + session_storage = expected_data.copy() + + session = UserSession(session_storage) + session.update(time.time(), 'access_token', {'sub': 'user1'}, 'eyJh.eyJz.SflK', {'sub': 'user1}'}) + session.clear() + + assert session_storage == expected_data diff --git a/tox.ini b/tox.ini index 3b5c24d..6565ab8 100644 --- a/tox.ini +++ b/tox.ini @@ -2,7 +2,7 @@ envlist = py27,py34,py35,py36 [testenv] -commands = py.test tests/ +commands = py.test tests/ example/ deps = -rtests/requirements.txt [flake8]