diff --git a/internal/querynodev2/segments/disk_usage_fetcher.go b/internal/querynodev2/segments/disk_usage_fetcher.go
new file mode 100644
index 0000000000000..0bb9060af7f9f
--- /dev/null
+++ b/internal/querynodev2/segments/disk_usage_fetcher.go
@@ -0,0 +1,78 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package segments
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.uber.org/atomic"
+	"go.uber.org/zap"
+
+	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/metrics"
+	"github.com/milvus-io/milvus/pkg/util/paramtable"
+)
+
+type diskUsageFetcher struct {
+	ctx       context.Context
+	path      string
+	diskUsage *atomic.Int64
+}
+
+func NewDiskUsageFetcher(ctx context.Context) *diskUsageFetcher {
+	return &diskUsageFetcher{
+		ctx:       ctx,
+		path:      paramtable.Get().LocalStorageCfg.Path.GetValue(),
+		diskUsage: atomic.NewInt64(0),
+	}
+}
+
+func (d *diskUsageFetcher) GetDiskUsage() int64 {
+	return d.diskUsage.Load()
+}
+
+func (d *diskUsageFetcher) fetch() {
+	diskUsage, err := GetLocalUsedSize(d.ctx, d.path)
+	if err != nil {
+		log.Warn("failed to get disk usage", zap.Error(err))
+		return
+	}
+	d.diskUsage.Store(diskUsage)
+	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(diskUsage) / 1024 / 1024) // in MB
+	log.Ctx(d.ctx).WithRateGroup("diskUsageFetcher", 1, 300).
+		RatedInfo(300, "querynode disk usage", zap.Int64("size", diskUsage), zap.Int64("nodeID", paramtable.GetNodeID()))
+}
+
+func (d *diskUsageFetcher) Start() {
+	d.fetch() // Synchronously fetch once before starting.
+
+	interval := paramtable.Get().QueryNodeCfg.DiskSizeFetchInterval.GetAsDuration(time.Second)
+	go func() {
+		ticker := time.NewTicker(interval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-d.ctx.Done():
+				return
+			case <-ticker.C:
+				d.fetch()
+			}
+		}
+	}()
+}
diff --git a/internal/querynodev2/segments/segment.go b/internal/querynodev2/segments/segment.go
index 24a4e41db6f70..e70fb4f1087ef 100644
--- a/internal/querynodev2/segments/segment.go
+++ b/internal/querynodev2/segments/segment.go
@@ -1624,12 +1624,6 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
 
 	C.DeleteSegment(ptr)
 
-	localDiskUsage, err := GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
-	// ignore error here, shall not block releasing
-	if err == nil {
-		metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
-	}
-
 	log.Info("delete segment from memory")
 }
 
diff --git a/internal/querynodev2/segments/segment_loader.go b/internal/querynodev2/segments/segment_loader.go
index 0cf98a0a589be..4815bc426818f 100644
--- a/internal/querynodev2/segments/segment_loader.go
+++ b/internal/querynodev2/segments/segment_loader.go
@@ -150,11 +150,12 @@ type segmentLoaderV2 struct {
 }
 
 func NewLoaderV2(
+	ctx context.Context,
 	manager *Manager,
 	cm storage.ChunkManager,
 ) *segmentLoaderV2 {
 	return &segmentLoaderV2{
-		segmentLoader: NewLoader(manager, cm),
+		segmentLoader: NewLoader(ctx, manager, cm),
 	}
 }
 
@@ -545,6 +546,7 @@ func (loader *segmentLoaderV2) loadSealedSegmentFields(ctx context.Context, segm
 }
 
 func NewLoader(
+	ctx context.Context,
 	manager *Manager,
 	cm storage.ChunkManager,
 ) *segmentLoader {
@@ -564,12 +566,15 @@ func NewLoader(
 	}
 
 	log.Info("SegmentLoader created", zap.Int("ioPoolSize", ioPoolSize))
 
+	duf := NewDiskUsageFetcher(ctx)
+	duf.Start()
 	loader := &segmentLoader{
 		manager:                   manager,
 		cm:                        cm,
 		loadingSegments:           typeutil.NewConcurrentMap[int64, *loadResult](),
 		committedResourceNotifier: syncutil.NewVersionedNotifier(),
+		duf:                       duf,
 	}
 
 	return loader
@@ -605,11 +610,14 @@ type segmentLoader struct {
 	manager *Manager
 	cm      storage.ChunkManager
 
-	mut sync.Mutex // The channel will be closed as the segment loaded
-	loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+	loadingSegments *typeutil.ConcurrentMap[int64, *loadResult]
+
+	mut sync.Mutex // guards committedResource
 	committedResource         LoadResource
 	committedResourceNotifier *syncutil.VersionedNotifier
+
+	duf *diskUsageFetcher
 }
 
 var _ Loader = (*segmentLoader)(nil)
@@ -781,8 +789,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 	log := log.Ctx(ctx).With(
 		zap.Stringer("segmentType", segmentType),
 	)
-	loader.mut.Lock()
-	defer loader.mut.Unlock()
 
 	// filter out loaded & loading segments
 	infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
@@ -805,8 +811,6 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
 }
 
 func (loader *segmentLoader) unregister(segments ...*querypb.SegmentLoadInfo) {
-	loader.mut.Lock()
-	defer loader.mut.Unlock()
 	for i := range segments {
 		result, ok := loader.loadingSegments.GetAndRemove(segments[i].GetSegmentID())
 		if ok {
@@ -841,6 +845,15 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 		zap.Int64s("segmentIDs", segmentIDs),
 	)
 
+	memoryUsage := hardware.GetUsedMemoryCount()
+	totalMemory := hardware.GetMemoryCount()
+
+	diskUsage := loader.duf.GetDiskUsage()
+	if diskUsage == 0 {
+		return requestResourceResult{}, errors.New("get local used size failed")
+	}
+	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
+
 	loader.mut.Lock()
 	defer loader.mut.Unlock()
 
@@ -848,15 +861,6 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 		CommittedResource: loader.committedResource,
 	}
 
-	memoryUsage := hardware.GetUsedMemoryCount()
-	totalMemory := hardware.GetMemoryCount()
-
-	diskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-	if err != nil {
-		return result, errors.Wrap(err, "get local used size failed")
-	}
-	diskCap := paramtable.Get().QueryNodeCfg.DiskCapacityLimit.GetAsUint64()
-
 	if loader.committedResource.MemorySize+memoryUsage >= totalMemory {
 		return result, merr.WrapErrServiceMemoryLimitExceeded(float32(loader.committedResource.MemorySize+memoryUsage), float32(totalMemory))
 	} else if loader.committedResource.DiskSize+uint64(diskUsage) >= diskCap {
@@ -864,7 +868,7 @@ func (loader *segmentLoader) requestResource(ctx context.Context, infos ...*quer
 	}
 
 	result.ConcurrencyLevel = funcutil.Min(hardware.GetCPUNum(), len(infos))
-	mu, du, err := loader.checkSegmentSize(ctx, infos)
+	mu, du, err := loader.checkSegmentSize(ctx, infos, memoryUsage, totalMemory, diskUsage)
 	if err != nil {
 		log.Warn("no sufficient resource to load segments", zap.Error(err))
 		return result, err
@@ -1527,7 +1531,7 @@ func JoinIDPath(ids ...int64) string {
 // checkSegmentSize checks whether the memory & disk is sufficient to load the segments
 // returns the memory & disk usage while loading if possible to load,
 // otherwise, returns error
-func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo) (uint64, uint64, error) {
+func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadInfos []*querypb.SegmentLoadInfo, memUsage, totalMem uint64, localDiskUsage int64) (uint64, uint64, error) {
 	if len(segmentLoadInfos) == 0 {
 		return 0, 0, nil
 	}
@@ -1540,18 +1544,11 @@ func (loader *segmentLoader) checkSegmentSize(ctx context.Context, segmentLoadIn
 		return float64(mem) / 1024 / 1024
 	}
 
-	memUsage := hardware.GetUsedMemoryCount() + loader.committedResource.MemorySize
-	totalMem := hardware.GetMemoryCount()
+	memUsage = memUsage + loader.committedResource.MemorySize
 	if memUsage == 0 || totalMem == 0 {
 		return 0, 0, errors.New("get memory failed when checkSegmentSize")
 	}
 
-	localDiskUsage, err := GetLocalUsedSize(ctx, paramtable.Get().LocalStorageCfg.Path.GetValue())
-	if err != nil {
-		return 0, 0, errors.Wrap(err, "get local used size failed")
-	}
-
-	metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(toMB(uint64(localDiskUsage)))
 	diskUsage := uint64(localDiskUsage) + loader.committedResource.DiskSize
 
 	factor := resourceEstimateFactor{
diff --git a/internal/querynodev2/segments/segment_loader_test.go b/internal/querynodev2/segments/segment_loader_test.go
index 03c53cce325d4..a60780c87ee3c 100644
--- a/internal/querynodev2/segments/segment_loader_test.go
+++ b/internal/querynodev2/segments/segment_loader_test.go
@@ -85,7 +85,7 @@ func (suite *SegmentLoaderSuite) SetupTest() {
 	// 	fmt.Sprintf("/tmp/milvus-ut/%d", rand.Int63())))
 	chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
 	suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())
 
 	// Data
@@ -751,7 +751,7 @@ func (suite *SegmentLoaderDetailSuite) SetupTest() {
 	ctx := context.Background()
 	chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
 	suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
-	suite.loader = NewLoader(suite.manager, suite.chunkManager)
+	suite.loader = NewLoader(context.Background(), suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())
 
 	// Data
@@ -948,7 +948,7 @@ func (suite *SegmentLoaderV2Suite) SetupTest() {
 	// 	fmt.Sprintf("/tmp/milvus-ut/%d", rand.Int63())))
 	chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), suite.rootPath)
 	suite.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(ctx)
-	suite.loader = NewLoaderV2(suite.manager, suite.chunkManager)
+	suite.loader = NewLoaderV2(context.Background(), suite.manager, suite.chunkManager)
 	initcore.InitRemoteChunkManager(paramtable.Get())
 
 	// Data
diff --git a/internal/querynodev2/server.go b/internal/querynodev2/server.go
index b6c806950c728..c4bd320b3b1be 100644
--- a/internal/querynodev2/server.go
+++ b/internal/querynodev2/server.go
@@ -342,9 +342,9 @@ func (node *QueryNode) Init() error {
 		node.unsubscribingChannels = typeutil.NewConcurrentSet[string]()
 		node.manager = segments.NewManager()
 		if paramtable.Get().CommonCfg.EnableStorageV2.GetAsBool() {
-			node.loader = segments.NewLoaderV2(node.manager, node.chunkManager)
+			node.loader = segments.NewLoaderV2(node.ctx, node.manager, node.chunkManager)
 		} else {
-			node.loader = segments.NewLoader(node.manager, node.chunkManager)
+			node.loader = segments.NewLoader(node.ctx, node.manager, node.chunkManager)
 		}
 		node.manager.SetLoader(node.loader)
 		node.dispClient = msgdispatcher.NewClient(node.factory, typeutil.QueryNodeRole, node.GetNodeID())
diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go
index dc16159f53d1b..a7652a676e7d6 100644
--- a/pkg/util/paramtable/component_param.go
+++ b/pkg/util/paramtable/component_param.go
@@ -2446,6 +2446,7 @@ type queryNodeConfig struct {
 	// loader
 	IoPoolSize             ParamItem `refreshable:"false"`
 	DeltaDataExpansionRate ParamItem `refreshable:"true"`
+	DiskSizeFetchInterval  ParamItem `refreshable:"false"`
 
 	// schedule task policy.
 	SchedulePolicyName ParamItem `refreshable:"false"`
@@ -3045,6 +3046,14 @@ Max read concurrency must greater than or equal to 1, and less than or equal to
 	}
 	p.DeltaDataExpansionRate.Init(base.mgr)
 
+	p.DiskSizeFetchInterval = ParamItem{
+		Key:          "querynode.diskSizeFetchInterval",
+		Version:      "2.5.0",
+		DefaultValue: "60",
+		Doc:          "The time interval in seconds for retrieving disk usage.",
+	}
+	p.DiskSizeFetchInterval.Init(base.mgr)
+
 	// schedule read task policy.
 	p.SchedulePolicyName = ParamItem{
 		Key: "queryNode.scheduler.scheduleReadPolicy.name",
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 685b75df2aa26..9d39b01a130f3 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -462,6 +462,7 @@ func TestComponentParam(t *testing.T) {
 		assert.Equal(t, "/var/lib/milvus/data/mmap", Params.MmapDirPath.GetValue())
 
 		assert.Equal(t, true, Params.MmapChunkCache.GetAsBool())
+		assert.Equal(t, 60*time.Second, Params.DiskSizeFetchInterval.GetAsDuration(time.Second))
 	})
 
 	t.Run("test dataCoordConfig", func(t *testing.T) {
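
Editor's note, not part of the patch: the pattern disk_usage_fetcher.go introduces above (cache a relatively expensive disk-usage probe behind an atomic value, refresh it on a ticker in a background goroutine, and stop when the owning context is cancelled) can be sketched as a standalone Go program. This is an illustrative sketch only: the names usageFetcher, newUsageFetcher, start, get, the filepath.Walk-based measurement, the os.TempDir() path, and the 5-second interval are placeholders, not Milvus APIs. The actual patch instead uses GetLocalUsedSize, reports the QueryNodeDiskUsedSize metric, and reads the refresh interval from the new DiskSizeFetchInterval parameter (60 seconds by default).

package main

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync/atomic"
	"time"
)

// usageFetcher caches the most recent disk-usage measurement so hot paths
// (such as admission checks before loading a segment) never hit the filesystem.
type usageFetcher struct {
	ctx   context.Context
	path  string
	usage atomic.Int64
}

func newUsageFetcher(ctx context.Context, path string) *usageFetcher {
	return &usageFetcher{ctx: ctx, path: path}
}

// get returns the cached value; zero means no successful measurement yet.
func (f *usageFetcher) get() int64 { return f.usage.Load() }

// fetch walks the directory once and stores the total size of regular files.
func (f *usageFetcher) fetch() {
	var total int64
	err := filepath.Walk(f.path, func(_ string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			total += info.Size()
		}
		return nil
	})
	if err != nil {
		fmt.Println("failed to get disk usage:", err)
		return
	}
	f.usage.Store(total)
}

// start measures once synchronously, then refreshes in the background until
// the context is cancelled; the ticker is stopped when the goroutine exits.
func (f *usageFetcher) start(interval time.Duration) {
	f.fetch()
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-f.ctx.Done():
				return
			case <-ticker.C:
				f.fetch()
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	f := newUsageFetcher(ctx, os.TempDir()) // placeholder path
	f.start(5 * time.Second)                // placeholder interval
	time.Sleep(time.Second)
	fmt.Printf("cached disk usage: %d bytes\n", f.get())
}

Keeping the measurement on a background goroutine is what lets requestResource and checkSegmentSize in segment_loader.go read a cached int64 instead of walking the local storage path on every load request, which is the motivation for the hunks that drop the per-call GetLocalUsedSize calls.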