Skip to content

Commit

Permalink
add test for read delete from msgstream
Browse files Browse the repository at this point in the history
Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd committed Apr 9, 2024
1 parent 736829f commit 500c740
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 0 deletions.
33 changes: 33 additions & 0 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,6 +1114,39 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
}

func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

s.mq.EXPECT().AsConsumer(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Seek(mock.Anything, mock.Anything).Return(nil)
s.mq.EXPECT().Close()
ch := make(chan *msgstream.MsgPack, 10)
s.mq.EXPECT().Chan().Return(ch)

oracle := pkoracle.NewBloomFilterSet(1, 1, commonpb.SegmentState_Sealed)
oracle.UpdateBloomFilter([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1), storage.NewInt64PrimaryKey(2)})

baseMsg := &commonpb.MsgBase{MsgType: commonpb.MsgType_Delete}

datas := []*msgstream.MsgPack{
{EndTs: 10, EndPositions: []*msgpb.MsgPosition{{Timestamp: 10}}, Msgs: []msgstream.TsMsg{
&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: 1, PrimaryKeys: storage.ParseInt64s2IDs(1), Timestamps: []uint64{1}}},
&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: -1, PrimaryKeys: storage.ParseInt64s2IDs(2), Timestamps: []uint64{5}}},
// invalid msg because partition wrong
&msgstream.DeleteMsg{DeleteRequest: msgpb.DeleteRequest{Base: baseMsg, CollectionID: s.collectionID, PartitionID: 2, PrimaryKeys: storage.ParseInt64s2IDs(1), Timestamps: []uint64{10}}},
}},
}

for _, data := range datas {
ch <- data
}

result, err := s.delegator.readDeleteFromMsgstream(ctx, &msgpb.MsgPosition{Timestamp: 0}, 10, oracle)
s.NoError(err)
s.Equal(2, len(result.Pks))
}

func TestDelegatorDataSuite(t *testing.T) {
suite.Run(t, new(DelegatorDataSuite))
}
15 changes: 15 additions & 0 deletions internal/storage/primary_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,18 @@ func ParsePrimaryKeys2IDs(pks []PrimaryKey) *schemapb.IDs {

return ret
}

func ParseInt64s2IDs(pks ...int64) *schemapb.IDs {
ret := &schemapb.IDs{}
if len(pks) == 0 {
return ret
}

ret.IdField = &schemapb.IDs_IntId{
IntId: &schemapb.LongArray{
Data: pks,
},
}

return ret
}

0 comments on commit 500c740

Please sign in to comment.