fix: cherry pick bug fix for orphan meta and metrics (#34643)
issue: #34545 #34041
pr: #34609 #34010 #34592

---------

Signed-off-by: jaime <[email protected]>
Co-authored-by: edward.zeng <[email protected]>
jaime0815 and LoveEachDay authored Jul 12, 2024
1 parent bd974f3 commit 9a1fabb
Showing 6 changed files with 113 additions and 14 deletions.
47 changes: 40 additions & 7 deletions internal/datacoord/garbage_collector.go
@@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -156,6 +157,7 @@ func (gc *garbageCollector) work(ctx context.Context) {
defer gc.wg.Done()
gc.runRecycleTaskWithPauser(ctx, "meta", gc.option.checkInterval, func(ctx context.Context) {
gc.recycleDroppedSegments(ctx)
gc.recycleChannelCPMeta(ctx)
gc.recycleUnusedIndexes(ctx)
gc.recycleUnusedSegIndexes(ctx)
})
@@ -473,16 +475,47 @@ func (gc *garbageCollector) recycleDroppedSegments(ctx context.Context) {
continue
}
log.Info("GC segment meta drop segment done")
}
}

if segList := gc.meta.GetSegmentsByChannel(segInsertChannel); len(segList) == 0 &&
!gc.meta.catalog.ChannelExists(context.Background(), segInsertChannel) {
log.Info("empty channel found during gc, manually cleanup channel checkpoints", zap.String("vChannel", segInsertChannel))
// TODO: remove channel checkpoint may be lost, need to be handled before segment GC?
if err := gc.meta.DropChannelCheckpoint(segInsertChannel); err != nil {
log.Warn("failed to drop channel check point during segment garbage collection", zap.String("vchannel", segInsertChannel), zap.Error(err))
}
func (gc *garbageCollector) recycleChannelCPMeta(ctx context.Context) {
channelCPs, err := gc.meta.catalog.ListChannelCheckpoint(ctx)
if err != nil {
log.Warn("list channel cp fail during GC", zap.Error(err))
return
}

collectionID2GcStatus := make(map[int64]bool)
skippedCnt := 0

log.Info("start to GC channel cp", zap.Int("vchannelCnt", len(channelCPs)))
for vChannel := range channelCPs {
collectionID := funcutil.GetCollectionIDFromVChannel(vChannel)

// !!! Skip GC if the vChannel format is illegal; note that the checkpoint meta is leaked in this case
if collectionID == -1 {
skippedCnt++
log.Warn("parse collection id fail, skip to gc channel cp", zap.String("vchannel", vChannel))
continue
}

if _, ok := collectionID2GcStatus[collectionID]; !ok {
collectionID2GcStatus[collectionID] = gc.meta.catalog.GcConfirm(ctx, collectionID, -1)
}

// Skip GC unless all segment meta of the corresponding collection has been removed
if gcConfirmed, _ := collectionID2GcStatus[collectionID]; !gcConfirmed {
skippedCnt++
continue
}

if err := gc.meta.DropChannelCheckpoint(vChannel); err != nil {
// Retry in the next GC cycle if dropping the channel checkpoint meta fails.
log.Warn("failed to drop channel check point during gc", zap.String("vchannel", vChannel), zap.Error(err))
}
}

log.Info("GC channel cp done", zap.Int("skippedChannelCP", skippedCnt))
}

func (gc *garbageCollector) isExpire(dropts Timestamp) bool {
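For illustration, a minimal, self-contained sketch of the per-collection caching that recycleChannelCPMeta applies around catalog.GcConfirm; confirmOnce and the inline gcConfirm callback are hypothetical names, not part of this commit:

package main

import "fmt"

// confirmOnce mirrors the collectionID2GcStatus map above: GcConfirm is invoked
// at most once per collection, and the cached result is reused for every
// vchannel that belongs to that collection.
func confirmOnce(cache map[int64]bool, collectionID int64, gcConfirm func(int64) bool) bool {
	if confirmed, ok := cache[collectionID]; ok {
		return confirmed
	}
	confirmed := gcConfirm(collectionID)
	cache[collectionID] = confirmed
	return confirmed
}

func main() {
	calls := 0
	gcConfirm := func(collectionID int64) bool {
		calls++
		return collectionID == 123 // pretend only collection 123 has had all its meta removed
	}
	cache := map[int64]bool{}
	for _, id := range []int64{123, 123, 124} {
		fmt.Println(confirmOnce(cache, id, gcConfirm)) // true, true, false
	}
	fmt.Println("GcConfirm calls:", calls) // 2, not 3
}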
48 changes: 48 additions & 0 deletions internal/datacoord/garbage_collector_test.go
@@ -1469,6 +1469,54 @@ func TestGarbageCollector_clearETCD(t *testing.T) {
assert.Nil(t, segB)
}

func TestGarbageCollector_recycleChannelMeta(t *testing.T) {
catalog := catalogmocks.NewDataCoordCatalog(t)

m := &meta{
catalog: catalog,
channelCPs: newChannelCps(),
}

m.channelCPs.checkpoints = map[string]*msgpb.MsgPosition{
"cluster-id-rootcoord-dm_0_123v0": nil,
"cluster-id-rootcoord-dm_0_124v0": nil,
}

gc := newGarbageCollector(m, newMockHandlerWithMeta(m), GcOption{})

t.Run("list channel cp fail", func(t *testing.T) {
catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(nil, errors.New("mock error")).Once()
gc.recycleChannelCPMeta(context.TODO())
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
})

catalog.EXPECT().ListChannelCheckpoint(mock.Anything).Return(map[string]*msgpb.MsgPosition{
"cluster-id-rootcoord-dm_0_123v0": nil,
"cluster-id-rootcoord-dm_0_invalidedCollectionIDv0": nil,
"cluster-id-rootcoord-dm_0_124v0": nil,
}, nil).Twice()

catalog.EXPECT().GcConfirm(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, collectionID int64, i2 int64) bool {
if collectionID == 123 {
return true
}
return false
})

t.Run("drop channel cp fail", func(t *testing.T) {
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()
gc.recycleChannelCPMeta(context.TODO())
assert.Equal(t, 2, len(m.channelCPs.checkpoints))
})

t.Run("gc ok", func(t *testing.T) {
catalog.EXPECT().DropChannelCheckpoint(mock.Anything, mock.Anything).Return(nil).Once()
gc.recycleChannelCPMeta(context.TODO())
assert.Equal(t, 1, len(m.channelCPs.checkpoints))
})
}

func TestGarbageCollector_removeObjectPool(t *testing.T) {
paramtable.Init()
cm := mocks.NewChunkManager(t)
13 changes: 10 additions & 3 deletions internal/rootcoord/meta_table.go
@@ -139,6 +139,7 @@ func (mt *MetaTable) reload() error {

metrics.RootCoordNumOfCollections.Reset()
metrics.RootCoordNumOfPartitions.Reset()
metrics.RootCoordNumOfDatabases.Set(0)

// recover databases.
dbs, err := mt.catalog.ListDatabases(mt.ctx, typeutil.MaxTimestamp)
@@ -184,6 +185,7 @@ }
}
}

metrics.RootCoordNumOfDatabases.Inc()
metrics.RootCoordNumOfCollections.WithLabelValues(dbName).Add(float64(collectionNum))
log.Info("collections recovered from db", zap.String("db_name", dbName),
zap.Int64("collection_num", collectionNum),
@@ -255,7 +257,11 @@ func (mt *MetaTable) CreateDatabase(ctx context.Context, db *model.Database, ts
mt.ddLock.Lock()
defer mt.ddLock.Unlock()

return mt.createDatabasePrivate(ctx, db, ts)
if err := mt.createDatabasePrivate(ctx, db, ts); err != nil {
return err
}
metrics.RootCoordNumOfDatabases.Inc()
return nil
}

func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Database, ts typeutil.Timestamp) error {
@@ -271,8 +277,8 @@ func (mt *MetaTable) createDatabasePrivate(ctx context.Context, db *model.Databa
mt.names.createDbIfNotExist(dbName)
mt.aliases.createDbIfNotExist(dbName)
mt.dbName2Meta[dbName] = db
log.Ctx(ctx).Info("create database", zap.String("db", dbName), zap.Uint64("ts", ts))

log.Ctx(ctx).Info("create database", zap.String("db", dbName), zap.Uint64("ts", ts))
return nil
}

@@ -322,8 +328,9 @@ func (mt *MetaTable) DropDatabase(ctx context.Context, dbName string, ts typeuti
mt.names.dropDb(dbName)
mt.aliases.dropDb(dbName)
delete(mt.dbName2Meta, dbName)
log.Ctx(ctx).Info("drop database", zap.String("db", dbName), zap.Uint64("ts", ts))

metrics.RootCoordNumOfDatabases.Dec()
log.Ctx(ctx).Info("drop database", zap.String("db", dbName), zap.Uint64("ts", ts))
return nil
}

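Together with the root_coord.go change below, this moves the database-count gauge from the RPC handlers into the meta table: reload() resets it and re-counts recovered databases, while CreateDatabase/DropDatabase adjust it only after the metadata change succeeds, so the value stays correct across restarts. A minimal standalone sketch of that lifecycle, assuming a hypothetical gauge name (Milvus uses metrics.RootCoordNumOfDatabases):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// numDatabases stands in for metrics.RootCoordNumOfDatabases.
var numDatabases = prometheus.NewGauge(prometheus.GaugeOpts{
	Name: "example_rootcoord_num_of_databases",
	Help: "Number of databases known to the meta table.",
})

// reload resets the gauge and counts every database recovered from the catalog,
// so the metric is accurate after a restart instead of drifting from zero.
func reload(recovered []string) {
	numDatabases.Set(0)
	for range recovered {
		numDatabases.Inc()
	}
}

func main() {
	prometheus.MustRegister(numDatabases)
	reload([]string{"default", "db1"})
	numDatabases.Inc() // CreateDatabase committed to the catalog
	numDatabases.Dec() // DropDatabase committed to the catalog
	fmt.Println("gauge now tracks the meta table, not the RPC layer")
}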
2 changes: 0 additions & 2 deletions internal/rootcoord/root_coord.go
@@ -867,7 +867,6 @@ func (c *Core) CreateDatabase(ctx context.Context, in *milvuspb.CreateDatabaseRe

metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfDatabases.Inc()
log.Ctx(ctx).Info("done to create database", zap.String("role", typeutil.RootCoordRole),
zap.String("dbName", in.GetDbName()),
zap.Int64("msgID", in.GetBase().GetMsgID()), zap.Uint64("ts", t.GetTs()))
@@ -912,7 +911,6 @@ func (c *Core) DropDatabase(ctx context.Context, in *milvuspb.DropDatabaseReques

metrics.RootCoordDDLReqCounter.WithLabelValues(method, metrics.SuccessLabel).Inc()
metrics.RootCoordDDLReqLatency.WithLabelValues(method).Observe(float64(tr.ElapseSpan().Milliseconds()))
metrics.RootCoordNumOfDatabases.Dec()
metrics.CleanupRootCoordDBMetrics(in.GetDbName())
log.Ctx(ctx).Info("done to drop database", zap.String("role", typeutil.RootCoordRole),
zap.String("dbName", in.GetDbName()), zap.Int64("msgID", in.GetBase().GetMsgID()),
4 changes: 2 additions & 2 deletions pkg/metrics/metrics.go
@@ -115,8 +115,8 @@ const (

var (
// buckets involves durations in milliseconds,
// [1 2 4 8 16 32 64 128 256 512 1024 2048 4096 8192 16384 32768 65536 1.31072e+05]
buckets = prometheus.ExponentialBuckets(1, 2, 18)
// [5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1500, 3000, 10000, 60000]
buckets = []float64{5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1500, 3000, 10000, 60000}

// longTaskBuckets provides long task duration in milliseconds
longTaskBuckets = []float64{1, 100, 500, 1000, 5000, 10000, 20000, 50000, 100000, 250000, 500000, 1000000, 3600000, 5000000, 10000000} // unit milliseconds
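The shared latency buckets switch from prometheus.ExponentialBuckets(1, 2, 18) to an explicit list with finer resolution between 5 ms and 1 s and a 60 s upper bound. A hedged sketch of how such a slice is typically wired into a histogram; the metric below is hypothetical, not one defined in this commit:

package main

import "github.com/prometheus/client_golang/prometheus"

var buckets = []float64{5, 10, 20, 50, 100, 200, 300, 400, 500, 600, 700, 800, 900, 1000, 1500, 3000, 10000, 60000}

// exampleLatency shows the usual wiring: the shared bucket slice is passed via
// HistogramOpts.Buckets when the histogram vector is declared.
var exampleLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Name:    "example_request_latency_ms",
	Help:    "Request latency in milliseconds.",
	Buckets: buckets,
}, []string{"function_name"})

func main() {
	prometheus.MustRegister(exampleLatency)
	exampleLatency.WithLabelValues("CreateDatabase").Observe(42) // 42 ms lands in the 50 ms bucket
}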
13 changes: 13 additions & 0 deletions pkg/util/funcutil/func.go
@@ -24,6 +24,7 @@ import (
"fmt"
"net"
"reflect"
"regexp"
"strconv"
"strings"
"time"
@@ -229,6 +230,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri
return strings.Replace(chanName, tokenFrom, tokenTo, 1), nil
}

func GetCollectionIDFromVChannel(vChannelName string) int64 {
re := regexp.MustCompile(`.*_(\d+)v\d+`)
matches := re.FindStringSubmatch(vChannelName)
if len(matches) > 1 {
number, err := strconv.ParseInt(matches[1], 0, 64)
if err == nil {
return number
}
}
return -1
}

func getNumRowsOfScalarField(datas interface{}) uint64 {
realTypeDatas := reflect.ValueOf(datas)
return uint64(realTypeDatas.Len())
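A worked example of the new helper, assuming the vchannel naming used in the test file above; parseCollectionID is a hypothetical name that mirrors the intent of funcutil.GetCollectionIDFromVChannel, and expected outputs are shown in comments:

package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// parseCollectionID extracts the collection ID embedded in a vchannel name, or
// returns -1 when the name does not match the expected "..._<collectionID>v<n>" layout.
func parseCollectionID(vChannelName string) int64 {
	re := regexp.MustCompile(`.*_(\d+)v\d+`)
	if matches := re.FindStringSubmatch(vChannelName); len(matches) > 1 {
		if id, err := strconv.ParseInt(matches[1], 10, 64); err == nil {
			return id
		}
	}
	return -1
}

func main() {
	fmt.Println(parseCollectionID("cluster-id-rootcoord-dm_0_123v0"))                   // 123
	fmt.Println(parseCollectionID("cluster-id-rootcoord-dm_0_invalidedCollectionIDv0")) // -1
}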
