From d5945069bc1ed47f66251e8d3e886963a4f95342 Mon Sep 17 00:00:00 2001 From: MrPresent-Han Date: Tue, 26 Sep 2023 13:56:24 +0800 Subject: [PATCH] fix false handle for describe index response(#25363) 1. fix incorrect handling for response of describeIndex on rootCoord 2. remove unuseful indexInfo inside loadParitionsRequest Signed-off-by: MrPresent-Han --- internal/querycoordv2/job/job_test.go | 51 +------------------ internal/querycoordv2/job/utils.go | 11 ++-- .../querycoordv2/meta/coordinator_broker.go | 8 +-- .../meta/coordinator_broker_test.go | 22 ++++++++ internal/querycoordv2/services_test.go | 2 - 5 files changed, 30 insertions(+), 64 deletions(-) diff --git a/internal/querycoordv2/job/job_test.go b/internal/querycoordv2/job/job_test.go index d12ca44072d9e..1f44a0e2e73cb 100644 --- a/internal/querycoordv2/job/job_test.go +++ b/internal/querycoordv2/job/job_test.go @@ -128,8 +128,6 @@ func (suite *JobSuite) SetupSuite() { suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything). Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything). - Return(nil, nil) suite.cluster = session.NewMockCluster(suite.T()) suite.cluster.EXPECT(). @@ -1188,56 +1186,10 @@ func (suite *JobSuite) TestLoadCreateReplicaFailed() { } func (suite *JobSuite) TestCallLoadPartitionFailed() { - // call LoadPartitions failed at get index info - getIndexErr := fmt.Errorf("mock get index error") - suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "DescribeIndex" - }) - for _, collection := range suite.collections { - suite.broker.EXPECT().DescribeIndex(mock.Anything, collection).Return(nil, getIndexErr) - loadCollectionReq := &querypb.LoadCollectionRequest{ - CollectionID: collection, - } - loadCollectionJob := NewLoadCollectionJob( - context.Background(), - loadCollectionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadCollectionJob) - err := loadCollectionJob.Wait() - suite.T().Logf("%s", err) - suite.ErrorIs(err, getIndexErr) - - loadPartitionReq := &querypb.LoadPartitionsRequest{ - CollectionID: collection, - PartitionIDs: suite.partitions[collection], - } - loadPartitionJob := NewLoadPartitionJob( - context.Background(), - loadPartitionReq, - suite.dist, - suite.meta, - suite.broker, - suite.cluster, - suite.targetMgr, - suite.targetObserver, - suite.nodeMgr, - ) - suite.scheduler.Add(loadPartitionJob) - err = loadPartitionJob.Wait() - suite.ErrorIs(err, getIndexErr) - } - // call LoadPartitions failed at get schema getSchemaErr := fmt.Errorf("mock get schema error") suite.broker.ExpectedCalls = lo.Filter(suite.broker.ExpectedCalls, func(call *mock.Call, _ int) bool { - return call.Method != "GetCollectionSchema" + return call.Method != "GetCollectionSchema" && call.Method != "DescribeIndex" }) for _, collection := range suite.collections { suite.broker.EXPECT().GetCollectionSchema(mock.Anything, collection).Return(nil, getSchemaErr) @@ -1283,7 +1235,6 @@ func (suite *JobSuite) TestCallLoadPartitionFailed() { return call.Method != "DescribeIndex" && call.Method != "GetCollectionSchema" }) suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything).Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, nil) } func (suite *JobSuite) TestCallReleasePartitionFailed() { diff --git a/internal/querycoordv2/job/utils.go b/internal/querycoordv2/job/utils.go index c6a9b26cfcc56..4098adc19234e 100644 --- a/internal/querycoordv2/job/utils.go +++ b/internal/querycoordv2/job/utils.go @@ -78,20 +78,15 @@ func loadPartitions(ctx context.Context, return err } } - indexes, err := broker.DescribeIndex(ctx, collection) - if err != nil { - return err - } replicas := meta.ReplicaManager.GetByCollection(collection) loadReq := &querypb.LoadPartitionsRequest{ Base: &commonpb.MsgBase{ MsgType: commonpb.MsgType_LoadPartitions, }, - CollectionID: collection, - PartitionIDs: partitions, - Schema: schema, - IndexInfoList: indexes, + CollectionID: collection, + PartitionIDs: partitions, + Schema: schema, } for _, replica := range replicas { for _, node := range replica.GetNodes() { diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 35e90d81753e0..c8ecff0052c18 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -248,15 +248,15 @@ func (broker *CoordinatorBroker) DescribeIndex(ctx context.Context, collectionID ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond)) defer cancel() - resp, err := broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ + resp, _ := broker.dataCoord.DescribeIndex(ctx, &indexpb.DescribeIndexRequest{ CollectionID: collectionID, }) - if err != nil || resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { + if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success { log.Error("failed to fetch index meta", zap.Int64("collection", collectionID), - zap.Error(err)) - return nil, err + zap.Error(merr.Error(resp.GetStatus()))) + return nil, merr.Error(resp.GetStatus()) } return resp.IndexInfos, nil } diff --git a/internal/querycoordv2/meta/coordinator_broker_test.go b/internal/querycoordv2/meta/coordinator_broker_test.go index f3e372d78446b..6d7ff618be8e9 100644 --- a/internal/querycoordv2/meta/coordinator_broker_test.go +++ b/internal/querycoordv2/meta/coordinator_broker_test.go @@ -29,7 +29,9 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/proto/indexpb" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestCoordinatorBroker_GetCollectionSchema(t *testing.T) { @@ -147,3 +149,23 @@ func TestCoordinatorBroker_GetPartitions(t *testing.T) { assert.ErrorIs(t, err, merr.ErrCollectionNotFound) }) } + +func TestCoordinatorBroker_DescribeIndex(t *testing.T) { + paramtable.Init() + t.Run("get error", func(t *testing.T) { + dc := mocks.NewMockDataCoordClient(t) + resp := &indexpb.DescribeIndexResponse{ + Status: &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_UnexpectedError, + Reason: "fake error for test", + }, + } + dc.EXPECT().DescribeIndex(mock.Anything, mock.Anything). + Return(resp, nil) + + broker := &CoordinatorBroker{dataCoord: dc} + descResp, err := broker.DescribeIndex(context.Background(), 1) + assert.Error(t, err) + assert.Nil(t, descResp) + }) +} diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 40f7e64cfb13b..c664588eece5a 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -1737,8 +1737,6 @@ func (suite *ServiceSuite) expectGetRecoverInfo(collection int64) { func (suite *ServiceSuite) expectLoadPartitions() { suite.broker.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything). Return(nil, nil) - suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything). - Return(nil, nil) suite.cluster.EXPECT().LoadPartitions(mock.Anything, mock.Anything, mock.Anything). Return(merr.Status(nil), nil) }