-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add integration tests for oidc account linkup
- Loading branch information
Showing
3 changed files
with
110 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -373,6 +373,29 @@ | |
"realmRoles" : [ "bpa-access-role", "galaxy-access-role", "default-roles-gxyrealm" ], | ||
"notBefore" : 0, | ||
"groups" : [ ] | ||
}, { | ||
"id" : "24ffa3ff-d351-4d5e-b10b-8d615082ec9b", | ||
"createdTimestamp" : 1694376671733, | ||
"username" : "gxyuser_existing", | ||
"enabled" : true, | ||
"totp" : false, | ||
"emailVerified" : true, | ||
"firstName" : "Test", | ||
"lastName" : "GalaxyUser", | ||
"email" : "[email protected]", | ||
"credentials" : [ { | ||
"id" : "00d87268-7e21-4d08-9f92-cfc06eca5147", | ||
"type" : "password", | ||
"userLabel" : "My password", | ||
"createdDate" : 1694376754826, | ||
"secretData" : "{\"value\":\"uNBI+UnpCLpXWHhm/tPSnnhuINiNw2MNt1XeDmImJaQ=\",\"salt\":\"fHS/FpnORylnSIco16UHwA==\",\"additionalParameters\":{}}", | ||
"credentialData" : "{\"hashIterations\":27500,\"algorithm\":\"pbkdf2-sha256\",\"additionalParameters\":{}}" | ||
} ], | ||
"disableableCredentialTypes" : [ ], | ||
"requiredActions" : [ ], | ||
"realmRoles" : [ "galaxy-access-role", "default-roles-gxyrealm" ], | ||
"notBefore" : 0, | ||
"groups" : [ ] | ||
}, { | ||
"id" : "24ffa3ff-d351-4d5e-b10b-8d615082ec9d", | ||
"createdTimestamp" : 1694376671733, | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,14 +11,15 @@ | |
|
||
import requests | ||
|
||
from galaxy.model.base import transaction | ||
from galaxy_test.base.api import ApiTestInteractor | ||
from galaxy_test.driver import integration_util | ||
|
||
KEYCLOAK_ADMIN_USERNAME = "admin" | ||
KEYCLOAK_ADMIN_PASSWORD = "admin" | ||
KEYCLOAK_TEST_USERNAME = "gxyuser" | ||
KEYCLOAK_TEST_PASSWORD = "gxypass" | ||
KEYCLOAK_HOST_PORT = 8443 | ||
KEYCLOAK_HOST_PORT = 9443 | ||
KEYCLOAK_URL = f"https://localhost:{KEYCLOAK_HOST_PORT}/realms/gxyrealm" | ||
|
||
|
||
|
@@ -78,7 +79,7 @@ def start_keycloak_docker(container_name, image="keycloak/keycloak:22.0.1"): | |
"--https-certificate-key-file=/opt/keycloak/data/import/keycloak-server.key.pem", | ||
] | ||
subprocess.check_call(START_SLURM_DOCKER) | ||
wait_till_app_ready(KEYCLOAK_URL) | ||
wait_till_app_ready(f"http://localhost:{KEYCLOAK_HOST_PORT}") | ||
|
||
|
||
def stop_keycloak_docker(container_name): | ||
|
@@ -160,17 +161,12 @@ def _get_interactor(self, api_key=None, allow_anonymous=False) -> "ApiTestIntera | |
|
||
class TestGalaxyOIDCLoginIntegration(AbstractTestCases.BaseKeycloakIntegrationTestCase): | ||
REGEX_KEYCLOAK_LOGIN_ACTION = re.compile(r"action=\"(.*)\"\s+") | ||
REGEX_GALAXY_CSRF_TOKEN = re.compile(r"session_csrf_token\": \"(.*)\"") | ||
|
||
def _login_via_keycloak( | ||
self, | ||
username, | ||
password, | ||
expected_codes=None, | ||
save_cookies=False, | ||
): | ||
def _login_via_keycloak(self, username, password, expected_codes=None, save_cookies=False, session=None): | ||
if expected_codes is None: | ||
expected_codes = [200, 404] | ||
session = requests.Session() | ||
session = session or requests.Session() | ||
response = session.get(f"{self.url}authnz/keycloak/login") | ||
provider_url = response.json()["redirect_uri"] | ||
response = session.get(provider_url, verify=False) | ||
|
@@ -197,7 +193,7 @@ def _get_keycloak_access_token( | |
response = requests.post(f"{KEYCLOAK_URL}/protocol/openid-connect/token", data=data, verify=False) | ||
return response.json()["access_token"] | ||
|
||
def test_oidc_login(self): | ||
def test_oidc_login_new_user(self): | ||
_, response = self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True) | ||
# Should have redirected back if auth succeeded | ||
parsed_url = parse.urlparse(response.url) | ||
|
@@ -207,6 +203,85 @@ def test_oidc_login(self): | |
self._assert_status_code_is(response, 200) | ||
assert response.json()["email"] == "[email protected]" | ||
|
||
def test_oidc_login_existing_user(self): | ||
# pre-create a user account manually | ||
sa_session = self._app.model.session | ||
User = self._app.model.User | ||
user = User(email="[email protected]", username="precreated_user") | ||
user.set_password_cleartext("test123") | ||
sa_session.add(user) | ||
try: | ||
with transaction(sa_session): | ||
sa_session.commit() | ||
except Exception: | ||
# User already exists | ||
pass | ||
|
||
# login with the corresponding OIDC user | ||
_, response = self._login_via_keycloak("gxyuser_existing", KEYCLOAK_TEST_PASSWORD, save_cookies=True) | ||
|
||
# Should prompt user to associate accounts | ||
parsed_url = parse.urlparse(response.url) | ||
provider = parse.parse_qs(parsed_url.query)["connect_external_provider"][0] | ||
assert "keycloak" == provider | ||
response = self._get("users/current") | ||
self._assert_status_code_is(response, 400) | ||
|
||
def test_oidc_login_account_linkup(self): | ||
# pre-create a user account manually | ||
sa_session = self._app.model.session | ||
User = self._app.model.User | ||
user = User(email="[email protected]", username="precreated_user") | ||
user.set_password_cleartext("test123") | ||
sa_session.add(user) | ||
try: | ||
with transaction(sa_session): | ||
sa_session.commit() | ||
except Exception: | ||
# User already exists | ||
pass | ||
|
||
# establish a web session | ||
session = requests.Session() | ||
response = session.get(self._api_url("../login/start")) | ||
matches = self.REGEX_GALAXY_CSRF_TOKEN.search(response.text) | ||
assert matches | ||
session_csrf_token = str(matches.groups(1)[0]) | ||
response = session.post( | ||
self._api_url("../user/login"), | ||
data={ | ||
"login": "[email protected]", | ||
"password": "test123", | ||
"session_csrf_token": session_csrf_token, | ||
}, | ||
) | ||
|
||
response = session.get(self._api_url("users/current")) | ||
self._assert_status_code_is(response, 200) | ||
assert response.json()["email"] == "[email protected]" | ||
assert response.json()["username"] == "precreated_user" | ||
|
||
# login with the corresponding OIDC user, while preserving the current session | ||
_, response = self._login_via_keycloak( | ||
"gxyuser_existing", KEYCLOAK_TEST_PASSWORD, save_cookies=True, session=session | ||
) | ||
|
||
# Should now automatically associate account | ||
parsed_url = parse.urlparse(response.url) | ||
notification = parse.parse_qs(parsed_url.query)["notification"][0] | ||
assert "Your Keycloak identity has been linked to your Galaxy account." in notification | ||
response = session.get(self._api_url("users/current")) | ||
self._assert_status_code_is(response, 200) | ||
assert response.json()["email"] == "[email protected]" | ||
assert response.json()["username"] == "precreated_user" | ||
|
||
# Now that the accounts are associated, future logins through OIDC should just work | ||
session, response = self._login_via_keycloak("gxyuser_existing", KEYCLOAK_TEST_PASSWORD, save_cookies=True) | ||
response = session.get(self._api_url("users/current")) | ||
self._assert_status_code_is(response, 200) | ||
assert response.json()["email"] == "[email protected]" | ||
assert response.json()["username"] == "precreated_user" | ||
|
||
def test_oidc_logout(self): | ||
# login | ||
session, _ = self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD, save_cookies=True) | ||
|