-
Notifications
You must be signed in to change notification settings - Fork 2
/
execute-ps2bq-suite.sh
69 lines (54 loc) · 1.79 KB
/
execute-ps2bq-suite.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
#!/bin/bash
#
# Runs the ps2bq suite end to end: creates the infrastructure, starts the
# streaming data generator and then starts the processing pipeline.
#
# Usage: bash execute-ps2bq-suite.sh <gcp project> <run name> [optional params]
#   <gcp project>     project id handed to every step
#   <run name>        used as the topic name and as the base of derived names
#   [optional params] one string appended to the parameters of both launched jobs
#
# Must be run with bash, not sh: the script relies on (( )), pushd/popd and
# source.

# -e and -u also stay active inside the helper scripts, because they are
# sourced into this shell rather than executed as child processes.
set -eu

if (( $# < 2 || $# > 3 )); then
  # Print the name the script was really invoked with, and send the message to
  # stderr since this is an error path.
  echo "Usage : bash ${0} <gcp project> <run name> <optional params>" >&2
  # 255 is the status bash reported for the previous `exit -1`; it is spelled
  # out because negative values are outside the valid 0-255 exit status range.
  exit 255
fi

# Optional third argument: extra job parameters; empty when not supplied.
MORE_PARAMS="${3:-}"
# Manual configuration shared by the steps below.
PROJECT_ID="${1}"
TOPIC="${2}"
REGION="us-central1"
# Staging bucket name: <run name>-staging-<gcp project>.
BUCKET="${2}-staging-${1}"

# BEAM_VERSION is assigned an empty value on purpose; according to the original
# note, the pom.xml definitions then provide the default version.
BEAM_VERSION=""
echo "creating infrastructure"
pushd infra
# We need a Pub/Sub topic + subscription, a BigQuery dataset and a staging
# bucket. The five trailing true/false flags are interpreted by tf-apply.sh;
# NOTE(review): their individual meaning is not visible here - confirm against
# tf-apply.sh before changing them.
# The project and topic are quoted so that each one always arrives as exactly
# one positional argument and the flags keep their positions.
source ./tf-apply.sh "$PROJECT_ID" "$TOPIC" false true true false false
popd
echo "starting data generator"
pushd streaming-data-generator
# Job name: the run name with every '_' replaced by '-', followed by the
# invoking user. USER must be set in the environment because the script runs
# under `set -u`.
JOB_NAME="datagen-ps-${2//_/-}-${USER}"
# All generator parameters travel as ONE argument. MORE_PARAMS is expanded
# inside the quotes so that extra parameters containing spaces stay part of
# that single argument instead of spilling into further positional arguments.
# The continuation lines start at column 0 on purpose: leading whitespace
# would become part of the parameter string.
source ./execute-generator.sh "$PROJECT_ID" "$BUCKET" " \
--jobName=${JOB_NAME} \
--region=${REGION} \
--outputTopic=projects/${PROJECT_ID}/topics/${TOPIC} \
--className=com.google.cloud.pso.beam.generator.thrift.CompoundEvent \
--generatorRatePerSec=100000 \
--maxRecordsPerBatch=1000 \
--compressionEnabled=true \
--completeObjects=true ${MORE_PARAMS}"
popd
echo "starting processing pipeline"
pushd canonical-streaming-pipelines
# The subscription name is derived from the topic name.
SUBSCRIPTION="${TOPIC}-sub"
# Job name: subscription with every '_' replaced by '-', followed by the
# invoking user. USER must be set in the environment because the script runs
# under `set -u`.
JOB_NAME="ps2bq-${SUBSCRIPTION//_/-}-${USER}"
# BigQuery identifiers: the same names with every '-' replaced by '_'.
BQ_TABLE_NAME="${SUBSCRIPTION//-/_}"
BQ_DATASET_ID="${TOPIC//-/_}"
# All pipeline parameters travel as ONE argument. MORE_PARAMS is expanded
# inside the quotes so that extra parameters containing spaces stay part of
# that single argument instead of spilling into further positional arguments.
# The continuation lines start at column 0 on purpose: leading whitespace
# would become part of the parameter string.
source ./execute-ingestion.sh "$PROJECT_ID" "$BUCKET" "\
--jobName=${JOB_NAME} \
--region=${REGION} \
--numWorkers=20 \
--thriftClassName=com.google.cloud.pso.beam.generator.thrift.CompoundEvent \
--subscription=projects/${PROJECT_ID}/subscriptions/${SUBSCRIPTION} \
--useStorageApiConnectionPool=true \
--bigQueryWriteMethod=STORAGE_API_AT_LEAST_ONCE \
--outputTable=${PROJECT_ID}:${BQ_DATASET_ID}.stream_${BQ_TABLE_NAME} \
--tableDestinationCount=1 ${MORE_PARAMS}"
popd