From b478956f4b45909f65b20ebe65269dc9cdeb4402 Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 17 Dec 2024 16:20:10 +0800 Subject: [PATCH 1/3] fix: Fix inaccurate general count Signed-off-by: bigsheeper --- internal/rootcoord/constrant_test.go | 89 ++++++++++++++++++++++++++++ internal/rootcoord/meta_table.go | 26 ++++---- 2 files changed, 103 insertions(+), 12 deletions(-) create mode 100644 internal/rootcoord/constrant_test.go diff --git a/internal/rootcoord/constrant_test.go b/internal/rootcoord/constrant_test.go new file mode 100644 index 0000000000000..b49fdee5a7a95 --- /dev/null +++ b/internal/rootcoord/constrant_test.go @@ -0,0 +1,89 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package rootcoord + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "github.com/milvus-io/milvus/internal/metastore/mocks" + "github.com/milvus-io/milvus/internal/metastore/model" + pb "github.com/milvus-io/milvus/internal/proto/etcdpb" + mocktso "github.com/milvus-io/milvus/internal/tso/mocks" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +func TestCheckGeneralCapacity(t *testing.T) { + ctx := context.Background() + + catalog := mocks.NewRootCoordCatalog(t) + catalog.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, nil) + catalog.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + catalog.EXPECT().ListAliases(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) + catalog.EXPECT().CreateDatabase(mock.Anything, mock.Anything, mock.Anything).Return(nil) + + allocator := mocktso.NewAllocator(t) + allocator.EXPECT().GenerateTSO(mock.Anything).Return(1000, nil) + + meta, err := NewMetaTable(ctx, catalog, allocator) + assert.NoError(t, err) + core := newTestCore(withMeta(meta)) + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + + Params.Save(Params.RootCoordCfg.MaxGeneralCapacity.Key, "512") + defer Params.Reset(Params.RootCoordCfg.MaxGeneralCapacity.Key) + + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 2, 256, core) + assert.NoError(t, err) + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 2, 4, 256, core) + assert.Error(t, err) + + catalog.EXPECT().CreateCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = meta.CreateDatabase(ctx, &model.Database{}, typeutil.MaxTimestamp) + err = meta.AddCollection(ctx, &model.Collection{CollectionID: 1, + State: pb.CollectionState_CollectionCreating, + ShardsNum: 256, + Partitions: []*model.Partition{ + {PartitionID: 100, State: pb.PartitionState_PartitionCreated}, + {PartitionID: 200, State: pb.PartitionState_PartitionCreated}, + }}) + assert.NoError(t, err) + + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 2, 256, core) + assert.NoError(t, err) + + catalog.EXPECT().AlterCollection(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) + err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionCreated, typeutil.MaxTimestamp) + assert.NoError(t, err) + + assert.Equal(t, 512, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 1, 1, core) + assert.Error(t, err) + + err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionDropped, typeutil.MaxTimestamp) + assert.NoError(t, err) + + assert.Equal(t, 0, meta.GetGeneralCount(ctx)) + err = checkGeneralCapacity(ctx, 1, 2, 256, core) + assert.NoError(t, err) +} diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 363be64441b2d..3b1b1a793500e 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -190,11 +190,12 @@ func (mt *MetaTable) reload() error { } for _, collection := range collections { mt.collID2Meta[collection.CollectionID] = collection - mt.generalCnt += len(collection.Partitions) * int(collection.ShardsNum) if collection.Available() { mt.names.insert(dbName, collection.Name, collection.CollectionID) + pn := collection.GetPartitionNum(true) + mt.generalCnt += pn * int(collection.ShardsNum) collectionNum++ - partitionNum += int64(collection.GetPartitionNum(true)) + partitionNum += int64(pn) } } @@ -234,8 +235,10 @@ func (mt *MetaTable) reloadWithNonDatabase() error { mt.collID2Meta[collection.CollectionID] = collection if collection.Available() { mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID) + pn := collection.GetPartitionNum(true) + mt.generalCnt += pn * int(collection.ShardsNum) collectionNum++ - partitionNum += int64(collection.GetPartitionNum(true)) + partitionNum += int64(pn) } } @@ -419,8 +422,6 @@ func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection) mt.collID2Meta[coll.CollectionID] = coll.Clone() mt.names.insert(db.Name, coll.Name, coll.CollectionID) - mt.generalCnt += len(coll.Partitions) * int(coll.ShardsNum) - log.Ctx(ctx).Info("add collection to meta table", zap.Int64("dbID", coll.DBID), zap.String("collection", coll.Name), @@ -451,13 +452,17 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni return fmt.Errorf("dbID not found for collection:%d", collectionID) } + pn := coll.GetPartitionNum(true) + switch state { case pb.CollectionState_CollectionCreated: + mt.generalCnt += pn * int(coll.ShardsNum) metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc() - metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true))) + metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(pn)) default: + mt.generalCnt -= pn * int(coll.ShardsNum) metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec() - metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true))) + metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(pn)) } log.Ctx(ctx).Info("change collection state", zap.Int64("collection", collectionID), @@ -525,8 +530,6 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID mt.removeAllNamesIfMatchedInternal(collectionID, allNames) mt.removeCollectionByIDInternal(collectionID) - mt.generalCnt -= len(coll.Partitions) * int(coll.ShardsNum) - log.Ctx(ctx).Info("remove collection", zap.Int64("dbID", coll.DBID), zap.String("name", coll.Name), @@ -875,8 +878,6 @@ func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partitio } mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone()) - mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum - log.Ctx(ctx).Info("add partition to meta table", zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName), zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp)) @@ -904,9 +905,11 @@ func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID Uniq switch state { case pb.PartitionState_PartitionCreated: + mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum // support Dynamic load/release partitions metrics.RootCoordNumOfPartitions.WithLabelValues().Inc() default: + mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum metrics.RootCoordNumOfPartitions.WithLabelValues().Dec() } @@ -941,7 +944,6 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection } if loc != -1 { coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...) - mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum } log.Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts)) return nil From 317e3e8f0cb21bcd4f5ef1f241ed3575d6ead2cb Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 17 Dec 2024 17:04:46 +0800 Subject: [PATCH 2/3] code format Signed-off-by: bigsheeper --- internal/rootcoord/constrant_test.go | 11 +++++++---- internal/rootcoord/meta_table.go | 4 ++-- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/rootcoord/constrant_test.go b/internal/rootcoord/constrant_test.go index b49fdee5a7a95..ed68877551dba 100644 --- a/internal/rootcoord/constrant_test.go +++ b/internal/rootcoord/constrant_test.go @@ -59,13 +59,16 @@ func TestCheckGeneralCapacity(t *testing.T) { catalog.EXPECT().CreateCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil) err = meta.CreateDatabase(ctx, &model.Database{}, typeutil.MaxTimestamp) - err = meta.AddCollection(ctx, &model.Collection{CollectionID: 1, - State: pb.CollectionState_CollectionCreating, - ShardsNum: 256, + assert.Error(t, err) + err = meta.AddCollection(ctx, &model.Collection{ + CollectionID: 1, + State: pb.CollectionState_CollectionCreating, + ShardsNum: 256, Partitions: []*model.Partition{ {PartitionID: 100, State: pb.PartitionState_PartitionCreated}, {PartitionID: 200, State: pb.PartitionState_PartitionCreated}, - }}) + }, + }) assert.NoError(t, err) assert.Equal(t, 0, meta.GetGeneralCount(ctx)) diff --git a/internal/rootcoord/meta_table.go b/internal/rootcoord/meta_table.go index 3b1b1a793500e..473e49ac9ac7f 100644 --- a/internal/rootcoord/meta_table.go +++ b/internal/rootcoord/meta_table.go @@ -459,7 +459,7 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni mt.generalCnt += pn * int(coll.ShardsNum) metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc() metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(pn)) - default: + case pb.CollectionState_CollectionDropping: mt.generalCnt -= pn * int(coll.ShardsNum) metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec() metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(pn)) @@ -908,7 +908,7 @@ func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID Uniq mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum // support Dynamic load/release partitions metrics.RootCoordNumOfPartitions.WithLabelValues().Inc() - default: + case pb.PartitionState_PartitionDropping: mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum metrics.RootCoordNumOfPartitions.WithLabelValues().Dec() } From 0daee253c57d2c2a618c3176f0efe265fdb9efab Mon Sep 17 00:00:00 2001 From: bigsheeper Date: Tue, 17 Dec 2024 18:04:41 +0800 Subject: [PATCH 3/3] fix ut Signed-off-by: bigsheeper --- internal/rootcoord/constrant_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/rootcoord/constrant_test.go b/internal/rootcoord/constrant_test.go index ed68877551dba..6e8bb9e734ce0 100644 --- a/internal/rootcoord/constrant_test.go +++ b/internal/rootcoord/constrant_test.go @@ -59,7 +59,7 @@ func TestCheckGeneralCapacity(t *testing.T) { catalog.EXPECT().CreateCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil) err = meta.CreateDatabase(ctx, &model.Database{}, typeutil.MaxTimestamp) - assert.Error(t, err) + assert.NoError(t, err) err = meta.AddCollection(ctx, &model.Collection{ CollectionID: 1, State: pb.CollectionState_CollectionCreating, @@ -83,7 +83,7 @@ func TestCheckGeneralCapacity(t *testing.T) { err = checkGeneralCapacity(ctx, 1, 1, 1, core) assert.Error(t, err) - err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionDropped, typeutil.MaxTimestamp) + err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionDropping, typeutil.MaxTimestamp) assert.NoError(t, err) assert.Equal(t, 0, meta.GetGeneralCount(ctx))