Skip to content

Commit

Permalink
Introspect object stores to find output volumes to mount
Browse files Browse the repository at this point in the history
  • Loading branch information
mvdbeek committed Dec 18, 2023
1 parent 3b52716 commit 809411a
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 8 deletions.
2 changes: 2 additions & 0 deletions lib/galaxy/jobs/runners/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
write_script,
)
from galaxy.model.base import transaction
from galaxy.objectstore import get_disk_paths
from galaxy.tool_util.deps.dependencies import (
JobInfo,
ToolInfo,
Expand Down Expand Up @@ -591,6 +592,7 @@ def _get_metadata_container(
home_directory=None,
job_directory_type=job_directory_type,
job_type="epilog",
output_paths=get_disk_paths(self.app.object_store),
)

return self.app.container_finder.find_container(tool_info, destination_info, job_info)
Expand Down
46 changes: 46 additions & 0 deletions lib/galaxy/objectstore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import abc
import dataclasses
import logging
import os
import random
Expand All @@ -18,6 +19,7 @@
List,
NamedTuple,
Optional,
Set,
Tuple,
Type,
TYPE_CHECKING,
Expand Down Expand Up @@ -303,6 +305,10 @@ def get_concrete_store_by_object_store_id(self, object_store_id: str) -> Optiona
"""If this is a distributed object store, get ConcreteObjectStore by object_store_id."""
return None

@abc.abstractmethod
def get_concrete_store_backends(self) -> List["ConcreteObjectStore"]:
    """Return the concrete object store backends behind this store.

    Abstract declaration only: it carries no implementation of its own and
    relies on subclasses to provide one.
    """

@abc.abstractmethod
def get_store_usage_percent(self):
"""Return the percentage indicating how full the store is."""
Expand Down Expand Up @@ -450,6 +456,9 @@ def update_from_file(self, obj, **kwargs):
def get_object_url(self, obj, **kwargs):
    """Forward a ``get_object_url`` request for ``obj`` to ``self._invoke``.

    All keyword arguments are passed through unchanged and the result of
    ``_invoke`` is returned as-is.
    """
    url = self._invoke("get_object_url", obj, **kwargs)
    return url

def get_concrete_store_backends(self) -> List["ConcreteObjectStore"]:
    """Forward ``get_concrete_store_backends`` to ``self._invoke`` and return its result."""
    backends = self._invoke("get_concrete_store_backends")
    return backends

def get_concrete_store_name(self, obj):
    """Forward a ``get_concrete_store_name`` request for ``obj`` to ``self._invoke``."""
    store_name = self._invoke("get_concrete_store_name", obj)
    return store_name

Expand Down Expand Up @@ -492,6 +501,13 @@ def get_quota_source_map(self):
raise NotImplementedError()


@dataclasses.dataclass
class DiskPath:
    """Directories on local disk that a single object store backend reports.

    Every field is optional and defaults to ``None``, so a backend only fills
    in the entries that apply to it.
    """

    # Directory in which the backend keeps its files.
    file_path: Optional[str] = dataclasses.field(default=None)
    # Local cache/staging directory of the backend.
    object_store_cache_path: Optional[str] = dataclasses.field(default=None)
    # Additional directories; the mapping's values are the directory paths.
    extra_dirs: Optional[Dict[str, str]] = dataclasses.field(default=None)


class ConcreteObjectStore(BaseObjectStore):
"""Subclass of ObjectStore for stores that don't delegate (non-nested).
Expand Down Expand Up @@ -530,6 +546,12 @@ def __init__(self, config, config_dict=None, **kwargs):
self.quota_enabled = quota_config.get("enabled", DEFAULT_QUOTA_ENABLED)
self.badges = read_badges(config_dict)

def get_concrete_store_backends(self):
    """Return a one-element list containing this store itself."""
    backends = [self]
    return backends

def get_disk_paths(self) -> DiskPath:
    """Return an empty ``DiskPath`` (every field left at its ``None`` default)."""
    no_paths = DiskPath()
    return no_paths

def to_dict(self):
rval = super().to_dict()
rval["private"] = self.private
Expand Down Expand Up @@ -661,6 +683,9 @@ def to_dict(self):
as_dict["files_dir"] = self.file_path
return as_dict

def get_disk_paths(self) -> DiskPath:
    """Report this store's ``file_path`` and ``extra_dirs`` as a ``DiskPath``."""
    disk_path = DiskPath(
        file_path=self.file_path,
        extra_dirs=self.extra_dirs,
    )
    return disk_path

def __get_filename(
self, obj, base_dir=None, dir_only=False, extra_dir=None, extra_dir_at_root=False, alt_name=None, obj_dir=False
):
Expand Down Expand Up @@ -971,6 +996,12 @@ def _get_object_url(self, obj, **kwargs):
"""For the first backend that has this `obj`, get its URL."""
return self._call_method("_get_object_url", obj, None, False, **kwargs)

def _get_concrete_store_backends(self, **kwargs):
backends = []
for backend in self.backends.values():
backends.extend(backend.get_concrete_store_backends())
return backends

def _get_concrete_store_name(self, obj):
return self._call_method("_get_concrete_store_name", obj, None, False)

Expand Down Expand Up @@ -1613,3 +1644,18 @@ def persist_extra_files(
create=True,
preserve_symlinks=True,
)


def get_disk_paths(objectstore: "BaseObjectStore") -> Set[str]:
    """Collect the on-disk directories used by all concrete backends of ``objectstore``.

    Every backend returned by ``objectstore.get_concrete_store_backends()`` is
    asked for its ``DiskPath``. The backend's ``file_path``, its
    ``object_store_cache_path`` and the values of its ``extra_dirs`` mapping
    are merged into one set, so a directory shared by several backends is
    reported once.

    Unset (``None``) and empty entries are dropped from all three sources,
    including individual ``extra_dirs`` values.

    :param objectstore: store whose concrete backends are inspected.
    :returns: set of directory paths; empty if no backend reports any.
    """
    paths: Set[str] = set()
    for backend in objectstore.get_concrete_store_backends():
        disk_path = backend.get_disk_paths()
        candidates = [disk_path.file_path, disk_path.object_store_cache_path]
        if disk_path.extra_dirs:
            candidates.extend(disk_path.extra_dirs.values())
        # Callers turn each path into a "<path>:<mode>" mount spec, so an
        # empty or missing path must never make it into the result.
        paths.update(path for path in candidates if path)
    return paths
8 changes: 7 additions & 1 deletion lib/galaxy/objectstore/azure_blob.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
unlink,
)
from galaxy.util.path import safe_relpath
from . import ConcreteObjectStore
from . import (
ConcreteObjectStore,
DiskPath,
)
from .caching import (
CacheTarget,
enable_cache_monitor,
Expand Down Expand Up @@ -143,6 +146,9 @@ def to_dict(self):
)
return as_dict

