-
Notifications
You must be signed in to change notification settings - Fork 0
/
start-worker.sh
executable file
·76 lines (62 loc) · 3.4 KB
/
start-worker.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env bash
FLINK_HOME="/opt/flink"
FLINK_BIN_DIR="${FLINK_HOME}/bin"
FLINK_CONF_DIR="${FLINK_HOME}/conf"
FLINK_MASTER_HOSTNAME="flink-master"
FLINK_MASTER_WEBUI_PORT=8081
for i in {1..5}; do getent hosts ${FLINK_MASTER_HOSTNAME} && break || echo "Cannot reach jobmanager at hostname ${FLINK_MASTER_HOSTNAME}, will retry" && sleep 5; done
. "${FLINK_BIN_DIR}"/config.sh
# if memory allocation mode is lazy and no other JVM options are set,
# set the 'Concurrent Mark Sweep GC'
if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z $FLINK_ENV_JAVA_OPTS ]; then
JAVA_VERSION=$($JAVA_RUN -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
# set the GC to G1 in Java 8 and to CMS in Java 7
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
if [ "$JAVA_VERSION" -lt 18 ]; then
export JVM_ARGS="$JVM_ARGS -XX:+UseConcMarkSweepGC -XX:+CMSClassUnloadingEnabled"
else
export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi
fi
fi
if [[ ! ${FLINK_TM_HEAP} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP}" -lt "0" ]]; then
echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
exit 1
fi
if [ "${FLINK_TM_HEAP}" -gt "0" ]; then
TM_HEAP_SIZE=${FLINK_TM_HEAP}
# Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
TM_MAX_OFFHEAP_SIZE="8388607T"
if useOffHeapMemory; then
if [[ "${FLINK_TM_MEM_MANAGED_SIZE}" -gt "0" ]]; then
# We split up the total memory in heap and off-heap memory
if [[ "${FLINK_TM_HEAP}" -le "${FLINK_TM_MEM_MANAGED_SIZE}" ]]; then
echo "[ERROR] Configured TaskManager memory size ('${KEY_TASKM_MEM_SIZE}') must be larger than the managed memory size ('${KEY_TASKM_MEM_MANAGED_SIZE}')."
exit 1
fi
TM_HEAP_SIZE=$((FLINK_TM_HEAP - FLINK_TM_MEM_MANAGED_SIZE))
else
# Bash only performs integer arithmetic so floating point computation is performed using awk
command -v awk >/dev/null 2>&1
if [[ $? -ne 0 ]]; then
echo "[ERROR] Program 'awk' not found."
echo "Please install 'awk' or define '${KEY_TASKM_MEM_MANAGED_SIZE}' instead of '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
exit 1
fi
# We calculate the memory using a fraction of the total memory
if [[ `awk '{ if ($1 > 0.0 && $1 < 1.0) print "1"; }' <<< "${FLINK_TM_MEM_MANAGED_FRACTION}"` != "1" ]]; then
echo "[ERROR] Configured TaskManager managed memory fraction '${FLINK_TM_MEM_MANAGED_FRACTION}' is not a valid value."
echo "It must be between 0.0 and 1.0."
echo "Please set '${KEY_TASKM_MEM_MANAGED_FRACTION}' in ${FLINK_CONF_FILE}."
exit 1
fi
# recalculate the JVM heap memory by taking the off-heap ratio into account
OFFHEAP_MANAGED_MEMORY_SIZE=`awk "BEGIN { printf \"%.0f\n\", ${FLINK_TM_HEAP} * ${FLINK_TM_MEM_MANAGED_FRACTION} }"`
TM_HEAP_SIZE=$((FLINK_TM_HEAP - OFFHEAP_MANAGED_MEMORY_SIZE))
fi
fi
export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
fi
# Startup parameters
args=("--configDir" "${FLINK_CONF_DIR}")
"${FLINK_BIN_DIR}"/start-common.sh taskmanager "${args[@]}"