diff --git a/lib/tool_shed/util/admin_util.py b/lib/tool_shed/util/admin_util.py index 0c5c2bc8c9f5..c85aba5fae69 100644 --- a/lib/tool_shed/util/admin_util.py +++ b/lib/tool_shed/util/admin_util.py @@ -5,12 +5,17 @@ from sqlalchemy import ( false, func, + select, ) from galaxy import ( util, web, ) +from galaxy.model import ( + Library, + LibraryDatasetDatasetAssociation, +) from galaxy.model.base import transaction from galaxy.security.validate_user_input import validate_password from galaxy.util import inflector @@ -89,12 +94,13 @@ def create_role(self, trans, **kwd): create_group_for_role = params.get("create_group_for_role", "") create_group_for_role_checked = CheckboxField.is_checked(create_group_for_role) ok = True + if params.get("create_role_button", False): if not name or not description: message = "Enter a valid name and a description." status = "error" ok = False - elif trans.sa_session.query(trans.app.model.Role).filter(trans.app.model.Role.table.c.name == name).first(): + elif get_role_id(trans.sa_session, trans.app.model.Role, name): message = "Role names must be unique and a role with that name already exists, so choose another name." status = "error" ok = False @@ -103,11 +109,11 @@ def create_role(self, trans, **kwd): role = trans.app.model.Role(name=name, description=description, type=trans.app.model.Role.types.ADMIN) trans.sa_session.add(role) # Create the UserRoleAssociations - for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]: + for user in [trans.sa_session.get(trans.app.model.User, x) for x in in_users]: ura = trans.app.model.UserRoleAssociation(user, role) trans.sa_session.add(ura) # Create the GroupRoleAssociations - for group in [trans.sa_session.query(trans.app.model.Group).get(x) for x in in_groups]: + for group in [trans.sa_session.get(trans.app.model.Group, x) for x in in_groups]: gra = trans.app.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) if create_group_for_role_checked: @@ -135,17 +141,9 @@ def create_role(self, trans, **kwd): web.url_for(controller="admin", action="roles", message=util.sanitize_text(message), status="done") ) if ok: - for user in ( - trans.sa_session.query(trans.app.model.User) - .filter(trans.app.model.User.table.c.deleted == false()) - .order_by(trans.app.model.User.table.c.email) - ): + for user in get_current_users(trans.sa_session, trans.app.model.User): out_users.append((user.id, user.email)) - for group in ( - trans.sa_session.query(trans.app.model.Group) - .filter(trans.app.model.Group.table.c.deleted == false()) - .order_by(trans.app.model.Group.table.c.name) - ): + for group in get_current_groups(trans.sa_session, trans.app.model.Group): out_groups.append((group.id, group.name)) return trans.fill_template( "/webapps/tool_shed/admin/dataset_security/role/role_create.mako", @@ -181,12 +179,7 @@ def rename_role(self, trans, **kwd): message = "Enter a valid name" status = "error" else: - existing_role = ( - trans.sa_session.query(trans.app.model.Role) - .filter(trans.app.model.Role.table.c.name == new_name) - .first() - ) - if existing_role and existing_role.id != role.id: + if get_role_id(trans.sa_session, trans.app.model.Role, new_name) != role.id: message = "A role with that name already exists" status = "error" else: @@ -220,10 +213,10 @@ def manage_users_and_groups_for_role(self, trans, **kwd): ) role = get_role(trans, id) if params.get("role_members_edit_button", False): - in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)] + in_users = [trans.sa_session.get(trans.app.model.User, x) for x in util.listify(params.in_users)] if trans.webapp.name == "galaxy": for ura in role.users: - user = trans.sa_session.query(trans.app.model.User).get(ura.user_id) + user = trans.sa_session.get(trans.app.model.User, ura.user_id) if user not in in_users: # Delete DefaultUserPermissions for previously associated users that have been removed from the role for dup in user.default_permissions: @@ -236,7 +229,7 @@ def manage_users_and_groups_for_role(self, trans, **kwd): trans.sa_session.delete(dhp) with transaction(trans.sa_session): trans.sa_session.commit() - in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(params.in_groups)] + in_groups = [trans.sa_session.get(trans.app.model.Group, x) for x in util.listify(params.in_groups)] trans.app.security_agent.set_entity_role_associations(roles=[role], users=in_users, groups=in_groups) trans.sa_session.refresh(role) message = "Role '%s' has been updated with %d associated users and %d associated groups" % ( @@ -251,20 +244,12 @@ def manage_users_and_groups_for_role(self, trans, **kwd): out_users = [] in_groups = [] out_groups = [] - for user in ( - trans.sa_session.query(trans.app.model.User) - .filter(trans.app.model.User.table.c.deleted == false()) - .order_by(trans.app.model.User.table.c.email) - ): + for user in get_current_users(trans.sa_session, trans.app.model.User): if user in [x.user for x in role.users]: in_users.append((user.id, user.email)) else: out_users.append((user.id, user.email)) - for group in ( - trans.sa_session.query(trans.app.model.Group) - .filter(trans.app.model.Group.table.c.deleted == false()) - .order_by(trans.app.model.Group.table.c.name) - ): + for group in get_current_groups(trans.sa_session, trans.app.model.Group): if group in [x.group for x in role.groups]: in_groups.append((group.id, group.name)) else: @@ -275,9 +260,7 @@ def manage_users_and_groups_for_role(self, trans, **kwd): # whose DatasetPermissions is associated with the Role # [ ( LibraryDatasetDatasetAssociation [ action, action ] ) ] for dp in role.dataset_actions: - for ldda in trans.sa_session.query(trans.app.model.LibraryDatasetDatasetAssociation).filter( - trans.app.model.LibraryDatasetDatasetAssociation.dataset_id == dp.dataset_id - ): + for ldda in get_ldda_by_dataset(trans.sa_session, dp.dataset_id): root_found = False folder_path = "" folder = ldda.library_dataset.folder @@ -288,11 +271,7 @@ def manage_users_and_groups_for_role(self, trans, **kwd): else: folder = folder.parent folder_path = f"{folder_path} {ldda.name}" - library = ( - trans.sa_session.query(trans.app.model.Library) - .filter(trans.app.model.Library.table.c.root_folder_id == folder.id) - .first() - ) + library = get_library_by_folder(trans.sa_session, folder.id) if library not in library_dataset_actions: library_dataset_actions[library] = {} try: @@ -393,7 +372,7 @@ def purge_role(self, trans, **kwd): ) # Delete UserRoleAssociations for ura in role.users: - user = trans.sa_session.query(trans.app.model.User).get(ura.user_id) + user = trans.sa_session.get(trans.app.model.User, ura.user_id) # Delete DefaultUserPermissions for associated users for dup in user.default_permissions: if role == dup.role: @@ -459,12 +438,7 @@ def rename_group(self, trans, **kwd): message = "Enter a valid name" status = "error" else: - existing_group = ( - trans.sa_session.query(trans.app.model.Group) - .filter(trans.app.model.Group.table.c.name == new_name) - .first() - ) - if existing_group and existing_group.id != group.id: + if get_group_id(trans.sa_session, trans.app.model.Group, new_name) != group.id: message = "A group with that name already exists" status = "error" else: @@ -494,8 +468,8 @@ def manage_users_and_roles_for_group(self, trans, **kwd): status = params.get("status", "done") group = get_group(trans, params.id) if params.get("group_roles_users_edit_button", False): - in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(params.in_roles)] - in_users = [trans.sa_session.query(trans.app.model.User).get(x) for x in util.listify(params.in_users)] + in_roles = [trans.sa_session.get(trans.app.model.Role, x) for x in util.listify(params.in_roles)] + in_users = [trans.sa_session.get(trans.app.model.User, x) for x in util.listify(params.in_users)] trans.app.security_agent.set_entity_group_associations(groups=[group], roles=in_roles, users=in_users) trans.sa_session.refresh(group) message += "Group '%s' has been updated with %d associated roles and %d associated users" % ( @@ -510,20 +484,12 @@ def manage_users_and_roles_for_group(self, trans, **kwd): out_roles = [] in_users = [] out_users = [] - for role in ( - trans.sa_session.query(trans.app.model.Role) - .filter(trans.app.model.Role.table.c.deleted == false()) - .order_by(trans.app.model.Role.table.c.name) - ): + for role in get_current_roles(trans.sa_session, trans.app.model.Role): if role in [x.role for x in group.roles]: in_roles.append((role.id, role.name)) else: out_roles.append((role.id, role.name)) - for user in ( - trans.sa_session.query(trans.app.model.User) - .filter(trans.app.model.User.table.c.deleted == false()) - .order_by(trans.app.model.User.table.c.email) - ): + for user in get_current_users(trans.sa_session, trans.app.model.User): if user in [x.user for x in group.users]: in_users.append((user.id, user.email)) else: @@ -563,9 +529,7 @@ def create_group(self, trans, **kwd): message = "Enter a valid name." status = "error" ok = False - elif ( - trans.sa_session.query(trans.app.model.Group).filter(trans.app.model.Group.table.c.name == name).first() - ): + elif get_group_id(trans.sa_session, trans.app.model.Group, name): message = ( "Group names must be unique and a group with that name already exists, so choose another name." ) @@ -578,11 +542,11 @@ def create_group(self, trans, **kwd): with transaction(trans.sa_session): trans.sa_session.commit() # Create the UserRoleAssociations - for user in [trans.sa_session.query(trans.app.model.User).get(x) for x in in_users]: + for user in [trans.sa_session.get(trans.app.model.User, x) for x in in_users]: uga = trans.app.model.UserGroupAssociation(user, group) trans.sa_session.add(uga) # Create the GroupRoleAssociations - for role in [trans.sa_session.query(trans.app.model.Role).get(x) for x in in_roles]: + for role in [trans.sa_session.get(trans.app.model.Role, x) for x in in_roles]: gra = trans.app.model.GroupRoleAssociation(group, role) trans.sa_session.add(gra) if create_role_for_group_checked: @@ -610,17 +574,9 @@ def create_group(self, trans, **kwd): web.url_for(controller="admin", action="groups", message=util.sanitize_text(message), status="done") ) if ok: - for user in ( - trans.sa_session.query(trans.app.model.User) - .filter(trans.app.model.User.table.c.deleted == false()) - .order_by(trans.app.model.User.table.c.email) - ): + for user in get_current_users(trans.sa_session, trans.app.model.User): out_users.append((user.id, user.email)) - for role in ( - trans.sa_session.query(trans.app.model.Role) - .filter(trans.app.model.Role.table.c.deleted == false()) - .order_by(trans.app.model.Role.table.c.name) - ): + for role in get_current_roles(trans.sa_session, trans.app.model.Role): out_roles.append((role.id, role.name)) return trans.fill_template( "/webapps/tool_shed/admin/dataset_security/group/group_create.mako", @@ -881,7 +837,7 @@ def purge_user(self, trans, **kwd): trans.sa_session.refresh(h) for hda in h.active_datasets: # Delete HistoryDatasetAssociation - d = trans.sa_session.query(trans.app.model.Dataset).get(hda.dataset_id) + d = trans.sa_session.get(trans.app.model.Dataset, hda.dataset_id) # Delete Dataset if not d.deleted: d.deleted = True @@ -944,14 +900,8 @@ def users(self, trans, **kwd): @web.require_admin def name_autocomplete_data(self, trans, q=None, limit=None, timestamp=None): """Return autocomplete data for user emails""" - ac_data = "" - for user in ( - trans.sa_session.query(trans.app.model.User) - .filter_by(deleted=False) - .filter(func.lower(trans.app.model.User.email).like(f"{q.lower()}%")) - ): - ac_data = f"{ac_data + user.email}\n" - return ac_data + emails = get_user_emails_by_prefix(trans.sa_session, trans.app.model.User, q) + return "\n".join(emails) @web.expose @web.require_admin @@ -970,19 +920,19 @@ def manage_roles_and_groups_for_user(self, trans, **kwd): # Make sure the user is not dis-associating himself from his private role out_roles = kwd.get("out_roles", []) if out_roles: - out_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(out_roles)] + out_roles = [trans.sa_session.get(trans.app.model.Role, x) for x in util.listify(out_roles)] if private_role in out_roles: message += "You cannot eliminate a user's private role association. " status = "error" in_roles = kwd.get("in_roles", []) if in_roles: - in_roles = [trans.sa_session.query(trans.app.model.Role).get(x) for x in util.listify(in_roles)] + in_roles = [trans.sa_session.get(trans.app.model.Role, x) for x in util.listify(in_roles)] out_groups = kwd.get("out_groups", []) if out_groups: - out_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(out_groups)] + out_groups = [trans.sa_session.get(trans.app.model.Group, x) for x in util.listify(out_groups)] in_groups = kwd.get("in_groups", []) if in_groups: - in_groups = [trans.sa_session.query(trans.app.model.Group).get(x) for x in util.listify(in_groups)] + in_groups = [trans.sa_session.get(trans.app.model.Group, x) for x in util.listify(in_groups)] if in_roles: trans.app.security_agent.set_entity_user_associations(users=[user], roles=in_roles, groups=in_groups) trans.sa_session.refresh(user) @@ -997,11 +947,7 @@ def manage_roles_and_groups_for_user(self, trans, **kwd): out_roles = [] in_groups = [] out_groups = [] - for role in ( - trans.sa_session.query(trans.app.model.Role) - .filter(trans.app.model.Role.table.c.deleted == false()) - .order_by(trans.app.model.Role.table.c.name) - ): + for role in get_current_roles(trans.sa_session, trans.app.model.Role): if role in [x.role for x in user.roles]: in_roles.append((role.id, role.name)) elif role.type != trans.app.model.Role.types.PRIVATE: @@ -1010,11 +956,7 @@ def manage_roles_and_groups_for_user(self, trans, **kwd): # role, which should always be in in_roles. The check above is added as an additional # precaution, since for a period of time we were including private roles in the form fields. out_roles.append((role.id, role.name)) - for group in ( - trans.sa_session.query(trans.app.model.Group) - .filter(trans.app.model.Group.table.c.deleted == false()) - .order_by(trans.app.model.Group.table.c.name) - ): + for group in get_current_groups(trans.sa_session, trans.app.model.Group): if group in [x.group for x in user.groups]: in_groups.append((group.id, group.name)) else: @@ -1043,7 +985,7 @@ def manage_roles_and_groups_for_user(self, trans, **kwd): def get_user(trans, user_id): """Get a User from the database by id.""" - user = trans.sa_session.query(trans.model.User).get(trans.security.decode_id(user_id)) + user = trans.sa_session.get(trans.model.User, trans.security.decode_id(user_id)) if not user: return trans.show_error_message(f"User not found for id ({str(user_id)})") return user @@ -1053,7 +995,7 @@ def get_role(trans, id): """Get a Role from the database by id.""" # Load user from database id = trans.security.decode_id(id) - role = trans.sa_session.query(trans.model.Role).get(id) + role = trans.sa_session.get(trans.model.Role, id) if not role: return trans.show_error_message(f"Role not found for id ({str(id)})") return role @@ -1063,7 +1005,51 @@ def get_group(trans, id): """Get a Group from the database by id.""" # Load user from database id = trans.security.decode_id(id) - group = trans.sa_session.query(trans.model.Group).get(id) + group = trans.sa_session.get(trans.model.Group, id) if not group: return trans.show_error_message(f"Group not found for id ({str(id)})") return group + + +def get_role_id(session, role_model, name): + stmt = select(role_model.id).where(role_model.name == name).limit(1) + return session.scalars(stmt).first() + + +def get_group_id(session, group_model, name): + stmt = select(group_model.id).where(group_model.name == name).limit(1) + return session.scalars(stmt).first() + + +def get_current_users(session, user_model): + stmt = select(user_model).where(user_model.deleted == false()).order_by(user_model.email) + return session.scalars(stmt) + + +def get_current_groups(session, group_model): + stmt = select(group_model).where(group_model.deleted == false()).order_by(group_model.name) + return session.scalars(stmt) + + +def get_current_roles(session, role_model): + stmt = select(role_model).where(role_model.deleted == false()).order_by(role_model.name) + return session.scalars(stmt) + + +def get_ldda_by_dataset(session, dataset_id): + stmt = select(LibraryDatasetDatasetAssociation).where(LibraryDatasetDatasetAssociation.dataset_id == dataset_id) + return session.scalars(stmt) + + +def get_library_by_folder(session, folder_id): + stmt = select(Library).where(Library.folder_id == folder_id).limit(1) + return session.scalars(stmt).first() + + +def get_user_emails_by_prefix(session, user_model, prefix): + stmt = ( + select(user_model.email) + .where(user_model.deleted == false()) + .where(func.lower(user_model.email).like(f"{prefix.lower()}%")) + ) + return session.scalars(stmt)