Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[to #381] Add more integration tests for Kafka sink #387

Merged
merged 11 commits into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cdc/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ debug:
$(GOBUILD_DEBUG) -ldflags '$(LDFLAGS)' -o bin/tikv-cdc ./cmd/cdc/main.go

kafka_consumer:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/main.go
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc_kafka_consumer ./cmd/kafka-consumer/

install:
go install ./...
Expand Down Expand Up @@ -248,8 +248,10 @@ integration_test_by_group: prepare_test_binaries check_third_party_binary integr
tests/integration_tests/run_group.sh others

prepare_test_binaries:
cd scripts && ./download-integration-test-binaries.sh "$(TEST_ON_BRANCH)" && cd ..
touch prepare_test_binaries
cd scripts && \
./download-integration-test-binaries.sh "$(TEST_ON_BRANCH)" && \
cd .. && \
touch prepare_test_binaries

check_third_party_binary:
@which scripts/bin/tidb-server
Expand Down
1 change: 1 addition & 0 deletions cdc/cdc/kv/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (s *clientSuite) TestRecvLargeMessageSize(c *check.C) {
case <-time.After(30 * time.Second): // Send 128MB object may costs lots of time.
c.Fatalf("receiving message takes too long")
}
c.Assert(event, check.NotNil)
c.Assert(len(event.Val.Value), check.Equals, largeValSize)
}

Expand Down
15 changes: 11 additions & 4 deletions cdc/cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ type Sink interface {
}

var (
sinkIniterMap = make(map[string]sinkInitFunc)
sinkURICheckerMap = make(map[string]sinkInitFunc)
sinkIniterMap = make(map[string]InitFunc)
sinkURICheckerMap = make(map[string]InitFunc)
)

type sinkInitFunc func(context.Context, model.ChangeFeedID, *url.URL, *config.ReplicaConfig, map[string]string, chan error) (Sink, error)
type InitFunc func(context.Context, model.ChangeFeedID, *url.URL, *config.ReplicaConfig, map[string]string, chan error) (Sink, error)

