Skip to content

Commit

Permalink
fix setting destination env variables in containerized execution
Browse files Browse the repository at this point in the history
by passing the complete JobDestination to the container. Before
the container only had the information on the params of the
job destination, i.e. `env` was not accessible.

has the additional advantage (IMO) to replace a few of the
ominous `destination_info: Dict[str, Any]` objects where
it was hard to find out what this actually is.
  • Loading branch information
bernt-matthias committed Sep 8, 2023
1 parent 496426f commit 8b5655b
Show file tree
Hide file tree
Showing 6 changed files with 58 additions and 40 deletions.
3 changes: 1 addition & 2 deletions lib/galaxy/jobs/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -551,8 +551,7 @@ def _find_container(
job_directory_type=job_directory_type,
)

destination_info = job_wrapper.job_destination.params
container = self.app.container_finder.find_container(tool_info, destination_info, job_info)
container = self.app.container_finder.find_container(tool_info, job_wrapper.job_destination, job_info)
if container:
job_wrapper.set_container(container)
return container
Expand Down
11 changes: 7 additions & 4 deletions lib/galaxy/tool_util/deps/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,21 +145,24 @@ def __init__(
plugin_source = self.__build_dependency_resolvers_plugin_source(conf_file)
self.dependency_resolvers = self.__parse_resolver_conf_plugins(plugin_source)
self._enabled_container_types: List[str] = []
self._destination_for_container_type: Dict[str, Dict[str, JobDestination]] = {}
self._destination_for_container_type: Dict[str, List["JobDestination"]] = {}

def set_enabled_container_types(self, container_types_to_destinations):
"""Set the union of all enabled container types."""
self._enabled_container_types = [container_type for container_type in container_types_to_destinations.keys()]
# Just pick first enabled destination for a container type, probably covers the most common deployment scenarios
self._destination_for_container_type = container_types_to_destinations

def get_destination_info_for_container_type(self, container_type, destination_id=None):
def get_destination_info_for_container_type(
self, container_type, destination_id=None
) -> Optional["JobDestination"]:
if destination_id is None:
return next(iter(self._destination_for_container_type[container_type])).params
return next(iter(self._destination_for_container_type[container_type]))
else:
for destination in self._destination_for_container_type[container_type]:
if destination.id == destination_id:
return destination.params
return destination
return None

@property
def enabled_container_types(self):
Expand Down
35 changes: 25 additions & 10 deletions lib/galaxy/tool_util/deps/container_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)

if TYPE_CHECKING:
from galaxy.jobs import JobDestination
from .dependencies import (
AppInfo,
JobInfo,
Expand Down Expand Up @@ -122,23 +123,27 @@ def __init__(
container_id: str,
app_info: "AppInfo",
tool_info: "ToolInfo",
destination_info: Dict[str, Any],
job_destination: Optional["JobDestination"],
job_info: Optional["JobInfo"],
container_description: Optional["ContainerDescription"],
container_name: Optional[str] = None,
) -> None:
self.container_id = container_id
self.app_info = app_info
self.tool_info = tool_info
self.destination_info = destination_info
self.job_destination = job_destination
if job_destination:
self.job_destination_params = job_destination.params
else:
self.job_destination_params = {}
self.job_info = job_info
self.container_description = container_description
self.container_name = container_name or uuid4().hex
self.container_info: Dict[str, Any] = {}

def prop(self, name: str, default: Any) -> Any:
destination_name = f"{self.container_type}_{name}"
return self.destination_info.get(destination_name, default)
return self.job_destination_params.get(destination_name, default)

@property
def resolve_dependencies(self) -> bool:
Expand Down Expand Up @@ -427,21 +432,26 @@ def containerize_command(self, command: str) -> str:
# Allow destinations to explicitly set environment variables just for
# docker container. Better approach is to set for destination and then
# pass through only what tool needs however. (See todo in ToolInfo.)
for key, value in self.destination_info.items():
for key, value in self.job_destination_params.items():
if key.startswith("docker_env_"):
env = key[len("docker_env_") :]
env_directives.append(f'"{env}={value}"')

# set env variables defined by the job destination
if self.job_destination:
for env in self.job_destination.env:
env_directives.append(f'"{env["name"]}={env["value"]}"')

assert self.job_info is not None
working_directory = self.job_info.working_directory
if not working_directory:
raise Exception(f"Cannot containerize command [{working_directory}] without defined working directory.")

volumes_raw = self._expand_volume_str(self.destination_info.get("docker_volumes", "$defaults"))
volumes_raw = self._expand_volume_str(self.job_destination_params.get("docker_volumes", "$defaults"))
preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
# TODO: Remove redundant volumes...
volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]
volumes_from = self.destination_info.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM)
volumes_from = self.job_destination_params.get("docker_volumes_from", docker_util.DEFAULT_VOLUMES_FROM)

