enhance: make SyncManager pool size refreshable (#29224)
See also #29223

This PR makes `conc.Pool` resizable by adding a `Resize` method to it.
It also makes the newly added datanode `MaxParallelSyncMgrTasks` config
refreshable.
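
In rough terms, `conc.Pool` wraps an `ants` goroutine pool, so the new `Resize` boils down to `ants`' `Tune` call (see `pool.inner.Tune(size)` in the diff below). A minimal standalone sketch of that underlying mechanism, using ants/v2 directly rather than the `conc` wrapper; the sketch is illustrative only, not part of this change:

package main

import (
	"fmt"

	ants "github.com/panjf2000/ants/v2"
)

func main() {
	// A plain (non pre-allocated) ants pool can be re-tuned at runtime;
	// conc.Pool.Resize delegates to exactly this kind of Tune call.
	pool, err := ants.NewPool(4)
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	fmt.Println(pool.Cap()) // 4

	// A refreshed MaxParallelSyncMgrTasks value ultimately triggers this.
	pool.Tune(8)
	fmt.Println(pool.Cap()) // 8
}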

---------

Signed-off-by: Congqi.Xia <[email protected]>
congqixia authored Dec 15, 2023
1 parent 25a4525 commit 4731c1b
Showing 8 changed files with 143 additions and 20 deletions.

internal/datanode/data_node.go (1 addition, 2 deletions)

@@ -273,8 +273,7 @@ func (node *DataNode) Init() error {
 	}
 
 	node.chunkManager = chunkManager
-	syncMgr, err := syncmgr.NewSyncManager(paramtable.Get().DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt(),
-		node.chunkManager, node.allocator)
+	syncMgr, err := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
 	if err != nil {
 		initError = err
 		log.Error("failed to create sync manager", zap.Error(err))

internal/datanode/mock_test.go (1 addition, 1 deletion)

@@ -92,7 +92,7 @@ func newIDLEDataNodeMock(ctx context.Context, pkType schemapb.DataType) *DataNode {
 	node.broker = broker
 	node.timeTickSender = newTimeTickSender(node.broker, 0)
 
-	syncMgr, _ := syncmgr.NewSyncManager(10, node.chunkManager, node.allocator)
+	syncMgr, _ := syncmgr.NewSyncManager(node.chunkManager, node.allocator)
 
 	node.syncMgr = syncMgr
 	node.writeBufferManager = writebuffer.NewManager(node.syncMgr)

internal/datanode/syncmgr/key_lock_dispatcher.go (3 additions, 2 deletions)

@@ -20,10 +20,11 @@ type keyLockDispatcher[K comparable] struct {
 }
 
 func newKeyLockDispatcher[K comparable](maxParallel int) *keyLockDispatcher[K] {
-	return &keyLockDispatcher[K]{
-		workerPool: conc.NewPool[error](maxParallel, conc.WithPreAlloc(true)),
+	dispatcher := &keyLockDispatcher[K]{
+		workerPool: conc.NewPool[error](maxParallel, conc.WithPreAlloc(false)),
 		keyLock:    lock.NewKeyLock[K](),
 	}
+	return dispatcher
 }
 
 func (d *keyLockDispatcher[K]) Submit(key K, t Task, callbacks ...func(error)) *conc.Future[error] {

internal/datanode/syncmgr/sync_manager.go (44 additions, 10 deletions)

@@ -5,13 +5,18 @@ import (
 	"fmt"
 	"strconv"
 
+	"go.uber.org/zap"
+
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/allocator"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/config"
+	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/conc"
 	"github.com/milvus-io/milvus/pkg/util/merr"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -57,19 +62,48 @@ type syncManager struct {
 	tasks *typeutil.ConcurrentMap[string, Task]
 }
 
-func NewSyncManager(parallelTask int, chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) {
-	if parallelTask < 1 {
-		return nil, merr.WrapErrParameterInvalid("positive parallel task number", strconv.FormatInt(int64(parallelTask), 10))
+func NewSyncManager(chunkManager storage.ChunkManager, allocator allocator.Interface) (SyncManager, error) {
+	params := paramtable.Get()
+	initPoolSize := params.DataNodeCfg.MaxParallelSyncMgrTasks.GetAsInt()
+	if initPoolSize < 1 {
+		return nil, merr.WrapErrParameterInvalid("positive parallel task number", strconv.FormatInt(int64(initPoolSize), 10))
 	}
-	return &syncManager{
-		keyLockDispatcher: newKeyLockDispatcher[int64](parallelTask),
+	dispatcher := newKeyLockDispatcher[int64](initPoolSize)
+	log.Info("sync manager initialized", zap.Int("initPoolSize", initPoolSize))
+
+	syncMgr := &syncManager{
+		keyLockDispatcher: dispatcher,
 		chunkManager:      chunkManager,
 		allocator:         allocator,
 		tasks:             typeutil.NewConcurrentMap[string, Task](),
-	}, nil
+	}
+	// setup config update watcher
+	params.Watch(params.DataNodeCfg.MaxParallelSyncMgrTasks.Key, config.NewHandler("datanode.syncmgr.poolsize", syncMgr.resizeHandler))
+
+	return syncMgr, nil
+}
+
+func (mgr *syncManager) resizeHandler(evt *config.Event) {
+	if evt.HasUpdated {
+		log := log.Ctx(context.Background()).With(
+			zap.String("key", evt.Key),
+			zap.String("value", evt.Value),
+		)
+		size, err := strconv.ParseInt(evt.Value, 10, 64)
+		if err != nil {
+			log.Warn("failed to parse new datanode syncmgr pool size", zap.Error(err))
+			return
+		}
+		err = mgr.keyLockDispatcher.workerPool.Resize(int(size))
+		if err != nil {
+			log.Warn("failed to resize datanode syncmgr pool size", zap.String("key", evt.Key), zap.String("value", evt.Value), zap.Error(err))
+			return
+		}
+		log.Info("sync mgr pool size updated", zap.Int64("newSize", size))
+	}
+}
 
-func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
+func (mgr *syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
 	switch t := task.(type) {
 	case *SyncTask:
 		t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
@@ -88,7 +122,7 @@ func (mgr syncManager) SyncData(ctx context.Context, task Task) *conc.Future[error] {
 	})
 }
 
-func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
+func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
 	var cp *msgpb.MsgPosition
 	var segmentID int64
 	mgr.tasks.Range(func(_ string, task Task) bool {
@@ -106,10 +140,10 @@ func (mgr syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
 	return segmentID, cp
 }
 
-func (mgr syncManager) Block(segmentID int64) {
+func (mgr *syncManager) Block(segmentID int64) {
 	mgr.keyLock.Lock(segmentID)
 }
 
-func (mgr syncManager) Unblock(segmentID int64) {
+func (mgr *syncManager) Unblock(segmentID int64) {
 	mgr.keyLock.Unlock(segmentID)
 }

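The registration in `NewSyncManager` is the generic paramtable watch pattern; nothing in it is specific to the sync manager. A hedged sketch of reusing it for another component (`watchPoolSize` and the handler ident are hypothetical; `Watch`, `NewHandler`, `Event`, and `Resize` are the calls visible in this diff):

package example

import (
	"strconv"

	"github.com/milvus-io/milvus/pkg/config"
	"github.com/milvus-io/milvus/pkg/util/conc"
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// watchPoolSize is a hypothetical helper that restates the
// Watch/NewHandler/Resize wiring from NewSyncManager and resizeHandler
// above for an arbitrary pool and config key.
func watchPoolSize(pool *conc.Pool[error], key string) {
	paramtable.Get().Watch(key, config.NewHandler("example.poolsize", func(evt *config.Event) {
		if !evt.HasUpdated {
			return
		}
		size, err := strconv.ParseInt(evt.Value, 10, 64)
		if err != nil {
			return // malformed value: keep the current pool size
		}
		// Resize fails for non-positive sizes and pre-alloc pools;
		// in both cases the pool keeps its previous capacity.
		_ = pool.Resize(int(size))
	}))
}
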
internal/datanode/syncmgr/sync_manager_test.go (59 additions, 4 deletions)

@@ -3,6 +3,7 @@ package syncmgr
 import (
 	"context"
 	"math/rand"
+	"strconv"
 	"testing"
 	"time"
@@ -21,6 +22,7 @@
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/storage"
 	"github.com/milvus-io/milvus/pkg/common"
+	"github.com/milvus-io/milvus/pkg/config"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -41,7 +43,7 @@
 }
 
 func (s *SyncManagerSuite) SetupSuite() {
-	paramtable.Get().Init(paramtable.NewBaseTable())
+	paramtable.Get().Init(paramtable.NewBaseTable(paramtable.SkipRemote(true)))
 
 	s.collectionID = 100
 	s.partitionID = 101
@@ -155,7 +157,7 @@ func (s *SyncManagerSuite) TestSubmit() {
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 
-	manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
+	manager, err := NewSyncManager(s.chunkManager, s.allocator)
 	s.NoError(err)
 	task := s.getSuiteSyncTask()
 	task.WithMetaWriter(BrokerMetaWriter(s.broker))
@@ -187,7 +189,7 @@ func (s *SyncManagerSuite) TestCompacted() {
 	s.metacache.EXPECT().GetSegmentsBy(mock.Anything).Return([]*metacache.SegmentInfo{seg})
 	s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
 
-	manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
+	manager, err := NewSyncManager(s.chunkManager, s.allocator)
 	s.NoError(err)
 	task := s.getSuiteSyncTask()
 	task.WithMetaWriter(BrokerMetaWriter(s.broker))
@@ -225,7 +227,7 @@ func (s *SyncManagerSuite) TestBlock() {
 		}
 	})
 
-	manager, err := NewSyncManager(10, s.chunkManager, s.allocator)
+	manager, err := NewSyncManager(s.chunkManager, s.allocator)
 	s.NoError(err)
 
 	// block
@@ -253,6 +255,59 @@ func (s *SyncManagerSuite) TestBlock() {
 	<-sig
 }
 
+func (s *SyncManagerSuite) TestResizePool() {
+	manager, err := NewSyncManager(s.chunkManager, s.allocator)
+	s.NoError(err)
+
+	syncMgr, ok := manager.(*syncManager)
+	s.Require().True(ok)
+
+	cap := syncMgr.keyLockDispatcher.workerPool.Cap()
+	s.NotZero(cap)
+
+	params := paramtable.Get()
+	configKey := params.DataNodeCfg.MaxParallelSyncMgrTasks.Key
+
+	syncMgr.resizeHandler(&config.Event{
+		Key:        configKey,
+		Value:      "abc",
+		HasUpdated: true,
+	})
+
+	s.Equal(cap, syncMgr.keyLockDispatcher.workerPool.Cap())
+
+	syncMgr.resizeHandler(&config.Event{
+		Key:        configKey,
+		Value:      "-1",
+		HasUpdated: true,
+	})
+	s.Equal(cap, syncMgr.keyLockDispatcher.workerPool.Cap())
+
+	syncMgr.resizeHandler(&config.Event{
+		Key:        configKey,
+		Value:      strconv.FormatInt(int64(cap*2), 10),
+		HasUpdated: true,
+	})
+	s.Equal(cap*2, syncMgr.keyLockDispatcher.workerPool.Cap())
+}
+
+func (s *SyncManagerSuite) TestNewSyncManager() {
+	manager, err := NewSyncManager(s.chunkManager, s.allocator)
+	s.NoError(err)
+
+	_, ok := manager.(*syncManager)
+	s.Require().True(ok)
+
+	params := paramtable.Get()
+	configKey := params.DataNodeCfg.MaxParallelSyncMgrTasks.Key
+	defer params.Reset(configKey)
+
+	params.Save(configKey, "0")
+
+	_, err = NewSyncManager(s.chunkManager, s.allocator)
+	s.Error(err)
+}
+
 func TestSyncManager(t *testing.T) {
 	suite.Run(t, new(SyncManagerSuite))
 }

pkg/util/conc/pool.go (13 additions, 0 deletions)

@@ -18,12 +18,14 @@ package conc
 
 import (
 	"fmt"
+	"strconv"
 	"sync"
 
 	ants "github.com/panjf2000/ants/v2"
 
 	"github.com/milvus-io/milvus/pkg/util/generic"
 	"github.com/milvus-io/milvus/pkg/util/hardware"
+	"github.com/milvus-io/milvus/pkg/util/merr"
 )
 
 // A goroutine pool
@@ -110,6 +112,17 @@ func (pool *Pool[T]) Release() {
 	pool.inner.Release()
 }
 
+func (pool *Pool[T]) Resize(size int) error {
+	if pool.opt.preAlloc {
+		return merr.WrapErrServiceInternal("cannot resize pre-alloc pool")
+	}
+	if size <= 0 {
+		return merr.WrapErrParameterInvalid("positive size", strconv.FormatInt(int64(size), 10))
+	}
+	pool.inner.Tune(size)
+	return nil
+}
+
 // WarmupPool do warm up logic for each goroutine in pool
 func WarmupPool[T any](pool *Pool[T], warmup func()) {
 	cap := pool.Cap()

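One design note on the pre-alloc guard in `Resize`: the wrapper refuses to tune a pre-allocated pool rather than silently ignoring the request, which is presumably also why `newKeyLockDispatcher` switched to `conc.WithPreAlloc(false)` above. The observable contrast, sketched under the API added in this diff:

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/conc"
)

func main() {
	// Sizes are arbitrary; only the resizable/fixed contrast matters.
	resizable := conc.NewPool[any](4)
	fmt.Println(resizable.Resize(8)) // <nil>; Cap() is now 8

	fixed := conc.NewPool[any](4, conc.WithPreAlloc(true))
	fmt.Println(fixed.Resize(8)) // error: cannot resize pre-alloc pool
}
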
pkg/util/conc/pool_test.go (21 additions, 0 deletions)

@@ -21,6 +21,8 @@ import (
 	"time"
 
 	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/util/hardware"
 )
 
 func TestPool(t *testing.T) {
@@ -55,6 +57,25 @@ func TestPool(t *testing.T) {
 	}
 }
 
+func TestPoolResize(t *testing.T) {
+	cpuNum := hardware.GetCPUNum()
+
+	pool := NewPool[any](cpuNum)
+
+	assert.Equal(t, cpuNum, pool.Cap())
+
+	err := pool.Resize(cpuNum * 2)
+	assert.NoError(t, err)
+	assert.Equal(t, cpuNum*2, pool.Cap())
+
+	err = pool.Resize(0)
+	assert.Error(t, err)
+
+	pool = NewDefaultPool[any]()
+	err = pool.Resize(cpuNum * 2)
+	assert.Error(t, err)
+}
+
 func TestPoolWithPanic(t *testing.T) {
 	pool := NewPool[any](1, WithConcealPanic(true))

pkg/util/paramtable/component_param.go (1 addition, 1 deletion)

@@ -2589,7 +2589,7 @@ type dataNodeConfig struct {
 	FlowGraphMaxQueueLength ParamItem `refreshable:"false"`
 	FlowGraphMaxParallelism ParamItem `refreshable:"false"`
 	MaxParallelSyncTaskNum  ParamItem `refreshable:"false"`
-	MaxParallelSyncMgrTasks ParamItem `refreshable:"false"`
+	MaxParallelSyncMgrTasks ParamItem `refreshable:"true"`
 
 	// skip mode
 	FlowGraphSkipModeEnable ParamItem `refreshable:"true"`
