diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index 08b3cd54769f4..4ea6accc14dda 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -156,6 +157,7 @@ func (gc *garbageCollector) work(ctx context.Context) { defer gc.wg.Done() gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) { gc.recycleDroppedSegments(ctx) + gc.recycleChannelCPMeta(ctx) gc.recycleUnusedIndexes(ctx) gc.recycleUnusedSegIndexes(ctx) gc.recycleUnusedAnalyzeFiles() @@ -474,16 +476,47 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { continue } log.Info("GC segment meta drop segment done") + } +} - if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 && - !gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) { - log.Info("empty channel found during gc, manually cleanup channel checkpoints", zap.String("vChannel", segInsertChannel)) - // TODO: remove channel checkpoint may be lost, need to be handled before segment GC? - if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil { - log.Warn("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err)) - } +func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) { + channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx) + if err != nil { + log.Warn("list channel cp fail during GC", zap.Error(err)) + return + } + + collectionID2GcStatus := make(map[int64]bool) + skippedCnt := 0 + + log.Info("start to GC channel cp", zap.Int("vchannelCnt", len(channelCPs))) + for vChannel := range channelCPs { + collectionID := funcutil.GetCollectionIDFromVChannel(vChannel) + + // !!! Skip to GC if vChannel format is illegal, it will lead meta leak in this case + if collectionID == -1 { + skippedCnt++ + log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel)) + continue + } + + if _, ok := collectionID2GcStatus[collectionID]; !ok { + collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1) + } + + // Skip to GC if all segments meta of the corresponding collection are not removed + if gcConfirmed, _ := collectionID2GcStatus[collectionID]; !gcConfirmed { + skippedCnt++ + continue + } + + if err := gc.meta.DropChannelCheckpoint(vChannel); err != nil { + // Try to GC in the next gc cycle if drop channel cp meta fail. + log.Warn("failed to drop channel check point during gc", zap.String("vchannel", vChannel), zap.Error(err)) } } + + log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt)) } func (gc *garbageCollector) isExpire(dropts Timestamp) bool { diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 93a96f7e37429..e64ca2522ec34 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -1476,6 +1476,54 @@ func TestGarbageCollector_clearETCD(t *testing.T) { assert.Nil(t, segB) } +func TestGarbageCollector_recycleChannelMeta(t *testing.T) { + catalog := catalogmocks.NewDataCoordCatalog(t) + + m := &meta{ + catalog: catalog, + channelCPs: newChannelCps(), + } + + m.channelCPs.checkpoints = map[string]*msgpb.MsgPosition{ + "cluster-id-rootcoord-dm_0_123v0": nil, + "cluster-id-rootcoord-dm_0_124v0": nil, + } + + gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{}) + + t.Run("list channel cp fail", func(t *testing.T) { + catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once() + gc.recycleChannelCPMeta(context.TODO()) + assert.Equal(t, 2, len(m.channelCPs.checkpoints)) + }) + + catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{ + "cluster-id-rootcoord-dm_0_123v0": nil, + "cluster-id-rootcoord-dm_0_invalidedCollectionIDv0": nil, + "cluster-id-rootcoord-dm_0_124v0": nil, + }, nil).Twice() + + catalog.EXPECT().GcConfirm(mock.Anything, mock.Anything, mock.Anything). + RunAndReturn(func(ctx context.Context, collectionID int64, i2 int64) bool { + if collectionID == 123 { + return true + } + return false + }) + + t.Run("drop channel cp fail", func(t *testing.T) { + catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() + gc.recycleChannelCPMeta(context.TODO()) + assert.Equal(t, 2, len(m.channelCPs.checkpoints)) + }) + + t.Run("gc ok", func(t *testing.T) { + catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Once() + gc.recycleChannelCPMeta(context.TODO()) + assert.Equal(t, 1, len(m.channelCPs.checkpoints)) + }) +} + func TestGarbageCollector_removeObjectPool(t *testing.T) { paramtable.Init() cm := mocks.NewChunkManager(t) diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index bfc52c1a1d39d..7f65fb825f6d2 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -269,11 +269,11 @@ func CheckCheckPointsHealth(meta *meta) error { for channel, cp := range meta.GetChannelCheckpoints() { collectionID := funcutil.GetCollectionIDFromVChannel(channel) if collectionID == -1 { - log.Warn("can't parse collection id from vchannel, skip check cp lag", zap.String("vchannel", channel)) + log.RatedWarn(60, "can't parse collection id from vchannel, skip check cp lag", zap.String("vchannel", channel)) continue } if meta.GetCollection(collectionID) == nil { - log.Warn("corresponding the collection doesn't exists, skip check cp lag", zap.String("vchannel", channel)) + log.RatedWarn(60, "corresponding the collection doesn't exists, skip check cp lag", zap.String("vchannel", channel)) continue } ts, _ := tsoutil.ParseTS(cp.Timestamp)