diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index 8f17d68773d74..6e293353c40e4 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -114,7 +114,7 @@ func (s *scannerAdaptorImpl) executeConsume() { Message: s.pendingQueue.Next(), }) if handleResult.Error != nil { - s.Finish(err) + s.Finish(handleResult.Error) return } if handleResult.MessageHandled { diff --git a/pkg/streaming/walimpls/impls/kafka/builder.go b/pkg/streaming/walimpls/impls/kafka/builder.go index 3c90e2865e646..5c2cba15cd476 100644 --- a/pkg/streaming/walimpls/impls/kafka/builder.go +++ b/pkg/streaming/walimpls/impls/kafka/builder.go @@ -30,7 +30,7 @@ func (b *builderImpl) Name() string { // Build build a wal instance. func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { - producerConfig, consumerConfig := b.getProducerAndConsumerConfig() + producerConfig, consumerConfig := b.getProducerConfig(), b.getConsumerConfig() p, err := kafka.NewProducer(&producerConfig) if err != nil { @@ -40,10 +40,9 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { } // getProducerAndConsumerConfig returns the producer and consumer config. -func (b *builderImpl) getProducerAndConsumerConfig() (producerConfig kafka.ConfigMap, consumerConfig kafka.ConfigMap) { +func (b *builderImpl) getProducerConfig() kafka.ConfigMap { config := ¶mtable.Get().KafkaCfg - producerConfig = getBasicConfig(config) - consumerConfig = cloneKafkaConfig(producerConfig) + producerConfig := getBasicConfig(config) producerConfig.SetKey("message.max.bytes", 10485760) producerConfig.SetKey("compression.codec", "zstd") @@ -52,13 +51,17 @@ func (b *builderImpl) getProducerAndConsumerConfig() (producerConfig kafka.Confi for k, v := range config.ProducerExtraConfig.GetValue() { producerConfig.SetKey(k, v) } + return producerConfig +} +func (b *builderImpl) getConsumerConfig() kafka.ConfigMap { + config := ¶mtable.Get().KafkaCfg + consumerConfig := getBasicConfig(config) consumerConfig.SetKey("allow.auto.create.topics", true) for k, v := range config.ConsumerExtraConfig.GetValue() { consumerConfig.SetKey(k, v) } - - return producerConfig, consumerConfig + return consumerConfig } // getBasicConfig returns the basic kafka config. diff --git a/pkg/streaming/walimpls/impls/kafka/opener.go b/pkg/streaming/walimpls/impls/kafka/opener.go index d07e2e696c7ca..4f2464c37008a 100644 --- a/pkg/streaming/walimpls/impls/kafka/opener.go +++ b/pkg/streaming/walimpls/impls/kafka/opener.go @@ -60,7 +60,7 @@ func (o *openerImpl) execute() { } default: // ignore other events - log.Info("kafka producer incoming non-message, non-error event", zap.String("event", ev.String())) + log.Debug("kafka producer incoming non-message, non-error event", zap.String("event", ev.String())) } } } diff --git a/pkg/streaming/walimpls/impls/kafka/wal.go b/pkg/streaming/walimpls/impls/kafka/wal.go index 6994e0e818afd..63d0bcb492044 100644 --- a/pkg/streaming/walimpls/impls/kafka/wal.go +++ b/pkg/streaming/walimpls/impls/kafka/wal.go @@ -59,8 +59,6 @@ func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls // and there's no commit opeartions. consumerConfig := cloneKafkaConfig(w.consumerConfig) consumerConfig.SetKey("group.id", opt.Name) - switch opt.DeliverPolicy.GetPolicy().(type) { - } c, err := kafka.NewConsumer(&consumerConfig) if err != nil { return nil, errors.Wrap(err, "failed to create kafka consumer")