From 312b193d51cd37eb82664195c58316cb3752e7fa Mon Sep 17 00:00:00 2001 From: chyezh Date: Fri, 20 Dec 2024 09:50:34 +0800 Subject: [PATCH] enhance: enable kafka for streaming Signed-off-by: chyezh --- internal/distributed/streaming/streaming.go | 2 + internal/distributed/streaming/wal.go | 5 +++ .../mock_streaming/mock_WALAccesser.go | 45 +++++++++++++++++++ .../querynodev2/delegator/delegator_data.go | 2 +- internal/streamingnode/server/server.go | 1 + internal/util/pipeline/stream_pipeline.go | 2 +- .../msgstream/mqwrapper/kafka/kafka_client.go | 6 +-- .../mqwrapper/kafka/kafka_client_test.go | 2 +- .../mqwrapper/kafka/kafka_consumer.go | 6 +-- .../mqwrapper/kafka/kafka_consumer_test.go | 14 +++--- pkg/mq/msgstream/mqwrapper/kafka/kafka_id.go | 28 +++++++----- .../mqwrapper/kafka/kafka_id_test.go | 18 ++++---- .../mqwrapper/kafka/kafka_message.go | 2 +- .../mqwrapper/kafka/kafka_message_test.go | 2 +- .../mqwrapper/kafka/kafka_producer.go | 2 +- .../util/message/adaptor/message_id.go | 13 ++++++ .../util/message/adaptor/message_id_test.go | 4 ++ pkg/streaming/walimpls/impls/kafka/builder.go | 6 +-- .../walimpls/impls/kafka/kafka_test.go | 5 ++- .../walimpls/impls/kafka/message_id.go | 7 ++- .../walimpls/impls/kafka/message_id_test.go | 3 +- pkg/streaming/walimpls/impls/kafka/opener.go | 45 +++++++++++++++++++ pkg/streaming/walimpls/impls/kafka/scanner.go | 1 + pkg/streaming/walimpls/impls/kafka/wal.go | 1 + .../walimpls/impls/pulsar/message_id_test.go | 3 +- .../walimpls/impls/rmq/message_id_test.go | 3 +- 26 files changed, 179 insertions(+), 49 deletions(-) diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index 8ef6df73619d0..810b15065d6a3 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -78,6 +78,8 @@ type Scanner interface { // WALAccesser is the interfaces to interact with the milvus write ahead log. type WALAccesser interface { + WALName() string + // Txn returns a transaction for writing records to the log. // Once the txn is returned, the Commit or Rollback operation must be called once, otherwise resource leak on wal. Txn(ctx context.Context, opts TxnOption) (Txn, error) diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index 8caba0186bc05..b4d7fb5f90080 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -12,6 +12,7 @@ import ( "github.com/milvus-io/milvus/internal/streamingcoord/client" "github.com/milvus-io/milvus/internal/streamingnode/client/handler" "github.com/milvus-io/milvus/internal/util/streamingutil/status" + "github.com/milvus-io/milvus/internal/util/streamingutil/util" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/conc" @@ -54,6 +55,10 @@ type walAccesserImpl struct { dispatchExecutionPool *conc.Pool[struct{}] } +func (w *walAccesserImpl) WALName() string { + return util.MustSelectWALName() +} + // RawAppend writes a record to the log. 
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) { assertValidMessage(msg) diff --git a/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go index 6b391ff89a062..e077e04030f15 100644 --- a/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go +++ b/internal/mocks/distributed/mock_streaming/mock_WALAccesser.go @@ -331,6 +331,51 @@ func (_c *MockWALAccesser_Txn_Call) RunAndReturn(run func(context.Context, strea return _c } +// WALName provides a mock function with given fields: +func (_m *MockWALAccesser) WALName() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for WALName") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// MockWALAccesser_WALName_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'WALName' +type MockWALAccesser_WALName_Call struct { + *mock.Call +} + +// WALName is a helper method to define mock.On call +func (_e *MockWALAccesser_Expecter) WALName() *MockWALAccesser_WALName_Call { + return &MockWALAccesser_WALName_Call{Call: _e.mock.On("WALName")} +} + +func (_c *MockWALAccesser_WALName_Call) Run(run func()) *MockWALAccesser_WALName_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockWALAccesser_WALName_Call) Return(_a0 string) *MockWALAccesser_WALName_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockWALAccesser_WALName_Call) RunAndReturn(run func() string) *MockWALAccesser_WALName_Call { + _c.Call.Return(run) + return _c +} + // NewMockWALAccesser creates a new instance of MockWALAccesser. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
func NewMockWALAccesser(t interface { diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index a6ffc4f1b0d86..b21d7784ac616 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -735,7 +735,7 @@ func (sd *shardDelegator) createDeleteStreamFromStreamingService(ctx context.Con s := streaming.WAL().Read(ctx, streaming.ReadOption{ VChannel: position.GetChannelName(), DeliverPolicy: options.DeliverPolicyStartFrom( - adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID()), + adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID()), ), DeliverFilters: []options.DeliverFilter{ // only deliver message which timestamp >= position.Timestamp diff --git a/internal/streamingnode/server/server.go b/internal/streamingnode/server/server.go index 52413071dab22..8956d8d78eaac 100644 --- a/internal/streamingnode/server/server.go +++ b/internal/streamingnode/server/server.go @@ -11,6 +11,7 @@ import ( "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" + _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/kafka" _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar" _ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq" ) diff --git a/internal/util/pipeline/stream_pipeline.go b/internal/util/pipeline/stream_pipeline.go index 204b6cfb0019c..888f1a9da009a 100644 --- a/internal/util/pipeline/stream_pipeline.go +++ b/internal/util/pipeline/stream_pipeline.go @@ -95,7 +95,7 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M } if streamingutil.IsStreamingServiceEnabled() { - startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID()) + startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID()) log.Info( "stream pipeline seeks from position with scanner", zap.String("channel", position.GetChannelName()), diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go index f2d0bddb10df9..3755060b7ec5a 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go @@ -241,7 +241,7 @@ func (kc *kafkaClient) Subscribe(ctx context.Context, options mqwrapper.Consumer } func (kc *kafkaClient) EarliestMessageID() common.MessageID { - return &kafkaID{messageID: int64(kafka.OffsetBeginning)} + return &KafkaID{MessageID: int64(kafka.OffsetBeginning)} } func (kc *kafkaClient) StringToMsgID(id string) (common.MessageID, error) { @@ -250,7 +250,7 @@ func (kc *kafkaClient) StringToMsgID(id string) (common.MessageID, error) { return nil, err } - return &kafkaID{messageID: offset}, nil + return &KafkaID{MessageID: offset}, nil } func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafka.ConfigMap) { @@ -265,7 +265,7 @@ func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafk func (kc *kafkaClient) BytesToMsgID(id []byte) (common.MessageID, error) { offset := DeserializeKafkaID(id) - return &kafkaID{messageID: offset}, nil + return &KafkaID{MessageID: offset}, nil } func (kc *kafkaClient) Close() { diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go index 565fc67cad61f..27417ae56a2f2 100644 --- 
a/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go @@ -196,7 +196,7 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) { Consume1(ctx1, t, kc, topic, subName, c, &total1) lastMsgID := <-c - log.Info("lastMsgID", zap.Any("lastMsgID", lastMsgID.(*kafkaID).messageID)) + log.Info("lastMsgID", zap.Any("lastMsgID", lastMsgID.(*KafkaID).MessageID)) ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second) Consume2(ctx2, t, kc, topic, subName, lastMsgID, &total2) diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go index bf87b260a7bee..8809d6b28f490 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go @@ -74,7 +74,7 @@ func newKafkaConsumer(config *kafka.ConfigMap, bufSize int64, topic string, grou return nil, err } } else { - offset = kafka.Offset(latestMsgID.(*kafkaID).messageID) + offset = kafka.Offset(latestMsgID.(*KafkaID).MessageID) kc.skipMsg = true } } @@ -161,7 +161,7 @@ func (kc *Consumer) Seek(id common.MessageID, inclusive bool) error { return errors.New("kafka consumer is already assigned, can not seek again") } - offset := kafka.Offset(id.(*kafkaID).messageID) + offset := kafka.Offset(id.(*KafkaID).MessageID) return kc.internalSeek(offset, inclusive) } @@ -219,7 +219,7 @@ func (kc *Consumer) GetLatestMsgID() (common.MessageID, error) { } log.Info("get latest msg ID ", zap.String("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high)) - return &kafkaID{messageID: high}, nil + return &KafkaID{MessageID: high}, nil } func (kc *Consumer) CheckTopicValid(topic string) error { diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go index 45bec8dad7535..c058706f918fa 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go @@ -40,14 +40,14 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) { data2 := []string{"111", "222", "333"} testKafkaConsumerProduceData(t, topic, data1, data2) - msgID := &kafkaID{messageID: 1} + msgID := &KafkaID{MessageID: 1} err = consumer.Seek(msgID, false) assert.NoError(t, err) msg := <-consumer.Chan() assert.Equal(t, 333, BytesToInt(msg.Payload())) assert.Equal(t, "333", msg.Properties()[common.TraceIDKey]) - assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID) + assert.Equal(t, int64(2), msg.ID().(*KafkaID).MessageID) assert.Equal(t, topic, msg.Topic()) assert.True(t, len(msg.Properties()) == 1) } @@ -66,14 +66,14 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) { data2 := []string{"111", "222", "333"} testKafkaConsumerProduceData(t, topic, data1, data2) - msgID := &kafkaID{messageID: 1} + msgID := &KafkaID{MessageID: 1} err = consumer.Seek(msgID, true) assert.NoError(t, err) msg := <-consumer.Chan() assert.Equal(t, 222, BytesToInt(msg.Payload())) assert.Equal(t, "222", msg.Properties()[common.TraceIDKey]) - assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID) + assert.Equal(t, int64(1), msg.ID().(*KafkaID).MessageID) assert.Equal(t, topic, msg.Topic()) assert.True(t, len(msg.Properties()) == 1) } @@ -88,7 +88,7 @@ func TestKafkaConsumer_GetSeek(t *testing.T) { assert.NoError(t, err) defer consumer.Close() - msgID := &kafkaID{messageID: 0} + msgID := &KafkaID{MessageID: 0} err = consumer.Seek(msgID, false) assert.NoError(t, err) @@ -163,7 +163,7 @@ func 
TestKafkaConsumer_GetLatestMsgID(t *testing.T) { defer consumer.Close() latestMsgID, err := consumer.GetLatestMsgID() - assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID) + assert.Equal(t, int64(0), latestMsgID.(*KafkaID).MessageID) assert.NoError(t, err) data1 := []int{111, 222, 333} @@ -171,7 +171,7 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) { testKafkaConsumerProduceData(t, topic, data1, data2) latestMsgID, err = consumer.GetLatestMsgID() - assert.Equal(t, int64(2), latestMsgID.(*kafkaID).messageID) + assert.Equal(t, int64(2), latestMsgID.(*KafkaID).MessageID) assert.NoError(t, err) } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_id.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_id.go index 2509065c1d4b1..8f2d1926739b6 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_id.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_id.go @@ -5,26 +5,32 @@ import ( mqcommon "github.com/milvus-io/milvus/pkg/mq/common" ) -type kafkaID struct { - messageID int64 +func NewKafkaID(messageID int64) mqcommon.MessageID { + return &KafkaID{ + MessageID: messageID, + } } -var _ mqcommon.MessageID = &kafkaID{} +type KafkaID struct { + MessageID int64 +} + +var _ mqcommon.MessageID = &KafkaID{} -func (kid *kafkaID) Serialize() []byte { - return SerializeKafkaID(kid.messageID) +func (kid *KafkaID) Serialize() []byte { + return SerializeKafkaID(kid.MessageID) } -func (kid *kafkaID) AtEarliestPosition() bool { - return kid.messageID <= 0 +func (kid *KafkaID) AtEarliestPosition() bool { + return kid.MessageID <= 0 } -func (kid *kafkaID) Equal(msgID []byte) (bool, error) { - return kid.messageID == DeserializeKafkaID(msgID), nil +func (kid *KafkaID) Equal(msgID []byte) (bool, error) { + return kid.MessageID == DeserializeKafkaID(msgID), nil } -func (kid *kafkaID) LessOrEqualThan(msgID []byte) (bool, error) { - return kid.messageID <= DeserializeKafkaID(msgID), nil +func (kid *KafkaID) LessOrEqualThan(msgID []byte) (bool, error) { + return kid.MessageID <= DeserializeKafkaID(msgID), nil } func SerializeKafkaID(messageID int64) []byte { diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_id_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_id_test.go index 29b501b66aa2b..802fc7efa3396 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_id_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_id_test.go @@ -7,24 +7,24 @@ import ( ) func TestKafkaID_Serialize(t *testing.T) { - rid := &kafkaID{messageID: 8} + rid := &KafkaID{MessageID: 8} bin := rid.Serialize() assert.NotNil(t, bin) assert.NotZero(t, len(bin)) } func TestKafkaID_AtEarliestPosition(t *testing.T) { - rid := &kafkaID{messageID: 8} + rid := &KafkaID{MessageID: 8} assert.False(t, rid.AtEarliestPosition()) - rid = &kafkaID{messageID: 0} + rid = &KafkaID{MessageID: 0} assert.True(t, rid.AtEarliestPosition()) } func TestKafkaID_LessOrEqualThan(t *testing.T) { { - rid1 := &kafkaID{messageID: 8} - rid2 := &kafkaID{messageID: 0} + rid1 := &KafkaID{MessageID: 8} + rid2 := &KafkaID{MessageID: 0} ret, err := rid1.LessOrEqualThan(rid2.Serialize()) assert.NoError(t, err) assert.False(t, ret) @@ -35,8 +35,8 @@ func TestKafkaID_LessOrEqualThan(t *testing.T) { } { - rid1 := &kafkaID{messageID: 0} - rid2 := &kafkaID{messageID: 0} + rid1 := &KafkaID{MessageID: 0} + rid2 := &KafkaID{MessageID: 0} ret, err := rid1.LessOrEqualThan(rid2.Serialize()) assert.NoError(t, err) assert.True(t, ret) @@ -44,8 +44,8 @@ func TestKafkaID_LessOrEqualThan(t *testing.T) { } func TestKafkaID_Equal(t *testing.T) { - rid1 := &kafkaID{messageID: 0} - rid2 := 
&kafkaID{messageID: 1} + rid1 := &KafkaID{MessageID: 0} + rid2 := &KafkaID{MessageID: 1} { ret, err := rid1.Equal(rid1.Serialize()) diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_message.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_message.go index cc33c8db4090f..93f611c9b66fd 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_message.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_message.go @@ -27,6 +27,6 @@ func (km *kafkaMessage) Payload() []byte { } func (km *kafkaMessage) ID() common.MessageID { - kid := &kafkaID{messageID: int64(km.msg.TopicPartition.Offset)} + kid := &KafkaID{MessageID: int64(km.msg.TopicPartition.Offset)} return kid } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_message_test.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_message_test.go index 3fd936325136c..379f2c6acf687 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_message_test.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_message_test.go @@ -13,7 +13,7 @@ func TestKafkaMessage_All(t *testing.T) { km := &kafkaMessage{msg: msg} properties := make(map[string]string) assert.Equal(t, topic, km.Topic()) - assert.Equal(t, int64(0), km.ID().(*kafkaID).messageID) + assert.Equal(t, int64(0), km.ID().(*KafkaID).MessageID) assert.Nil(t, km.Payload()) assert.Equal(t, properties, km.Properties()) } diff --git a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go index e525244d5eae1..edd3016049797 100644 --- a/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go +++ b/pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go @@ -75,7 +75,7 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds())) metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc() - return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil + return &KafkaID{MessageID: int64(m.TopicPartition.Offset)}, nil } func (kp *kafkaProducer) Close() { diff --git a/pkg/streaming/util/message/adaptor/message_id.go b/pkg/streaming/util/message/adaptor/message_id.go index b9bc6dc333375..1cd76ba1a8130 100644 --- a/pkg/streaming/util/message/adaptor/message_id.go +++ b/pkg/streaming/util/message/adaptor/message_id.go @@ -4,11 +4,14 @@ import ( "fmt" "github.com/apache/pulsar-client-go/pulsar" + rawKafka "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/milvus-io/milvus/pkg/mq/common" "github.com/milvus-io/milvus/pkg/mq/mqimpl/rocksmq/server" + mqkafka "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka" mqpulsar "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/pulsar" "github.com/milvus-io/milvus/pkg/streaming/util/message" + msgkafka "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/kafka" msgpulsar "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar" "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq" ) @@ -20,6 +23,8 @@ func MustGetMQWrapperIDFromMessage(messageID message.MessageID) common.MessageID return mqpulsar.NewPulsarID(id.PulsarID()) } else if id, ok := messageID.(interface{ RmqID() int64 }); ok { return &server.RmqID{MessageID: id.RmqID()} + } else if id, ok := messageID.(interface{ KafkaID() rawKafka.Offset }); ok { + return mqkafka.NewKafkaID(int64(id.KafkaID())) } panic("unsupported now") } @@ -31,6 +36,8 @@ func MustGetMessageIDFromMQWrapperID(commonMessageID common.MessageID) message.M return msgpulsar.NewPulsarID(id.PulsarID()) } else if id, ok := 
commonMessageID.(*server.RmqID); ok { return rmq.NewRmqID(id.MessageID) + } else if id, ok := commonMessageID.(*mqkafka.KafkaID); ok { + return msgkafka.NewKafkaID(rawKafka.Offset(id.MessageID)) } return nil } @@ -48,6 +55,9 @@ func DeserializeToMQWrapperID(msgID []byte, walName string) (common.MessageID, e case "rocksmq": rID := server.DeserializeRmqID(msgID) return &server.RmqID{MessageID: rID}, nil + case "kafka": + kID := mqkafka.DeserializeKafkaID(msgID) + return mqkafka.NewKafkaID(kID), nil default: return nil, fmt.Errorf("unsupported mq type %s", walName) } @@ -65,6 +75,9 @@ func MustGetMessageIDFromMQWrapperIDBytes(walName string, msgIDBytes []byte) mes panic(err) } commonMsgID = mqpulsar.NewPulsarID(msgID) + case "kafka": + id := mqkafka.DeserializeKafkaID(msgIDBytes) + commonMsgID = mqkafka.NewKafkaID(id) default: panic("unsupported now") } diff --git a/pkg/streaming/util/message/adaptor/message_id_test.go b/pkg/streaming/util/message/adaptor/message_id_test.go index 6b0944e8cec28..81da9f5e87a1e 100644 --- a/pkg/streaming/util/message/adaptor/message_id_test.go +++ b/pkg/streaming/util/message/adaptor/message_id_test.go @@ -6,6 +6,7 @@ import ( "github.com/apache/pulsar-client-go/pulsar" "github.com/stretchr/testify/assert" + msgkafka "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/kafka" msgpulsar "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar" "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq" ) @@ -17,4 +18,7 @@ func TestIDConvension(t *testing.T) { msgID := pulsar.EarliestMessageID() id = MustGetMessageIDFromMQWrapperID(MustGetMQWrapperIDFromMessage(msgpulsar.NewPulsarID(msgID))) assert.True(t, id.EQ(msgpulsar.NewPulsarID(msgID))) + + kafkaID := MustGetMessageIDFromMQWrapperID(MustGetMQWrapperIDFromMessage(msgkafka.NewKafkaID(1))) + assert.True(t, kafkaID.EQ(msgkafka.NewKafkaID(1))) } diff --git a/pkg/streaming/walimpls/impls/kafka/builder.go b/pkg/streaming/walimpls/impls/kafka/builder.go index 256c52686907e..3c90e2865e646 100644 --- a/pkg/streaming/walimpls/impls/kafka/builder.go +++ b/pkg/streaming/walimpls/impls/kafka/builder.go @@ -2,6 +2,7 @@ package kafka import ( "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" @@ -35,10 +36,7 @@ func (b *builderImpl) Build() (walimpls.OpenerImpls, error) { if err != nil { return nil, err } - return &openerImpl{ - p: p, - consumerConfig: consumerConfig, - }, nil + return newOpenerImpl(p, consumerConfig), nil } // getProducerAndConsumerConfig returns the producer and consumer config. 
diff --git a/pkg/streaming/walimpls/impls/kafka/kafka_test.go b/pkg/streaming/walimpls/impls/kafka/kafka_test.go index ce2c32d18d13c..dcd407d3f906d 100644 --- a/pkg/streaming/walimpls/impls/kafka/kafka_test.go +++ b/pkg/streaming/walimpls/impls/kafka/kafka_test.go @@ -3,11 +3,12 @@ package kafka import ( "testing" + "github.com/zeebo/assert" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/registry" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/zeebo/assert" ) func TestMain(m *testing.M) { @@ -27,7 +28,7 @@ func TestRegistry(t *testing.T) { } func TestKafka(t *testing.T) { - walimpls.NewWALImplsTestFramework(t, 1000, &builderImpl{}).Run() + walimpls.NewWALImplsTestFramework(t, 100, &builderImpl{}).Run() } func TestGetBasicConfig(t *testing.T) { diff --git a/pkg/streaming/walimpls/impls/kafka/message_id.go b/pkg/streaming/walimpls/impls/kafka/message_id.go index 6ce14f19522ac..f99ea4ba13c34 100644 --- a/pkg/streaming/walimpls/impls/kafka/message_id.go +++ b/pkg/streaming/walimpls/impls/kafka/message_id.go @@ -3,9 +3,10 @@ package kafka import ( "strconv" + "github.com/cockroachdb/errors" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/util/message" - "github.com/pkg/errors" ) func UnmarshalMessageID(data string) (message.MessageID, error) { @@ -24,6 +25,10 @@ func unmarshalMessageID(data string) (kafkaID, error) { return kafkaID(v), nil } +func NewKafkaID(offset kafka.Offset) message.MessageID { + return kafkaID(offset) +} + type kafkaID kafka.Offset // RmqID returns the message id for conversion diff --git a/pkg/streaming/walimpls/impls/kafka/message_id_test.go b/pkg/streaming/walimpls/impls/kafka/message_id_test.go index ae1184b254d14..507a05e1b7daa 100644 --- a/pkg/streaming/walimpls/impls/kafka/message_id_test.go +++ b/pkg/streaming/walimpls/impls/kafka/message_id_test.go @@ -4,8 +4,9 @@ import ( "testing" "github.com/confluentinc/confluent-kafka-go/kafka" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestMessageID(t *testing.T) { diff --git a/pkg/streaming/walimpls/impls/kafka/opener.go b/pkg/streaming/walimpls/impls/kafka/opener.go index 6bb9a2954761a..d07e2e696c7ca 100644 --- a/pkg/streaming/walimpls/impls/kafka/opener.go +++ b/pkg/streaming/walimpls/impls/kafka/opener.go @@ -2,15 +2,33 @@ package kafka import ( "context" + "fmt" "github.com/confluentinc/confluent-kafka-go/kafka" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" + "github.com/milvus-io/milvus/pkg/util/syncutil" ) var _ walimpls.OpenerImpls = (*openerImpl)(nil) +// newOpenerImpl creates a new openerImpl instance. +func newOpenerImpl(p *kafka.Producer, consumerConfig kafka.ConfigMap) *openerImpl { + o := &openerImpl{ + n: syncutil.NewAsyncTaskNotifier[struct{}](), + p: p, + consumerConfig: consumerConfig, + } + go o.execute() + return o +} + +// openerImpl is the opener implementation for kafka wal. 
type openerImpl struct { + n *syncutil.AsyncTaskNotifier[struct{}] p *kafka.Producer consumerConfig kafka.ConfigMap } @@ -23,6 +41,33 @@ func (o *openerImpl) Open(ctx context.Context, opt *walimpls.OpenOption) (walimp }, nil } +func (o *openerImpl) execute() { + defer o.n.Finish(struct{}{}) + + for { + select { + case <-o.n.Context().Done(): + return + case ev, ok := <-o.p.Events(): + if !ok { + panic("kafka producer events channel should never be closed before the execute observer exit") + } + switch ev := ev.(type) { + case kafka.Error: + log.Error("kafka producer error", zap.Error(ev)) + if ev.IsFatal() { + panic(fmt.Sprintf("kafka producer error is fatal, %s", ev.Error())) + } + default: + // ignore other events + log.Info("kafka producer incoming non-message, non-error event", zap.String("event", ev.String())) + } + } + } +} + func (o *openerImpl) Close() { + o.n.Cancel() + o.n.BlockUntilFinish() o.p.Close() } diff --git a/pkg/streaming/walimpls/impls/kafka/scanner.go b/pkg/streaming/walimpls/impls/kafka/scanner.go index a55981d9dc7f8..934b00da84b94 100644 --- a/pkg/streaming/walimpls/impls/kafka/scanner.go +++ b/pkg/streaming/walimpls/impls/kafka/scanner.go @@ -4,6 +4,7 @@ import ( "time" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" "github.com/milvus-io/milvus/pkg/streaming/walimpls/helper" diff --git a/pkg/streaming/walimpls/impls/kafka/wal.go b/pkg/streaming/walimpls/impls/kafka/wal.go index af83a9e710036..6994e0e818afd 100644 --- a/pkg/streaming/walimpls/impls/kafka/wal.go +++ b/pkg/streaming/walimpls/impls/kafka/wal.go @@ -5,6 +5,7 @@ import ( "github.com/cockroachdb/errors" "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/walimpls" diff --git a/pkg/streaming/walimpls/impls/pulsar/message_id_test.go b/pkg/streaming/walimpls/impls/pulsar/message_id_test.go index 4475f5d9e22cc..e6d31f7716ce3 100644 --- a/pkg/streaming/walimpls/impls/pulsar/message_id_test.go +++ b/pkg/streaming/walimpls/impls/pulsar/message_id_test.go @@ -4,9 +4,10 @@ import ( "testing" "github.com/apache/pulsar-client-go/pulsar" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/proto" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestMessageID(t *testing.T) { diff --git a/pkg/streaming/walimpls/impls/rmq/message_id_test.go b/pkg/streaming/walimpls/impls/rmq/message_id_test.go index 54cbfbd5ceb04..e37bfdf056328 100644 --- a/pkg/streaming/walimpls/impls/rmq/message_id_test.go +++ b/pkg/streaming/walimpls/impls/rmq/message_id_test.go @@ -3,8 +3,9 @@ package rmq import ( "testing" - "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/streaming/util/message" ) func TestMessageID(t *testing.T) {
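-- 
Reviewer note, not part of the change: a minimal sketch of how the newly exported
KafkaID round-trips between the mq wrapper layer and the streaming message ID,
matching the "kafka" branch added to MustGetMessageIDFromMQWrapperIDBytes above.
The offset value 42 and the main-package wrapper are illustrative only.

	package main

	import (
		"fmt"

		mqkafka "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka"
		"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
	)

	func main() {
		// Build a kafka message ID at the mq wrapper layer (offset is arbitrary).
		wrapperID := mqkafka.NewKafkaID(42)

		// Convert it to the streaming-layer message ID, the same conversion the
		// delegator now selects via streaming.WAL().WALName() instead of the
		// hard-coded "pulsar".
		msgID := adaptor.MustGetMessageIDFromMQWrapperID(wrapperID)

		// The serialized bytes stored in msgpb positions decode back to an equal ID.
		restored := adaptor.MustGetMessageIDFromMQWrapperIDBytes("kafka", wrapperID.Serialize())
		fmt.Println(msgID.EQ(restored)) // expected: true
	}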