Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CWL Pipefish compatibility #4636

Merged
merged 24 commits into master from issues/4633-cwl-pipefish-compatibility
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
23c7346
Add a bunch of value resolving logging
adamnovak Oct 18, 2023
eb28215
Quiet debugging a bit
adamnovak Oct 18, 2023
f78f3b6
Move default setting for workflows so it works on subworkflows
adamnovak Oct 18, 2023
b21648e
Remember to keep making a ToilFsAccess on the leader
adamnovak Oct 18, 2023
856e2d6
Satisfy MyPy
adamnovak Oct 18, 2023
5ca06b6
Stop giving CWL containers directories full of broken symlinks
adamnovak Oct 18, 2023
385a627
Merge remote-tracking branch 'origin/master' into issues/4633-cwl-pipefish-compatibility
adamnovak Oct 19, 2023
f1bf10c
Update test to expect no symlinks
adamnovak Oct 19, 2023
2e2eff3
Move CWL integration tests for bioconda/biocontainers to integration …
adamnovak Oct 19, 2023
6fd95a3
Wrap mkdtemp to fix #4644
adamnovak Oct 24, 2023
78033df
Sort imports in example scripts
adamnovak Oct 24, 2023
23b1cc7
Use absolute-ized paths for work and coordination directories
adamnovak Oct 24, 2023
716ad3c
Merge branch 'master' into issues/4633-cwl-pipefish-compatibility
adamnovak Oct 26, 2023
d941815
Merge remote-tracking branch 'upstream/master' into issues/4633-cwl-pipefish-compatibility
adamnovak Nov 2, 2023
e0131f5
Split CI run conditions into different rules
adamnovak Nov 2, 2023
ddf653b
Run the CWL integration tests on any branch that changes the CWL runner
adamnovak Nov 2, 2023
cb14501
Ship CWL test logs as junit files as unbounded size so we can get the…
adamnovak Nov 2, 2023
a322c92
Fix yaml
adamnovak Nov 2, 2023
ac1c7a4
Add missing keys and close quotes
adamnovak Nov 2, 2023
1f2df97
Revert "cwl: use the latest commit from the proposed CWL v1.2.1 branc…
adamnovak Nov 9, 2023
e3444bc
Run CWL tests if they themselves change
adamnovak Nov 9, 2023
50d6cea
Turn off bioconda/biocontainers tests since they fail a nontrivial am…
adamnovak Nov 9, 2023
f93d1c9
Merge remote-tracking branch 'upstream/master' into issues/4633-cwl-pipefish-compatibility
adamnovak Nov 9, 2023
04ce883
Skip differently
adamnovak Nov 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,9 @@ cwl_v1.2:
- pwd
- ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws]
- python setup_gitlab_docker.py # login to increase the docker.io rate limit
# Run CWL integration tests excluded from cwl_misc
- make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py -k '(CWLWorkflowTest or cwl_small) and integrative'"
# Run CWL conformance tests
- make test threads="${TEST_THREADS}" tests=src/toil/test/cwl/cwlTest.py::CWLv12Test::test_run_conformance_with_in_place_update

cwl_on_arm:
Expand All @@ -236,7 +239,7 @@ cwl_misc:
- pwd
- ${MAIN_PYTHON_PKG} -m virtualenv venv && . venv/bin/activate && pip install -U pip wheel && make prepare && make develop extras=[cwl,aws]
- python setup_gitlab_docker.py # login to increase the docker.io rate limit
- make test threads="${TEST_THREADS}" tests='src/toil/test/cwl/cwlTest.py -k "CWLWorkflowTest or cwl_small"'
- make test threads="${TEST_THREADS}" tests="src/toil/test/cwl/cwlTest.py -k '(CWLWorkflowTest or cwl_small) and not integrative'"

#cwl_v1.2_kubernetes:
# stage: main_tests
Expand Down
4 changes: 2 additions & 2 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
logger.warning(f'Length of workDir path "{workDir}" is {len(workDir)} characters. '
f'Consider setting a shorter path with --workPath or setting TMPDIR to something '
f'like "/tmp" to avoid overly long paths.')
setattr(namespace, self.dest, values)
setattr(namespace, self.dest, workDir)

