Skip to content

Commit

Permalink
fix: encountering orphan channel-cp meta after DataCoord GC (#34612)
Browse files Browse the repository at this point in the history
issue: #34545

Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 authored Jul 11, 2024
1 parent d7966f4 commit a08a0c8
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 9 deletions.
47 changes: 40 additions & 7 deletions internal/datacoord/garbage_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -156,6 +157,7 @@ func (gc *garbageCollector) work(ctx context.Context) {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
gc.recycleDroppedSegments(ctx)
gc.recycleChannelCPMeta(ctx)
gc.recycleUnusedIndexes(ctx)
gc.recycleUnusedSegIndexes(ctx)
gc.recycleUnusedAnalyzeFiles()
Expand Down Expand Up @@ -474,16 +476,47 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
continue
}
log.Info("GC segment meta drop segment done")
}
}

if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 &&
!gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) {
log.Info("empty channel found during gc, manually cleanup channel checkpoints", zap.String("vChannel", segInsertChannel))
// TODO: remove channel checkpoint may be lost, need to be handled before segment GC?
if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil {
log.Warn("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err))
}
func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx)
if err != nil {
log.Warn("list channel cp fail during GC", zap.Error(err))
return
}

collectionID2GcStatus := make(map[int64]bool)
skippedCnt := 0

log.Info("start to GC channel cp", zap.Int("vchannelCnt", len(channelCPs)))
for vChannel := range channelCPs {
collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)

// !!! Skip to GC if vChannel format is illegal, it will lead meta leak in this case
if collectionID == -1 {
skippedCnt++
log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel))
continue
}

if _, ok := collectionID2GcStatus[collectionID]; !ok {
collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
}

// Skip to GC if all segments meta of the corresponding collection are not removed
if gcConfirmed, _ := collectionID2GcStatus[collectionID]; !gcConfirmed {
skippedCnt++
continue
}

if err := gc.meta.DropChannelCheckpoint(vChannel); err != nil {
// Try to GC in the next gc cycle if drop channel cp meta fail.
log.Warn("failed to drop channel check point during gc", zap.String("vchannel", vChannel), zap.Error(err))
}
}

log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt))
}

func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
Expand Down
48 changes: 48 additions & 0 deletions internal/datacoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,6 +1476,54 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
assert.Nil(t, segB)
}

func TestGarbageCollector_recycleChannelMeta(t *testing.T) {
catalog := catalogmocks.NewDataCoordCatalog(t)

m := &meta{
catalog: catalog,
channelCPs: newChannelCps(),
}

m.channelCPs.checkpoints = map[string]*msgpb.MsgPosition{
"cluster-id-rootcoord-dm_0_123v0": nil,
"cluster-id-rootcoord-dm_0_124v0": nil,
}

gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{})

t.Run("list channel cp fail", func(t *testing.T) {
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once()
gc.recycleChannelCPMeta(context.TODO())
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
})

catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{
"cluster-id-rootcoord-dm_0_123v0": nil,
"cluster-id-rootcoord-dm_0_invalidedCollectionIDv0": nil,
"cluster-id-rootcoord-dm_0_124v0": nil,
}, nil).Twice()

catalog.EXPECT().GcConfirm(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, collectionID int64, i2 int64) bool {
if collectionID == 123 {
return true
}
return false
})

t.Run("drop channel cp fail", func(t *testing.T) {
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
gc.recycleChannelCPMeta(context.TODO())
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
})

t.Run("gc ok", func(t *testing.T) {
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Once()
gc.recycleChannelCPMeta(context.TODO())
assert.Equal(t, 1, len(m.channelCPs.checkpoints))
})
}

func TestGarbageCollector_removeObjectPool(t *testing.T) {
paramtable.Init()
cm := mocks.NewChunkManager(t)
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,11 +282,11 @@ func CheckCheckPointsHealth(meta *meta) error {
for channel, cp := range meta.GetChannelCheckpoints() {
collectionID := funcutil.GetCollectionIDFromVChannel(channel)
if collectionID == -1 {
log.Warn("can't parse collection id from vchannel, skip check cp lag", zap.String("vchannel", channel))
log.RatedWarn(60, "can't parse collection id from vchannel, skip check cp lag", zap.String("vchannel", channel))
continue
}
if meta.GetCollection(collectionID) == nil {
log.Warn("corresponding the collection doesn't exists, skip check cp lag", zap.String("vchannel", channel))
log.RatedWarn(60, "corresponding the collection doesn't exists, skip check cp lag", zap.String("vchannel", channel))
continue
}
ts, _ := tsoutil.ParseTS(cp.Timestamp)
Expand Down

0 comments on commit a08a0c8

Please sign in to comment.