Skip to content

Commit

Permalink
fix: Fix inaccurate general count (#38524)
Browse files Browse the repository at this point in the history
Checking general count should only count healthy collections and
partitions.

issue: #37630

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Dec 17, 2024
1 parent b18a3cf commit 1aa31e2
Show file tree
Hide file tree
Showing 2 changed files with 108 additions and 14 deletions.
92 changes: 92 additions & 0 deletions internal/rootcoord/constrant_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rootcoord

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/metastore/model"
pb "github.com/milvus-io/milvus/internal/proto/etcdpb"
mocktso "github.com/milvus-io/milvus/internal/tso/mocks"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestCheckGeneralCapacity(t *testing.T) {
ctx := context.Background()

catalog := mocks.NewRootCoordCatalog(t)
catalog.EXPECT().ListDatabases(mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().ListCollections(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().ListAliases(mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
catalog.EXPECT().CreateDatabase(mock.Anything, mock.Anything, mock.Anything).Return(nil)

allocator := mocktso.NewAllocator(t)
allocator.EXPECT().GenerateTSO(mock.Anything).Return(1000, nil)

meta, err := NewMetaTable(ctx, catalog, allocator)
assert.NoError(t, err)
core := newTestCore(withMeta(meta))
assert.Equal(t, 0, meta.GetGeneralCount(ctx))

Params.Save(Params.RootCoordCfg.MaxGeneralCapacity.Key, "512")
defer Params.Reset(Params.RootCoordCfg.MaxGeneralCapacity.Key)

assert.Equal(t, 0, meta.GetGeneralCount(ctx))
err = checkGeneralCapacity(ctx, 1, 2, 256, core)
assert.NoError(t, err)
assert.Equal(t, 0, meta.GetGeneralCount(ctx))
err = checkGeneralCapacity(ctx, 2, 4, 256, core)
assert.Error(t, err)

catalog.EXPECT().CreateCollection(mock.Anything, mock.Anything, mock.Anything).Return(nil)
err = meta.CreateDatabase(ctx, &model.Database{}, typeutil.MaxTimestamp)
assert.NoError(t, err)
err = meta.AddCollection(ctx, &model.Collection{
CollectionID: 1,
State: pb.CollectionState_CollectionCreating,
ShardsNum: 256,
Partitions: []*model.Partition{
{PartitionID: 100, State: pb.PartitionState_PartitionCreated},
{PartitionID: 200, State: pb.PartitionState_PartitionCreated},
},
})
assert.NoError(t, err)

assert.Equal(t, 0, meta.GetGeneralCount(ctx))
err = checkGeneralCapacity(ctx, 1, 2, 256, core)
assert.NoError(t, err)

catalog.EXPECT().AlterCollection(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionCreated, typeutil.MaxTimestamp)
assert.NoError(t, err)

assert.Equal(t, 512, meta.GetGeneralCount(ctx))
err = checkGeneralCapacity(ctx, 1, 1, 1, core)
assert.Error(t, err)

err = meta.ChangeCollectionState(ctx, 1, pb.CollectionState_CollectionDropping, typeutil.MaxTimestamp)
assert.NoError(t, err)

assert.Equal(t, 0, meta.GetGeneralCount(ctx))
err = checkGeneralCapacity(ctx, 1, 2, 256, core)
assert.NoError(t, err)
}
30 changes: 16 additions & 14 deletions internal/rootcoord/meta_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,12 @@ func (mt *MetaTable) reload() error {
collection.DBName = dbName
}
mt.collID2Meta[collection.CollectionID] = collection
mt.generalCnt += len(collection.Partitions) * int(collection.ShardsNum)
if collection.Available() {
mt.names.insert(dbName, collection.Name, collection.CollectionID)
pn := collection.GetPartitionNum(true)
mt.generalCnt += pn * int(collection.ShardsNum)
collectionNum++
partitionNum += int64(collection.GetPartitionNum(true))
partitionNum += int64(pn)
}
}

Expand Down Expand Up @@ -243,8 +244,10 @@ func (mt *MetaTable) reloadWithNonDatabase() error {
mt.collID2Meta[collection.CollectionID] = collection
if collection.Available() {
mt.names.insert(util.DefaultDBName, collection.Name, collection.CollectionID)
pn := collection.GetPartitionNum(true)
mt.generalCnt += pn * int(collection.ShardsNum)
collectionNum++
partitionNum += int64(collection.GetPartitionNum(true))
partitionNum += int64(pn)
}
}

Expand Down Expand Up @@ -428,8 +431,6 @@ func (mt *MetaTable) AddCollection(ctx context.Context, coll *model.Collection)
mt.collID2Meta[coll.CollectionID] = coll.Clone()
mt.names.insert(db.Name, coll.Name, coll.CollectionID)

mt.generalCnt += len(coll.Partitions) * int(coll.ShardsNum)

log.Ctx(ctx).Info("add collection to meta table",
zap.Int64("dbID", coll.DBID),
zap.String("collection", coll.Name),
Expand Down Expand Up @@ -460,13 +461,17 @@ func (mt *MetaTable) ChangeCollectionState(ctx context.Context, collectionID Uni
return fmt.Errorf("dbID not found for collection:%d", collectionID)
}

pn := coll.GetPartitionNum(true)

switch state {
case pb.CollectionState_CollectionCreated:
mt.generalCnt += pn * int(coll.ShardsNum)
metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Inc()
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(coll.GetPartitionNum(true)))
default:
metrics.RootCoordNumOfPartitions.WithLabelValues().Add(float64(pn))
case pb.CollectionState_CollectionDropping:
mt.generalCnt -= pn * int(coll.ShardsNum)
metrics.RootCoordNumOfCollections.WithLabelValues(db.Name).Dec()
metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(coll.GetPartitionNum(true)))
metrics.RootCoordNumOfPartitions.WithLabelValues().Sub(float64(pn))
}

log.Ctx(ctx).Info("change collection state", zap.Int64("collection", collectionID),
Expand Down Expand Up @@ -534,8 +539,6 @@ func (mt *MetaTable) RemoveCollection(ctx context.Context, collectionID UniqueID
mt.removeAllNamesIfMatchedInternal(collectionID, allNames)
mt.removeCollectionByIDInternal(collectionID)

mt.generalCnt -= len(coll.Partitions) * int(coll.ShardsNum)

log.Ctx(ctx).Info("remove collection",
zap.Int64("dbID", coll.DBID),
zap.String("name", coll.Name),
Expand Down Expand Up @@ -942,8 +945,6 @@ func (mt *MetaTable) AddPartition(ctx context.Context, partition *model.Partitio
}
mt.collID2Meta[partition.CollectionID].Partitions = append(mt.collID2Meta[partition.CollectionID].Partitions, partition.Clone())

mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum

log.Ctx(ctx).Info("add partition to meta table",
zap.Int64("collection", partition.CollectionID), zap.String("partition", partition.PartitionName),
zap.Int64("partitionid", partition.PartitionID), zap.Uint64("ts", partition.PartitionCreatedTimestamp))
Expand Down Expand Up @@ -971,9 +972,11 @@ func (mt *MetaTable) ChangePartitionState(ctx context.Context, collectionID Uniq

switch state {
case pb.PartitionState_PartitionCreated:
mt.generalCnt += int(coll.ShardsNum) // 1 partition * shardNum
// support Dynamic load/release partitions
metrics.RootCoordNumOfPartitions.WithLabelValues().Inc()
default:
case pb.PartitionState_PartitionDropping:
mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum
metrics.RootCoordNumOfPartitions.WithLabelValues().Dec()
}

Expand Down Expand Up @@ -1008,7 +1011,6 @@ func (mt *MetaTable) RemovePartition(ctx context.Context, dbID int64, collection
}
if loc != -1 {
coll.Partitions = append(coll.Partitions[:loc], coll.Partitions[loc+1:]...)
mt.generalCnt -= int(coll.ShardsNum) // 1 partition * shardNum
}
log.Ctx(ctx).Info("remove partition", zap.Int64("collection", collectionID), zap.Int64("partition", partitionID), zap.Uint64("ts", ts))
return nil
Expand Down

0 comments on commit 1aa31e2

Please sign in to comment.