#!/usr/bin/env python3
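# joblog.py
# Collect accounting information for a SLURM job (via sacct and scontrol) after it has
# finished and write it to <output>/job_log.json.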
from subprocess import Popen, PIPE
from json import dump as json_dump
import sys, os, argparse
import re
import time
from datetime import datetime, timedelta
from collections import defaultdict
from dateutil.tz import tzlocal
JOB_FIELDS = ['JobId', 'JobName', 'StartTime', 'EndTime', 'SubmitTime', 'NumNodes', 'NumCPUs', 'NumTasks', 'Dependency', 'ExitCode']
JOB_STEPS_FIELDS = ['JobID','NNodes','NTasks','NCPUS','Start','End','Elapsed','JobName','NodeList','ExitCode','State', 'Submit']
ACTIVE_STATES = ['COMPLETING', 'PENDING', 'RUNNING', 'CONFIGURING', 'RESIZING']
MAJOR_VERSION = 0
MINOR_VERSION = 1
DATE_INPUT_FORMAT = '%Y-%m-%dT%H:%M:%S'
DATE_OUTPUT_FORMAT = '%Y-%m-%dT%H:%M:%S%z'
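# Parse an ISO-8601 timestamp produced by convert_timestamp(); the last ':' (in the UTC
# offset) is dropped so that offsets like "+01:00" become "+0100", which %z can parse.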
def output_datetime(tout: str) -> datetime:
    idx = tout.rfind(':')
    date_str = tout[:idx] + tout[idx + 1:]
    return datetime.strptime(date_str, DATE_OUTPUT_FORMAT)
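# Return True if num can be converted to an int.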
def is_integer(num) -> bool:
    try:
        int(num)
        return True
    except (TypeError, ValueError):
        return False
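# sacct reports job steps as "<jobid>.<stepid>", e.g. "1234.0"; plain job lines have no '.'.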
def contains_step_id(data: str) -> bool:
    return re.search(r"[0-9]+\.[0-9]+", data) is not None
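# Convert a sacct timestamp (DATE_INPUT_FORMAT) into an ISO-8601 string in the local timezone.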
def convert_timestamp(timestamp: str) -> str:
    tmp_date = datetime.strptime(timestamp, DATE_INPUT_FORMAT).astimezone(tzlocal()).isoformat()
    return str(tmp_date)
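# Return True if scontrol still knows about the given job id.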
def job_exists(jobid: int) -> bool:
    cmd = "scontrol show jobid {}".format(jobid)
    with Popen(cmd, shell=True, stdout=PIPE) as proc:
        proc.wait()
        return proc.returncode == 0
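# Write the collected job description as JSON to <path>/job_log.json.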
def export_json(path: str, job_desc: dict) -> None:
    with open("{}/job_log.json".format(path), 'w') as file:
        json_dump(job_desc, file)
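# A job has steps if sacct lists at least one "<jobid>.<stepid>" entry for it.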
def job_has_steps(jobid: int) -> bool:
    cmd = "sacct -j {} --format=JobID --nohead".format(jobid)
    regex = r"[0-9]+\.[0-9]+"
    with Popen(cmd, shell=True, stdout=PIPE) as proc:
        proc.wait()
        data = proc.stdout.read().decode("utf-8")
        return re.search(regex, data) is not None
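# Return True while any step of the job is still in one of the ACTIVE_STATES.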
def steps_active(jobid: int) -> bool:
    cmd = "sacct -j {} --format=JobID,State --nohead".format(jobid)
    with Popen(cmd, shell=True, stdout=PIPE) as proc:
        proc.wait()
        all_steps = [step.decode("utf-8").rstrip() for step in proc.stdout if contains_step_id(step.decode("utf-8"))]
        return any(map(lambda s: s.split()[1] in ACTIVE_STATES, all_steps))
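# Return True while any sacct state line for the job is still in one of the ACTIVE_STATES.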
def job_active(jobid: int) -> bool:
    cmd = "sacct -j {} --format=State --nohead".format(jobid)
    with Popen(cmd, shell=True, stdout=PIPE) as proc:
        proc.wait()
        states = [line.decode("utf-8").strip() for line in proc.stdout]
        return any(s in ACTIVE_STATES for s in states)
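# Poll sacct until the job (or, if it has steps, all of its steps) has left the active states.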
def wait_on_slurm(jobid: int) -> None:
    wait_time = 0.1
    if job_has_steps(jobid):
        while steps_active(jobid):
            time.sleep(wait_time)
    else:
        while job_active(jobid):
            time.sleep(wait_time)
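# Extract the Dependency field from "scontrol show jobid -dd" output; returns None if it is not found.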
def job_deps(jobid: int) -> str:
    cmd = "scontrol show jobid -dd {}".format(jobid)
    with Popen(cmd, shell=True, stdout=PIPE) as proc:
        for line in proc.stdout:
            l = line.decode("utf-8").rstrip()
            fields = [opts.split('=') for opts in l.split(' ') if opts != '']
            for field in fields:
                if field[0] == 'Dependency':
                    return field[1]
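# Gather job-level and per-step accounting data from "sacct -n -P" into a nested dict;
# timestamps are converted to the local timezone and a QueueTime (start - submit) is added.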
def job_info(jobid: int) -> dict:
    job_info = dict()
    job_info["steps"] = dict()
    cmd = "sacct -n -P -j {} --format={}".format(jobid, ','.join(JOB_STEPS_FIELDS))
    with Popen(cmd, shell=True, stdout=PIPE) as proc:
        for line in proc.stdout:
            fields = line.decode("utf-8").rstrip().split('|')
            # Add job-level info
            if fields[0].isdecimal():
                for k, v in zip(JOB_STEPS_FIELDS, fields):
                    if k in ('Start', 'End', 'Submit'):
                        try:
                            job_info[k] = convert_timestamp(v)
                        except ValueError:
                            job_info[k] = None
                    else:
                        job_info[k] = v
            # Add job step info
            if contains_step_id(fields[0]):
                job_info["steps"][fields[0]] = dict()
                for k, v in zip(JOB_STEPS_FIELDS[1:], fields[1:]):
                    if k == "Submit":
                        continue
                    if k in ('Start', 'End'):
                        v = convert_timestamp(v)
                    job_info["steps"][fields[0]][k] = v
    # Compute the queue time (start - submit)
    if job_info.get('Start') and job_info.get('Submit'):
        start_time = output_datetime(job_info['Start'])
        submit_time = output_datetime(job_info['Submit'])
        job_info['QueueTime'] = str(start_time - submit_time)
    # Add a virtual step if the job reported no steps
    if not job_info["steps"]:
        virt_step = {k: job_info[k] for k in JOB_STEPS_FIELDS[1:]}
        if not virt_step['End']:
            e = datetime.strptime(job_info['Elapsed'], '%H:%M:%S')
            dt = timedelta(seconds=e.second, minutes=e.minute, hours=e.hour)
            start = output_datetime(virt_step['Start'])
            virt_step['End'] = str(start + dt)
            job_info['End'] = virt_step['End']
        # TODO elif not job_info['End']: update job_info['End'] by counting elapsed times over all steps
        job_info["steps"][job_info['JobID']] = virt_step
    return job_info
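# Example invocation (job id and output path are placeholders):
#   ./joblog.py 123456 /path/to/output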
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("jobid", help="The SLURM job id of the running job.", type=int)
    parser.add_argument("output", help="Path to the output directory.", type=str)
    args = parser.parse_args()
    wait_on_slurm(args.jobid)
    if not os.path.exists(args.output):
        sys.exit("Given path does not exist.")
    info = job_info(args.jobid)
    info.update({"Version": {"Major": MAJOR_VERSION, "Minor": MINOR_VERSION}})
    if not job_active(args.jobid):
        print("[INFO] Could not determine job dependencies because the job is no longer active.")
    else:
        info.update({'Dependency': job_deps(args.jobid)})
    export_json(args.output, info)