class CoordinationDirAction(Action):
"""
Expand All @@ -725,7 +725,7 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
if not os.path.exists(coordination_dir):
raise RuntimeError(
f"The path provided to --coordinationDir ({coordination_dir}) does not exist.")
setattr(namespace, self.dest, values)
setattr(namespace, self.dest, coordination_dir)

def make_closed_interval_action(min: Union[int, float], max: Optional[Union[int, float]] = None) -> Type[
Action]:
Expand Down
72 changes: 53 additions & 19 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
import json
import logging
import os
import pprint
import shutil
import socket
import stat
import sys
import tempfile
import textwrap
import urllib
import uuid
Expand Down Expand Up @@ -101,6 +101,7 @@
from schema_salad.exceptions import ValidationException
from schema_salad.ref_resolver import file_uri, uri_file_path
from schema_salad.sourceline import SourceLine
from tempfile import NamedTemporaryFile, gettempdir
from typing_extensions import Literal

from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM
Expand All @@ -120,14 +121,15 @@
from toil.jobStores.abstractJobStore import AbstractJobStore, NoSuchFileException
from toil.jobStores.fileJobStore import FileJobStore
from toil.jobStores.utils import JobStoreUnavailableException, generate_locator
from toil.lib.io import mkdtemp
from toil.lib.threading import ExceptionalThread
from toil.statsAndLogging import DEFAULT_LOGLEVEL
from toil.version import baseVersion

logger = logging.getLogger(__name__)

# Find the default temporary directory
DEFAULT_TMPDIR = tempfile.gettempdir()
DEFAULT_TMPDIR = gettempdir()
# And compose a CWL-style default prefix inside it.
# We used to not put this inside anything and we would drop loads of temp
# directories in the current directory and leave them there.
Expand Down Expand Up @@ -352,16 +354,24 @@ def __init__(

def __repr__(self) -> str:
"""Allow for debug printing."""
try:
return "ResolveSource(" + repr(self.resolve()) + ")"
except Exception:
return (
f"ResolveSource({self.name}, {self.input}, {self.source_key}, "
f"{self.promise_tuples})"
)

parts = [f"source key {self.source_key}"]

if "pickValue" in self.input:
parts.append(f"pick value {self.input['pickValue']} from")

if isinstance(self.promise_tuples, list):
names = [n for n, _ in self.promise_tuples]
parts.append(f"names {names} in promises")
else:
name, _ = self.promise_tuples
parts.append(f"name {name} in promise")

return f"ResolveSource({', '.join(parts)})"

def resolve(self) -> Any:
"""First apply linkMerge then pickValue if either present."""

result: Optional[Any] = None
if isinstance(self.promise_tuples, list):
result = self.link_merge(
Expand All @@ -385,6 +395,7 @@ def link_merge(

:param values: result of step
"""

link_merge_type = self.input.get("linkMerge", "merge_nested")

if link_merge_type == "merge_nested":
Expand Down Expand Up @@ -412,6 +423,7 @@ def pick_value(self, values: Union[List[Union[str, SkipNull]], Any]) -> Any:
without modification.
:return:
"""

pick_value_type = cast(str, self.input.get("pickValue"))

if pick_value_type is None:
Expand All @@ -428,6 +440,7 @@ def pick_value(self, values: Union[List[Union[str, SkipNull]], Any]) -> Any:

if pick_value_type == "first_non_null":
if len(result) < 1:
logger.error("Could not find non-null entry for %s:\n%s", self.name, pprint.pformat(self.promise_tuples))
raise cwl_utils.errors.WorkflowException(
"%s: first_non_null operator found no non-null values" % self.name
)
Expand Down Expand Up @@ -482,6 +495,11 @@ def __init__(
self.req = req
self.container_engine = container_engine

def __repr__(self) -> str:
"""Allow for debug printing."""

return f"StepValueFrom({self.expr}, {self.source}, {self.req}, {self.container_engine})"

def eval_prep(
self, step_inputs: CWLObjectType, file_store: AbstractFileStore
) -> None:
Expand Down Expand Up @@ -554,6 +572,11 @@ def __init__(self, default: Any, source: Any):
self.default = default
self.source = source

def __repr__(self) -> str:
"""Allow for debug printing."""

return f"DefaultWithSource({self.default}, {self.source})"

def resolve(self) -> Any:
"""
Determine the final input value when the time is right.
Expand All @@ -576,6 +599,11 @@ def __init__(self, val: Any):
"""Store the value."""
self.val = val

def __repr__(self) -> str:
"""Allow for debug printing."""

return f"JustAValue({self.val})"

def resolve(self) -> Any:
"""Return the value."""
return self.val
Expand Down Expand Up @@ -1205,7 +1233,8 @@ def _abs(self, path: str) -> str:

