diff --git a/cdc/cmd/kafka-consumer/main.go b/cdc/cmd/kafka-consumer/main.go
index b54a8398..a3c09582 100644
--- a/cdc/cmd/kafka-consumer/main.go
+++ b/cdc/cmd/kafka-consumer/main.go
@@ -37,7 +36,6 @@ import (
 	"github.com/tikv/migration/cdc/cdc/sink/codec"
 	"github.com/tikv/migration/cdc/pkg/config"
-	// cdcfilter "github.com/tikv/migration/cdc/pkg/filter"
 	"github.com/tikv/migration/cdc/pkg/logutil"
 	"github.com/tikv/migration/cdc/pkg/security"
 	"github.com/tikv/migration/cdc/pkg/util"
 
@@ -266,7 +265,9 @@ func main() {
 		}
 	}()
 
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		if err := consumer.Run(ctx); err != nil {
 			log.Fatal("Error running consumer", zap.Error(err))
 		}
@@ -294,43 +295,27 @@ type partitionSink struct {
 	sink.Sink
 	resolvedTs  atomic.Uint64
 	partitionNo int
-	// tablesMap sync.Map
-	lastCRTs atomic.Uint64
+	lastCRTs    atomic.Uint64
 }
 
 // Consumer represents a Sarama consumer group consumer
 type Consumer struct {
 	ready chan bool
-	// ddlList []*model.DDLEvent
-	// maxDDLReceivedTs uint64
-	// ddlListMu sync.Mutex
-
 	sinks   []*partitionSink
 	sinksMu sync.Mutex
-	// ddlSink sink.Sink
-	// fakeTableIDGenerator *fakeTableIDGenerator
-
 	globalResolvedTs atomic.Uint64
 }
 
 // NewConsumer creates a new cdc kafka consumer
 func NewConsumer(ctx context.Context) (*Consumer, error) {
-	// TODO support filter in downstream sink
 	tz, err := util.GetTimezone(timezone)
 	if err != nil {
 		return nil, errors.Annotate(err, "can not load timezone")
 	}
 	ctx = util.PutTimezoneInCtx(ctx, tz)
 
-	// filter, err := cdcfilter.NewFilter(config.GetDefaultReplicaConfig())
-	// if err != nil {
-	// 	return nil, errors.Trace(err)
-	// }
 	c := new(Consumer)
 
-	// c.fakeTableIDGenerator = &fakeTableIDGenerator{
-	// 	tableIDs: make(map[string]int64),
-	// }
 	c.sinks = make([]*partitionSink, kafkaPartitionNum)
 	ctx, cancel := context.WithCancel(ctx)
 	errCh := make(chan error, 1)
@@ -343,11 +328,6 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
 		}
 		c.sinks[i] = &partitionSink{Sink: s, partitionNo: i}
 	}
-	// sink, err := sink.New(ctx, "kafka-consumer", downstreamURIStr, config.GetDefaultReplicaConfig(), opts, errCh)
-	// if err != nil {
-	// 	cancel()
-	// 	return nil, errors.Trace(err)
-	// }
 	go func() {
 		err := <-errCh
 		if errors.Cause(err) != context.Canceled {
@@ -357,7 +337,6 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
 		}
 		cancel()
 	}()
-	// c.ddlSink = sink
 	c.ready = make(chan bool)
 	return c, nil
 }
@@ -410,12 +389,6 @@ ClaimMessages:
 		}
 
 		switch tp {
-		// case model.MqMessageTypeDDL:
-		// 	ddl, err := batchDecoder.NextDDLEvent()
-		// 	if err != nil {
-		// 		log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
-		// 	}
-		// 	c.appendDDL(ddl)
 		case model.MqMessageTypeKv:
 			kv, err := batchDecoder.NextChangedEvent()
 			if err != nil {
@@ -429,22 +402,13 @@ ClaimMessages:
 					zap.Int32("partition", partition))
 				break ClaimMessages
 			}
