Skip to content

Commit

Permalink
fix ut
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Dec 18, 2024
1 parent 6c90db3 commit 457113f
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 15 deletions.
12 changes: 12 additions & 0 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,11 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
}
validPartitions = collInfo.Partitions
}
validPartitionsMap := make(map[int64]bool)
for _, partitionID := range validPartitions {
currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName())
partStatsVersionsMap[partitionID] = currentPartitionStatsVersion
validPartitionsMap[partitionID] = true
}

var (
Expand All @@ -146,6 +148,9 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)

for _, s := range segments {
if !validPartitionsMap[s.GetPartitionID()] {
continue
}
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
}
Expand Down Expand Up @@ -290,6 +295,10 @@ func (h *ServerHandler) GetCurrentSegmentsView(ctx context.Context, channel RWCh
}
validPartitions = collInfo.Partitions
}
validPartitionsMap := make(map[int64]bool)
for _, partitionID := range validPartitions {
validPartitionsMap[partitionID] = true
}

var (
flushedIDs = make(typeutil.UniqueSet)
Expand All @@ -304,6 +313,9 @@ func (h *ServerHandler) GetCurrentSegmentsView(ctx context.Context, channel RWCh

validSegmentInfos := make(map[int64]*SegmentInfo)
for _, s := range segments {
if !validPartitionsMap[s.GetPartitionID()] {
continue
}
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
}
Expand Down
99 changes: 84 additions & 15 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,17 @@ func TestGetSegmentsByStates(t *testing.T) {
t.Run("normal case", func(t *testing.T) {
svr := newTestServer(t)
defer closeTestServer(t, svr)
channelManager := NewMockChannelManager(t)
channelName := "ch"
channelManager.EXPECT().GetChannelsByCollectionID(mock.Anything).RunAndReturn(func(id int64) []RWChannel {
return []RWChannel{
&channelMeta{
Name: channelName,
CollectionID: id,
},
}
}).Maybe()
svr.channelManager = channelManager
type testCase struct {
collID int64
partID int64
Expand Down Expand Up @@ -622,34 +633,92 @@ func TestGetSegmentsByStates(t *testing.T) {
expected: []int64{9, 10},
},
}
svr.meta.AddCollection(&collectionInfo{
ID: 1,
Partitions: []int64{1, 2},
Schema: nil,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "ch1",
Data: []byte{8, 9, 10},
},
},
})
svr.meta.AddCollection(&collectionInfo{
ID: 2,
Partitions: []int64{3},
Schema: nil,
StartPositions: []*commonpb.KeyDataPair{
{
Key: "ch1",
Data: []byte{8, 9, 10},
},
},
})
for _, tc := range cases {
for _, fs := range tc.flushedSegments {
segInfo := &datapb.SegmentInfo{
ID: fs,
CollectionID: tc.collID,
PartitionID: tc.partID,
State: commonpb.SegmentState_Flushed,
NumOfRows: 1024,
ID: fs,
CollectionID: tc.collID,
PartitionID: tc.partID,
InsertChannel: channelName,
State: commonpb.SegmentState_Flushed,
NumOfRows: 1024,
StartPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
}
assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)))
}
for _, us := range tc.sealedSegments {
segInfo := &datapb.SegmentInfo{
ID: us,
CollectionID: tc.collID,
PartitionID: tc.partID,
State: commonpb.SegmentState_Sealed,
NumOfRows: 1024,
ID: us,
CollectionID: tc.collID,
PartitionID: tc.partID,
InsertChannel: channelName,
State: commonpb.SegmentState_Sealed,
NumOfRows: 1024,
StartPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
}
assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)))
}
for _, us := range tc.growingSegments {
segInfo := &datapb.SegmentInfo{
ID: us,
CollectionID: tc.collID,
PartitionID: tc.partID,
State: commonpb.SegmentState_Growing,
NumOfRows: 1024,
ID: us,
CollectionID: tc.collID,
PartitionID: tc.partID,
InsertChannel: channelName,
State: commonpb.SegmentState_Growing,
NumOfRows: 1024,
StartPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{8, 9, 10},
MsgGroup: "",
},
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{11, 12, 13},
MsgGroup: "",
Timestamp: 2,
},
}
assert.Nil(t, svr.meta.AddSegment(context.TODO(), NewSegmentInfo(segInfo)))
}
Expand Down
3 changes: 3 additions & 0 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,9 @@ func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment
channels := s.channelManager.GetChannelsByCollectionID(collectionID)
for _, channel := range channels {
channelSegmentsView := s.handler.GetCurrentSegmentsView(ctx, channel, partitionID)
if channelSegmentsView == nil {
continue
}
segmentIDs = append(segmentIDs, channelSegmentsView.FlushedSegmentIDs...)
segmentIDs = append(segmentIDs, channelSegmentsView.GrowingSegmentIDs...)
segmentIDs = append(segmentIDs, channelSegmentsView.L0SegmentIDs...)
Expand Down

0 comments on commit 457113f

Please sign in to comment.