Skip to content

Commit

Permalink
Fix configargparse in CWL (#4618)
Browse files Browse the repository at this point in the history
* Parse config file separately from rest of args

* Mypy

* update configargparse stub

* Dont try to eat cwl arguments

* Use simpler workaround

* Revert to just CWL

* Change REMAINDER to "*", add help statements and test command line inputs

* Remove extradockergroup name

* Declare type

* Add proper relative path to cwl file

* Remove unnecessary test

---------

Co-authored-by: Adam Novak <[email protected]>
  • Loading branch information
stxue1 and adamnovak authored Oct 26, 2023
1 parent 15763af commit ecebc07
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 99 deletions.
2 changes: 2 additions & 0 deletions contrib/mypy-stubs/configargparse/configargparse.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ class ArgumentParser(argparse.ArgumentParser):
# There may be a better way of type hinting this without a type: ignore, but mypy gets unhappy pretty much no matter what as the signatures for parse_args doesn't match with its superclass in argparse
def parse_args(self, args: Sequence[str] | None = None, namespace: Namespace | None = None, config_file_contents: str | None = None, env_vars: Any=None) -> Namespace: ... # type: ignore[override]

def get_source_to_settings_dict(self) -> OrderedDict[Any, Any]: ...

Namespace = argparse.Namespace
ArgParser = ArgumentParser
51 changes: 25 additions & 26 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -945,32 +945,31 @@ def __call__(self, parser: Any, namespace: Any, values: Any, option_string: Any
"of memory and disk on a node when autoscaling.")

# Parameters to limit service jobs / detect service deadlocks
if not cwl:
service_options = parser.add_argument_group(
title="Toil options for limiting the number of service jobs and detecting service deadlocks",
description="Allows the specification of the maximum number of service jobs in a cluster. By keeping "
"this limited we can avoid nodes occupied with services causing deadlocks."
)
service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int,
help=f"The maximum number of service jobs that can be run concurrently, "
f"excluding service jobs running on preemptible nodes. "
f"default=%(default)s")
service_options.add_argument("--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs",
default=SYS_MAX_SIZE,
type=int,
help=f"The maximum number of service jobs that can run concurrently on "
f"preemptible nodes. default=%(default)s")
service_options.add_argument("--deadlockWait", dest="deadlockWait", default=60, type=int,
help=f"Time, in seconds, to tolerate the workflow running only the same service "
f"jobs, with no jobs to use them, before declaring the workflow to be "
f"deadlocked and stopping. default=%(default)s")
service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=30, type=int,
help="Time, in seconds, to wait between checks to see if the workflow is stuck "
"running only service jobs, with no jobs to use them. Should be shorter "
"than --deadlockWait. May need to be increased if the batch system cannot "
"enumerate running jobs quickly enough, or if polling for running jobs is "
"placing an unacceptable load on a shared cluster. "
f"default=%(default)s")
service_options = parser.add_argument_group(
title="Toil options for limiting the number of service jobs and detecting service deadlocks",
description="Allows the specification of the maximum number of service jobs in a cluster. By keeping "
"this limited we can avoid nodes occupied with services causing deadlocks."
)
service_options.add_argument("--maxServiceJobs", dest="maxServiceJobs", default=SYS_MAX_SIZE, type=int,
help=SUPPRESS if cwl else f"The maximum number of service jobs that can be run concurrently, "
f"excluding service jobs running on preemptible nodes. "
f"default=%(default)s")
service_options.add_argument("--maxPreemptibleServiceJobs", dest="maxPreemptibleServiceJobs",
default=SYS_MAX_SIZE,
type=int,
help=SUPPRESS if cwl else f"The maximum number of service jobs that can run concurrently on "
f"preemptible nodes. default=%(default)s")
service_options.add_argument("--deadlockWait", dest="deadlockWait", default=60, type=int,
help=SUPPRESS if cwl else f"Time, in seconds, to tolerate the workflow running only the same service "
f"jobs, with no jobs to use them, before declaring the workflow to be "
f"deadlocked and stopping. default=%(default)s")
service_options.add_argument("--deadlockCheckInterval", dest="deadlockCheckInterval", default=30, type=int,
help=SUPPRESS if cwl else "Time, in seconds, to wait between checks to see if the workflow is stuck "
"running only service jobs, with no jobs to use them. Should be shorter "
"than --deadlockWait. May need to be increased if the batch system cannot "
"enumerate running jobs quickly enough, or if polling for running jobs is "
"placing an unacceptable load on a shared cluster. "
f"default=%(default)s")

