#This program is free software: you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation, either version 3 of the License, or
#(at your option) any later version.
#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU General Public License for more details.
#You should have received a copy of the GNU General Public License
#along with this program. If not, see <https://www.gnu.org/licenses/>.
#April 12, 2021
#---------------------------------------------
#MetaPro_utilities.py
#This module houses the helper functions MetaPro uses to coordinate its multi-process job traffic.
import sys
import os
import os.path
from argparse import ArgumentParser
from configparser import ConfigParser, ExtendedInterpolation
import multiprocessing as mp
import MetaPro_commands as mpcom
import MetaPro_paths as mpp
import time
import zipfile
import pandas as pd
import shutil
from datetime import datetime as dt
import psutil as psu
class mp_util:
def __init__(self, output_folder_path, config_path):
self.mp_store = []
self.output_folder_path = output_folder_path
self.paths = mpp.tool_path_obj(config_path)
self.bypass_log_name = self.paths.bypass_log_name
def mem_checker(self, threshold):
        #threshold is the minimum percentage of total memory that must still be available.
        #returns False when available memory falls to or below the threshold.
mem = psu.virtual_memory()
available_mem = mem.available
total_mem = mem.total
available_pct = 100 * available_mem / total_mem
if(float(available_pct) <= float(threshold)):
return False
else:
return True
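    #Example for mem_checker (following the check above): mem_checker(30) returns False
    #whenever 30% or less of total RAM is still available, and True otherwise.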
def mem_footprint_checker(self, count, mem_footprint):
        #assumes some pre-determined memory footprint (in GB) for the program in question,
        #and refuses the launch when the projected total would exceed 90% of system memory.
mem = psu.virtual_memory()
total_mem = mem.total/(1024*1024*1024)
occupied_mem = count * mem_footprint
max_limit = total_mem * 0.9
if(occupied_mem > max_limit):
return False
else:
return True
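    #Worked example for mem_footprint_checker: on a 128 GB node the cap is 128 * 0.9 = 115.2 GB,
    #so with count = 6 jobs already running at a 20 GB footprint each (120 GB projected),
    #the check returns False and the next launch is held.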
def make_folder(self, folder_path):
if not (os.path.exists(folder_path)):
os.makedirs(folder_path)
def delete_folder_simple(self, folder_path):
if(os.path.exists(folder_path)):
print(dt.today(), "Deleting:", folder_path)
shutil.rmtree(folder_path)
print(dt.today(), "finished deleting:", folder_path)
def delete_folder(self, folder_path):
if (os.path.exists(os.path.join(folder_path, "data"))):
print("deleting", os.path.join(folder_path, "data"))
shutil.rmtree(os.path.join(folder_path, "data"))
else:
print("can't delete folder: doesn't exist:", folder_path)
def compress_folder(self, folder_path):
zip_loc = os.path.join(folder_path, "data")
z = zipfile.ZipFile(folder_path + "_data.zip", "a", zipfile.ZIP_DEFLATED)
print("compressing interim files:", folder_path)
for root, dirs, files in os.walk(zip_loc):
#print("root:", root)
#print("dirs:", dirs)
#print("files:", files)
#print("===============================")
for file in files:
z.write(os.path.join(root, file))
z.close()
def write_to_bypass_log(self, folder_path, message):
bypass_log_path = os.path.join(folder_path, self.bypass_log_name)
with open(bypass_log_path, "a") as bypass_log:
bypass_log.write("\n")
new_message = message + "\n"
bypass_log.write(new_message)
def check_bypass_log(self, folder_path, message):
stop_message = "stop_" + str(message)
bypass_keys_list = list()
bypass_log_path = os.path.join(folder_path, self.bypass_log_name)
if(os.path.exists(bypass_log_path)):
with open(bypass_log_path, "r") as bypass_log:
for line in bypass_log:
bypass_key = line.strip("\n")
bypass_keys_list.append(bypass_key)
if(stop_message in bypass_keys_list):
print(dt.today(), "stopping at:", message)
print("to continue, remove:", stop_message, "from the bypass_log")
sys.exit("brakes engaged")
elif(message in bypass_keys_list):
print(dt.today(), "bypassing:", message)
return False
else:
print(dt.today(), "running:", message)
return True
else:
open(bypass_log_path, "a").close()
print(dt.today(), "no bypass log. running:", message)
return True
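    #Example of the bypass-log semantics above: if the log contains "quality_filter" and
    #"stop_host_filter", then check_bypass_log(output_folder, "quality_filter") returns False
    #(skip the stage), check_bypass_log(output_folder, "host_filter") halts the pipeline until
    #the "stop_" line is removed, and any label not in the log returns True (run the stage).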
def conditional_write_to_bypass_log(self, label, stage_folder, file_name):
        #convenience wrapper: if the stage isn't already marked and its output file exists, record it in the bypass log
        if self.check_bypass_log(self.output_folder_path, label):
file_path = os.path.join(self.output_folder_path, stage_folder, file_name)
if(os.path.exists(file_path)):
self.write_to_bypass_log(self.output_folder_path, label)
    # Used to determine the quality-score encoding of FASTQ sequences.
    # A read is assumed to be Phred+64 unless its quality string contains a character
    # in the Phred+33-only range (ordinal below 64).
def check_code(self, segment):
encoding = 64
for item in segment:
if(ord(item) < 64):
encoding = 33
break
return encoding
def determine_encoding(self, fastq):
        #import the first 40k lines (10k reads), then check the quality scores.
        #if any quality-score symbol has an ordinal below 64, the file is Phred+33.
fastq_df = pd.read_csv(fastq, header=None, names=[None], sep="\n", skip_blank_lines = False, quoting=3, nrows=40000)
fastq_df = pd.DataFrame(fastq_df.values.reshape(int(len(fastq_df)/4), 4))
fastq_df.columns = ["ID", "seq", "junk", "quality"]
quality_encoding = fastq_df["quality"].apply(lambda x: self.check_code(x)).mean() #condense into a single number.
if(quality_encoding == 64): #all must be 64 or else it's 33
quality_encoding = 64
else:
quality_encoding = 33
return quality_encoding
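    #Example for determine_encoding: a Phred+33 quality string such as "IIIHHG?!" contains "?" and "!"
    #(ordinals 63 and 33), so check_code reports 33 for that read; a Phred+64 string such as "hhhggfba"
    #has no character below ordinal 64, so every read reports 64 and the mean stays at 64.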
    # decides where to kill the pipeline when the previous stage misbehaved.
    # logic is: if any file inside dep_path (or the dep job label's final_results shortcut)
    # is empty, or no files exist at all, there's an error: kill the pipeline.
def check_where_kill(self, dep_job_label=None, dep_path=None):
if dep_job_label is None:
if dep_path is None:
return True
else:
dep_job_path = dep_path
else:
dep_job_path = os.path.join(dep_job_label, "final_results")
file_list = os.listdir(dep_job_path)
if len(file_list) > 0:
for item in file_list:
file_check_path = os.path.join(dep_job_path, item)
if (os.path.getsize(file_check_path)) == 0:
print("empty file detected: rerunning stage")
sys.exit("bad dep")
            # all dependency files are present and non-empty: let the job run
return True
else:
print("stopping the pipeline. dependencies don't exist")
sys.exit("no dep")
    # handles where to auto-resume the pipeline on a subsequent run
    # job_label: shorthand for the path we expect (<job_label>/final_results)
    # full_path: a bypass for locations that don't follow the normal final_results layout
    # dep_job_path: for checking that the job's dependencies are satisfied -> meant to point to the previous stage's "final_results"
    # logic is: if the full_path (or the job label's final_results shortcut) has no files
    # and the dependencies are ok, start the stage
    # Aug 19, 2019: there's a tweak to this: DIAMOND will generate zero-size files when there are no matches;
    # that's allowable.
def check_where_resume(self, job_label=None, full_path=None, dep_job_path=None, file_check_bypass = False):
if(not file_check_bypass):
self.check_where_kill(dep_job_path)
if job_label:
job_path = os.path.join(job_label, "final_results")
else:
job_path = full_path
print("looking at:", job_path)
if os.path.exists(job_path):
file_list = os.listdir(job_path)
if(not file_check_bypass):
if len(file_list) > 0:
for item in file_list:
file_check_path = os.path.join(job_path, item)
if (os.path.getsize(file_check_path)) == 0:
print("empty file detected: rerunning stage")
return False
print("bypassing!")
return True
else:
print("no files detected: running")
return False
else:
print("bypassing for special reasons")
return True
else:
print("doesn't exist: running")
return False
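    #Resume decision in check_where_resume, as coded above: a missing or empty results folder means
    #"run the stage" (returns False); a folder of non-empty files means "bypass it" (returns True);
    #with file_check_bypass set, an existing folder is bypassed even if some files are zero-size.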
def launch_and_create_simple(self, job_location, job_label, command_obj, commands):
        #launches a single job in a child process and waits for it to finish (no parallelism).
process = mp.Process(
target=command_obj.create_and_launch,
args=(job_location, job_label, commands)
)
process.start()
process.join()
def launch_and_create_with_mp_store(self, job_location, job_label, command_obj, commands):
        #launches a job without waiting for it, and stores the process handle in the mp_store queue.
process = mp.Process(
target=command_obj.create_and_launch,
args=(job_location, job_label, commands)
)
process.start()
self.mp_store.append(process)
def launch_only_simple(self, command_obj, commands):
process = mp.Process(
target=command_obj.launch_only,
args=(commands, len(commands))
)
process.start()
process.join()
def subdivide_and_launch(self, job_delay, mem_threshold, job_limit, job_location, job_label, command_obj, commands):
        #launches each command in its own child process, one job per command.
        #Jan 25, 2022: now adding job controls (a job-count limit and a memory threshold).
job_counter = 0
for item in commands:
job_name = job_label + "_" + str(job_counter)
job_counter += 1
job_submitted = False
while(not job_submitted):
if(len(self.mp_store) < job_limit):
if(self.mem_checker(mem_threshold)):
process = mp.Process(
target=command_obj.create_and_launch,
args=(job_location, job_name, [item])
)
process.start()
self.mp_store.append(process)
                        print(dt.today(), job_name, "job submitted. mem:", psu.virtual_memory().available/(1024*1024*1024), "GB", end='\r')
job_submitted = True
else:
time.sleep(job_delay)
else:
self.wait_for_mp_store()
time.sleep(job_delay)
#final wait for everything to be done
self.wait_for_mp_store()
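    #Example for subdivide_and_launch: with 10 commands, job_limit = 4, and mem_threshold = 20,
    #at most 4 child jobs run at once; a 5th submission waits for the running batch to finish,
    #and every submission is also delayed by job_delay seconds while free memory sits at or below 20%.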
def launch_only_with_hold(self, mem_threshold, job_limit, job_delay, job_name, command_obj, command):
#launch a job in launch-only mode
job_submitted = False
while(not job_submitted):
if(len(self.mp_store) < job_limit):
if(self.mem_checker(mem_threshold)):
process = mp.Process(
target = command_obj.launch_only,
args = (command, len(command))
)
process.start()
self.mp_store.append(process)
                    print(dt.today(), job_name, "job submitted. mem:", psu.virtual_memory().available/(1024*1024*1024), "GB", end='\r')
job_submitted = True
else:
#print(dt.today(), job_name, "Pausing. mem limit reached:", psu.virtual_memory().available/(1024*1024*1000), "GB", end='\r')
time.sleep(job_delay)
else:
print(dt.today(), "job limit reached. waiting for queue to flush")
self.wait_for_mp_store()
time.sleep(job_delay)
def launch_and_create_with_hold(self, mem_threshold, job_limit, job_delay, job_location, job_name, command_obj, command):
#launch a job in launch-with-create mode
job_submitted = False
while(not job_submitted):
if(len(self.mp_store) < job_limit):
if(self.mem_checker(mem_threshold)):
process = mp.Process(
target = command_obj.create_and_launch,
args = (job_location, job_name, command)
)
process.start()
self.mp_store.append(process)
                    print(dt.today(), job_name, "job submitted. mem:", psu.virtual_memory().available/(1024*1024*1024), "GB", end='\r')
job_submitted = True
else:
#print(dt.today(), job_name, "Pausing. mem limit reached:", psu.virtual_memory().available/(1024*1024*1000), "GB", end='\r')
time.sleep(job_delay)
else:
print(dt.today(), "job limit reached. waiting for queue to flush")
self.wait_for_mp_store()
#final wait
#self.wait_for_mp_store()
def launch_and_create_with_mem_footprint(self, mem_footprint, job_limit, job_location, job_name, command_obj, command):
        #launch a job in launch-with-create mode.
        #this controller isn't tuned to the system; it only keeps the node from running out of memory.
job_submitted = False
while(not job_submitted):
if(len(self.mp_store) < job_limit):
if(self.mem_footprint_checker(len(self.mp_store), mem_footprint)):
process = mp.Process(
target = command_obj.create_and_launch,
args = (job_location, job_name, command)
)
process.start()
self.mp_store.append(process)
job_submitted = True
print(dt.today(), job_name, "job submitted. mem:", len(self.mp_store) * mem_footprint, "GB", end='\r')
else:
print(dt.today(), "job limit reached. waiting for queue to flush")
self.wait_for_mp_store()
else:
print(dt.today(), "job limit reached. waiting for queue to flush")
self.wait_for_mp_store()
#check if all jobs ran
def check_all_job_markers(self, job_marker_list, final_folder_checklist):
time.sleep(2)
        #if the checklist already exists, a previous run of this stage was interrupted.
if(os.path.exists(final_folder_checklist)):
print(dt.today(), final_folder_checklist, "exists: adding to it")
#open it, import it.
with open(final_folder_checklist, "r") as old_list:
for line in old_list:
cleaned_line = line.strip("\n")
job_marker_list.append(cleaned_line)
#then overwrite it
with open(final_folder_checklist, "w") as checklist:
for item in job_marker_list:
checklist.write(item +"\n")
for item in job_marker_list:
if(not os.path.exists(item)):
print(dt.today(), item, "not found. kill the pipe. restart this stage")
sys.exit("not all jobs completed")
else:
with open(final_folder_checklist, "w") as checklist:
for item in job_marker_list:
checklist.write(item +"\n")
for item in job_marker_list:
if(not os.path.exists(item)):
print(dt.today(), item, "not found. kill the pipe. restart this stage")
sys.exit("not all jobs completed")
def limited_wait_for_mp_store(self, limit):
print(dt.today(), "limit for mp_store:", limit)
if(len(self.mp_store) >= limit):
print(dt.today(), "mp_store limit reached. pausing to flush")
for item in self.mp_store:
item.join()
self.mp_store[:] = []
print(dt.today(), "mp_store flushed. continuing")
def wait_for_mp_store(self):
print(dt.today(), "closing down processes: ", len(self.mp_store))
count = 0
for item in self.mp_store:
print(dt.today(), "closed down: " + str(count) + "/" + str(len(self.mp_store)) + " ", end = "\r")
count += 1
item.join()
self.mp_store[:] = []
def clean_or_compress(self, analysis_path, keep_all, keep_stage):
if(keep_all == "no" and keep_stage == "no"):
self.delete_folder(analysis_path)
elif(keep_all == "compress" or keep_stage == "compress"):
self.compress_folder(analysis_path)
self.delete_folder(analysis_path)
def launch_stage_simple(self, job_label, job_path, commands, command_list, keep_all, keep_job):
        #wrapper for simple job launches (quality, host). Note: the 'commands' argument is the
        #command object; 'command_list' holds the actual commands passed to it.
cleanup_job_start = 0
cleanup_job_end = 0
print("job path:", job_path)
if self.check_bypass_log(self.output_folder_path, job_label):
print(dt.today(), "NEW CHECK running:", job_label)
self.launch_and_create_simple(job_label, job_label, commands, command_list)
self.write_to_bypass_log(self.output_folder_path, job_label)
cleanup_job_start = time.time()
self.clean_or_compress(job_path, keep_all, keep_job)
cleanup_job_end = time.time()
else:
print(dt.today(), "skipping job:", job_label)
return cleanup_job_start, cleanup_job_end
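
#----------------------------------------------------------------------------------
#Minimal usage sketch (an illustration, not part of the original pipeline: MetaPro's
#main driver script is the real entry point). The two command-line paths below are
#hypothetical placeholders, and tool_path_obj still expects a valid MetaPro config file.
if __name__ == "__main__":
    demo_output_folder = sys.argv[1]   #e.g. a scratch output directory
    demo_config_path = sys.argv[2]     #a MetaPro config .ini file
    util = mp_util(demo_output_folder, demo_config_path)
    #report free memory against a 20% threshold, then consult the bypass log
    #(created empty if absent) for a hypothetical stage label.
    print(dt.today(), "free memory above 20%:", util.mem_checker(20))
    print(dt.today(), "would run 'demo_stage':", util.check_bypass_log(demo_output_folder, "demo_stage"))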