Skip to content

Commit

Permalink
enhance: Put release segment and other misc cgo call into pool
Browse files Browse the repository at this point in the history
Related to #30273

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia committed Dec 3, 2024
1 parent 6f1b1ad commit 544b8f7
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
26 changes: 17 additions & 9 deletions internal/querynodev2/segments/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,20 +1272,25 @@ func (s *LocalSegment) Release(ctx context.Context, opts ...releaseOption) {
return
}

C.DeleteSegment(ptr)

localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
// ignore error here, shall not block releasing
if err == nil {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
GetDynamicPool().Submit(func() (any, error) {
C.DeleteSegment(ptr)
localDiskUsage, err := segcore.GetLocalUsedSize(context.Background(), paramtable.Get().LocalStorageCfg.Path.GetValue())
// ignore error here, shall not block releasing
if err == nil {
metrics.QueryNodeDiskUsedSize.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(localDiskUsage) / 1024 / 1024) // in MB
}
return nil, nil
}).Await()

log.Info("delete segment from memory")
}

// ReleaseSegmentData releases the segment data.
func (s *LocalSegment) ReleaseSegmentData() {
C.ClearSegmentData(s.ptr)
GetDynamicPool().Submit(func() (any, error) {
C.ClearSegmentData(s.ptr)
return nil, nil
}).Await()
for _, indexInfo := range s.Indexes() {
indexInfo.IsLoaded = false
}
Expand All @@ -1309,7 +1314,10 @@ func (s *LocalSegment) startRelease(scope ReleaseScope) state.LoadStateLockGuard
}

func (s *LocalSegment) RemoveFieldFile(fieldId int64) {
C.RemoveFieldFile(s.ptr, C.int64_t(fieldId))
GetDynamicPool().Submit(func() (any, error) {
C.RemoveFieldFile(s.ptr, C.int64_t(fieldId))
return nil, nil
}).Await()

Check warning on line 1320 in internal/querynodev2/segments/segment.go

View check run for this annotation

Codecov / codecov/patch

internal/querynodev2/segments/segment.go#L1317-L1320

Added lines #L1317 - L1320 were not covered by tests
}

func (s *LocalSegment) RemoveUnusedFieldFiles() error {
Expand Down
7 changes: 5 additions & 2 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,8 +1479,11 @@ func getResourceUsageEstimateOfSegment(schema *schemapb.CollectionSchema, loadIn
if fieldIndexInfo, ok := fieldID2IndexInfo[fieldID]; ok {
var estimateResult ResourceEstimate
err := GetCLoadInfoWithFunc(ctx, fieldSchema, loadInfo, fieldIndexInfo, func(c *LoadIndexInfo) error {
loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo)
estimateResult = GetResourceEstimate(&loadResourceRequest)
GetDynamicPool().Submit(func() (any, error) {
loadResourceRequest := C.EstimateLoadIndexResource(c.cLoadIndexInfo)
estimateResult = GetResourceEstimate(&loadResourceRequest)
return nil, nil
}).Await()
return nil
})
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions internal/querynodev2/segments/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,8 +183,12 @@ func mergeRequestCost(requestCosts []*internalpb.CostAggregation) *internalpb.Co
}

func getIndexEngineVersion() (minimal, current int32) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
return int32(cMinimal), int32(cCurrent)
GetDynamicPool().Submit(func() (any, error) {
cMinimal, cCurrent := C.GetMinimalIndexVersion(), C.GetCurrentIndexVersion()
minimal, current = int32(cMinimal), int32(cCurrent)
return nil, nil
}).Await()
return minimal, current

Check warning on line 191 in internal/querynodev2/segments/utils.go

View check run for this annotation

Codecov / codecov/patch

internal/querynodev2/segments/utils.go#L186-L191

Added lines #L186 - L191 were not covered by tests
}

// getSegmentMetricLabel returns the label for segment metrics.
Expand Down

0 comments on commit 544b8f7

Please sign in to comment.