Skip to content

Commit

Permalink
polish
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Jan 8, 2024
1 parent 85f2041 commit fb2a5e3
Show file tree
Hide file tree
Showing 8 changed files with 34 additions and 9 deletions.
12 changes: 8 additions & 4 deletions cdc/cmd/kafka-consumer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,11 @@ func main() {
go func() {
defer wg.Done()
if err := consumer.Run(ctx); err != nil {
log.Fatal("Error running consumer: %v", zap.Error(err))
if errors.Cause(err) == context.Canceled {
log.Info("consumer stopped", zap.Error(err))
} else {
log.Fatal("Error running consumer: %v", zap.Error(err))
}
}
}()

Expand Down Expand Up @@ -368,7 +372,7 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
}
ClaimMessages:
for message := range claim.Messages() {
log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
if err != nil {
return errors.Trace(err)
Expand All @@ -388,7 +392,7 @@ ClaimMessages:
// If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning.
if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 {
log.Fatal("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
zap.Int("recevied-bytes", len(message.Key)+len(message.Value)))
zap.Int("received-bytes", len(message.Key)+len(message.Value)))
}

switch tp {
Expand All @@ -409,7 +413,7 @@ ClaimMessages:
if err != nil {
log.Fatal("emit row changed event failed", zap.Error(err))
}
log.Info("Emit ChangedEvent", zap.Any("kv", kv))
log.Debug("Emit ChangedEvent", zap.Any("kv", kv))
lastCRTs := sink.lastCRTs.Load()
if lastCRTs < kv.CRTs {
sink.lastCRTs.Store(kv.CRTs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,4 @@ services:
- tests/integration_tests/run.sh kafka "${CASE}" & tail -f /dev/null
network_mode: "service:kafka"
volumes:
- /tmp/tikv_cdc_test_pingyu/:/tmp/tikv_cdc_test
- /tmp/tikv_cdc_test/:/tmp/tikv_cdc_test
5 changes: 5 additions & 0 deletions cdc/deployments/tikv-cdc/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
FROM golang:1.21-alpine3.18 as builder
RUN apk add --no-cache git make bash
WORKDIR /go/src/github.com/tikv/migration/cdc

COPY go.mod .
COPY go.sum .
RUN GO111MODULE=on go mod download

COPY . .
ENV CDC_ENABLE_VENDOR=0
RUN make release
Expand Down
16 changes: 16 additions & 0 deletions cdc/deployments/tikv-cdc/docker/kafka-consumer.Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
FROM golang:1.21-alpine3.18 as builder
RUN apk add --no-cache git make bash
WORKDIR /go/src/github.com/tikv/migration/cdc

COPY go.mod .
COPY go.sum .
RUN GO111MODULE=on go mod download

COPY . .
ENV CDC_ENABLE_VENDOR=0
RUN make kafka_consumer

FROM alpine:3.18
RUN apk add --no-cache tzdata bash curl socat kafkacat
COPY --from=builder /go/src/github.com/tikv/migration/cdc/bin/cdc_kafka_consumer /cdc_kafka_consumer
CMD [ "/cdc_kafka_consumer" ]
2 changes: 1 addition & 1 deletion cdc/tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Some useful tips:

#### Run Integration Tests in Docker for Kafka Sink

1. Run `tests/integration_tests/run_kafka_in_docker.sh --case [test names]`. The rule of `test name` is the same with above.
1. Run `tests/integration_tests/run_kafka_in_docker.sh --case [test names]`. The rule of `test names` is the same with above.

## Writing new tests

Expand Down
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/_utils/run_kafka_consumer
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ while [[ $# -gt 0 ]]; do
esac
done

echo "[$(date)] <<<<<< START kafka consumer in $TEST_NAME case >>>>>>"
echo "[$(date)] <<<<<< START Kafka consumer in $TEST_NAME case >>>>>>"
cd "$workdir"
cdc_kafka_consumer \
--log-file "$workdir/cdc_kafka_consumer$log_suffix.log" \
Expand Down
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/run_group.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ set -eo pipefail
CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

if [[ $# -eq 1 ]]; then
# TODO: remove this branch when CI pipeline is updated.
# For backward compatibility
sink_type=tikv
group=$1
elif [[ $# -eq 2 ]]; then
Expand Down
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/run_kafka_in_docker.sh
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/bin/bash
# Usage:
# ./tests/integration_tests/run_kafka_in_docker.sh --case [test_name]
# ./tests/integration_tests/run_kafka_in_docker.sh --case [test_names]

set -euo pipefail

Expand Down

0 comments on commit fb2a5e3

Please sign in to comment.