From aa0a87eda728cb46cf777ab6ee1f7753de507577 Mon Sep 17 00:00:00 2001 From: Bingyi Sun Date: Thu, 2 Jan 2025 15:04:58 +0800 Subject: [PATCH] fix: Block warmup submit if pool full in sync mode (#38690) https://github.com/milvus-io/milvus/issues/38692 --------- Signed-off-by: sunby --- internal/core/src/common/ChunkTarget.cpp | 1 + .../querynodev2/delegator/delegator_data.go | 1 + internal/querynodev2/segments/manager_test.go | 1 + internal/querynodev2/segments/pool.go | 2 +- .../querynodev2/segments/retrieve_test.go | 2 + internal/querynodev2/segments/search_test.go | 2 + internal/querynodev2/segments/segment.go | 67 +++++++++++++++++-- .../querynodev2/segments/segment_loader.go | 7 ++ .../segments/segment_loader_test.go | 6 +- internal/querynodev2/segments/segment_test.go | 24 +++++++ internal/querynodev2/server.go | 2 +- internal/querynodev2/server_test.go | 1 + 12 files changed, 106 insertions(+), 10 deletions(-) diff --git a/internal/core/src/common/ChunkTarget.cpp b/internal/core/src/common/ChunkTarget.cpp index 495fa910c75d9..f08a8ebb8584d 100644 --- a/internal/core/src/common/ChunkTarget.cpp +++ b/internal/core/src/common/ChunkTarget.cpp @@ -102,6 +102,7 @@ MmapChunkTarget::get() { write(padding, padding_size); flush(); + file_.FFlush(); auto m = mmap( nullptr, size_, PROT_READ, MAP_SHARED, file_.Descriptor(), offset_); diff --git a/internal/querynodev2/delegator/delegator_data.go b/internal/querynodev2/delegator/delegator_data.go index b21d7784ac616..586f56f2d6794 100644 --- a/internal/querynodev2/delegator/delegator_data.go +++ b/internal/querynodev2/delegator/delegator_data.go @@ -116,6 +116,7 @@ func (sd *shardDelegator) ProcessInsert(insertRecords map[int64]*InsertData) { DeltaPosition: insertData.StartPosition, Level: datapb.SegmentLevel_L1, }, + nil, ) if err != nil { log.Error("failed to create new segment", diff --git a/internal/querynodev2/segments/manager_test.go b/internal/querynodev2/segments/manager_test.go index 7fc0851e16dfb..f9d6ba5a04e3c 100644 --- a/internal/querynodev2/segments/manager_test.go +++ b/internal/querynodev2/segments/manager_test.go @@ -66,6 +66,7 @@ func (s *ManagerSuite) SetupTest() { InsertChannel: s.channels[i], Level: s.levels[i], }, + nil, ) s.Require().NoError(err) s.segments = append(s.segments, segment) diff --git a/internal/querynodev2/segments/pool.go b/internal/querynodev2/segments/pool.go index 578a9c37ef423..ca9971caa93a4 100644 --- a/internal/querynodev2/segments/pool.go +++ b/internal/querynodev2/segments/pool.go @@ -142,7 +142,7 @@ func initWarmupPool() { runtime.LockOSThread() C.SetThreadName(cgoTagWarmup) }), // lock os thread for cgo thread disposal - conc.WithNonBlocking(true), // make warming up non blocking + conc.WithNonBlocking(false), ) warmupPool.Store(pool) diff --git a/internal/querynodev2/segments/retrieve_test.go b/internal/querynodev2/segments/retrieve_test.go index 3e12eb334b68a..0e9193cc1c144 100644 --- a/internal/querynodev2/segments/retrieve_test.go +++ b/internal/querynodev2/segments/retrieve_test.go @@ -98,6 +98,7 @@ func (suite *RetrieveSuite) SetupTest() { InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID), Level: datapb.SegmentLevel_Legacy, }, + nil, ) suite.Require().NoError(err) @@ -126,6 +127,7 @@ func (suite *RetrieveSuite) SetupTest() { InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID), Level: datapb.SegmentLevel_Legacy, }, + nil, ) suite.Require().NoError(err) diff --git a/internal/querynodev2/segments/search_test.go b/internal/querynodev2/segments/search_test.go index 11d003769a87e..f731b7f9817a7 100644 --- a/internal/querynodev2/segments/search_test.go +++ b/internal/querynodev2/segments/search_test.go @@ -88,6 +88,7 @@ func (suite *SearchSuite) SetupTest() { InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID), Level: datapb.SegmentLevel_Legacy, }, + nil, ) suite.Require().NoError(err) @@ -116,6 +117,7 @@ func (suite *SearchSuite) SetupTest() { InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID), Level: datapb.SegmentLevel_Legacy, }, + nil, ) suite.Require().NoError(err) diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go index dab60442b3f41..6546cee1e3cb0 100644 --- a/internal/querynodev2/segments/segment.go +++ b/internal/querynodev2/segments/segment.go @@ -30,6 +30,7 @@ import ( "context" "fmt" "strings" + "sync" "time" "unsafe" @@ -279,6 +280,7 @@ type LocalSegment struct { lastDeltaTimestamp *atomic.Uint64 fields *typeutil.ConcurrentMap[int64, *FieldInfo] fieldIndexes *typeutil.ConcurrentMap[int64, *IndexedFieldInfo] + warmupDispatcher *AsyncWarmupDispatcher } func NewSegment(ctx context.Context, @@ -286,6 +288,7 @@ func NewSegment(ctx context.Context, segmentType SegmentType, version int64, loadInfo *querypb.SegmentLoadInfo, + warmupDispatcher *AsyncWarmupDispatcher, ) (Segment, error) { log := log.Ctx(ctx) /* @@ -345,9 +348,10 @@ func NewSegment(ctx context.Context, fields: typeutil.NewConcurrentMap[int64, *FieldInfo](), fieldIndexes: typeutil.NewConcurrentMap[int64, *IndexedFieldInfo](), - memSize: atomic.NewInt64(-1), - rowNum: atomic.NewInt64(-1), - insertCount: atomic.NewInt64(0), + memSize: atomic.NewInt64(-1), + rowNum: atomic.NewInt64(-1), + insertCount: atomic.NewInt64(0), + warmupDispatcher: warmupDispatcher, } if err := segment.initializeSegment(); err != nil { @@ -1157,7 +1161,7 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap return nil, nil }).Await() case "async": - GetWarmupPool().Submit(func() (any, error) { + task := func() (any, error) { // bad implemtation, warmup is async at another goroutine and hold the rlock. // the state transition of segment in segment loader will blocked. // add a waiter to avoid it. @@ -1176,7 +1180,8 @@ func (s *LocalSegment) WarmupChunkCache(ctx context.Context, fieldID int64, mmap } log.Info("warming up chunk cache asynchronously done") return nil, nil - }) + } + s.warmupDispatcher.AddTask(task) default: // no warming up } @@ -1347,3 +1352,55 @@ func (s *LocalSegment) indexNeedLoadRawData(schema *schemapb.CollectionSchema, i } return !typeutil.IsVectorType(fieldSchema.DataType) && s.HasRawData(indexInfo.IndexInfo.FieldID), nil } + +type ( + WarmupTask = func() (any, error) + AsyncWarmupDispatcher struct { + mu sync.RWMutex + tasks []WarmupTask + notify chan struct{} + } +) + +func NewWarmupDispatcher() *AsyncWarmupDispatcher { + return &AsyncWarmupDispatcher{ + notify: make(chan struct{}, 1), + } +} + +func (d *AsyncWarmupDispatcher) AddTask(task func() (any, error)) { + d.mu.Lock() + d.tasks = append(d.tasks, task) + d.mu.Unlock() + select { + case d.notify <- struct{}{}: + default: + } +} + +func (d *AsyncWarmupDispatcher) Run(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-d.notify: + d.mu.RLock() + tasks := make([]WarmupTask, len(d.tasks)) + copy(tasks, d.tasks) + d.mu.RUnlock() + + for _, task := range tasks { + select { + case <-ctx.Done(): + return + default: + GetDynamicPool().Submit(task) + } + } + + d.mu.Lock() + d.tasks = d.tasks[len(tasks):] + d.mu.Unlock() + } + } +} diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go index 6e883ab30781a..390f4a460d6ef 100644 --- a/internal/querynodev2/segments/segment_loader.go +++ b/internal/querynodev2/segments/segment_loader.go @@ -147,6 +147,7 @@ type resourceEstimateFactor struct { } func NewLoader( + ctx context.Context, manager *Manager, cm storage.ChunkManager, ) *segmentLoader { @@ -167,11 +168,14 @@ func NewLoader( log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize)) + warmupDispatcher := NewWarmupDispatcher() + go warmupDispatcher.Run(ctx) loader := &segmentLoader{ manager: manager, cm: cm, loadingSegments: typeutil.NewConcurrentMap[int64, *loadResult](), committedResourceNotifier: syncutil.NewVersionedNotifier(), + warmupDispatcher: warmupDispatcher, } return loader @@ -212,6 +216,8 @@ type segmentLoader struct { loadingSegments *typeutil.ConcurrentMap[int64, *loadResult] committedResource LoadResource committedResourceNotifier *syncutil.VersionedNotifier + + warmupDispatcher *AsyncWarmupDispatcher } var _ Loader = (*segmentLoader)(nil) @@ -294,6 +300,7 @@ func (loader *segmentLoader) Load(ctx context.Context, segmentType, version, loadInfo, + loader.warmupDispatcher, ) if err != nil { log.Warn("load segment failed when create new segment", diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go index 4b87d4a182afc..0c888c73b5ba7 100644 --- a/internal/querynodev2/segments/segment_loader_test.go +++ b/internal/querynodev2/segments/segment_loader_test.go @@ -81,7 +81,7 @@ func (suite *SegmentLoaderSuite) SetupTest() { // Dependencies suite.manager = NewManager() - suite.loader = NewLoader(suite.manager, suite.chunkManager) + suite.loader = NewLoader(ctx, suite.manager, suite.chunkManager) initcore.InitRemoteChunkManager(paramtable.Get()) // Data @@ -98,7 +98,7 @@ func (suite *SegmentLoaderSuite) SetupTest() { func (suite *SegmentLoaderSuite) SetupBM25() { // Dependencies suite.manager = NewManager() - suite.loader = NewLoader(suite.manager, suite.chunkManager) + suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager) initcore.InitRemoteChunkManager(paramtable.Get()) suite.schema = mock_segcore.GenTestBM25CollectionSchema("test") @@ -798,7 +798,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() { ctx := context.Background() chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath) suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx) - suite.loader = NewLoader(suite.manager, suite.chunkManager) + suite.loader = NewLoader(ctx, suite.manager, suite.chunkManager) initcore.InitRemoteChunkManager(paramtable.Get()) // Data diff --git a/internal/querynodev2/segments/segment_test.go b/internal/querynodev2/segments/segment_test.go index f35a2d513a0f5..42188cc42f74c 100644 --- a/internal/querynodev2/segments/segment_test.go +++ b/internal/querynodev2/segments/segment_test.go @@ -5,8 +5,11 @@ import ( "fmt" "path/filepath" "testing" + "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" + "go.uber.org/atomic" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks/util/mock_segcore" @@ -91,6 +94,7 @@ func (suite *SegmentSuite) SetupTest() { }, }, }, + nil, ) suite.Require().NoError(err) @@ -122,6 +126,7 @@ func (suite *SegmentSuite) SetupTest() { InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", suite.collectionID), Level: datapb.SegmentLevel_Legacy, }, + nil, ) suite.Require().NoError(err) @@ -222,3 +227,22 @@ func (suite *SegmentSuite) TestSegmentReleased() { func TestSegment(t *testing.T) { suite.Run(t, new(SegmentSuite)) } + +func TestWarmupDispatcher(t *testing.T) { + d := NewWarmupDispatcher() + ctx := context.Background() + go d.Run(ctx) + + completed := atomic.NewInt64(0) + taskCnt := 10000 + for i := 0; i < taskCnt; i++ { + d.AddTask(func() (any, error) { + completed.Inc() + return nil, nil + }) + } + + assert.Eventually(t, func() bool { + return completed.Load() == int64(taskCnt) + }, 10*time.Second, time.Second) +} diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go index fd331a6bdfa0b..d9cbae89b13a7 100644 --- a/internal/querynodev2/server.go +++ b/internal/querynodev2/server.go @@ -369,7 +369,7 @@ func (node *QueryNode) Init() error { node.subscribingChannels = typeutil.NewConcurrentSet[string]() node.unsubscribingChannels = typeutil.NewConcurrentSet[string]() node.manager = segments.NewManager() - node.loader = segments.NewLoader(node.manager, node.chunkManager) + node.loader = segments.NewLoader(node.ctx, node.manager, node.chunkManager) node.manager.SetLoader(node.loader) node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID()) // init pipeline manager diff --git a/internal/querynodev2/server_test.go b/internal/querynodev2/server_test.go index 6800e9c4c8c0f..a6ea597b1875c 100644 --- a/internal/querynodev2/server_test.go +++ b/internal/querynodev2/server_test.go @@ -236,6 +236,7 @@ func (suite *QueryNodeSuite) TestStop() { Level: datapb.SegmentLevel_Legacy, InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", 1), }, + nil, ) suite.NoError(err) suite.node.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, segment)