From 1b6edd0b4b3601900d2ec66d82e9d8d145bce6c8 Mon Sep 17 00:00:00 2001 From: Zhen Ye Date: Mon, 11 Nov 2024 17:24:29 +0800 Subject: [PATCH] enhance: refactor the consumer grpc proto for reusing grpc stream for multi-consumer (#37564) issue: #33285 - Modify the proto of consumer of streaming service. - Make VChannel as a required option for streaming --------- Signed-off-by: chyezh --- internal/datacoord/services.go | 6 ++ internal/datacoord/services_test.go | 2 +- internal/distributed/streaming/append.go | 9 ++- .../streaming/internal/consumer/consumer.go | 3 + .../internal/consumer/consumer_impl.go | 1 + internal/distributed/streaming/streaming.go | 1 + internal/distributed/streaming/txn.go | 2 +- internal/distributed/streaming/util.go | 2 +- internal/distributed/streaming/wal.go | 10 +-- .../client/handler/consumer/consumer_impl.go | 39 +++++++++-- .../client/handler/consumer/consumer_test.go | 11 +++- .../client/handler/handler_client.go | 3 + .../client/handler/handler_client_impl.go | 1 + .../client/handler/handler_client_test.go | 2 +- .../flusher/flusherimpl/channel_lifetime.go | 6 +- .../consumer/consume_grpc_server_helper.go | 9 +++ .../handler/consumer/consume_server.go | 57 +++++++++++++---- .../handler/consumer/consume_server_test.go | 37 +++++++++-- .../server/wal/adaptor/scanner_adaptor.go | 9 +++ .../wal/adaptor/scanner_adaptor_test.go | 15 ++++- .../server/wal/adaptor/wal_adaptor_test.go | 8 ++- .../server/wal/adaptor/wal_test.go | 10 ++- internal/streamingnode/server/wal/scanner.go | 1 + .../contextutil/create_consumer_test.go | 4 -- pkg/streaming/proto/streaming.proto | 64 +++++++++++++++---- pkg/streaming/util/options/deliver.go | 19 +----- pkg/streaming/util/options/deliver_test.go | 21 ------ 27 files changed, 251 insertions(+), 101 deletions(-) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index b11550a1c3a5f..086d83ce941a1 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -952,6 +952,12 @@ func (s *Server) GetChannelRecoveryInfo(ctx context.Context, req *datapb.GetChan channel := NewRWChannel(req.GetVchannel(), collectionID, nil, collection.Schema, 0) // TODO: remove RWChannel, just use vchannel + collectionID channelInfo := s.handler.GetDataVChanPositions(channel, allPartitionID) + if channelInfo.SeekPosition == nil { + log.Warn("channel recovery start position is not found, may collection is on creating") + resp.Status = merr.Status(merr.WrapErrChannelNotAvailable(req.GetVchannel(), "start position is nil")) + return resp, nil + } + log.Info("datacoord get channel recovery info", zap.String("channel", channelInfo.GetChannelName()), zap.Int("# of unflushed segments", len(channelInfo.GetUnflushedSegmentIds())), diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 23ed364a43a77..d8b813e702aab 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -1533,7 +1533,7 @@ func TestGetChannelRecoveryInfo(t *testing.T) { channelInfo := &datapb.VchannelInfo{ CollectionID: 0, ChannelName: "ch-1", - SeekPosition: nil, + SeekPosition: &msgpb.MsgPosition{Timestamp: 10}, UnflushedSegmentIds: []int64{1}, FlushedSegmentIds: []int64{2}, DroppedSegmentIds: []int64{3}, diff --git a/internal/distributed/streaming/append.go b/internal/distributed/streaming/append.go index d44cb84ddf5f4..2fd0820e549b2 100644 --- a/internal/distributed/streaming/append.go +++ b/internal/distributed/streaming/append.go @@ -34,13 +34,18 @@ func (w *walAccesserImpl) getProducer(pchannel string) *producer.ResumableProduc return p } -// assertNoSystemMessage asserts the message is not system message. -func assertNoSystemMessage(msgs ...message.MutableMessage) { +// assertValidMessage asserts the message is not system message. +func assertValidMessage(msgs ...message.MutableMessage) { for _, msg := range msgs { if msg.MessageType().IsSystem() { panic("system message is not allowed to append from client") } } + for _, msg := range msgs { + if msg.VChannel() == "" { + panic("vchannel is empty") + } + } } // We only support delete and insert message for txn now. diff --git a/internal/distributed/streaming/internal/consumer/consumer.go b/internal/distributed/streaming/internal/consumer/consumer.go index 1ae5e6492390a..a521f4f1af9da 100644 --- a/internal/distributed/streaming/internal/consumer/consumer.go +++ b/internal/distributed/streaming/internal/consumer/consumer.go @@ -12,6 +12,9 @@ type ConsumerOptions struct { // PChannel is the pchannel of the consumer. PChannel string + // VChannel is the vchannel of the consumer. + VChannel string + // DeliverPolicy is the deliver policy of the consumer. DeliverPolicy options.DeliverPolicy diff --git a/internal/distributed/streaming/internal/consumer/consumer_impl.go b/internal/distributed/streaming/internal/consumer/consumer_impl.go index 918c1255bc018..082c61afd1758 100644 --- a/internal/distributed/streaming/internal/consumer/consumer_impl.go +++ b/internal/distributed/streaming/internal/consumer/consumer_impl.go @@ -100,6 +100,7 @@ func (rc *resumableConsumerImpl) resumeLoop() { } opts := &handler.ConsumerOptions{ PChannel: rc.opts.PChannel, + VChannel: rc.opts.VChannel, DeliverPolicy: deliverPolicy, DeliverFilters: deliverFilters, MessageHandler: nopCloseMH, diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index 460f85054228b..8ef6df73619d0 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -51,6 +51,7 @@ type TxnOption struct { type ReadOption struct { // VChannel is the target vchannel to read. + // It must be set to read message from a vchannel. VChannel string // DeliverPolicy is the deliver policy of the consumer. diff --git a/internal/distributed/streaming/txn.go b/internal/distributed/streaming/txn.go index 771f22ec8eda9..b0d86468c8504 100644 --- a/internal/distributed/streaming/txn.go +++ b/internal/distributed/streaming/txn.go @@ -23,7 +23,7 @@ type txnImpl struct { // Append writes records to the log. func (t *txnImpl) Append(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) error { - assertNoSystemMessage(msg) + assertValidMessage(msg) assertIsDmlMessage(msg) t.mu.Lock() diff --git a/internal/distributed/streaming/util.go b/internal/distributed/streaming/util.go index 8d51ef5671f9d..3a024dc03c5a3 100644 --- a/internal/distributed/streaming/util.go +++ b/internal/distributed/streaming/util.go @@ -14,7 +14,7 @@ import ( // Otherwise, it will be sent as individual messages. // !!! This function do not promise the atomicity and deliver order of the messages appending. func (u *walAccesserImpl) AppendMessages(ctx context.Context, msgs ...message.MutableMessage) AppendResponses { - assertNoSystemMessage(msgs...) + assertValidMessage(msgs...) // dispatch the messages into different vchannel. dispatchedMessages, indexes := u.dispatchMessages(msgs...) diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index 9a3765a934065..4277568472af1 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -13,7 +13,6 @@ import ( "github.com/milvus-io/milvus/internal/streamingnode/client/handler" "github.com/milvus-io/milvus/internal/util/streamingutil/status" "github.com/milvus-io/milvus/pkg/streaming/util/message" - "github.com/milvus-io/milvus/pkg/streaming/util/options" "github.com/milvus-io/milvus/pkg/streaming/util/types" "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/funcutil" @@ -55,7 +54,7 @@ type walAccesserImpl struct { // RawAppend writes a record to the log. func (w *walAccesserImpl) RawAppend(ctx context.Context, msg message.MutableMessage, opts ...AppendOption) (*types.AppendResult, error) { - assertNoSystemMessage(msg) + assertValidMessage(msg) if err := w.lifetime.Add(lifetime.IsWorking); err != nil { return nil, status.NewOnShutdownError("wal accesser closed, %s", err.Error()) } @@ -71,14 +70,17 @@ func (w *walAccesserImpl) Read(_ context.Context, opts ReadOption) Scanner { newErrScanner(status.NewOnShutdownError("wal accesser closed, %s", err.Error())) } defer w.lifetime.Done() + if opts.VChannel == "" { + return newErrScanner(status.NewInvaildArgument("vchannel is required")) + } // TODO: optimize the consumer into pchannel level. pchannel := funcutil.ToPhysicalChannel(opts.VChannel) - filters := append(opts.DeliverFilters, options.DeliverFilterVChannel(opts.VChannel)) rc := consumer.NewResumableConsumer(w.handlerClient.CreateConsumer, &consumer.ConsumerOptions{ PChannel: pchannel, + VChannel: opts.VChannel, DeliverPolicy: opts.DeliverPolicy, - DeliverFilters: filters, + DeliverFilters: opts.DeliverFilters, MessageHandler: opts.MessageHandler, }) return rc diff --git a/internal/streamingnode/client/handler/consumer/consumer_impl.go b/internal/streamingnode/client/handler/consumer/consumer_impl.go index dd8cd92ac868b..b880f7064a8d8 100644 --- a/internal/streamingnode/client/handler/consumer/consumer_impl.go +++ b/internal/streamingnode/client/handler/consumer/consumer_impl.go @@ -24,6 +24,9 @@ type ConsumerOptions struct { // The cosume target Assignment *types.PChannelInfoAssigned + // VChannel is the vchannel of the consumer. + VChannel string + // DeliverPolicy is the deliver policy of the consumer. DeliverPolicy options.DeliverPolicy @@ -66,7 +69,7 @@ func CreateConsumer( cli := &consumerImpl{ ctx: ctx, walName: createResp.GetWalName(), - assignment: *opts.Assignment, + opts: opts, grpcStreamClient: streamClient, handlerClient: handlerClient, logger: log.With( @@ -87,16 +90,14 @@ func createConsumeRequest(ctx context.Context, opts *ConsumerOptions) (context.C ctx = contextutil.WithPickServerID(ctx, opts.Assignment.Node.ServerID) // create the consumer request. return contextutil.WithCreateConsumer(ctx, &streamingpb.CreateConsumerRequest{ - Pchannel: types.NewProtoFromPChannelInfo(opts.Assignment.Channel), - DeliverPolicy: opts.DeliverPolicy, - DeliverFilters: opts.DeliverFilters, + Pchannel: types.NewProtoFromPChannelInfo(opts.Assignment.Channel), }), nil } type consumerImpl struct { ctx context.Context // TODO: the cancel method of consumer should be managed by consumerImpl, fix it in future. walName string - assignment types.PChannelInfoAssigned + opts *ConsumerOptions grpcStreamClient streamingpb.StreamingNodeHandlerService_ConsumeClient handlerClient streamingpb.StreamingNodeHandlerServiceClient logger *log.MLogger @@ -158,7 +159,9 @@ func (c *consumerImpl) recvLoop() (err error) { c.finishErr.Set(err) c.msgHandler.Close() }() - + if err := c.createVChannelConsumer(); err != nil { + return err + } for { resp, err := c.grpcStreamClient.Recv() if errors.Is(err, io.EOF) { @@ -200,6 +203,30 @@ func (c *consumerImpl) recvLoop() (err error) { } } +func (c *consumerImpl) createVChannelConsumer() error { + // Create the vchannel client. + if err := c.grpcStreamClient.Send(&streamingpb.ConsumeRequest{ + Request: &streamingpb.ConsumeRequest_CreateVchannelConsumer{ + CreateVchannelConsumer: &streamingpb.CreateVChannelConsumerRequest{ + Vchannel: c.opts.VChannel, + DeliverPolicy: c.opts.DeliverPolicy, + DeliverFilters: c.opts.DeliverFilters, + }, + }, + }); err != nil { + return err + } + resp, err := c.grpcStreamClient.Recv() + if err != nil { + return err + } + createVChannelResp := resp.GetCreateVchannel() + if createVChannelResp == nil { + return status.NewInvalidRequestSeq("expect create vchannel response") + } + return nil +} + func (c *consumerImpl) handleTxnMessage(msg message.ImmutableMessage) error { switch msg.MessageType() { case message.MessageTypeBeginTxn: diff --git a/internal/streamingnode/client/handler/consumer/consumer_test.go b/internal/streamingnode/client/handler/consumer/consumer_test.go index 6a634223bbc57..8481beea30260 100644 --- a/internal/streamingnode/client/handler/consumer/consumer_test.go +++ b/internal/streamingnode/client/handler/consumer/consumer_test.go @@ -139,13 +139,13 @@ func newMockedConsumerImpl(t *testing.T, ctx context.Context, h message.Handler) }) opts := &ConsumerOptions{ + VChannel: "test-1", Assignment: &types.PChannelInfoAssigned{ Channel: types.PChannelInfo{Name: "test", Term: 1}, Node: types.StreamingNodeInfo{ServerID: 1, Address: "localhost"}, }, DeliverPolicy: options.DeliverPolicyAll(), DeliverFilters: []options.DeliverFilter{ - options.DeliverFilterVChannel("test-1"), options.DeliverFilterTimeTickGT(100), }, MessageHandler: h, @@ -158,6 +158,15 @@ func newMockedConsumerImpl(t *testing.T, ctx context.Context, h message.Handler) }, }, } + recvCh <- &streamingpb.ConsumeResponse{ + Response: &streamingpb.ConsumeResponse_CreateVchannel{ + CreateVchannel: &streamingpb.CreateVChannelConsumerResponse{ + Response: &streamingpb.CreateVChannelConsumerResponse_ConsumerId{ + ConsumerId: 1, + }, + }, + }, + } consumer, err := CreateConsumer(ctx, opts, c) if err != nil { panic(err) diff --git a/internal/streamingnode/client/handler/handler_client.go b/internal/streamingnode/client/handler/handler_client.go index 4a3f73ca0e8b9..8377759c70c6c 100644 --- a/internal/streamingnode/client/handler/handler_client.go +++ b/internal/streamingnode/client/handler/handler_client.go @@ -49,6 +49,9 @@ type ConsumerOptions struct { // PChannel is the pchannel of the consumer. PChannel string + // VChannel is the vchannel of the consumer. + VChannel string + // DeliverPolicy is the deliver policy of the consumer. DeliverPolicy options.DeliverPolicy diff --git a/internal/streamingnode/client/handler/handler_client_impl.go b/internal/streamingnode/client/handler/handler_client_impl.go index f612936282c12..b054fd8c303a6 100644 --- a/internal/streamingnode/client/handler/handler_client_impl.go +++ b/internal/streamingnode/client/handler/handler_client_impl.go @@ -71,6 +71,7 @@ func (hc *handlerClientImpl) CreateConsumer(ctx context.Context, opts *ConsumerO } return hc.newConsumer(ctx, &consumer.ConsumerOptions{ Assignment: assign, + VChannel: opts.VChannel, DeliverPolicy: opts.DeliverPolicy, DeliverFilters: opts.DeliverFilters, MessageHandler: opts.MessageHandler, diff --git a/internal/streamingnode/client/handler/handler_client_test.go b/internal/streamingnode/client/handler/handler_client_test.go index 0b685aea4c43f..7779b316b9ced 100644 --- a/internal/streamingnode/client/handler/handler_client_test.go +++ b/internal/streamingnode/client/handler/handler_client_test.go @@ -98,11 +98,11 @@ func TestHandlerClient(t *testing.T) { consumer, err := handler.CreateConsumer(ctx, &ConsumerOptions{ PChannel: "pchannel", + VChannel: "vchannel", DeliverPolicy: options.DeliverPolicyAll(), DeliverFilters: []options.DeliverFilter{ options.DeliverFilterTimeTickGT(10), options.DeliverFilterTimeTickGTE(10), - options.DeliverFilterVChannel("vchannel"), }, MessageHandler: make(message.ChanMessageHandler), }) diff --git a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go index afcbace6a8dfc..dbc8007777598 100644 --- a/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go +++ b/internal/streamingnode/server/flusher/flusherimpl/channel_lifetime.go @@ -105,10 +105,8 @@ func (c *channelLifetime) Run() error { policy := options.DeliverPolicyStartFrom(messageID) handler := adaptor2.NewMsgPackAdaptorHandler() ro := wal.ReadOption{ - DeliverPolicy: policy, - MessageFilter: []options.DeliverFilter{ - options.DeliverFilterVChannel(c.vchannel), - }, + VChannel: c.vchannel, + DeliverPolicy: policy, MesasgeHandler: handler, } scanner, err := c.wal.Read(ctx, ro) diff --git a/internal/streamingnode/server/service/handler/consumer/consume_grpc_server_helper.go b/internal/streamingnode/server/service/handler/consumer/consume_grpc_server_helper.go index 4796894b503ea..ea6ee649a3fbd 100644 --- a/internal/streamingnode/server/service/handler/consumer/consume_grpc_server_helper.go +++ b/internal/streamingnode/server/service/handler/consumer/consume_grpc_server_helper.go @@ -25,6 +25,15 @@ func (p *consumeGrpcServerHelper) SendCreated(resp *streamingpb.CreateConsumerRe }) } +// SendCreated sends the create response to client. +func (p *consumeGrpcServerHelper) SendCreateVChannelConsumer(resp *streamingpb.CreateVChannelConsumerResponse) error { + return p.Send(&streamingpb.ConsumeResponse{ + Response: &streamingpb.ConsumeResponse_CreateVchannel{ + CreateVchannel: resp, + }, + }) +} + // SendClosed sends the close response to client. // no more message should be sent after sending close response. func (p *consumeGrpcServerHelper) SendClosed() error { diff --git a/internal/streamingnode/server/service/handler/consumer/consume_server.go b/internal/streamingnode/server/service/handler/consumer/consume_server.go index 1103f3ad16f54..4d1a2bb74b693 100644 --- a/internal/streamingnode/server/service/handler/consumer/consume_server.go +++ b/internal/streamingnode/server/service/handler/consumer/consume_server.go @@ -20,40 +20,73 @@ import ( // CreateConsumeServer create a new consumer. // Expected message sequence: // CreateConsumeServer: -// -> ConsumeResponse 1 -// -> ConsumeResponse 2 -// -> ConsumeResponse 3 +// <- CreateVChannelConsumer 1 +// -> CreateVChannelConsuemr 1 +// -> ConsumeMessage 1.1 +// <- CreateVChannelConsumer 2 +// -> ConsumeMessage 1.2 +// -> CreateVChannelConsumer 2 +// -> ConsumeMessage 2.1 +// -> ConsumeMessage 2.2 +// -> ConsumeMessage 1.3 +// <- CloseVChannelConsumer 1 +// -> CloseVChannelConsumer 1 +// -> ConsumeMessage 2.3 +// <- CloseVChannelConsumer 2 +// -> CloseVChannelConsumer 2 // CloseConsumer: func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb.StreamingNodeHandlerService_ConsumeServer) (*ConsumeServer, error) { createReq, err := contextutil.GetCreateConsumer(streamServer.Context()) if err != nil { return nil, status.NewInvaildArgument("create consumer request is required") } + l, err := walManager.GetAvailableWAL(types.NewPChannelInfoFromProto(createReq.GetPchannel())) if err != nil { return nil, err } - scanner, err := l.Read(streamServer.Context(), wal.ReadOption{ - DeliverPolicy: createReq.GetDeliverPolicy(), - MessageFilter: createReq.DeliverFilters, - }) - if err != nil { - return nil, err - } consumeServer := &consumeGrpcServerHelper{ StreamingNodeHandlerService_ConsumeServer: streamServer, } if err := consumeServer.SendCreated(&streamingpb.CreateConsumerResponse{ WalName: l.WALName(), + }); err != nil { + return nil, errors.Wrap(err, "at send created") + } + + req, err := streamServer.Recv() + if err != nil { + return nil, errors.New("receive create consumer request failed") + } + createVChannelReq := req.GetCreateVchannelConsumer() + if createVChannelReq == nil { + return nil, errors.New("The first message must be create vchannel consumer request") + } + scanner, err := l.Read(streamServer.Context(), wal.ReadOption{ + VChannel: createVChannelReq.GetVchannel(), + DeliverPolicy: createVChannelReq.GetDeliverPolicy(), + MessageFilter: createVChannelReq.GetDeliverFilters(), + }) + if err != nil { + return nil, err + } + + // TODO: consumerID should be generated after we enabling multi-vchannel consuming on same grpc stream. + consumerID := int64(1) + if err := consumeServer.SendCreateVChannelConsumer(&streamingpb.CreateVChannelConsumerResponse{ + Response: &streamingpb.CreateVChannelConsumerResponse_ConsumerId{ + ConsumerId: consumerID, + }, }); err != nil { // release the scanner to avoid resource leak. if err := scanner.Close(); err != nil { log.Warn("close scanner failed at create consume server", zap.Error(err)) } - return nil, errors.Wrap(err, "at send created") + return nil, err } metrics := newConsumerMetrics(l.Channel().Name) return &ConsumeServer{ + consumerID: 1, scanner: scanner, consumeServer: consumeServer, logger: log.With(zap.String("channel", l.Channel().Name), zap.Int64("term", l.Channel().Term)), // Add trace info for all log. @@ -64,6 +97,7 @@ func CreateConsumeServer(walManager walmanager.Manager, streamServer streamingpb // ConsumeServer is a ConsumeServer of log messages. type ConsumeServer struct { + consumerID int64 scanner wal.Scanner consumeServer *consumeGrpcServerHelper logger *log.MLogger @@ -151,6 +185,7 @@ func (c *ConsumeServer) sendImmutableMessage(msg message.ImmutableMessage) (err // Send Consumed message to client and do metrics. if err := c.consumeServer.SendConsumeMessage(&streamingpb.ConsumeMessageReponse{ + ConsumerId: c.consumerID, Message: &messagespb.ImmutableMessage{ Id: &messagespb.MessageID{ Id: msg.MessageID().Marshal(), diff --git a/internal/streamingnode/server/service/handler/consumer/consume_server_test.go b/internal/streamingnode/server/service/handler/consumer/consume_server_test.go index 5bcb7ef9f4ec0..1b4ae840714b6 100644 --- a/internal/streamingnode/server/service/handler/consumer/consume_server_test.go +++ b/internal/streamingnode/server/service/handler/consumer/consume_server_test.go @@ -45,9 +45,6 @@ func TestCreateConsumeServer(t *testing.T) { Name: "test", Term: 1, }, - DeliverPolicy: &streamingpb.DeliverPolicy{ - Policy: &streamingpb.DeliverPolicy_All{}, - }, })) ctx := metadata.NewIncomingContext(context.Background(), meta) grpcConsumeServer.ExpectedCalls = nil @@ -55,8 +52,30 @@ func TestCreateConsumeServer(t *testing.T) { manager.EXPECT().GetAvailableWAL(types.PChannelInfo{Name: "test", Term: int64(1)}).Return(nil, errors.New("wal not exist")) assertCreateConsumeServerFail(t, manager, grpcConsumeServer) - // Return error if create scanner failed. + // Return error if send created failed. l := mock_wal.NewMockWAL(t) + manager.ExpectedCalls = nil + l.EXPECT().WALName().Return("test") + manager.EXPECT().GetAvailableWAL(types.PChannelInfo{Name: "test", Term: int64(1)}).Return(l, nil) + grpcConsumeServer.EXPECT().Send(mock.Anything).Return(errors.New("send created failed")) + assertCreateConsumeServerFail(t, manager, grpcConsumeServer) + + // Return error if recv failed. + grpcConsumeServer.EXPECT().Send(mock.Anything).Unset() + grpcConsumeServer.EXPECT().Send(mock.Anything).Return(nil) + grpcConsumeServer.EXPECT().Recv().Return(nil, io.ErrUnexpectedEOF) + assertCreateConsumeServerFail(t, manager, grpcConsumeServer) + + // Return error if create scanner failed. + grpcConsumeServer.EXPECT().Recv().Unset() + grpcConsumeServer.EXPECT().Recv().Return(&streamingpb.ConsumeRequest{ + Request: &streamingpb.ConsumeRequest_CreateVchannelConsumer{ + CreateVchannelConsumer: &streamingpb.CreateVChannelConsumerRequest{ + Vchannel: "test", + }, + }, + }, nil) + l = mock_wal.NewMockWAL(t) l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, errors.New("create scanner failed")) l.EXPECT().WALName().Return("test") manager.ExpectedCalls = nil @@ -64,7 +83,15 @@ func TestCreateConsumeServer(t *testing.T) { assertCreateConsumeServerFail(t, manager, grpcConsumeServer) // Return error if send created failed. - grpcConsumeServer.EXPECT().Send(mock.Anything).Return(errors.New("send created failed")) + grpcConsumeServer.EXPECT().Send(mock.Anything).Unset() + i := 0 + grpcConsumeServer.EXPECT().Send(mock.Anything).RunAndReturn(func(cr *streamingpb.ConsumeResponse) error { + if i >= 1 { + return io.ErrUnexpectedEOF + } + i++ + return nil + }) l.EXPECT().Read(mock.Anything, mock.Anything).Unset() s := mock_wal.NewMockScanner(t) s.EXPECT().Close().Return(nil) diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go index c524b0c9242c1..8f17d68773d74 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor.go @@ -28,6 +28,9 @@ func newScannerAdaptor( scanMetrics *metricsutil.ScannerMetrics, cleanup func(), ) wal.Scanner { + if readOption.VChannel == "" { + panic("vchannel of scanner must be set") + } if readOption.MesasgeHandler == nil { readOption.MesasgeHandler = defaultMessageHandler(make(chan message.ImmutableMessage)) } @@ -170,6 +173,12 @@ func (s *scannerAdaptorImpl) handleUpstream(msg message.ImmutableMessage) { return } + // Filtering the vchannel + // If the message is not belong to any vchannel, it should be broadcasted to all vchannels. + // Otherwise, it should be filtered by vchannel. + if msg.VChannel() != "" && s.readOption.VChannel != msg.VChannel() { + return + } // Filtering the message if needed. // System message should never be filtered. if s.filterFunc != nil && !s.filterFunc(msg) { diff --git a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go index 216ca2fd15b5b..e80bfffddd1ad 100644 --- a/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/scanner_adaptor_test.go @@ -20,16 +20,25 @@ func TestScannerAdaptorReadError(t *testing.T) { l.EXPECT().Read(mock.Anything, mock.Anything).Return(nil, err) l.EXPECT().Channel().Return(types.PChannelInfo{}) - s := newScannerAdaptor("scanner", - l, + assert.Panics(t, func() { + s := newScannerAdaptor("scanner", l, + wal.ReadOption{ + DeliverPolicy: options.DeliverPolicyAll(), + MessageFilter: nil, + }, + metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(), + func() {}) + defer s.Close() + }) + s := newScannerAdaptor("scanner", l, wal.ReadOption{ + VChannel: "test", DeliverPolicy: options.DeliverPolicyAll(), MessageFilter: nil, }, metricsutil.NewScanMetrics(types.PChannelInfo{}).NewScannerMetrics(), func() {}) defer s.Close() - <-s.Chan() <-s.Done() assert.ErrorIs(t, s.Error(), err) diff --git a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go index fe3260954c908..85c3365af4543 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go +++ b/internal/streamingnode/server/wal/adaptor/wal_adaptor_test.go @@ -36,7 +36,9 @@ func TestWalAdaptorReadFail(t *testing.T) { }) lAdapted := adaptImplsToWAL(l, nil, func() {}) - scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{}) + scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{ + VChannel: "test", + }) assert.NoError(t, err) assert.NotNil(t, scanner) assert.ErrorIs(t, scanner.Error(), expectedErr) @@ -91,7 +93,7 @@ func TestWALAdaptor(t *testing.T) { go func(i int) { defer wg.Done() - scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{}) + scanner, err := lAdapted.Read(context.Background(), wal.ReadOption{VChannel: "test"}) if err != nil { assertShutdownError(t, err) return @@ -121,7 +123,7 @@ func TestWALAdaptor(t *testing.T) { lAdapted.AppendAsync(context.Background(), msg, func(mi *wal.AppendResult, err error) { assertShutdownError(t, err) }) - _, err = lAdapted.Read(context.Background(), wal.ReadOption{}) + _, err = lAdapted.Read(context.Background(), wal.ReadOption{VChannel: "test"}) assertShutdownError(t, err) } diff --git a/internal/streamingnode/server/wal/adaptor/wal_test.go b/internal/streamingnode/server/wal/adaptor/wal_test.go index 7f996ff58c2af..f9f1fb80be165 100644 --- a/internal/streamingnode/server/wal/adaptor/wal_test.go +++ b/internal/streamingnode/server/wal/adaptor/wal_test.go @@ -30,6 +30,8 @@ import ( "github.com/milvus-io/milvus/pkg/streaming/walimpls/impls/walimplstest" ) +const testVChannel = "v1" + type walTestFramework struct { b wal.OpenerBuilder t *testing.T @@ -184,7 +186,7 @@ func (f *testOneWALFramework) testSendCreateCollection(ctx context.Context, w wa PartitionIds: []int64{2}, }). WithBody(&msgpb.CreateCollectionRequest{}). - WithVChannel("v1"). + WithVChannel(testVChannel). BuildMutable() assert.NoError(f.t, err) @@ -203,7 +205,7 @@ func (f *testOneWALFramework) testSendDropCollection(ctx context.Context, w wal. CollectionId: 1, }). WithBody(&msgpb.DropCollectionRequest{}). - WithVChannel("v1"). + WithVChannel(testVChannel). BuildMutable() assert.NoError(f.t, err) @@ -223,7 +225,7 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess createPartOfTxn := func() (*message.ImmutableTxnMessageBuilder, *message.TxnContext) { msg, err := message.NewBeginTxnMessageBuilderV2(). - WithVChannel("v1"). + WithVChannel(testVChannel). WithHeader(&message.BeginTxnMessageHeader{ KeepaliveMilliseconds: 1000, }). @@ -322,6 +324,7 @@ func (f *testOneWALFramework) testAppend(ctx context.Context, w wal.WAL) ([]mess func (f *testOneWALFramework) testRead(ctx context.Context, w wal.WAL) ([]message.ImmutableMessage, error) { s, err := w.Read(ctx, wal.ReadOption{ + VChannel: testVChannel, DeliverPolicy: options.DeliverPolicyAll(), MessageFilter: []options.DeliverFilter{ options.DeliverFilterMessageType(message.MessageTypeInsert), @@ -367,6 +370,7 @@ func (f *testOneWALFramework) testReadWithOption(ctx context.Context, w wal.WAL) // Test start from some message and timetick is gte than it. readFromMsg := f.written[idx] s, err := w.Read(ctx, wal.ReadOption{ + VChannel: testVChannel, DeliverPolicy: options.DeliverPolicyStartFrom(readFromMsg.LastConfirmedMessageID()), MessageFilter: []options.DeliverFilter{ options.DeliverFilterTimeTickGTE(readFromMsg.TimeTick()), diff --git a/internal/streamingnode/server/wal/scanner.go b/internal/streamingnode/server/wal/scanner.go index 1c5b5aade90d3..89ca04460f909 100644 --- a/internal/streamingnode/server/wal/scanner.go +++ b/internal/streamingnode/server/wal/scanner.go @@ -16,6 +16,7 @@ var ErrUpstreamClosed = errors.New("upstream closed") // ReadOption is the option for reading records from the wal. type ReadOption struct { + VChannel string // vchannel name DeliverPolicy options.DeliverPolicy MessageFilter []options.DeliverFilter MesasgeHandler MessageHandler // message handler for message processing. diff --git a/internal/util/streamingutil/service/contextutil/create_consumer_test.go b/internal/util/streamingutil/service/contextutil/create_consumer_test.go index b8bff588e71b1..f1410e526c0f0 100644 --- a/internal/util/streamingutil/service/contextutil/create_consumer_test.go +++ b/internal/util/streamingutil/service/contextutil/create_consumer_test.go @@ -17,9 +17,6 @@ func TestWithCreateConsumer(t *testing.T) { Name: "test", Term: 1, }, - DeliverPolicy: &streamingpb.DeliverPolicy{ - Policy: &streamingpb.DeliverPolicy_All{}, - }, } ctx := WithCreateConsumer(context.Background(), req) @@ -32,7 +29,6 @@ func TestWithCreateConsumer(t *testing.T) { assert.Nil(t, err) assert.Equal(t, req.Pchannel.Name, req2.Pchannel.Name) assert.Equal(t, req.Pchannel.Term, req2.Pchannel.Term) - assert.Equal(t, req.DeliverPolicy.String(), req2.DeliverPolicy.String()) // panic case. assert.NotPanics(t, func() { WithCreateConsumer(context.Background(), nil) }) diff --git a/pkg/streaming/proto/streaming.proto b/pkg/streaming/proto/streaming.proto index 2a6eb496c5d7a..e4a6943ae2645 100644 --- a/pkg/streaming/proto/streaming.proto +++ b/pkg/streaming/proto/streaming.proto @@ -156,8 +156,7 @@ message DeliverFilter { oneof filter { DeliverFilterTimeTickGT time_tick_gt = 1; DeliverFilterTimeTickGTE time_tick_gte = 2; - DeliverFilterVChannel vchannel = 3; - DeliverFilterMessageType message_type = 4; + DeliverFilterMessageType message_type = 3; } } @@ -175,11 +174,6 @@ message DeliverFilterTimeTickGTE { // equal to this value. } -// DeliverFilterVChannel is the filter to deliver message with vchannel name. -message DeliverFilterVChannel { - string vchannel = 1; // deliver message with vchannel name. -} - message DeliverFilterMessageType { // deliver message with message type. repeated messages.MessageType message_types = 1; @@ -273,8 +267,8 @@ message ProduceResponse { // CreateProducerResponse is the result of the CreateProducer RPC. message CreateProducerResponse { string wal_name = 1; // wal name at server side. - int64 producer_id = 2; // A unique producer id on streamingnode for this - // producer in streamingnode lifetime. + int64 producer_server_id = 2; // A unique producer server id on streamingnode + // for this producer in streamingnode lifetime. // Is used to identify the producer in streamingnode for other unary grpc // call at producer level. } @@ -304,7 +298,11 @@ message CloseProducerResponse {} // Add more control block in future. message ConsumeRequest { oneof request { - CloseConsumerRequest close = 1; + CreateVChannelConsumerRequest create_vchannel_consumer = 1; + CreateVChannelConsumersRequest create_vchannel_consumers = + 2; // Create multiple vchannel consumers, used for recovery in future. + CloseVChannelConsumerRequest close_vchannel = 3; + CloseConsumerRequest close = 4; } } @@ -316,25 +314,67 @@ message CloseConsumerRequest {} // CreateConsumerRequest is passed in the header of stream. message CreateConsumerRequest { PChannelInfo pchannel = 1; +} + +message CreateVChannelConsumersRequest { + repeated CreateVChannelConsumerRequest create_vchannels = 1; +} + +// CreateVChannelConsumerRequest is the request of the CreateVChannelConsumer +// RPC. It's used to create a new vchannel consumer at server side. +message CreateVChannelConsumerRequest { + string vchannel = 1; DeliverPolicy deliver_policy = 2; // deliver policy. repeated DeliverFilter deliver_filters = 3; // deliver filter. } +// ConsumeMessageRequest is the request of the Consume RPC. +message CreateVChannelConsumersResponse { + repeated CreateVChannelConsumerResponse create_vchannels = 1; +} + +// CreateVChannelConsumerResponse is the response of the CreateVChannelConsumer +// RPC. +message CreateVChannelConsumerResponse { + oneof response { + int64 consumer_id = 1; + StreamingError error = 2; + } +} + +// CloseVChannelConsumerRequest is the request of the CloseVChannelConsumer RPC. +message CloseVChannelConsumerRequest { + int64 consumer_id = 1; +} + +// CloseVChannelConsumerResponse is the response of the CloseVChannelConsumer +// RPC. +message CloseVChannelConsumerResponse { + int64 consumer_id = 1; +} + // ConsumeResponse is the reponse of the Consume RPC. message ConsumeResponse { oneof response { CreateConsumerResponse create = 1; ConsumeMessageReponse consume = 2; - CloseConsumerResponse close = 3; + CreateVChannelConsumerResponse create_vchannel = 3; + CreateVChannelConsumersResponse create_vchannels = 4; + CloseVChannelConsumerResponse close_vchannel = 5; + CloseConsumerResponse close = 6; } } message CreateConsumerResponse { string wal_name = 1; // wal name at server side. + // A unique consumer id on streamingnode for this + // consumer in streamingnode lifetime. + int64 consumer_server_id = 2; } message ConsumeMessageReponse { - messages.ImmutableMessage message = 1; + int64 consumer_id = 1; + messages.ImmutableMessage message = 2; } message CloseConsumerResponse {} diff --git a/pkg/streaming/util/options/deliver.go b/pkg/streaming/util/options/deliver.go index 6e5bdd90234c4..bb978250b854c 100644 --- a/pkg/streaming/util/options/deliver.go +++ b/pkg/streaming/util/options/deliver.go @@ -16,8 +16,7 @@ const ( DeliverFilterTypeTimeTickGT deliverFilterType = 1 DeliverFilterTypeTimeTickGTE deliverFilterType = 2 - DeliverFilterTypeVChannel deliverFilterType = 3 - DeliverFilterTypeMessageType deliverFilterType = 4 + DeliverFilterTypeMessageType deliverFilterType = 3 ) type ( @@ -91,17 +90,6 @@ func DeliverFilterTimeTickGTE(timeTick uint64) DeliverFilter { } } -// DeliverFilterVChannel delivers messages filtered by vchannel. -func DeliverFilterVChannel(vchannel string) DeliverFilter { - return &streamingpb.DeliverFilter{ - Filter: &streamingpb.DeliverFilter_Vchannel{ - Vchannel: &streamingpb.DeliverFilterVChannel{ - Vchannel: vchannel, - }, - }, - } -} - // DeliverFilterMessageType delivers messages filtered by message type. func DeliverFilterMessageType(messageType ...message.MessageType) DeliverFilter { messageTypes := make([]messagespb.MessageType, 0, len(messageType)) @@ -154,11 +142,6 @@ func GetFilterFunc(filters []DeliverFilter) func(message.ImmutableMessage) bool } return true }) - case *streamingpb.DeliverFilter_Vchannel: - filterFuncs = append(filterFuncs, func(im message.ImmutableMessage) bool { - // vchannel == "" is a broadcast operation. - return im.VChannel() == "" || im.VChannel() == filter.GetVchannel().Vchannel - }) case *streamingpb.DeliverFilter_MessageType: filterFuncs = append(filterFuncs, func(im message.ImmutableMessage) bool { // system message cannot be filterred. diff --git a/pkg/streaming/util/options/deliver_test.go b/pkg/streaming/util/options/deliver_test.go index ad8014ae62036..4c1dfad2ad377 100644 --- a/pkg/streaming/util/options/deliver_test.go +++ b/pkg/streaming/util/options/deliver_test.go @@ -33,9 +33,6 @@ func TestDeliverFilter(t *testing.T) { filter = DeliverFilterTimeTickGTE(2) _ = filter.GetFilter().(*streamingpb.DeliverFilter_TimeTickGte) - filter = DeliverFilterVChannel("vchannel") - _ = filter.GetFilter().(*streamingpb.DeliverFilter_Vchannel) - filter = DeliverFilterMessageType(message.MessageTypeDelete) _ = filter.GetFilter().(*streamingpb.DeliverFilter_MessageType) } @@ -43,45 +40,33 @@ func TestDeliverFilter(t *testing.T) { func TestNewMessageFilter(t *testing.T) { filters := []DeliverFilter{ DeliverFilterTimeTickGT(1), - DeliverFilterVChannel("test"), } filterFunc := GetFilterFunc(filters) msg := mock_message.NewMockImmutableMessage(t) - msg.EXPECT().TimeTick().Return(2).Maybe() - msg.EXPECT().VChannel().Return("test2").Maybe() - msg.EXPECT().TxnContext().Return(nil).Maybe() - assert.False(t, filterFunc(msg)) - - msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(nil).Maybe() assert.False(t, filterFunc(msg)) msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("").Maybe() // vchannel == "" should not be filtered. msg.EXPECT().TxnContext().Return(nil).Maybe() assert.False(t, filterFunc(msg)) msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(2).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(nil).Maybe() assert.True(t, filterFunc(msg)) // if message is a txn message, it should be only filterred by time tick when the message type is commit. msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe() msg.EXPECT().MessageType().Return(message.MessageTypeCommitTxn).Maybe() assert.False(t, filterFunc(msg)) msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe() msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe() assert.True(t, filterFunc(msg)) @@ -89,41 +74,35 @@ func TestNewMessageFilter(t *testing.T) { // if message is a txn message, it should be only filterred by time tick when the message type is commit. msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe() msg.EXPECT().MessageType().Return(message.MessageTypeCommitTxn).Maybe() assert.False(t, filterFunc(msg)) msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe() msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe() assert.True(t, filterFunc(msg)) filters = []*streamingpb.DeliverFilter{ DeliverFilterTimeTickGTE(1), - DeliverFilterVChannel("test"), } filterFunc = GetFilterFunc(filters) msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(nil).Maybe() assert.True(t, filterFunc(msg)) // if message is a txn message, it should be only filterred by time tick when the message type is commit. msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe() msg.EXPECT().MessageType().Return(message.MessageTypeCommitTxn).Maybe() assert.True(t, filterFunc(msg)) msg = mock_message.NewMockImmutableMessage(t) msg.EXPECT().TimeTick().Return(1).Maybe() - msg.EXPECT().VChannel().Return("test").Maybe() msg.EXPECT().TxnContext().Return(&message.TxnContext{}).Maybe() msg.EXPECT().MessageType().Return(message.MessageTypeInsert).Maybe() assert.True(t, filterFunc(msg))