kafka sink in run.sh

Signed-off-by: Ping Yu <[email protected]>

pingyu committed Jan 2, 2024
1 parent 0177038 commit 32ce78f
Showing 31 changed files with 148 additions and 134 deletions.
138 changes: 10 additions & 128 deletions cdc/cmd/kafka-consumer/main.go
@@ -37,7 +37,6 @@ import (
"github.com/tikv/migration/cdc/cdc/sink/codec"
"github.com/tikv/migration/cdc/pkg/config"

// cdcfilter "github.com/tikv/migration/cdc/pkg/filter"
"github.com/tikv/migration/cdc/pkg/logutil"
"github.com/tikv/migration/cdc/pkg/security"
"github.com/tikv/migration/cdc/pkg/util"
@@ -266,7 +265,9 @@ func main() {
}
}()

wg.Add(1)
go func() {
defer wg.Done()
if err := consumer.Run(ctx); err != nil {
log.Fatal("Error running consumer: %v", zap.Error(err))
}
@@ -294,43 +295,27 @@ type partitionSink struct {
sink.Sink
resolvedTs atomic.Uint64
partitionNo int
// tablesMap sync.Map
lastCRTs atomic.Uint64
lastCRTs atomic.Uint64
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
ready chan bool

// ddlList []*model.DDLEvent
// maxDDLReceivedTs uint64
// ddlListMu sync.Mutex

sinks []*partitionSink
sinksMu sync.Mutex

// ddlSink sink.Sink
// fakeTableIDGenerator *fakeTableIDGenerator

globalResolvedTs atomic.Uint64
}

// NewConsumer creates a new cdc kafka consumer
func NewConsumer(ctx context.Context) (*Consumer, error) {
// TODO support filter in downstream sink
tz, err := util.GetTimezone(timezone)
if err != nil {
return nil, errors.Annotate(err, "can not load timezone")
}
ctx = util.PutTimezoneInCtx(ctx, tz)
// filter, err := cdcfilter.NewFilter(config.GetDefaultReplicaConfig())
// if err != nil {
// return nil, errors.Trace(err)
// }
c := new(Consumer)
// c.fakeTableIDGenerator = &fakeTableIDGenerator{
// tableIDs: make(map[string]int64),
// }
c.sinks = make([]*partitionSink, kafkaPartitionNum)
ctx, cancel := context.WithCancel(ctx)
errCh := make(chan error, 1)
@@ -343,11 +328,6 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
}
c.sinks[i] = &partitionSink{Sink: s, partitionNo: i}
}
// sink, err := sink.New(ctx, "kafka-consumer", downstreamURIStr, config.GetDefaultReplicaConfig(), opts, errCh)
// if err != nil {
// cancel()
// return nil, errors.Trace(err)
// }
go func() {
err := <-errCh
if errors.Cause(err) != context.Canceled {
@@ -357,7 +337,6 @@ func NewConsumer(ctx context.Context) (*Consumer, error) {
}
cancel()
}()
// c.ddlSink = sink
c.ready = make(chan bool)
return c, nil
}
@@ -410,12 +389,6 @@ ClaimMessages:
}

switch tp {
// case model.MqMessageTypeDDL:
// ddl, err := batchDecoder.NextDDLEvent()
// if err != nil {
// log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
// }
// c.appendDDL(ddl)
case model.MqMessageTypeKv:
kv, err := batchDecoder.NextChangedEvent()
if err != nil {
@@ -429,22 +402,13 @@ ClaimMessages:
zap.Int32("partition", partition))
break ClaimMessages
}
// FIXME: hack to set start-ts in row changed event, as start-ts
// is not contained in TiKV CDC open protocol
// kv.StartTs = kv.CommitTs
// var partitionID int64
// if kv.Table.IsPartition {
// partitionID = kv.Table.TableID
// }
// kv.Table.TableID =
// c.fakeTableIDGenerator.generateFakeTableID(kv.Table.Schema, kv.Table.Table, partitionID)
err = sink.EmitChangedEvents(ctx, kv)
if err != nil {
log.Fatal("emit row changed event failed", zap.Error(err))
}
log.Info("Emit ChangedEvent", zap.Any("kv", kv))
lastCommitTs := sink.lastCRTs.Load()
if lastCommitTs < kv.CRTs {
lastCRTs := sink.lastCRTs.Load()
if lastCRTs < kv.CRTs {
sink.lastCRTs.Store(kv.CRTs)
}
case model.MqMessageTypeResolved:
@@ -472,41 +436,6 @@ ClaimMessages:
return nil
}

// func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
// c.ddlListMu.Lock()
// defer c.ddlListMu.Unlock()
// if ddl.CommitTs <= c.maxDDLReceivedTs {
// return
// }
// globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
// if ddl.CommitTs <= globalResolvedTs {
// log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
// return
// }
// c.ddlList = append(c.ddlList, ddl)
// c.maxDDLReceivedTs = ddl.CommitTs
// }

// func (c *Consumer) getFrontDDL() *model.DDLEvent {
// c.ddlListMu.Lock()
// defer c.ddlListMu.Unlock()
// if len(c.ddlList) > 0 {
// return c.ddlList[0]
// }
// return nil
// }

// func (c *Consumer) popDDL() *model.DDLEvent {
// c.ddlListMu.Lock()
// defer c.ddlListMu.Unlock()
// if len(c.ddlList) > 0 {
// ddl := c.ddlList[0]
// c.ddlList = c.ddlList[1:]
// return ddl
// }
// return nil
// }

func (c *Consumer) forEachSink(fn func(sink *partitionSink) error) error {
c.sinksMu.Lock()
defer c.sinksMu.Unlock()
@@ -528,7 +457,7 @@ func (c *Consumer) Run(ctx context.Context) error {
default:
}
time.Sleep(100 * time.Millisecond)
// handle ddl

globalResolvedTs := uint64(math.MaxUint64)
err := c.forEachSink(func(sink *partitionSink) error {
resolvedTs := sink.resolvedTs.Load()
@@ -540,28 +469,7 @@ func (c *Consumer) Run(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
// todoDDL := c.getFrontDDL()
// if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs {
// // flush DMLs
// err := c.forEachSink(func(sink *partitionSink) error {
// return syncFlushRowChangedEvents(ctx, sink, todoDDL.CommitTs)
// })
// if err != nil {
// return errors.Trace(err)
// }

// // execute ddl
// err = c.ddlSink.EmitDDLEvent(ctx, todoDDL)
// if err != nil {
// return errors.Trace(err)
// }
// c.popDDL()
// continue
// }

// if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs {
// globalResolvedTs = todoDDL.CommitTs
// }

if lastGlobalResolvedTs == globalResolvedTs {
continue
}
Expand All @@ -570,27 +478,22 @@ func (c *Consumer) Run(ctx context.Context) error {
log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))

err = c.forEachSink(func(sink *partitionSink) error {
return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
return syncFlushChangedEvents(ctx, sink, globalResolvedTs)
})
if err != nil {
return errors.Trace(err)
}
}
}

func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolvedTs uint64) error {
func syncFlushChangedEvents(ctx context.Context, sink *partitionSink, resolvedTs uint64) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// tables are flushed
// var (
// err error
// checkpointTs uint64
// )
// resolvedTs := sink.lastCRTs.Load()

keyspanID := model.KeySpanID(0)
checkpointTs, err := sink.FlushChangedEvents(ctx, keyspanID, resolvedTs)
if err != nil {
@@ -603,24 +506,3 @@ func syncFlushRowChangedEvents(ctx context.Context, sink *partitionSink, resolve
return nil
}
}

// type fakeTableIDGenerator struct {
// tableIDs map[string]int64
// currentTableID int64
// mu sync.Mutex
// }

// func (g *fakeTableIDGenerator) generateFakeTableID(schema, table string, partition int64) int64 {
// g.mu.Lock()
// defer g.mu.Unlock()
// key := quotes.QuoteSchema(schema, table)
// if partition != 0 {
// key = fmt.Sprintf("%s.`%d`", key, partition)
// }
// if tableID, ok := g.tableIDs[key]; ok {
// return tableID
// }
// g.currentTableID++
// g.tableIDs[key] = g.currentTableID
// return g.currentTableID
// }
@@ -43,4 +43,4 @@ services:
- tests/integration_tests/run.sh kafka "${CASE}" & tail -f /dev/null
network_mode: "service:kafka"
volumes:
- ./logs/tikv_cdc_test:/tmp/tikv_cdc_test
- /tmp/tikv_cdc_test_pingyu/:/tmp/tikv_cdc_test
@@ -45,7 +45,7 @@ RUN yum install -y \

RUN wget http://dl.fedoraproject.org/pub/epel/epel-release-latest-7.noarch.rpm
RUN yum install -y epel-release-latest-7.noarch.rpm
RUN yum --enablerepo=epel install -y s3cmd
RUN yum --enablerepo=epel install -y s3cmd jq

# Copy go from downloader.
COPY --from=downloader /usr/local/go /usr/local/go
10 changes: 10 additions & 0 deletions cdc/tests/integration_tests/_utils/test_prepare
@@ -1,3 +1,5 @@
#!/bin/bash

UP_TIDB_HOST=${UP_TIDB_HOST:-127.0.0.1}
UP_TIDB_PORT=${UP_TIDB_PORT:-4000}
UP_TIDB_OTHER_PORT=${UP_TIDB_OTHER_PORT:-4001}
@@ -52,3 +54,11 @@ DOWN_TLS_TIKV_PORT=${DOWN_TLS_TIKV_PORT:-23160}
DOWN_TLS_TIKV_STATUS_PORT=${DOWN_TLS_TIKV_STATUS_PORT:-23180}

KAFKA_VERSION=$(cat /tmp/tikv_cdc_test/KAFKA_VERSION 2>/dev/null || echo 2.4.1)

# get sink uri of kafka
# usage: get_kafka_sink_uri <test_name>
function get_kafka_sink_uri() {
TEST_NAME="$1"
TOPIC_NAME="tikvcdc-$TEST_NAME-test-$RANDOM"
echo "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=$KAFKA_VERSION&max-message-bytes=10485760"
}
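
For reference, a minimal sketch (not part of this commit's diff) of how a test's run.sh is expected to consume this helper; it assumes test_prepare has been sourced and that $TEST_NAME, $WORK_DIR, $SINK_TYPE and $start_ts are already set by the test harness, mirroring the run.sh changes shown below:

# Sketch only: build the sink URI once, reuse it for the changefeed and the consumer.
SINK_URI=$(get_kafka_sink_uri "$TEST_NAME")
tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
	run_kafka_consumer $WORK_DIR "$SINK_URI"
fi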
5 changes: 2 additions & 3 deletions cdc/tests/integration_tests/autorandom/run.sh
@@ -18,16 +18,15 @@ function run() {
start_ts=$(get_start_ts $UP_PD)
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

TOPIC_NAME="tikvcdc-autorandom-test-$RANDOM"
case $SINK_TYPE in
tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
kafka) SINK_URI="kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&kafka-version=${KAFKA_VERSION}&max-message-bytes=10485760" ;;
kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
*) SINK_URI="" ;;
esac

tikv-cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI"
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
run_kafka_consumer $WORK_DIR "$SINK_URI"
fi

rawkv_op $UP_PD put 5000
14 changes: 13 additions & 1 deletion cdc/tests/integration_tests/availability/run.sh
@@ -9,6 +9,8 @@ source $CUR/capture.sh
source $CUR/processor.sh
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=tikv-cdc
SINK_TYPE=$1
UP_PD=http://$UP_PD_HOST_1:$UP_PD_PORT_1

export DOWN_TIDB_HOST
export DOWN_TIDB_PORT
@@ -21,9 +23,19 @@ function prepare() {
cd $WORK_DIR

start_ts=$(get_start_ts $UP_PD)

case $SINK_TYPE in
tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
*) SINK_URI="" ;;
esac

run_cdc_cli changefeed create \
--start-ts=$start_ts --sink-uri="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" \
--start-ts=$start_ts --sink-uri="$SINK_URI" \
--disable-version-check
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "$SINK_URI"
fi
}

trap stop_tidb_cluster EXIT
@@ -19,6 +19,7 @@ function run() {

case $SINK_TYPE in
tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
*) SINK_URI="" ;;
esac

@@ -28,6 +29,9 @@ function run() {
export GO_FAILPOINTS='github.com/tikv/migration/cdc/cdc/processor/processorManagerHandleNewChangefeedDelay=sleep(2000)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8600" --pd $UP_PD
changefeed_id=$(tikv-cdc cli changefeed create --pd=$UP_PD --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "$SINK_URI"
fi

# wait task is dispatched
sleep 1
5 changes: 5 additions & 0 deletions cdc/tests/integration_tests/changefeed_auto_stop/run.sh
@@ -44,10 +44,15 @@ function run() {

case $SINK_TYPE in
tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
*) SINK_URI="" ;;
esac

changefeedid=$(tikv-cdc cli changefeed create --pd="http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "$SINK_URI"
fi

# make sure the first capture does job first.
sleep 3
export GO_FAILPOINTS=''
4 changes: 4 additions & 0 deletions cdc/tests/integration_tests/changefeed_error/run.sh
@@ -114,11 +114,15 @@ function run() {

case $SINK_TYPE in
tikv) SINK_URI="tikv://${DOWN_PD_HOST}:${DOWN_PD_PORT}" ;;
kafka) SINK_URI=$(get_kafka_sink_uri "$TEST_NAME") ;;
*) SINK_URI="" ;;
esac

changefeedid="changefeed-error"
run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" -c $changefeedid
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "$SINK_URI"
fi

ensure $MAX_RETRIES check_changefeed_mark_failed_regex $UP_PD ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*"
run_cdc_cli changefeed resume -c $changefeedid