diff --git a/cdc/cmd/kafka-consumer/main.go b/cdc/cmd/kafka-consumer/main.go
index b4fb875a..a232f3c9 100644
--- a/cdc/cmd/kafka-consumer/main.go
+++ b/cdc/cmd/kafka-consumer/main.go
@@ -272,7 +272,11 @@ func main() {
     go func() {
         defer wg.Done()
         if err := consumer.Run(ctx); err != nil {
-            log.Fatal("Error running consumer: %v", zap.Error(err))
+            if errors.Cause(err) == context.Canceled {
+                log.Info("consumer stopped", zap.Error(err))
+            } else {
+                log.Fatal("Error running consumer", zap.Error(err))
+            }
         }
     }()
 
@@ -368,7 +372,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
     }
 ClaimMessages:
     for message := range claim.Messages() {
-        log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
+        log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
         batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
         if err != nil {
             return errors.Trace(err)
@@ -388,7 +392,7 @@ ClaimMessages:
         // If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning.
         if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 {
             log.Fatal("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
-                zap.Int("recevied-bytes", len(message.Key)+len(message.Value)))
+                zap.Int("received-bytes", len(message.Key)+len(message.Value)))
         }
 
         switch tp {
@@ -409,7 +413,7 @@ ClaimMessages:
             if err != nil {
                 log.Fatal("emit row changed event failed", zap.Error(err))
             }
-            log.Info("Emit ChangedEvent", zap.Any("kv", kv))
+            log.Debug("Emit ChangedEvent", zap.Any("kv", kv))
             lastCRTs := sink.lastCRTs.Load()
             if lastCRTs < kv.CRTs {
                 sink.lastCRTs.Store(kv.CRTs)
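Note on the main.go hunk above: the run loop now treats an error whose root cause is `context.Canceled` as a deliberate shutdown rather than a failure. A minimal runnable sketch of that pattern, assuming the `pingcap/errors` and `pingcap/log` packages the file already uses; `runUntilCanceled` and the inline run function are hypothetical stand-ins for `consumer.Run`:

```go
package main

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// runUntilCanceled illustrates the shutdown pattern: an error whose root
// cause is context.Canceled means the consumer was asked to stop, so it is
// logged at info level; anything else aborts the process.
func runUntilCanceled(ctx context.Context, run func(context.Context) error) {
	if err := run(ctx); err != nil {
		if errors.Cause(err) == context.Canceled {
			log.Info("consumer stopped", zap.Error(err))
			return
		}
		log.Fatal("error running consumer", zap.Error(err))
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // simulate a shutdown request

	// A run loop that surfaces the context error wrapped by errors.Trace,
	// the same wrapping used elsewhere in this patch.
	runUntilCanceled(ctx, func(ctx context.Context) error {
		<-ctx.Done()
		return errors.Trace(ctx.Err())
	})
}
```

Comparing via `errors.Cause` rather than `err == context.Canceled` matters because the run loop returns traced errors, which wrap the original cause.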
diff --git a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
index 6dfe4174..8d47a2a8 100644
--- a/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
+++ b/cdc/deployments/tikv-cdc/docker-compose/docker-compose-kafka-integration.yml
@@ -37,4 +37,4 @@ services:
       - tests/integration_tests/run.sh kafka "${CASE}" & tail -f /dev/null
     network_mode: "service:kafka"
     volumes:
-      - /tmp/tikv_cdc_test_pingyu/:/tmp/tikv_cdc_test
+      - /tmp/tikv_cdc_test/:/tmp/tikv_cdc_test
diff --git a/cdc/deployments/tikv-cdc/docker/Dockerfile b/cdc/deployments/tikv-cdc/docker/Dockerfile
index 733ad89f..e3398174 100644
--- a/cdc/deployments/tikv-cdc/docker/Dockerfile
+++ b/cdc/deployments/tikv-cdc/docker/Dockerfile
@@ -1,6 +1,11 @@
 FROM golang:1.21-alpine3.18 as builder
 RUN apk add --no-cache git make bash
 WORKDIR /go/src/github.com/tikv/migration/cdc
+
+COPY go.mod .
+COPY go.sum .
+RUN GO111MODULE=on go mod download
+
 COPY . .
 ENV CDC_ENABLE_VENDOR=0
 RUN make release
diff --git a/cdc/deployments/tikv-cdc/docker/kafka-consumer.Dockerfile b/cdc/deployments/tikv-cdc/docker/kafka-consumer.Dockerfile
new file mode 100644
index 00000000..3f75cfa8
--- /dev/null
+++ b/cdc/deployments/tikv-cdc/docker/kafka-consumer.Dockerfile
@@ -0,0 +1,16 @@
+FROM golang:1.21-alpine3.18 as builder
+RUN apk add --no-cache git make bash
+WORKDIR /go/src/github.com/tikv/migration/cdc
+
+COPY go.mod .
+COPY go.sum .
+RUN GO111MODULE=on go mod download
+
+COPY . .
+ENV CDC_ENABLE_VENDOR=0
+RUN make kafka_consumer
+
+FROM alpine:3.18
+RUN apk add --no-cache tzdata bash curl socat kafkacat
+COPY --from=builder /go/src/github.com/tikv/migration/cdc/bin/cdc_kafka_consumer /cdc_kafka_consumer
+CMD [ "/cdc_kafka_consumer" ]
diff --git a/cdc/tests/README.md b/cdc/tests/README.md
index c57d49f9..768b859e 100644
--- a/cdc/tests/README.md
+++ b/cdc/tests/README.md
@@ -70,7 +70,7 @@ Some useful tips:
 
 #### Run Integration Tests in Docker for Kafka Sink
 
-1. Run `tests/integration_tests/run_kafka_in_docker.sh --case [test names]`. The rule of `test name` is the same with above.
+1. Run `tests/integration_tests/run_kafka_in_docker.sh --case [test names]`. The rule of `test names` is the same as above.
 
 ## Writing new tests
 
diff --git a/cdc/tests/integration_tests/_utils/run_kafka_consumer b/cdc/tests/integration_tests/_utils/run_kafka_consumer
index acd582e4..5ff35b42 100755
--- a/cdc/tests/integration_tests/_utils/run_kafka_consumer
+++ b/cdc/tests/integration_tests/_utils/run_kafka_consumer
@@ -45,7 +45,7 @@ while [[ $# -gt 0 ]]; do
     esac
 done
 
-echo "[$(date)] <<<<<< START kafka consumer in $TEST_NAME case >>>>>>"
+echo "[$(date)] <<<<<< START Kafka consumer in $TEST_NAME case >>>>>>"
 cd "$workdir"
 cdc_kafka_consumer \
     --log-file "$workdir/cdc_kafka_consumer$log_suffix.log" \
diff --git a/cdc/tests/integration_tests/run_group.sh b/cdc/tests/integration_tests/run_group.sh
index ca459a06..a5f37739 100755
--- a/cdc/tests/integration_tests/run_group.sh
+++ b/cdc/tests/integration_tests/run_group.sh
@@ -5,7 +5,7 @@ set -eo pipefail
 
 CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 if [[ $# -eq 1 ]]; then
-    # TODO: remove this branch when CI pipeline is updated.
+    # For backward compatibility
     sink_type=tikv
     group=$1
 elif [[ $# -eq 2 ]]; then
diff --git a/cdc/tests/integration_tests/run_kafka_in_docker.sh b/cdc/tests/integration_tests/run_kafka_in_docker.sh
index 870e5647..e22b0acf 100755
--- a/cdc/tests/integration_tests/run_kafka_in_docker.sh
+++ b/cdc/tests/integration_tests/run_kafka_in_docker.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 # Usage:
-#   ./tests/integration_tests/run_kafka_in_docker.sh --case [test_name]
+#   ./tests/integration_tests/run_kafka_in_docker.sh --case [test_names]
 
 set -euo pipefail
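A closing note on the `ConsumeClaim` hunk earlier in this diff: the size guard is fatal only when an oversized message carries more than one event, since CDC deliberately admits a single oversized event with a warning. A self-contained sketch of that check; `exceedsMaxMessageBytes` is a hypothetical helper for illustration, not a function in this codebase:

```go
package main

import "fmt"

// exceedsMaxMessageBytes mirrors the guard in ConsumeClaim: a batch larger
// than max-message-bytes is treated as fatal only when it carries more than
// one event, because a single oversized event is allowed with a warning.
func exceedsMaxMessageBytes(keyLen, valueLen, counter, maxMessageBytes int) bool {
	return keyLen+valueLen > maxMessageBytes && counter > 1
}

func main() {
	const limit = 1 << 20 // 1 MiB, an assumed max-message-bytes setting
	fmt.Println(exceedsMaxMessageBytes(0, 2<<20, 2, limit)) // true: two events over the limit
	fmt.Println(exceedsMaxMessageBytes(0, 2<<20, 1, limit)) // false: one oversized event is tolerated
}
```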