Skip to content

Commit

Permalink
fix: add GetSegments optimization to avoid meta mutex competition (mi…
Browse files Browse the repository at this point in the history
…lvus-io#31025)

milvus-io#30835

Signed-off-by: luzhang <[email protected]>
Co-authored-by: luzhang <[email protected]>
  • Loading branch information
zhagnlu and luzhang authored Mar 5, 2024
1 parent 4bda6c3 commit b9775a1
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
14 changes: 14 additions & 0 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,20 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo {
return nil
}

// Get segments By filter function
func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []UniqueID {
m.RLock()
defer m.RUnlock()
var result []UniqueID
for _, id := range segIDs {
segment := m.segments.GetSegment(id)
if segment != nil && filterFunc(segment) {
result = append(result, id)
}
}
return result
}

// GetSegment returns segment info with provided id
// include the unhealthy segment
// if not segment is found, nil will be returned
Expand Down
28 changes: 10 additions & 18 deletions internal/datacoord/segment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,24 +557,16 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu
if len(segIDs) != 0 {
segCandidates = segIDs
}
for _, id := range segCandidates {
info := s.meta.GetHealthySegment(id)
if info == nil {
log.Warn("failed to get seg info from meta", zap.Int64("segmentID", id))
continue
}
if info.CollectionID != collectionID {
continue
}
// idempotent sealed
if info.State == commonpb.SegmentState_Sealed {
ret = append(ret, id)
continue
}
// segment can be sealed only if it is growing.
if info.State != commonpb.SegmentState_Growing {
continue
}

sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed
})
growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool {
return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing
})
ret = append(ret, sealedSegments...)

for _, id := range growingSegments {
if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil {
return nil, err
}
Expand Down

0 comments on commit b9775a1

Please sign in to comment.