From d6104c61a39d0d568013fe1ebda973f27be76ae2 Mon Sep 17 00:00:00 2001 From: Nicola Soranzo Date: Wed, 18 Oct 2023 17:05:39 +0100 Subject: [PATCH] Add type annotations. Drop unused functions and re-exports. --- lib/tool_shed/util/shed_util_common.py | 117 ++++++------------------- 1 file changed, 27 insertions(+), 90 deletions(-) diff --git a/lib/tool_shed/util/shed_util_common.py b/lib/tool_shed/util/shed_util_common.py index 016183204139..ed4d160effda 100644 --- a/lib/tool_shed/util/shed_util_common.py +++ b/lib/tool_shed/util/shed_util_common.py @@ -3,7 +3,10 @@ import os import socket import string -from typing import TYPE_CHECKING +from typing import ( + List, + TYPE_CHECKING, +) from sqlalchemy import ( false, @@ -14,17 +17,7 @@ from sqlalchemy.orm import scoped_session from galaxy import util -from galaxy.tool_shed.util.shed_util_common import ( - can_eliminate_repository_dependency, - clean_dependency_relationships, - generate_tool_guid, - get_ctx_rev, - get_next_prior_import_or_install_required_dict_entry, - get_tool_panel_config_tool_path_install_dir, - get_user, - have_shed_tool_conf_for_install, - set_image_paths, -) +from galaxy.tool_shed.util.shed_util_common import get_user from galaxy.util import ( checkers, unicodify, @@ -39,6 +32,7 @@ if TYPE_CHECKING: from tool_shed.structured_app import ToolShedApp + from tool_shed.webapp.model import Repository log = logging.getLogger(__name__) @@ -118,16 +112,7 @@ def get_category_by_name(app: "ToolShedApp", name: str): return sa_session.scalars(stmt).first() -def get_repository_categories(app, id): - """Get categories of a repository on the tool shed side from the database via id""" - sa_session = app.model.session - stmt = select(app.model.RepositoryCategoryAssociation).where( - app.model.RepositoryCategoryAssociation.repository_id == app.security.decode_id(id) - ) - return sa_session.scalars(stmt).all() - - -def get_repository_file_contents(app, file_path, repository_id, is_admin=False): +def get_repository_file_contents(app: "ToolShedApp", file_path: str, repository_id: str, is_admin: bool = False) -> str: """Return the display-safe contents of a repository file for display in a browser.""" safe_str = "" if not _is_path_browsable(app, file_path, repository_id, is_admin): @@ -188,7 +173,7 @@ def get_repository_files(folder_path): return contents -def get_repository_from_refresh_on_change(app, **kwd): +def get_repository_from_refresh_on_change(app: "ToolShedApp", **kwd): # The changeset_revision_select_field in several grids performs a refresh_on_change which sends in request parameters like # changeset_revison_1, changeset_revision_2, etc. One of the many select fields on the grid performed the refresh_on_change, # so we loop through all of the received values to see which value is not the repository tip. If we find it, we know the @@ -206,52 +191,14 @@ def get_repository_from_refresh_on_change(app, **kwd): return v, None -def get_repository_type_from_tool_shed(app, tool_shed_url, name, owner): - """ - Send a request to the tool shed to retrieve the type for a repository defined by the - combination of a name and owner. - """ - tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(app, tool_shed_url) - params = dict(name=name, owner=owner) - pathspec = ["repository", "get_repository_type"] - repository_type = util.url_get( - tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params - ) - return repository_type - - -def get_tool_dependency_definition_metadata_from_tool_shed(app, tool_shed_url, name, owner): - """ - Send a request to the tool shed to retrieve the current metadata for a - repository of type tool_dependency_definition defined by the combination - of a name and owner. - """ - tool_shed_url = common_util.get_tool_shed_url_from_tool_shed_registry(app, tool_shed_url) - params = dict(name=name, owner=owner) - pathspec = ["repository", "get_tool_dependency_definition_metadata"] - metadata = util.url_get( - tool_shed_url, auth=app.tool_shed_registry.url_auth(tool_shed_url), pathspec=pathspec, params=params - ) - return metadata - - -def get_tool_path_by_shed_tool_conf_filename(app, shed_tool_conf): - """ - Return the tool_path config setting for the received shed_tool_conf file by searching the tool box's in-memory list of shed_tool_confs for the - dictionary whose config_filename key has a value matching the received shed_tool_conf. - """ - for shed_tool_conf_dict in app.toolbox.dynamic_confs(include_migrated_tool_conf=True): - config_filename = shed_tool_conf_dict["config_filename"] - if config_filename == shed_tool_conf: - return shed_tool_conf_dict["tool_path"] - else: - file_name = basic_util.strip_path(config_filename) - if file_name == shed_tool_conf: - return shed_tool_conf_dict["tool_path"] - return None - - -def handle_email_alerts(app, host, repository, content_alert_str="", new_repo_alert=False, admin_only=False): +def handle_email_alerts( + app: "ToolShedApp", + host: str, + repository: "Repository", + content_alert_str: str = "", + new_repo_alert: bool = False, + admin_only: bool = False, +) -> None: """ There are 2 complementary features that enable a tool shed user to receive email notification: @@ -353,7 +300,7 @@ def handle_email_alerts(app, host, repository, content_alert_str="", new_repo_al log.exception("An error occurred sending a tool shed repository update alert by email.") -def _is_path_browsable(app, path, repository_id, is_admin=False): +def _is_path_browsable(app: "ToolShedApp", path: str, repository_id: str, is_admin: bool = False) -> bool: """ Detects whether the given path is browsable i.e. is within the allowed repository folders. Admins can additionaly browse folders @@ -364,7 +311,7 @@ def _is_path_browsable(app, path, repository_id, is_admin=False): return is_path_within_repo(app, path, repository_id) -def is_path_within_dependency_dir(app, path): +def is_path_within_dependency_dir(app: "ToolShedApp", path: str) -> bool: """ Detect whether the given path is within the tool_dependency_dir folder on the disk. (Specified by the config option). Use to filter malicious symlinks targeting outside paths. @@ -378,7 +325,7 @@ def is_path_within_dependency_dir(app, path): return allowed -def is_path_within_repo(app, path, repository_id): +def is_path_within_repo(app: "ToolShedApp", path: str, repository_id: str) -> bool: """ Detect whether the given path is within the repository folder on the disk. Use to filter malicious symlinks targeting outside paths. @@ -388,7 +335,9 @@ def is_path_within_repo(app, path, repository_id): return os.path.commonprefix([repo_path, resolved_path]) == repo_path -def open_repository_files_folder(app, folder_path, repository_id, is_admin=False): +def open_repository_files_folder( + app: "ToolShedApp", folder_path: str, repository_id: str, is_admin: bool = False +) -> List: """ Return a list of dictionaries, each of which contains information for a file or directory contained within a directory in a repository file hierarchy. @@ -437,33 +386,21 @@ def tool_shed_is_this_tool_shed(toolshed_base_url, trans=None): return cleaned_toolshed_base_url == cleaned_tool_shed +def get_users_with_repo_alert(session: scoped_session, user_model): + stmt = select(user_model).where(user_model.deleted == false()).where(user_model.new_repo_alert == true()) + return session.scalars(stmt) + + __all__ = ( - "can_eliminate_repository_dependency", - "clean_dependency_relationships", "count_repositories_in_category", - "generate_tool_guid", "get_categories", "get_category", "get_category_by_name", - "get_ctx_rev", - "get_next_prior_import_or_install_required_dict_entry", - "get_repository_categories", "get_repository_file_contents", - "get_repository_type_from_tool_shed", - "get_tool_dependency_definition_metadata_from_tool_shed", - "get_tool_panel_config_tool_path_install_dir", - "get_tool_path_by_shed_tool_conf_filename", "get_user", "handle_email_alerts", - "have_shed_tool_conf_for_install", "is_path_within_dependency_dir", "is_path_within_repo", "open_repository_files_folder", - "set_image_paths", "tool_shed_is_this_tool_shed", ) - - -def get_users_with_repo_alert(session: scoped_session, user_model): - stmt = select(user_model).where(user_model.deleted == false()).where(user_model.new_repo_alert == true()) - return session.scalars(stmt)