Skip to content

Commit

Permalink
Take over manual management of CWL job order options and their metava…
Browse files Browse the repository at this point in the history
…rs and help
  • Loading branch information
adamnovak committed Nov 7, 2024
1 parent c5455a3 commit 7b756d8
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 34 deletions.
28 changes: 19 additions & 9 deletions src/toil/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -777,15 +777,25 @@ def check_arguments(typ: str) -> None:

# if cwl is set, format the namespace for cwl and check that wdl options are not set on the command line
if cwl:
parser.add_argument("cwltool", type=str, help="CWL file to run.")
parser.add_argument(
"cwljob",
nargs="*",
help="Input file or CWL options. If CWL workflow takes an input, "
"the name of the input can be used as an option. "
'For example: "%(prog)s workflow.cwl --file1 file". '
"If an input has the same name as a Toil option, pass '--' before it.",
)
# So we can manually write out the help for this and the inputs
# file/workflow options in the argument parser description, we suppress
# help for this option.
parser.add_argument("cwltool", metavar="WORKFLOW", type=str, help=SUPPRESS)
# We also need a "cwljob" command line argument, holding possibly a
# positional input file and possibly a whole string of option flags
# only known to the workflow.
#
# We don't want to try and parse out the positional argument here
# since, on Python 3.12, doing so can grab what's really supposed to be
# an argument to a workflow-defined option.
#
# We don't want to use the undocumented argparse.REMAINDER, since that
# will eat any Toil-defined option flags after the first positional
# argument.
#
# So we just use parse_known_args and dump all unknown args into it,
# and manually write help text in the argparse description. So don't
# define it here.
check_arguments(typ="cwl")

