Skip to content

Commit

Permalink
fix: [Cherry-pick] sync action load segment with lack collection inde…
Browse files Browse the repository at this point in the history
…x info list (#28956)

relate: #28779
#28637
pr: #28788

Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored Dec 7, 2023
1 parent f670055 commit 8502037
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 321 deletions.
1 change: 1 addition & 0 deletions internal/proto/query_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,7 @@ message SyncDistributionRequest {
LoadMetaInfo load_meta = 6;
int64 replicaID = 7;
int64 version = 8;
repeated index.IndexInfo index_info_list = 9;
}

message ResourceGroup {
Expand Down
615 changes: 312 additions & 303 deletions internal/proto/querypb/query_coord.pb.go

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion internal/querycoordv2/observers/leader_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,14 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *
log.Warn("sync distribution failed, cannot get schema of collection", zap.Error(err))
return false
}

// Get collection index info
indexInfo, err := o.broker.DescribeIndex(ctx, leaderView.CollectionID)
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}

partitions, err := utils.GetPartitions(o.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Warn("sync distribution failed, cannot get partitions of collection", zap.Error(err))
Expand All @@ -236,7 +244,8 @@ func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
},
Version: time.Now().UnixNano(),
Version: time.Now().UnixNano(),
IndexInfoList: indexInfo,
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()
Expand Down
33 changes: 27 additions & 6 deletions internal/querycoordv2/observers/leader_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package observers

import (
"context"
"fmt"
"testing"
"time"

Expand All @@ -31,6 +32,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
Expand Down Expand Up @@ -120,6 +122,11 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
// will cause sync failed once
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once()
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
Expand Down Expand Up @@ -154,7 +161,8 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}

Expand Down Expand Up @@ -212,6 +220,9 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.target.UpdateCollectionNextTarget(int64(1))
Expand Down Expand Up @@ -246,7 +257,8 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
Expand Down Expand Up @@ -344,6 +356,7 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
Expand Down Expand Up @@ -380,7 +393,8 @@ func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
Expand Down Expand Up @@ -412,7 +426,9 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {

schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)

suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
Expand Down Expand Up @@ -450,7 +466,8 @@ func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
ch := make(chan struct{})
Expand Down Expand Up @@ -494,6 +511,9 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
suite.broker.EXPECT().GetCollectionSchema(mock.Anything, int64(1)).Return(schema, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))

observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
Expand All @@ -519,7 +539,8 @@ func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
Expand Down
10 changes: 9 additions & 1 deletion internal/querycoordv2/observers/target_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView
return false
}

// Get collection index info
indexInfo, err := ob.broker.DescribeIndex(ctx, leaderView.CollectionID)
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}

req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),
Expand All @@ -367,7 +374,8 @@ func (ob *TargetObserver) sync(ctx context.Context, replicaID int64, leaderView
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
},
Version: time.Now().UnixNano(),
Version: time.Now().UnixNano(),
IndexInfoList: indexInfo,
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/observers/target_observer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
}, 7*time.Second, 1*time.Second)
}

func (suite *TargetObserverSuite) TearDownSuite() {
func (suite *TargetObserverSuite) TearDownTest() {
suite.kv.Close()
suite.observer.Stop()
}
Expand Down
19 changes: 10 additions & 9 deletions internal/querynodev2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1360,15 +1360,16 @@ func (node *QueryNode) SyncDistribution(ctx context.Context, req *querypb.SyncDi
commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
commonpbutil.WithMsgID(req.Base.GetMsgID()),
),
Infos: []*querypb.SegmentLoadInfo{action.GetInfo()},
Schema: req.GetSchema(),
LoadMeta: req.GetLoadMeta(),
CollectionID: req.GetCollectionID(),
ReplicaID: req.GetReplicaID(),
DstNodeID: action.GetNodeID(),
Version: action.GetVersion(),
NeedTransfer: false,
LoadScope: querypb.LoadScope_Delta,
Infos: []*querypb.SegmentLoadInfo{action.GetInfo()},
Schema: req.GetSchema(),
LoadMeta: req.GetLoadMeta(),
CollectionID: req.GetCollectionID(),
ReplicaID: req.GetReplicaID(),
DstNodeID: action.GetNodeID(),
Version: action.GetVersion(),
NeedTransfer: false,
LoadScope: querypb.LoadScope_Delta,
IndexInfoList: req.GetIndexInfoList(),
})
})
case querypb.SyncType_UpdateVersion:
Expand Down

0 comments on commit 8502037

Please sign in to comment.