Skip to content

Commit

Permalink
fix: add the request ctx for stream pipeline interface (#37835)
Browse files Browse the repository at this point in the history
- issue: #37834

Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Nov 21, 2024
1 parent 19572f5 commit 54aaeda
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 8 deletions.
2 changes: 1 addition & 1 deletion internal/querynodev2/pipeline/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (suite *PipelineManagerTestSuite) TestBasic() {
suite.NotNil(pipeline)

// Init Consumer
err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{})
err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{})
suite.NoError(err)

// Start pipeline
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (suite *PipelineTestSuite) TestBasic() {
suite.NoError(err)

// Init Consumer
err = pipeline.ConsumeMsgStream(&msgpb.MsgPosition{})
err = pipeline.ConsumeMsgStream(context.Background(), &msgpb.MsgPosition{})
suite.NoError(err)

err = pipeline.Start()
Expand Down
2 changes: 1 addition & 1 deletion internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (node *QueryNode) WatchDmChannels(ctx context.Context, req *querypb.WatchDm
MsgID: channel.SeekPosition.MsgID,
Timestamp: channel.SeekPosition.Timestamp,
}
err = pipeline.ConsumeMsgStream(position)
err = pipeline.ConsumeMsgStream(ctx, position)
if err != nil {
err = merr.WrapErrServiceUnavailable(err.Error(), "InitPipelineFailed")
log.Warn(err.Error(),
Expand Down
8 changes: 4 additions & 4 deletions internal/util/pipeline/stream_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import (

type StreamPipeline interface {
Pipeline
ConsumeMsgStream(position *msgpb.MsgPosition) error
ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error
Status() string
}

Expand Down Expand Up @@ -85,7 +85,7 @@ func (p *streamPipeline) Status() string {
return "Healthy"
}

func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.MsgPosition) error {
var err error
if position == nil {
log.Error("seek stream to nil position")
Expand All @@ -101,7 +101,7 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
zap.Uint64("timestamp", position.GetTimestamp()),
)
handler := adaptor.NewMsgPackAdaptorHandler()
p.scanner = streaming.WAL().Read(context.Background(), streaming.ReadOption{
p.scanner = streaming.WAL().Read(ctx, streaming.ReadOption{
VChannel: position.GetChannelName(),
DeliverPolicy: options.DeliverPolicyStartFrom(startFrom),
DeliverFilters: []options.DeliverFilter{
Expand All @@ -117,7 +117,7 @@ func (p *streamPipeline) ConsumeMsgStream(position *msgpb.MsgPosition) error {
}

start := time.Now()
p.input, err = p.dispatcher.Register(context.TODO(), p.vChannel, position, common.SubscriptionPositionUnknown)
p.input, err = p.dispatcher.Register(ctx, p.vChannel, position, common.SubscriptionPositionUnknown)
if err != nil {
log.Error("dispatcher register failed", zap.String("channel", position.ChannelName))
return WrapErrRegDispather(err)
Expand Down
3 changes: 2 additions & 1 deletion internal/util/pipeline/stream_pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package pipeline

import (
context2 "context"
"fmt"
"testing"

Expand Down Expand Up @@ -63,7 +64,7 @@ func (suite *StreamPipelineSuite) TestBasic() {
})
}

err := suite.pipeline.ConsumeMsgStream(&msgpb.MsgPosition{})
err := suite.pipeline.ConsumeMsgStream(context2.Background(), &msgpb.MsgPosition{})
suite.NoError(err)

suite.pipeline.Start()
Expand Down

0 comments on commit 54aaeda

Please sign in to comment.