#!/bin/bash
set -eu
if [ "$#" -ne 2 ] && [ "$#" -ne 3 ]
then
echo "Usage : sh execute-suite-example.sh <gcp project> <a string containing topic/bootstrap-servers> <optional params>"
exit -1
fi
MORE_PARAMS=""
if (( $# == 3 ))
then
  MORE_PARAMS="${MORE_PARAMS}${3}"
fi
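# example invocation (hypothetical project and run name); the optional third argument is a string of
# extra pipeline options that gets appended to both pipeline launch commands below:
#   ./execute-kafka2bq-suite.sh my-gcp-project kafka2bq-test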
# BEAM_VERSION is intentionally left unset; the build falls back to the version defined in pom.xml
BEAM_VERSION=
# Other manual configurations
PROJECT_ID=$1
RUN_NAME=$2
REGION=us-central1
ZONE=us-central1-a
BUCKET=${RUN_NAME}-staging-${PROJECT_ID}
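# the staging bucket name is assumed to match the bucket created by the infra scripts below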
echo "creating infrastructure"
pushd infra
# create the Kafka infrastructure, the BigQuery dataset and the staging bucket
source ./tf-apply.sh "$PROJECT_ID" "$RUN_NAME" false true false false true
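# the boolean flags select which pieces of infrastructure get created; their order and meaning are defined in infra/tf-apply.sh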
# capture the outputs in variables
TF_JSON_OUTPUT=$(terraform output -json)
SUBNET=$(echo "$TF_JSON_OUTPUT" | jq -r .subnet.value)
DF_SA=$(echo "$TF_JSON_OUTPUT" | jq -r .df_sa.value)
KAFKA_IP=$(echo "$TF_JSON_OUTPUT" | jq -r .kafka_ip.value)
REMOTE_JMPSVR_IP=$(echo "$TF_JSON_OUTPUT" | jq -r .jmpsrv_ip.value)
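# SUBNET and DF_SA configure the Dataflow workers, KAFKA_IP becomes the bootstrap server address,
# and REMOTE_JMPSVR_IP is the jump server used to reach the Kafka cluster over SSH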
popd
echo "give some time to infra to stabilize..."
# now we wait for the first kafka node to be available
KAFKA_NODE=kafka-$RUN_NAME-0
KFK_UP_CMD="while ! nc -z $KAFKA_NODE 9092 ; do sleep 1 ; done"
echo "waiting for online kafka node"
ssh -o "StrictHostKeyChecking=no" $USER@$REMOTE_JMPSVR_IP $KFK_UP_CMD
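# the readiness check runs on the jump server because the launcher machine may not have direct connectivity to the Kafka brokers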
echo "found kafka online"
# KafkaIO needs to read the partition metadata when the pipeline is constructed, so we build
# the bundled jar files here and later upload them to the jump server created by the infrastructure
sh build.sh
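# build.sh produces the bundled jars used below: the streaming data generator and the canonical streaming pipelines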
echo "starting data generator"
JOB_NAME=datagen-kafka-$(echo "$RUN_NAME" | tr _ -)-${USER}
TOPIC_AND_BOOTSTRAPSERVERS=$RUN_NAME/$KAFKA_IP:9092
# launch the generator pipeline from this machine; the Kafka sink does not need to validate against the cluster at construction time
java -jar streaming-data-generator/target/streaming-data-generator-bundled-0.0.1-SNAPSHOT.jar \
--project=$PROJECT_ID \
--subnetwork=$SUBNET \
--streaming \
--stagingLocation=gs://$BUCKET/dataflow/staging \
--tempLocation=gs://$BUCKET/dataflow/temp \
--gcpTempLocation=gs://$BUCKET/dataflow/gcptemp \
--enableStreamingEngine \
--autoscalingAlgorithm=THROUGHPUT_BASED \
--numWorkers=50 \
--maxNumWorkers=1000 \
--runner=DataflowRunner \
--workerMachineType=n1-standard-4 \
--usePublicIps=false \
--jobName=${JOB_NAME} \
--region=${REGION} \
--outputTopic=${TOPIC_AND_BOOTSTRAPSERVERS} \
--sinkType=KAFKA \
--className=com.google.cloud.pso.beam.generator.thrift.CompoundEvent \
--generatorRatePerSec=100000 \
--sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients":"WARN"}' \
--maxRecordsPerBatch=1000 \
--compressionEnabled=false \
--serviceAccount=$DF_SA \
--completeObjects=true $MORE_PARAMS
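# the generator publishes Thrift CompoundEvent payloads to the Kafka topic at the configured rate (see --generatorRatePerSec above)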
echo "starting processing pipeline"
# for the ingestion pipeline, the read IO needs to validate and capture the topic partitions from the cluster.
# This cannot be done from the launcher machine since it may not have connectivity to the cluster in GCP,
# so we upload the jar file to the jump server created by the infrastructure and launch the pipeline from there.
scp -o "StrictHostKeyChecking=no" canonical-streaming-pipelines/target/streaming-pipelines-bundled-0.0.1-SNAPSHOT.jar $USER@$REMOTE_JMPSVR_IP:~
JOB_NAME=kafka2bq-$(echo "$RUN_NAME-sub" | tr _ -)-${USER}
BQ_TABLE_NAME=$(echo "$RUN_NAME-sub" | tr - _)
BQ_DATASET_ID=$(echo "$RUN_NAME" | tr - _)
EXEC_CMD="java -cp ~/streaming-pipelines-bundled-0.0.1-SNAPSHOT.jar com.google.cloud.pso.beam.pipelines.StreamingSourceToBigQuery \
--project=$PROJECT_ID \
--subnetwork=$SUBNET \
--streaming \
--stagingLocation=gs://$BUCKET/dataflow/staging \
--tempLocation=gs://$BUCKET/dataflow/temp \
--gcpTempLocation=gs://$BUCKET/dataflow/gcptemp \
--enableStreamingEngine \
--autoscalingAlgorithm=THROUGHPUT_BASED \
--numWorkers=20 \
--maxNumWorkers=400 \
--experiments=min_num_workers=1 \
--runner=DataflowRunner \
--workerMachineType=n2d-standard-4 \
--usePublicIps=false \
--jobName=${JOB_NAME} \
--region=${REGION} \
--serviceAccount=$DF_SA \
--createBQTable \
--subscription=${TOPIC_AND_BOOTSTRAPSERVERS} \
--sourceType=KAFKA \
--timestampType=LOG_APPEND_TIME \
--inputTopic=$RUN_NAME \
--bootstrapServers=$KAFKA_IP:9092 \
--consumerGroupId=$RUN_NAME \
--useStorageApiConnectionPool=false \
--bigQueryWriteMethod=STORAGE_API_AT_LEAST_ONCE \
--sdkHarnessLogLevelOverrides='{\"org.apache.kafka.clients\":\"WARN\", \"org.apache.kafka.clients.consumer.internals\":\"WARN\", \"org.apache.kafka.common.metrics\":\"WARN\", \"org.apache.kafka.common.utils\":\"WARN\"}' \
--outputTable=${PROJECT_ID}:${BQ_DATASET_ID}.stream_${BQ_TABLE_NAME} \
--tableDestinationCount=1 $MORE_PARAMS"
ssh -o "StrictHostKeyChecking=no" $USER@$REMOTE_JMPSVR_IP $EXEC_CMD
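# note: both jobs are streaming Dataflow pipelines and will keep running until drained or cancelled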