Fix configargparse in CWL #4618

Merged · 15 commits · Oct 26, 2023
2 changes: 2 additions & 0 deletions contrib/mypy-stubs/configargparse/configargparse.pyi
@@ -26,5 +26,7 @@ class ArgumentParser(argparse.ArgumentParser):
# There may be a better way of type hinting this without a type: ignore, but mypy is unhappy no matter what, since the signature of parse_args does not match its superclass in argparse
def parse_args(self, args: Sequence[str] | None = None, namespace: Namespace | None = None, config_file_contents: str | None = None, env_vars: Any=None) -> Namespace: ... # type: ignore[override]

def get_source_to_settings_dict(self) -> OrderedDict[Any, Any]: ...

Namespace = argparse.Namespace
ArgParser = ArgumentParser
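
The method stubbed here is configargparse's own `get_source_to_settings_dict()`. As a minimal sketch (not Toil code, and assuming a configargparse release that provides the method; the parser, option names, and config file are made up), it can be used to inspect where each parsed setting came from:

```python
# Hypothetical example: report the source (command line, environment,
# config file, or defaults) of each parsed setting.
import configargparse

parser = configargparse.ArgParser(default_config_files=["example.ini"])  # made-up config file
parser.add_argument("--retryCount", type=int, default=1)
parser.add_argument("--workDir", type=str, default="/tmp")

options = parser.parse_args(["--retryCount", "3"])

# Returns an OrderedDict keyed by source, mapping to the settings
# that were taken from that source.
for source, settings in parser.get_source_to_settings_dict().items():
    print(source, settings)
```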
51 changes: 25 additions & 26 deletions src/toil/common.py
@@ -945,32 +945,31 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
"of memory and disk on a node when autoscaling.")

# Parameters to limit service jobs / detect service deadlocks
if not cwl:
service_options = parser.add_argument_group(
title="Toil options for limiting the number of service jobs and detecting service deadlocks",
description="Allows the specification of the maximum number of service jobs in a cluster. By keeping "
"this limited we can avoid nodes occupied with services causing deadlocks."
)
service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int,
help=f"The maximum number of service jobs that can be run concurrently, "
f"excluding service jobs running on preemptible nodes. "
f"default=%(default)s")
service_options.add_argument("--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs",
default=SYS_MAX_SIZE,
type=int,
help=f"The maximum number of service jobs that can run concurrently on "
f"preemptible nodes. default=%(default)s")
service_options.add_argument("--deadlockWait", dest="deadlockWait", default=60, type=int,
help=f"Time, in seconds, to tolerate the workflow running only the same service "
f"jobs, with no jobs to use them, before declaring the workflow to be "
f"deadlocked and stopping. default=%(default)s")
service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=30, type=int,
help="Time, in seconds, to wait between checks to see if the workflow is stuck "
"running only service jobs, with no jobs to use them. Should be shorter "
"than --deadlockWait. May need to be increased if the batch system cannot "
"enumerate running jobs quickly enough, or if polling for running jobs is "
"placing an unacceptable load on a shared cluster. "
f"default=%(default)s")
service_options = parser.add_argument_group(
title="Toil options for limiting the number of service jobs and detecting service deadlocks",
description="Allows the specification of the maximum number of service jobs in a cluster. By keeping "
"this limited we can avoid nodes occupied with services causing deadlocks."
)
service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int,
help=SUPPRESS if cwl else f"The maximum number of service jobs that can be run concurrently, "
f"excluding service jobs running on preemptible nodes. "
f"default=%(default)s")
service_options.add_argument("--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs",
default=SYS_MAX_SIZE,
type=int,
help=SUPPRESS if cwl else f"The maximum number of service jobs that can run concurrently on "
f"preemptible nodes. default=%(default)s")
service_options.add_argument("--deadlockWait", dest="deadlockWait", default=60, type=int,
help=SUPPRESS if cwl else f"Time, in seconds, to tolerate the workflow running only the same service "
f"jobs, with no jobs to use them, before declaring the workflow to be "
f"deadlocked and stopping. default=%(default)s")
service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=30, type=int,
help=SUPPRESS if cwl else "Time, in seconds, to wait between checks to see if the workflow is stuck "
"running only service jobs, with no jobs to use them. Should be shorter "
"than --deadlockWait. May need to be increased if the batch system cannot "
"enumerate running jobs quickly enough, or if polling for running jobs is "
"placing an unacceptable load on a shared cluster. "
f"default=%(default)s")

# Resource requirements
resource_options = parser.add_argument_group(
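The switch from wrapping the whole group in `if not cwl:` to passing `help=SUPPRESS if cwl else ...` keeps the options registered, so their destinations always exist on the parsed namespace, while hiding them from `toil-cwl-runner --help`; with the old guard, those attributes simply did not exist for CWL runs. A standalone sketch of the pattern (not Toil code, plain argparse, made-up default value):

```python
# Sketch of the SUPPRESS-if-cwl pattern used above.
import argparse
from argparse import SUPPRESS

cwl = True  # pretend we are building the toil-cwl-runner parser

parser = argparse.ArgumentParser()
parser.add_argument(
    "--maxServiceJobs",
    dest="maxServiceJobs",
    default=10**9,  # stand-in for SYS_MAX_SIZE
    type=int,
    # Hidden from --help when cwl is True, but still parsed and defaulted.
    help=SUPPRESS if cwl else "Maximum number of concurrent service jobs.",
)

options = parser.parse_args([])
print(options.maxServiceJobs)  # the attribute exists either way
```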
58 changes: 35 additions & 23 deletions src/toil/cwl/cwltoil.py
@@ -32,7 +32,6 @@
import sys
import tempfile
import textwrap
import urllib
import uuid
from threading import Thread
from typing import (
@@ -104,7 +103,7 @@
from typing_extensions import Literal

from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM
from toil.common import Config, Toil, addOptions
from toil.common import Toil, addOptions
from toil.cwl import check_cwltool_version
check_cwltool_version()
from toil.cwl.utils import (
@@ -3251,23 +3250,12 @@ def generate_default_job_store(
]
)


def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
"""Run the main loop for toil-cwl-runner."""
# Remove cwltool logger's stream handler so it uses Toil's
cwllogger.removeHandler(defaultStreamHandler)

if args is None:
args = sys.argv[1:]

config = Config()
config.disableChaining = True
config.cwl = True
parser = ArgParser()
addOptions(parser, jobstore_as_flag=True, cwl=True)
parser.add_argument("cwltool", type=str)
parser.add_argument("cwljob", nargs=argparse.REMAINDER)

def add_cwl_options(parser: argparse.ArgumentParser) -> None:
parser.add_argument("cwltool", type=str, help="CWL file to run.")
parser.add_argument("cwljob", nargs="*", help="Input file or CWL options. If CWL workflow takes an input, "
"the name of the input can be used as an option. "
"For example: \"%(prog)s workflow.cwl --file1 file\". "
"If an input has the same name as a Toil option, pass '--' before it.")
parser.add_argument("--not-strict", action="store_true")
parser.add_argument(
"--enable-dev",
@@ -3282,7 +3270,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
)
parser.add_argument("--quiet", dest="quiet", action="store_true", default=False)
parser.add_argument("--basedir", type=str) # TODO: Might be hard-coded?
parser.add_argument("--outdir", type=str, default=os.getcwd())
parser.add_argument("--outdir", type=str, default=None)
parser.add_argument("--version", action="version", version=baseVersion)
parser.add_argument(
"--log-dir",
@@ -3574,8 +3562,32 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
type=str,
)

# Parse all the options once.
options = parser.parse_args(args)
def get_options(args: List[str]) -> argparse.Namespace:
"""
Parse given args and properly add non-Toil arguments into the cwljob of the Namespace.
:param args: List of args from command line
:return: options namespace
"""
parser = ArgParser()
addOptions(parser, jobstore_as_flag=True, cwl=True)
add_cwl_options(parser)

options: argparse.Namespace
options, cwl_options = parser.parse_known_args(args)
options.cwljob.extend(cwl_options)

return options


def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
"""Run the main loop for toil-cwl-runner."""
# Remove cwltool logger's stream handler so it uses Toil's
cwllogger.removeHandler(defaultStreamHandler)

if args is None:
args = sys.argv[1:]

options = get_options(args)

# Do cwltool setup
cwltool.main.setup_schema(args=options, custom_schema_callback=None)
@@ -3627,7 +3639,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:

logger.debug(f"Final job store {options.jobStore} and workDir {options.workDir}")

outdir = os.path.abspath(options.outdir)
outdir = os.path.abspath(os.getcwd()) if options.outdir is None else os.path.abspath(options.outdir)
tmp_outdir_prefix = os.path.abspath(options.tmp_outdir_prefix)

fileindex: Dict[str, str] = {}
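The new `get_options()` relies on `parse_known_args()`: options the Toil/CWL parser does not recognise are no longer a hard usage error but are appended to `cwljob` and handed on as workflow inputs, which is why `test_cwl_cmdline_input` below can pass `--message` and `--sleep` directly and why the old positional-ordering test is removed. A standalone sketch of that pattern (not the PR's exact code; option names are made up):

```python
# Sketch: unknown options fall through to the cwljob list instead of erroring.
import argparse

parser = argparse.ArgumentParser(prog="toil-cwl-runner")
parser.add_argument("--logLevel", default="INFO")  # a Toil-style option
parser.add_argument("cwltool", help="CWL file to run.")
parser.add_argument("cwljob", nargs="*", help="Inputs and workflow options.")

# "--message" is not declared, so parse_known_args returns it (and its value)
# as unknown arguments instead of raising a usage error.
options, unknown = parser.parse_known_args(
    ["workflow.cwl", "--logLevel", "DEBUG", "--message", "hello"]
)
options.cwljob.extend(unknown)
print(options.cwljob)  # ['--message', 'hello']
```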
61 changes: 11 additions & 50 deletions src/toil/test/cwl/cwlTest.py
@@ -219,6 +219,17 @@ def tearDown(self):
shutil.rmtree(self.outDir)
unittest.TestCase.tearDown(self)

def test_cwl_cmdline_input(self):
"""
Test that running a CWL workflow with inputs specified on the command line passes.
"""
from toil.cwl import cwltoil
cwlfile = "src/toil/test/cwl/conditional_wf.cwl"
args = [cwlfile, "--message", "str", "--sleep", "2"]
st = StringIO()
# If the workflow runs, it must have had options
cwltoil.main(args, stdout=st)

def _tester(self, cwlfile, jobfile, expect, main_args=[], out_name="output"):
from toil.cwl import cwltoil

@@ -1232,56 +1243,6 @@ def test_pick_value_with_one_null_value(caplog):
assert "You had a conditional step that did not run, but you did not use pickValue to handle the skipped input." not in line


@needs_cwl
@pytest.mark.cwl_small
def test_usage_message():
"""
This is purely to ensure a (more) helpful error message is printed if a user does
not order their positional args correctly [cwl, cwl-job (json/yml/yaml), jobstore].
"""
toil = "toil-cwl-runner"
cwl = "test/cwl/revsort.cwl"
cwl_job_json = "test/cwl/revsort-job.json"
jobstore = "delete-test-toil"
random_option_1 = "--logInfo"
random_option_2 = "--disableChaining"
cmd_wrong_ordering_1 = [
toil,
cwl,
cwl_job_json,
jobstore,
random_option_1,
random_option_2,
]
cmd_wrong_ordering_2 = [
toil,
cwl,
jobstore,
random_option_1,
random_option_2,
cwl_job_json,
]
cmd_wrong_ordering_3 = [
toil,
jobstore,
random_option_1,
random_option_2,
cwl,
cwl_job_json,
]

for cmd in [cmd_wrong_ordering_1, cmd_wrong_ordering_2, cmd_wrong_ordering_3]:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
assert (
b"Usage: toil-cwl-runner [options] example.cwl example-job.yaml" in stderr
)
assert (
b"All positional arguments [cwl, yml_or_json] "
b"must always be specified last for toil-cwl-runner." in stderr
)


@needs_cwl
@pytest.mark.cwl_small
def test_workflow_echo_string():