Skip to content

Commit

Permalink
fix: refactor milvus config and change default txn timeout (milvus-io…
Browse files Browse the repository at this point in the history
…#36522)

issue: milvus-io#36498

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Sep 29, 2024
1 parent c43d943 commit a6545b2
Show file tree
Hide file tree
Showing 9 changed files with 87 additions and 53 deletions.
5 changes: 5 additions & 0 deletions cmd/tools/config/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,11 @@ func WriteYaml(w io.Writer) {
header: `
# Any configuration related to the streaming node server.`,
},
{
name: "streaming",
header: `
# Any configuration related to the streaming service.`,
},
}
marshller := YamlMarshaller{w, groups, result}
marshller.writeYamlRecursive(lo.Filter(result, func(d DocContent, _ int) bool {
Expand Down
13 changes: 13 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1036,3 +1036,16 @@ streamingNode:
serverMaxRecvSize: 268435456 # The maximum size of each RPC request that the streamingNode can receive, unit: byte
clientMaxSendSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can send, unit: byte
clientMaxRecvSize: 268435456 # The maximum size of each RPC request that the clients on streamingNode can receive, unit: byte

# Any configuration related to the streaming service.
streaming:
walBalancer:
# The interval of balance task trigger at background, 1 min by default.
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
triggerInterval: 1m
# The initial interval of balance task trigger backoff, 50 ms by default.
# It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration
backoffInitialInterval: 50ms
backoffMultiplier: 2 # The multiplier of balance task trigger backoff, 2 by default
txn:
defaultKeepaliveTimeout: 10s # The default keepalive timeout for wal txn, 10s by default
3 changes: 2 additions & 1 deletion internal/distributed/streaming/streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ type TxnOption struct {

// Keepalive is the time to keepalive of the transaction.
// If the txn don't append message in the keepalive time, the txn will be expired.
// Only make sense when ttl is greater than 1ms.
// Only make sense when keepalive is greater than 1ms.
// The default value is 0, which means the keepalive is setted by the wal at streaming node.
Keepalive time.Duration
}

Expand Down
4 changes: 1 addition & 3 deletions internal/distributed/streaming/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package streaming
import (
"context"
"sync"
"time"

"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/streaming/util/types"
Expand Down Expand Up @@ -97,8 +96,7 @@ func (u *walAccesserImpl) appendToVChannel(ctx context.Context, vchannel string,
// Otherwise, we start a transaction to append the messages.
// The transaction will be committed when all messages are appended.
txn, err := u.Txn(ctx, TxnOption{
VChannel: vchannel,
Keepalive: 5 * time.Second,
VChannel: vchannel,
})
if err != nil {
resp.fillAllError(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/distributed/streaming/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (w *walAccesserImpl) Txn(ctx context.Context, opts TxnOption) (Txn, error)
w.lifetime.Done()
return nil, status.NewInvaildArgument("vchannel is required")
}
if opts.Keepalive < 1*time.Millisecond {
if opts.Keepalive != 0 && opts.Keepalive < 1*time.Millisecond {
w.lifetime.Done()
return nil, status.NewInvaildArgument("ttl must be greater than or equal to 1ms")
}
Expand Down
8 changes: 4 additions & 4 deletions internal/streamingcoord/server/balancer/balancer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,12 +291,12 @@ type backoffConfigFetcher struct{}

func (f *backoffConfigFetcher) BackoffConfig() typeutil.BackoffConfig {
return typeutil.BackoffConfig{
InitialInterval: paramtable.Get().StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse(),
Multiplier: paramtable.Get().StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat(),
MaxInterval: paramtable.Get().StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse(),
InitialInterval: paramtable.Get().StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse(),
Multiplier: paramtable.Get().StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat(),
MaxInterval: paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse(),
}
}

func (f *backoffConfigFetcher) DefaultInterval() time.Duration {
return paramtable.Get().StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse()
return paramtable.Get().StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse()
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/streaming/util/message"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// NewTxnManager creates a new transaction manager.
Expand All @@ -36,6 +37,13 @@ type TxnManager struct {
// We only support a transaction work on a streaming node, once the wal is transferred to another node,
// the transaction is treated as expired (rollback), and user will got a expired error, then perform a retry.
func (m *TxnManager) BeginNewTxn(ctx context.Context, timetick uint64, keepalive time.Duration) (*TxnSession, error) {
if keepalive == 0 {
// If keepalive is 0, the txn set the keepalive with default keepalive.
keepalive = paramtable.Get().StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse()
}
if keepalive < 1*time.Millisecond {
return nil, status.NewInvaildArgument("keepalive must be greater than 1ms")
}
id, err := resource.Resource().IDAllocator().Allocate(ctx)
if err != nil {
return nil, err
Expand Down
74 changes: 40 additions & 34 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,17 @@ type ComponentParam struct {
GpuConfig gpuConfig
TraceCfg traceConfig

RootCoordCfg rootCoordConfig
ProxyCfg proxyConfig
QueryCoordCfg queryCoordConfig
QueryNodeCfg queryNodeConfig
DataCoordCfg dataCoordConfig
DataNodeCfg dataNodeConfig
IndexNodeCfg indexNodeConfig
HTTPCfg httpConfig
LogCfg logConfig
RoleCfg roleConfig
StreamingCoordCfg streamingCoordConfig
StreamingNodeCfg streamingNodeConfig
RootCoordCfg rootCoordConfig
ProxyCfg proxyConfig
QueryCoordCfg queryCoordConfig
QueryNodeCfg queryNodeConfig
DataCoordCfg dataCoordConfig
DataNodeCfg dataNodeConfig
IndexNodeCfg indexNodeConfig
HTTPCfg httpConfig
LogCfg logConfig
RoleCfg roleConfig
StreamingCfg streamingConfig

RootCoordGrpcServerCfg GrpcServerConfig
ProxyGrpcServerCfg GrpcServerConfig
Expand Down Expand Up @@ -130,14 +129,11 @@ func (p *ComponentParam) init(bt *BaseTable) {
p.DataCoordCfg.init(bt)
p.DataNodeCfg.init(bt)
p.IndexNodeCfg.init(bt)
p.StreamingCoordCfg.init(bt)
p.StreamingNodeCfg.init(bt)
p.StreamingCfg.init(bt)
p.HTTPCfg.init(bt)
p.LogCfg.init(bt)
p.RoleCfg.init(bt)
p.GpuConfig.init(bt)
p.StreamingCoordCfg.init(bt)
p.StreamingNodeCfg.init(bt)

p.RootCoordGrpcServerCfg.Init("rootCoord", bt)
p.ProxyGrpcServerCfg.Init("proxy", bt)
Expand Down Expand Up @@ -4635,44 +4631,54 @@ func (p *indexNodeConfig) init(base *BaseTable) {
p.GracefulStopTimeout.Init(base.mgr)
}

type streamingCoordConfig struct {
AutoBalanceTriggerInterval ParamItem `refreshable:"true"`
AutoBalanceBackoffInitialInterval ParamItem `refreshable:"true"`
AutoBalanceBackoffMultiplier ParamItem `refreshable:"true"`
type streamingConfig struct {
// balancer
WALBalancerTriggerInterval ParamItem `refreshable:"true"`
WALBalancerBackoffInitialInterval ParamItem `refreshable:"true"`
WALBalancerBackoffMultiplier ParamItem `refreshable:"true"`

// txn
TxnDefaultKeepaliveTimeout ParamItem `refreshable:"true"`
}

func (p *streamingCoordConfig) init(base *BaseTable) {
p.AutoBalanceTriggerInterval = ParamItem{
Key: "streamingCoord.autoBalanceTriggerInterval",
func (p *streamingConfig) init(base *BaseTable) {
// balancer
p.WALBalancerTriggerInterval = ParamItem{
Key: "streaming.walBalancer.triggerInterval",
Version: "2.5.0",
Doc: `The interval of balance task trigger at background, 1 min by default.
It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`,
DefaultValue: "1m",
Export: true,
}
p.AutoBalanceTriggerInterval.Init(base.mgr)
p.AutoBalanceBackoffInitialInterval = ParamItem{
Key: "streamingCoord.autoBalanceBackoffInitialInterval",
p.WALBalancerTriggerInterval.Init(base.mgr)
p.WALBalancerBackoffInitialInterval = ParamItem{
Key: "streaming.walBalancer.backoffInitialInterval",
Version: "2.5.0",
Doc: `The initial interval of balance task trigger backoff, 50 ms by default.
It's ok to set it into duration string, such as 30s or 1m30s, see time.ParseDuration`,
DefaultValue: "50ms",
Export: true,
}
p.AutoBalanceBackoffInitialInterval.Init(base.mgr)
p.AutoBalanceBackoffMultiplier = ParamItem{
Key: "streamingCoord.autoBalanceBackoffMultiplier",
p.WALBalancerBackoffInitialInterval.Init(base.mgr)
p.WALBalancerBackoffMultiplier = ParamItem{
Key: "streaming.walBalancer.backoffMultiplier",
Version: "2.5.0",
Doc: "The multiplier of balance task trigger backoff, 2 by default",
DefaultValue: "2",
Export: true,
}
p.AutoBalanceBackoffMultiplier.Init(base.mgr)
}

type streamingNodeConfig struct{}
p.WALBalancerBackoffMultiplier.Init(base.mgr)

func (p *streamingNodeConfig) init(base *BaseTable) {
// txn
p.TxnDefaultKeepaliveTimeout = ParamItem{
Key: "streaming.txn.defaultKeepaliveTimeout",
Version: "2.5.0",
Doc: "The default keepalive timeout for wal txn, 10s by default",
DefaultValue: "10s",
Export: true,
}
p.TxnDefaultKeepaliveTimeout.Init(base.mgr)
}

type runtimeConfig struct {
Expand Down
23 changes: 13 additions & 10 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,16 +583,19 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second))
})

t.Run("test streamingCoordConfig", func(t *testing.T) {
assert.Equal(t, 1*time.Minute, params.StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Millisecond, params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 2.0, params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat())
params.Save(params.StreamingCoordCfg.AutoBalanceTriggerInterval.Key, "50s")
params.Save(params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.Key, "50s")
params.Save(params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.Key, "3.5")
assert.Equal(t, 50*time.Second, params.StreamingCoordCfg.AutoBalanceTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Second, params.StreamingCoordCfg.AutoBalanceBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 3.5, params.StreamingCoordCfg.AutoBalanceBackoffMultiplier.GetAsFloat())
t.Run("test streamingConfig", func(t *testing.T) {
assert.Equal(t, 1*time.Minute, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Millisecond, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 2.0, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 10*time.Second, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
params.Save(params.StreamingCfg.WALBalancerTriggerInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffInitialInterval.Key, "50s")
params.Save(params.StreamingCfg.WALBalancerBackoffMultiplier.Key, "3.5")
params.Save(params.StreamingCfg.TxnDefaultKeepaliveTimeout.Key, "3500ms")
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerTriggerInterval.GetAsDurationByParse())
assert.Equal(t, 50*time.Second, params.StreamingCfg.WALBalancerBackoffInitialInterval.GetAsDurationByParse())
assert.Equal(t, 3.5, params.StreamingCfg.WALBalancerBackoffMultiplier.GetAsFloat())
assert.Equal(t, 3500*time.Millisecond, params.StreamingCfg.TxnDefaultKeepaliveTimeout.GetAsDurationByParse())
})

t.Run("channel config priority", func(t *testing.T) {
Expand Down

0 comments on commit a6545b2

Please sign in to comment.