-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Improved error messages and exception handling
- Loading branch information
Showing
3 changed files
with
49 additions
and
30 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -299,15 +299,16 @@ def test_auth_by_access_token_logged_in_once(self): | |
# login at least once | ||
self._login_via_keycloak("gxyuser_logged_in_once", KEYCLOAK_TEST_PASSWORD) | ||
access_token = self._get_keycloak_access_token(username="gxyuser_logged_in_once") | ||
response = self._get("users/current", headers={"Authorization": f"Bearer {access_token}"}) | ||
response = self._get("whoami", headers={"Authorization": f"Bearer {access_token}"}) | ||
self._assert_status_code_is(response, 200) | ||
assert response.json()["email"] == "[email protected]" | ||
|
||
def test_auth_by_access_token_never_logged_in(self): | ||
# If the user has not previously logged in via OIDC at least once, OIDC API calls are not allowed | ||
access_token = self._get_keycloak_access_token(username="gxyuser_never_logged_in") | ||
response = self._get("users/current", headers={"Authorization": f"Bearer {access_token}"}) | ||
self._assert_status_code_is(response, 400) | ||
self._assert_status_code_is(response, 401) | ||
assert "The user should log into Galaxy at least once" in response.json()["err_msg"] | ||
|
||
def test_auth_with_expired_token(self): | ||
_, response = self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD) | ||
|
@@ -321,7 +322,7 @@ def test_auth_with_expired_token(self): | |
# token should have expired in 7 seconds, so the call should fail | ||
time.sleep(7) | ||
response = self._get("users/current", headers={"Authorization": f"Bearer {access_token}"}) | ||
self._assert_status_code_is(response, 400) | ||
self._assert_status_code_is(response, 401) | ||
|
||
def test_auth_with_another_authorized_client(self): | ||
_, response = self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD) | ||
|
@@ -336,15 +337,18 @@ def test_auth_with_authorized_client_but_unauthorized_audience(self): | |
_, response = self._login_via_keycloak("bpaonlyuser", KEYCLOAK_TEST_PASSWORD) | ||
access_token = self._get_keycloak_access_token(client_id="bpaclient", username="bpaonlyuser") | ||
response = self._get("users/current", headers={"Authorization": f"Bearer {access_token}"}) | ||
self._assert_status_code_is(response, 400) | ||
self._assert_status_code_is(response, 401) | ||
assert "Invalid access token" in response.json()["err_msg"] | ||
|
||
def test_auth_with_unauthorized_client(self): | ||
_, response = self._login_via_keycloak(KEYCLOAK_TEST_USERNAME, KEYCLOAK_TEST_PASSWORD) | ||
access_token = self._get_keycloak_access_token(client_id="unauthorizedclient") | ||
response = self._get("users/current", headers={"Authorization": f"Bearer {access_token}"}) | ||
self._assert_status_code_is(response, 400) | ||
self._assert_status_code_is(response, 401) | ||
assert "Invalid access token" in response.json()["err_msg"] | ||
|
||
def test_auth_without_required_scopes(self): | ||
access_token = self._get_keycloak_access_token(client_id="bpaclient") | ||
response = self._get("users/current", headers={"Authorization": f"Bearer {access_token}"}) | ||
self._assert_status_code_is(response, 400) | ||
self._assert_status_code_is(response, 401) | ||
assert "Invalid access token" in response.json()["err_msg"] |