Skip to content

Commit

Permalink
tests(ticdc): add table actor integration test (#4738)
Browse files Browse the repository at this point in the history
ref #3881
  • Loading branch information
sdojjy authored Mar 7, 2022
1 parent 49f12dc commit 643ac18
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 7 deletions.
11 changes: 8 additions & 3 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi
}
atomic.StoreUint64(&n.resolvedTs, rawKV.CRTs)

if resolvedTs > n.barrierTs &&
if resolvedTs > n.BarrierTs() &&
!redo.IsConsistentEnabled(n.replConfig.Consistent.Level) {
// Do not send resolved ts events that is larger than
// barrier ts.
Expand All @@ -278,7 +278,7 @@ func (n *sorterNode) handleRawEvent(ctx context.Context, event *model.Polymorphi
// resolved ts, conflicts to this change.
// TODO: Remove redolog check once redolog decouples for global
// resolved ts.
event = model.NewResolvedPolymorphicEvent(0, n.barrierTs)
event = model.NewResolvedPolymorphicEvent(0, n.BarrierTs())
}
}
n.sorter.AddEntry(ctx, event)
Expand All @@ -299,7 +299,7 @@ func (n *sorterNode) TryHandleDataMessage(ctx context.Context, msg pipeline.Mess
}

func (n *sorterNode) updateBarrierTs(barrierTs model.Ts) {
if barrierTs > atomic.LoadUint64(&n.barrierTs) {
if barrierTs > n.BarrierTs() {
atomic.StoreUint64(&n.barrierTs, barrierTs)
}
}
Expand Down Expand Up @@ -328,3 +328,8 @@ func (n *sorterNode) Destroy(ctx pipeline.NodeContext) error {
func (n *sorterNode) ResolvedTs() model.Ts {
return atomic.LoadUint64(&n.resolvedTs)
}

// BarrierTs returns the sorter barrierTs
func (n *sorterNode) BarrierTs() model.Ts {
return atomic.LoadUint64(&n.barrierTs)
}
6 changes: 3 additions & 3 deletions cdc/processor/pipeline/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestSorterResolvedTsLessEqualBarrierTs(t *testing.T) {
)
err = sn.Receive(nctx)
require.Nil(t, err)
require.EqualValues(t, 2, sn.barrierTs)
require.EqualValues(t, 2, sn.BarrierTs())
// Barrier message must be passed to the next node.
require.EqualValues(t, pipeline.BarrierMessage(2), <-ch)

Expand Down Expand Up @@ -152,7 +152,7 @@ func TestSorterUpdateBarrierTs(t *testing.T) {
t.Parallel()
s := &sorterNode{barrierTs: 1}
s.updateBarrierTs(model.Ts(2))
require.Equal(t, model.Ts(2), s.barrierTs)
require.Equal(t, model.Ts(2), s.BarrierTs())
s.updateBarrierTs(model.Ts(1))
require.Equal(t, model.Ts(2), s.barrierTs)
require.Equal(t, model.Ts(2), s.BarrierTs())
}
2 changes: 1 addition & 1 deletion tests/integration_tests/_utils/start_tidb_cluster_impl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ random_file_name=
randomGenSocketsConf() {
random_str=$(date '+%s%N')
if [ "$(uname)" == "Darwin" ]; then
random_str=$(cat /dev/random | LC_CTYPE=C tr -dc "a-zA-Z0-9" | head -c 10)
random_str=$(cat /dev/random | LC_ALL=C tr -dc "a-zA-Z0-9" | head -c 10)
fi
random_file_name="$OUT_DIR/tidb-config-$random_str.toml"

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[debug]
enable-table-actor=true
43 changes: 43 additions & 0 deletions tests/integration_tests/bank_table_actor/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#!/bin/bash

# TODO: remove this integration test once table actor is enabled by default.

set -eu

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1

function prepare() {

rm -rf $WORK_DIR && mkdir -p $WORK_DIR

start_tidb_cluster --workdir $WORK_DIR

cd $WORK_DIR

# create table to upstream.
run_sql "CREATE DATABASE bank" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "CREATE DATABASE bank" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}

run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --config $CUR/conf/ticdc_table_actor.toml

run_cdc_cli changefeed create --sink-uri="mysql://root@${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT}/"
}

trap stop_tidb_cluster EXIT
# kafka is not supported yet.
if [ "$SINK_TYPE" != "kafka" ]; then
prepare $*

cd "$(dirname "$0")"
set -euxo pipefail

GO111MODULE=on go run ../bank/bank.go ../bank/case.go -u "root@tcp(${UP_TIDB_HOST}:${UP_TIDB_PORT})/bank" \
-d "root@tcp(${DOWN_TIDB_HOST}:${DOWN_TIDB_PORT})/bank" --test-round=20000

cleanup_process $CDC_BINARY
echo "[$(date)] <<<<<< run test case $TEST_NAME success! >>>>>>"
fi

0 comments on commit 643ac18

Please sign in to comment.