Skip to content

Commit

Permalink
idempotent = true
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Mar 3, 2024
1 parent acf0d83 commit 1a15bc5
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 1 deletion.
3 changes: 3 additions & 0 deletions cdc/cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
// and https://github.com/tikv/migration/cdc/issues/3352.
config.Metadata.Timeout = 1 * time.Minute

config.Producer.Idempotent = true
config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
Expand Down Expand Up @@ -260,6 +261,8 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Admin.Retry.Backoff = 500 * time.Millisecond
config.Admin.Timeout = 1 * time.Minute

config.Net.MaxOpenRequests = 1

if c.Credential != nil && len(c.Credential.CAPath) != 0 {
config.Net.TLS.Enable = true
config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
Expand Down
2 changes: 1 addition & 1 deletion cdc/tests/integration_tests/changefeed_pause_resume/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ function run() {
run_kafka_consumer --workdir "$WORK_DIR" --upstream-uri "$SINK_URI"
fi

for _ in $(seq 1 10); do
for _ in $(seq 1 100000); do
tikv-cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$UP_PD
rawkv_op $UP_PD put 5000
tikv-cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$UP_PD
Expand Down

0 comments on commit 1a15bc5

Please sign in to comment.