diff --git a/cdc/cdc/sink/mq_test.go b/cdc/cdc/sink/mq_test.go index f2bda0a4..29152333 100644 --- a/cdc/cdc/sink/mq_test.go +++ b/cdc/cdc/sink/mq_test.go @@ -60,6 +60,17 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { opts := map[string]string{} errCh := make(chan error, 1) + newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl + kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) { + // Idempotent requires Kafka version >= 0.11.0.0 + config.Idempotent = false + cfg, err := newSaramaConfigImplBak(ctx, config) + c.Assert(err, check.IsNil) + return cfg, err + } + defer func() { + kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak + }() kafkap.NewAdminClientImpl = kafka.NewMockAdminClient defer func() { kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient @@ -146,6 +157,8 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) { newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) { + // Idempotent requires Kafka version >= 0.11.0.0 + config.Idempotent = false cfg, err := newSaramaConfigImplBak(ctx, config) c.Assert(err, check.IsNil) cfg.Producer.Flush.MaxMessages = 1 diff --git a/cdc/cdc/sink/producer/kafka/config.go b/cdc/cdc/sink/producer/kafka/config.go index 61f162e8..cf8152ef 100644 --- a/cdc/cdc/sink/producer/kafka/config.go +++ b/cdc/cdc/sink/producer/kafka/config.go @@ -50,6 +50,8 @@ type Config struct { SaslScram *security.SaslScram // control whether to create topic AutoCreate bool + // Whether to enable idempotent producer + Idempotent bool } // NewConfig returns a default Kafka configuration @@ -63,6 +65,7 @@ func NewConfig() *Config { Credential: &security.Credential{}, SaslScram: &security.SaslScram{}, AutoCreate: true, + Idempotent: true, } } @@ -231,7 +234,14 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { // and https://github.com/tikv/migration/cdc/issues/3352. config.Metadata.Timeout = 1 * time.Minute - config.Producer.Idempotent = true + // See: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#enable-idempotence + config.Producer.Idempotent = c.Idempotent + if c.Idempotent { + config.Net.MaxOpenRequests = 1 + } else { + log.Warn("The idempotent producer is disabled, which may cause data reordering") + } + config.Producer.Partitioner = sarama.NewManualPartitioner config.Producer.MaxMessageBytes = c.MaxMessageBytes config.Producer.Return.Successes = true @@ -261,8 +271,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) { config.Admin.Retry.Backoff = 500 * time.Millisecond config.Admin.Timeout = 1 * time.Minute - config.Net.MaxOpenRequests = 1 - if c.Credential != nil && len(c.Credential.CAPath) != 0 { config.Net.TLS.Enable = true config.Net.TLS.Config, err = c.Credential.ToTLSConfig()