diff --git a/internal/querynodev2/delegator/delta_forward.go b/internal/querynodev2/delegator/delta_forward.go index d3844e650f539..c59bdcd2671b7 100644 --- a/internal/querynodev2/delegator/delta_forward.go +++ b/internal/querynodev2/delegator/delta_forward.go @@ -86,14 +86,14 @@ func (sd *shardDelegator) forwardStreamingDeletion(ctx context.Context, deleteDa } func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.Segment) error { - switch policy := paramtable.Get().QueryNodeCfg.StreamingDeltaForwardPolicy.GetValue(); policy { - case ForwardPolicyDefault, StreamingForwardPolicyBF: + switch sd.l0ForwardPolicy { + case ForwardPolicyDefault, L0ForwardPolicyBF: return sd.addL0GrowingBF(ctx, segment) - case StreamingForwardPolicyDirect: + case L0ForwardPolicyRemoteLoad: // forward streaming deletion without bf filtering return sd.addL0ForGrowingLoad(ctx, segment) default: - log.Fatal("unsupported streaming forward policy", zap.String("policy", policy)) + log.Fatal("unsupported l0 forward policy", zap.String("policy", sd.l0ForwardPolicy)) } return nil } diff --git a/internal/querynodev2/delegator/delta_forward_test.go b/internal/querynodev2/delegator/delta_forward_test.go index e9661a53bc754..ae6f832266b93 100644 --- a/internal/querynodev2/delegator/delta_forward_test.go +++ b/internal/querynodev2/delegator/delta_forward_test.go @@ -18,14 +18,17 @@ package delegator import ( "context" + "math/rand" "testing" + "github.com/cockroachdb/errors" "github.com/samber/lo" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/querypb" "github.com/milvus-io/milvus/internal/proto/segcorepb" "github.com/milvus-io/milvus/internal/querynodev2/cluster" @@ -265,3 +268,231 @@ func (s *StreamingForwardSuite) TestDirectStreamingForward() { func TestStreamingForward(t *testing.T) { suite.Run(t, new(StreamingForwardSuite)) } + +type GrowingMergeL0Suite struct { + suite.Suite + + collectionID int64 + partitionIDs []int64 + replicaID int64 + vchannelName string + version int64 + schema *schemapb.CollectionSchema + workerManager *cluster.MockManager + manager *segments.Manager + tsafeManager tsafe.Manager + loader *segments.MockLoader + mq *msgstream.MockMsgStream + + delegator *shardDelegator + chunkManager storage.ChunkManager + rootPath string +} + +func (s *GrowingMergeL0Suite) SetupSuite() { + paramtable.Init() + paramtable.SetNodeID(1) +} + +func (s *GrowingMergeL0Suite) SetupTest() { + s.collectionID = 1000 + s.partitionIDs = []int64{500, 501} + s.replicaID = 65535 + s.vchannelName = "rootcoord-dml_1000v0" + s.version = 2000 + s.workerManager = &cluster.MockManager{} + s.manager = segments.NewManager() + s.tsafeManager = tsafe.NewTSafeReplica() + s.loader = &segments.MockLoader{} + s.loader.EXPECT(). + Load(mock.Anything, s.collectionID, segments.SegmentTypeGrowing, int64(0), mock.Anything). + Call.Return(func(ctx context.Context, collectionID int64, segmentType segments.SegmentType, version int64, infos ...*querypb.SegmentLoadInfo) []segments.Segment { + return lo.Map(infos, func(info *querypb.SegmentLoadInfo, _ int) segments.Segment { + ms := &segments.MockSegment{} + ms.EXPECT().ID().Return(info.GetSegmentID()) + ms.EXPECT().Type().Return(segments.SegmentTypeGrowing) + ms.EXPECT().Partition().Return(info.GetPartitionID()) + ms.EXPECT().Collection().Return(info.GetCollectionID()) + ms.EXPECT().Indexes().Return(nil) + ms.EXPECT().RowNum().Return(info.GetNumOfRows()) + ms.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).Return(nil) + return ms + }) + }, nil) + + // init schema + s.schema = &schemapb.CollectionSchema{ + Name: "TestCollection", + Fields: []*schemapb.FieldSchema{ + { + Name: "id", + FieldID: 100, + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + AutoID: true, + }, + { + Name: "vector", + FieldID: 101, + IsPrimaryKey: false, + DataType: schemapb.DataType_BinaryVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + } + s.manager.Collection.PutOrRef(s.collectionID, s.schema, &segcorepb.CollectionIndexMeta{ + MaxIndexRowCount: 100, + IndexMetas: []*segcorepb.FieldIndexMeta{ + { + FieldID: 101, + CollectionID: s.collectionID, + IndexName: "binary_index", + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "BIN_IVF_FLAT", + }, + { + Key: common.MetricTypeKey, + Value: metric.JACCARD, + }, + }, + }, + }, + }, &querypb.LoadMetaInfo{ + PartitionIDs: s.partitionIDs, + }) + + s.mq = &msgstream.MockMsgStream{} + s.rootPath = "delegator_test" + + // init chunkManager + chunkManagerFactory := storage.NewTestChunkManagerFactory(paramtable.Get(), s.rootPath) + s.chunkManager, _ = chunkManagerFactory.NewPersistentStorageChunkManager(context.Background()) + + delegator, err := NewShardDelegator(context.Background(), s.collectionID, s.replicaID, s.vchannelName, s.version, s.workerManager, s.manager, s.tsafeManager, s.loader, &msgstream.MockMqFactory{ + NewMsgStreamFunc: func(_ context.Context) (msgstream.MsgStream, error) { + return s.mq, nil + }, + }, 10000, nil, s.chunkManager) + s.Require().NoError(err) + + sd, ok := delegator.(*shardDelegator) + s.Require().True(ok) + s.delegator = sd +} + +func (s *GrowingMergeL0Suite) TestAddL0ForGrowingBF() { + sd := s.delegator + sd.l0ForwardPolicy = L0ForwardPolicyBF + + seg := segments.NewMockSegment(s.T()) + coll := s.manager.Collection.Get(s.collectionID) + l0Segment, err := segments.NewL0Segment(coll, segments.SegmentTypeSealed, s.version, &querypb.SegmentLoadInfo{ + SegmentID: 10001, + CollectionID: s.collectionID, + PartitionID: common.AllPartitionsID, + InsertChannel: s.vchannelName, + }) + s.Require().NoError(err) + + n := 10 + deltaData := storage.NewDeltaData(int64(n)) + + for i := 0; i < n; i++ { + deltaData.Append(storage.NewInt64PrimaryKey(rand.Int63()), 0) + } + err = l0Segment.LoadDeltaData(context.Background(), deltaData) + s.Require().NoError(err) + s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment) + + seg.EXPECT().ID().Return(10000) + seg.EXPECT().Partition().Return(100) + seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error { + s.Equal(deltaData.DeletePks(), pk) + s.Equal(deltaData.DeleteTimestamps(), u) + return nil + }).Once() + + err = sd.addL0ForGrowing(context.Background(), seg) + s.NoError(err) + + seg.EXPECT().Delete(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, pk storage.PrimaryKeys, u []uint64) error { + return errors.New("mocked") + }).Once() + err = sd.addL0ForGrowing(context.Background(), seg) + s.Error(err) +} + +func (s *GrowingMergeL0Suite) TestAddL0ForGrowingLoad() { + sd := s.delegator + sd.l0ForwardPolicy = L0ForwardPolicyRemoteLoad + + seg := segments.NewMockSegment(s.T()) + coll := s.manager.Collection.Get(s.collectionID) + l0Segment, err := segments.NewL0Segment(coll, segments.SegmentTypeSealed, s.version, &querypb.SegmentLoadInfo{ + SegmentID: 10001, + CollectionID: s.collectionID, + PartitionID: common.AllPartitionsID, + InsertChannel: s.vchannelName, + Deltalogs: []*datapb.FieldBinlog{ + {Binlogs: []*datapb.Binlog{ + {LogPath: "mocked_log_path"}, + }}, + }, + }) + s.Require().NoError(err) + + n := 10 + deltaData := storage.NewDeltaData(int64(n)) + + for i := 0; i < n; i++ { + deltaData.Append(storage.NewInt64PrimaryKey(rand.Int63()), 0) + } + err = l0Segment.LoadDeltaData(context.Background(), deltaData) + s.Require().NoError(err) + s.manager.Segment.Put(context.Background(), segments.SegmentTypeSealed, l0Segment) + + seg.EXPECT().ID().Return(10000) + seg.EXPECT().Partition().Return(100) + s.loader.EXPECT().LoadDeltaLogs(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, seg segments.Segment, fb []*datapb.FieldBinlog) error { + s.ElementsMatch([]string{"mocked_log_path"}, lo.Flatten(lo.Map(fb, func(fbl *datapb.FieldBinlog, _ int) []string { + return lo.Map(fbl.Binlogs, func(bl *datapb.Binlog, _ int) string { return bl.LogPath }) + }))) + return nil + }).Once() + + err = sd.addL0ForGrowing(context.Background(), seg) + s.NoError(err) + + s.loader.EXPECT().LoadDeltaLogs(mock.Anything, mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, seg segments.Segment, fb []*datapb.FieldBinlog) error { + return errors.New("mocked") + }).Once() + err = sd.addL0ForGrowing(context.Background(), seg) + s.Error(err) +} + +func (s *GrowingMergeL0Suite) TestAddL0ForGrowingInvalid() { + sd := s.delegator + sd.l0ForwardPolicy = "invalid_policy" + + seg := segments.NewMockSegment(s.T()) + s.Panics(func() { + sd.addL0ForGrowing(context.Background(), seg) + }) +} + +func TestGrowingMergeL0(t *testing.T) { + suite.Run(t, new(GrowingMergeL0Suite)) +}