-			// FIXME: hack to set start-ts in row changed event, as start-ts
-			// is not contained in TiKV CDC open protocol
-			// kv.StartTs = kv.CommitTs
-			// var partitionID int64
-			// if kv.Table.IsPartition {
-			// 	partitionID = kv.Table.TableID
-			// }
-			// kv.Table.TableID =
-			// 	c.fakeTableIDGenerator.generateFakeTableID(kv.Table.Schema, kv.Table.Table, partitionID)
 			err = sink.EmitChangedEvents(ctx, kv)
 			if err != nil {
 				log.Fatal("emit row changed event failed", zap.Error(err))
 			}
 			log.Info("Emit ChangedEvent", zap.Any("kv", kv))
-			lastCommitTs := sink.lastCRTs.Load()
-			if lastCommitTs < kv.CRTs {
+			lastCRTs := sink.lastCRTs.Load()
+			if lastCRTs < kv.CRTs {
 				sink.lastCRTs.Store(kv.CRTs)
 			}
 		case model.MqMessageTypeResolved:
@@ -472,41 +436,6 @@ ClaimMessages:
 	return nil
 }
 
-// func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
-// 	c.ddlListMu.Lock()
-// 	defer c.ddlListMu.Unlock()
-// 	if ddl.CommitTs <= c.maxDDLReceivedTs {
-// 		return
-// 	}
-// 	globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
-// 	if ddl.CommitTs <= globalResolvedTs {
-// 		log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
-// 		return
-// 	}
-// 	c.ddlList = append(c.ddlList, ddl)
-// 	c.maxDDLReceivedTs = ddl.CommitTs
-// }
-
-// func (c *Consumer) getFrontDDL() *model.DDLEvent {
-// 	c.ddlListMu.Lock()
-// 	defer c.ddlListMu.Unlock()
-// 	if len(c.ddlList) > 0 {
-// 		return c.ddlList[0]
-// 	}
-// 	return nil
-// }
-
-// func (c *Consumer) popDDL() *model.DDLEvent {
-// 	c.ddlListMu.Lock()
-// 	defer c.ddlListMu.Unlock()
-// 	if len(c.ddlList) > 0 {
-// 		ddl := c.ddlList[0]
-// 		c.ddlList = c.ddlList[1:]
-// 		return ddl
-// 	}
-// 	return nil
-// }
-
 func (c *Consumer) forEachSink(fn func(sink *partitionSink) error) error {
 	c.sinksMu.Lock()
 	defer c.sinksMu.Unlock()
@@ -528,7 +457,7 @@ func (c *Consumer) Run(ctx context.Context) error {
 		default:
 		}
 		time.Sleep(100 * time.Millisecond)
-		// handle ddl
+
 		globalResolvedTs := uint64(math.MaxUint64)
 		err := c.forEachSink(func(sink *partitionSink) error {
 			resolvedTs := sink.resolvedTs.Load()
@@ -540,28 +469,7 @@ func (c *Consumer) Run(ctx context.Context) error {
 		if err != nil {
 			return errors.Trace(err)
 		}
-		// todoDDL := c.getFrontDDL()
-		// if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs {
-		// 	// flush DMLs
-		// 	err := c.forEachSink(func(sink *partitionSink) error {
-		// 		return syncFlushRowChangedEvents(ctx, sink, todoDDL.CommitTs)
-		// 	})
-		// 	if err != nil {
-		// 		return errors.Trace(err)
-		// 	}
-
-		// 	// execute ddl
-		// 	err = c.ddlSink.EmitDDLEvent(ctx, todoDDL)
-		// 	if err != nil {
-		// 		return errors.Trace(err)
-		// 	}
-		// 	c.popDDL()
-		// 	continue
-		// }
-
-		// if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs {
-		// 	globalResolvedTs = todoDDL.CommitTs
-		// }
+
 		if lastGlobalResolvedTs == globalResolvedTs {
 			continue
 		}
@@ -570,7 +478,7 @@ func (c *Consumer) Run(ctx context.Context) error {
 		log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))
 
 		err = c.forEachSink(func(sink *partitionSink) error {
-			return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
+			return syncFlushChangedEvents(ctx, sink, globalResolvedTs)
 		})
 		if err != nil {
 			return errors.Trace(err)
@@ -578,19 +486,14 @@ func (c *Consumer) Run(ctx context.Context) error {
 	}
 }
 
-func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolvedTs uint64) error {
+func syncFlushChangedEvents(ctx context.Context, sink *partitionSink, resolvedTs uint64) error {
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		default:
 		}
-		// tables are flushed
-		// var (
-		// 	err error
-		// 	checkpointTs uint64
-		// )
-		// resolvedTs := sink.lastCRTs.Load()
+
 		keyspanID := model.KeySpanID(0)
 		checkpointTs, err := sink.FlushChangedEvents(ctx, keyspanID, resolvedTs)
 		if err != nil {
@@ -603,24 +506,3 @@ func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolve
 		return nil
 	}
 }
-
-// type fakeTableIDGenerator struct {
-// 	tableIDs       map[string]int64
-// 	currentTableID int64
-// 	mu             sync.Mutex
-// }
-
-// func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partition int64) int64 {
-// 	g.mu.Lock()
-// 	defer g.mu.Unlock()
-// 	key := quotes.QuoteSchema(schema, table)
-// 	if partition != 0 {
-// 		key = fmt.Sprintf("%s.`%d`", key, partition)
-// 	}
-// 	if tableID, ok := g.tableIDs[key]; ok {
-// 		return tableID
-// 	}
-// 	g.currentTableID++
-// 	g.tableIDs[key] = g.currentTableID
-// 	return g.currentTableID
-// }
diff --git a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
index 28cb70a5..fc082d99 100644
--- a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
+++ b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
@@ -43,4 +43,4 @@ services:
       - tests/integration_tests/run.sh kafka "${CASE}" & tail -f /dev/null
     network_mode: "service:kafka"
    volumes:
-      - ./logs/tikv_cdc_test:/tmp/tikv_cdc_test
+      - /tmp/tikv_cdc_test_pingyu/:/tmp/tikv_cdc_test
diff --git a/cdc/deployments/tikv-cdc/docker/integration-test.Dockerfile b/cdc/deployments/tikv-cdc/docker/integration-test.Dockerfile
index 78933de2..5ed0976d 100644
--- a/cdc/deployments/tikv-cdc/docker/integration-test.Dockerfile
+++ b/cdc/deployments/tikv-cdc/docker/integration-test.Dockerfile
@@ -45,7 +45,7 @@ RUN yum install -y \
 
 RUN wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
 RUN yum install -y epel-release-latest-7.noarch.rpm
-RUN yum --enablerepo=epel install -y s3cmd
+RUN yum --enablerepo=epel install -y s3cmd jq
 
 # Copy go from downloader.
 COPY --from=downloader /usr/local/go /usr/local/go
diff --git a/cdc/tests/integration_tests/_utils/test_prepare b/cdc/tests/integration_tests/_utils/test_prepare
index bfa2288a..4541b20d 100644
--- a/cdc/tests/integration_tests/_utils/test_prepare
+++ b/cdc/tests/integration_tests/_utils/test_prepare
@@ -1,3 +1,5 @@
+#!/bin/bash
+
 UP_TIDB_HOST=${UP_TIDB_HOST:-127.0.0.1}
 UP_TIDB_PORT=${UP_TIDB_PORT:-4000}
 UP_TIDB_OTHER_PORT=${UP_TIDB_OTHER_PORT:-4001}
@@ -52,3 +54,11 @@ DOWN_TLS_TIKV_PORT=${DOWN_TLS_TIKV_PORT:-23160}
 DOWN_TLS_TIKV_STATUS_PORT=${DOWN_TLS_TIKV_STATUS_PORT:-23180}
 
 KAFKA_VERSION=$(cat /tmp/tikv_cdc_test/KAFKA_VERSION 2>/dev/null || echo 2.4.1)
+
+# get the kafka sink uri for a test
+# usage: get_kafka_sink_uri <test_name>
+function get_kafka_sink_uri() {
+	TEST_NAME="$1"
+	TOPIC_NAME="tikvcdc-$TEST_NAME-test-$RANDOM"
+	echo "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=$KAFKA_VERSION&max-message-bytes=10485760"
+}
diff --git a/cdc/tests/integration_tests/autorandom/run.sh b/cdc/tests/integration_tests/autorandom/run.sh
index 663ebc10..a4c39c3e 100644
--- a/cdc/tests/integration_tests/autorandom/run.sh
+++ b/cdc/tests/integration_tests/autorandom/run.sh
@@ -18,16 +18,15 @@ function run() {
 	start_ts=$(get_start_ts $UP_PD)
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
 
-	TOPIC_NAME="tikvcdc-autorandom-test-$RANDOM"
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
-	kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
 	if [ "$SINK_TYPE" == "kafka" ]; then
-		run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
 	fi
 
 	rawkv_op $UP_PD put 5000
diff --git a/cdc/tests/integration_tests/availability/run.sh b/cdc/tests/integration_tests/availability/run.sh
index 7601c3a2..052295bb 100644
--- a/cdc/tests/integration_tests/availability/run.sh
+++ b/cdc/tests/integration_tests/availability/run.sh
@@ -9,6 +9,8 @@ source $CUR/capture.sh
 source $CUR/processor.sh
 WORK_DIR=$OUT_DIR/$TEST_NAME
 CDC_BINARY=tikv-cdc
+SINK_TYPE=$1
+UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1
 
 export DOWN_TIDB_HOST
 export DOWN_TIDB_PORT
@@ -21,9 +23,19 @@ function prepare() {
 	cd $WORK_DIR
 
 	start_ts=$(get_start_ts $UP_PD)
+
+	case $SINK_TYPE in
+	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
+	*) SINK_URI="" ;;
+	esac
+
 	run_cdc_cli changefeed create \
-		--start-ts=$start_ts --sink-uri="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" \
+		--start-ts=$start_ts --sink-uri="$SINK_URI" \
 		--disable-version-check
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 }
 
 trap stop_tidb_cluster EXIT
diff --git a/cdc/tests/integration_tests/capture_session_done_during_task/run.sh b/cdc/tests/integration_tests/capture_session_done_during_task/run.sh
index 93d3016c..02a65672 100644
--- a/cdc/tests/integration_tests/capture_session_done_during_task/run.sh
+++ b/cdc/tests/integration_tests/capture_session_done_during_task/run.sh
@@ -19,6 +19,7 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -28,6 +29,9 @@ function run() {
 	export GO_FAILPOINTS='github.com/tikv/migration/cdc/cdc/processor/processorManagerHandleNewChangefeedDelay=sleep(2000)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $UP_PD
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	# wait task is dispatched
 	sleep 1
diff --git a/cdc/tests/integration_tests/changefeed_auto_stop/run.sh b/cdc/tests/integration_tests/changefeed_auto_stop/run.sh
index 0f36a9c0..4333201c 100755
--- a/cdc/tests/integration_tests/changefeed_auto_stop/run.sh
+++ b/cdc/tests/integration_tests/changefeed_auto_stop/run.sh
@@ -44,10 +44,15 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	changefeedid=$(tikv-cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
+
 	# make sure the first capture does job first.
 	sleep 3
 	export GO_FAILPOINTS=''
diff --git a/cdc/tests/integration_tests/changefeed_error/run.sh b/cdc/tests/integration_tests/changefeed_error/run.sh
index 78a7dc63..b9851d5f 100755
--- a/cdc/tests/integration_tests/changefeed_error/run.sh
+++ b/cdc/tests/integration_tests/changefeed_error/run.sh
@@ -114,11 +114,15 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	changefeedid="changefeed-error"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	ensure $MAX_RETRIES check_changefeed_mark_failed_regex $UP_PD ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*"
 	run_cdc_cli changefeed resume -c $changefeedid
diff --git a/cdc/tests/integration_tests/changefeed_fast_fail/run.sh b/cdc/tests/integration_tests/changefeed_fast_fail/run.sh
index 0d68ae01..e4010d96 100644
--- a/cdc/tests/integration_tests/changefeed_fast_fail/run.sh
+++ b/cdc/tests/integration_tests/changefeed_fast_fail/run.sh
@@ -45,11 +45,15 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	changefeedid="changefeed-fast-fail"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "ErrGCTTLExceeded"
 	run_cdc_cli changefeed remove -c $changefeedid
diff --git a/cdc/tests/integration_tests/changefeed_finish/run.sh b/cdc/tests/integration_tests/changefeed_finish/run.sh
index 5847412f..b44d636e 100755
--- a/cdc/tests/integration_tests/changefeed_finish/run.sh
+++ b/cdc/tests/integration_tests/changefeed_finish/run.sh
@@ -38,6 +38,7 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -46,6 +47,9 @@ function run() {
 	# 90s after start_ts
 	target_ts=$(($start_ts + 90 * 10 ** 3 * 2 ** 18))
 	changefeed_id=$(tikv-cdc cli changefeed create --sink-uri="$SINK_URI" --start-ts=$start_ts --target-ts=$target_ts 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 	check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
diff --git a/cdc/tests/integration_tests/changefeed_pause_resume/run.sh b/cdc/tests/integration_tests/changefeed_pause_resume/run.sh
index 5f109f93..4d5329dc 100755
--- a/cdc/tests/integration_tests/changefeed_pause_resume/run.sh
+++ b/cdc/tests/integration_tests/changefeed_pause_resume/run.sh
@@ -17,12 +17,16 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $UP_PD
 	start_ts=$(get_start_ts $UP_PD)
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	for i in $(seq 1 10); do
 		tikv-cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$UP_PD
diff --git a/cdc/tests/integration_tests/changefeed_reconstruct/run.sh b/cdc/tests/integration_tests/changefeed_reconstruct/run.sh
index 093f062d..807e4c37 100755
--- a/cdc/tests/integration_tests/changefeed_reconstruct/run.sh
+++ b/cdc/tests/integration_tests/changefeed_reconstruct/run.sh
@@ -28,6 +28,7 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -35,6 +36,9 @@ function run() {
 	owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
 	start_ts=$(get_start_ts $UP_PD)
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 
diff --git a/cdc/tests/integration_tests/cli/run.sh b/cdc/tests/integration_tests/cli/run.sh
index fb550d73..79969f87 100644
--- a/cdc/tests/integration_tests/cli/run.sh
+++ b/cdc/tests/integration_tests/cli/run.sh
@@ -48,11 +48,15 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	uuid="custom-changefeed-name"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sort-engine=memory --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	# Make sure changefeed is created.
 	check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
diff --git a/cdc/tests/integration_tests/disk_full/run.sh b/cdc/tests/integration_tests/disk_full/run.sh
index 99ec73c9..b5bc61a9 100644
--- a/cdc/tests/integration_tests/disk_full/run.sh
+++ b/cdc/tests/integration_tests/disk_full/run.sh
@@ -27,10 +27,14 @@ EOF
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id=$CF_ID
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 
diff --git a/cdc/tests/integration_tests/flow_control/run.sh b/cdc/tests/integration_tests/flow_control/run.sh
index 97dcd05a..a46eb041 100644
--- a/cdc/tests/integration_tests/flow_control/run.sh
+++ b/cdc/tests/integration_tests/flow_control/run.sh
@@ -37,10 +37,15 @@ EOF
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
+
 	# Wait until cdc pulls the data from tikv and stores it in the sorter
 	sleep 90
 
diff --git a/cdc/tests/integration_tests/gc_safepoint/run.sh b/cdc/tests/integration_tests/gc_safepoint/run.sh
index 08f439ad..596988c5 100755
--- a/cdc/tests/integration_tests/gc_safepoint/run.sh
+++ b/cdc/tests/integration_tests/gc_safepoint/run.sh
@@ -86,6 +86,7 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -93,6 +94,9 @@ function run() {
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $pd_addr
 	start_ts=$(get_start_ts $UP_PD)
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	pd_cluster_id=$(curl -s $pd_addr/pd/api/v1/cluster | grep -oE "id\":\s[0-9]+" | grep -oE "[0-9]+")
 	clear_gc_worker_safepoint $pd_addr $pd_cluster_id
diff --git a/cdc/tests/integration_tests/http_api/run.sh b/cdc/tests/integration_tests/http_api/run.sh
index 5024809c..112b07cc 100644
--- a/cdc/tests/integration_tests/http_api/run.sh
+++ b/cdc/tests/integration_tests/http_api/run.sh
@@ -56,6 +56,7 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -63,6 +64,9 @@ function run() {
 	python3 $CUR/util/test_case.py get_status $TLS_DIR
 	python3 $CUR/util/test_case.py create_changefeed $TLS_DIR "$SINK_URI"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	# wait for changefeed created
 	sleep 2
diff --git a/cdc/tests/integration_tests/kill_owner/run.sh b/cdc/tests/integration_tests/kill_owner/run.sh
index fa2fdc53..1955cdd6 100755
--- a/cdc/tests/integration_tests/kill_owner/run.sh
+++ b/cdc/tests/integration_tests/kill_owner/run.sh
@@ -36,12 +36,16 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $UP_PD
 	start_ts=$(get_start_ts $UP_PD)
 	tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	export GO_FAILPOINTS='github.com/tikv/migration/cdc/cdc/capture/ownerFlushIntervalInject=return(10)'
 	kill_cdc_and_restart $UP_PD $WORK_DIR $CDC_BINARY
diff --git a/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh b/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh
index e57822ce..63b9901a 100644
--- a/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh
+++ b/cdc/tests/integration_tests/kv_client_stream_reconnect/run.sh
@@ -20,6 +20,7 @@ function run() {
 	pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -28,6 +29,9 @@ function run() {
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $pd_addr
 	start_ts=$(get_start_ts $UP_PD)
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 	check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
diff --git a/cdc/tests/integration_tests/kv_filter/run.sh b/cdc/tests/integration_tests/kv_filter/run.sh
index c5404e1b..9176e6ed 100644
--- a/cdc/tests/integration_tests/kv_filter/run.sh
+++ b/cdc/tests/integration_tests/kv_filter/run.sh
@@ -20,6 +20,7 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -29,6 +30,9 @@ function run() {
 		--sink-uri="$SINK_URI" \
 		--changefeed-id="$uuid" \
 		--config $CUR/conf/changefeed.toml
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 
diff --git a/cdc/tests/integration_tests/multi_capture/run.sh b/cdc/tests/integration_tests/multi_capture/run.sh
index 62bc0e90..eee14c54 100755
--- a/cdc/tests/integration_tests/multi_capture/run.sh
+++ b/cdc/tests/integration_tests/multi_capture/run.sh
@@ -35,12 +35,16 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --format="raw" --start-key="$Start_Key" --end-key="$SplitKey1"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --format="raw" --start-key="$SplitKey1" --end-key="$SplitKey2"
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --format="raw" --start-key="$SplitKey2" --end-key="$End_Key"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
 	rawkv_op $UP_PD delete 5000
diff --git a/cdc/tests/integration_tests/processor_err_chan/run.sh b/cdc/tests/integration_tests/processor_err_chan/run.sh
index af6b169c..21ed0b6f 100644
--- a/cdc/tests/integration_tests/processor_err_chan/run.sh
+++ b/cdc/tests/integration_tests/processor_err_chan/run.sh
@@ -39,6 +39,7 @@ function run() {
 	pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -47,6 +48,9 @@ function run() {
 	start_ts=$(get_start_ts $UP_PD)
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	retry_time=10
 	ensure $retry_time check_changefeed_mark_normal $pd_addr $changefeed_id "null"
diff --git a/cdc/tests/integration_tests/processor_panic/run.sh b/cdc/tests/integration_tests/processor_panic/run.sh
index f34d7d4a..0f84bf53 100644
--- a/cdc/tests/integration_tests/processor_panic/run.sh
+++ b/cdc/tests/integration_tests/processor_panic/run.sh
@@ -22,11 +22,15 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	start_ts=$(get_start_ts $UP_PD)
 	# Make sure the task is assigned to the first cdc
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix 2 --addr 127.0.0.1:8601
diff --git a/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh b/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh
index d81c28ba..fdbc817c 100755
--- a/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh
+++ b/cdc/tests/integration_tests/processor_resolved_ts_fallback/run.sh
@@ -20,11 +20,15 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	start_ts=$(get_start_ts $UP_PD)
 	run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	ensure 10 "tikv-cdc cli processor list|jq '.|length'|grep -E '^1$'"
 	export GO_FAILPOINTS=''
diff --git a/cdc/tests/integration_tests/processor_stop_delay/run.sh b/cdc/tests/integration_tests/processor_stop_delay/run.sh
index 99dd2630..85a17cff 100644
--- a/cdc/tests/integration_tests/processor_stop_delay/run.sh
+++ b/cdc/tests/integration_tests/processor_stop_delay/run.sh
@@ -18,6 +18,7 @@ function run() {
 	pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -26,6 +27,9 @@ function run() {
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $pd_addr
 	start_ts=$(get_start_ts $UP_PD)
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 	check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
diff --git a/cdc/tests/integration_tests/sigstop/run.sh b/cdc/tests/integration_tests/sigstop/run.sh
index acb703ba..ee9e237c 100644
--- a/cdc/tests/integration_tests/sigstop/run.sh
+++ b/cdc/tests/integration_tests/sigstop/run.sh
@@ -22,10 +22,14 @@ function run_kill_upstream() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 10000 &
 	sleep 1
@@ -74,10 +78,14 @@ function run_kill_downstream() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${UP_PD_HOST_1}:${UP_PD_PORT_1}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --pd $DOWN_PD
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $DOWN_PD put 10000 &
 	sleep 1
diff --git a/cdc/tests/integration_tests/sink_hang/run.sh b/cdc/tests/integration_tests/sink_hang/run.sh
index 2f8117fe..24068b89 100644
--- a/cdc/tests/integration_tests/sink_hang/run.sh
+++ b/cdc/tests/integration_tests/sink_hang/run.sh
@@ -34,6 +34,7 @@ function run() {
 	pd_addr="http://$UP_PD_HOST_1:$UP_PD_PORT_1"
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
@@ -41,6 +42,9 @@ function run() {
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $pd_addr
 	start_ts=$(get_start_ts $UP_PD)
 	changefeed_id=$(tikv-cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 
diff --git a/cdc/tests/integration_tests/sorter/run.sh b/cdc/tests/integration_tests/sorter/run.sh
index 79cbb3a5..592187e1 100755
--- a/cdc/tests/integration_tests/sorter/run.sh
+++ b/cdc/tests/integration_tests/sorter/run.sh
@@ -27,9 +27,13 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 	run_cdc_cli changefeed create -c $CF_NAME --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="unified"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
 	rawkv_op $UP_PD delete 5000
@@ -47,10 +51,14 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	run_cdc_cli changefeed create -c $CF_NAME --start-ts=$start_ts --sink-uri="$SINK_URI" --sort-engine="memory"
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	check_sync_diff $WORK_DIR $UP_PD $DOWN_PD
 	rawkv_op $UP_PD delete 5000
diff --git a/cdc/tests/integration_tests/stop_downstream/run.sh b/cdc/tests/integration_tests/stop_downstream/run.sh
index 52328a3c..6670eeea 100644
--- a/cdc/tests/integration_tests/stop_downstream/run.sh
+++ b/cdc/tests/integration_tests/stop_downstream/run.sh
@@ -21,10 +21,14 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
+	kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
 	*) SINK_URI="" ;;
 	esac
 
 	tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id=$CF_ID
+	if [ "$SINK_TYPE" == "kafka" ]; then
+		run_kafka_consumer $WORK_DIR "$SINK_URI"
+	fi
 
 	rawkv_op $UP_PD put 5000
 
diff --git a/cdc/tests/integration_tests/tls/run.sh b/cdc/tests/integration_tests/tls/run.sh
index 11ca7d2c..60e044a5 100644
--- a/cdc/tests/integration_tests/tls/run.sh
+++ b/cdc/tests/integration_tests/tls/run.sh
@@ -61,6 +61,11 @@ function run() {
 
 	case $SINK_TYPE in
 	tikv) SINK_URI="tikv://${DOWN_TLS_PD_HOST}:${DOWN_TLS_PD_PORT}/?ca-path=$TLS_DIR/ca.pem&cert-path=$TLS_DIR/client.pem&key-path=$TLS_DIR/client-key.pem" ;;
+	kafka)
+		# TODO: support TLS
+		echo "Kafka does not support TLS yet. Skip"
+		return 0
+		;;
 	*) SINK_URI="" ;;
 	esac