diff --git a/internal/datacoord/garbage_collector.go b/internal/datacoord/garbage_collector.go index b62312d55d1c4..e24bd7155b0a4 100644 --- a/internal/datacoord/garbage_collector.go +++ b/internal/datacoord/garbage_collector.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/conc" + "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" "github.com/milvus-io/milvus/pkg/util/paramtable" @@ -156,6 +157,7 @@ func (gc *garbageCollector) work(ctx context.Context) { defer gc.wg.Done() gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) { gc.recycleDroppedSegments(ctx) + gc.recycleChannelCPMeta(ctx) gc.recycleUnusedIndexes(ctx) gc.recycleUnusedSegIndexes(ctx) }) @@ -473,16 +475,47 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) { continue } log.Info("GC segment meta drop segment done") + } +} - if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 && - !gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) { - log.Info("empty channel found during gc, manually cleanup channel checkpoints", zap.String("vChannel", segInsertChannel)) - // TODO: remove channel checkpoint may be lost, need to be handled before segment GC? 
- if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil { - log.Warn("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err)) - } +func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) { + channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx) + if err != nil { + log.Warn("list channel cp fail during GC", zap.Error(err)) + return + } + + collectionID2GcStatus := make(map[int64]bool) + skippedCnt := 0 + + log.Info("start to GC channel cp", zap.Int("vchannelCnt", len(channelCPs))) + for vChannel := range channelCPs { + collectionID := funcutil.GetCollectionIDFromVChannel(vChannel) + + // !!! Skip GC if the vChannel format is illegal; this will leave the meta leaked in this case + if collectionID == -1 { + skippedCnt++ + log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel)) + continue + } + + if _, ok := collectionID2GcStatus[collectionID]; !ok { + collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1) + } + + // Skip GC if the segment meta of the corresponding collection has not been fully removed + if gcConfirmed, _ := collectionID2GcStatus[collectionID]; !gcConfirmed { + skippedCnt++ + continue + } + + if err := gc.meta.DropChannelCheckpoint(vChannel); err != nil { + // Try to GC again in the next cycle if dropping the channel cp meta fails. 
+ log.Warn("failed to drop channel check point during gc", zap.String("vchannel", vChannel), zap.Error(err)) } } + + log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt)) } func (gc *garbageCollector) isExpire(dropts Timestamp) bool { diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index a81003cda7bec..7780f183bc058 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -1469,6 +1469,54 @@ func TestGarbageCollector_clearETCD(t *testing.T) { assert.Nil(t, segB) } +func TestGarbageCollector_recycleChannelMeta(t *testing.T) { + catalog := catalogmocks.NewDataCoordCatalog(t) + + m := &meta{ + catalog: catalog, + channelCPs: newChannelCps(), + } + + m.channelCPs.checkpoints = map[string]*msgpb.MsgPosition{ + "cluster-id-rootcoord-dm_0_123v0": nil, + "cluster-id-rootcoord-dm_0_124v0": nil, + } + + gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{}) + + t.Run("list channel cp fail", func(t *testing.T) { + catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once() + gc.recycleChannelCPMeta(context.TODO()) + assert.Equal(t, 2, len(m.channelCPs.checkpoints)) + }) + + catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{ + "cluster-id-rootcoord-dm_0_123v0": nil, + "cluster-id-rootcoord-dm_0_invalidedCollectionIDv0": nil, + "cluster-id-rootcoord-dm_0_124v0": nil, + }, nil).Twice() + + catalog.EXPECT().GcConfirm(mock.Anything, mock.Anything, mock.Anything). 
+ RunAndReturn(func(ctx context.Context, collectionID int64, i2 int64) bool { + if collectionID == 123 { + return true + } + return false + }) + + t.Run("drop channel cp fail", func(t *testing.T) { + catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() + gc.recycleChannelCPMeta(context.TODO()) + assert.Equal(t, 2, len(m.channelCPs.checkpoints)) + }) + + t.Run("gc ok", func(t *testing.T) { + catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Once() + gc.recycleChannelCPMeta(context.TODO()) + assert.Equal(t, 1, len(m.channelCPs.checkpoints)) + }) +} + func TestGarbageCollector_removeObjectPool(t *testing.T) { paramtable.Init() cm := mocks.NewChunkManager(t) diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 3054276271827..49bca49368780 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -139,6 +139,7 @@ func (mt *MetaTable) reload() error { metrics.RootCoordNumOfCollections.Reset() metrics.RootCoordNumOfPartitions.Reset() + metrics.RootCoordNumOfDatabases.Set(0) // recover databases. 
dbs, err := mt.catalog.ListDatabases(mt.ctx, typeutil.MaxTimestamp) @@ -184,6 +185,7 @@ func (mt *MetaTable) reload() error { } } + metrics.RootCoordNumOfDatabases.Inc() metrics.RootCoordNumOfCollections.WithLabelValues(dbName).Add(float64(collectionNum)) log.Info("collections recovered from db", zap.String("db_name", dbName), zap.Int64("collection_num", collectionNum), @@ -255,7 +257,11 @@ func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts mt.ddLock.Lock() defer mt.ddLock.Unlock() - return mt.createDatabasePrivate(ctx, db, ts) + if err := mt.createDatabasePrivate(ctx, db, ts); err != nil { + return err + } + metrics.RootCoordNumOfDatabases.Inc() + return nil } func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error { @@ -271,8 +277,8 @@ func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Databa mt.names.createDbIfNotExist(dbName) mt.aliases.createDbIfNotExist(dbName) mt.dbName2Meta[dbName] = db - log.Ctx(ctx).Info("create database", zap.String("db", dbName), zap.Uint64("ts", ts)) + log.Ctx(ctx).Info("create database", zap.String("db", dbName), zap.Uint64("ts", ts)) return nil } @@ -322,8 +328,9 @@ func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeuti mt.names.dropDb(dbName) mt.aliases.dropDb(dbName) delete(mt.dbName2Meta, dbName) - log.Ctx(ctx).Info("drop database", zap.String("db", dbName), zap.Uint64("ts", ts)) + metrics.RootCoordNumOfDatabases.Dec() + log.Ctx(ctx).Info("drop database", zap.String("db", dbName), zap.Uint64("ts", ts)) return nil } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 2f40efa440ddf..62f6cef9fd17f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -867,7 +867,6 @@ func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRe metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() 
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.RootCoordNumOfDatabases.Inc() log.Ctx(ctx).Info("done to create database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs())) @@ -912,7 +911,6 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc() metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds())) - metrics.RootCoordNumOfDatabases.Dec() metrics.CleanupRootCoordDBMetrics(in.GetDbName()) log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole), zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()), diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index c062b2a7c021e..7ff3a104bb973 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -115,8 +115,8 @@ const ( var ( // buckets involves durations in milliseconds, - // [1 2 4 8 16 32 64 128 256 512 1024 2048 4096 8192 16384 32768 65536 1.31072e+05] - buckets = prometheus.ExponentialBuckets(1, 2, 18) + // [5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1500, 3000, 10000, 60000] + buckets = []float64{5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1500, 3000, 10000, 60000} // longTaskBuckets provides long task duration in milliseconds longTaskBuckets = []float64{1, 100, 500, 1000, 5000, 10000, 20000, 50000, 100000, 250000, 500000, 1000000, 3600000, 5000000, 10000000} // unit milliseconds diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 3a1ca42e614af..0d89f4ec4032e 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -24,6 +24,7 @@ import ( "fmt" "net" "reflect" + "regexp" "strconv" "strings" "time" @@ -229,6 +230,18 @@ func 
ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri return strings.Replace(chanName, tokenFrom, tokenTo, 1), nil } +func GetCollectionIDFromVChannel(vChannelName string) int64 { + re := regexp.MustCompile(`.*_(\d+)v\d+`) + matches := re.FindStringSubmatch(vChannelName) + if len(matches) > 1 { + number, err := strconv.ParseInt(matches[1], 0, 64) + if err == nil { + return number + } + } + return -1 +} + func getNumRowsOfScalarField(datas interface{}) uint64 { realTypeDatas := reflect.ValueOf(datas) return uint64(realTypeDatas.Len())