diff --git a/galaxy_ng/tests/integration/dab/test_dab_rbac_contract.py b/galaxy_ng/tests/integration/dab/test_dab_rbac_contract.py index b7e55eb5d2..3fdbfbdcbf 100644 --- a/galaxy_ng/tests/integration/dab/test_dab_rbac_contract.py +++ b/galaxy_ng/tests/integration/dab/test_dab_rbac_contract.py @@ -5,6 +5,7 @@ from galaxykit import GalaxyClient from galaxykit.utils import GalaxyClientError + GALAXY_API_PATH_PREFIX = "/api/galaxy" # cant import from settings on integration tests @@ -108,7 +109,7 @@ def delete_role(): raise RuntimeError(f"Could not figure out how to delete {role}") assert response.status_code == HTTPStatus.NO_CONTENT - roles_list = gc.get(f"{url_base}?name={data['name']}") + roles_list = gc.get(url_base, params={"name": data['name']}) if roles_list["count"] > 1: raise RuntimeError(f"Found too many {url_base} with expected name {data['name']}") if roles_list["count"] == 1: @@ -184,7 +185,10 @@ def assert_object_role_assignments(gc, user, namespace, expected=0): assert len(data["users"]) == expected # Assure the assignment shows up in the DAB RBAC API - data = gc.get(f"_ui/v2/role_user_assignments/?user={user['id']}&object_id={namespace['id']}") + data = gc.get( + "_ui/v2/role_user_assignments/", + params={"user": user['id'], "object_id": namespace['id']} + ) assert data["count"] == expected if not expected: @@ -226,12 +230,12 @@ def test_create_custom_namespace_system_admin_role(custom_role_factory, galaxy_c assert system_ns_role["name"] == NS_FIXTURE_DATA["name"] gc = galaxy_client("admin") - roles_list = gc.get(f"{DAB_ROLE_URL}?name={NS_FIXTURE_DATA['name']}") + roles_list = gc.get(DAB_ROLE_URL, params={"name": NS_FIXTURE_DATA['name']}) assert roles_list["count"] == 1 dab_role = roles_list["results"][0] assert set(dab_role["permissions"]) == set(NS_FIXTURE_DATA["permissions"]) - roles_list = gc.get(f"{PULP_ROLE_URL}?name={NS_FIXTURE_DATA['name']}") + roles_list = gc.get(PULP_ROLE_URL, params={"name": NS_FIXTURE_DATA['name']}) assert roles_list["count"] == 1 pulp_role = roles_list["results"][0] assert set(pulp_role["permissions"]) == set(NS_FIXTURE_DATA["permissions"]) @@ -342,8 +346,6 @@ def test_give_team_custom_role_system( assert role_definition_resp["name"] == NS_FIXTURE_DATA["name"] assert role_definition_resp["description"] == NS_FIXTURE_DATA["description"] - # assert_system_role_team_assignments(admin_client, user, expected=1) - # Step 3: Check that user with assigned system role has write access to a namespace. response = user_client.put( @@ -385,7 +387,7 @@ def test_give_user_custom_role_object( by_role_api, by_assignment_api ): - gc = galaxy_client("admin") + admin_client = galaxy_client("admin") if by_role_api == 'pulp' and by_assignment_api == 'dab': pytest.skip( @@ -398,27 +400,30 @@ def test_give_user_custom_role_object( data["name"] = "galaxy.namespace_custom_dab_object_role" data["content_type"] = "galaxy.namespace" custom_obj_role_dab = custom_role_factory(data) - custom_obj_role_pulp = gc.get(f"{PULP_ROLE_URL}?name={data['name']}")['results'][0] + custom_obj_role_pulp = admin_client.get( + PULP_ROLE_URL, params={"name": data['name']} + )['results'][0] else: data = NS_FIXTURE_DATA.copy() data["name"] = "galaxy.namespace_custom_pulp_object_role" custom_obj_role_pulp = custom_role_factory(data, url_base=PULP_ROLE_URL) - custom_obj_role_dab = gc.get(f"{DAB_ROLE_URL}?name={data['name']}")['results'][0] + custom_obj_role_dab = admin_client.get( + DAB_ROLE_URL, params={"name": data['name']} + )['results'][0] # NOTE: If basic_user doesn't suffice, the dab proxy supports creating users # and syncing them down to galaxy - user_r = gc.get("_ui/v2/users/") - assert user_r["count"] > 0 - user = user_r["results"][0] + user_client = galaxy_client("basic_user") + user = user_client.get("_ui/v1/me/") # sanity - assignments should not exist at start of this test - assert_object_role_assignments(gc, user, namespace, 0) + assert_object_role_assignments(admin_client, user, namespace, 0) # Give the user permission to the namespace object dab_assignment = None if by_assignment_api == "dab": - dab_assignment = gc.post( + dab_assignment = admin_client.post( "_ui/v2/role_user_assignments/", body={ "role_definition": custom_obj_role_dab["id"], @@ -436,18 +441,18 @@ def test_give_user_custom_role_object( } ], } - gc.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload) + admin_client.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload) # TODO: make a request as the user and see that it works # NOTE: Check if user can have a minimal permission to modify namespace attributes # (e.g. description, company, etc.) # NOTE: Check after permission is revoked, user cannot perform same operations - assert_object_role_assignments(gc, user, namespace, 1) + assert_object_role_assignments(admin_client, user, namespace, 1) # Remove the permission from before if by_assignment_api == "dab": - response = gc.delete( + response = admin_client.delete( f"_ui/v2/role_user_assignments/{dab_assignment['id']}/", parse_json=False ) @@ -457,9 +462,96 @@ def test_give_user_custom_role_object( "name": namespace["name"], "users": [], } - gc.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload) + admin_client.put(f"_ui/v1/my-namespaces/{namespace['name']}/", body=payload) + + assert_object_role_assignments(admin_client, user, namespace, 0) + + +@pytest.mark.deployment_standalone +def test_give_team_custom_role_object( + settings, + galaxy_client, + custom_role_factory, + namespace, + team, +): + if settings.get('ALLOW_LOCAL_RESOURCE_MANAGEMENT') is False: + pytest.skip("galaxykit uses drf tokens, which bypass JWT auth and claims processing") + + # Step 0: Setup test. + + admin_client = galaxy_client("admin") + + user_client = galaxy_client("basic_user") + user = user_client.get("_ui/v1/me/") + add_user_to_team(admin_client, user["id"], team["id"]) + + data = { + "name": "galaxy.namespace_custom_object_role", + "content_type": "galaxy.namespace", + "description": "Galaxy namespace custom object role", + "permissions": [ + "galaxy.change_namespace", + "galaxy.delete_namespace", + "galaxy.view_namespace", + ], + } + custom_obj_role_dab = custom_role_factory(data) + + # Step 1: Check that regular user doesn't have write access to a namespace. + with pytest.raises(GalaxyClientError) as ctx: + user_client.put( + f"_ui/v1/namespaces/{namespace['name']}/", body={ + **namespace, + "company": "Test RBAC Company 1", + } + ) + assert ctx.value.response.status_code == HTTPStatus.FORBIDDEN + + # Step 2: Assign object role to a team + + admin_client.post( + "_ui/v2/role_team_assignments/", + body={ + "team": team["id"], + "role_definition": custom_obj_role_dab["id"], + "content_type": "galaxy.namespace", + "object_id": str(namespace["id"]), + }, + ) - assert_object_role_assignments(gc, user, namespace, 0) + # Step 3: Check that user with assigned system role has write access to a namespace. + + namespace = user_client.get(f"_ui/v1/namespaces/{namespace['name']}/") + namespace = user_client.put( + f"_ui/v1/namespaces/{namespace['name']}/", body={ + **namespace, + "company": "Test RBAC Company 2", + } + ) + assert namespace["company"] == "Test RBAC Company 2" + + role_assignment = admin_client.get( + "_ui/v2/role_team_assignments/", + params={"object_id": namespace["id"]} + )["results"][0] + + response = admin_client.delete( + f"_ui/v2/role_team_assignments/{role_assignment['id']}/", + parse_json=False + ) + assert response.status_code == HTTPStatus.NO_CONTENT + + # Step 5: Check that user with revoked object role doesn't have write access to a namespace. + + with pytest.raises(GalaxyClientError) as ctx: + user_client.put( + f"_ui/v1/namespaces/{namespace['name']}/", body={ + **namespace, + "company": "Test RBAC Company 3", + } + ) + assert ctx.value.response.status_code == HTTPStatus.FORBIDDEN def test_object_role_permission_validation(galaxy_client, custom_role_factory, namespace): @@ -501,7 +593,8 @@ def _rf(user_id, group_id, expected=True): assert group["name"] not in [g["name"] for g in user["groups"]] assignment_r = gc.get( - f"_ui/v2/role_user_assignments/?user={user['id']}&content_type__model=team" + "_ui/v2/role_user_assignments/", + params={"user": user['id'], "content_type__model": "team"}, ) group_ids = [assignment["object_id"] for assignment in assignment_r["results"]] if expected: @@ -515,11 +608,11 @@ def _rf(user_id, group_id, expected=True): def user_and_group(request, galaxy_client): "Return a tuple of a user and group where the user is not in the group" gc = galaxy_client("admin") - user_r = gc.get("_ui/v2/users/?username=jdoe") + user_r = gc.get("_ui/v2/users/", params={"username": "jdoe"}) assert user_r["count"] > 0 user = user_r["results"][0] - group_r = gc.get("_ui/v2/groups/?name=ship_crew") + group_r = gc.get("_ui/v2/groups/", params={"name": "ship_crew"}) assert group_r["count"] > 0 group = group_r["results"][0] @@ -563,12 +656,12 @@ def test_team_member_sync_from_dab_to_pulp(galaxy_client, assert_user_in_group, assert_user_in_group(user["id"], group["id"], expected=False) # Get the DAB team member role - rd_list = gc.get("_ui/v2/role_definitions/?name=Galaxy Team Member") + rd_list = gc.get("_ui/v2/role_definitions/", params={"name": "Galaxy Team Member"}) assert rd_list["count"] == 1 rd = rd_list["results"][0] # Because group and team are different, we look up the team by name, for DAB - team_list = gc.get(f"_ui/v2/teams/?name={group['name']}") + team_list = gc.get("_ui/v2/teams/", params={"name": group['name']}) assert team_list["count"] == 1, team_list team = team_list["results"][0]