Skip to content

Commit

Permalink
reload loading collection when qc recover (milvus-io#27300)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Sep 27, 2023
1 parent 5c5f9aa commit 4071132
Show file tree
Hide file tree
Showing 6 changed files with 473 additions and 343 deletions.
2 changes: 2 additions & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ message CollectionLoadInfo {
LoadStatus status = 4;
map<int64, int64> field_indexID = 5;
LoadType load_type = 6;
int32 recover_times = 7;
}

message PartitionLoadInfo {
Expand All @@ -547,6 +548,7 @@ message PartitionLoadInfo {
int32 replica_number = 3; // Deprecated: No longer used; kept for compatibility.
LoadStatus status = 4;
map<int64, int64> field_indexID = 5; // Deprecated: No longer used; kept for compatibility.
int32 recover_times = 7;
}

message Replica {
Expand Down
623 changes: 320 additions & 303 deletions internal/proto/querypb/query_coord.pb.go

Large diffs are not rendered by default.

59 changes: 38 additions & 21 deletions internal/querycoordv2/meta/collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
. "github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand Down Expand Up @@ -140,16 +141,29 @@ func (m *CollectionManager) Recover(broker Broker) error {
ctxLog.Warn("failed to get collection schema", zap.Error(err))
return err
}
// Collections not loaded done should be deprecated
if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 {
ctxLog.Info("skip recovery and release collection",

if collection.GetReplicaNumber() <= 0 {
ctxLog.Info("skip recovery and release collection due to invalid replica number",
zap.Int64("collectionID", collection.GetCollectionID()),
zap.String("status", collection.GetStatus().String()),
zap.Int32("replicaNumber", collection.GetReplicaNumber()),
)
zap.Int32("replicaNumber", collection.GetReplicaNumber()))
m.catalog.ReleaseCollection(collection.GetCollectionID())
continue
}

if collection.GetStatus() != querypb.LoadStatus_Loaded {
if collection.RecoverTimes >= paramtable.Get().QueryCoordCfg.CollectionRecoverTimesLimit.GetAsInt32() {
m.catalog.ReleaseCollection(collection.CollectionID)
ctxLog.Info("recover loading collection times reach limit, release collection",
zap.Int64("collectionID", collection.CollectionID),
zap.Int32("recoverTimes", collection.RecoverTimes))
break
}
// update recoverTimes meta in etcd
collection.RecoverTimes += 1
m.putCollection(true, &Collection{CollectionLoadInfo: collection})
continue
}

m.collections[collection.CollectionID] = &Collection{
CollectionLoadInfo: collection,
}
Expand All @@ -176,32 +190,31 @@ func (m *CollectionManager) Recover(broker Broker) error {
})
if len(omitPartitions) > 0 {
ctxLog.Info("skip dropped partitions during recovery",
zap.Int64("collection", collection), zap.Int64s("partitions", omitPartitions))
zap.Int64("collection", collection),
zap.Int64s("partitions", omitPartitions))
m.catalog.ReleasePartition(collection, omitPartitions...)
}

sawLoaded := false
for _, partition := range partitions {
// Partitions not loaded done should be deprecated
if partition.GetStatus() != querypb.LoadStatus_Loaded {
log.Info("skip recovery and release partition",
zap.Int64("collectionID", collection),
zap.Int64("partitionID", partition.GetPartitionID()),
zap.String("status", partition.GetStatus().String()),
)
m.catalog.ReleasePartition(collection, partition.GetPartitionID())
if partition.RecoverTimes >= paramtable.Get().QueryCoordCfg.CollectionRecoverTimesLimit.GetAsInt32() {
m.catalog.ReleaseCollection(collection)
ctxLog.Info("recover loading partition times reach limit, release collection",
zap.Int64("collectionID", collection),
zap.Int32("recoverTimes", partition.RecoverTimes))
break
}

partition.RecoverTimes += 1
m.putPartition([]*Partition{{PartitionLoadInfo: partition}}, true)
continue
}

sawLoaded = true
m.partitions[partition.PartitionID] = &Partition{
PartitionLoadInfo: partition,
}
}

if !sawLoaded {
m.catalog.ReleaseCollection(collection)
}
}

