diff --git a/cdc/cdc/sink/statistics.go b/cdc/cdc/sink/statistics.go
index b1a1a9fc..1c3c4622 100644
--- a/cdc/cdc/sink/statistics.go
+++ b/cdc/cdc/sink/statistics.go
@@ -25,7 +25,7 @@ import (
 )
 
 const (
-	printStatusInterval  = 10 * time.Minute
+	printStatusInterval  = 30 * time.Second
 	flushMetricsInterval = 5 * time.Second
 )
 
diff --git a/cdc/cdc/sink/tikv.go b/cdc/cdc/sink/tikv.go
index 6b1a41b2..968d2ed7 100644
--- a/cdc/cdc/sink/tikv.go
+++ b/cdc/cdc/sink/tikv.go
@@ -257,7 +257,7 @@ type innerBatch struct {
 	TTLs   []uint64
 }
 
-type tikvBatcher struct {
+type TikvBatcher struct {
 	Batches  []innerBatch
 	count    int
 	byteSize uint64
@@ -266,22 +266,26 @@ type tikvBatcher struct {
 	statistics *Statistics
 }
 
-func newTiKVBatcher(statistics *Statistics) *tikvBatcher {
-	b := &tikvBatcher{
+func NewTiKVBatcher(statistics *Statistics) *TikvBatcher {
+	b := &TikvBatcher{
 		statistics: statistics,
 	}
 	return b
 }
 
-func (b *tikvBatcher) Count() int {
+func (b *TikvBatcher) Count() int {
 	return b.count
 }
 
-func (b *tikvBatcher) ByteSize() uint64 {
+func (b *TikvBatcher) IsEmpty() bool {
+	return b.count == 0
+}
+
+func (b *TikvBatcher) ByteSize() uint64 {
 	return b.byteSize
 }
 
-func (b *tikvBatcher) getNow() uint64 {
+func (b *TikvBatcher) getNow() uint64 {
 	failpoint.Inject("tikvSinkGetNow", func(val failpoint.Value) {
 		now := uint64(val.(int))
 		failpoint.Return(now)
@@ -316,7 +320,7 @@ func ExtractRawKVEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType
 	return
 }
 
-func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
+func (b *TikvBatcher) Append(entry *model.RawKVEntry) error {
 	if len(b.Batches) == 0 {
 		b.now = b.getNow()
 	}
@@ -324,8 +328,10 @@ func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
 	opType, key, value, ttl, err := ExtractRawKVEntry(entry, b.now)
 	if err != nil {
 		log.Error("failed to extract entry", zap.Any("event", entry), zap.Error(err))
-		b.statistics.AddInvalidKeyCount()
-		return
+		if b.statistics != nil {
+			b.statistics.AddInvalidKeyCount()
+		}
+		return errors.Trace(err)
 	}
 
 	// NOTE: do NOT separate PUT & DELETE operations into two batch.
@@ -353,9 +359,11 @@ func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
 	if opType == model.OpTypePut {
 		b.byteSize += uint64(len(value)) + uint64(unsafe.Sizeof(ttl))
 	}
+
+	return nil
 }
 
-func (b *tikvBatcher) Reset() {
+func (b *TikvBatcher) Reset() {
 	b.Batches = b.Batches[:0]
 	b.count = 0
 	b.byteSize = 0
@@ -370,7 +378,7 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
 	tick := time.NewTicker(500 * time.Millisecond)
 	defer tick.Stop()
 
-	batcher := newTiKVBatcher(k.statistics)
+	batcher := NewTiKVBatcher(k.statistics)
 
 	flushToTiKV := func() error {
 		return k.statistics.RecordBatchExecution(func() (int, error) {
@@ -426,7 +434,9 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
 				}
 				continue
 			}
-			batcher.Append(e.rawKVEntry)
+			if err := batcher.Append(e.rawKVEntry); err != nil {
+				return errors.Trace(err)
+			}
 
 			if batcher.ByteSize() >= defaultTiKVBatchBytesLimit {
 				if err := flushToTiKV(); err != nil {
diff --git a/cdc/cdc/sink/tikv_test.go b/cdc/cdc/sink/tikv_test.go
index 0930cb54..a82df3eb 100644
--- a/cdc/cdc/sink/tikv_test.go
+++ b/cdc/cdc/sink/tikv_test.go
@@ -154,7 +154,7 @@ func TestTiKVSinkBatcher(t *testing.T) {
 	}()
 	statistics := NewStatistics(context.Background(), "TiKV", map[string]string{})
 
-	batcher := newTiKVBatcher(statistics)
+	batcher := NewTiKVBatcher(statistics)
 	keys := []string{
 		"a", "b", "c", "d", "e", "f",
 	}
@@ -183,8 +183,8 @@ func TestTiKVSinkBatcher(t *testing.T) {
 			ExpiredTs: expires[i],
 			CRTs:      uint64(i),
 		}
-		batcher.Append(entry0)
-		batcher.Append(entry1)
+		require.NoError(batcher.Append(entry0))
+		require.Error(batcher.Append(entry1))
 	}
 	require.Len(batcher.Batches, 3)
 	require.Equal(6, batcher.Count())
diff --git a/cdc/cmd/kafka-consumer/main.go b/cdc/cmd/kafka-consumer/main.go
index ec1a4c4b..efcef40c 100644
--- a/cdc/cmd/kafka-consumer/main.go
+++ b/cdc/cmd/kafka-consumer/main.go
@@ -18,6 +18,7 @@ import (
 	"flag"
 	"fmt"
 	"math"
+	"math/rand"
 	"net/url"
 	"os"
 	"os/signal"
@@ -44,7 +45,7 @@ import (
 )
 
 const (
-	downstreamRetryInterval = 500 * time.Millisecond
+	downstreamRetryIntervalMs int = 200
 )
 
 // Sarama configuration options
@@ -379,7 +380,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 	if sink == nil {
 		panic("sink should initialized")
 	}
-ClaimMessages:
+	kvs := make([]*model.RawKVEntry, 0)
 	for message := range claim.Messages() {
 		log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
 		batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
@@ -387,7 +388,37 @@ ClaimMessages:
 			return errors.Trace(err)
 		}
 
+		// Return error only when the session is closed
+		emitChangedEvents := func() error {
+			if len(kvs) == 0 {
+				return nil
+			}
+			for {
+				err = sink.EmitChangedEvents(ctx, kvs...)
+				if err == nil {
+					log.Debug("emit changed events", zap.Any("kvs", kvs))
+					lastCRTs := sink.lastCRTs.Load()
+					lastKv := kvs[len(kvs)-1]
+					if lastCRTs < lastKv.CRTs {
+						sink.lastCRTs.Store(lastKv.CRTs)
+					}
+					kvs = kvs[:0]
+					return nil
+				}
+
+				log.Warn("emit row changed event failed", zap.Error(err))
+				if session.Context().Err() != nil {
+					log.Warn("session closed", zap.Error(session.Context().Err()))
+					return session.Context().Err()
+				}
+
+				sleepMs := downstreamRetryIntervalMs + rand.Intn(downstreamRetryIntervalMs)
+				time.Sleep(time.Duration(sleepMs) * time.Millisecond)
+			}
+		}
+
 		counter := 0
+	KvLoop:
 		for {
 			tp, hasNext, err := batchDecoder.HasNext()
 			if err != nil {
@@ -416,32 +447,21 @@ ClaimMessages:
 						zap.Uint64("globalResolvedTs", globalResolvedTs),
 						zap.Uint64("sinkResolvedTs", sink.resolvedTs.Load()),
 						zap.Int32("partition", partition))
-					break ClaimMessages
+					continue KvLoop
 				}
-				for {
-					err = sink.EmitChangedEvents(ctx, kv)
-					if err == nil {
-						log.Debug("emit changed events", zap.Any("kv", kv))
-						lastCRTs := sink.lastCRTs.Load()
-						if lastCRTs < kv.CRTs {
-							sink.lastCRTs.Store(kv.CRTs)
-						}
-						break
-					}
-
-					log.Warn("emit row changed event failed", zap.Error(err))
-					if session.Context().Err() != nil {
-						log.Warn("session closed", zap.Error(session.Context().Err()))
-						return nil
-					}
-					time.Sleep(downstreamRetryInterval)
-				}
+				kvs = append(kvs, kv)
 			case model.MqMessageTypeResolved:
 				ts, err := batchDecoder.NextResolvedEvent()
 				if err != nil {
 					log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
 				}
+
+				if err := emitChangedEvents(); err != nil {
+					log.Info("session closed", zap.Error(err))
+					return nil
+				}
+
 				resolvedTs := sink.resolvedTs.Load()
 				if resolvedTs < ts {
 					log.Debug("update sink resolved ts",
@@ -450,13 +470,19 @@ ClaimMessages:
 					sink.resolvedTs.Store(ts)
 				}
 			}
-			session.MarkMessage(message, "")
 		}
 
 		if counter > kafkaMaxBatchSize {
 			log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
 				zap.Int("actual-batch-size", counter))
 		}
+
+		if err := emitChangedEvents(); err != nil {
+			log.Info("session closed", zap.Error(err))
+			return nil
+		}
+
+		session.MarkMessage(message, "")
 	}
 
 	return nil
diff --git a/cdc/cmd/kafka-consumer/tikv.go b/cdc/cmd/kafka-consumer/tikv.go
index 8cba5f46..bebbddaa 100644
--- a/cdc/cmd/kafka-consumer/tikv.go
+++ b/cdc/cmd/kafka-consumer/tikv.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"math"
 	"net/url"
-	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
@@ -30,7 +29,8 @@ import (
 )
 
 const (
-	defaultPDErrorRetry int = math.MaxInt
+	defaultPDErrorRetry        int    = math.MaxInt
+	defaultTiKVBatchBytesLimit uint64 = 40 * 1024 * 1024 // 40MB
 )
 
 var _ sink.Sink = (*tikvSimpleSink)(nil)
@@ -38,7 +38,8 @@ var _ sink.Sink = (*tikvSimpleSink)(nil)
 // tikvSimpleSink is a sink that sends events to downstream TiKV cluster.
 // The reason why we need this sink other than `cdc/sink/tikv.tikvSink` is that we need Kafka message offset to handle TiKV errors, which is not provided by `tikvSink`.
 type tikvSimpleSink struct {
-	client *rawkv.Client
+	client  *rawkv.Client
+	batcher *sink.TikvBatcher
 }
 
 func newSimpleTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaConfig, opts map[string]string, _ chan error) (*tikvSimpleSink, error) {
@@ -56,34 +57,50 @@ func newSimpleTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaC
 		return nil, errors.Trace(err)
 	}
 	return &tikvSimpleSink{
-		client: client,
+		client:  client,
+		batcher: sink.NewTiKVBatcher(nil),
 	}, nil
 }
 
 func (s *tikvSimpleSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error {
-	now := uint64(time.Now().Unix())
+	s.batcher.Reset()
 
-	for _, entry := range rawKVEntries {
-		opType, key, value, ttl, err := sink.ExtractRawKVEntry(entry, now)
-		if err != nil {
-			return errors.Trace(err)
+	flushToTiKV := func() error {
+		if s.batcher.IsEmpty() {
+			return nil
 		}
 
-		if opType == model.OpTypePut {
-			err := s.client.PutWithTTL(ctx, key, value, ttl)
+		var err error
+		for _, batch := range s.batcher.Batches {
+			if batch.OpType == model.OpTypePut {
+				err = s.client.BatchPutWithTTL(ctx, batch.Keys, batch.Values, batch.TTLs)
+			} else if batch.OpType == model.OpTypeDelete {
+				err = s.client.BatchDelete(ctx, batch.Keys)
+			} else {
+				err = errors.Errorf("unexpected OpType: %v", batch.OpType)
+			}
 			if err != nil {
 				return errors.Trace(err)
 			}
-		} else if opType == model.OpTypeDelete {
-			err := s.client.Delete(ctx, key)
-			if err != nil {
+		}
+		s.batcher.Reset()
+		return nil
+	}
+
+	for _, entry := range rawKVEntries {
+		err := s.batcher.Append(entry)
+		if err != nil {
+			return errors.Trace(err)
+		}
+
+		if s.batcher.ByteSize() >= defaultTiKVBatchBytesLimit {
+			if err := flushToTiKV(); err != nil {
 				return errors.Trace(err)
 			}
-		} else {
-			return errors.Errorf("unexpected opType %v", opType)
 		}
 	}
-	return nil
+
+	return errors.Trace(flushToTiKV())
 }
 
 func (s *tikvSimpleSink) FlushChangedEvents(ctx context.Context, _ model.KeySpanID, resolvedTs uint64) (uint64, error) {
@@ -95,6 +112,7 @@ func (s *tikvSimpleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error
 }
 
 func (s *tikvSimpleSink) Close(ctx context.Context) error {
+	s.batcher.Reset()
 	return errors.Trace(s.client.Close())
 }
 
diff --git a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
index 50ab8592..f15dcc07 100644
--- a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
+++ b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
@@ -1,4 +1,4 @@
-version: '2.1'
+version: "3"
 
 services:
   zookeeper:
diff --git a/cdc/tests/integration_tests/_utils/test_prepare b/cdc/tests/integration_tests/_utils/test_prepare
index a8b9a76c..0c2459d0 100644
--- a/cdc/tests/integration_tests/_utils/test_prepare
+++ b/cdc/tests/integration_tests/_utils/test_prepare
@@ -80,6 +80,7 @@ function on_exit() {
 		return 0
 	else
 		echo "Error $STATUS_CODE occurred on $LINE for sink $SINK_TYPE"
-		tail -n +1 "$WORK_DIR"/cdc*.log
+		# CI env already collects "*.log". Uncomment it for other envs.
+		# tail -n +1 "$WORK_DIR"/cdc*.log
 	fi
 }
diff --git a/cdc/tests/integration_tests/changefeed_pause_resume/run.sh b/cdc/tests/integration_tests/changefeed_pause_resume/run.sh
index 9cbdf1cf..35f4eb2e 100755
--- a/cdc/tests/integration_tests/changefeed_pause_resume/run.sh
+++ b/cdc/tests/integration_tests/changefeed_pause_resume/run.sh
@@ -28,7 +28,7 @@ function run() {
 		run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
 	fi
 
-	for i in $(seq 1 10); do
+	for _ in $(seq 1 10); do
 		tikv-cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$UP_PD
 		rawkv_op $UP_PD put 5000
 		tikv-cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$UP_PD
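
Note (illustrative sketch, not part of the patch): with `tikvBatcher` exported as `sink.TikvBatcher`, an out-of-package caller can drive it the same way `tikvSimpleSink.EmitChangedEvents` does above. The sketch only uses APIs that appear in this diff (`NewTiKVBatcher`, `Append`, `IsEmpty`, `ByteSize`, `Batches`, `Reset`, `BatchPutWithTTL`, `BatchDelete`); the package name, the `writeEntries` helper, the `bytesLimit` parameter, and the import paths are assumptions made for illustration.

package example // hypothetical package, for illustration only

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/tikv/client-go/v2/rawkv"      // assumed import path of the rawkv client
	"github.com/tikv/migration/cdc/cdc/model" // assumed module path of tikv-cdc
	"github.com/tikv/migration/cdc/cdc/sink"
)

// writeEntries batches RawKV entries and flushes them to TiKV, mirroring the
// pattern used by tikvSimpleSink.EmitChangedEvents in this patch.
func writeEntries(ctx context.Context, client *rawkv.Client, entries []*model.RawKVEntry, bytesLimit uint64) error {
	// nil statistics is tolerated after this patch (Append checks for it).
	batcher := sink.NewTiKVBatcher(nil)

	flush := func() error {
		if batcher.IsEmpty() {
			return nil
		}
		for _, batch := range batcher.Batches {
			var err error
			switch batch.OpType {
			case model.OpTypePut:
				err = client.BatchPutWithTTL(ctx, batch.Keys, batch.Values, batch.TTLs)
			case model.OpTypeDelete:
				err = client.BatchDelete(ctx, batch.Keys)
			default:
				err = errors.Errorf("unexpected OpType: %v", batch.OpType)
			}
			if err != nil {
				return errors.Trace(err)
			}
		}
		batcher.Reset()
		return nil
	}

	for _, entry := range entries {
		// Append now returns an error for invalid entries instead of silently skipping them.
		if err := batcher.Append(entry); err != nil {
			return errors.Trace(err)
		}
		// Flush early once the accumulated batch grows past the byte limit.
		if batcher.ByteSize() >= bytesLimit {
			if err := flush(); err != nil {
				return errors.Trace(err)
			}
		}
	}
	return errors.Trace(flush())
}

Passing nil statistics here relies on the nil check added to Append in this patch, which is also how the kafka-consumer's tikvSimpleSink constructs its batcher.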