Skip to content

Commit

Permalink
fix: load operation when segment is on releasing (milvus-io#31340)
Browse files Browse the repository at this point in the history
issue: milvus-io#30857

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Jun 14, 2024
1 parent 86a36b1 commit 9b69601
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 23 deletions.
85 changes: 67 additions & 18 deletions internal/querynodev2/segments/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ type SegmentManager interface {
Remove(ctx context.Context, segmentID typeutil.UniqueID, scope querypb.DataScope) (int, int)
RemoveBy(ctx context.Context, filters ...SegmentFilter) (int, int)
Clear(ctx context.Context)

// Deprecated: quick fix critical issue: #30857
// TODO: All Segment assigned to querynode should be managed by SegmentManager, including loading or releasing to perform a transaction.
Exist(segmentID typeutil.UniqueID, typ SegmentType) bool
}

var _ SegmentManager = (*segmentManager)(nil)
Expand All @@ -296,14 +300,18 @@ type segmentManager struct {

// releaseCallback is the callback function when a segment is released.
releaseCallback func(s Segment)

growingOnReleasingSegments typeutil.UniqueSet
sealedOnReleasingSegments typeutil.UniqueSet
}

func NewSegmentManager() *segmentManager {
mgr := &segmentManager{
growingSegments: make(map[int64]Segment),
sealedSegments: make(map[int64]Segment),
return &segmentManager{
growingSegments: make(map[int64]Segment),
sealedSegments: make(map[int64]Segment),
growingOnReleasingSegments: typeutil.NewUniqueSet(),
sealedOnReleasingSegments: typeutil.NewUniqueSet(),
}
return mgr
}

func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, segments ...Segment) {
Expand Down Expand Up @@ -354,7 +362,7 @@ func (mgr *segmentManager) Put(ctx context.Context, segmentType SegmentType, seg
if len(replacedSegment) > 0 {
go func() {
for _, segment := range replacedSegment {
mgr.remove(ctx, segment)
mgr.release(ctx, segment)
}
}()
}
Expand All @@ -374,6 +382,29 @@ func (mgr *segmentManager) UpdateBy(action SegmentAction, filters ...SegmentFilt
return updated
}

// Deprecated:
// TODO: All Segment assigned to querynode should be managed by SegmentManager, including loading or releasing to perform a transaction.
func (mgr *segmentManager) Exist(segmentID typeutil.UniqueID, typ SegmentType) bool {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
switch typ {
case SegmentTypeGrowing:
if _, ok := mgr.growingSegments[segmentID]; ok {
return true
} else if mgr.growingOnReleasingSegments.Contain(segmentID) {
return true
}
case SegmentTypeSealed:
if _, ok := mgr.sealedSegments[segmentID]; ok {
return true
} else if mgr.sealedOnReleasingSegments.Contain(segmentID) {
return true
}
}

return false
}

func (mgr *segmentManager) Get(segmentID typeutil.UniqueID) Segment {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
Expand Down Expand Up @@ -639,11 +670,11 @@ func (mgr *segmentManager) Remove(ctx context.Context, segmentID typeutil.Unique
mgr.mu.Unlock()

if growing != nil {
mgr.remove(ctx, growing)
mgr.release(ctx, growing)
}

if sealed != nil {
mgr.remove(ctx, sealed)
mgr.release(ctx, sealed)
}

return removeGrowing, removeSealed
Expand All @@ -655,13 +686,15 @@ func (mgr *segmentManager) removeSegmentWithType(typ SegmentType, segmentID type
s, ok := mgr.growingSegments[segmentID]
if ok {
delete(mgr.growingSegments, segmentID)
mgr.growingOnReleasingSegments.Insert(segmentID)
return s
}

case SegmentTypeSealed:
s, ok := mgr.sealedSegments[segmentID]
if ok {
delete(mgr.sealedSegments, segmentID)
mgr.sealedOnReleasingSegments.Insert(segmentID)
return s
}
default:
Expand Down Expand Up @@ -694,26 +727,34 @@ func (mgr *segmentManager) RemoveBy(ctx context.Context, filters ...SegmentFilte
mgr.mu.Unlock()

for _, s := range removeSegments {
mgr.remove(ctx, s)
mgr.release(ctx, s)
}

return removeGrowing, removeSealed
}

func (mgr *segmentManager) Clear(ctx context.Context) {
mgr.mu.Lock()
defer mgr.mu.Unlock()

for id, segment := range mgr.growingSegments {
delete(mgr.growingSegments, id)
mgr.remove(ctx, segment)
for id := range mgr.growingSegments {
mgr.growingOnReleasingSegments.Insert(id)
}
growingWaitForRelease := mgr.growingSegments
mgr.growingSegments = make(map[int64]Segment)

for id, segment := range mgr.sealedSegments {
delete(mgr.sealedSegments, id)
mgr.remove(ctx, segment)
for id := range mgr.sealedSegments {
mgr.sealedOnReleasingSegments.Insert(id)
}
sealedWaitForRelease := mgr.sealedSegments
mgr.sealedSegments = make(map[int64]Segment)
mgr.updateMetric()
mgr.mu.Unlock()

for _, segment := range growingWaitForRelease {
mgr.release(ctx, segment)
}
for _, segment := range sealedWaitForRelease {
mgr.release(ctx, segment)
}
}

// registerReleaseCallback registers the callback function when a segment is released.
Expand Down Expand Up @@ -741,7 +782,7 @@ func (mgr *segmentManager) updateMetric() {
metrics.QueryNodeNumPartitions.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Set(float64(partiations.Len()))
}

func (mgr *segmentManager) remove(ctx context.Context, segment Segment) bool {
func (mgr *segmentManager) release(ctx context.Context, segment Segment) {
if mgr.releaseCallback != nil {
mgr.releaseCallback(segment)
log.Ctx(ctx).Info("remove segment from cache", zap.Int64("segmentID", segment.ID()))
Expand All @@ -757,5 +798,13 @@ func (mgr *segmentManager) remove(ctx context.Context, segment Segment) bool {
segment.Level().String(),
).Dec()

return true
mgr.mu.Lock()
defer mgr.mu.Unlock()

switch segment.Type() {
case SegmentTypeGrowing:
mgr.growingOnReleasingSegments.Remove(segment.ID())
case SegmentTypeSealed:
mgr.sealedOnReleasingSegments.Remove(segment.ID())
}
}
14 changes: 14 additions & 0 deletions internal/querynodev2/segments/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (s *ManagerSuite) SetupSuite() {

func (s *ManagerSuite) SetupTest() {
s.mgr = NewSegmentManager()
s.segments = nil

for i, id := range s.segmentIDs {
schema := GenTestCollectionSchema("manager-suite", schemapb.DataType_Int64, true)
Expand All @@ -66,6 +67,19 @@ func (s *ManagerSuite) SetupTest() {
}
}

func (s *ManagerSuite) TestExist() {
for _, segment := range s.segments {
s.True(s.mgr.Exist(segment.ID(), segment.Type()))
s.mgr.removeSegmentWithType(segment.Type(), segment.ID())
s.True(s.mgr.Exist(segment.ID(), segment.Type()))
s.mgr.release(context.Background(), segment)
s.False(s.mgr.Exist(segment.ID(), segment.Type()))
}

s.False(s.mgr.Exist(10086, SegmentTypeGrowing))
s.False(s.mgr.Exist(10086, SegmentTypeSealed))
}

func (s *ManagerSuite) TestGetBy() {
for i, partitionID := range s.partitionIDs {
segments := s.mgr.GetBy(WithPartition(partitionID))
Expand Down
43 changes: 43 additions & 0 deletions internal/querynodev2/segments/mock_segment_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,8 +761,8 @@ func (loader *segmentLoader) prepare(ctx context.Context, segmentType SegmentTyp
// filter out loaded & loading segments
infos := make([]*querypb.SegmentLoadInfo, 0, len(segments))
for _, segment := range segments {
// Not loaded & loading
if len(loader.manager.Segment.GetBy(WithType(segmentType), WithID(segment.GetSegmentID()))) == 0 &&
// Not loaded & loading & releasing.
if !loader.manager.Segment.Exist(segment.GetSegmentID(), segmentType) &&
!loader.loadingSegments.Contain(segment.GetSegmentID()) {
infos = append(infos, segment)
loader.loadingSegments.Insert(segment.GetSegmentID(), newLoadResult())
Expand Down
6 changes: 3 additions & 3 deletions internal/querynodev2/segments/segment_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
idx := 0

var infos []*querypb.SegmentLoadInfo
suite.segmentManager.EXPECT().GetBy(mock.Anything, mock.Anything).Return(nil)
suite.segmentManager.EXPECT().Exist(mock.Anything, mock.Anything).Return(false)
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
defer func() { idx++ }()
if idx == 0 {
Expand Down Expand Up @@ -802,7 +802,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {

var idx int
var infos []*querypb.SegmentLoadInfo
suite.segmentManager.EXPECT().GetBy(mock.Anything, mock.Anything).Return(nil)
suite.segmentManager.EXPECT().Exist(mock.Anything, mock.Anything).Return(false)
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
defer func() { idx++ }()
if idx == 0 {
Expand All @@ -829,7 +829,7 @@ func (suite *SegmentLoaderDetailSuite) TestWaitSegmentLoadDone() {
suite.Run("wait_timeout", func() {
suite.SetupTest()

suite.segmentManager.EXPECT().GetBy(mock.Anything, mock.Anything).Return(nil)
suite.segmentManager.EXPECT().Exist(mock.Anything, mock.Anything).Return(false)
suite.segmentManager.EXPECT().GetWithType(suite.segmentID, SegmentTypeSealed).RunAndReturn(func(segmentID int64, segmentType commonpb.SegmentState) Segment {
return nil
})
Expand Down

0 comments on commit 9b69601

Please sign in to comment.