diff --git a/cmd/tools/config/generate.go b/cmd/tools/config/generate.go index cdc5cd15bfeba..1b50ea64e2ab0 100644 --- a/cmd/tools/config/generate.go +++ b/cmd/tools/config/generate.go @@ -333,6 +333,11 @@ func WriteYaml(w io.Writer) { header: ` # Any configuration related to the streaming node server.`, }, + { + name: "streaming", + header: ` +# Any configuration related to the streaming service.`, + }, } marshller := YamlMarshaller{w, groups, result} marshller.writeYamlRecursive(lo.Filter(result, func(d DocContent, _ int) bool { diff --git a/configs/milvus.yaml b/configs/milvus.yaml index a98371c3783c7..be3343f557e06 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -1036,3 +1036,16 @@ streamingNode: serverMaxRecvSize: 268435456 # The maximum size of each RPC request that the streamingNode can receive, unit: byte clientMaxSendSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can send, unit: byte clientMaxRecvSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can receive, unit: byte + +# Any configuration related to the streaming service. +streaming: + walBalancer: + # The interval of balance task trigger at background, 1 min by default. + # It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration + triggerInterval: 1m + # The initial interval of balance task trigger backoff, 50 ms by default. 
+ # It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration + backoffInitialInterval: 50ms + backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default + txn: + defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default diff --git a/internal/distributed/streaming/streaming.go b/internal/distributed/streaming/streaming.go index aa39c4e1de224..460f85054228b 100644 --- a/internal/distributed/streaming/streaming.go +++ b/internal/distributed/streaming/streaming.go @@ -44,7 +44,8 @@ type TxnOption struct { // Keepalive is the time to keepalive of the transaction. // If the txn don't append message in the keepalive time, the txn will be expired. - // Only make sense when ttl is greater than 1ms. + // A non-zero keepalive must be at least 1ms. + // The default value is 0, which means the keepalive is set by the wal at the streaming node. Keepalive time.Duration } diff --git a/internal/distributed/streaming/util.go b/internal/distributed/streaming/util.go index da1e0156a55e1..8d51ef5671f9d 100644 --- a/internal/distributed/streaming/util.go +++ b/internal/distributed/streaming/util.go @@ -3,7 +3,6 @@ package streaming import ( "context" "sync" - "time" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/streaming/util/types" @@ -97,8 +96,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string, // Otherwise, we start a transaction to append the messages. // The transaction will be committed when all messages are appended. 
txn, err := u.Txn(ctx, TxnOption{ - VChannel: vchannel, - Keepalive: 5 * time.Second, + VChannel: vchannel, }) if err != nil { resp.fillAllError(err) diff --git a/internal/distributed/streaming/wal.go b/internal/distributed/streaming/wal.go index 0835a9d678b57..9a3765a934065 100644 --- a/internal/distributed/streaming/wal.go +++ b/internal/distributed/streaming/wal.go @@ -93,7 +93,7 @@ func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error) w.lifetime.Done() return nil, status.NewInvaildArgument("vchannel is required") } - if opts.Keepalive < 1*time.Millisecond { + if opts.Keepalive != 0 && opts.Keepalive < 1*time.Millisecond { w.lifetime.Done() return nil, status.NewInvaildArgument("ttl must be greater than or equal to 1ms") } diff --git a/internal/streamingcoord/server/balancer/balancer_impl.go b/internal/streamingcoord/server/balancer/balancer_impl.go index 2cd2900cd3ec9..70b087e9864f7 100644 --- a/internal/streamingcoord/server/balancer/balancer_impl.go +++ b/internal/streamingcoord/server/balancer/balancer_impl.go @@ -291,12 +291,12 @@ type backoffConfigFetcher struct{} func (f *backoffConfigFetcher) BackoffConfig() typeutil.BackoffConfig { return typeutil.BackoffConfig{ - InitialInterval: paramtable.Get().StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse(), - Multiplier: paramtable.Get().StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat(), - MaxInterval: paramtable.Get().StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse(), + InitialInterval: paramtable.Get().StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse(), + Multiplier: paramtable.Get().StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat(), + MaxInterval: paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse(), } } func (f *backoffConfigFetcher) DefaultInterval() time.Duration { - return paramtable.Get().StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse() + return 
paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse() } diff --git a/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go b/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go index 6bdb427b2b004..bc53fa65cf1d5 100644 --- a/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go +++ b/internal/streamingnode/server/wal/interceptors/txn/txn_manager.go @@ -12,6 +12,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/streaming/util/message" "github.com/milvus-io/milvus/pkg/util/lifetime" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) // NewTxnManager creates a new transaction manager. @@ -36,6 +37,13 @@ type TxnManager struct { // We only support a transaction work on a streaming node, once the wal is transferred to another node, // the transaction is treated as expired (rollback), and user will got a expired error, then perform a retry. func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive time.Duration) (*TxnSession, error) { + if keepalive == 0 { + // If keepalive is 0, fall back to the configured default keepalive timeout. 
+ keepalive = paramtable.Get().StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse() + } + if keepalive < 1*time.Millisecond { + return nil, status.NewInvaildArgument("keepalive must be greater than or equal to 1ms") + } id, err := resource.Resource().IDAllocator().Allocate(ctx) if err != nil { return nil, err diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 4a5da5394e369..b30bba4c97e17 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -69,18 +69,17 @@ type ComponentParam struct { GpuConfig gpuConfig TraceCfg traceConfig - RootCoordCfg rootCoordConfig - ProxyCfg proxyConfig - QueryCoordCfg queryCoordConfig - QueryNodeCfg queryNodeConfig - DataCoordCfg dataCoordConfig - DataNodeCfg dataNodeConfig - IndexNodeCfg indexNodeConfig - HTTPCfg httpConfig - LogCfg logConfig - RoleCfg roleConfig - StreamingCoordCfg streamingCoordConfig - StreamingNodeCfg streamingNodeConfig + RootCoordCfg rootCoordConfig + ProxyCfg proxyConfig + QueryCoordCfg queryCoordConfig + QueryNodeCfg queryNodeConfig + DataCoordCfg dataCoordConfig + DataNodeCfg dataNodeConfig + IndexNodeCfg indexNodeConfig + HTTPCfg httpConfig + LogCfg logConfig + RoleCfg roleConfig + StreamingCfg streamingConfig RootCoordGrpcServerCfg GrpcServerConfig ProxyGrpcServerCfg GrpcServerConfig @@ -130,14 +129,11 @@ func (p *ComponentParam) init(bt *BaseTable) { p.DataCoordCfg.init(bt) p.DataNodeCfg.init(bt) p.IndexNodeCfg.init(bt) - p.StreamingCoordCfg.init(bt) - p.StreamingNodeCfg.init(bt) + p.StreamingCfg.init(bt) p.HTTPCfg.init(bt) p.LogCfg.init(bt) p.RoleCfg.init(bt) p.GpuConfig.init(bt) - p.StreamingCoordCfg.init(bt) - p.StreamingNodeCfg.init(bt) p.RootCoordGrpcServerCfg.Init("rootCoord", bt) p.ProxyGrpcServerCfg.Init("proxy", bt) @@ -4635,44 +4631,54 @@ func (p *indexNodeConfig) init(base *BaseTable) { p.GracefulStopTimeout.Init(base.mgr) } -type streamingCoordConfig struct { - AutoBalanceTriggerInterval ParamItem 
`refreshable:"true"` - AutoBalanceBackoffInitialInterval ParamItem `refreshable:"true"` - AutoBalanceBackoffMultiplier ParamItem `refreshable:"true"` +type streamingConfig struct { + // balancer + WALBalancerTriggerInterval ParamItem `refreshable:"true"` + WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"` + WALBalancerBackoffMultiplier ParamItem `refreshable:"true"` + + // txn + TxnDefaultKeepaliveTimeout ParamItem `refreshable:"true"` } -func (p *streamingCoordConfig) init(base *BaseTable) { - p.AutoBalanceTriggerInterval = ParamItem{ - Key: "streamingCoord.autoBalanceTriggerInterval", +func (p *streamingConfig) init(base *BaseTable) { + // balancer + p.WALBalancerTriggerInterval = ParamItem{ + Key: "streaming.walBalancer.triggerInterval", Version: "2.5.0", Doc: `The interval of balance task trigger at background, 1 min by default. It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`, DefaultValue: "1m", Export: true, } - p.AutoBalanceTriggerInterval.Init(base.mgr) - p.AutoBalanceBackoffInitialInterval = ParamItem{ - Key: "streamingCoord.autoBalanceBackoffInitialInterval", + p.WALBalancerTriggerInterval.Init(base.mgr) + p.WALBalancerBackoffInitialInterval = ParamItem{ + Key: "streaming.walBalancer.backoffInitialInterval", Version: "2.5.0", Doc: `The initial interval of balance task trigger backoff, 50 ms by default. 
It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`, DefaultValue: "50ms", Export: true, } - p.AutoBalanceBackoffInitialInterval.Init(base.mgr) - p.AutoBalanceBackoffMultiplier = ParamItem{ - Key: "streamingCoord.autoBalanceBackoffMultiplier", + p.WALBalancerBackoffInitialInterval.Init(base.mgr) + p.WALBalancerBackoffMultiplier = ParamItem{ + Key: "streaming.walBalancer.backoffMultiplier", Version: "2.5.0", Doc: "The multiplier of balance task trigger backoff, 2 by default", DefaultValue: "2", Export: true, } - p.AutoBalanceBackoffMultiplier.Init(base.mgr) -} - -type streamingNodeConfig struct{} + p.WALBalancerBackoffMultiplier.Init(base.mgr) -func (p *streamingNodeConfig) init(base *BaseTable) { + // txn + p.TxnDefaultKeepaliveTimeout = ParamItem{ + Key: "streaming.txn.defaultKeepaliveTimeout", + Version: "2.5.0", + Doc: "The default keepalive timeout for wal txn, 10s by default", + DefaultValue: "10s", + Export: true, + } + p.TxnDefaultKeepaliveTimeout.Init(base.mgr) } type runtimeConfig struct { diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 33ab0c8b430f6..f81de8d8595fe 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -583,16 +583,19 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) }) - t.Run("test streamingCoordConfig", func(t *testing.T) { - assert.Equal(t, 1*time.Minute, params.StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse()) - assert.Equal(t, 50*time.Millisecond, params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse()) - assert.Equal(t, 2.0, params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat()) - params.Save(params.StreamingCoordCfg.AutoBalanceTriggerInterval.Key, "50s") - params.Save(params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.Key, "50s") - 
params.Save(params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.Key, "3.5") - assert.Equal(t, 50*time.Second, params.StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse()) - assert.Equal(t, 50*time.Second, params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse()) - assert.Equal(t, 3.5, params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat()) + t.Run("test streamingConfig", func(t *testing.T) { + assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()) + assert.Equal(t, 50*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse()) + assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat()) + assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()) + params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s") + params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s") + params.Save(params.StreamingCfg.WALBalancerBackoffMultiplier.Key, "3.5") + params.Save(params.StreamingCfg.TxnDefaultKeepaliveTimeout.Key, "3500ms") + assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()) + assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse()) + assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat()) + assert.Equal(t, 3500*time.Millisecond, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()) }) t.Run("channel config priority", func(t *testing.T) {