def get_disk_paths(self) -> DiskPath:
    """Report this store's ``staging_path`` as its object store cache path."""
    cache_dir = self.staging_path
    return DiskPath(object_store_cache_path=cache_dir)

###################
# Private Methods #
###################
Expand Down
8 changes: 7 additions & 1 deletion lib/galaxy/objectstore/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
umask_fix_perms,
unlink,
)
from . import ConcreteObjectStore
from . import (
ConcreteObjectStore,
DiskPath,
)
from .caching import (
CacheTarget,
enable_cache_monitor,
Expand Down Expand Up @@ -245,6 +248,9 @@ def to_dict(self):
as_dict.update(self._config_to_dict())
return as_dict

def get_disk_paths(self) -> DiskPath:
    """Report this store's ``staging_path`` as its object store cache path."""
    cache_dir = self.staging_path
    return DiskPath(object_store_cache_path=cache_dir)

@property
def cache_target(self) -> CacheTarget:
return CacheTarget(
Expand Down
8 changes: 7 additions & 1 deletion lib/galaxy/objectstore/pithos.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@
umask_fix_perms,
)
from galaxy.util.path import safe_relpath
from . import ConcreteObjectStore
from . import (
ConcreteObjectStore,
DiskPath,
)

NO_KAMAKI_ERROR_MESSAGE = (
"ObjectStore configured, but no kamaki.clients dependency available."
Expand Down Expand Up @@ -123,6 +126,9 @@ def to_dict(self):
as_dict.update(self.config_dict)
return as_dict

def get_disk_paths(self) -> DiskPath:
    """Report this store's ``staging_path`` as its object store cache path."""
    cache_dir = self.staging_path
    return DiskPath(object_store_cache_path=cache_dir)

def _authenticate(self):
auth = self.config_dict["auth"]
url, token = auth["url"], auth["token"]
Expand Down
8 changes: 7 additions & 1 deletion lib/galaxy/objectstore/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
which,
)
from galaxy.util.path import safe_relpath
from . import ConcreteObjectStore
from . import (
ConcreteObjectStore,
DiskPath,
)
from .caching import (
CacheTarget,
enable_cache_monitor,
Expand Down Expand Up @@ -240,6 +243,9 @@ def to_dict(self):
as_dict.update(self._config_to_dict())
return as_dict

def get_disk_paths(self) -> DiskPath:
    """Report this store's ``staging_path`` (as cache path) and ``extra_dirs``."""
    disk_path = DiskPath(
        object_store_cache_path=self.staging_path,
        extra_dirs=self.extra_dirs,
    )
    return disk_path

@property
def cache_target(self) -> CacheTarget:
return CacheTarget(
Expand Down
14 changes: 11 additions & 3 deletions lib/galaxy/tool_util/deps/container_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def add_var(name, value):
defaults += ",$tmp_directory:/tmp:rw"
else:
defaults = "$_GALAXY_JOB_TMP_DIR:rw,$TMPDIR:rw,$TMP:rw,$TEMP:rw"
defaults += ",$galaxy_root:default_ro"
defaults += ",$galaxy_root:default_ro,$working_directory:rw"
if self.job_info.tool_directory:
defaults += ",$tool_directory:default_ro"
if self.job_info.job_directory:
Expand All @@ -381,9 +381,17 @@ def add_var(name, value):
if outputs_to_working_directory and self.job_info.job_type == "tool":
# Should need default_file_path (which is of course an estimate given
# object stores anyway).
defaults += ",$working_directory:rw,$default_file_path:default_ro"
outputs_mount_mode = "default_ro"
else:
defaults += ",$working_directory:rw,$default_file_path:rw"
outputs_mount_mode = "rw"
output_paths = None
if self.job_info.output_paths:
output_paths = self.job_info.output_paths
elif self.app_info.default_file_path:
output_paths = {self.app_info.default_file_path}
if output_paths:
outputs_mount = ",".join([f"{p}:{outputs_mount_mode}" for p in output_paths])
defaults += f",{outputs_mount}"

if self.app_info.library_import_dir:
defaults += ",$library_import_dir:default_ro"
Expand Down
4 changes: 3 additions & 1 deletion lib/galaxy/tool_util/deps/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
Dict,
List,
Optional,
Set,
Union,
)

Expand Down Expand Up @@ -96,7 +97,7 @@ def __init__(
home_directory,
job_directory_type: Literal["galaxy", "pulsar"],
job_type: Literal["tool", "prolog", "epilog"] = "tool",
output_paths: Optional[List[str]] = None,
output_paths: Optional[Set[str]] = None,
):
self.working_directory = working_directory
# Tool files may be remote staged - so this is unintuitively a property
Expand All @@ -107,6 +108,7 @@ def __init__(
self.home_directory = home_directory
self.job_directory_type = job_directory_type
self.job_type = job_type
self.output_paths = output_paths


class DependenciesDescription:
Expand Down

0 comments on commit 809411a

Please sign in to comment.