#!/usr/bin/env python
"""
Start a Spark cluster on top of an SGE cluster.
"""
from __future__ import print_function
import os
import re
import time
import pickle
import argparse
import subprocess
from os import path


def main(args):
    # Getting the binary path
    bin_path = path.abspath(path.dirname(__file__))

    # The job IDs (the first one is always the master's)
    job_ids = []

    # The master command
    options = ["SPARK_VERSION={}".format(args.spark_version)]
    master_command = ["qsub", "-terse", "-v", ",".join(options)]
    if args.master_host is not None:
        master_command.extend(["-q", args.master_host])
    master_command.append(path.join(bin_path, "_start_master.sh"))

    # Launching the command
    p = subprocess.Popen(master_command, stdout=subprocess.PIPE)

    # Remember the job ID.
    job_ids.append(_strip_pid(p.stdout.read()))

    # Getting the host on which the job is running
    master_hostname = get_hostname_of_job_id(job_ids[0])
    monitor_hostname = get_monitor_hostname(master_hostname)

    print("Master running at", master_hostname)
    print("Spark server monitor at", monitor_hostname)

    processes = []
    for i in range(args.nb_workers):
        options = ["SPARK_SLAVES_NB_CORES={}".format(args.nb_cpus),
                   "SPARK_MASTER_HOSTNAME={}".format(master_hostname),
                   "SPARK_VERSION={}".format(args.spark_version),
                   "SPARK_SLAVE_NB={}".format(i + 1)]
        command = [
            "qsub", "-terse", "-pe", "multiprocess", str(args.nb_cpus),
            "-v", ",".join(options), path.join(bin_path, "_start_slave.sh"),
        ]
        p = subprocess.Popen(command, stdout=subprocess.PIPE)
        processes.append(p)

    # I do this in two stages so that we don't have to wait for a task to
    # start before submitting the next one.
    for p in processes:
        job_ids.append(_strip_pid(p.stdout.read()))

    filename = "spark_cluster.pkl"
    print("Started the Spark cluster, saving the task IDs to '{}'."
          "".format(filename))
    with open(filename, "wb") as f:
        pickle.dump(job_ids, f)
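# Note: the pickled job IDs are what you would need to tear the cluster down
# again (e.g. `qdel <job_id>` for each saved ID); a companion stop script is
# assumed but not shown here.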
def get_hostname_of_job_id(job_id):
    # Checking if the job was launched
    command = ["qstat", "-j", job_id]

    # Waiting until the job is launched
    while True:
        time.sleep(1)
        p = subprocess.Popen(command, stdout=subprocess.PIPE)
        job_info = p.stdout.read().decode("utf-8")
        if u"usage" in job_info:
            break

    # Waiting until the log file appears (NFS, network issue, etc.)
    log_file = "_spark_master.e{}".format(job_id)
    while not path.isfile(log_file):
        time.sleep(1)
    while os.stat(log_file).st_size < 1:
        time.sleep(1)

    # Reading the log
    hostname = None
    while hostname is None:
        with open(log_file) as f:
            for line in f:
                if "INFO Master: Starting Spark master at" in line:
                    hostname = re.search(r"spark://\S+", line).group()
        time.sleep(1)

    return hostname
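# The Spark standalone master advertises a spark://<host>:<port> URL and, by
# default, serves its web UI on port 8080 (assuming default ports here), so
# the monitor URL is derived by swapping the scheme and the port, e.g.
# spark://node13:7077 -> http://node13:8080.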
def get_monitor_hostname(hostname):
    return re.sub("[0-9]+$", "8080", re.sub("^spark", "http", hostname))


def _strip_pid(s):
    # `qsub -terse` prints only the job ID; decode and drop the trailing newline.
    return s.decode("utf-8").strip()
def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--spark-version", metavar="VERSION", default="2.1.0",
        help="The version of Spark. [%(default)s]",
    )
    parser.add_argument(
        "--master-host", metavar="HOST",
        help="The host of the master (e.g. all.q@srapl-sg-cnc14).",
    )
    parser.add_argument(
        "--nb-workers", metavar="INT", default=2, type=int,
        help="The number of workers to spawn. [%(default)d]",
    )
    parser.add_argument(
        "--nb-cpus", metavar="INT", default=10, type=int,
        help="The number of CPUs for each worker. [%(default)d]",
    )
    return parser.parse_args()
if __name__ == "__main__":
args = parse_args()
main(args)