enhance: enable kafka for streaming
Signed-off-by: chyezh <[email protected]>
chyezh committed Dec 21, 2024
1 parent 6a5ef9c commit 312b193
Showing 26 changed files with 179 additions and 49 deletions.
2 changes: 2 additions & 0 deletions internal/distributed/streaming/streaming.go
@@ -78,6 +78,8 @@ type Scanner interface {

// WALAccesser is the interface to interact with the milvus write ahead log.
type WALAccesser interface {
+	WALName() string

// Txn returns a transaction for writing records to the log.
// Once the txn is returned, Commit or Rollback must be called exactly once; otherwise the wal leaks resources.
Txn(ctx context.Context, opts TxnOption) (Txn, error)
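The new WALName() is what lets later hunks drop the hardcoded "pulsar" when decoding persisted message IDs. A minimal sketch of that call pattern, mirroring the delegator and pipeline changes below (the adaptor and options import paths are assumed from the rest of the tree, so treat this as illustrative, not authoritative):

```go
package example

import (
	"github.com/milvus-io/milvus/internal/distributed/streaming"
	"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor" // assumed path
	"github.com/milvus-io/milvus/pkg/streaming/util/options"         // assumed path
)

// startFromPosition decodes a serialized MQ message ID against whichever
// WAL implementation ("kafka", "pulsar", or "rmq") is active at runtime,
// instead of assuming pulsar.
func startFromPosition(msgID []byte) options.DeliverPolicy {
	return options.DeliverPolicyStartFrom(
		adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), msgID),
	)
}
```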
5 changes: 5 additions & 0 deletions internal/distributed/streaming/wal.go
@@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/internal/streamingcoord/client"
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/internal/util/streamingutil/util"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/conc"
@@ -54,6 +55,10 @@ type walAccesserImpl struct {
dispatchExecutionPool *conc.Pool[struct{}]
}

+func (w *walAccesserImpl) WALName() string {
+	return util.MustSelectWALName()
+}

// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertValidMessage(msg)
45 changes: 45 additions & 0 deletions internal/mocks/distributed/mock_streaming/mock_WALAccesser.go

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion internal/querynodev2/delegator/delegator_data.go
@@ -735,7 +735,7 @@ func (sd *shardDelegator) createDeleteStreamFromStreamingService(ctx context.Con
s := streaming.WAL().Read(ctx, streaming.ReadOption{
VChannel: position.GetChannelName(),
DeliverPolicy: options.DeliverPolicyStartFrom(
adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID()),
adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID()),

Check warning on line 738 in internal/querynodev2/delegator/delegator_data.go

View check run for this annotation

Codecov / codecov/patch

internal/querynodev2/delegator/delegator_data.go#L738

Added line #L738 was not covered by tests
),
DeliverFilters: []options.DeliverFilter{
// only deliver message which timestamp >= position.Timestamp
1 change: 1 addition & 0 deletions internal/streamingnode/server/server.go
@@ -11,6 +11,7 @@ import (
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/proto/streamingpb"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/kafka"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/pulsar"
_ "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/rmq"
)
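The blank import above is the whole enablement mechanism: each walimpls package registers its builder in an init(), so importing it for side effects makes that WAL selectable. A self-contained sketch of the pattern (none of these names come from the milvus tree; they are purely illustrative):

```go
package main

import "fmt"

// Builder stands in for whatever interface walimpls implementations satisfy.
type Builder interface {
	Name() string
}

// builders is a stand-in for the real registry keyed by WAL name.
var builders = map[string]Builder{}

// Register is what each implementation's init() would call.
func Register(b Builder) { builders[b.Name()] = b }

type kafkaBuilder struct{}

func (kafkaBuilder) Name() string { return "kafka" }

// In the real layout this init() lives in the imported impls/kafka package,
// which is why `_ ".../walimpls/impls/kafka"` alone is enough to enable it.
func init() { Register(kafkaBuilder{}) }

func main() {
	fmt.Println(builders["kafka"].Name()) // kafka
}
```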
2 changes: 1 addition & 1 deletion internal/util/pipeline/stream_pipeline.go
@@ -95,7 +95,7 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M
}

if streamingutil.IsStreamingServiceEnabled() {
startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes("pulsar", position.GetMsgID())
startFrom := adaptor.MustGetMessageIDFromMQWrapperIDBytes(streaming.WAL().WALName(), position.GetMsgID())
log.Info(
"stream pipeline seeks from position with scanner",
zap.String("channel", position.GetChannelName()),
6 changes: 3 additions & 3 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_client.go
@@ -241,7 +241,7 @@ func (kc *kafkaClient) Subscribe(ctx context.Context, options mqwrapper.Consumer
}

func (kc *kafkaClient) EarliestMessageID() common.MessageID {
return &kafkaID{messageID: int64(kafka.OffsetBeginning)}
return &KafkaID{MessageID: int64(kafka.OffsetBeginning)}
}

func (kc *kafkaClient) StringToMsgID(id string) (common.MessageID, error) {
@@ -250,7 +250,7 @@ func (kc *kafkaClient) StringToMsgID(id string) (common.MessageID, error) {
return nil, err
}

return &kafkaID{messageID: offset}, nil
return &KafkaID{MessageID: offset}, nil
}

func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafka.ConfigMap) {
@@ -265,7 +265,7 @@ func (kc *kafkaClient) specialExtraConfig(current *kafka.ConfigMap, special kafka.ConfigMap) {

func (kc *kafkaClient) BytesToMsgID(id []byte) (common.MessageID, error) {
offset := DeserializeKafkaID(id)
return &kafkaID{messageID: offset}, nil
return &KafkaID{MessageID: offset}, nil
}

func (kc *kafkaClient) Close() {
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_client_test.go
@@ -196,7 +196,7 @@ func TestKafkaClient_ConsumeWithAck(t *testing.T) {
Consume1(ctx1, t, kc, topic, subName, c, &total1)

lastMsgID := <-c
log.Info("lastMsgID", zap.Any("lastMsgID", lastMsgID.(*kafkaID).messageID))
log.Info("lastMsgID", zap.Any("lastMsgID", lastMsgID.(*KafkaID).MessageID))

ctx2, cancel2 := context.WithTimeout(ctx, 3*time.Second)
Consume2(ctx2, t, kc, topic, subName, lastMsgID, &total2)
6 changes: 3 additions & 3 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer.go
@@ -74,7 +74,7 @@ func newKafkaConsumer(config *kafka.ConfigMap, bufSize int64, topic string, grou
return nil, err
}
} else {
-		offset = kafka.Offset(latestMsgID.(*kafkaID).messageID)
+		offset = kafka.Offset(latestMsgID.(*KafkaID).MessageID)
kc.skipMsg = true
}
}
@@ -161,7 +161,7 @@ func (kc *Consumer) Seek(id common.MessageID, inclusive bool) error {
return errors.New("kafka consumer is already assigned, can not seek again")
}

-	offset := kafka.Offset(id.(*kafkaID).messageID)
+	offset := kafka.Offset(id.(*KafkaID).MessageID)
return kc.internalSeek(offset, inclusive)
}

@@ -219,7 +219,7 @@ func (kc *Consumer) GetLatestMsgID() (common.MessageID, error) {
}

log.Info("get latest msg ID ", zap.String("topic", kc.topic), zap.Int64("oldest offset", low), zap.Int64("latest offset", high))
return &kafkaID{messageID: high}, nil
return &KafkaID{MessageID: high}, nil
}

func (kc *Consumer) CheckTopicValid(topic string) error {
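With KafkaID now exported, callers outside this package can build a message ID and hand it to Seek directly. A short sketch, assuming a *Consumer has already been constructed the usual way:

```go
package example

import "github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka"

// seekTo positions kc at a concrete Kafka offset. inclusive=true replays
// the message at that offset; false starts with the next one.
func seekTo(kc *kafka.Consumer, offset int64) error {
	return kc.Seek(kafka.NewKafkaID(offset), true)
}
```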
14 changes: 7 additions & 7 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_consumer_test.go
@@ -40,14 +40,14 @@ func TestKafkaConsumer_SeekExclusive(t *testing.T) {
data2 := []string{"111", "222", "333"}
testKafkaConsumerProduceData(t, topic, data1, data2)

msgID := &kafkaID{messageID: 1}
msgID := &KafkaID{MessageID: 1}
err = consumer.Seek(msgID, false)
assert.NoError(t, err)

msg := <-consumer.Chan()
assert.Equal(t, 333, BytesToInt(msg.Payload()))
assert.Equal(t, "333", msg.Properties()[common.TraceIDKey])
-	assert.Equal(t, int64(2), msg.ID().(*kafkaID).messageID)
+	assert.Equal(t, int64(2), msg.ID().(*KafkaID).MessageID)
assert.Equal(t, topic, msg.Topic())
assert.True(t, len(msg.Properties()) == 1)
}
@@ -66,14 +66,14 @@ func TestKafkaConsumer_SeekInclusive(t *testing.T) {
data2 := []string{"111", "222", "333"}
testKafkaConsumerProduceData(t, topic, data1, data2)

msgID := &kafkaID{messageID: 1}
msgID := &KafkaID{MessageID: 1}
err = consumer.Seek(msgID, true)
assert.NoError(t, err)

msg := <-consumer.Chan()
assert.Equal(t, 222, BytesToInt(msg.Payload()))
assert.Equal(t, "222", msg.Properties()[common.TraceIDKey])
-	assert.Equal(t, int64(1), msg.ID().(*kafkaID).messageID)
+	assert.Equal(t, int64(1), msg.ID().(*KafkaID).MessageID)
assert.Equal(t, topic, msg.Topic())
assert.True(t, len(msg.Properties()) == 1)
}
@@ -88,7 +88,7 @@ func TestKafkaConsumer_GetSeek(t *testing.T) {
assert.NoError(t, err)
defer consumer.Close()

msgID := &kafkaID{messageID: 0}
msgID := &KafkaID{MessageID: 0}
err = consumer.Seek(msgID, false)
assert.NoError(t, err)

@@ -163,15 +163,15 @@ func TestKafkaConsumer_GetLatestMsgID(t *testing.T) {
defer consumer.Close()

latestMsgID, err := consumer.GetLatestMsgID()
-	assert.Equal(t, int64(0), latestMsgID.(*kafkaID).messageID)
+	assert.Equal(t, int64(0), latestMsgID.(*KafkaID).MessageID)
assert.NoError(t, err)

data1 := []int{111, 222, 333}
data2 := []string{"111", "222", "333"}
testKafkaConsumerProduceData(t, topic, data1, data2)

latestMsgID, err = consumer.GetLatestMsgID()
-	assert.Equal(t, int64(2), latestMsgID.(*kafkaID).messageID)
+	assert.Equal(t, int64(2), latestMsgID.(*KafkaID).MessageID)
assert.NoError(t, err)
}

28 changes: 17 additions & 11 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_id.go
@@ -5,26 +5,32 @@ import (
 	mqcommon "github.com/milvus-io/milvus/pkg/mq/common"
 )

-type kafkaID struct {
-	messageID int64
+func NewKafkaID(messageID int64) mqcommon.MessageID {
+	return &KafkaID{
+		MessageID: messageID,
+	}
 }

-var _ mqcommon.MessageID = &kafkaID{}
+type KafkaID struct {
+	MessageID int64
+}
+
+var _ mqcommon.MessageID = &KafkaID{}

-func (kid *kafkaID) Serialize() []byte {
-	return SerializeKafkaID(kid.messageID)
+func (kid *KafkaID) Serialize() []byte {
+	return SerializeKafkaID(kid.MessageID)
 }

-func (kid *kafkaID) AtEarliestPosition() bool {
-	return kid.messageID <= 0
+func (kid *KafkaID) AtEarliestPosition() bool {
+	return kid.MessageID <= 0
 }

-func (kid *kafkaID) Equal(msgID []byte) (bool, error) {
-	return kid.messageID == DeserializeKafkaID(msgID), nil
+func (kid *KafkaID) Equal(msgID []byte) (bool, error) {
+	return kid.MessageID == DeserializeKafkaID(msgID), nil
 }

-func (kid *kafkaID) LessOrEqualThan(msgID []byte) (bool, error) {
-	return kid.messageID <= DeserializeKafkaID(msgID), nil
+func (kid *KafkaID) LessOrEqualThan(msgID []byte) (bool, error) {
+	return kid.MessageID <= DeserializeKafkaID(msgID), nil
 }

func SerializeKafkaID(messageID int64) []byte {
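Since the identifier type and its constructor are now exported, the ID round trip can be exercised from outside the package; this example only uses functions visible in this file (the import path is inferred from the file path above):

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka"
)

func main() {
	// Build an ID with the new constructor, serialize it to the form
	// stored in message positions, then recover the raw int64 offset.
	id := kafka.NewKafkaID(42)
	raw := id.Serialize()
	fmt.Println(kafka.DeserializeKafkaID(raw)) // 42
}
```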
18 changes: 9 additions & 9 deletions pkg/mq/msgstream/mqwrapper/kafka/kafka_id_test.go
@@ -7,24 +7,24 @@ import (
)

func TestKafkaID_Serialize(t *testing.T) {
rid := &kafkaID{messageID: 8}
rid := &KafkaID{MessageID: 8}
bin := rid.Serialize()
assert.NotNil(t, bin)
assert.NotZero(t, len(bin))
}

func TestKafkaID_AtEarliestPosition(t *testing.T) {
rid := &kafkaID{messageID: 8}
rid := &KafkaID{MessageID: 8}
assert.False(t, rid.AtEarliestPosition())

rid = &kafkaID{messageID: 0}
rid = &KafkaID{MessageID: 0}
assert.True(t, rid.AtEarliestPosition())
}

func TestKafkaID_LessOrEqualThan(t *testing.T) {
{
rid1 := &kafkaID{messageID: 8}
rid2 := &kafkaID{messageID: 0}
rid1 := &KafkaID{MessageID: 8}
rid2 := &KafkaID{MessageID: 0}
ret, err := rid1.LessOrEqualThan(rid2.Serialize())
assert.NoError(t, err)
assert.False(t, ret)
@@ -35,17 +35,17 @@ func TestKafkaID_LessOrEqualThan(t *testing.T) {
}

{
rid1 := &kafkaID{messageID: 0}
rid2 := &kafkaID{messageID: 0}
rid1 := &KafkaID{MessageID: 0}
rid2 := &KafkaID{MessageID: 0}
ret, err := rid1.LessOrEqualThan(rid2.Serialize())
assert.NoError(t, err)
assert.True(t, ret)
}
}

func TestKafkaID_Equal(t *testing.T) {
rid1 := &kafkaID{messageID: 0}
rid2 := &kafkaID{messageID: 1}
rid1 := &KafkaID{MessageID: 0}
rid2 := &KafkaID{MessageID: 1}

{
ret, err := rid1.Equal(rid1.Serialize())
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_message.go
@@ -27,6 +27,6 @@ func (km *kafkaMessage) Payload() []byte {
}

func (km *kafkaMessage) ID() common.MessageID {
kid := &kafkaID{messageID: int64(km.msg.TopicPartition.Offset)}
kid := &KafkaID{MessageID: int64(km.msg.TopicPartition.Offset)}
return kid
}
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_message_test.go
@@ -13,7 +13,7 @@ func TestKafkaMessage_All(t *testing.T) {
km := &kafkaMessage{msg: msg}
properties := make(map[string]string)
assert.Equal(t, topic, km.Topic())
-	assert.Equal(t, int64(0), km.ID().(*kafkaID).messageID)
+	assert.Equal(t, int64(0), km.ID().(*KafkaID).MessageID)
assert.Nil(t, km.Payload())
assert.Equal(t, properties, km.Properties())
}
2 changes: 1 addition & 1 deletion pkg/mq/msgstream/mqwrapper/kafka/kafka_producer.go
@@ -75,7 +75,7 @@ func (kp *kafkaProducer) Send(ctx context.Context, message *mqcommon.ProducerMes
metrics.MsgStreamRequestLatency.WithLabelValues(metrics.SendMsgLabel).Observe(float64(elapsed.Milliseconds()))
metrics.MsgStreamOpCounter.WithLabelValues(metrics.SendMsgLabel, metrics.SuccessLabel).Inc()

return &kafkaID{messageID: int64(m.TopicPartition.Offset)}, nil
return &KafkaID{MessageID: int64(m.TopicPartition.Offset)}, nil
}

func (kp *kafkaProducer) Close() {
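One practical consequence of the export: the opaque MessageID returned by Send can now be type-asserted to *KafkaID to read the partition offset. A hedged sketch; the producer interface below is a local stand-in with the shape suggested by this diff, not the real mqwrapper type:

```go
package example

import (
	"context"

	mqcommon "github.com/milvus-io/milvus/pkg/mq/common"
	"github.com/milvus-io/milvus/pkg/mq/msgstream/mqwrapper/kafka"
)

// producer is a local stand-in matching Send's shape in this file.
type producer interface {
	Send(ctx context.Context, message *mqcommon.ProducerMessage) (mqcommon.MessageID, error)
}

// sendAndGetOffset appends a message and returns its partition offset.
func sendAndGetOffset(ctx context.Context, p producer, msg *mqcommon.ProducerMessage) (int64, error) {
	id, err := p.Send(ctx, msg)
	if err != nil {
		return 0, err
	}
	return id.(*kafka.KafkaID).MessageID, nil
}
```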
