forked from ET-NCMP/MarineQC
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Tracking_QC_wrapper.py
255 lines (195 loc) · 9.55 KB
/
Tracking_QC_wrapper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import time
import argparse
import ConfigParser
import sys
import os
import subprocess
import qc
import YMCounter as ym
import json
def safe_dir(out_dir, year, month):
    """
    Build the path of the year/month subdirectory of out_dir.

    Despite its name, this function does not create anything on disk; it only
    assembles the path string. The month is zero-padded to two digits and the
    result always ends with '/', e.g. safe_dir('/data', 1990, 3) returns
    '/data/1990/03/'.

    :param out_dir: base directory under which the year/month subdirectories live
    :param year: year for subdirectory
    :param month: month for subdirectory (formatted with at least two digits)
    :return: path string of the form '<out_dir>/<year>/<MM>/'
    """
    syr = str(year)
    smn = "{:02}".format(month)
    return out_dir + '/' + syr + '/' + smn + '/'
def checkload(uname):
    """
    Count the jobs that user uname currently has in the Slurm queue.

    squeue is asked to filter by user itself (-u) and to omit its header (-h),
    and the non-blank output lines are counted. This replaces a substring count
    over the whole `squeue -l` listing, which also matched the username wherever
    it appeared (e.g. inside job names or longer usernames containing it) and
    failed on Python 3, where check_output returns bytes.

    :param uname: username of user running the script
    :return: number of queued jobs listed for uname
    :raises subprocess.CalledProcessError: if squeue exits with a non-zero status
    :raises OSError: if squeue cannot be executed
    """
    output = subprocess.check_output(['squeue', '-h', '-u', uname])
    if not isinstance(output, str):
        # Python 3 returns bytes here; Python 2 already returns str.
        output = output.decode('utf-8', 'replace')
    return sum(1 for line in output.splitlines() if line.strip())
def safemake(folder):
    """
    Ensure that the directory folder exists, creating it (and any missing
    parent directories) if necessary.

    The directory is created first and the failure inspected afterwards
    ("easier to ask forgiveness"), so that several jobs racing to create the
    same directory do not crash one another.

    :param folder: path of folder to create
    :return: None
    :raises OSError: if folder cannot be created, or exists but is not a directory
    """
    try:
        os.makedirs(folder)
    except OSError:
        # Someone else may have created it in the meantime; that is fine as long
        # as it is now a directory. Anything else (permissions, a regular file in
        # the way, an empty path) is a genuine error.
        if not os.path.isdir(folder):
            raise