# Resource requirements
resource_options = parser.add_argument_group(
Expand Down
58 changes: 35 additions & 23 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import sys
import tempfile
import textwrap
import urllib
import uuid
from threading import Thread
from typing import (
Expand Down Expand Up @@ -104,7 +103,7 @@
from typing_extensions import Literal

from toil.batchSystems.registry import DEFAULT_BATCH_SYSTEM
from toil.common import Config, Toil, addOptions
from toil.common import Toil, addOptions
from toil.cwl import check_cwltool_version
check_cwltool_version()
from toil.cwl.utils import (
Expand Down Expand Up @@ -3251,23 +3250,12 @@ def generate_default_job_store(
]
)


def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
"""Run the main loop for toil-cwl-runner."""
# Remove cwltool logger's stream handler so it uses Toil's
cwllogger.removeHandler(defaultStreamHandler)

if args is None:
args = sys.argv[1:]

config = Config()
config.disableChaining = True
config.cwl = True
parser = ArgParser()
addOptions(parser, jobstore_as_flag=True, cwl=True)
parser.add_argument("cwltool", type=str)
parser.add_argument("cwljob", nargs=argparse.REMAINDER)

def add_cwl_options(parser: argparse.ArgumentParser) -> None:
parser.add_argument("cwltool", type=str, help="CWL file to run.")
parser.add_argument("cwljob", nargs="*", help="Input file or CWL options. If CWL workflow takes an input, "
"the name of the input can be used as an option. "
"For example: \"%(prog)s workflow.cwl --file1 file\". "
"If an input has the same name as a Toil option, pass '--' before it.")
parser.add_argument("--not-strict", action="store_true")
parser.add_argument(
"--enable-dev",
Expand All @@ -3282,7 +3270,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
)
parser.add_argument("--quiet", dest="quiet", action="store_true", default=False)
parser.add_argument("--basedir", type=str) # TODO: Might be hard-coded?
parser.add_argument("--outdir", type=str, default=os.getcwd())
parser.add_argument("--outdir", type=str, default=None)
parser.add_argument("--version", action="version", version=baseVersion)
parser.add_argument(
"--log-dir",
Expand Down Expand Up @@ -3574,8 +3562,32 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
type=str,
)

# Parse all the options once.
options = parser.parse_args(args)
def get_options(args: List[str]) -> argparse.Namespace:
"""
Parse given args and properly add non-Toil arguments into the cwljob of the Namespace.
:param args: List of args from command line
:return: options namespace
"""
parser = ArgParser()
addOptions(parser, jobstore_as_flag=True, cwl=True)
add_cwl_options(parser)

options: argparse.Namespace
options, cwl_options = parser.parse_known_args(args)
options.cwljob.extend(cwl_options)

return options


def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
"""Run the main loop for toil-cwl-runner."""
# Remove cwltool logger's stream handler so it uses Toil's
cwllogger.removeHandler(defaultStreamHandler)

if args is None:
args = sys.argv[1:]

options = get_options(args)

# Do cwltool setup
cwltool.main.setup_schema(args=options, custom_schema_callback=None)
Expand Down Expand Up @@ -3627,7 +3639,7 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:

logger.debug(f"Final job store {options.jobStore} and workDir {options.workDir}")

outdir = os.path.abspath(options.outdir)
outdir = os.path.abspath(os.getcwd()) if options.outdir is None else os.path.abspath(options.outdir)
tmp_outdir_prefix = os.path.abspath(options.tmp_outdir_prefix)

fileindex: Dict[str, str] = {}
Expand Down
61 changes: 11 additions & 50 deletions src/toil/test/cwl/cwlTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ def tearDown(self):
shutil.rmtree(self.outDir)
unittest.TestCase.tearDown(self)

def test_cwl_cmdline_input(self):
"""
Test that running a CWL workflow with inputs specified on the command line passes.
"""
from toil.cwl import cwltoil
cwlfile = "src/toil/test/cwl/conditional_wf.cwl"
args = [cwlfile, "--message", "str", "--sleep", "2"]
st = StringIO()
# If the workflow runs, it must have had options
cwltoil.main(args, stdout=st)

def _tester(self, cwlfile, jobfile, expect, main_args=[], out_name="output"):
from toil.cwl import cwltoil

Expand Down Expand Up @@ -1232,56 +1243,6 @@ def test_pick_value_with_one_null_value(caplog):
assert "You had a conditional step that did not run, but you did not use pickValue to handle the skipped input." not in line


@needs_cwl
@pytest.mark.cwl_small
def test_usage_message():
"""
This is purely to ensure a (more) helpful error message is printed if a user does
not order their positional args correctly [cwl, cwl-job (json/yml/yaml), jobstore].
"""
toil = "toil-cwl-runner"
cwl = "test/cwl/revsort.cwl"
cwl_job_json = "test/cwl/revsort-job.json"
jobstore = "delete-test-toil"
random_option_1 = "--logInfo"
random_option_2 = "--disableChaining"
cmd_wrong_ordering_1 = [
toil,
cwl,
cwl_job_json,
jobstore,
random_option_1,
random_option_2,
]
cmd_wrong_ordering_2 = [
toil,
cwl,
jobstore,
random_option_1,
random_option_2,
cwl_job_json,
]
cmd_wrong_ordering_3 = [
toil,
jobstore,
random_option_1,
random_option_2,
cwl,
cwl_job_json,
]

for cmd in [cmd_wrong_ordering_1, cmd_wrong_ordering_2, cmd_wrong_ordering_3]:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
assert (
b"Usage: toil-cwl-runner [options] example.cwl example-job.yaml" in stderr
)
assert (
b"All positional arguments [cwl, yml_or_json] "
b"must always be specified last for toil-cwl-runner." in stderr
)


@needs_cwl
@pytest.mark.cwl_small
def test_workflow_echo_string():
Expand Down

0 comments on commit ecebc07

Please sign in to comment.