From 97b3bd464adeaed4e93931c041b659cd46b368c4 Mon Sep 17 00:00:00 2001 From: chyezh Date: Fri, 20 Dec 2024 09:50:34 +0800 Subject: [PATCH] fix: some bug Signed-off-by: chyezh --- pkg/streaming/walimpls/impls/kafka/builder.go | 6 +-- .../walimpls/impls/kafka/kafka_test.go | 5 ++- .../walimpls/impls/kafka/message_id.go | 3 +- .../walimpls/impls/kafka/message_id_test.go | 3 +- pkg/streaming/walimpls/impls/kafka/opener.go | 45 +++++++++++++++++++ pkg/streaming/walimpls/impls/kafka/scanner.go | 1 + pkg/streaming/walimpls/impls/kafka/wal.go | 1 + .../walimpls/impls/pulsar/message_id_test.go | 3 +- .../walimpls/impls/rmq/message_id_test.go | 3 +- 9 files changed, 60 insertions(+), 10 deletions(-) diff --git a/pkg/streaming/walimpls/impls/kafka/builder.go b/pkg/streaming/walimpls/impls/kafka/builder.go index 256c52686907e..3c90e2865e646 100644 --- a/pkg/streaming/walimpls/impls/kafka/builder.go +++ b/pkg/streaming/walimpls/impls/kafka/builder.go @@ -2,6 +2,7 @@ package kafka import ( "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" @@ -35,10 +36,7 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { if err != nil { return nil, err } - return &openerImpl{ - p: p, - consumerConfig: consumerConfig, - }, nil + return newOpenerImpl(p, consumerConfig), nil } // getProducerAndConsumerConfig returns the producer and consumer config. diff --git a/pkg/streaming/walimpls/impls/kafka/kafka_test.go b/pkg/streaming/walimpls/impls/kafka/kafka_test.go index ce2c32d18d13c..dcd407d3f906d 100644 --- a/pkg/streaming/walimpls/impls/kafka/kafka_test.go +++ b/pkg/streaming/walimpls/impls/kafka/kafka_test.go @@ -3,11 +3,12 @@ package kafka import ( "testing" + "github.com/zeebo/assert" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/zeebo/assert" ) func TestMain(m *testing.M) { @@ -27,7 +28,7 @@ func TestRegistry(t *testing.T) { } func TestKafka(t *testing.T) { - walimpls.NewWALImplsTestFramework(t, 1000, &builderImpl{}).Run() + walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() } func TestGetBasicConfig(t *testing.T) { diff --git a/pkg/streaming/walimpls/impls/kafka/message_id.go b/pkg/streaming/walimpls/impls/kafka/message_id.go index 6ce14f19522ac..16ad509860b8a 100644 --- a/pkg/streaming/walimpls/impls/kafka/message_id.go +++ b/pkg/streaming/walimpls/impls/kafka/message_id.go @@ -3,9 +3,10 @@ package kafka import ( "strconv" + "github.com/cockroachdb/errors" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/util/message" - "github.com/pkg/errors" ) func UnmarshalMessageID(data string) (message.MessageID, error) { diff --git a/pkg/streaming/walimpls/impls/kafka/message_id_test.go b/pkg/streaming/walimpls/impls/kafka/message_id_test.go index ae1184b254d14..507a05e1b7daa 100644 --- a/pkg/streaming/walimpls/impls/kafka/message_id_test.go +++ b/pkg/streaming/walimpls/impls/kafka/message_id_test.go @@ -4,8 +4,9 @@ import ( "testing" "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestMessageID(t *testing.T) { diff --git a/pkg/streaming/walimpls/impls/kafka/opener.go b/pkg/streaming/walimpls/impls/kafka/opener.go index 6bb9a2954761a..d07e2e696c7ca 100644 --- a/pkg/streaming/walimpls/impls/kafka/opener.go +++ b/pkg/streaming/walimpls/impls/kafka/opener.go @@ -2,15 +2,33 @@ package kafka import ( "context" + "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" + "github.com/milvus-io/milvus/pkg/util/syncutil" ) var _ walimpls.OpenerImpls = (*openerImpl)(nil) +// newOpenerImpl creates a new openerImpl instance. +func newOpenerImpl(p *kafka.Producer, consumerConfig kafka.ConfigMap) *openerImpl { + o := &openerImpl{ + n: syncutil.NewAsyncTaskNotifier[struct{}](), + p: p, + consumerConfig: consumerConfig, + } + go o.execute() + return o +} + +// openerImpl is the opener implementation for kafka wal. type openerImpl struct { + n *syncutil.AsyncTaskNotifier[struct{}] p *kafka.Producer consumerConfig kafka.ConfigMap } @@ -23,6 +41,33 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp }, nil } +func (o *openerImpl) execute() { + defer o.n.Finish(struct{}{}) + + for { + select { + case <-o.n.Context().Done(): + return + case ev, ok := <-o.p.Events(): + if !ok { + panic("kafka producer events channel should never be closed before the execute observer exit") + } + switch ev := ev.(type) { + case kafka.Error: + log.Error("kafka producer error", zap.Error(ev)) + if ev.IsFatal() { + panic(fmt.Sprintf("kafka producer error is fatal, %s", ev.Error())) + } + default: + // ignore other events + log.Info("kafka producer incoming non-message, non-error event", zap.String("event", ev.String())) + } + } + } +} + func (o *openerImpl) Close() { + o.n.Cancel() + o.n.BlockUntilFinish() o.p.Close() } diff --git a/pkg/streaming/walimpls/impls/kafka/scanner.go b/pkg/streaming/walimpls/impls/kafka/scanner.go index a55981d9dc7f8..934b00da84b94 100644 --- a/pkg/streaming/walimpls/impls/kafka/scanner.go +++ b/pkg/streaming/walimpls/impls/kafka/scanner.go @@ -4,6 +4,7 @@ import ( "time" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" diff --git a/pkg/streaming/walimpls/impls/kafka/wal.go b/pkg/streaming/walimpls/impls/kafka/wal.go index af83a9e710036..6994e0e818afd 100644 --- a/pkg/streaming/walimpls/impls/kafka/wal.go +++ b/pkg/streaming/walimpls/impls/kafka/wal.go @@ -5,6 +5,7 @@ import ( "github.com/cockroachdb/errors" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" diff --git a/pkg/streaming/walimpls/impls/pulsar/message_id_test.go b/pkg/streaming/walimpls/impls/pulsar/message_id_test.go index 4475f5d9e22cc..e6d31f7716ce3 100644 --- a/pkg/streaming/walimpls/impls/pulsar/message_id_test.go +++ b/pkg/streaming/walimpls/impls/pulsar/message_id_test.go @@ -4,9 +4,10 @@ import ( "testing" "github.com/apache/pulsar-client-go/pulsar" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestMessageID(t *testing.T) { diff --git a/pkg/streaming/walimpls/impls/rmq/message_id_test.go b/pkg/streaming/walimpls/impls/rmq/message_id_test.go index 54cbfbd5ceb04..e37bfdf056328 100644 --- a/pkg/streaming/walimpls/impls/rmq/message_id_test.go +++ b/pkg/streaming/walimpls/impls/rmq/message_id_test.go @@ -3,8 +3,9 @@ package rmq import ( "testing" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestMessageID(t *testing.T) {