Skip to content

Commit

Permalink
enhance: refactor the consumer grpc proto for reusing grpc stream for…
Browse files Browse the repository at this point in the history
… multi-consumer (#37564)

issue: #33285

- Modify the proto of consumer of streaming service.
- Make VChannel as a required option for streaming

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Nov 11, 2024
1 parent 5e6c3df commit 1b6edd0
Show file tree
Hide file tree
Showing 27 changed files with 251 additions and 101 deletions.
6 changes: 6 additions & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -952,6 +952,12 @@ func (s *Server) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChan

channel := NewRWChannel(req.GetVchannel(), collectionID, nil, collection.Schema, 0) // TODO: remove RWChannel, just use vchannel + collectionID
channelInfo := s.handler.GetDataVChanPositions(channel, allPartitionID)
if channelInfo.SeekPosition == nil {
log.Warn("channel recovery start position is not found, may collection is on creating")
resp.Status = merr.Status(merr.WrapErrChannelNotAvailable(req.GetVchannel(), "start position is nil"))
return resp, nil
}

log.Info("datacoord get channel recovery info",
zap.String("channel", channelInfo.GetChannelName()),
zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())),
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1533,7 +1533,7 @@ func TestGetChannelRecoveryInfo(t *testing.T) {
channelInfo := &datapb.VchannelInfo{
CollectionID: 0,
ChannelName: "ch-1",
SeekPosition: nil,
SeekPosition: &msgpb.MsgPosition{Timestamp: 10},
UnflushedSegmentIds: []int64{1},
FlushedSegmentIds: []int64{2},
DroppedSegmentIds: []int64{3},
Expand Down
9 changes: 7 additions & 2 deletions internal/distributed/streaming/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,18 @@ func (w *walAccesserImpl) getProducer(pchannel string) *producer.ResumableProduc
return p
}

// assertNoSystemMessage asserts the message is not system message.
func assertNoSystemMessage(msgs ...message.MutableMessage) {
// assertValidMessage asserts the message is not system message.
func assertValidMessage(msgs ...message.MutableMessage) {
for _, msg := range msgs {
if msg.MessageType().IsSystem() {
panic("system message is not allowed to append from client")
}
}
for _, msg := range msgs {
if msg.VChannel() == "" {
panic("vchannel is empty")
}
}
}

// We only support delete and insert message for txn now.
Expand Down
3 changes: 3 additions & 0 deletions internal/distributed/streaming/internal/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ type ConsumerOptions struct {
// PChannel is the pchannel of the consumer.
PChannel string

// VChannel is the vchannel of the consumer.
VChannel string

// DeliverPolicy is the deliver policy of the consumer.
DeliverPolicy options.DeliverPolicy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (rc *resumableConsumerImpl) resumeLoop() {
}
opts := &handler.ConsumerOptions{
PChannel: rc.opts.PChannel,
VChannel: rc.opts.VChannel,
DeliverPolicy: deliverPolicy,
DeliverFilters: deliverFilters,
MessageHandler: nopCloseMH,
Expand Down
1 change: 1 addition & 0 deletions internal/distributed/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type TxnOption struct {

type ReadOption struct {
// VChannel is the target vchannel to read.
// It must be set to read message from a vchannel.
VChannel string

// DeliverPolicy is the deliver policy of the consumer.
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/streaming/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type txnImpl struct {

// Append writes records to the log.
func (t *txnImpl) Append(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) error {
assertNoSystemMessage(msg)
assertValidMessage(msg)
assertIsDmlMessage(msg)

t.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/streaming/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// Otherwise, it will be sent as individual messages.
// !!! This function do not promise the atomicity and deliver order of the messages appending.
func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) AppendResponses {
assertNoSystemMessage(msgs...)
assertValidMessage(msgs...)

// dispatch the messages into different vchannel.
dispatchedMessages, indexes := u.dispatchMessages(msgs...)
Expand Down
10 changes: 6 additions & 4 deletions internal/distributed/streaming/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/milvus-io/milvus/internal/streamingnode/client/handler"
"github.com/milvus-io/milvus/internal/util/streamingutil/status"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
Expand Down Expand Up @@ -55,7 +54,7 @@ type walAccesserImpl struct {

// RawAppend writes a record to the log.
func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) {
assertNoSystemMessage(msg)
assertValidMessage(msg)
if err := w.lifetime.Add(lifetime.IsWorking); err != nil {
return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error())
}
Expand All @@ -71,14 +70,17 @@ func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner {
newErrScanner(status.NewOnShutdownError("wal accesser closed, %s", err.Error()))
}
defer w.lifetime.Done()
if opts.VChannel == "" {
return newErrScanner(status.NewInvaildArgument("vchannel is required"))
}

// TODO: optimize the consumer into pchannel level.
pchannel := funcutil.ToPhysicalChannel(opts.VChannel)
filters := append(opts.DeliverFilters, options.DeliverFilterVChannel(opts.VChannel))
rc := consumer.NewResumableConsumer(w.handlerClient.CreateConsumer, &consumer.ConsumerOptions{
PChannel: pchannel,
VChannel: opts.VChannel,
DeliverPolicy: opts.DeliverPolicy,
DeliverFilters: filters,
DeliverFilters: opts.DeliverFilters,
MessageHandler: opts.MessageHandler,
})
return rc
Expand Down
39 changes: 33 additions & 6 deletions internal/streamingnode/client/handler/consumer/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ type ConsumerOptions struct {
// The cosume target
Assignment *types.PChannelInfoAssigned

// VChannel is the vchannel of the consumer.
VChannel string

// DeliverPolicy is the deliver policy of the consumer.
DeliverPolicy options.DeliverPolicy

Expand Down Expand Up @@ -66,7 +69,7 @@ func CreateConsumer(
cli := &consumerImpl{
ctx: ctx,
walName: createResp.GetWalName(),
assignment: *opts.Assignment,
opts: opts,
grpcStreamClient: streamClient,
handlerClient: handlerClient,
logger: log.With(
Expand All @@ -87,16 +90,14 @@ func createConsumeRequest(ctx context.Context, opts *ConsumerOptions) (context.C
ctx = contextutil.WithPickServerID(ctx, opts.Assignment.Node.ServerID)
// create the consumer request.
return contextutil.WithCreateConsumer(ctx, &streamingpb.CreateConsumerRequest{
Pchannel: types.NewProtoFromPChannelInfo(opts.Assignment.Channel),
DeliverPolicy: opts.DeliverPolicy,
DeliverFilters: opts.DeliverFilters,
Pchannel: types.NewProtoFromPChannelInfo(opts.Assignment.Channel),
}), nil
}

type consumerImpl struct {
ctx context.Context // TODO: the cancel method of consumer should be managed by consumerImpl, fix it in future.
walName string
assignment types.PChannelInfoAssigned
opts *ConsumerOptions
grpcStreamClient streamingpb.StreamingNodeHandlerService_ConsumeClient
handlerClient streamingpb.StreamingNodeHandlerServiceClient
logger *log.MLogger
Expand Down Expand Up @@ -158,7 +159,9 @@ func (c *consumerImpl) recvLoop() (err error) {
c.finishErr.Set(err)
c.msgHandler.Close()
}()

if err := c.createVChannelConsumer(); err != nil {
return err
}
for {
resp, err := c.grpcStreamClient.Recv()
if errors.Is(err, io.EOF) {
Expand Down Expand Up @@ -200,6 +203,30 @@ func (c *consumerImpl) recvLoop() (err error) {
}
}

func (c *consumerImpl) createVChannelConsumer() error {
// Create the vchannel client.
if err := c.grpcStreamClient.Send(&streamingpb.ConsumeRequest{
Request: &streamingpb.ConsumeRequest_CreateVchannelConsumer{
CreateVchannelConsumer: &streamingpb.CreateVChannelConsumerRequest{
Vchannel: c.opts.VChannel,
DeliverPolicy: c.opts.DeliverPolicy,
DeliverFilters: c.opts.DeliverFilters,
},
},
}); err != nil {
return err
}
resp, err := c.grpcStreamClient.Recv()
if err != nil {
return err
}
createVChannelResp := resp.GetCreateVchannel()
if createVChannelResp == nil {
return status.NewInvalidRequestSeq("expect create vchannel response")
}
return nil
}

func (c *consumerImpl) handleTxnMessage(msg message.ImmutableMessage) error {
switch msg.MessageType() {
case message.MessageTypeBeginTxn:
Expand Down
11 changes: 10 additions & 1 deletion internal/streamingnode/client/handler/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,13 @@ func newMockedConsumerImpl(t *testing.T, ctx context.Context, h message.Handler)
})

opts := &ConsumerOptions{
VChannel: "test-1",
Assignment: &types.PChannelInfoAssigned{
Channel: types.PChannelInfo{Name: "test", Term: 1},
Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost"},
},
DeliverPolicy: options.DeliverPolicyAll(),
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterVChannel("test-1"),
options.DeliverFilterTimeTickGT(100),
},
MessageHandler: h,
Expand All @@ -158,6 +158,15 @@ func newMockedConsumerImpl(t *testing.T, ctx context.Context, h message.Handler)
},
},
}
recvCh <- &streamingpb.ConsumeResponse{
Response: &streamingpb.ConsumeResponse_CreateVchannel{
CreateVchannel: &streamingpb.CreateVChannelConsumerResponse{
Response: &streamingpb.CreateVChannelConsumerResponse_ConsumerId{
ConsumerId: 1,
},
},
},
}
consumer, err := CreateConsumer(ctx, opts, c)
if err != nil {
panic(err)
Expand Down
3 changes: 3 additions & 0 deletions internal/streamingnode/client/handler/handler_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type ConsumerOptions struct {
// PChannel is the pchannel of the consumer.
PChannel string

// VChannel is the vchannel of the consumer.
VChannel string

// DeliverPolicy is the deliver policy of the consumer.
DeliverPolicy options.DeliverPolicy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO
}
return hc.newConsumer(ctx, &consumer.ConsumerOptions{
Assignment: assign,
VChannel: opts.VChannel,
DeliverPolicy: opts.DeliverPolicy,
DeliverFilters: opts.DeliverFilters,
MessageHandler: opts.MessageHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,11 @@ func TestHandlerClient(t *testing.T) {

consumer, err := handler.CreateConsumer(ctx, &ConsumerOptions{
PChannel: "pchannel",
VChannel: "vchannel",
DeliverPolicy: options.DeliverPolicyAll(),
DeliverFilters: []options.DeliverFilter{
options.DeliverFilterTimeTickGT(10),
options.DeliverFilterTimeTickGTE(10),
options.DeliverFilterVChannel("vchannel"),
},
MessageHandler: make(message.ChanMessageHandler),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,8 @@ func (c *channelLifetime) Run() error {
policy := options.DeliverPolicyStartFrom(messageID)
handler := adaptor2.NewMsgPackAdaptorHandler()
ro := wal.ReadOption{
DeliverPolicy: policy,
MessageFilter: []options.DeliverFilter{
options.DeliverFilterVChannel(c.vchannel),
},
VChannel: c.vchannel,
DeliverPolicy: policy,
MesasgeHandler: handler,
}
scanner, err := c.wal.Read(ctx, ro)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,15 @@ func (p *consumeGrpcServerHelper) SendCreated(resp *streamingpb.CreateConsumerRe
})
}

// SendCreated sends the create response to client.
func (p *consumeGrpcServerHelper) SendCreateVChannelConsumer(resp *streamingpb.CreateVChannelConsumerResponse) error {
return p.Send(&streamingpb.ConsumeResponse{
Response: &streamingpb.ConsumeResponse_CreateVchannel{
CreateVchannel: resp,
},
})
}

// SendClosed sends the close response to client.
// no more message should be sent after sending close response.
func (p *consumeGrpcServerHelper) SendClosed() error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,73 @@ import (
// CreateConsumeServer create a new consumer.
// Expected message sequence:
// CreateConsumeServer:
// -> ConsumeResponse 1
// -> ConsumeResponse 2
// -> ConsumeResponse 3
// <- CreateVChannelConsumer 1
// -> CreateVChannelConsuemr 1
// -> ConsumeMessage 1.1
// <- CreateVChannelConsumer 2
// -> ConsumeMessage 1.2
// -> CreateVChannelConsumer 2
// -> ConsumeMessage 2.1
// -> ConsumeMessage 2.2
// -> ConsumeMessage 1.3
// <- CloseVChannelConsumer 1
// -> CloseVChannelConsumer 1
// -> ConsumeMessage 2.3
// <- CloseVChannelConsumer 2
// -> CloseVChannelConsumer 2
// CloseConsumer:
func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb.StreamingNodeHandlerService_ConsumeServer) (*ConsumeServer, error) {
createReq, err := contextutil.GetCreateConsumer(streamServer.Context())
if err != nil {
return nil, status.NewInvaildArgument("create consumer request is required")
}

l, err := walManager.GetAvailableWAL(types.NewPChannelInfoFromProto(createReq.GetPchannel()))
if err != nil {
return nil, err
}
scanner, err := l.Read(streamServer.Context(), wal.ReadOption{
DeliverPolicy: createReq.GetDeliverPolicy(),
MessageFilter: createReq.DeliverFilters,
})
if err != nil {
return nil, err
}
consumeServer := &consumeGrpcServerHelper{
StreamingNodeHandlerService_ConsumeServer: streamServer,
}
if err := consumeServer.SendCreated(&streamingpb.CreateConsumerResponse{
WalName: l.WALName(),
}); err != nil {
return nil, errors.Wrap(err, "at send created")
}

req, err := streamServer.Recv()
if err != nil {
return nil, errors.New("receive create consumer request failed")
}
createVChannelReq := req.GetCreateVchannelConsumer()
if createVChannelReq == nil {
return nil, errors.New("The first message must be create vchannel consumer request")
}
scanner, err := l.Read(streamServer.Context(), wal.ReadOption{
VChannel: createVChannelReq.GetVchannel(),
DeliverPolicy: createVChannelReq.GetDeliverPolicy(),
MessageFilter: createVChannelReq.GetDeliverFilters(),
})
if err != nil {
return nil, err
}

// TODO: consumerID should be generated after we enabling multi-vchannel consuming on same grpc stream.
consumerID := int64(1)
if err := consumeServer.SendCreateVChannelConsumer(&streamingpb.CreateVChannelConsumerResponse{
Response: &streamingpb.CreateVChannelConsumerResponse_ConsumerId{
ConsumerId: consumerID,
},
}); err != nil {
// release the scanner to avoid resource leak.
if err := scanner.Close(); err != nil {
log.Warn("close scanner failed at create consume server", zap.Error(err))
}
return nil, errors.Wrap(err, "at send created")
return nil, err
}
metrics := newConsumerMetrics(l.Channel().Name)
return &ConsumeServer{
consumerID: 1,
scanner: scanner,
consumeServer: consumeServer,
logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)), // Add trace info for all log.
Expand All @@ -64,6 +97,7 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb

// ConsumeServer is a ConsumeServer of log messages.
type ConsumeServer struct {
consumerID int64
scanner wal.Scanner
consumeServer *consumeGrpcServerHelper
logger *log.MLogger
Expand Down Expand Up @@ -151,6 +185,7 @@ func (c *ConsumeServer) sendImmutableMessage(msg message.ImmutableMessage) (err

// Send Consumed message to client and do metrics.
if err := c.consumeServer.SendConsumeMessage(&streamingpb.ConsumeMessageReponse{
ConsumerId: c.consumerID,
Message: &messagespb.ImmutableMessage{
Id: &messagespb.MessageID{
Id: msg.MessageID().Marshal(),
Expand Down
Loading

0 comments on commit 1b6edd0

Please sign in to comment.