Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu committed Mar 3, 2024
1 parent 1a15bc5 commit 7450b16
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
13 changes: 13 additions & 0 deletions cdc/cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,17 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {
opts := map[string]string{}
errCh := make(chan error, 1)

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
// Idempotent requires Kafka version >= 0.11.0.0
config.Idempotent = false
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
return cfg, err
}
defer func() {
kafkap.NewSaramaConfigImpl = newSaramaConfigImplBak
}()
kafkap.NewAdminClientImpl = kafka.NewMockAdminClient
defer func() {
kafkap.NewAdminClientImpl = kafka.NewSaramaAdminClient
Expand Down Expand Up @@ -146,6 +157,8 @@ func (s mqSinkSuite) TestFlushChangedEvents(c *check.C) {

newSaramaConfigImplBak := kafkap.NewSaramaConfigImpl
kafkap.NewSaramaConfigImpl = func(ctx context.Context, config *kafkap.Config) (*sarama.Config, error) {
// Idempotent requires Kafka version >= 0.11.0.0
config.Idempotent = false
cfg, err := newSaramaConfigImplBak(ctx, config)
c.Assert(err, check.IsNil)
cfg.Producer.Flush.MaxMessages = 1
Expand Down
14 changes: 11 additions & 3 deletions cdc/cdc/sink/producer/kafka/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type Config struct {
SaslScram *security.SaslScram
// control whether to create topic
AutoCreate bool
// Whether to enable idempotent producer
Idempotent bool
}

// NewConfig returns a default Kafka configuration
Expand All @@ -63,6 +65,7 @@ func NewConfig() *Config {
Credential: &security.Credential{},
SaslScram: &security.SaslScram{},
AutoCreate: true,
Idempotent: true,
}
}

Expand Down Expand Up @@ -231,7 +234,14 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
// and https://github.com/tikv/migration/cdc/issues/3352.
config.Metadata.Timeout = 1 * time.Minute

config.Producer.Idempotent = true
// See: https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#enable-idempotence
config.Producer.Idempotent = c.Idempotent
if c.Idempotent {
config.Net.MaxOpenRequests = 1
} else {
log.Warn("The idempotent producer is disabled, which may cause data reordering")
}

config.Producer.Partitioner = sarama.NewManualPartitioner
config.Producer.MaxMessageBytes = c.MaxMessageBytes
config.Producer.Return.Successes = true
Expand Down Expand Up @@ -261,8 +271,6 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config.Admin.Retry.Backoff = 500 * time.Millisecond
config.Admin.Timeout = 1 * time.Minute

config.Net.MaxOpenRequests = 1

if c.Credential != nil && len(c.Credential.CAPath) != 0 {
config.Net.TLS.Enable = true
config.Net.TLS.Config, err = c.Credential.ToTLSConfig()
Expand Down

0 comments on commit 7450b16

Please sign in to comment.