docker_host_props = self.docker_host_props

Expand Down Expand Up @@ -499,8 +509,8 @@ def __get_cached_image_file(self) -> Optional[str]:

def __get_destination_overridable_property(self, name: str) -> Any:
prop_name = f"docker_{name}"
if prop_name in self.destination_info:
return self.destination_info[prop_name]
if prop_name in self.job_destination_params:
return self.job_destination_params[prop_name]
else:
return getattr(self.app_info, name)

Expand Down Expand Up @@ -548,17 +558,22 @@ def containerize_command(self, command: str) -> str:
# Allow destinations to explicitly set environment variables just for
# docker container. Better approach is to set for destination and then
# pass through only what tool needs however. (See todo in ToolInfo.)
for key, value in self.destination_info.items():
for key, value in self.job_destination_params.items():
if key.startswith("singularity_env_"):
real_key = key[len("singularity_env_") :]
env.append((real_key, value))

# add env variables from the job destination
if self.job_destination:
for e in self.job_destination.env:
env.append((e["name"], e["value"]))

assert self.job_info is not None
working_directory = self.job_info.working_directory
if not working_directory:
raise Exception(f"Cannot containerize command [{working_directory}] without defined working directory.")

volumes_raw = self._expand_volume_str(self.destination_info.get("singularity_volumes", "$defaults"))
volumes_raw = self._expand_volume_str(self.job_destination_params.get("singularity_volumes", "$defaults"))
preprocessed_volumes_list = preprocess_volumes(volumes_raw, self.container_type)
volumes = [DockerVolume.from_str(v) for v in preprocessed_volumes_list]

Expand Down
6 changes: 3 additions & 3 deletions lib/galaxy/tool_util/deps/container_resolvers/explicit.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ def resolve(
image_id = cast(str, container_description.identifier)
cache_path = os.path.normpath(os.path.join(self.cache_directory_path, image_id))
if install and not os.path.exists(cache_path):
destination_info = {}
destination_for_container_type = kwds.get("destination_for_container_type")
job_destination = None
if destination_for_container_type:
destination_info = destination_for_container_type(self.container_type)
job_destination = destination_for_container_type(self.container_type)
container = SingularityContainer(
container_id=container_description.identifier,
app_info=self.app_info,
tool_info=tool_info,
destination_info=destination_info,
job_destination=job_destination,
job_info=None,
container_description=container_description,
)
Expand Down
16 changes: 8 additions & 8 deletions lib/galaxy/tool_util/deps/container_resolvers/mulled.py
Original file line number Diff line number Diff line change
Expand Up @@ -640,17 +640,17 @@ def resolve(
hash_func=self.hash_func,
resolution_cache=resolution_cache,
):
destination_info = {}
job_destination = None
destination_for_container_type = kwds.get("destination_for_container_type")
if destination_for_container_type:
destination_info = destination_for_container_type(self.container_type)
job_destination = destination_for_container_type(self.container_type)
container = CONTAINER_CLASSES[self.container_type](
container_description.identifier,
self.app_info,
tool_info,
destination_info,
None,
container_description,
container_id=container_description.identifier,
app_info=self.app_info,
tool_info=tool_info,
job_destination=job_destination,
job_info=None,
container_description=container_description,
)
self.pull(container)
if not self.auto_install:
Expand Down
27 changes: 14 additions & 13 deletions lib/galaxy/tool_util/deps/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
if TYPE_CHECKING:
from beaker.cache import Cache

from galaxy.jobs import JobDestination
from galaxy.util.plugin_config import PluginConfigSource
from .container_resolvers import ContainerResolver
from .dependencies import (
Expand Down Expand Up @@ -115,9 +116,9 @@ def _container_registry_for_destination(self, destination_info: Dict[str, Any])
return destination_container_registry or self.default_container_registry

def find_container(
self, tool_info: "ToolInfo", destination_info: Dict[str, Any], job_info: "JobInfo"
self, tool_info: "ToolInfo", job_destination: "JobDestination", job_info: "JobInfo"
) -> Optional[Container]:
enabled_container_types = self._enabled_container_types(destination_info)
enabled_container_types = self._enabled_container_types(job_destination.params)

# Short-cut everything else and just skip checks if no container type is enabled.
if not enabled_container_types:
Expand All @@ -142,7 +143,7 @@ def __destination_container(
container_id,
container_type,
tool_info,
destination_info,
job_destination,
job_info,
container_description,
)
Expand All @@ -159,37 +160,37 @@ def container_from_description_from_dicts(
return container
return None

if "container_override" in destination_info:
container = container_from_description_from_dicts(destination_info["container_override"])
if "container_override" in job_destination.params:
container = container_from_description_from_dicts(job_destination.params["container_override"])
if container:
return container

# If destination forcing Galaxy to use a particular container do it,
# this is likely kind of a corner case. For instance if deployers
# do not trust the containers annotated in tools.
for container_type in CONTAINER_CLASSES.keys():
container_id = self.__overridden_container_id(container_type, destination_info)
container_id = self.__overridden_container_id(container_type, job_destination.params)
if container_id:
container = __destination_container(container_type=container_type, container_id=container_id)
if container:
return container

# Otherwise lets see if we can find container for the tool.
container_registry = self._container_registry_for_destination(destination_info)
container_registry = self._container_registry_for_destination(job_destination.params)
container_description = container_registry.find_best_container_description(enabled_container_types, tool_info)
container = __destination_container(container_description)
if container:
return container

# If we still don't have a container, check to see if any container
# types define a default container id and use that.
if "container" in destination_info:
container = container_from_description_from_dicts(destination_info["container"])
if "container" in job_destination.params:
container = container_from_description_from_dicts(job_destination.params["container"])
if container:
return container

for container_type in CONTAINER_CLASSES.keys():
container_id = self.__default_container_id(container_type, destination_info)
container_id = self.__default_container_id(container_type, job_destination.params)
if container_id:
container = __destination_container(container_type=container_type, container_id=container_id)
if container:
Expand Down Expand Up @@ -244,20 +245,20 @@ def __destination_container(
container_id: str,
container_type: str,
tool_info: "ToolInfo",
destination_info: Dict[str, Any],
job_destination: "JobDestination",
job_info: "JobInfo",
container_description: Optional[ContainerDescription] = None,
) -> Optional[Container]:
# TODO: ensure destination_info is dict-like
if not self.__container_type_enabled(container_type, destination_info):
if not self.__container_type_enabled(container_type, job_destination.params):
return None

# TODO: Right now this assumes all containers available when a
# container type is - there should be more thought put into this.
# Checking which are available - settings policies for what can be
# auto-fetched, etc....
return CONTAINER_CLASSES[container_type](
container_id, self.app_info, tool_info, destination_info, job_info, container_description
container_id, self.app_info, tool_info, job_destination, job_info, container_description
)

def __container_type_enabled(self, container_type: str, destination_info: Dict[str, Any]) -> bool:
Expand Down

0 comments on commit 8b5655b

Please sign in to comment.