err = m.upgradeRecover(broker)
Expand Down Expand Up @@ -496,6 +509,8 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int
if loadPercent == 100 {
savePartition = true
newPartition.Status = querypb.LoadStatus_Loaded
// if partition becomes loaded, clear its recoverTimes in load info
newPartition.RecoverTimes = 0
elapsed := time.Since(newPartition.CreatedAt)
metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds()))
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Partition %d loaded", partitionID)))
Expand All @@ -517,12 +532,14 @@ func (m *CollectionManager) UpdateLoadPercent(partitionID int64, loadPercent int
if collectionPercent == 100 {
saveCollection = true
newCollection.Status = querypb.LoadStatus_Loaded
elapsed := time.Since(newCollection.CreatedAt)

// if collection becomes loaded, clear its recoverTimes in load info
newCollection.RecoverTimes = 0

// TODO: what if part of the collection has been unloaded? Now we decrease the metric only after
// `ReleaseCollection` is triggered. Maybe it's hard to make this metric really accurate.
metrics.QueryCoordNumCollections.WithLabelValues().Inc()

elapsed := time.Since(newCollection.CreatedAt)
metrics.QueryCoordLoadLatency.WithLabelValues().Observe(float64(elapsed.Milliseconds()))
eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("Collection %d loaded", newCollection.CollectionID)))
}
Expand Down
104 changes: 93 additions & 11 deletions internal/querycoordv2/meta/collection_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ import (
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -328,16 +330,96 @@ func (suite *CollectionManagerSuite) TestRecover_normal() {
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.NoError(err)
for _, collection := range suite.collections {
suite.True(mgr.Exist(collection))
for _, partitionID := range suite.partitions[collection] {
partition := mgr.GetPartition(partitionID)
suite.NotNil(partition)
}
}
}

func (suite *CollectionManagerSuite) TestRecoverLoadingCollection() {
mgr := suite.mgr
suite.releaseAll()
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil)
// test put collection with partitions
for i, collection := range suite.collections {
exist := suite.colLoadPercent[i] == 100
suite.Equal(exist, mgr.Exist(collection))
if !exist {
continue
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
col := &Collection{
CollectionLoadInfo: &querypb.CollectionLoadInfo{
CollectionID: collection,
ReplicaNumber: suite.replicaNumber[i],
Status: querypb.LoadStatus_Loading,
LoadType: suite.loadTypes[i],
},
LoadPercentage: 0,
CreatedAt: time.Now(),
}
for j, partitionID := range suite.partitions[collection] {
partitions := lo.Map(suite.partitions[collection], func(partition int64, j int) *Partition {
return &Partition{
PartitionLoadInfo: &querypb.PartitionLoadInfo{
CollectionID: collection,
PartitionID: partition,
ReplicaNumber: suite.replicaNumber[i],
Status: querypb.LoadStatus_Loading,
},
LoadPercentage: 0,
CreatedAt: time.Now(),
}
})
err := suite.mgr.PutCollection(col, partitions...)
suite.NoError(err)
}

// recover for first time, expected recover success
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.NoError(err)
for _, collectionID := range suite.collections {
collection := mgr.GetCollection(collectionID)
suite.NotNil(collection)
suite.Equal(int32(1), collection.GetRecoverTimes())
for _, partitionID := range suite.partitions[collectionID] {
partition := mgr.GetPartition(partitionID)
exist = suite.parLoadPercent[collection][j] == 100
suite.Equal(exist, partition != nil)
suite.NotNil(partition)
suite.Equal(int32(1), partition.GetRecoverTimes())
}
}

// update load percent, then recover for second time
for _, collectionID := range suite.collections {
for _, partitionID := range suite.partitions[collectionID] {
mgr.UpdateLoadPercent(partitionID, 10)
}
}
suite.clearMemory()
err = mgr.Recover(suite.broker)
suite.NoError(err)
for _, collectionID := range suite.collections {
collection := mgr.GetCollection(collectionID)
suite.NotNil(collection)
suite.Equal(int32(2), collection.GetRecoverTimes())
for _, partitionID := range suite.partitions[collectionID] {
partition := mgr.GetPartition(partitionID)
suite.NotNil(partition)
suite.Equal(int32(2), partition.GetRecoverTimes())
}
}

// test recover loading collection reach limit
for i := 0; i < int(paramtable.Get().QueryCoordCfg.CollectionRecoverTimesLimit.GetAsInt32()); i++ {
log.Info("stupid", zap.Int("count", i))
suite.clearMemory()
err = mgr.Recover(suite.broker)
suite.NoError(err)
}
for _, collectionID := range suite.collections {
collection := mgr.GetCollection(collectionID)
suite.Nil(collection)
for _, partitionID := range suite.partitions[collectionID] {
partition := mgr.GetPartition(partitionID)
suite.Nil(partition)
}
}
}
Expand Down Expand Up @@ -368,15 +450,15 @@ func (suite *CollectionManagerSuite) TestRecover_with_dropped() {
suite.clearMemory()
err := mgr.Recover(suite.broker)
suite.NoError(err)
for i, collection := range suite.collections {
exist := suite.colLoadPercent[i] == 100 && collection != droppedCollection
for _, collection := range suite.collections {
exist := collection != droppedCollection
suite.Equal(exist, mgr.Exist(collection))
if !exist {
continue
}
for j, partitionID := range suite.partitions[collection] {
for _, partitionID := range suite.partitions[collection] {
partition := mgr.GetPartition(partitionID)
exist = suite.parLoadPercent[collection][j] == 100 && partitionID != droppedPartition
exist = partitionID != droppedPartition
suite.Equal(exist, partition != nil)
}
}
Expand Down
27 changes: 19 additions & 8 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,14 +1174,15 @@ type queryCoordConfig struct {
// Deprecated: Since 2.2.2, use different interval for different checker
CheckInterval ParamItem `refreshable:"true"`

NextTargetSurviveTime ParamItem `refreshable:"true"`
UpdateNextTargetInterval ParamItem `refreshable:"false"`
CheckNodeInReplicaInterval ParamItem `refreshable:"false"`
CheckResourceGroupInterval ParamItem `refreshable:"false"`
EnableRGAutoRecover ParamItem `refreshable:"true"`
CheckHealthInterval ParamItem `refreshable:"false"`
CheckHealthRPCTimeout ParamItem `refreshable:"true"`
BrokerTimeout ParamItem `refreshable:"false"`
NextTargetSurviveTime ParamItem `refreshable:"true"`
UpdateNextTargetInterval ParamItem `refreshable:"false"`
CheckNodeInReplicaInterval ParamItem `refreshable:"false"`
CheckResourceGroupInterval ParamItem `refreshable:"false"`
EnableRGAutoRecover ParamItem `refreshable:"true"`
CheckHealthInterval ParamItem `refreshable:"false"`
CheckHealthRPCTimeout ParamItem `refreshable:"true"`
BrokerTimeout ParamItem `refreshable:"false"`
CollectionRecoverTimesLimit ParamItem `refreshable:"true"`
}

func (p *queryCoordConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -1483,6 +1484,16 @@ func (p *queryCoordConfig) init(base *BaseTable) {
Export: true,
}
p.BrokerTimeout.Init(base.mgr)

p.CollectionRecoverTimesLimit = ParamItem{
Key: "queryCoord.collectionRecoverTimes",
Version: "2.3.3",
DefaultValue: "3",
PanicIfEmpty: true,
Doc: "if collection recover times reach the limit during loading state, release it",
Export: true,
}
p.CollectionRecoverTimesLimit.Init(base.mgr)
}

// /////////////////////////////////////////////////////////////////////////////
Expand Down
1 change: 1 addition & 0 deletions pkg/util/paramtable/component_param_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt())
assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt())
assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt())
assert.Equal(t, 3, Params.CollectionRecoverTimesLimit.GetAsInt())
})

t.Run("test queryNodeConfig", func(t *testing.T) {
Expand Down

0 comments on commit 4071132

Please sign in to comment.