From 8b5655bb900d5661b7e31b1929b78b0a8329992c Mon Sep 17 00:00:00 2001 From: Matthias Bernt Date: Fri, 8 Sep 2023 17:14:09 +0200 Subject: [PATCH] fix setting destination env variables in containerized execution by passing the complete JobDestination to the container. Before the container only had the information on the params of the job destination, i.e. `env` was not accessible. has the additional advantage (IMO) to replace a few of the ominous `destination_info: Dict[str, Any]` objects where it was hard to find out what this actually is. --- lib/galaxy/jobs/runners/__init__.py | 3 +- lib/galaxy/tool_util/deps/__init__.py | 11 +++--- .../tool_util/deps/container_classes.py | 35 +++++++++++++------ .../deps/container_resolvers/explicit.py | 6 ++-- .../deps/container_resolvers/mulled.py | 16 ++++----- lib/galaxy/tool_util/deps/containers.py | 27 +++++++------- 6 files changed, 58 insertions(+), 40 deletions(-) diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index 6c95de6312b3..8a441692fa3e 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -551,8 +551,7 @@ def _find_container( job_directory_type=job_directory_type, ) - destination_info = job_wrapper.job_destination.params - container = self.app.container_finder.find_container(tool_info, destination_info, job_info) + container = self.app.container_finder.find_container(tool_info, job_wrapper.job_destination, job_info) if container: job_wrapper.set_container(container) return container diff --git a/lib/galaxy/tool_util/deps/__init__.py b/lib/galaxy/tool_util/deps/__init__.py index 2dc75c3dfac3..ff0693ef235c 100644 --- a/lib/galaxy/tool_util/deps/__init__.py +++ b/lib/galaxy/tool_util/deps/__init__.py @@ -145,7 +145,7 @@ def __init__( plugin_source = self.__build_dependency_resolvers_plugin_source(conf_file) self.dependency_resolvers = self.__parse_resolver_conf_plugins(plugin_source) self._enabled_container_types: List[str] = [] - self._destination_for_container_type: Dict[str, Dict[str, JobDestination]] = {} + self._destination_for_container_type: Dict[str, List["JobDestination"]] = {} def set_enabled_container_types(self, container_types_to_destinations): """Set the union of all enabled container types.""" @@ -153,13 +153,16 @@ def set_enabled_container_types(self, container_types_to_destinations): # Just pick first enabled destination for a container type, probably covers the most common deployment scenarios self._destination_for_container_type = container_types_to_destinations - def get_destination_info_for_container_type(self, container_type, destination_id=None): + def get_destination_info_for_container_type( + self, container_type, destination_id=None + ) -> Optional["JobDestination"]: if destination_id is None: - return next(iter(self._destination_for_container_type[container_type])).params + return next(iter(self._destination_for_container_type[container_type])) else: for destination in self._destination_for_container_type[container_type]: if destination.id == destination_id: - return destination.params + return destination + return None @property def enabled_container_types(self): diff --git a/lib/galaxy/tool_util/deps/container_classes.py b/lib/galaxy/tool_util/deps/container_classes.py index e89e55ab6b1a..a81b751c99ba 100644 --- a/lib/galaxy/tool_util/deps/container_classes.py +++ b/lib/galaxy/tool_util/deps/container_classes.py @@ -33,6 +33,7 @@ ) if TYPE_CHECKING: + from galaxy.jobs import JobDestination from .dependencies import ( AppInfo, JobInfo, @@ -122,7 +123,7 @@ def __init__( container_id: str, app_info: "AppInfo", tool_info: "ToolInfo", - destination_info: Dict[str, Any], + job_destination: Optional["JobDestination"], job_info: Optional["JobInfo"], container_description: Optional["ContainerDescription"], container_name: Optional[str] = None, @@ -130,7 +131,11 @@ def __init__( self.container_id = container_id self.app_info = app_info self.tool_info = tool_info - self.destination_info = destination_info + self.job_destination = job_destination + if job_destination: + self.job_destination_params = job_destination.params + else: + self.job_destination_params = {} self.job_info = job_info self.container_description = container_description self.container_name = container_name or uuid4().hex @@ -138,7 +143,7 @@ def __init__( def prop(self, name: str, default: Any) -> Any: destination_name = f"{self.container_type}_{name}" - return self.destination_info.get(destination_name, default) + return self.job_destination_params.get(destination_name, default) @property def resolve_dependencies(self) -> bool: @@ -427,21 +432,26 @@ def containerize_command(self, command: str) -> str: # Allow destinations to explicitly set environment variables just for # docker container. Better approach is to set for destination and then # pass through only what tool needs however. (See todo in ToolInfo.) - for key, value in self.destination_info.items(): + for key, value in self.job_destination_params.items(): if key.startswith("docker_env_"): env = key[len("docker_env_") :] env_directives.append(f'"{env}={value}"') + # set env variables defined by the job destination + if self.job_destination: + for env in self.job_destination.env: + env_directives.append(f'"{env["name"]}={env["value"]}"') + assert self.job_info is not None working_directory = self.job_info.working_directory if not working_directory: raise Exception(f"Cannot containerize command [{working_directory}] without defined working directory.") - volumes_raw = self._expand_volume_str(self.destination_info.get("docker_volumes", "$defaults")) + volumes_raw = self._expand_volume_str(self.job_destination_params.get("docker_volumes", "$defaults")) preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type) # TODO: Remove redundant volumes... volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list] - volumes_from = self.destination_info.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM) + volumes_from = self.job_destination_params.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM) docker_host_props = self.docker_host_props @@ -499,8 +509,8 @@ def __get_cached_image_file(self) -> Optional[str]: def __get_destination_overridable_property(self, name: str) -> Any: prop_name = f"docker_{name}" - if prop_name in self.destination_info: - return self.destination_info[prop_name] + if prop_name in self.job_destination_params: + return self.job_destination_params[prop_name] else: return getattr(self.app_info, name) @@ -548,17 +558,22 @@ def containerize_command(self, command: str) -> str: # Allow destinations to explicitly set environment variables just for # docker container. Better approach is to set for destination and then # pass through only what tool needs however. (See todo in ToolInfo.) - for key, value in self.destination_info.items(): + for key, value in self.job_destination_params.items(): if key.startswith("singularity_env_"): real_key = key[len("singularity_env_") :] env.append((real_key, value)) + # add env variables from the job destination + if self.job_destination: + for e in self.job_destination.env: + env.append((e["name"], e["value"])) + assert self.job_info is not None working_directory = self.job_info.working_directory if not working_directory: raise Exception(f"Cannot containerize command [{working_directory}] without defined working directory.") - volumes_raw = self._expand_volume_str(self.destination_info.get("singularity_volumes", "$defaults")) + volumes_raw = self._expand_volume_str(self.job_destination_params.get("singularity_volumes", "$defaults")) preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type) volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list] diff --git a/lib/galaxy/tool_util/deps/container_resolvers/explicit.py b/lib/galaxy/tool_util/deps/container_resolvers/explicit.py index c85359827ec3..ef537d8cf551 100644 --- a/lib/galaxy/tool_util/deps/container_resolvers/explicit.py +++ b/lib/galaxy/tool_util/deps/container_resolvers/explicit.py @@ -108,15 +108,15 @@ def resolve( image_id = cast(str, container_description.identifier) cache_path = os.path.normpath(os.path.join(self.cache_directory_path, image_id)) if install and not os.path.exists(cache_path): - destination_info = {} destination_for_container_type = kwds.get("destination_for_container_type") + job_destination = None if destination_for_container_type: - destination_info = destination_for_container_type(self.container_type) + job_destination = destination_for_container_type(self.container_type) container = SingularityContainer( container_id=container_description.identifier, app_info=self.app_info, tool_info=tool_info, - destination_info=destination_info, + job_destination=job_destination, job_info=None, container_description=container_description, ) diff --git a/lib/galaxy/tool_util/deps/container_resolvers/mulled.py b/lib/galaxy/tool_util/deps/container_resolvers/mulled.py index 98aca03bd1d4..beef43376433 100644 --- a/lib/galaxy/tool_util/deps/container_resolvers/mulled.py +++ b/lib/galaxy/tool_util/deps/container_resolvers/mulled.py @@ -640,17 +640,17 @@ def resolve( hash_func=self.hash_func, resolution_cache=resolution_cache, ): - destination_info = {} + job_destination = None destination_for_container_type = kwds.get("destination_for_container_type") if destination_for_container_type: - destination_info = destination_for_container_type(self.container_type) + job_destination = destination_for_container_type(self.container_type) container = CONTAINER_CLASSES[self.container_type]( - container_description.identifier, - self.app_info, - tool_info, - destination_info, - None, - container_description, + container_id=container_description.identifier, + app_info=self.app_info, + tool_info=tool_info, + job_destination=job_destination, + job_info=None, + container_description=container_description, ) self.pull(container) if not self.auto_install: diff --git a/lib/galaxy/tool_util/deps/containers.py b/lib/galaxy/tool_util/deps/containers.py index f51948169190..b867e4a99a0a 100644 --- a/lib/galaxy/tool_util/deps/containers.py +++ b/lib/galaxy/tool_util/deps/containers.py @@ -42,6 +42,7 @@ if TYPE_CHECKING: from beaker.cache import Cache + from galaxy.jobs import JobDestination from galaxy.util.plugin_config import PluginConfigSource from .container_resolvers import ContainerResolver from .dependencies import ( @@ -115,9 +116,9 @@ def _container_registry_for_destination(self, destination_info: Dict[str, Any]) return destination_container_registry or self.default_container_registry def find_container( - self, tool_info: "ToolInfo", destination_info: Dict[str, Any], job_info: "JobInfo" + self, tool_info: "ToolInfo", job_destination: "JobDestination", job_info: "JobInfo" ) -> Optional[Container]: - enabled_container_types = self._enabled_container_types(destination_info) + enabled_container_types = self._enabled_container_types(job_destination.params) # Short-cut everything else and just skip checks if no container type is enabled. if not enabled_container_types: @@ -142,7 +143,7 @@ def __destination_container( container_id, container_type, tool_info, - destination_info, + job_destination, job_info, container_description, ) @@ -159,8 +160,8 @@ def container_from_description_from_dicts( return container return None - if "container_override" in destination_info: - container = container_from_description_from_dicts(destination_info["container_override"]) + if "container_override" in job_destination.params: + container = container_from_description_from_dicts(job_destination.params["container_override"]) if container: return container @@ -168,14 +169,14 @@ def container_from_description_from_dicts( # this is likely kind of a corner case. For instance if deployers # do not trust the containers annotated in tools. for container_type in CONTAINER_CLASSES.keys(): - container_id = self.__overridden_container_id(container_type, destination_info) + container_id = self.__overridden_container_id(container_type, job_destination.params) if container_id: container = __destination_container(container_type=container_type, container_id=container_id) if container: return container # Otherwise lets see if we can find container for the tool. - container_registry = self._container_registry_for_destination(destination_info) + container_registry = self._container_registry_for_destination(job_destination.params) container_description = container_registry.find_best_container_description(enabled_container_types, tool_info) container = __destination_container(container_description) if container: @@ -183,13 +184,13 @@ def container_from_description_from_dicts( # If we still don't have a container, check to see if any container # types define a default container id and use that. - if "container" in destination_info: - container = container_from_description_from_dicts(destination_info["container"]) + if "container" in job_destination.params: + container = container_from_description_from_dicts(job_destination.params["container"]) if container: return container for container_type in CONTAINER_CLASSES.keys(): - container_id = self.__default_container_id(container_type, destination_info) + container_id = self.__default_container_id(container_type, job_destination.params) if container_id: container = __destination_container(container_type=container_type, container_id=container_id) if container: @@ -244,12 +245,12 @@ def __destination_container( container_id: str, container_type: str, tool_info: "ToolInfo", - destination_info: Dict[str, Any], + job_destination: "JobDestination", job_info: "JobInfo", container_description: Optional[ContainerDescription] = None, ) -> Optional[Container]: # TODO: ensure destination_info is dict-like - if not self.__container_type_enabled(container_type, destination_info): + if not self.__container_type_enabled(container_type, job_destination.params): return None # TODO: Right now this assumes all containers available when a @@ -257,7 +258,7 @@ def __destination_container( # Checking which are available - settings policies for what can be # auto-fetched, etc.... return CONTAINER_CLASSES[container_type]( - container_id, self.app_info, tool_info, destination_info, job_info, container_description + container_id, self.app_info, tool_info, job_destination, job_info, container_description ) def __container_type_enabled(self, container_type: str, destination_info: Dict[str, Any]) -> bool: