diff --git a/lib/galaxy/managers/groups.py b/lib/galaxy/managers/groups.py index f600369e3184..a32c5f15440a 100644 --- a/lib/galaxy/managers/groups.py +++ b/lib/galaxy/managers/groups.py @@ -11,10 +11,10 @@ RequestParameterInvalidException, ) from galaxy.managers.context import ProvidesAppContext -from galaxy.managers.roles import get_roles_by_ids -from galaxy.managers.users import get_users_by_ids from galaxy.model import Group from galaxy.model.base import transaction +from galaxy.model.db.role import get_roles_by_ids +from galaxy.model.db.user import get_users_by_ids from galaxy.model.scoped_session import galaxy_scoped_session from galaxy.schema.fields import Security from galaxy.schema.groups import GroupCreatePayload diff --git a/lib/galaxy/managers/libraries.py b/lib/galaxy/managers/libraries.py index 3acb16a522d6..66e6903a01d1 100644 --- a/lib/galaxy/managers/libraries.py +++ b/lib/galaxy/managers/libraries.py @@ -10,15 +10,6 @@ Tuple, ) -from sqlalchemy import ( - asc, - false, - func, - not_, - or_, - select, - true, -) from sqlalchemy.exc import ( MultipleResultsFound, NoResultFound, @@ -29,10 +20,16 @@ from galaxy.managers.folders import FolderManager from galaxy.model import ( Library, - LibraryPermissions, Role, ) from galaxy.model.base import transaction +from galaxy.model.db.library import ( + get_libraries_by_name, + get_libraries_for_admins, + get_libraries_for_nonadmins, + get_library_ids, + get_library_permissions_by_role, +) from galaxy.util import ( pretty_print_time_interval, unicodify, @@ -60,7 +57,7 @@ def get(self, trans, decoded_library_id: int, check_accessible: bool = True) -> :rtype: galaxy.model.Library """ try: - library = get_library(trans.sa_session, decoded_library_id) + library = trans.sa_session.get(Library, decoded_library_id) except MultipleResultsFound: raise exceptions.InconsistentDatabase("Multiple libraries found with the same id.") except NoResultFound: @@ -358,52 +355,8 @@ def get_containing_library_from_library_dataset(trans, library_dataset) -> Optio while folder.parent: folder = folder.parent # We have folder set to the library's root folder, which has the same name as the library - stmt = select(Library).where(Library.deleted == false()).where(Library.name == folder.name) - for library in trans.sa_session.scalars(stmt): + for library in get_libraries_by_name(trans.sa_session, folder.name): # Just to double-check if library.root_folder == folder: return library return None - - -def get_library(session, library_id): - stmt = select(Library).where(Library.id == library_id) - return session.execute(stmt).scalar_one() - - -def get_library_ids(session, library_access_action): - stmt = select(LibraryPermissions.library_id).where(LibraryPermissions.action == library_access_action).distinct() - return session.scalars(stmt) - - -def get_library_permissions_by_role(session, role_ids): - stmt = select(LibraryPermissions).where(LibraryPermissions.role_id.in_(role_ids)) - return session.scalars(stmt) - - -def get_libraries_for_admins(session, deleted): - stmt = select(Library) - if deleted is None: - # Flag is not specified, do not filter on it. - pass - elif deleted: - stmt = stmt.where(Library.deleted == true()) - else: - stmt = stmt.where(Library.deleted == false()) - stmt = stmt.order_by(asc(func.lower(Library.name))) - return session.scalars(stmt) - - -def get_libraries_for_nonadmins(session, restricted_library_ids, accessible_restricted_library_ids): - stmt = ( - select(Library) - .where(Library.deleted == false()) - .where( - or_( - not_(Library.id.in_(restricted_library_ids)), - Library.id.in_(accessible_restricted_library_ids), - ) - ) - ) - stmt = stmt.order_by(asc(func.lower(Library.name))) - return session.scalars(stmt) diff --git a/lib/galaxy/managers/roles.py b/lib/galaxy/managers/roles.py index 1f8ef428b101..8864f8900e16 100644 --- a/lib/galaxy/managers/roles.py +++ b/lib/galaxy/managers/roles.py @@ -26,7 +26,6 @@ from galaxy.managers.context import ProvidesUserContext from galaxy.model import Role from galaxy.model.base import transaction -from galaxy.model.scoped_session import galaxy_scoped_session from galaxy.schema.schema import RoleDefinitionModel from galaxy.util import unicodify @@ -162,8 +161,3 @@ def undelete(self, trans: ProvidesUserContext, role: model.Role) -> model.Role: with transaction(trans.sa_session): trans.sa_session.commit() return role - - -def get_roles_by_ids(session: galaxy_scoped_session, role_ids): - stmt = select(Role).where(Role.id.in_(role_ids)) - return session.scalars(stmt).all() diff --git a/lib/galaxy/managers/users.py b/lib/galaxy/managers/users.py index 6ffa1ebabc21..9abc047629c8 100644 --- a/lib/galaxy/managers/users.py +++ b/lib/galaxy/managers/users.py @@ -45,7 +45,10 @@ UserQuotaUsage, ) from galaxy.model.base import transaction -from galaxy.model.scoped_session import galaxy_scoped_session +from galaxy.model.db.user import ( + get_user_by_email, + get_user_by_username, +) from galaxy.security.validate_user_input import ( VALID_EMAIL_RE, validate_email, @@ -872,25 +875,3 @@ def _add_parsers(self): ) self.fn_filter_parsers.update({}) - - -def get_users_by_ids(session: galaxy_scoped_session, user_ids): - stmt = select(User).where(User.id.in_(user_ids)) - return session.scalars(stmt).all() - - -# The get_user_by_email and get_user_by_username functions may be called from -# the tool_shed app, which has its own User model, which is different from -# galaxy.model.User. In that case, the tool_shed user model should be passed as -# the model_class argument. -def get_user_by_email(session, email: str, model_class=User, case_sensitive=True): - filter_clause = model_class.email == email - if not case_sensitive: - filter_clause = func.lower(model_class.email) == func.lower(email) - stmt = select(model_class).where(filter_clause).limit(1) - return session.scalars(stmt).first() - - -def get_user_by_username(session, username: str, model_class=User): - stmt = select(model_class).filter(model_class.username == username).limit(1) - return session.scalars(stmt).first() diff --git a/lib/galaxy/model/db/__init__.py b/lib/galaxy/model/db/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/lib/galaxy/model/db/library.py b/lib/galaxy/model/db/library.py new file mode 100644 index 000000000000..b88fa28433db --- /dev/null +++ b/lib/galaxy/model/db/library.py @@ -0,0 +1,57 @@ +from sqlalchemy import ( + asc, + false, + func, + not_, + or_, + select, + true, +) + +from galaxy.model import ( + Library, + LibraryPermissions, +) + + +def get_library_ids(session, library_access_action): + stmt = select(LibraryPermissions.library_id).where(LibraryPermissions.action == library_access_action).distinct() + return session.scalars(stmt) + + +def get_library_permissions_by_role(session, role_ids): + stmt = select(LibraryPermissions).where(LibraryPermissions.role_id.in_(role_ids)) + return session.scalars(stmt) + + +def get_libraries_for_admins(session, deleted): + stmt = select(Library) + if deleted is None: + # Flag is not specified, do not filter on it. + pass + elif deleted: + stmt = stmt.where(Library.deleted == true()) + else: + stmt = stmt.where(Library.deleted == false()) + stmt = stmt.order_by(asc(func.lower(Library.name))) + return session.scalars(stmt) + + +def get_libraries_for_nonadmins(session, restricted_library_ids, accessible_restricted_library_ids): + stmt = ( + select(Library) + .where(Library.deleted == false()) + .where( + or_( + not_(Library.id.in_(restricted_library_ids)), + Library.id.in_(accessible_restricted_library_ids), + ) + ) + ) + stmt = stmt.order_by(asc(func.lower(Library.name))) + return session.scalars(stmt) + + +def get_libraries_by_name(session, name): + stmt = select(Library).where(Library.deleted == false()).where(Library.name == name) + return session.scalars(stmt) diff --git a/lib/galaxy/model/db/role.py b/lib/galaxy/model/db/role.py new file mode 100644 index 000000000000..46df076ea813 --- /dev/null +++ b/lib/galaxy/model/db/role.py @@ -0,0 +1,43 @@ +from sqlalchemy import ( + and_, + false, + select, +) + +from galaxy.model import ( + Role, + UserRoleAssociation, +) +from galaxy.model.scoped_session import galaxy_scoped_session + + +def get_npns_roles(session): + """ + non-private, non-sharing roles + """ + stmt = ( + select(Role) + .where(and_(Role.deleted == false(), Role.type != Role.types.PRIVATE, Role.type != Role.types.SHARING)) + .order_by(Role.name) + ) + return session.scalars(stmt) + + +def get_private_user_role(user, session): + stmt = ( + select(Role) + .where( + and_( + UserRoleAssociation.user_id == user.id, + Role.id == UserRoleAssociation.role_id, + Role.type == Role.types.PRIVATE, + ) + ) + .distinct() + ) + return session.execute(stmt).scalar_one_or_none() + + +def get_roles_by_ids(session: galaxy_scoped_session, role_ids): + stmt = select(Role).where(Role.id.in_(role_ids)) + return session.scalars(stmt).all() diff --git a/lib/galaxy/model/db/user.py b/lib/galaxy/model/db/user.py new file mode 100644 index 000000000000..823247f725dd --- /dev/null +++ b/lib/galaxy/model/db/user.py @@ -0,0 +1,66 @@ +from typing import Optional + +from sqlalchemy import ( + false, + func, + or_, + select, + true, +) + +from galaxy.model import User +from galaxy.model.scoped_session import galaxy_scoped_session + + +def get_users_by_ids(session: galaxy_scoped_session, user_ids): + stmt = select(User).where(User.id.in_(user_ids)) + return session.scalars(stmt).all() + + +# The get_user_by_email and get_user_by_username functions may be called from +# the tool_shed app, which has its own User model, which is different from +# galaxy.model.User. In that case, the tool_shed user model should be passed as +# the model_class argument. +def get_user_by_email(session, email: str, model_class=User, case_sensitive=True): + filter_clause = model_class.email == email + if not case_sensitive: + filter_clause = func.lower(model_class.email) == func.lower(email) + stmt = select(model_class).where(filter_clause).limit(1) + return session.scalars(stmt).first() + + +def get_user_by_username(session, username: str, model_class=User): + stmt = select(model_class).filter(model_class.username == username).limit(1) + return session.scalars(stmt).first() + + +def get_users_for_index( + session, + deleted: bool, + f_email: Optional[str] = None, + f_name: Optional[str] = None, + f_any: Optional[str] = None, + is_admin: bool = False, + expose_user_email: bool = False, + expose_user_name: bool = False, +): + stmt = select(User) + if f_email and (is_admin or expose_user_email): + stmt = stmt.where(User.email.like(f"%{f_email}%")) + if f_name and (is_admin or expose_user_name): + stmt = stmt.where(User.username.like(f"%{f_name}%")) + if f_any: + if is_admin: + stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) + else: + if expose_user_email and expose_user_name: + stmt = stmt.where(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) + elif expose_user_email: + stmt = stmt.where(User.email.like(f"%{f_any}%")) + elif expose_user_name: + stmt = stmt.where(User.username.like(f"%{f_any}%")) + if deleted: + stmt = stmt.where(User.deleted == true()) + else: + stmt = stmt.where(User.deleted == false()) + return session.scalars(stmt).all() diff --git a/lib/galaxy/model/security.py b/lib/galaxy/model/security.py index 7a761d562edb..8d7574155886 100644 --- a/lib/galaxy/model/security.py +++ b/lib/galaxy/model/security.py @@ -35,6 +35,10 @@ UserRoleAssociation, ) from galaxy.model.base import transaction +from galaxy.model.db.role import ( + get_npns_roles, + get_private_user_role, +) from galaxy.security import ( Action, get_permitted_actions, @@ -80,22 +84,11 @@ def sort_by_attr(self, seq, attr): intermed.sort() return [_[-1] for _ in intermed] - def _get_npns_roles(self, trans): - """ - non-private, non-sharing roles - """ - stmt = ( - select(Role) - .where(and_(Role.deleted == false(), Role.type != Role.types.PRIVATE, Role.type != Role.types.SHARING)) - .order_by(Role.name) - ) - return trans.sa_session.scalars(stmt) - def get_all_roles(self, trans, cntrller): admin_controller = cntrller in ["library_admin"] roles = set() if not trans.user: - return self._get_npns_roles(trans) + return get_npns_roles(trans.sa_session) if admin_controller: # The library is public and the user is an admin, so all roles are legitimate stmt = select(Role).where(Role.deleted == false()).order_by(Role.name) @@ -108,7 +101,7 @@ def get_all_roles(self, trans, cntrller): for role in self.get_sharing_roles(trans.user): roles.add(role) # Add all remaining non-private, non-sharing roles - for role in self._get_npns_roles(trans): + for role in get_npns_roles(trans.sa_session): roles.add(role) return self.sort_by_attr(list(roles), "name") @@ -189,7 +182,7 @@ def get_valid_roles(self, trans, item, query=None, page=None, page_limit=None, i for role in self.get_sharing_roles(trans.user): roles.append(role) # Add all remaining non-private, non-sharing roles - for role in self._get_npns_roles(trans): + for role in get_npns_roles(trans.sa_session): roles.append(role) # User will see all the roles derived from the access roles on the item else: @@ -765,23 +758,9 @@ def create_private_user_role(self, user): return self.get_private_user_role(user) def get_private_user_role(self, user, auto_create=False): - stmt = ( - select(Role) - .where( - and_( - UserRoleAssociation.user_id == user.id, - Role.id == UserRoleAssociation.role_id, - Role.type == Role.types.PRIVATE, - ) - ) - .distinct() - ) - role = self.sa_session.execute(stmt).scalar_one_or_none() - if not role: - if auto_create: - return self.create_private_user_role(user) - else: - return None + role = get_private_user_role(user, self.sa_session) + if not role and auto_create: + role = self.create_private_user_role(user) return role def get_role(self, name, type=None): diff --git a/lib/galaxy/model/unittest_utils/db_helpers.py b/lib/galaxy/model/unittest_utils/db_helpers.py new file mode 100644 index 000000000000..6c56d83278b3 --- /dev/null +++ b/lib/galaxy/model/unittest_utils/db_helpers.py @@ -0,0 +1,12 @@ +from sqlalchemy import select + +from galaxy import model + + +def get_hdca_by_name(session, name): + stmt = ( + select(model.HistoryDatasetCollectionAssociation) + .where(model.HistoryDatasetCollectionAssociation.name == name) + .limit(1) + ) + return session.scalars(stmt).first() diff --git a/lib/galaxy/visualization/genomes.py b/lib/galaxy/visualization/genomes.py index 066785feae7a..d38996c52c32 100644 --- a/lib/galaxy/visualization/genomes.py +++ b/lib/galaxy/visualization/genomes.py @@ -14,11 +14,11 @@ ObjectNotFound, ReferenceDataError, ) -from galaxy.managers.users import get_user_by_username from galaxy.model import ( HistoryDatasetAssociation, User, ) +from galaxy.model.db.user import get_user_by_username from galaxy.structured_app import StructuredApp from galaxy.util.bunch import Bunch diff --git a/lib/galaxy/webapps/galaxy/controllers/page.py b/lib/galaxy/webapps/galaxy/controllers/page.py index b222c7a6be3e..047fa98e28fb 100644 --- a/lib/galaxy/webapps/galaxy/controllers/page.py +++ b/lib/galaxy/webapps/galaxy/controllers/page.py @@ -13,9 +13,9 @@ PageManager, ) from galaxy.managers.sharable import SlugBuilder -from galaxy.managers.users import get_user_by_username from galaxy.managers.workflows import WorkflowsManager from galaxy.model.base import transaction +from galaxy.model.db.user import get_user_by_username from galaxy.model.item_attrs import UsesItemRatings from galaxy.schema.schema import CreatePagePayload from galaxy.structured_app import StructuredApp diff --git a/lib/galaxy/webapps/galaxy/controllers/user.py b/lib/galaxy/webapps/galaxy/controllers/user.py index f0a82c7f7519..8e476f0db3b4 100644 --- a/lib/galaxy/webapps/galaxy/controllers/user.py +++ b/lib/galaxy/webapps/galaxy/controllers/user.py @@ -18,7 +18,7 @@ ) from galaxy.exceptions import Conflict from galaxy.managers import users -from galaxy.managers.users import get_user_by_email +from galaxy.model.db.user import get_user_by_email from galaxy.security.validate_user_input import ( validate_email, validate_publicname, diff --git a/lib/galaxy/webapps/galaxy/services/quotas.py b/lib/galaxy/webapps/galaxy/services/quotas.py index 38b6a69fe849..68b24dc6e970 100644 --- a/lib/galaxy/webapps/galaxy/services/quotas.py +++ b/lib/galaxy/webapps/galaxy/services/quotas.py @@ -11,8 +11,8 @@ from galaxy.managers.context import ProvidesUserContext from galaxy.managers.groups import get_group_by_name from galaxy.managers.quotas import QuotaManager -from galaxy.managers.users import get_user_by_email from galaxy.model import Quota +from galaxy.model.db.user import get_user_by_email from galaxy.model.scoped_session import galaxy_scoped_session from galaxy.quota._schema import ( CreateQuotaParams, diff --git a/lib/galaxy/webapps/galaxy/services/users.py b/lib/galaxy/webapps/galaxy/services/users.py index 4cab9d01ab12..84c1f4a12978 100644 --- a/lib/galaxy/webapps/galaxy/services/users.py +++ b/lib/galaxy/webapps/galaxy/services/users.py @@ -4,13 +4,6 @@ Union, ) -from sqlalchemy import ( - false, - or_, - select, - true, -) - import galaxy.managers.base as managers_base from galaxy import ( exceptions as glx_exceptions, @@ -27,6 +20,7 @@ UserSerializer, ) from galaxy.model import User +from galaxy.model.db.user import get_users_for_index from galaxy.queue_worker import send_local_control_task from galaxy.quota import QuotaAgent from galaxy.schema import APIKeyModel @@ -206,31 +200,11 @@ def get_index( f_name: Optional[str], f_any: Optional[str], ) -> List[MaybeLimitedUserModel]: - rval: List[MaybeLimitedUserModel] = [] - stmt = select(User) - - if f_email and (trans.user_is_admin or trans.app.config.expose_user_email): - stmt = stmt.filter(User.email.like(f"%{f_email}%")) - - if f_name and (trans.user_is_admin or trans.app.config.expose_user_name): - stmt = stmt.filter(User.username.like(f"%{f_name}%")) - - if f_any: - if trans.user_is_admin: - stmt = stmt.filter(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) - else: - if trans.app.config.expose_user_email and trans.app.config.expose_user_name: - stmt = stmt.filter(or_(User.email.like(f"%{f_any}%"), User.username.like(f"%{f_any}%"))) - elif trans.app.config.expose_user_email: - stmt = stmt.filter(User.email.like(f"%{f_any}%")) - elif trans.app.config.expose_user_name: - stmt = stmt.filter(User.username.like(f"%{f_any}%")) - + # check for early return conditions if deleted: - # only admins can see deleted users if not trans.user_is_admin: + # only admins can see deleted users return [] - stmt = stmt.filter(User.deleted == true()) else: # special case: user can see only their own user # special case2: if the galaxy admin has specified that other user email/names are @@ -244,8 +218,19 @@ def get_index( return [UserModel(**trans.user.to_dict())] else: return [] - stmt = stmt.filter(User.deleted == false()) - for user in trans.sa_session.scalars(stmt).all(): + + users = get_users_for_index( + trans.sa_session, + deleted, + f_email, + f_name, + f_any, + trans.user_is_admin, + trans.app.config.expose_user_email, + trans.app.config.expose_user_name, + ) + rval: List[MaybeLimitedUserModel] = [] + for user in users: user_dict = user.to_dict() # If NOT configured to expose_email, do not expose email UNLESS the user is self, or # the user is an admin diff --git a/lib/galaxy/webapps/reports/controllers/jobs.py b/lib/galaxy/webapps/reports/controllers/jobs.py index 759380d78da4..98b1e226d718 100644 --- a/lib/galaxy/webapps/reports/controllers/jobs.py +++ b/lib/galaxy/webapps/reports/controllers/jobs.py @@ -24,8 +24,8 @@ model, util, ) -from galaxy.managers.users import get_user_by_email from galaxy.model import Job +from galaxy.model.db.user import get_user_by_email from galaxy.web.legacy_framework import grids from galaxy.webapps.base.controller import ( BaseUIController, diff --git a/lib/tool_shed/test/base/test_db_util.py b/lib/tool_shed/test/base/test_db_util.py index f957cfe0ea2c..21f08b1451ce 100644 --- a/lib/tool_shed/test/base/test_db_util.py +++ b/lib/tool_shed/test/base/test_db_util.py @@ -9,7 +9,7 @@ import galaxy.model import galaxy.model.tool_shed_install import tool_shed.webapp.model as model -from galaxy.managers.users import ( +from galaxy.model.db.user import ( get_user_by_email, get_user_by_username, ) diff --git a/lib/tool_shed/webapp/controllers/repository.py b/lib/tool_shed/webapp/controllers/repository.py index 2fb4015ce9f4..67fd5ed998cd 100644 --- a/lib/tool_shed/webapp/controllers/repository.py +++ b/lib/tool_shed/webapp/controllers/repository.py @@ -24,8 +24,8 @@ util, web, ) -from galaxy.managers.users import get_user_by_username from galaxy.model.base import transaction +from galaxy.model.db.user import get_user_by_username from galaxy.tool_shed.util import dependency_display from galaxy.tools.repositories import ValidationContext from galaxy.util.tool_shed import encoding_util diff --git a/lib/tool_shed/webapp/controllers/user.py b/lib/tool_shed/webapp/controllers/user.py index eb9a11878eba..63016fe8d70e 100644 --- a/lib/tool_shed/webapp/controllers/user.py +++ b/lib/tool_shed/webapp/controllers/user.py @@ -8,8 +8,8 @@ web, ) from galaxy.managers.api_keys import ApiKeyManager -from galaxy.managers.users import get_user_by_email from galaxy.model.base import transaction +from galaxy.model.db.user import get_user_by_email from galaxy.security.validate_user_input import ( validate_email, validate_password, diff --git a/test/integration/test_user_preferences.py b/test/integration/test_user_preferences.py index b01f4850489d..62e76aa108a8 100644 --- a/test/integration/test_user_preferences.py +++ b/test/integration/test_user_preferences.py @@ -8,7 +8,7 @@ put, ) -from galaxy.managers.users import get_user_by_email +from galaxy.model.db.user import get_user_by_email from galaxy_test.driver import integration_util TEST_USER_EMAIL = "test_user_preferences@bx.psu.edu" diff --git a/test/integration/test_vault_extra_prefs.py b/test/integration/test_vault_extra_prefs.py index e3ec8fd5c7f1..5072ac69d270 100644 --- a/test/integration/test_vault_extra_prefs.py +++ b/test/integration/test_vault_extra_prefs.py @@ -10,7 +10,7 @@ put, ) -from galaxy.managers.users import get_user_by_email +from galaxy.model.db.user import get_user_by_email from galaxy_test.driver import integration_util TEST_USER_EMAIL = "vault_test_user@bx.psu.edu" diff --git a/test/integration/test_vault_file_source.py b/test/integration/test_vault_file_source.py index dc39477cd2f6..71b8ff31dd6f 100644 --- a/test/integration/test_vault_file_source.py +++ b/test/integration/test_vault_file_source.py @@ -1,7 +1,7 @@ import os import tempfile -from galaxy.managers.users import get_user_by_email +from galaxy.model.db.user import get_user_by_email from galaxy.security.vault import UserVaultWrapper from galaxy_test.base import api_asserts from galaxy_test.base.populators import DatasetPopulator diff --git a/test/unit/data/model/conftest.py b/test/unit/data/model/conftest.py index 034be09e51b8..26ea7d8b7cc2 100644 --- a/test/unit/data/model/conftest.py +++ b/test/unit/data/model/conftest.py @@ -73,15 +73,43 @@ def f(**kwd): return f +@pytest.fixture +def make_dataset_collection(session): + def f(**kwd): + model = m.DatasetCollection(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_dataset_collection_element(session, make_hda): + def f(**kwd): + kwd["element"] = kwd.get("element") or make_hda() + model = m.DatasetCollectionElement(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_dataset_permissions(session): + def f(**kwd): + model = m.DatasetPermissions(**kwd) + write_to_db(session, model) + return model + + return f + + @pytest.fixture def make_default_history_permissions(session, make_history, make_role): def f(**kwd): - if "history" not in kwd: - kwd["history"] = make_history() - if "action" not in kwd: - kwd["action"] = random_str() - if "role" not in kwd: - kwd["role"] = make_role() + kwd["history"] = kwd.get("history") or make_history() + kwd["action"] = kwd.get("action") or random_str() + kwd["role"] = kwd.get("role") or make_role() model = m.DefaultHistoryPermissions(**kwd) write_to_db(session, model) return model @@ -112,10 +140,8 @@ def f(**kwd): @pytest.fixture def make_galaxy_session_to_history_association(session, make_history, make_galaxy_session): def f(**kwd): - if "galaxy_session" not in kwd: - kwd["galaxy_session"] = make_galaxy_session() - if "history" not in kwd: - kwd["history"] = make_history() + kwd["galaxy_session"] = kwd.get("galaxy_session") or make_galaxy_session() + kwd["history"] = kwd.get("history") or make_history() model = m.GalaxySessionToHistoryAssociation(**kwd) write_to_db(session, model) return model @@ -123,11 +149,31 @@ def f(**kwd): return f +@pytest.fixture +def make_hda(session, make_history): + def f(**kwd): + kwd["history"] = kwd.get("history") or make_history() + model = m.HistoryDatasetAssociation(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_hdca(session): + def f(**kwd): + model = m.HistoryDatasetCollectionAssociation(**kwd) + write_to_db(session, model) + return model + + return f + + @pytest.fixture def make_history(session, make_user): def f(**kwd): - if "user" not in kwd: - kwd["user"] = make_user() + kwd["user"] = kwd.get("user") or make_user() model = m.History(**kwd) write_to_db(session, model) return model @@ -168,10 +214,8 @@ def f(**kwd): @pytest.fixture def make_history_rating_association(session, make_user, make_history): def f(**kwd): - if "user" not in kwd: - kwd["user"] = make_user() - if "item" not in kwd: - kwd["item"] = make_history() + kwd["user"] = kwd.get("user") or make_user() + kwd["item"] = kwd.get("item") or make_history() model = m.HistoryRatingAssociation(**kwd) write_to_db(session, model) return model @@ -229,6 +273,70 @@ def f(**kwd): return f +@pytest.fixture +def make_ldca(session): + def f(**kwd): + model = m.LibraryDatasetCollectionAssociation(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_ldda(session): + def f(**kwd): + model = m.LibraryDatasetDatasetAssociation(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_library(session): + def f(**kwd): + model = m.Library(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_library_folder(session): + def f(**kwd): + model = m.LibraryFolder(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_library_permissions(session, make_library, make_role): + def f(**kwd): + action = kwd.get("action") or random_str() + library = kwd.get("library") or make_library() + role = kwd.get("role") or make_role() + model = m.LibraryPermissions(action, library, role) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_page(session, make_user): + def f(**kwd): + kwd["user"] = kwd.get("user") or make_user() + model = m.Page(**kwd) + write_to_db(session, model) + return model + + return f + + @pytest.fixture def make_role(session): def f(**kwd): @@ -240,9 +348,10 @@ def f(**kwd): @pytest.fixture -def make_workflow(session): +def make_stored_workflow(session, make_user): def f(**kwd): - model = m.Workflow(**kwd) + kwd["user"] = kwd.get("user") or make_user() + model = m.StoredWorkflow(**kwd) write_to_db(session, model) return model @@ -250,11 +359,13 @@ def f(**kwd): @pytest.fixture -def make_workflow_invocation(session, make_workflow): +def make_task(session, make_job): def f(**kwd): - if "workflow" not in kwd: - kwd["workflow"] = make_workflow() - model = m.WorkflowInvocation(**kwd) + kwd["job"] = kwd.get("job") or make_job() + # Assumption: if the following args are needed, a test should supply them + kwd["working_directory"] = kwd.get("working_directory") or random_str() + kwd["prepare_files_cmd"] = kwd.get("prepare_files_cmd") or random_str() + model = m.Task(**kwd) write_to_db(session, model) return model @@ -264,12 +375,9 @@ def f(**kwd): @pytest.fixture def make_user(session): def f(**kwd): - if "username" not in kwd: - kwd["username"] = random_str() - if "email" not in kwd: - kwd["email"] = random_email() - if "password" not in kwd: - kwd["password"] = random_str() + kwd["username"] = kwd.get("username") or random_str() + kwd["email"] = kwd.get("email") or random_email() + kwd["password"] = kwd.get("password") or random_str() model = m.User(**kwd) write_to_db(session, model) return model @@ -277,6 +385,58 @@ def f(**kwd): return f +@pytest.fixture +def make_user_item_rating_association(session): + def f(assoc_class, user, item, rating): + model = assoc_class(user, item, rating) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_user_role_association(session): + def f(user, role): + model = m.UserRoleAssociation(user, role) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_visualization(session, make_user): + def f(**kwd): + kwd["user"] = kwd.get("user") or make_user() + model = m.Visualization(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_workflow(session): + def f(**kwd): + model = m.Workflow(**kwd) + write_to_db(session, model) + return model + + return f + + +@pytest.fixture +def make_workflow_invocation(session, make_workflow): + def f(**kwd): + kwd["workflow"] = kwd.get("workflow") or make_workflow() + model = m.WorkflowInvocation(**kwd) + write_to_db(session, model) + return model + + return f + + # utility functions diff --git a/test/unit/data/model/db/__init__.py b/test/unit/data/model/db/__init__.py index e69de29bb2d1..13a615086ebe 100644 --- a/test/unit/data/model/db/__init__.py +++ b/test/unit/data/model/db/__init__.py @@ -0,0 +1,24 @@ +from collections import ( + Counter, + namedtuple, +) + +PRIVATE_OBJECT_STORE_ID = "my_private_data" + +MockTransaction = namedtuple("MockTransaction", "user") + + +class MockObjectStore: + + def is_private(self, object): + if object.object_store_id == PRIVATE_OBJECT_STORE_ID: + return True + else: + return False + + +def verify_items(items, expected_items): + """ + Assert that items and expected_items contain the same elements. + """ + assert Counter(items) == Counter(expected_items) diff --git a/test/unit/data/model/db/conftest.py b/test/unit/data/model/db/conftest.py index 240268ef207f..d36a38e71ace 100644 --- a/test/unit/data/model/db/conftest.py +++ b/test/unit/data/model/db/conftest.py @@ -11,6 +11,9 @@ from sqlalchemy.orm import Session from galaxy import model as m +from galaxy.datatypes.registry import Registry as DatatypesRegistry +from galaxy.model.triggers.update_audit_table import install as install_timestamp_triggers +from . import MockObjectStore if TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -39,6 +42,19 @@ def session(engine: "Engine") -> Session: def init_database(engine: "Engine") -> None: """Create database objects.""" m.mapper_registry.metadata.create_all(engine) + install_timestamp_triggers(engine) + + +@pytest.fixture(autouse=True, scope="module") +def init_object_store() -> None: + m.Dataset.object_store = MockObjectStore() # type:ignore[assignment] + + +@pytest.fixture(autouse=True, scope="module") +def init_datatypes() -> None: + datatypes_registry = DatatypesRegistry() + datatypes_registry.load_datatypes() + m.set_datatypes_registry(datatypes_registry) @pytest.fixture(autouse=True) diff --git a/test/unit/data/model/db/test_history_table_pruner.py b/test/unit/data/model/db/test_history_table_pruner.py index 376b0c52939c..916e3d6f1621 100644 --- a/test/unit/data/model/db/test_history_table_pruner.py +++ b/test/unit/data/model/db/test_history_table_pruner.py @@ -4,7 +4,6 @@ from sqlalchemy import ( func, select, - text, ) from galaxy import model as m @@ -69,10 +68,6 @@ def setup_db( make_data_manager_history_association(history=histories[32]) make_cleanup_event_history_association(history_id=histories[33].id) make_galaxy_session_to_history_association(history=histories[34]) - # HistoryAudit is not instantiable, so created association manually. - stmt = text("insert into history_audit values(:history_id, :update_time)") - params = {"history_id": histories[35].id, "update_time": datetime.date.today()} - session.execute(stmt, params) # 6. Create a galaxy_session record referring to a history. # This cannot be deleted, but the history reference can be set to null. @@ -86,7 +81,8 @@ def setup_db( def test_run(setup_db, session, db_url, engine): - + # In this test we do not verify counts of rows in the HistoryAudit table since those rows are created + # automatically via db trigger. def verify_counts(model, expected): assert session.scalar(select(func.count()).select_from(model)) == expected @@ -122,7 +118,6 @@ def verify_counts(model, expected): m.DataManagerHistoryAssociation, m.CleanupEventHistoryAssociation, m.GalaxySessionToHistoryAssociation, - m.HistoryAudit, ]: verify_counts(model, 1) verify_counts( @@ -162,6 +157,5 @@ def verify_counts(model, expected): m.DataManagerHistoryAssociation, m.CleanupEventHistoryAssociation, m.GalaxySessionToHistoryAssociation, - m.HistoryAudit, ]: verify_counts(model, 0) diff --git a/test/unit/data/model/db/test_libraries.py b/test/unit/data/model/db/test_libraries.py new file mode 100644 index 000000000000..80dae0b15b50 --- /dev/null +++ b/test/unit/data/model/db/test_libraries.py @@ -0,0 +1,100 @@ +from galaxy.model.db.library import ( + get_libraries_by_name, + get_libraries_for_admins, + get_libraries_for_nonadmins, + get_library_ids, + get_library_permissions_by_role, +) +from . import verify_items + + +def test_get_library_ids(session, make_library, make_library_permissions): + l1, l2, l3, l4 = make_library(), make_library(), make_library(), make_library() + make_library_permissions(library=l1, action="a") + make_library_permissions(library=l2, action="b") + make_library_permissions(library=l3, action="b") + make_library_permissions(library=l3, action="b") # intentional duplicate + make_library_permissions(library=l4, action="c") + + ids = get_library_ids(session, "b").all() + expected = [l2.id, l3.id] + verify_items(ids, expected) + + +def test_get_library_permissions_by_role(session, make_role, make_library_permissions): + r1, r2 = make_role(), make_role() + make_library_permissions() + make_library_permissions() + make_library_permissions(role=r1) + make_library_permissions(role=r2) + lps = get_library_permissions_by_role(session, (r1.id, r2.id)).all() + + lp_roles = [lp.role for lp in lps] + expected = [r1, r2] + verify_items(lp_roles, expected) + + +def test_get_libraries_for_admins(session, make_library): + libs = [make_library() for _ in range(5)] + libs[0].deleted = True + libs[1].deleted = True + libs[2].deleted = False + libs[3].deleted = False + libs[4].deleted = False + + libs_deleted = get_libraries_for_admins(session, True).all() + expected = [libs[0], libs[1]] + verify_items(libs_deleted, expected) + + libs_not_deleted = get_libraries_for_admins(session, False).all() + expected = [libs[2], libs[3], libs[4]] + verify_items(libs_not_deleted, expected) + + libs_all = get_libraries_for_admins(session, None).all() + verify_items(libs_all, libs) + + +def test_get_libraries_for_admins__ordering(session, make_library): + l1 = make_library(name="c") + l2 = make_library(name="B") + l3 = make_library(name="a") + + libs = get_libraries_for_admins(session, None).all() + assert libs == [l3, l2, l1] # sorted by name, case insensitive + + +def test_get_libraries_for_non_admins(session, make_library): + l1, l2, l3, l4 = (make_library() for _ in range(4)) + restricted_ids = (l2.id, l3.id, l4.id) + accessible_restricted_ids = (l2.id,) + l2.deleted = False # use default for l1.deleted + l3.deleted = False + l4.deleted = True + + allowed = get_libraries_for_nonadmins(session, restricted_ids, accessible_restricted_ids).all() + # Expected: l1 (not deleted, not restricted), l2 (not deleted, restricted but accessible) + # Not returned: l3 (not deleted but restricted), l4 (deleted) + expected = [l1, l2] + verify_items(allowed, expected) + + +def test_get_libraries_for_admins_non_admins__ordering(session, make_library): + l1 = make_library(name="c") + l2 = make_library(name="B") + l3 = make_library(name="a") + + expected = [l3, l2, l1] # sorted by Library.name, case insensitive + libs = get_libraries_for_admins(session, None).all() + assert libs == expected + libs = get_libraries_for_nonadmins(session, [], []).all() + assert libs == expected + + +def test_get_libraries_by_name(session, make_library): + make_library(name="a") + l2 = make_library(name="b") + l3 = make_library(name="b") # intentional duplicate + l3.deleted = True # should not be returned + + libs = get_libraries_by_name(session, "b").all() + assert libs == [l2] diff --git a/test/unit/data/model/db/test_misc.py b/test/unit/data/model/db/test_misc.py new file mode 100644 index 000000000000..b8ef3fe5cf0c --- /dev/null +++ b/test/unit/data/model/db/test_misc.py @@ -0,0 +1,326 @@ +import random + +import pytest +from sqlalchemy import inspect + +from galaxy import model as m +from galaxy.model.unittest_utils.db_helpers import get_hdca_by_name +from . import ( + MockTransaction, + PRIVATE_OBJECT_STORE_ID, +) + + +def test_history_update(make_history, make_hda, session): + """ + Verify the following behavior: + - history updated due to hda insert + - history updated due to hda update + - history NOT updated when hda copied + """ + h1 = make_history() + old_update_time = h1.update_time + + hda = make_hda(history=h1, create_dataset=True, sa_session=session) + # history updated due to hda insert + assert h1.update_time > old_update_time + + old_update_time = h1.update_time + hda.name = "new name" + session.add(hda) + session.commit() + # history updated due to hda update + assert h1.update_time > old_update_time + + old_update_time = h1.update_time + hda2 = hda.copy() + assert hda2 + # history NOT updated when hda copied + assert h1.update_time == old_update_time + + +def test_ratings( + make_user, + make_stored_workflow, + make_history, + make_page, + make_visualization, + make_hdca, + make_ldca, + make_user_item_rating_association, +): + def _test_rating(assoc_class, item, assoc_class_item_attr_name): + user = make_user() + rating = random.randint(0, 100) + rating_assoc = make_user_item_rating_association(assoc_class, user, item, rating) + assert rating_assoc.user == user + assert getattr(rating_assoc, assoc_class_item_attr_name) == item + assert rating_assoc.rating == rating + + _test_rating(m.StoredWorkflowRatingAssociation, make_stored_workflow(), "stored_workflow") + _test_rating(m.HistoryRatingAssociation, make_history(), "history") + _test_rating(m.PageRatingAssociation, make_page(), "page") + _test_rating(m.VisualizationRatingAssociation, make_visualization(), "visualization") + _test_rating(m.HistoryDatasetCollectionRatingAssociation, make_hdca(), "dataset_collection") + _test_rating(m.LibraryDatasetCollectionRatingAssociation, make_ldca(), "dataset_collection") + + +def test_hda_to_library_dataset_dataset_association(session, make_user, make_history, make_hda, make_library_folder): + hda = make_hda(create_dataset=True, sa_session=session) + target_folder = make_library_folder() + mock_trans = MockTransaction(user=None) + + ldda = hda.to_library_dataset_dataset_association( + trans=mock_trans, + target_folder=target_folder, + ) + assert target_folder.item_count == 1 + assert ldda.id + assert ldda.library_dataset.id + assert ldda.library_dataset.library_dataset_dataset_association.id + + new_ldda = hda.to_library_dataset_dataset_association( + trans=mock_trans, target_folder=target_folder, replace_dataset=ldda.library_dataset + ) + assert new_ldda.id != ldda.id + assert new_ldda.library_dataset_id == ldda.library_dataset_id + assert new_ldda.library_dataset.library_dataset_dataset_association_id == new_ldda.id + assert len(new_ldda.library_dataset.expired_datasets) == 1 + assert new_ldda.library_dataset.expired_datasets[0] == ldda + assert target_folder.item_count == 1 + + +def test_hda_to_library_dataset_dataset_association_fails_if_private( + session, make_user, make_history, make_hda, make_library_folder +): + hda = make_hda(create_dataset=True, sa_session=session) + hda.dataset.object_store_id = PRIVATE_OBJECT_STORE_ID + target_folder = make_library_folder() + mock_trans = MockTransaction(user=None) + + with pytest.raises(Exception) as exec_info: + hda.to_library_dataset_dataset_association( + trans=mock_trans, + target_folder=target_folder, + ) + assert m.CANNOT_SHARE_PRIVATE_DATASET_MESSAGE in str(exec_info.value) + + +def test_collection_get_interface(session, make_hda, make_dataset_collection): + c = make_dataset_collection(collection_type="list") + d = make_hda(create_dataset=True, sa_session=session) + elements = 100 + dces = [ + m.DatasetCollectionElement(collection=c, element=d, element_identifier=f"{i}", element_index=i) + for i in range(elements) + ] + for i in range(elements): + assert c[i] == dces[i] + + +def test_collections_in_histories(session, make_dataset_collection, make_dataset_collection_element, make_hdca): + c = make_dataset_collection(collection_type="pair") + dce1 = make_dataset_collection_element(collection=c, element_identifier="left") + dce2 = make_dataset_collection_element(collection=c, element_identifier="right") + make_hdca(name="foo", collection=c) + loaded_dataset_collection = get_hdca_by_name(session, "foo").collection + + assert len(loaded_dataset_collection.elements) == 2 + assert loaded_dataset_collection.collection_type == "pair" + assert loaded_dataset_collection["left"] == dce1 + assert loaded_dataset_collection["right"] == dce2 + + +def test_dataset_action_tuples( + session, + make_user, + make_history, + make_hda, + make_role, + make_dataset_permissions, + make_dataset_collection, + make_dataset_collection_element, +): + role = make_role() + hda1 = make_hda(create_dataset=True, sa_session=session) + hda2 = make_hda(create_dataset=True, sa_session=session) + make_dataset_permissions(action="action1", dataset=hda1.dataset, role=role) + make_dataset_permissions(action=None, dataset=hda1.dataset, role=role) + make_dataset_permissions(action="action3", dataset=hda1.dataset, role=role) + c = make_dataset_collection(collection_type="type1") + make_dataset_collection_element(collection=c, element=hda1) + make_dataset_collection_element(collection=c, element=hda2) + assert c.dataset_action_tuples == [("action1", role.id), ("action3", role.id)] + + +def test_dataset_dbkeys_and_extensions_summary( + session, make_hda, make_dataset_collection, make_dataset_collection_element, make_hdca +): + d1 = make_hda(extension="bam", dbkey="hg19", create_dataset=True, sa_session=session) + d2 = make_hda(extension="txt", dbkey="hg19", create_dataset=True, sa_session=session) + c1 = make_dataset_collection(collection_type="paired") + make_dataset_collection_element(collection=c1, element=d1) + make_dataset_collection_element(collection=c1, element=d2) + + hdca = make_hdca(collection=c1) + assert hdca.dataset_dbkeys_and_extensions_summary[0] == {"hg19"} + assert hdca.dataset_dbkeys_and_extensions_summary[1] == {"bam", "txt"} + + +def test_populated_optimized_ok(session, make_dataset_collection, make_dataset_collection_element, make_hda): + c1 = make_dataset_collection(collection_type="paired") + make_dataset_collection_element(collection=c1, element=make_hda(create_dataset=True, sa_session=session)) + make_dataset_collection_element(collection=c1, element=make_hda(create_dataset=True, sa_session=session)) + assert c1.populated + assert c1.populated_optimized + + +def test_populated_optimized_empty_list_list_ok(make_dataset_collection, make_dataset_collection_element): + c1 = make_dataset_collection(collection_type="list") + c2 = make_dataset_collection(collection_type="list:list") + make_dataset_collection_element(collection=c2, element=c1) + assert c1.populated + assert c1.populated_optimized + assert c2.populated + assert c2.populated_optimized + + +def test_populated_optimized_list_list_not_populated(make_dataset_collection, make_dataset_collection_element): + c1 = make_dataset_collection(collection_type="list", populated=False) + c2 = make_dataset_collection(collection_type="list:list") + make_dataset_collection_element(collection=c2, element=c1) + assert not c1.populated + assert not c1.populated_optimized + assert not c2.populated + assert not c2.populated_optimized + + +def test_default_disk_usage(session, make_user): + u = make_user() + u.adjust_total_disk_usage(1, None) + user_reload = session.get(m.User, u.id) + assert user_reload.disk_usage == 1 + + +def test_history_contents(session, make_history, make_hda): + h1 = make_history() + d1 = make_hda(history=h1, name="1") + d2 = make_hda(history=h1, name="2", visible=False, create_dataset=True, sa_session=session) + d2.dataset.object_store_id = "foobar" + d3 = make_hda(history=h1, name="3", deleted=True, create_dataset=True, sa_session=session) + d3.dataset.object_store_id = "three_store" + d4 = make_hda(history=h1, name="4", visible=False, deleted=True) + + def contents_iter_names(**kwds): + history = session.get(m.History, h1.id) + return [h.name for h in history.contents_iter(**kwds)] + + assert contents_iter_names() == ["1", "2", "3", "4"] + assert contents_iter_names(deleted=False) == ["1", "2"] + assert contents_iter_names(visible=True) == ["1", "3"] + assert contents_iter_names(visible=True, object_store_ids=["three_store"]) == ["3"] + assert contents_iter_names(visible=False) == ["2", "4"] + assert contents_iter_names(deleted=True, visible=False) == ["4"] + assert contents_iter_names(deleted=False, object_store_ids=["foobar"]) == ["2"] + assert contents_iter_names(deleted=False, object_store_ids=["foobar2"]) == [] + assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id]) == ["1", "2", "3", "4"] + assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id], max_in_filter_length=1) == ["1", "2", "3", "4"] + assert contents_iter_names(ids=[d1.id, d3.id]) == ["1", "3"] + + +def test_current_galaxy_session(make_user, make_galaxy_session): + user = make_user() + galaxy_session = make_galaxy_session(user=user) + assert user.current_galaxy_session == galaxy_session + new_galaxy_session = make_galaxy_session() + user.galaxy_sessions.append(new_galaxy_session) + assert user.current_galaxy_session == new_galaxy_session + + +def test_next_hid(make_history): + h = make_history() + assert h.hid_counter == 1 + h._next_hid() + assert h.hid_counter == 2 + h._next_hid(n=3) + assert h.hid_counter == 5 + + +def test_history_hid_counter_is_expired_after_next_hid_call(make_history): + h = make_history() + state = inspect(h) + assert h.hid_counter == 1 + assert "hid_counter" not in state.unloaded + assert "id" not in state.unloaded + + h._next_hid() + + assert "hid_counter" in state.unloaded # this attribute has been expired + assert "id" not in state.unloaded # but other attributes have NOT been expired + assert h.hid_counter == 2 # check this last: this causes this hid_counter to be reloaded + + +def test_get_display_name(make_ldda, make_hda, make_history, make_library, make_library_folder): + + def assert_display_name_converts_to_unicode(item, name): + assert isinstance(item.get_display_name(), str) + assert item.get_display_name() == name + + ldda = make_ldda(name="ldda_name") + assert_display_name_converts_to_unicode(ldda, "ldda_name") + + hda = make_hda(name="hda_name") + assert_display_name_converts_to_unicode(hda, "hda_name") + + history = make_history(name="history_name") + assert_display_name_converts_to_unicode(history, "history_name") + + library = make_library(name="library_name") + assert_display_name_converts_to_unicode(library, "library_name") + + library_folder = make_library_folder(name="library_folder") + assert_display_name_converts_to_unicode(library_folder, "library_folder") + + history = make_history(name="Hello₩◎ґʟⅾ") + assert isinstance(history.name, str) + assert isinstance(history.get_display_name(), str) + assert history.get_display_name() == "Hello₩◎ґʟⅾ" + + +def test_metadata_spec(make_hda): + metadata = dict(chromCol=1, startCol=2, endCol=3) + d = make_hda(extension="interval", metadata=metadata) + assert d.metadata.chromCol == 1 + assert d.metadata.anyAttribute is None + assert "items" not in d.metadata + + +def test_job_metrics(make_job): + job = make_job() + job.add_metric("gx", "galaxy_slots", 5) + job.add_metric("system", "system_name", "localhost") + + assert len(job.text_metrics) == 1 + assert job.text_metrics[0].plugin == "system" + assert job.text_metrics[0].metric_name == "system_name" + assert job.text_metrics[0].metric_value == "localhost" + assert len(job.numeric_metrics) == 1 + assert job.numeric_metrics[0].plugin == "gx" + assert job.numeric_metrics[0].metric_name == "galaxy_slots" + assert job.numeric_metrics[0].metric_value == 5 + + +def test_task_metrics(make_task): + task = make_task() + task.add_metric("foo", "some-name", "some-value") + big_value = ":".join(f"{i}" for i in range(2000)) + task.add_metric("env", "BIG_PATH", big_value) + + assert len(task.text_metrics) == 2 + assert task.text_metrics[0].plugin == "foo" + assert task.text_metrics[0].metric_name == "some-name" + assert task.text_metrics[0].metric_value == "some-value" + assert task.text_metrics[1].plugin == "env" + assert task.text_metrics[1].metric_name == "BIG_PATH" + # Ensure big values truncated + assert len(task.text_metrics[1].metric_value) <= 1023 diff --git a/test/unit/data/model/db/test_role.py b/test/unit/data/model/db/test_role.py new file mode 100644 index 000000000000..59daf8a5a8ea --- /dev/null +++ b/test/unit/data/model/db/test_role.py @@ -0,0 +1,44 @@ +from galaxy.model import Role +from galaxy.model.db.role import ( + get_npns_roles, + get_private_user_role, + get_roles_by_ids, +) +from . import verify_items + + +def test_get_npns_roles(session, make_role): + make_role(deleted=True) + make_role(type=Role.types.PRIVATE) + make_role(type=Role.types.SHARING) + r4 = make_role() + r5 = make_role() + + roles = get_npns_roles(session).all() + # Expected: r4, r5 + # Not returned: r1: deleted, r2: private, r3: sharing + expected = [r4, r5] + verify_items(roles, expected) + + +def test_get_private_user_role(session, make_user, make_role, make_user_role_association): + u1, u2 = make_user(), make_user() + r1 = make_role(type=Role.types.PRIVATE) + r2 = make_role(type=Role.types.PRIVATE) + r3 = make_role() + make_user_role_association(u1, r1) # user1 private + make_user_role_association(u1, r3) # user1 non-private + make_user_role_association(u2, r2) # user2 private + + role = get_private_user_role(u1, session) + assert role is r1 + + +def test_get_roles_by_ids(session, make_role): + roles = [make_role() for _ in range(10)] # create roles + r1, r2, r3 = roles[0], roles[3], roles[7] # select some random roles + ids = [r1.id, r2.id, r3.id] + + roles2 = get_roles_by_ids(session, ids) + expected = [r1, r2, r3] + verify_items(roles2, expected) diff --git a/test/unit/data/model/db/test_user.py b/test/unit/data/model/db/test_user.py new file mode 100644 index 000000000000..5085a71b8b42 --- /dev/null +++ b/test/unit/data/model/db/test_user.py @@ -0,0 +1,82 @@ +import pytest +from sqlalchemy.exc import IntegrityError + +from galaxy.model.db.user import ( + get_user_by_email, + get_user_by_username, + get_users_by_ids, + get_users_for_index, +) +from . import verify_items + + +@pytest.fixture +def make_random_users(session, make_user): + def f(count): + return [make_user() for _ in range(count)] + + return f + + +def test_get_user_by_username(session, make_user): + my_user = make_user(username="a") + make_user() # make another user + make_user(email="other_username") # make another user + + user = get_user_by_username(session, "a") + assert user is my_user + + +def test_get_user_by_email(session, make_user): + my_user = make_user(email="a@foo.bar") + make_user(email="other_email") # make another user + + user = get_user_by_email(session, "a@foo.bar") + assert user is my_user + + +def test_get_users_by_ids(session, make_random_users): + users = make_random_users(10) + u1, u2, u3 = users[0], users[3], users[7] # select some random users + ids = [u1.id, u2.id, u3.id] + + users2 = get_users_by_ids(session, ids) + expected = [u1, u2, u3] + verify_items(users2, expected) + + +def test_get_users_for_index(session, make_user): + u1 = make_user(email="a", username="b") + u2 = make_user(email="c", username="d") + u3 = make_user(email="e", username="f") + u4 = make_user(email="g", username="h") + u5 = make_user(email="i", username="z") + u6 = make_user(email="z", username="i") + + users = get_users_for_index(session, False, f_email="a", expose_user_email=True) + verify_items(users, [u1]) + users = get_users_for_index(session, False, f_email="c", is_admin=True) + verify_items(users, [u2]) + users = get_users_for_index(session, False, f_name="f", expose_user_name=True) + verify_items(users, [u3]) + users = get_users_for_index(session, False, f_name="h", is_admin=True) + verify_items(users, [u4]) + users = get_users_for_index(session, False, f_any="i", is_admin=True) + verify_items(users, [u5, u6]) + users = get_users_for_index(session, False, f_any="i", expose_user_email=True, expose_user_name=True) + verify_items(users, [u5, u6]) + users = get_users_for_index(session, False, f_any="i", expose_user_email=True) + verify_items(users, [u5]) + users = get_users_for_index(session, False, f_any="i", expose_user_name=True) + verify_items(users, [u6]) + + u1.deleted = True + users = get_users_for_index(session, True) + verify_items(users, [u1]) + + +def test_username_is_unique(make_user): + # Verify username model definition + make_user(username="a") + with pytest.raises(IntegrityError): + make_user(username="a") diff --git a/test/unit/data/test_galaxy_mapping.py b/test/unit/data/test_galaxy_mapping.py index 1a88a82f78ed..3e99841db44b 100644 --- a/test/unit/data/test_galaxy_mapping.py +++ b/test/unit/data/test_galaxy_mapping.py @@ -1,4 +1,3 @@ -import collections import os import random import uuid @@ -77,189 +76,6 @@ def expunge(cls): class TestMappings(BaseModelTestCase): - def test_ratings(self): - user_email = "rater@example.com" - u = model.User(email=user_email, password="password") - self.persist(u) - - def persist_and_check_rating(rating_class, item): - rating = 5 - rating_association = rating_class(u, item, rating) - self.persist(rating_association) - self.expunge() - stored_rating = self.model.session.scalars(select(rating_class)).all()[0] - assert stored_rating.rating == rating - assert stored_rating.user.email == user_email - - sw = model.StoredWorkflow() - add_object_to_object_session(sw, u) - sw.user = u - self.persist(sw) - persist_and_check_rating(model.StoredWorkflowRatingAssociation, sw) - - h = model.History(name="History for Rating", user=u) - self.persist(h) - persist_and_check_rating(model.HistoryRatingAssociation, h) - - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h, create_dataset=True, sa_session=self.model.session - ) - self.persist(d1) - persist_and_check_rating(model.HistoryDatasetAssociationRatingAssociation, d1) - - page = model.Page() - page.user = u - self.persist(page) - persist_and_check_rating(model.PageRatingAssociation, page) - - visualization = model.Visualization() - visualization.user = u - self.persist(visualization) - persist_and_check_rating(model.VisualizationRatingAssociation, visualization) - - dataset_collection = model.DatasetCollection(collection_type="paired") - history_dataset_collection = model.HistoryDatasetCollectionAssociation(collection=dataset_collection) - self.persist(history_dataset_collection) - persist_and_check_rating(model.HistoryDatasetCollectionRatingAssociation, history_dataset_collection) - - library_dataset_collection = model.LibraryDatasetCollectionAssociation(collection=dataset_collection) - self.persist(library_dataset_collection) - persist_and_check_rating(model.LibraryDatasetCollectionRatingAssociation, library_dataset_collection) - - def test_display_name(self): - def assert_display_name_converts_to_unicode(item, name): - assert isinstance(item.get_display_name(), str) - assert item.get_display_name() == name - - ldda = model.LibraryDatasetDatasetAssociation(name="ldda_name") - assert_display_name_converts_to_unicode(ldda, "ldda_name") - - hda = model.HistoryDatasetAssociation(name="hda_name") - assert_display_name_converts_to_unicode(hda, "hda_name") - - history = model.History(name="history_name") - assert_display_name_converts_to_unicode(history, "history_name") - - library = model.Library(name="library_name") - assert_display_name_converts_to_unicode(library, "library_name") - - library_folder = model.LibraryFolder(name="library_folder") - assert_display_name_converts_to_unicode(library_folder, "library_folder") - - history = model.History(name="Hello₩◎ґʟⅾ") - - assert isinstance(history.name, str) - assert isinstance(history.get_display_name(), str) - assert history.get_display_name() == "Hello₩◎ґʟⅾ" - - def test_hda_to_library_dataset_dataset_association(self): - model = self.model - u = self.model.User(email="mary@example.com", password="password") - h1 = model.History(name="History 1", user=u) - hda = model.HistoryDatasetAssociation( - name="hda_name", create_dataset=True, history=h1, sa_session=model.session - ) - self.persist(hda) - trans = collections.namedtuple("trans", "user") - target_folder = model.LibraryFolder(name="library_folder") - ldda = hda.to_library_dataset_dataset_association( - trans=trans(user=u), - target_folder=target_folder, - ) - assert target_folder.item_count == 1 - assert ldda.id - assert ldda.library_dataset.id - assert ldda.library_dataset_id - assert ldda.library_dataset.library_dataset_dataset_association - assert ldda.library_dataset.library_dataset_dataset_association_id - library_dataset_id = ldda.library_dataset_id - replace_dataset = ldda.library_dataset - new_ldda = hda.to_library_dataset_dataset_association( - trans=trans(user=u), target_folder=target_folder, replace_dataset=replace_dataset - ) - assert new_ldda.id != ldda.id - assert new_ldda.library_dataset_id == library_dataset_id - assert new_ldda.library_dataset.library_dataset_dataset_association_id == new_ldda.id - assert len(new_ldda.library_dataset.expired_datasets) == 1 - assert new_ldda.library_dataset.expired_datasets[0] == ldda - assert target_folder.item_count == 1 - - def test_hda_to_library_dataset_dataset_association_fails_if_private(self): - model = self.model - u = model.User(email="mary2@example.com", password="password") - h1 = model.History(name="History 1", user=u) - hda = model.HistoryDatasetAssociation( - name="hda_name", create_dataset=True, history=h1, sa_session=model.session - ) - hda.dataset.object_store_id = PRIVATE_OBJECT_STORE_ID - self.persist(hda) - trans = collections.namedtuple("trans", "user") - target_folder = model.LibraryFolder(name="library_folder") - with pytest.raises(Exception) as exec_info: - hda.to_library_dataset_dataset_association( - trans=trans(user=u), - target_folder=target_folder, - ) - assert galaxy.model.CANNOT_SHARE_PRIVATE_DATASET_MESSAGE in str(exec_info.value) - - def test_tags(self): - TAG_NAME = "Test Tag" - my_tag = model.Tag(name=TAG_NAME) - u = model.User(email="tagger@example.com", password="password") - self.persist(my_tag, u) - - def tag_and_test(taggable_object, tag_association_class): - q = select(tag_association_class).join(model.Tag).where(model.Tag.name == TAG_NAME) - - assert len(self.model.session.execute(q).all()) == 0 - - tag_association = tag_association_class() - tag_association.tag = my_tag - taggable_object.tags = [tag_association] - self.persist(tag_association, taggable_object) - - assert len(self.model.session.execute(q).all()) == 1 - - sw = model.StoredWorkflow(user=u) - tag_and_test(sw, model.StoredWorkflowTagAssociation) - - h = model.History(name="History for Tagging", user=u) - tag_and_test(h, model.HistoryTagAssociation) - - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h, create_dataset=True, sa_session=self.model.session - ) - tag_and_test(d1, model.HistoryDatasetAssociationTagAssociation) - - page = model.Page(user=u) - tag_and_test(page, model.PageTagAssociation) - - visualization = model.Visualization(user=u) - tag_and_test(visualization, model.VisualizationTagAssociation) - - dataset_collection = model.DatasetCollection(collection_type="paired") - history_dataset_collection = model.HistoryDatasetCollectionAssociation(collection=dataset_collection) - tag_and_test(history_dataset_collection, model.HistoryDatasetCollectionTagAssociation) - - library_dataset_collection = model.LibraryDatasetCollectionAssociation(collection=dataset_collection) - tag_and_test(library_dataset_collection, model.LibraryDatasetCollectionTagAssociation) - - def test_collection_get_interface(self): - u = model.User(email="mary@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - c1 = model.DatasetCollection(collection_type="list") - elements = 100 - dces = [ - model.DatasetCollectionElement(collection=c1, element=d1, element_identifier=f"{i}", element_index=i) - for i in range(elements) - ] - self.persist(u, h1, d1, c1, *dces, commit=False, expunge=False) - self.model.session.flush() - for i in range(elements): - assert c1[i] == dces[i] def test_dataset_instance_order(self) -> None: u = model.User(email="mary@example.com", password="password") @@ -308,71 +124,6 @@ def test_dataset_instance_order(self) -> None: assert all(d.name == f"forward_{i}" for i, d in enumerate(forward_hdas)) assert all(d.name == f"reverse_{i}" for i, d in enumerate(reverse_hdas)) - def test_collections_in_histories(self): - u = model.User(email="mary@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - d2 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - - c1 = model.DatasetCollection(collection_type="pair") - hc1 = model.HistoryDatasetCollectionAssociation(history=h1, collection=c1, name="HistoryCollectionTest1") - - dce1 = model.DatasetCollectionElement(collection=c1, element=d1, element_identifier="left") - dce2 = model.DatasetCollectionElement(collection=c1, element=d2, element_identifier="right") - - self.persist(u, h1, d1, d2, c1, hc1, dce1, dce2) - - stmt = ( - select(model.HistoryDatasetCollectionAssociation) - .filter(model.HistoryDatasetCollectionAssociation.name == "HistoryCollectionTest1") - .limit(1) - ) - loaded_dataset_collection = self.model.session.scalars(stmt).first().collection - assert len(loaded_dataset_collection.elements) == 2 - assert loaded_dataset_collection.collection_type == "pair" - assert loaded_dataset_collection["left"] == dce1 - assert loaded_dataset_collection["right"] == dce2 - - def test_collections_in_library_folders(self): - u = model.User(email="mary2@example.com", password="password") - lf = model.LibraryFolder(name="RootFolder") - library = model.Library(name="Library1", root_folder=lf) - ld1 = model.LibraryDataset() - ld2 = model.LibraryDataset() - - ldda1 = model.LibraryDatasetDatasetAssociation(extension="txt", library_dataset=ld1) - ldda2 = model.LibraryDatasetDatasetAssociation(extension="txt", library_dataset=ld1) - - c1 = model.DatasetCollection(collection_type="pair") - dce1 = model.DatasetCollectionElement(collection=c1, element=ldda1) - dce2 = model.DatasetCollectionElement(collection=c1, element=ldda2) - self.persist(u, library, lf, ld1, ld2, c1, ldda1, ldda2, dce1, dce2) - - # TODO: - # loaded_dataset_collection = self.query( model.DatasetCollection ).filter( model.DatasetCollection.name == "LibraryCollectionTest1" ).first() - # assert len(loaded_dataset_collection.datasets) == 2 - # assert loaded_dataset_collection.collection_type == "pair" - - def test_dataset_action_tuples(self): - u = model.User(email="foo", password="foo") - h1 = model.History(user=u) - hda1 = model.HistoryDatasetAssociation(history=h1, create_dataset=True, sa_session=self.model.session) - hda2 = model.HistoryDatasetAssociation(history=h1, create_dataset=True, sa_session=self.model.session) - r1 = model.Role() - dp1 = model.DatasetPermissions(action="action1", dataset=hda1.dataset, role=r1) - dp2 = model.DatasetPermissions(action=None, dataset=hda1.dataset, role=r1) - dp3 = model.DatasetPermissions(action="action3", dataset=hda1.dataset, role=r1) - c1 = model.DatasetCollection(collection_type="type1") - dce1 = model.DatasetCollectionElement(collection=c1, element=hda1) - dce2 = model.DatasetCollectionElement(collection=c1, element=hda2) - self.model.session.add_all([u, h1, hda1, hda2, r1, dp1, dp2, dp3, c1, dce1, dce2]) - self.model.session.flush() - assert c1.dataset_action_tuples == [("action1", r1.id), ("action3", r1.id)] - def test_nested_collection_attributes(self): u = model.User(email="mary2@example.com", password="password") h1 = model.History(name="History 1", user=u) @@ -461,213 +212,6 @@ def test_nested_collection_attributes(self): ] assert c4.dataset_elements == [dce1, dce2] - def test_dataset_dbkeys_and_extensions_summary(self): - u = model.User(email="mary2@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="bam", dbkey="hg19", history=h1, create_dataset=True, sa_session=self.model.session - ) - d2 = model.HistoryDatasetAssociation( - extension="txt", dbkey="hg19", history=h1, create_dataset=True, sa_session=self.model.session - ) - c1 = model.DatasetCollection(collection_type="paired") - dce1 = model.DatasetCollectionElement(collection=c1, element=d1, element_identifier="forward", element_index=0) - dce2 = model.DatasetCollectionElement(collection=c1, element=d2, element_identifier="reverse", element_index=1) - hdca = model.HistoryDatasetCollectionAssociation(collection=c1, history=h1) - self.model.session.add_all([d1, d2, c1, dce1, dce2, hdca]) - self.model.session.flush() - assert hdca.dataset_dbkeys_and_extensions_summary[0] == {"hg19"} - assert hdca.dataset_dbkeys_and_extensions_summary[1] == {"bam", "txt"} - - def test_populated_optimized_ok(self): - u = model.User(email="mary2@example.com", password="password") - h1 = model.History(name="History 1", user=u) - d1 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - d2 = model.HistoryDatasetAssociation( - extension="txt", history=h1, create_dataset=True, sa_session=self.model.session - ) - c1 = model.DatasetCollection(collection_type="paired") - dce1 = model.DatasetCollectionElement(collection=c1, element=d1, element_identifier="forward", element_index=0) - dce2 = model.DatasetCollectionElement(collection=c1, element=d2, element_identifier="reverse", element_index=1) - self.model.session.add_all([d1, d2, c1, dce1, dce2]) - self.model.session.flush() - assert c1.populated - assert c1.populated_optimized - - def test_populated_optimized_empty_list_list_ok(self): - c1 = model.DatasetCollection(collection_type="list") - c2 = model.DatasetCollection(collection_type="list:list") - dce1 = model.DatasetCollectionElement( - collection=c2, element=c1, element_identifier="empty_list", element_index=0 - ) - self.model.session.add_all([c1, c2, dce1]) - self.model.session.flush() - assert c1.populated - assert c1.populated_optimized - assert c2.populated - assert c2.populated_optimized - - def test_populated_optimized_list_list_not_populated(self): - c1 = model.DatasetCollection(collection_type="list") - c1.populated_state = False - c2 = model.DatasetCollection(collection_type="list:list") - dce1 = model.DatasetCollectionElement( - collection=c2, element=c1, element_identifier="empty_list", element_index=0 - ) - self.model.session.add_all([c1, c2, dce1]) - self.model.session.flush() - assert not c1.populated - assert not c1.populated_optimized - assert not c2.populated - assert not c2.populated_optimized - - def test_default_disk_usage(self): - u = model.User(email="disk_default@test.com", password="password") - self.persist(u) - u.adjust_total_disk_usage(1, None) - u_id = u.id - self.expunge() - user_reload = self.model.session.get(model.User, u_id) - assert user_reload.disk_usage == 1 - - def test_basic(self): - original_user_count = len(self.model.session.scalars(select(model.User)).all()) - - # Make some changes and commit them - u = model.User(email="james@foo.bar.baz", password="password") - h1 = model.History(name="History 1", user=u) - h2 = model.History(name=("H" * 1024)) - self.persist(u, h1, h2) - metadata = dict(chromCol=1, startCol=2, endCol=3) - d1 = model.HistoryDatasetAssociation( - extension="interval", metadata=metadata, history=h2, create_dataset=True, sa_session=self.model.session - ) - self.persist(d1) - - # Check - users = self.model.session.scalars(select(model.User)).all() - assert len(users) == original_user_count + 1 - user = [user for user in users if user.email == "james@foo.bar.baz"][0] - assert user.email == "james@foo.bar.baz" - assert user.password == "password" - assert len(user.histories) == 1 - assert user.histories[0].name == "History 1" - hists = self.model.session.scalars(select(model.History)).all() - hist0 = [history for history in hists if history.id == h1.id][0] - hist1 = [history for history in hists if history.id == h2.id][0] - assert hist0.name == "History 1" - assert hist1.name == ("H" * 255) - assert hist0.user == user - assert hist1.user is None - assert hist1.datasets[0].metadata.chromCol == 1 - # The filename test has moved to objectstore - # id = hist1.datasets[0].id - # assert hist1.datasets[0].file_name == os.path.join( "/tmp", *directory_hash_id( id ) ) + f"/dataset_{id}.dat" - # Do an update and check - hist1.name = "History 2b" - self.expunge() - hists = self.model.session.scalars(select(model.History)).all() - hist0 = [history for history in hists if history.name == "History 1"][0] - hist1 = [history for history in hists if history.name == "History 2b"][0] - assert hist0.name == "History 1" - assert hist1.name == "History 2b" - # gvk TODO need to ad test for GalaxySessions, but not yet sure what they should look like. - - def test_metadata_spec(self): - metadata = dict(chromCol=1, startCol=2, endCol=3) - d = model.HistoryDatasetAssociation(extension="interval", metadata=metadata, sa_session=self.model.session) - assert d.metadata.chromCol == 1 - assert d.metadata.anyAttribute is None - assert "items" not in d.metadata - - def test_dataset_job_relationship(self): - dataset = model.Dataset() - job = model.Job() - dataset.job = job - self.persist(job, dataset) - loaded_dataset = self.model.session.execute( - select(model.Dataset).filter(model.Dataset.id == dataset.id) - ).scalar_one() - assert loaded_dataset.job_id == job.id - - def test_jobs(self): - u = model.User(email="jobtest@foo.bar.baz", password="password") - job = model.Job() - job.user = u - job.tool_id = "cat1" - - self.persist(u, job) - - loaded_job = self.model.session.scalars(select(model.Job).filter(model.Job.user == u).limit(1)).first() - assert loaded_job.tool_id == "cat1" - - def test_job_metrics(self): - u = model.User(email="jobtest@foo.bar.baz", password="password") - job = model.Job() - job.user = u - job.tool_id = "cat1" - - job.add_metric("gx", "galaxy_slots", 5) - job.add_metric("system", "system_name", "localhost") - - self.persist(u, job) - - task = model.Task(job=job, working_directory="/tmp", prepare_files_cmd="split.sh") - task.add_metric("gx", "galaxy_slots", 5) - task.add_metric("system", "system_name", "localhost") - - big_value = ":".join(f"{i}" for i in range(2000)) - task.add_metric("env", "BIG_PATH", big_value) - self.persist(task) - # Ensure big values truncated - assert len(task.text_metrics[1].metric_value) <= 1023 - - def test_tasks(self): - u = model.User(email="jobtest@foo.bar.baz", password="password") - job = model.Job() - task = model.Task(job=job, working_directory="/tmp", prepare_files_cmd="split.sh") - job.user = u - self.persist(u, job, task) - - loaded_task = self.model.session.scalars(select(model.Task).filter(model.Task.job == job).limit(1)).first() - assert loaded_task.prepare_input_files_cmd == "split.sh" - - def test_history_contents(self): - u = model.User(email="contents@foo.bar.baz", password="password") - # gs = model.GalaxySession() - h1 = model.History(name="HistoryContentsHistory1", user=u) - - self.persist(u, h1, expunge=False) - - d1 = self.new_hda(h1, name="1") - d2 = self.new_hda(h1, name="2", visible=False, object_store_id="foobar") - d3 = self.new_hda(h1, name="3", deleted=True, object_store_id="three_store") - d4 = self.new_hda(h1, name="4", visible=False, deleted=True) - - self.session().flush() - - def contents_iter_names(**kwds): - history = self.model.session.scalars( - select(model.History).filter(model.History.name == "HistoryContentsHistory1").limit(1) - ).first() - return [hda.name for hda in history.contents_iter(**kwds)] - - assert contents_iter_names() == ["1", "2", "3", "4"] - assert contents_iter_names(deleted=False) == ["1", "2"] - assert contents_iter_names(visible=True) == ["1", "3"] - assert contents_iter_names(visible=True, object_store_ids=["three_store"]) == ["3"] - assert contents_iter_names(visible=False) == ["2", "4"] - assert contents_iter_names(deleted=True, visible=False) == ["4"] - assert contents_iter_names(deleted=False, object_store_ids=["foobar"]) == ["2"] - assert contents_iter_names(deleted=False, object_store_ids=["foobar2"]) == [] - - assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id]) == ["1", "2", "3", "4"] - assert contents_iter_names(ids=[d1.id, d2.id, d3.id, d4.id], max_in_filter_length=1) == ["1", "2", "3", "4"] - - assert contents_iter_names(ids=[d1.id, d3.id]) == ["1", "3"] - def test_history_audit(self): u = model.User(email="contents@foo.bar.baz", password="password") h1 = model.History(name="HistoryAuditHistory", user=u) @@ -723,17 +267,6 @@ def _non_empty_flush(self): session.add(lf) session.flush() - def test_current_session(self): - user = model.User(email="testworkflows@bx.psu.edu", password="password") - galaxy_session = model.GalaxySession() - galaxy_session.user = user - self.persist(user, galaxy_session) - assert user.current_galaxy_session == galaxy_session - new_galaxy_session = model.GalaxySession() - user.galaxy_sessions.append(new_galaxy_session) - self.persist(user, new_galaxy_session) - assert user.current_galaxy_session == new_galaxy_session - def test_flush_refreshes(self): # Normally I don't believe in unit testing library code, but the behaviors around attribute # states and flushing in SQL Alchemy is very subtle and it is good to have a executable @@ -1035,31 +568,6 @@ def test_can_manage_private_dataset(self): assert security_agent.can_manage_dataset(u_from.all_roles(), d1.dataset) assert not security_agent.can_manage_dataset(u_other.all_roles(), d1.dataset) - def test_history_hid_counter_is_expired_after_next_hid_call(self): - u = model.User(email="hid_abuser@example.com", password="password") - h = model.History(name="History for hid testing", user=u) - self.persist(u, h) - state = inspect(h) - assert h.hid_counter == 1 - assert "hid_counter" not in state.unloaded - assert "id" not in state.unloaded - - h._next_hid() - - assert "hid_counter" in state.unloaded # this attribute has been expired - assert "id" not in state.unloaded # but other attributes have NOT been expired - assert h.hid_counter == 2 # check this last: this causes thie hid_counter to be reloaded - - def test_next_hid(self): - u = model.User(email="hid_abuser@example.com", password="password") - h = model.History(name="History for hid testing", user=u) - self.persist(u, h) - assert h.hid_counter == 1 - h._next_hid() - assert h.hid_counter == 2 - h._next_hid(n=3) - assert h.hid_counter == 5 - def test_cannot_make_private_objectstore_dataset_public(self): security_agent = GalaxyRBACAgent(self.model) u_from, u_to, _ = self._three_users("cannot_make_private_public")