diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 06112093c9058..cab94190c1cce 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -328,6 +328,20 @@ func (m *meta) GetHealthySegment(segID UniqueID) *SegmentInfo { return nil } +// Get segments By filter function +func (m *meta) GetSegments(segIDs []UniqueID, filterFunc SegmentInfoSelector) []UniqueID { + m.RLock() + defer m.RUnlock() + var result []UniqueID + for _, id := range segIDs { + segment := m.segments.GetSegment(id) + if segment != nil && filterFunc(segment) { + result = append(result, id) + } + } + return result +} + // GetSegment returns segment info with provided id // include the unhealthy segment // if not segment is found, nil will be returned diff --git a/internal/datacoord/segment_manager.go b/internal/datacoord/segment_manager.go index 20ce2ea3f48ee..2bded38eca1e7 100644 --- a/internal/datacoord/segment_manager.go +++ b/internal/datacoord/segment_manager.go @@ -557,24 +557,16 @@ func (s *SegmentManager) SealAllSegments(ctx context.Context, collectionID Uniqu if len(segIDs) != 0 { segCandidates = segIDs } - for _, id := range segCandidates { - info := s.meta.GetHealthySegment(id) - if info == nil { - log.Warn("failed to get seg info from meta", zap.Int64("segmentID", id)) - continue - } - if info.CollectionID != collectionID { - continue - } - // idempotent sealed - if info.State == commonpb.SegmentState_Sealed { - ret = append(ret, id) - continue - } - // segment can be sealed only if it is growing. - if info.State != commonpb.SegmentState_Growing { - continue - } + + sealedSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool { + return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Sealed + }) + growingSegments := s.meta.GetSegments(segCandidates, func(segment *SegmentInfo) bool { + return segment.CollectionID == collectionID && isSegmentHealthy(segment) && segment.State == commonpb.SegmentState_Growing + }) + ret = append(ret, sealedSegments...) + + for _, id := range growingSegments { if err := s.meta.SetState(id, commonpb.SegmentState_Sealed); err != nil { return nil, err }