func init() {
// register blackhole sink
Expand All @@ -93,7 +93,7 @@ func init() {
sinkURICheckerMap["tikv"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (Sink, error) {
_, _, err := parseTiKVUri(sinkURI, opts)
_, _, err := ParseTiKVUri(sinkURI, opts)
return nil, err
}

Expand All @@ -113,6 +113,13 @@ func init() {
sinkURICheckerMap["kafka+ssl"] = sinkURICheckerMap["kafka"]
}

func RegisterSink(scheme string, initFunc InitFunc, checkerFunc InitFunc) {
sinkIniterMap[scheme] = initFunc
if checkerFunc != nil {
sinkURICheckerMap[scheme] = checkerFunc
}
}

// New creates a new sink with the sink-uri
func New(ctx context.Context, changefeedID model.ChangeFeedID, sinkURIStr string, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
// parse sinkURI as a URI
Expand Down
8 changes: 4 additions & 4 deletions cdc/cdc/sink/tikv.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (b *tikvBatcher) getNow() uint64 {
return uint64(time.Now().Unix()) // TODO: use TSO ?
}

func extractEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType,
func ExtractRawKVEntry(entry *model.RawKVEntry, now uint64) (opType model.OpType,
key []byte, value []byte, ttl uint64, err error,
) {
opType = entry.OpType
Expand Down Expand Up @@ -321,7 +321,7 @@ func (b *tikvBatcher) Append(entry *model.RawKVEntry) {
b.now = b.getNow()
}

opType, key, value, ttl, err := extractEntry(entry, b.now)
opType, key, value, ttl, err := ExtractRawKVEntry(entry, b.now)
if err != nil {
log.Error("failed to extract entry", zap.Any("event", entry), zap.Error(err))
b.statistics.AddInvalidKeyCount()
Expand Down Expand Up @@ -436,7 +436,7 @@ func (k *tikvSink) runWorker(ctx context.Context, workerIdx uint32) error {
}
}

func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error) {
func ParseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config, []string, error) {
config := tikvconfig.DefaultConfig()
pdAddrPrefix := "http://"

Expand Down Expand Up @@ -477,7 +477,7 @@ func parseTiKVUri(sinkURI *url.URL, opts map[string]string) (*tikvconfig.Config,
}

func newTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaConfig, opts map[string]string, errCh chan error) (*tikvSink, error) {
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
config, pdAddr, err := ParseTiKVUri(sinkURI, opts)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
6 changes: 3 additions & 3 deletions cdc/cdc/sink/tikv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestExtractRawKVEntry(t *testing.T) {
}

for i, c := range cases {
opType, key, value, ttl, err := extractEntry(c, now)
opType, key, value, ttl, err := ExtractRawKVEntry(c, now)
require.Equal(expects[i].opType, opType)
require.Equal(expects[i].key, key)
require.Equal(expects[i].value, value)
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestTiKVSinkConfig(t *testing.T) {
require.NoError(err)

opts := make(map[string]string)
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
config, pdAddr, err := ParseTiKVUri(sinkURI, opts)
require.NoError(err)
require.Equal(expected[i].pdAddr, pdAddr)
require.Equal(expected[i].concurrency, opts["concurrency"])
Expand Down Expand Up @@ -222,7 +222,7 @@ func TestTiKVSink(t *testing.T) {
require.NoError(err)

opts := make(map[string]string)
config, pdAddr, err := parseTiKVUri(sinkURI, opts)
config, pdAddr, err := ParseTiKVUri(sinkURI, opts)
require.NoError(err)

errCh := make(chan error)
Expand Down
41 changes: 30 additions & 11 deletions cdc/cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ import (
"go.uber.org/zap"
)

const (
downstreamRetryInterval = 500 * time.Millisecond
)

// Sarama configuration options
var (
kafkaAddrs []string
Expand Down Expand Up @@ -105,14 +109,14 @@ func init() {
})
kafkaAddrs = strings.Split(upstreamURI.Host, ",")

config, err := newSaramaConfig()
cnf, err := newSaramaConfig()
if err != nil {
log.Fatal("Error creating sarama config", zap.Error(err))
}

s = upstreamURI.Query().Get("partition-num")
if s == "" {
partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, config)
partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, cnf)
if err != nil {
log.Fatal("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
}
Expand Down Expand Up @@ -144,6 +148,10 @@ func init() {
log.Info("Setting max-batch-size", zap.Int("max-batch-size", c))
kafkaMaxBatchSize = c
}

// Use `tikvSimpleSink` for "tikv".
// As `sink.tikvSink` has internal batch, it is not easy to tolerate errors of TiKV in Kafka consuming scene.
registerSimpleTiKVSink("tikv")
}

func getPartitionNum(address []string, topic string, cfg *sarama.Config) (int32, error) {
Expand Down Expand Up @@ -362,7 +370,8 @@ func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
ctx := context.TODO()
ctx, cancel := context.WithCancel(session.Context())
defer cancel()
partition := claim.Partition()
c.sinksMu.Lock()
sink := c.sinks[partition]
Expand Down Expand Up @@ -409,14 +418,24 @@ ClaimMessages:
zap.Int32("partition", partition))
break ClaimMessages
}
err = sink.EmitChangedEvents(ctx, kv)
if err != nil {
log.Fatal("emit row changed event failed", zap.Error(err))
}
log.Debug("Emit ChangedEvent", zap.Any("kv", kv))
lastCRTs := sink.lastCRTs.Load()
if lastCRTs < kv.CRTs {
sink.lastCRTs.Store(kv.CRTs)

for {
err = sink.EmitChangedEvents(ctx, kv)
if err == nil {
log.Debug("emit changed events", zap.Any("kv", kv))
lastCRTs := sink.lastCRTs.Load()
if lastCRTs < kv.CRTs {
sink.lastCRTs.Store(kv.CRTs)
}
break
}

log.Warn("emit row changed event failed", zap.Error(err))
if session.Context().Err() != nil {
log.Warn("session closed", zap.Error(session.Context().Err()))
return nil
}
time.Sleep(downstreamRetryInterval)
}
case model.MqMessageTypeResolved:
ts, err := batchDecoder.NextResolvedEvent()
Expand Down
117 changes: 117 additions & 0 deletions cdc/cmd/kafka-consumer/tikv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"context"
"net/url"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/tikv/migration/cdc/cdc/model"
"github.com/tikv/migration/cdc/cdc/sink"
"github.com/tikv/migration/cdc/pkg/config"

"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
)

const (
defaultPDErrorRetry int = 10
)

var _ sink.Sink = (*tikvSimpleSink)(nil)

// tikvSimpleSink is a sink that sends events to downstream TiKV cluster.
// The reason why we need this sink other than `cdc/sink/tikv.tikvSink` is that we need Kafka message offset to handle TiKV errors, which is not provided by `tikvSink`.
type tikvSimpleSink struct {
client *rawkv.Client
}

func newSimpleTiKVSink(ctx context.Context, sinkURI *url.URL, _ *config.ReplicaConfig, opts map[string]string, _ chan error) (*tikvSimpleSink, error) {
config, pdAddrs, err := sink.ParseTiKVUri(sinkURI, opts)
if err != nil {
return nil, errors.Trace(err)
}

client, err := rawkv.NewClientWithOpts(ctx, pdAddrs,
rawkv.WithSecurity(config.Security),
rawkv.WithAPIVersion(kvrpcpb.APIVersion_V2),
rawkv.WithPDOptions(pd.WithMaxErrorRetry(defaultPDErrorRetry)),
)
if err != nil {
return nil, errors.Trace(err)
}
return &tikvSimpleSink{
client: client,
}, nil
}

func (s *tikvSimpleSink) EmitChangedEvents(ctx context.Context, rawKVEntries ...*model.RawKVEntry) error {
now := uint64(time.Now().Unix())

for _, entry := range rawKVEntries {
opType, key, value, ttl, err := sink.ExtractRawKVEntry(entry, now)
if err != nil {
return errors.Trace(err)
}

if opType == model.OpTypePut {
err := s.client.PutWithTTL(ctx, key, value, ttl)
if err != nil {
return errors.Trace(err)
}
} else if opType == model.OpTypeDelete {
err := s.client.Delete(ctx, key)
if err != nil {
return errors.Trace(err)
}
} else {
return errors.Errorf("unexpected opType %v", opType)
}
}
return nil
}

func (s *tikvSimpleSink) FlushChangedEvents(ctx context.Context, _ model.KeySpanID, resolvedTs uint64) (uint64, error) {
return resolvedTs, nil
}

func (s *tikvSimpleSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
return nil
}

func (s *tikvSimpleSink) Close(ctx context.Context) error {
return errors.Trace(s.client.Close())
}

func (s *tikvSimpleSink) Barrier(ctx context.Context, keyspanID model.KeySpanID) error {
return nil
}

func registerSimpleTiKVSink(schema string) {
initFunc := func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (sink.Sink, error) {
return newSimpleTiKVSink(ctx, sinkURI, config, opts, errCh)
}
checkerFunc := func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
config *config.ReplicaConfig, opts map[string]string, errCh chan error,
) (sink.Sink, error) {
_, _, err := sink.ParseTiKVUri(sinkURI, opts)
return nil, err
}
sink.RegisterSink(schema, initFunc, checkerFunc)
}
2 changes: 2 additions & 0 deletions cdc/deployments/tikv-cdc/docker/integration-test.Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ ARG TEST_ON_BRANCH=master
USER root
WORKDIR /root/download

RUN yum install -y wget

COPY ./scripts/download-integration-test-binaries.sh .
# Download all binaries into bin dir.
RUN ./download-integration-test-binaries.sh ${TEST_ON_BRANCH}
Expand Down
33 changes: 25 additions & 8 deletions cdc/scripts/download-integration-test-binaries.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,19 @@ color-green() { # Green
echo -e "\x1B[1;32m${*}\x1B[0m"
}

function download() {
local url=$1
local file_name=$2
local file_path=$3
if [[ -f "${file_path}" ]]; then
echo "file ${file_name} already exists, skip download"
return
fi
echo ">>>"
echo "download ${file_name} from ${url}"
wget --no-verbose --retry-connrefused --waitretry=1 -t 3 -O "${file_path}" "${url}"
}

# Specify the download branch.
branch=$1

Expand Down Expand Up @@ -56,18 +69,22 @@ mkdir -p tmp
mkdir -p bin

color-green "Download binaries..."
curl "${tidb_download_url}" | tar xz -C tmp bin/tidb-server
curl "${tikv_download_url}" | tar xz -C tmp bin/tikv-server
curl "${pd_download_url}" | tar xz --wildcards -C tmp bin/*
mv tmp/bin/* third_bin

curl "${go_ycsb_download_url}" -o third_bin/go-ycsb
curl -L "${etcd_download_url}" | tar xz -C tmp
mv tmp/etcd-v3.4.7-linux-amd64/etcdctl third_bin

download "$tidb_download_url" "tidb-server.tar.gz" "tmp/tidb-server.tar.gz"
tar -xz -C third_bin bin/tidb-server -f tmp/tidb-server.tar.gz && mv third_bin/bin/tidb-server third_bin/
download "$pd_download_url" "pd-server.tar.gz" "tmp/pd-server.tar.gz"
tar -xz --wildcards -C third_bin 'bin/*' -f tmp/pd-server.tar.gz && mv third_bin/bin/* third_bin/
download "$tikv_download_url" "tikv-server.tar.gz" "tmp/tikv-server.tar.gz"
tar -xz -C third_bin bin/tikv-server -f tmp/tikv-server.tar.gz && mv third_bin/bin/tikv-server third_bin/
download "$go_ycsb_download_url" "go-ycsb" "third_bin/go-ycsb"
download "$etcd_download_url" "etcd.tar.gz" "tmp/etcd.tar.gz"
tar -xz -C third_bin etcd-v3.4.7-linux-amd64/etcdctl -f tmp/etcd.tar.gz && mv third_bin/etcd-v3.4.7-linux-amd64/etcdctl third_bin/

chmod a+x third_bin/*

# Copy it to the bin directory in the root directory.
rm -rf tmp
rm -rf bin/bin
mv third_bin/* ./bin
rm -rf third_bin

Expand Down
Loading
Loading