Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/milvus-io/milvus into 241…
Browse files Browse the repository at this point in the history
…2-opt-meta-mutex
  • Loading branch information
bigsheeper committed Dec 5, 2024
2 parents 3a2dbaa + 618f0cb commit 43ec0aa
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 14 deletions.
2 changes: 1 addition & 1 deletion internal/proxy/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3708,7 +3708,7 @@ func TestDefaultPartition(t *testing.T) {
chMgr := newChannelsMgrImpl(dmlChannelsFunc, nil, factory)
defer chMgr.removeAllDMLStream()

_, err = chMgr.getOrCreateDmlStream(collectionID)
_, err = chMgr.getOrCreateDmlStream(ctx, collectionID)
assert.NoError(t, err)
pchans, err := chMgr.getChannels(collectionID)
assert.NoError(t, err)
Expand Down
1 change: 1 addition & 0 deletions internal/querynodev2/delegator/delta_forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (sd *shardDelegator) forwardL0RemoteLoad(ctx context.Context,
LoadScope: querypb.LoadScope_Delta,
Schema: req.GetSchema(),
IndexInfoList: req.GetIndexInfoList(),
Version: req.GetVersion(),
})
}

Expand Down
26 changes: 17 additions & 9 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,20 +1272,25 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
return
}

C.DeleteSegment(ptr)

localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
// ignore error here, shall not block releasing
if err == nil {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
GetDynamicPool().Submit(func() (any, error) {
C.DeleteSegment(ptr)
localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
// ignore error here, shall not block releasing
if err == nil {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
return nil, nil
}).Await()

log.Info("delete segment from memory")
}

// ReleaseSegmentData releases the segment data.
func (s *LocalSegment) ReleaseSegmentData() {
C.ClearSegmentData(s.ptr)
GetDynamicPool().Submit(func() (any, error) {
C.ClearSegmentData(s.ptr)
return nil, nil
}).Await()
for _, indexInfo := range s.Indexes() {
indexInfo.IsLoaded = false
}
Expand All @@ -1309,7 +1314,10 @@ func (s *LocalSegment) startRelease(scope ReleaseScope) state.LoadStateLockGuard
}

func (s *LocalSegment) RemoveFieldFile(fieldId int64) {
C.RemoveFieldFile(s.ptr, C.int64_t(fieldId))
GetDynamicPool().Submit(func() (any, error) {
C.RemoveFieldFile(s.ptr, C.int64_t(fieldId))
return nil, nil
}).Await()
}

func (s *LocalSegment) RemoveUnusedFieldFiles() error {
Expand Down
7 changes: 5 additions & 2 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,8 +1479,11 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
if fieldIndexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
var estimateResult ResourceEstimate
err := GetCLoadInfoWithFunc(ctx, fieldSchema, loadInfo, fieldIndexInfo, func(c *LoadIndexInfo) error {
loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo)
estimateResult = GetResourceEstimate(&loadResourceRequest)
GetDynamicPool().Submit(func() (any, error) {
loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo)
estimateResult = GetResourceEstimate(&loadResourceRequest)
return nil, nil
}).Await()
return nil
})
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions internal/querynodev2/segments/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,12 @@ func mergeRequestCost(requestCosts []*internalpb.CostAggregation) *internalpb.Co
}

func getIndexEngineVersion() (minimal, current int32) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
return int32(cMinimal), int32(cCurrent)
GetDynamicPool().Submit(func() (any, error) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
minimal, current = int32(cMinimal), int32(cCurrent)
return nil, nil
}).Await()
return minimal, current
}

// getSegmentMetricLabel returns the label for segment metrics.
Expand Down

0 comments on commit 43ec0aa

Please sign in to comment.