logger.debug("ToilFsAccess downloading %s to %s", cache_key, temp_dir)

# Save it all into this new temp directory
# Save it all into this new temp directory.
# Guaranteed to fill it with real files and not symlinks.
download_structure(self.file_store, {}, {}, contents, temp_dir)

# Make sure we use the same temp directory if we go traversing
Expand Down Expand Up @@ -1235,7 +1264,7 @@ def _abs(self, path: str) -> str:
logger.debug(
"ToilFsAccess fetching directory %s from a JobStore", path
)
dest_dir = tempfile.mkdtemp()
dest_dir = mkdtemp()

# Recursively fetch all the files in the directory.
def download_to(url: str, dest: str) -> None:
Expand All @@ -1258,7 +1287,7 @@ def download_to(url: str, dest: str) -> None:
logger.debug("ToilFsAccess fetching file %s from a JobStore", path)
# Try to grab it with a jobstore implementation, and save it
# somewhere arbitrary.
dest_file = tempfile.NamedTemporaryFile(delete=False)
dest_file = NamedTemporaryFile(delete=False)
AbstractJobStore.read_from_url(path, dest_file)
dest_file.close()
self.dir_to_download[path] = dest_file.name
Expand Down Expand Up @@ -2013,7 +2042,7 @@ def _realpath(
"CreateFile",
"CreateWritableFile",
]: # TODO: CreateFile for buckets is not under testing
with tempfile.NamedTemporaryFile() as f:
with NamedTemporaryFile() as f:
# Make a file with the right contents
f.write(file_id_or_contents.encode("utf-8"))
f.close()
Expand Down Expand Up @@ -2307,7 +2336,7 @@ def run(self, file_store: AbstractFileStore) -> Any:
cwllogger.removeHandler(defaultStreamHandler)
cwllogger.setLevel(logger.getEffectiveLevel())

logger.debug("Loaded order: %s", self.cwljob)
logger.debug("Loaded order:\n%s", self.cwljob)

cwljob = resolve_dict_w_promises(self.cwljob, file_store)

