Skip to content

Commit

Permalink
fix: some bug
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 24, 2024
1 parent a204d04 commit 8eb9821
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (s *scannerAdaptorImpl) executeConsume() {
Message: s.pendingQueue.Next(),
})
if handleResult.Error != nil {
s.Finish(err)
s.Finish(handleResult.Error)
return
}
if handleResult.MessageHandled {
Expand Down
15 changes: 9 additions & 6 deletions pkg/streaming/walimpls/impls/kafka/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (b *builderImpl) Name() string {

// Build build a wal instance.
func (b *builderImpl) Build() (walimpls.OpenerImpls, error) {
producerConfig, consumerConfig := b.getProducerAndConsumerConfig()
producerConfig, consumerConfig := b.getProducerConfig(), b.getConsumerConfig()

p, err := kafka.NewProducer(&producerConfig)
if err != nil {
Expand All @@ -40,10 +40,9 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) {
}

// getProducerAndConsumerConfig returns the producer and consumer config.
func (b *builderImpl) getProducerAndConsumerConfig() (producerConfig kafka.ConfigMap, consumerConfig kafka.ConfigMap) {
func (b *builderImpl) getProducerConfig() kafka.ConfigMap {
config := &paramtable.Get().KafkaCfg
producerConfig = getBasicConfig(config)
consumerConfig = cloneKafkaConfig(producerConfig)
producerConfig := getBasicConfig(config)

producerConfig.SetKey("message.max.bytes", 10485760)
producerConfig.SetKey("compression.codec", "zstd")
Expand All @@ -52,13 +51,17 @@ func (b *builderImpl) getProducerAndConsumerConfig() (producerConfig kafka.Confi
for k, v := range config.ProducerExtraConfig.GetValue() {
producerConfig.SetKey(k, v)
}
return producerConfig
}

func (b *builderImpl) getConsumerConfig() kafka.ConfigMap {
config := &paramtable.Get().KafkaCfg
consumerConfig := getBasicConfig(config)
consumerConfig.SetKey("allow.auto.create.topics", true)
for k, v := range config.ConsumerExtraConfig.GetValue() {
consumerConfig.SetKey(k, v)
}

return producerConfig, consumerConfig
return consumerConfig
}

// getBasicConfig returns the basic kafka config.
Expand Down
2 changes: 1 addition & 1 deletion pkg/streaming/walimpls/impls/kafka/opener.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (o *openerImpl) execute() {
}
default:
// ignore other events
log.Info("kafka producer incoming non-message, non-error event", zap.String("event", ev.String()))
log.Debug("kafka producer incoming non-message, non-error event", zap.String("event", ev.String()))
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/streaming/walimpls/impls/kafka/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,6 @@ func (w *walImpl) Read(ctx context.Context, opt walimpls.ReadOption) (s walimpls
// and there's no commit opeartions.
consumerConfig := cloneKafkaConfig(w.consumerConfig)
consumerConfig.SetKey("group.id", opt.Name)
switch opt.DeliverPolicy.GetPolicy().(type) {
}
c, err := kafka.NewConsumer(&consumerConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to create kafka consumer")
Expand Down

0 comments on commit 8eb9821

Please sign in to comment.