def write_submission(config, targetid, yr1, mn1, yr2, mn2, edge, runmonthid, outdir, runid,
                     codedir='/net/home/h04/hadjj/PyWorkspace/Marine_QC/', username='hadjj',
                     max_jobs=250):
    """
    Write a Slurm job script that runs tracking QC on one chunk of one ID, submit
    it with sbatch, then throttle: while username has more than max_jobs jobs
    queued, wait 20 seconds and check again.

    The command placed in the job script looks like this (option order follows
    dict iteration order):
    python tracking_qc.py -id 51514 -yr1 2004 -mn1 5 -yr2 2006 -mn2 1 -edge start_edge_case -runmonthid 198501-201412

    The job script is written to <outdir>/TrackingQC/jobs/ and its Slurm and
    stdout logs go to <outdir>/TrackingQC/logs/; these directories are created
    if they do not exist.

    :param config: location of configuration file
    :param targetid: the ID of the buoy to be checked
    :param yr1: the start year of the chunk of data to be QCd
    :param mn1: the start month of the chunk of data to be QCd
    :param yr2: the end year of the chunk of data to be QCd
    :param mn2: the end month of the chunk of data to be QCd
    :param edge: specifies what kind of case this is; its elements are joined
                 with spaces on the command line (presumably a list of case
                 labels - a plain string would have its characters spaced out)
    :param runmonthid: a runmonthid which will be used to label directories for start and end edge cases
    :param outdir: the base directory used to write out tracking QC log files and store submitted jobs
    :param runid: the name of the run as specified in the parameter file
    :param codedir: directory containing tracking_qc.py; must end with a path separator
    :param username: Slurm user whose queued jobs are counted for throttling
    :param max_jobs: keep waiting while more than this many jobs are queued
    :return: None
    """
    clean_id = targetid.replace(" ", "")
    fname = 'Tracking_job_{}_{}{:02}_{}{:02}_{}_{}'.format(clean_id, yr1, mn1, yr2, mn2,
                                                            runmonthid, runid)

    options = {'id': targetid, 'config': config,
               'yr1': yr1, 'mn1': mn1, 'yr2': yr2, 'mn2': mn2,
               'edge': " ".join(edge), 'runmonthid': runmonthid}
    # NOTE(review): values are not shell-quoted; an ID containing spaces is split
    # into several arguments when the job script runs - confirm tracking_qc.py
    # expects that.
    command = " ".join(['python {}tracking_qc.py'.format(codedir)] +
                       ["-{} {}".format(op, options[op]) for op in options])

    logdir = outdir + '/TrackingQC/logs'
    jobdir = outdir + '/TrackingQC/jobs'
    safemake(outdir + '/TrackingQC')
    safemake(logdir)
    safemake(jobdir)

    # Suffix shared by the Slurm error/output log and the captured stdout log.
    log_suffix = '{}{:02}{}_{}'.format(yr2, mn2, clean_id, runid)
    jobfile = jobdir + '/' + fname
    with open(jobfile, 'w') as outfile:
        outfile.write("#!/bin/bash -l\n")
        outfile.write("#SBATCH --mem=8000\n")
        outfile.write("#SBATCH --ntasks=1\n")
        outfile.write("#SBATCH --output={}/SPICE_trackqc_errout{}\n".format(logdir, log_suffix))
        outfile.write("#SBATCH --time=30\n")
        outfile.write("\n")
        outfile.write("module load scitools/default_legacy-current\n")
        outfile.write(command + " > {}/SPICE_trackqc_genout{}\n".format(logdir, log_suffix))

    # Argument list instead of a shell string: no shell parsing of the path.
    # A failed submission is reported but, as before, does not stop the run.
    status = subprocess.call(['sbatch', jobfile])
    if status != 0:
        print("sbatch failed (exit status {}) for {}".format(status, jobfile))

    while checkload(username) > max_jobs:
        print("Reached max jobs ({}), waiting 20 seconds".format(max_jobs))
        time.sleep(20)
# Chunk-selection rule for each -edge mode: given the case labels `cl` yielded
# by YMCounter.yield_start_and_end_dates, return True if the chunk is submitted.
_EDGE_SELECTORS = {
    'all': lambda cl: True,
    'standard': lambda cl: 'regular' in cl,
    'new': lambda cl: 'new' in cl,
    'noend': lambda cl: 'regular' in cl or 'start_edge_case' in cl,
}


def _report(label, *fields):
    """Print label followed by the space-separated fields (same output on Python 2 and 3)."""
    print(' '.join([str(label)] + [str(field) for field in fields]))


def _read_id_counters(out_dir, y1, m1, y2, m2):
    """
    Read the per-month ID files and record, for each ID, the months it appears in.

    For every month from y1/m1 to y2/m2 (as generated by qc.year_month_gen) the
    file <out_dir>/<year>/<MM>//ID_file.txt is read; the first comma-separated
    column of each non-blank line is taken as the ID.

    :return: dict mapping ID -> ym.YMCounter with setym(year, month, 1) applied
             for every month in which the ID was listed
    """
    id_dictionary = {}
    for year, month in qc.year_month_gen(y1, m1, y2, m2):
        extdir = safe_dir(out_dir, year, month)
        with open(extdir + '/ID_file.txt', 'r') as idfile:
            for line in idfile:
                targetid = line.rstrip("\r\n").split(',')[0]
                if not targetid:
                    # Skip blank lines (e.g. a trailing newline) instead of QCing an empty ID.
                    continue
                if targetid not in id_dictionary:
                    id_dictionary[targetid] = ym.YMCounter(y1, m1, y2, m2)
                id_dictionary[targetid].setym(year, month, 1)
    return id_dictionary


