diff --git a/cdc/processor/pipeline/sorter.go b/cdc/processor/pipeline/sorter.go index e237654be78..81d279d80b1 100644 --- a/cdc/processor/pipeline/sorter.go +++ b/cdc/processor/pipeline/sorter.go @@ -267,7 +267,7 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi } atomic.StoreUint64(&n.resolvedTs, rawKV.CRTs) - if resolvedTs > n.barrierTs && + if resolvedTs > n.BarrierTs() && !redo.IsConsistentEnabled(n.replConfig.Consistent.Level) { // Do not send resolved ts events that is larger than // barrier ts. @@ -278,7 +278,7 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi // resolved ts, conflicts to this change. // TODO: Remove redolog check once redolog decouples for global // resolved ts. - event = model.NewResolvedPolymorphicEvent(0, n.barrierTs) + event = model.NewResolvedPolymorphicEvent(0, n.BarrierTs()) } } n.sorter.AddEntry(ctx, event) @@ -299,7 +299,7 @@ func (n *sorterNode) TryHandleDataMessage(ctx context.Context, msg pipeline.Mess } func (n *sorterNode) updateBarrierTs(barrierTs model.Ts) { - if barrierTs > atomic.LoadUint64(&n.barrierTs) { + if barrierTs > n.BarrierTs() { atomic.StoreUint64(&n.barrierTs, barrierTs) } } @@ -328,3 +328,8 @@ func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error { func (n *sorterNode) ResolvedTs() model.Ts { return atomic.LoadUint64(&n.resolvedTs) } + +// BarrierTs returns the sorter barrierTs +func (n *sorterNode) BarrierTs() model.Ts { + return atomic.LoadUint64(&n.barrierTs) +} diff --git a/cdc/processor/pipeline/sorter_test.go b/cdc/processor/pipeline/sorter_test.go index 06cc897bc64..6741b0abf6f 100644 --- a/cdc/processor/pipeline/sorter_test.go +++ b/cdc/processor/pipeline/sorter_test.go @@ -120,7 +120,7 @@ func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) { ) err = sn.Receive(nctx) require.Nil(t, err) - require.EqualValues(t, 2, sn.barrierTs) + require.EqualValues(t, 2, sn.BarrierTs()) // Barrier message must be passed to the next node. require.EqualValues(t, pipeline.BarrierMessage(2), <-ch) @@ -152,7 +152,7 @@ func TestSorterUpdateBarrierTs(t *testing.T) { t.Parallel() s := &sorterNode{barrierTs: 1} s.updateBarrierTs(model.Ts(2)) - require.Equal(t, model.Ts(2), s.barrierTs) + require.Equal(t, model.Ts(2), s.BarrierTs()) s.updateBarrierTs(model.Ts(1)) - require.Equal(t, model.Ts(2), s.barrierTs) + require.Equal(t, model.Ts(2), s.BarrierTs()) } diff --git a/tests/integration_tests/_utils/start_tidb_cluster_impl b/tests/integration_tests/_utils/start_tidb_cluster_impl index e127bfa290b..4339b572faa 100755 --- a/tests/integration_tests/_utils/start_tidb_cluster_impl +++ b/tests/integration_tests/_utils/start_tidb_cluster_impl @@ -17,7 +17,7 @@ random_file_name= randomGenSocketsConf() { random_str=$(date '+%s%N') if [ "$(uname)" == "Darwin" ]; then - random_str=$(cat /dev/random | LC_CTYPE=C tr -dc "a-zA-Z0-9" | head -c 10) + random_str=$(cat /dev/random | LC_ALL=C tr -dc "a-zA-Z0-9" | head -c 10) fi random_file_name="$OUT_DIR/tidb-config-$random_str.toml" diff --git a/tests/integration_tests/bank_table_actor/conf/ticdc_table_actor.toml b/tests/integration_tests/bank_table_actor/conf/ticdc_table_actor.toml new file mode 100644 index 00000000000..4baed7a5c84 --- /dev/null +++ b/tests/integration_tests/bank_table_actor/conf/ticdc_table_actor.toml @@ -0,0 +1,2 @@ +[debug] +enable-table-actor=true diff --git a/tests/integration_tests/bank_table_actor/run.sh b/tests/integration_tests/bank_table_actor/run.sh new file mode 100644 index 00000000000..69458ac29fc --- /dev/null +++ b/tests/integration_tests/bank_table_actor/run.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +# TODO: remove this integration test once table actor is enabled by default. + +set -eu + +CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +source $CUR/../_utils/test_prepare +WORK_DIR=$OUT_DIR/$TEST_NAME +CDC_BINARY=cdc.test +SINK_TYPE=$1 + +function prepare() { + + rm -rf $WORK_DIR && mkdir -p $WORK_DIR + + start_tidb_cluster --workdir $WORK_DIR + + cd $WORK_DIR + + # create table to upstream. + run_sql "CREATE DATABASE bank" ${UP_TIDB_HOST} ${UP_TIDB_PORT} + run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT} + + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --config $CUR/conf/ticdc_table_actor.toml + + run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/" +} + +trap stop_tidb_cluster EXIT +# kafka is not supported yet. +if [ "$SINK_TYPE" != "kafka" ]; then + prepare $* + + cd "$(dirname "$0")" + set -euxo pipefail + + GO111MODULE=on go run ../bank/bank.go ../bank/case.go -u "root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/bank" \ + -d "root@tcp(${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT})/bank" --test-round=20000 + + cleanup_process $CDC_BINARY + echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>" +fi