From b16d04d7cc91e911fb7c93d8f1f39ec18120d289 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 25 Dec 2024 15:02:50 +0800 Subject: [PATCH] fix: Fix update loading collection's load config doesn't work (#38737) issue: #38594 pr: #38595 Signed-off-by: Wei Liu --- internal/querycoordv2/services.go | 4 +- internal/querycoordv2/services_test.go | 52 --------- tests/integration/replicas/load/load_test.go | 117 +++++++++++++++++++ 3 files changed, 119 insertions(+), 54 deletions(-) diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index d8aa5178711a4..72ed6101e744d 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -253,7 +253,7 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection var loadJob job.Job collection := s.meta.GetCollection(ctx, req.GetCollectionID()) - if collection != nil && collection.GetStatus() == querypb.LoadStatus_Loaded { + if collection != nil { // if collection is loaded, check if collection is loaded with the same replica number and resource groups // if replica number or resource group changes, switch to update load config collectionUsedRG := s.meta.ReplicaManager.GetResourceGroupByCollection(ctx, collection.GetCollectionID()).Collect() @@ -1180,7 +1180,7 @@ func (s *Server) UpdateLoadConfig(ctx context.Context, req *querypb.UpdateLoadCo jobs := make([]job.Job, 0, len(req.GetCollectionIDs())) for _, collectionID := range req.GetCollectionIDs() { collection := s.meta.GetCollection(ctx, collectionID) - if collection == nil || collection.GetStatus() != querypb.LoadStatus_Loaded { + if collection == nil { err := merr.WrapErrCollectionNotLoaded(collectionID) log.Warn("failed to update load config", zap.Error(err)) continue diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 1381bc2a23b51..77195dce5cd08 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ 
-857,58 +857,6 @@ func (suite *ServiceSuite) TestTransferReplica() { suite.ErrorIs(merr.Error(resp), merr.ErrServiceNotReady) } -func (suite *ServiceSuite) TestLoadCollectionFailed() { - suite.loadAll() - ctx := context.Background() - server := suite.server - - // Test load with different replica number - for _, collection := range suite.collections { - req := &querypb.LoadCollectionRequest{ - CollectionID: collection, - ReplicaNumber: suite.replicaNumber[collection] + 1, - } - resp, err := server.LoadCollection(ctx, req) - suite.NoError(err) - suite.ErrorIs(merr.Error(resp), merr.ErrParameterInvalid) - } - - req := &querypb.LoadCollectionRequest{ - CollectionID: 1001, - ReplicaNumber: 2, - ResourceGroups: []string{meta.DefaultResourceGroupName, "rg"}, - } - resp, err := server.LoadCollection(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode) - - // Test load with partitions loaded - for _, collection := range suite.collections { - if suite.loadTypes[collection] != querypb.LoadType_LoadPartition { - continue - } - - req := &querypb.LoadCollectionRequest{ - CollectionID: collection, - } - resp, err := server.LoadCollection(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode) - } - - // Test load with wrong rg num - for _, collection := range suite.collections { - req := &querypb.LoadCollectionRequest{ - CollectionID: collection, - ReplicaNumber: suite.replicaNumber[collection] + 1, - ResourceGroups: []string{"rg1", "rg2"}, - } - resp, err := server.LoadCollection(ctx, req) - suite.NoError(err) - suite.Equal(commonpb.ErrorCode_IllegalArgument, resp.ErrorCode) - } -} - func (suite *ServiceSuite) TestLoadPartition() { ctx := context.Background() server := suite.server diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go index ecbb918d84b92..3609441cf52ba 100644 --- a/tests/integration/replicas/load/load_test.go +++ 
b/tests/integration/replicas/load/load_test.go
@@ -854,6 +854,123 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() {
 	s.releaseCollection(dbName, collectionName)
 }
 
+// TestDynamicUpdateLoadConfigs_OnLoadingCollection issues three LoadCollection requests back to back (1, 3, then 5 replicas on different resource groups) and expects the last one to take effect.
+func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_OnLoadingCollection() {
+	ctx := context.Background()
+	s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
+		DBName:           dbName,
+		Dim:              dim,
+		CollectionName:   collectionName,
+		ChannelNum:       1,
+		SegmentNum:       1,
+		RowNumPerSegment: 2000,
+	})
+
+	// prepare rgNum resource groups, each requesting and limited to exactly one node
+	rgNum := 10
+	rgs := make([]string, 0)
+	for i := 0; i < rgNum; i++ {
+		rgs = append(rgs, fmt.Sprintf("rg_%d", i))
+		s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
+			ResourceGroup: rgs[i],
+			Config: &rgpb.ResourceGroupConfig{
+				Requests: &rgpb.ResourceGroupLimit{
+					NodeNum: 1,
+				},
+				Limits: &rgpb.ResourceGroupLimit{
+					NodeNum: 1,
+				},
+				TransferFrom: []*rgpb.ResourceGroupTransfer{
+					{
+						ResourceGroup: meta.DefaultResourceGroupName,
+					},
+				},
+				TransferTo: []*rgpb.ResourceGroupTransfer{
+					{
+						ResourceGroup: meta.DefaultResourceGroupName,
+					},
+				},
+			},
+		})
+	}
+
+	resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
+	s.NoError(err)
+	s.True(merr.Ok(resp.GetStatus()))
+	s.Len(resp.GetResourceGroups(), rgNum+1)
+
+	for i := 1; i < rgNum; i++ { // NOTE(review): presumably the cluster already runs one query node, giving rgNum in total - confirm
+		s.Cluster.AddQueryNode()
+	}
+
+	nodesInRG := make(map[string][]int64)
+	s.Eventually(func() bool { // wait until every resource group holds exactly one node
+		matchCounter := 0
+		for _, rg := range rgs {
+			resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
+				ResourceGroup: rg,
+			})
+			s.NoError(err)
+			s.True(merr.Ok(resp1.GetStatus()))
+			if len(resp1.GetResourceGroup().GetNodes()) == 1 { // generated getters are nil-safe, so a failed describe cannot panic here
+				matchCounter += 1
+				nodesInRG[rg] = []int64{resp1.ResourceGroup.Nodes[0].NodeId}
+			}
+		}
+		return matchCounter == rgNum
+	}, 30*time.Second, time.Second)
+
+	// trigger collection loading, and modify collection's load config during loading
+	loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		ReplicaNumber:  1,
+		ResourceGroups: rgs[:1],
+	})
+	s.NoError(err)
+	s.True(merr.Ok(loadStatus))
+	loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		ReplicaNumber:  3,
+		ResourceGroups: rgs[1:4],
+	})
+	s.NoError(err)
+	s.True(merr.Ok(loadStatus))
+	loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+		ReplicaNumber:  5,
+		ResourceGroups: rgs[4:9],
+	})
+	s.NoError(err)
+	s.True(merr.Ok(loadStatus))
+
+	s.Eventually(func() bool { // the replica count must converge to the last request (5 replicas)
+		resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
+			DbName:         dbName,
+			CollectionName: collectionName,
+		})
+		s.NoError(err)
+		s.True(merr.Ok(resp3.Status))
+		return len(resp3.GetReplicas()) == 5
+	}, 30*time.Second, 1*time.Second)
+
+	s.Eventually(func() bool { // totals over all query nodes; presumably one segment and one channel per replica
+		segmentNum, channelNum := 0, 0
+		for _, qn := range s.Cluster.GetAllQueryNodes() {
+			resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
+			s.NoError(err)
+			s.True(merr.Ok(resp.Status))
+			segmentNum += len(resp.Segments)
+			channelNum += len(resp.Channels)
+		}
+		return segmentNum == 5 && channelNum == 5
+	}, 30*time.Second, 1*time.Second)
+
+	s.releaseCollection(dbName, collectionName)
+}
+
 func TestReplicas(t *testing.T) {
 	suite.Run(t, new(LoadTestSuite))
 }