Expand Down Expand Up @@ -2824,6 +2853,10 @@ def run(
if self.conditional.is_false(cwljob):
return self.conditional.skipped_outputs()

# Apply default values set in the workflow
fs_access = ToilFsAccess(self.runtime_context.basedir, file_store=file_store)
fill_in_defaults(self.cwlwf.tool["inputs"], cwljob, fs_access)

# `promises` dict
# from: each parameter (workflow input or step output)
# that may be used as a "source" for a step input workflow output
Expand Down Expand Up @@ -2886,6 +2919,8 @@ def run(
get_container_engine(self.runtime_context),
)

logger.debug("Value will come from %s", jobobj.get(key, None))

conditional = Conditional(
expression=step.tool.get("when"),
outputs=step.tool["out"],
Expand Down Expand Up @@ -3587,7 +3622,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
workdir = cwltool.utils.create_tmp_dir(options.tmpdir_prefix)
else:
# Use a directory in the default tmpdir
workdir = tempfile.mkdtemp()
workdir = mkdtemp()
# Make sure workdir doesn't exist so it can be a job store
os.rmdir(workdir)

Expand Down Expand Up @@ -3791,10 +3826,8 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
)
raise

# We make a ToilFSAccess to access URLs with, but it has no
# FileStore so it can't do toildir: and toilfile:
fs_access = ToilFsAccess(options.basedir)
fill_in_defaults(tool.tool["inputs"], initialized_job_order, fs_access)
# Leave the defaults un-filled in the top-level order. The tool or
# workflow will fill them when it runs

for inp in tool.tool["inputs"]:
if (
Expand Down Expand Up @@ -3852,6 +3885,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:

# Import all the input files, some of which may be missing optional
# files.
fs_access = ToilFsAccess(options.basedir)
import_files(
file_import_function,
fs_access,
Expand Down
9 changes: 7 additions & 2 deletions src/toil/cwl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ def download_structure(
"""
Download nested dictionary from the Toil file store to a local path.

Guaranteed to fill the structure with real files, and not symlinks out of
it to elsewhere.

:param file_store: The Toil file store to download from.

:param index: Maps from downloaded file path back to input Toil URI.
Expand Down Expand Up @@ -173,9 +176,11 @@ def download_structure(
raise RuntimeError(f"Did not find a filestore file at {value}")
logger.debug("Downloading contained file '%s'", name)
dest_path = os.path.join(into_dir, name)
# So download the file into place
# So download the file into place.
# Make sure to get a real copy of the file because we may need to
# mount the directory into a container as a whole.
file_store.readGlobalFile(
FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=True
FileID.unpack(value[len("toilfile:") :]), dest_path, symlink=False
)
# Update the index dicts
# TODO: why?
Expand Down
8 changes: 4 additions & 4 deletions src/toil/fileStores/abstractFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@
# limitations under the License.
import logging
import os
import tempfile
from abc import ABC, abstractmethod
from contextlib import contextmanager
from tempfile import mkstemp
from threading import Event, Semaphore
from typing import (IO,
TYPE_CHECKING,
Expand All @@ -40,7 +40,7 @@
from toil.job import Job, JobDescription
from toil.jobStores.abstractJobStore import AbstractJobStore
from toil.lib.compatibility import deprecated
from toil.lib.io import WriteWatchingStream
from toil.lib.io import WriteWatchingStream, mkdtemp

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -207,7 +207,7 @@ def getLocalTempDir(self) -> str:
to be deleted once the job terminates, removing all files it
contains recursively.
"""
return os.path.abspath(tempfile.mkdtemp(dir=self.localTempDir))
return os.path.abspath(mkdtemp(dir=self.localTempDir))

def getLocalTempFile(self, suffix: Optional[str] = None, prefix: Optional[str] = None) -> str:
"""
Expand All @@ -223,7 +223,7 @@ def getLocalTempFile(self, suffix: Optional[str] = None, prefix: Optional[str] =
for the duration of the job only, and is guaranteed to be deleted
once the job terminates.
"""
handle, tmpFile = tempfile.mkstemp(
handle, tmpFile = mkstemp(
suffix=".tmp" if suffix is None else suffix,
prefix="tmp" if prefix is None else prefix,
dir=self.localTempDir
Expand Down
7 changes: 4 additions & 3 deletions src/toil/fileStores/cachingFileStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
import shutil
import sqlite3
import stat
import tempfile
import threading
import time
from contextlib import contextmanager
from tempfile import mkstemp
from typing import Any, Callable, Generator, Iterator, Optional, Sequence, Tuple

from toil.common import cacheDirName, getDirSizeRecursively, getFileSystemSize
Expand All @@ -36,6 +36,7 @@
from toil.lib.io import (atomic_copy,
atomic_copyobj,
make_public_dir,
mkdtemp,
robust_rmtree)
from toil.lib.retry import ErrorCondition, retry
from toil.lib.threading import get_process_name, process_name_exists
Expand Down Expand Up @@ -600,7 +601,7 @@ def cachingIsFree(self):
emptyID = self.jobStore.getEmptyFileStoreID()

# Read it out to a generated name.
destDir = tempfile.mkdtemp(dir=self.localCacheDir)
destDir = mkdtemp(dir=self.localCacheDir)
cachedFile = os.path.join(destDir, 'sniffLinkCount')
self.jobStore.read_file(emptyID, cachedFile, symlink=False)

Expand Down Expand Up @@ -644,7 +645,7 @@ def _getNewCachingPath(self, fileStoreID):
# sure we can never collide even though we are going to remove the
# file.
# TODO: use a de-slashed version of the ID instead?
handle, path = tempfile.mkstemp(dir=self.localCacheDir, suffix=hasher.hexdigest())
handle, path = mkstemp(dir=self.localCacheDir, suffix=hasher.hexdigest())
os.close(handle)
os.unlink(path)

Expand Down
6 changes: 3 additions & 3 deletions src/toil/jobStores/fileJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import shutil
import stat
import sys
import tempfile
import time
import uuid
from contextlib import contextmanager
Expand All @@ -42,6 +41,7 @@
from toil.lib.io import (AtomicFileCreate,
atomic_copy,
atomic_copyobj,
mkdtemp,
robust_rmtree)

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -147,8 +147,8 @@ def assign_job_id(self, job_description):

# Make a unique temp directory under a directory for this job name,
# possibly sprayed across multiple levels of subdirectories.
absJobDir = tempfile.mkdtemp(prefix=self.JOB_DIR_PREFIX,
dir=self._get_arbitrary_jobs_dir_for_name(usefulFilename))
absJobDir = mkdtemp(prefix=self.JOB_DIR_PREFIX,
dir=self._get_arbitrary_jobs_dir_for_name(usefulFilename))

job_description.jobStoreID = self._get_job_id_from_dir(absJobDir)

Expand Down
Loading