# if wdl is set, format the namespace for wdl and check that cwl options are not set on the command line
Expand Down
61 changes: 46 additions & 15 deletions src/toil/cwl/cwltoil.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import cwltool.main
import cwltool.resolver
import schema_salad.ref_resolver
# This is also in configargparse but MyPy doesn't know it
from argparse import RawDescriptionHelpFormatter
from configargparse import ArgParser, Namespace
from cwltool.loghandler import _logger as cwllogger
from cwltool.loghandler import defaultStreamHandler
Expand Down Expand Up @@ -3839,17 +3841,22 @@ def generate_default_job_store(

usage_message = "\n\n" + textwrap.dedent(
"""
* All positional arguments [cwl, yml_or_json] must always be specified last for toil-cwl-runner.
Note: If you're trying to specify a jobstore, please use --jobStore.
Usage: toil-cwl-runner [options] example.cwl example-job.yaml
Example: toil-cwl-runner \\
--jobStore aws:us-west-2:jobstore \\
--realTimeLogging \\
--logInfo \\
example.cwl \\
example-job.yaml
"""[
NOTE:
* Workflow arguments must be specified with --option=value syntax.
* If you're trying to specify a jobstore, you must use --jobStore, not a positional argument.
Usage: toil-cwl-runner [options] <workflow> [<input file>] [workflow options]
Example: toil-cwl-runner \\
--jobStore aws:us-west-2:jobstore \\
--realTimeLogging \\
--logInfo \\
example.cwl \\
example-job.yaml \\
--wf_input="hello world"
"""[
1:
]
)
Expand All @@ -3861,11 +3868,34 @@ def get_options(args: list[str]) -> Namespace:
:param args: List of args from command line
:return: options namespace
"""
parser = ArgParser()
# We can't allow abbreviations in case the workflow defines an option that
# is a prefix of a Toil option.
parser = ArgParser(
allow_abbrev=False,
usage="%(prog)s [options] WORKFLOW [INFILE] [WF_OPTIONS...]",
description=textwrap.dedent("""
positional arguments:
WORKFLOW CWL file to run.
INFILE YAML or JSON file of workflow inputs.
WF_OPTIONS Additional inputs to the workflow as command-line
flags. If CWL workflow takes an input, the name of the
input can be used as an option. For example:
%(prog)s workflow.cwl --file1 file
If an input has the same name as a Toil option, pass
'--' before it.
"""),
formatter_class=RawDescriptionHelpFormatter,
)

addOptions(parser, jobstore_as_flag=True, cwl=True)
options: Namespace
options, cwl_options = parser.parse_known_args(args)
options.cwljob.extend(cwl_options)
options, extra = parser.parse_known_args(args)
options.cwljob = extra

return options

Expand All @@ -3879,6 +3909,7 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:
args = sys.argv[1:]

options = get_options(args)
print(options)

# Take care of incompatible arguments related to file imports
if options.run_imports_on_workers is True and options.import_workers_disk is None:
Expand Down Expand Up @@ -4121,7 +4152,7 @@ def main(args: Optional[list[str]] = None, stdout: TextIO = sys.stdout) -> int:
if err.code == 2: # raised by argparse's parse_args() function
print(
"\nIf both a CWL file and an input object (YAML/JSON) file were "
"provided, this may be the argument order." + usage_message,
"provided, the problem may be the argument order." + usage_message,
file=sys.stderr,
)
raise
Expand Down
29 changes: 19 additions & 10 deletions src/toil/test/utils/utilsTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,10 +378,18 @@ def testMultipleJobsPerWorkerStats(self):
"Some jobs are not represented in the stats.",
)

def check_status(self, status, status_fn, seconds=20):
def check_status(self, status, status_fn, process=None, seconds=20):
time_elapsed = 0.0
has_stopped = process.poll() is not None if process else False
current_status = status_fn(self.toilDir)
while current_status != status:
if has_stopped:
# If the process has stopped and the status is wrong, it will never be right.
self.assertEqual(
current_status,
status,
f"Process returned {process.returncode} without status reaching {status}; stuck at {current_status}",
)
logger.debug(
"Workflow is %s; waiting for %s (%s/%s elapsed)",
current_status,
Expand All @@ -391,6 +399,7 @@ def check_status(self, status, status_fn, seconds=20):
)
time.sleep(0.5)
time_elapsed += 0.5
has_stopped = process.poll() is not None if process else False
current_status = status_fn(self.toilDir)
if time_elapsed > seconds:
self.assertEqual(
Expand All @@ -402,14 +411,14 @@ def check_status(self, status, status_fn, seconds=20):
def testGetPIDStatus(self):
"""Test that ToilStatus.getPIDStatus() behaves as expected."""
wf = subprocess.Popen(self.sort_workflow_cmd)
self.check_status("RUNNING", status_fn=ToilStatus.getPIDStatus, seconds=60)
self.check_status("RUNNING", status_fn=ToilStatus.getPIDStatus, process=wf, seconds=60)
wf.wait()
self.check_status("COMPLETED", status_fn=ToilStatus.getPIDStatus, seconds=60)
self.check_status("COMPLETED", status_fn=ToilStatus.getPIDStatus, process=wf, seconds=60)

# TODO: we need to reach into the FileJobStore's files and delete this
# shared file. We assume we know its internal layout.
os.remove(os.path.join(self.toilDir, "files/shared/pid.log"))
self.check_status("QUEUED", status_fn=ToilStatus.getPIDStatus, seconds=60)
self.check_status("QUEUED", status_fn=ToilStatus.getPIDStatus, process=wf, seconds=60)

def testGetStatusFailedToilWF(self):
"""
Expand All @@ -419,9 +428,9 @@ def testGetStatusFailedToilWF(self):
"""
# --badWorker is set to force failure.
wf = subprocess.Popen(self.sort_workflow_cmd + ["--badWorker=1"])
self.check_status("RUNNING", status_fn=ToilStatus.getStatus, seconds=60)
self.check_status("RUNNING", status_fn=ToilStatus.getStatus, process=wf, seconds=60)
wf.wait()
self.check_status("ERROR", status_fn=ToilStatus.getStatus, seconds=60)
self.check_status("ERROR", status_fn=ToilStatus.getStatus, process=wf, seconds=60)

@needs_cwl
@needs_docker
Expand All @@ -443,9 +452,9 @@ def testGetStatusFailedCWLWF(self):
]
logger.info("Run command: %s", " ".join(cmd))
wf = subprocess.Popen(cmd)
self.check_status("RUNNING", status_fn=ToilStatus.getStatus, seconds=60)
self.check_status("RUNNING", status_fn=ToilStatus.getStatus, process=wf, seconds=60)
wf.wait()
self.check_status("ERROR", status_fn=ToilStatus.getStatus, seconds=60)
self.check_status("ERROR", status_fn=ToilStatus.getStatus, process=wf, seconds=60)

@needs_cwl
@needs_docker
Expand All @@ -463,9 +472,9 @@ def testGetStatusSuccessfulCWLWF(self):
f"--outdir={self.tempDir}",
]
wf = subprocess.Popen(cmd)
self.check_status("RUNNING", status_fn=ToilStatus.getStatus, seconds=60)
self.check_status("RUNNING", status_fn=ToilStatus.getStatus, process=wf, seconds=60)
wf.wait()
self.check_status("COMPLETED", status_fn=ToilStatus.getStatus, seconds=60)
self.check_status("COMPLETED", status_fn=ToilStatus.getStatus, process=wf, seconds=60)

@needs_cwl
@patch("builtins.print")
Expand Down

0 comments on commit 7b756d8

Please sign in to comment.