#!/usr/bin/env python3
"""
asa_submitter.py: SBATCH job submitter for automated subset analysis
Greg Conan: [email protected]
Created 2020-01-03
Updated 2021-05-03
"""
##################################
#
# Script to submit many automated_subset_analysis.py SBATCH jobs for parallel
# processing on the Exacloud server
#
##################################
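#
# Example invocation (a sketch only: the two group demographics file
# arguments plus --output, --n-analyses, and --subset-size are defined by the
# shared initialize_subset_analysis_parser, so their exact form is assumed
# here; the remaining flags are defined in this script):
#
#   python3 asa_submitter.py group_1_demo.csv group_2_demo.csv \
#       --output ./out --n-analyses 10 --subset-size 50 100 \
#       -A feczk001 --memory 2 -time 04:00:00 --queue-max-size 100
#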
import argparse
import os
import subprocess
import sys
import time
# Ensure that this script can find its local imports when it runs as a
# parallel batch job: the argument after --parallel is the directory
# containing those imports
if "--parallel" in sys.argv:
    sys.path.append(os.path.abspath(
        sys.argv[sys.argv.index("--parallel") + 1]
    ))
# Local custom imports
from automated_subset_analysis import validate_cli_args
from src.conan_tools import *
# Constants: name of the script to submit, template of the demographics file
# argument names, prefix of the job names to count in squeue output, and the
# automated_subset_analysis directory
ASA = "automated_subset_analysis.py"
GP_DEMO_FILE = "group_{}_demo_file"
JOB_SHORTNAME = "automate"
PWD = sys.path[-1] if "--parallel" in sys.argv else get_pwd()
def main():
# Store and print the date and time when this script started running
starting_timestamp = get_and_print_timestamp_when(sys.argv[0], "started")
    # Get and validate all command-line arguments from user, reusing the
    # validator imported from automated_subset_analysis
    cli_args = get_submitter_cli_args(
        "Script to run many instances of {} in parallel.".format(ASA),
        get_ASA_arg_names(), PWD, validate_cli_args
    )
slurm_out = os.path.join(cli_args["output"], "slurm-out-{}.txt".format(
now().strftime("%Y-%b-%d-%H-%M")
))
    open(slurm_out, "w+").close()  # Create an empty file to hold slurm output
cli_args["sbatch"] = [
"sbatch", "--time={}".format(cli_args["time"]), "--output", slurm_out,
"-A", cli_args["account"], "--mem={}gb".format(cli_args["memory"]),
"-c", str(cli_args["cpus"]), os.path.join(PWD, ASA)
]
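    # The list above is only the sbatch command prefix; get_batch_command()
    # appends each job's own arguments, so every submission looks roughly like
    #   sbatch --time=HH:MM:SS --output <slurm_out> -A <account> \
    #       --mem=<N>gb -c <cpus> <PWD>/automated_subset_analysis.py <job args>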
    try:
        submit_batch_jobs(cli_args)
    except Exception:
        get_and_print_timestamp_when(sys.argv[0], "crashed")
        raise
# Print the date and time when this script started and finished running
print(starting_timestamp)
get_and_print_timestamp_when(sys.argv[0], "finished")
def get_submitter_cli_args(script_description, arg_names, pwd, validate=None):
"""
Get and validate all args from command line using argparse.
:param script_description: String describing the basic purpose of a script,
shown when the user runs the script with --help
:param arg_names: List of strings, each of which names a flag which the
user can call the script with
:param pwd: String which is a valid path to the parent directory of the
script currently being run
:param validate: Function to pass output namespace and its parser into to
validate all user inputs
    :return: Dictionary containing all validated command-line arguments
"""
# Create arg parser, and fill it with parameters shared with other scripts
parser = initialize_subset_analysis_parser(argparse.ArgumentParser(
description=script_description
), pwd, arg_names)
# This block differs from conan_tools.get_cli_args by adding a new
# argument and converting cli_args into a dictionary
    default_acct = "feczk001"
    default_cpus = 1
    default_gb_mem = 1
    default_jobs = 100
    default_sleep = 60
    default_time_limit = "04:00:00"
    parser.add_argument(
        "-A", "--account",
        default=default_acct,
        help=("Name of the account to submit each SBATCH job under. The "
              "default account is {}.".format(default_acct))
    )
    parser.add_argument(
        "-c", "--cpus",
        type=valid_whole_number,
        default=default_cpus,
        help=("Number of CPUs to assign to each SBATCH job. The default "
              "number is {}.".format(default_cpus))
    )
parser.add_argument(
"-mem", "--memory",
type=valid_whole_number,
default=default_gb_mem,
help=("Memory in gigabytes (GB) to assign to each sbatch job. The "
"default number is {} GB.".format(default_gb_mem))
)
parser.add_argument(
"-print-cmd",
"--print-command",
action="store_true",
help=("Include this flag to print every command that is run to submit "
"an {} batch job.".format(ASA))
)
parser.add_argument(
"-q", "-queue",
"--queue-max-size",
type=valid_whole_number,
default=default_jobs,
help=("The maximum number of jobs to run simultaneously. By default, "
"a maximum of {} jobs will run at once.".format(default_jobs))
)
parser.add_argument(
"-sleep",
"--seconds-between-jobs",
dest="sleep",
type=valid_whole_number,
default=default_sleep,
help=("Number of seconds to wait between batch job submissions. The "
"default number is {}.".format(default_sleep))
)
parser.add_argument(
"-time",
"--job-time-limit",
dest="time",
type=valid_time_str,
default=default_time_limit,
help=("Time limit for each {} batch job. The time limit must be "
"formatted specifically as HH:MM:SS where HH is hours, MM is "
"minutes, and SS is seconds. {} is the default time limit."
.format(ASA, default_time_limit))
)
return vars(validate(parser.parse_args(), parser)
if validate else parser.parse_args())
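# The dictionary returned above is keyed by argument dest name: "account",
# "cpus", "memory", "print_command", "queue_max_size", "sleep", and "time"
# from this script, plus whatever initialize_subset_analysis_parser defines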
def get_asa_options(cli_args):
"""
:param cli_args: Dictionary with all validated command-line
arguments, all of which are used by this function
:return: List of some cli_args optional arguments and their values
"""
asa_optional_args = []
    for arg in get_ASA_arg_names():
        # Skip the arguments which get_batch_command already passes per-job
        if arg not in (GP_DEMO_FILE.format(1), GP_DEMO_FILE.format(2),
                       "n_analyses", "output", "subset_size"):
if cli_args[arg]:
asa_optional_args.append(as_cli_arg(arg))
if isinstance(cli_args[arg], list):
for el in cli_args[arg]:
asa_optional_args.append(str(el))
elif not isinstance(cli_args[arg], bool):
asa_optional_args.append(str(cli_args[arg]))
return asa_optional_args
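# As a sketch of the conversion above, a hypothetical stored value like
# cli_args["example_flag"] == [1, 2] would be forwarded as
# ["--example-flag", "1", "2"], assuming as_cli_arg (from src.conan_tools)
# turns a dest name back into its --flag form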
def valid_time_str(in_arg):
"""
:param in_arg: Object to check if it's a time string in the right format
:return: True if in_arg is a string representing a time limit in the format
HH:MM:SS; otherwise False
"""
try:
split = in_arg.split(":")
assert len(split) == 3
for each_num in split:
assert each_num.isdigit()
assert int(each_num) >= 0
return in_arg
except (TypeError, AssertionError, ValueError):
raise argparse.ArgumentTypeError("Invalid time string.")
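# For example, valid_time_str("04:00:00") returns "04:00:00" unchanged, while
# valid_time_str("4:00") raises argparse.ArgumentTypeError because it has only
# two colon-separated fields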
def count_jobs_running():
"""
:return: Integer counting how many ASA batch jobs are running right now
"""
    return subprocess.check_output(
        "squeue", universal_newlines=True
    ).count(JOB_SHORTNAME)
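# Note: this counts every occurrence of JOB_SHORTNAME in the full squeue
# output, which by default lists all users' jobs, so similarly-named jobs from
# other users would inflate the count; narrowing the query (for example with
# squeue's --user flag) is a possible site-specific refinement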
def get_batch_command(cli_args, out_num, subset_size):
"""
Get command to run automated_subset_analysis batch job
:param cli_args: Dictionary with all validated command-line
arguments, all of which are used by this function
:param out_num: Integer from 1 to cli_args["n_analyses"] representing which
analysis this batch command is
:param subset_size: Integer which is an element of cli_args["subset_size"]
:return: List of strings which can be called as a command to run an
automated_subset_analysis batch job
"""
return (cli_args["sbatch"] + [
cli_args[GP_DEMO_FILE.format(1)],
cli_args[GP_DEMO_FILE.format(2)],
"--output",
os.path.join(cli_args["output"], OUTPUT_DIRNAME.format(out_num)),
"--n-analyses", "1",
"--parallel", PWD,
"--subset-size", str(subset_size)
] + get_asa_options(cli_args))
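# Each submitted job runs a single analysis (--n-analyses 1) on a single
# subset size, writing into its own numbered output directory (OUTPUT_DIRNAME
# presumably comes in through the star import from src.conan_tools above)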
def submit_batch_jobs(cli_args):
"""
Submit automated_subset_analysis batch jobs to run in parallel
:param cli_args: Dictionary with all validated command-line
arguments, all of which are used by this function
:return: N/A
"""
    all_jobs_subset_sizes = cli_args["subset_size"] * cli_args["n_analyses"]
    keep_adding_jobs = True
    out_num = 0
    while len(all_jobs_subset_sizes) > 0:
        jobs_running = count_jobs_running()
        with open("./submissions.txt", "a+") as log:  # TODO remove this log?
            log.write("keep_adding_jobs: {}, len(all_jobs_subset_sizes): "
                      "{}, out_num: {}, running: {}, queue_max: {}\n"
                      .format(keep_adding_jobs, len(all_jobs_subset_sizes),
                              out_num, jobs_running,
                              cli_args["queue_max_size"]))
        if keep_adding_jobs:
            # Jobs are popped off the end of the list, so reaching the final
            # subset size again means a new analysis (and a new numbered
            # output directory) is starting
            if all_jobs_subset_sizes[-1] == cli_args["subset_size"][-1]:
                out_num += 1
            subprocess.check_call(get_batch_command(
                cli_args, out_num, all_jobs_subset_sizes.pop()
            ))
        # Sleep between submissions, and also between squeue polls while the
        # queue is full, so this loop does not spin
        time.sleep(cli_args["sleep"])
        keep_adding_jobs = jobs_running < cli_args["queue_max_size"]
if __name__ == "__main__":
main()