fix: Fix consume blocked due to too many consumers #38455

Open · wants to merge 4 commits into master
4 changes: 3 additions & 1 deletion configs/milvus.yaml
@@ -176,6 +176,9 @@ mq:
mergeCheckInterval: 1 # the interval time (in seconds) for the dispatcher to check whether to merge
targetBufSize: 16 # the length of the channel buffer for the target
maxTolerantLag: 3 # Default value: "3", the timeout (in seconds) for the target to send a msgPack
maxDispatcherNumPerPchannel: 10 # The maximum number of dispatchers per physical channel, primarily to limit the number of consumers and prevent performance issues (e.g., during recovery when a large number of channels are watched).
retrySleep: 5 # register retry sleep time in seconds
retryTimeout: 300 # register retry timeout in seconds

# Related configuration of pulsar, used to manage Milvus logs of recent mutation operations, output streaming log, and provide log publish-subscribe services.
pulsar:
@@ -520,7 +523,6 @@ dataCoord:
balanceInterval: 360 # The interval with which the channel manager checks DML channel balance status
checkInterval: 1 # The interval in seconds with which the channel manager advances channel states
notifyChannelOperationTimeout: 5 # Timeout for notifying channel operations (in seconds).
maxConcurrentChannelTaskNumPerDN: 32 # The maximum concurrency for each DataNode executing channel tasks (watch, release).
segment:
maxSize: 1024 # The maximum size of a segment, unit: MB. datacoord.segment.maxSize and datacoord.segment.sealProportion together determine if a segment can be sealed.
diskSegmentMaxSize: 2048 # Maximum size of a segment in MB for collections which have a Disk index
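For reference, a minimal sketch of how these new knobs are read on the Go side, using the MQCfg accessors this PR adds in pkg/util/paramtable/service_param.go further down. The defaults in the comments (10 dispatchers, 5 s sleep, 300 s timeout) come from the ParamItem definitions; the snippet itself is illustrative and not part of the change.

```go
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init()

	// mq.dispatcher.maxDispatcherNumPerPchannel, default 10
	maxDispatchers := paramtable.Get().MQCfg.MaxDispatcherNumPerPchannel.GetAsInt()
	// mq.dispatcher.retrySleep, default 5 -> 5s between register attempts
	retrySleep := paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)
	// mq.dispatcher.retryTimeout, default 300 -> 5m retry budget
	retryTimeout := paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second)

	fmt.Println(maxDispatchers, retrySleep, retryTimeout)
}
```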
14 changes: 1 addition & 13 deletions internal/datacoord/channel_manager.go
@@ -35,7 +35,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

@@ -451,24 +450,13 @@ func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
toChecks := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(Watching, Releasing))
maxNum := len(m.store.GetNodes()) * paramtable.Get().DataCoordCfg.MaxConcurrentChannelTaskNumPerDN.GetAsInt()
m.mu.RUnlock()

// Processing standby channels
updatedStandbys := false
updatedStandbys = m.advanceStandbys(ctx, standbys)
updatedToCheckes := m.advanceToChecks(ctx, toChecks)

var (
updatedToNotifies bool
executingNum = len(toChecks)
toNotifyNum = maxNum - executingNum
)

if toNotifyNum > 0 {
toNotifies = lo.Slice(toNotifies, 0, toNotifyNum)
updatedToNotifies = m.advanceToNotifies(ctx, toNotifies)
}
updatedToNotifies := m.advanceToNotifies(ctx, toNotifies)

if updatedStandbys || updatedToCheckes || updatedToNotifies {
m.lastActiveTimestamp = time.Now()
57 changes: 44 additions & 13 deletions internal/flushcommon/pipeline/flow_graph_dmstream_input_node.go
@@ -21,6 +21,7 @@ import (
"fmt"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -34,6 +35,7 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgdispatcher"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@@ -69,6 +71,13 @@ func createNewInputFromDispatcher(initCtx context.Context,
) (<-chan *msgstream.MsgPack, error) {
log := log.With(zap.Int64("nodeID", paramtable.GetNodeID()),
zap.String("vchannel", vchannel))

var (
input <-chan *msgstream.MsgPack
err error
start = time.Now()
)

replicateID, _ := pkgcommon.GetReplicateID(schema.GetProperties())
if replicateID == "" {
log.Info("datanode consume without replicateID, try to get replicateID from dbProperties", zap.Any("dbProperties", dbProperties))
@@ -77,30 +86,52 @@
replicateConfig := msgstream.GetReplicateConfig(replicateID, schema.GetDbName(), schema.GetName())

if seekPos != nil && len(seekPos.MsgID) != 0 {
input, err := dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: seekPos,
SubPos: common.SubscriptionPositionUnknown,
ReplicateConfig: replicateConfig,
})
err := retry.Handle(initCtx, func() (bool, error) {
input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: seekPos,
SubPos: common.SubscriptionPositionUnknown,
ReplicateConfig: replicateConfig,
})
if err != nil {
log.Warn("datanode consume failed", zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))
return nil, err
}

log.Info("datanode seek successfully when register to msgDispatcher",
zap.ByteString("msgID", seekPos.GetMsgID()),
zap.Time("tsTime", tsoutil.PhysicalTime(seekPos.GetTimestamp())),
zap.Duration("tsLag", time.Since(tsoutil.PhysicalTime(seekPos.GetTimestamp()))))
zap.Duration("tsLag", time.Since(tsoutil.PhysicalTime(seekPos.GetTimestamp()))),
zap.Duration("dur", time.Since(start)))
return input, err
}
input, err := dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: nil,
SubPos: common.SubscriptionPositionEarliest,
ReplicateConfig: replicateConfig,
})

err = retry.Handle(initCtx, func() (bool, error) {
input, err = dispatcherClient.Register(initCtx, &msgdispatcher.StreamConfig{
VChannel: vchannel,
Pos: nil,
SubPos: common.SubscriptionPositionEarliest,
ReplicateConfig: replicateConfig,
})
if err != nil {
log.Warn("datanode consume failed", zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Warn("datanode consume failed after retried", zap.Error(err))
return nil, err
}

log.Info("datanode consume successfully when register to msgDispatcher")
return input, err
}
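Both Register call sites above follow the same shape, so here is a small, self-contained sketch of the intended semantics in plain Go. It is not the Milvus retry package: retry.Handle, retry.Sleep, and retry.MaxSleepTime are modeled here as a fixed sleep plus an overall deadline, and errTooManyConsumers stands in for msgdispatcher.ErrTooManyConsumers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

var errTooManyConsumers = errors.New("consumer number limit exceeded")

// registerWithRetry keeps retrying fn only while it fails with
// errTooManyConsumers, sleeping `sleep` between attempts and giving up once
// `timeout` has elapsed or the context is cancelled.
func registerWithRetry(ctx context.Context, fn func() error, sleep, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil {
			return nil
		}
		// Non-retryable errors or an exhausted retry budget fail immediately.
		if !errors.Is(err, errTooManyConsumers) || time.Now().After(deadline) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(sleep):
		}
	}
}

func main() {
	attempts := 0
	err := registerWithRetry(context.Background(), func() error {
		attempts++
		if attempts < 3 {
			return errTooManyConsumers // dispatcher slots on the pchannel are still full
		}
		return nil // a slot freed up, registration succeeds
	}, 10*time.Millisecond, time.Second)
	fmt.Println(attempts, err) // 3 <nil>
}
```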
26 changes: 19 additions & 7 deletions internal/util/pipeline/stream_pipeline.go
@@ -22,6 +22,7 @@ import (
"sync"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/atomic"
"go.uber.org/zap"

@@ -35,6 +36,8 @@ import (
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/message/adaptor"
"github.com/milvus-io/milvus/pkg/streaming/util/options"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/retry"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

@@ -119,16 +122,25 @@ func (p *streamPipeline) ConsumeMsgStream(ctx context.Context, position *msgpb.M
}

start := time.Now()
p.input, err = p.dispatcher.Register(ctx, &msgdispatcher.StreamConfig{
VChannel: p.vChannel,
Pos: position,
SubPos: common.SubscriptionPositionUnknown,
ReplicateConfig: p.replicateConfig,
})
err = retry.Handle(ctx, func() (bool, error) {
p.input, err = p.dispatcher.Register(ctx, &msgdispatcher.StreamConfig{
VChannel: p.vChannel,
Pos: position,
SubPos: common.SubscriptionPositionUnknown,
ReplicateConfig: p.replicateConfig,
})
if err != nil {
log.Warn("dispatcher register failed", zap.String("channel", position.ChannelName), zap.Error(err))
return errors.Is(err, msgdispatcher.ErrTooManyConsumers), err
}
return false, nil
}, retry.Sleep(paramtable.Get().MQCfg.RetrySleep.GetAsDuration(time.Second)), // 5 seconds
retry.MaxSleepTime(paramtable.Get().MQCfg.RetryTimeout.GetAsDuration(time.Second))) // 5 minutes
if err != nil {
log.Error("dispatcher register failed", zap.String("channel", position.ChannelName))
log.Error("dispatcher register failed after retried", zap.String("channel", position.ChannelName), zap.Error(err))
return WrapErrRegDispather(err)
}

ts, _ := tsoutil.ParseTS(position.GetTimestamp())
log.Info("stream pipeline seeks from position with msgDispatcher",
zap.String("pchannel", position.ChannelName),
3 changes: 3 additions & 0 deletions internal/util/pipeline/stream_pipeline_test.go
@@ -21,6 +21,8 @@ import (
"fmt"
"testing"

"github.com/milvus-io/milvus/pkg/util/paramtable"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

@@ -42,6 +44,7 @@ type StreamPipelineSuite struct {
}

func (suite *StreamPipelineSuite) SetupTest() {
paramtable.Init()
suite.channel = "test-channel"
suite.inChannel = make(chan *msgstream.MsgPack, 1)
suite.outChannel = make(chan msgstream.Timestamp)
18 changes: 15 additions & 3 deletions pkg/mq/msgdispatcher/client.go
@@ -18,7 +18,9 @@ package msgdispatcher

import (
"context"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
@@ -27,9 +29,12 @@ import (
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

var ErrTooManyConsumers = errors.New("consumer number limit exceeded")

type (
Pos = msgpb.MsgPosition
MsgPack = msgstream.MsgPack
@@ -82,6 +87,7 @@ func (c *client) Register(ctx context.Context, streamConfig *StreamConfig) (<-ch
log := log.With(zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
pchannel := funcutil.ToPhysicalChannel(vchannel)
start := time.Now()
c.managerMut.Lock(pchannel)
defer c.managerMut.Unlock(pchannel)
var manager DispatcherManager
@@ -91,6 +97,11 @@
c.managers.Insert(pchannel, manager)
go manager.Run()
}
// Check if the consumer number limit has been reached.
if manager.Num() >= paramtable.Get().MQCfg.MaxDispatcherNumPerPchannel.GetAsInt() {
return nil, ErrTooManyConsumers
}
// Begin to register
ch, err := manager.Add(ctx, streamConfig)
if err != nil {
if manager.Num() == 0 {
@@ -100,12 +111,13 @@ func (c *client) Register(ctx context.Context, streamConfig *StreamConfig) (<-ch
log.Error("register failed", zap.Error(err))
return nil, err
}
log.Info("register done")
log.Info("register done", zap.Duration("dur", time.Since(start)))
return ch, nil
}

func (c *client) Deregister(vchannel string) {
pchannel := funcutil.ToPhysicalChannel(vchannel)
start := time.Now()
c.managerMut.Lock(pchannel)
defer c.managerMut.Unlock(pchannel)
if manager, ok := c.managers.Get(pchannel); ok {
@@ -114,8 +126,8 @@ func (c *client) Deregister(vchannel string) {
manager.Close()
c.managers.Remove(pchannel)
}
log.Info("deregister done", zap.String("role", c.role),
zap.Int64("nodeID", c.nodeID), zap.String("vchannel", vchannel))
log.Info("deregister done", zap.String("role", c.role), zap.Int64("nodeID", c.nodeID),
zap.String("vchannel", vchannel), zap.Duration("dur", time.Since(start)))
}
}

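To make the new gate concrete, here is a toy sketch of the per-pchannel limit that Register now enforces before delegating to the manager. pchannelRegistry and its names are hypothetical stand-ins; the real DispatcherManager additionally creates, runs, and merges dispatchers.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrTooManyConsumers = errors.New("consumer number limit exceeded")

// pchannelRegistry tracks which vchannels consume each physical channel and
// refuses new consumers once the configured cap is reached.
type pchannelRegistry struct {
	max       int
	consumers map[string]map[string]struct{} // pchannel -> set of vchannels
}

func newRegistry(max int) *pchannelRegistry {
	return &pchannelRegistry{max: max, consumers: make(map[string]map[string]struct{})}
}

func (r *pchannelRegistry) Register(pchannel, vchannel string) error {
	set, ok := r.consumers[pchannel]
	if !ok {
		set = make(map[string]struct{})
		r.consumers[pchannel] = set
	}
	// Reject before adding, mirroring the manager.Num() check in Register.
	if len(set) >= r.max {
		return ErrTooManyConsumers
	}
	set[vchannel] = struct{}{}
	return nil
}

func main() {
	r := newRegistry(2) // mq.dispatcher.maxDispatcherNumPerPchannel = 2 for the example
	fmt.Println(r.Register("by-dev-rootcoord-dml_0", "vchan-a")) // <nil>
	fmt.Println(r.Register("by-dev-rootcoord-dml_0", "vchan-b")) // <nil>
	fmt.Println(r.Register("by-dev-rootcoord-dml_0", "vchan-c")) // consumer number limit exceeded
}
```

A caller that receives ErrTooManyConsumers retries with the sleep/timeout knobs above instead of failing the pipeline outright.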
22 changes: 6 additions & 16 deletions pkg/util/paramtable/component_param.go
@@ -3279,13 +3279,12 @@ user-task-polling:
// --- datacoord ---
type dataCoordConfig struct {
// --- CHANNEL ---
WatchTimeoutInterval ParamItem `refreshable:"false"`
LegacyVersionWithoutRPCWatch ParamItem `refreshable:"false"`
ChannelBalanceSilentDuration ParamItem `refreshable:"true"`
ChannelBalanceInterval ParamItem `refreshable:"true"`
ChannelCheckInterval ParamItem `refreshable:"true"`
ChannelOperationRPCTimeout ParamItem `refreshable:"true"`
MaxConcurrentChannelTaskNumPerDN ParamItem `refreshable:"true"`
WatchTimeoutInterval ParamItem `refreshable:"false"`
LegacyVersionWithoutRPCWatch ParamItem `refreshable:"false"`
ChannelBalanceSilentDuration ParamItem `refreshable:"true"`
ChannelBalanceInterval ParamItem `refreshable:"true"`
ChannelCheckInterval ParamItem `refreshable:"true"`
ChannelOperationRPCTimeout ParamItem `refreshable:"true"`

// --- SEGMENTS ---
SegmentMaxSize ParamItem `refreshable:"false"`
@@ -3454,15 +3453,6 @@ func (p *dataCoordConfig) init(base *BaseTable) {
}
p.ChannelOperationRPCTimeout.Init(base.mgr)

p.MaxConcurrentChannelTaskNumPerDN = ParamItem{
Key: "dataCoord.channel.maxConcurrentChannelTaskNumPerDN",
Version: "2.5",
DefaultValue: "32",
Doc: "The maximum concurrency for each DataNode executing channel tasks (watch, release).",
Export: true,
}
p.MaxConcurrentChannelTaskNumPerDN.Init(base.mgr)

p.SegmentMaxSize = ParamItem{
Key: "dataCoord.segment.maxSize",
Version: "2.0.0",
1 change: 0 additions & 1 deletion pkg/util/paramtable/component_param_test.go
@@ -537,7 +537,6 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt())
params.Save("datacoord.scheduler.taskSlowThreshold", "1000")
assert.Equal(t, 1000*time.Second, Params.TaskSlowThreshold.GetAsDuration(time.Second))
assert.Equal(t, 32, Params.MaxConcurrentChannelTaskNumPerDN.GetAsInt())
})

t.Run("test dataNodeConfig", func(t *testing.T) {
36 changes: 33 additions & 3 deletions pkg/util/paramtable/service_param.go
@@ -518,9 +518,12 @@ type MQConfig struct {
IgnoreBadPosition ParamItem `refreshable:"true"`

// msgdispatcher
MergeCheckInterval ParamItem `refreshable:"false"`
TargetBufSize ParamItem `refreshable:"false"`
MaxTolerantLag ParamItem `refreshable:"true"`
MergeCheckInterval ParamItem `refreshable:"false"`
TargetBufSize ParamItem `refreshable:"false"`
MaxTolerantLag ParamItem `refreshable:"true"`
MaxDispatcherNumPerPchannel ParamItem `refreshable:"true"`
RetrySleep ParamItem `refreshable:"true"`
RetryTimeout ParamItem `refreshable:"true"`
}

// Init initializes the MQConfig object with a BaseTable.
@@ -544,6 +547,33 @@ Valid values: [default, pulsar, kafka, rocksmq, natsmq]`,
}
p.MaxTolerantLag.Init(base.mgr)

p.MaxDispatcherNumPerPchannel = ParamItem{
Key: "mq.dispatcher.maxDispatcherNumPerPchannel",
Version: "2.4.19",
DefaultValue: "10",
Doc: `The maximum number of dispatchers per physical channel, primarily to limit the number of consumers and prevent performance issues (e.g., during recovery when a large number of channels are watched).`,
Export: true,
}
p.MaxDispatcherNumPerPchannel.Init(base.mgr)

p.RetrySleep = ParamItem{
Key: "mq.dispatcher.retrySleep",
Version: "2.4.19",
DefaultValue: "5",
Doc: `register retry sleep time in seconds`,
Export: true,
}
p.RetrySleep.Init(base.mgr)

p.RetryTimeout = ParamItem{
Key: "mq.dispatcher.retryTimeout",
Version: "2.4.19",
DefaultValue: "300",
Doc: `register retry timeout in seconds`,
Export: true,
}
p.RetryTimeout.Init(base.mgr)

p.TargetBufSize = ParamItem{
Key: "mq.dispatcher.targetBufSize",
Version: "2.4.4",
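All three new items are declared refreshable:"true", so they can also be tuned at runtime. A hedged sketch of that, assuming ComponentParam.Save behaves the same way the existing component_param tests use it; the override values are illustrative.

```go
package main

import (
	"fmt"
	"time"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init()
	params := paramtable.Get()

	// Illustrative overrides; the keys match the ParamItems added in this PR.
	params.Save("mq.dispatcher.maxDispatcherNumPerPchannel", "20")
	params.Save("mq.dispatcher.retrySleep", "10")
	params.Save("mq.dispatcher.retryTimeout", "600")

	fmt.Println(params.MQCfg.MaxDispatcherNumPerPchannel.GetAsInt())   // 20
	fmt.Println(params.MQCfg.RetrySleep.GetAsDuration(time.Second))    // 10s
	fmt.Println(params.MQCfg.RetryTimeout.GetAsDuration(time.Second))  // 10m0s
}
```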
11 changes: 11 additions & 0 deletions pkg/util/paramtable/service_param_test.go
@@ -31,6 +31,17 @@ func TestServiceParam(t *testing.T) {
var SParams ServiceParam
bt := NewBaseTable(SkipRemote(true))
SParams.init(bt)

t.Run("test MQConfig", func(t *testing.T) {
Params := &SParams.MQCfg
assert.Equal(t, 1*time.Second, Params.MergeCheckInterval.GetAsDuration(time.Second))
assert.Equal(t, 16, Params.TargetBufSize.GetAsInt())
assert.Equal(t, 3*time.Second, Params.MaxTolerantLag.GetAsDuration(time.Second))
assert.Equal(t, 10, Params.MaxDispatcherNumPerPchannel.GetAsInt())
assert.Equal(t, 5*time.Second, Params.RetrySleep.GetAsDuration(time.Second))
assert.Equal(t, 300*time.Second, Params.RetryTimeout.GetAsDuration(time.Second))
})

t.Run("test etcdConfig", func(t *testing.T) {
Params := &SParams.EtcdCfg
