Skip to content

Commit

Permalink
fix: some bug
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 20, 2024
1 parent d58a30d commit 97b3bd4
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 10 deletions.
6 changes: 2 additions & 4 deletions pkg/streaming/walimpls/impls/kafka/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/registry"
Expand Down Expand Up @@ -35,10 +36,7 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) {
if err != nil {
return nil, err
}
return &openerImpl{
p: p,
consumerConfig: consumerConfig,
}, nil
return newOpenerImpl(p, consumerConfig), nil
}

// getProducerAndConsumerConfig returns the producer and consumer config.
Expand Down
5 changes: 3 additions & 2 deletions pkg/streaming/walimpls/impls/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package kafka
import (
"testing"

"github.com/zeebo/assert"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/registry"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/zeebo/assert"
)

func TestMain(m *testing.M) {
Expand All @@ -27,7 +28,7 @@ func TestRegistry(t *testing.T) {
}

func TestKafka(t *testing.T) {
walimpls.NewWALImplsTestFramework(t, 1000, &builderImpl{}).Run()
walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run()
}

func TestGetBasicConfig(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/streaming/walimpls/impls/kafka/message_id.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package kafka
import (
"strconv"

"github.com/cockroachdb/errors"
"github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/pkg/errors"
)

func UnmarshalMessageID(data string) (message.MessageID, error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/streaming/walimpls/impls/kafka/message_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ import (
"testing"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

func TestMessageID(t *testing.T) {
Expand Down
45 changes: 45 additions & 0 deletions pkg/streaming/walimpls/impls/kafka/opener.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,33 @@ package kafka

import (
"context"
"fmt"

"github.com/confluentinc/confluent-kafka-go/kafka"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
"github.com/milvus-io/milvus/pkg/util/syncutil"
)

var _ walimpls.OpenerImpls = (*openerImpl)(nil)

// newOpenerImpl creates a new openerImpl instance.
func newOpenerImpl(p *kafka.Producer, consumerConfig kafka.ConfigMap) *openerImpl {
o := &openerImpl{
n: syncutil.NewAsyncTaskNotifier[struct{}](),
p: p,
consumerConfig: consumerConfig,
}
go o.execute()
return o
}

// openerImpl is the opener implementation for kafka wal.
type openerImpl struct {
n *syncutil.AsyncTaskNotifier[struct{}]
p *kafka.Producer
consumerConfig kafka.ConfigMap
}
Expand All @@ -23,6 +41,33 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp
}, nil
}

func (o *openerImpl) execute() {
defer o.n.Finish(struct{}{})

for {
select {
case <-o.n.Context().Done():
return
case ev, ok := <-o.p.Events():
if !ok {
panic("kafka producer events channel should never be closed before the execute observer exit")
}
switch ev := ev.(type) {
case kafka.Error:
log.Error("kafka producer error", zap.Error(ev))
if ev.IsFatal() {
panic(fmt.Sprintf("kafka producer error is fatal, %s", ev.Error()))
}
default:
// ignore other events
log.Info("kafka producer incoming non-message, non-error event", zap.String("event", ev.String()))
}
}
}
}

func (o *openerImpl) Close() {
o.n.Cancel()
o.n.BlockUntilFinish()
o.p.Close()
}
1 change: 1 addition & 0 deletions pkg/streaming/walimpls/impls/kafka/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
"github.com/milvus-io/milvus/pkg/streaming/walimpls/helper"
Expand Down
1 change: 1 addition & 0 deletions pkg/streaming/walimpls/impls/kafka/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/confluentinc/confluent-kafka-go/kafka"

"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/walimpls"
Expand Down
3 changes: 2 additions & 1 deletion pkg/streaming/walimpls/impls/pulsar/message_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import (
"testing"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

func TestMessageID(t *testing.T) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/streaming/walimpls/impls/rmq/message_id_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package rmq
import (
"testing"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/stretchr/testify/assert"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
)

func TestMessageID(t *testing.T) {
Expand Down

0 comments on commit 97b3bd4

Please sign in to comment.