def main(argv):
    """
    Tracking_QC_wrapper.py
    script to control the running of the tracking QC::
    python Tracking_QC_wrapper.py -config configuration.txt -gap 3 -yr1 1985 -yr2 2005 -mn1 1 -mn2 12 -edge new
    Reads in files containing a list of IDs for each month and decides when to quality control the observations.

    Inputs
    -config
    specifies the location of the configuration file.
    -gap
    specifies the gap in months that must separate chunks of data.
    -yr1
    year of start month.
    -mn1
    month of start month.
    -yr2
    year of end month.
    -mn2
    month of end month.
    -edge
    specifies how different cases should be treated. 'all' will run QC for all chunks separated by "gap" months of
    data; 'standard' will run for all chunks except for those that start or end fewer than "gap" months from the start
    or end of the series; 'new' will run only those chunks that have a gap of exactly "gap" months from the end of the
    series; 'noend' will run for all chunks except for those that end fewer than "gap" months from the end of the series.

    The four "edge" cases allow for running in different modes. In principle, 'standard' will QC everything that will
    not change from the addition of data to the start or end of the series. It is intended for running all the
    historical QC in preparation for monthly updates. The flag 'new' can be used for real time updates to only QC those
    IDs that have not been eligible for QC in earlier months and have an appropriate gap at the end of the series. The
    flag 'all' will QC everything, including chunks at the start and end of the series which may change with extra data
    appended to either end of the series. The 'noend' flag can be used to QC everything that will not change from addition
    of data to the end of the series, which may be more appropriate ahead of monthly updates.
    Note that adding extra data in the middle of the series is liable to change all QC outcomes regardless of whether QC
    was run in 'all', 'standard', 'noend' or 'new' configurations.

    :param argv: command-line arguments without the program name; None makes
                 argparse fall back to sys.argv[1:]
    :raises ValueError: if -edge is not one of 'standard', 'all', 'new' or 'noend'
    """
    parser = argparse.ArgumentParser(description='Marine QC system, main program')
    parser.add_argument('-config', type=str, default='configuration.txt', help='name of config file')
    parser.add_argument('-gap', type=int, default=3, help='gap of -gap months needed to trigger QC of ID')
    parser.add_argument('-yr1', type=int, default=1985, help='first year to analyse')
    parser.add_argument('-yr2', type=int, default=2019, help='last year to analyse')
    parser.add_argument('-mn1', type=int, default=1, help='first month to analyse in first year')
    parser.add_argument('-mn2', type=int, default=12, help='last month to analyse in last year')
    parser.add_argument('-edge', type=str, default='standard', help='How to deal with edge cases')
    args = parser.parse_args(argv)

    inputfile = args.config
    y1 = args.yr1
    y2 = args.yr2
    m1 = args.mn1
    m2 = args.mn2
    gap = args.gap
    edge = args.edge
    runmonthid = "{}{:02}-{}{:02}".format(y1, m1, y2, m2)

    if edge not in _EDGE_SELECTORS:
        raise ValueError("edge not one of 'standard', 'all', 'new' or 'noend'")
    should_submit = _EDGE_SELECTORS[edge]

    config = ConfigParser.ConfigParser()
    config.read(inputfile)
    out_dir = config.get('Directories', 'out_dir')
    track_out_dir = config.get('Directories', 'track_out_dir')
    with open(config.get('Files', 'parameter_file'), 'r') as f:
        parameters = json.load(f)

    # establish full list of IDs to QC
    id_dictionary = _read_id_counters(out_dir, y1, m1, y2, m2)

    for targetid in id_dictionary:
        g = id_dictionary[targetid]
        _report(targetid, g.counter)
        for yy1, mm1, yy2, mm2, cl in g.yield_start_and_end_dates(gap):
            if should_submit(cl):
                _report('Submit', yy1, mm1, yy2, mm2, cl)
                write_submission(inputfile, targetid, yy1, mm1, yy2, mm2, cl,
                                 runmonthid, track_out_dir, parameters['runid'])
            else:
                _report('Ignore', yy1, mm1, yy2, mm2, cl)
        # print('') rather than print() so Python 2 emits a blank line, not "()".
        print('')
if __name__ == '__main__':
    # Script entry point: hand the command-line arguments, minus the program
    # name, over to main().
    cli_args = sys.argv[1:]
    main(cli_args)