Skip to content

Commit

Permalink
enhance: [2.4] Exclude L0 segment from readable snapshot (#35510)
Browse files Browse the repository at this point in the history
Cherry-pick from master
pr: #35507

L0 segments now do not contain insert data and may cause confusion for
query hook optimizer if counted as sealed segment number.

This PR add segment level flag in segment entry and exclude L0 segments
while get readable segment snapshot

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Aug 16, 2024
1 parent 555b7a6 commit bd222e5
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
1 change: 1 addition & 0 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
PartitionID: info.GetPartitionID(),
NodeID: req.GetDstNodeID(),
Version: req.GetVersion(),
Level: info.GetLevel(),
}
})
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
Expand Down
33 changes: 19 additions & 14 deletions internal/querynodev2/delegator/delegator_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
Level: datapb.SegmentLevel_L1,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
Expand All @@ -524,6 +525,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L1,
},
}, sealed[0].Segments)
})
Expand Down Expand Up @@ -599,20 +601,21 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
})
s.NoError(err)

err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: 1,
CollectionID: s.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 200,
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
// err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
// Base: commonpbutil.NewMsgBase(),
// DstNodeID: 1,
// CollectionID: s.collectionID,
// Infos: []*querypb.SegmentLoadInfo{
// {
// SegmentID: 200,
// PartitionID: 500,
// StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
// DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
// Level: datapb.SegmentLevel_L1,
// InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
// },
// },
// })

s.NoError(err)
sealed, _ := s.delegator.GetSegmentInfo(false)
Expand All @@ -624,12 +627,14 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L1,
},
{
SegmentID: 200,
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L0,
},
}, sealed[0].Segments)
})
Expand Down
17 changes: 11 additions & 6 deletions internal/querynodev2/delegator/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down Expand Up @@ -84,6 +85,7 @@ type SegmentEntry struct {
PartitionID UniqueID
Version int64
TargetVersion int64
Level datapb.SegmentLevel
}

// NewDistribution creates a new distribution instance with all field initialized.
Expand Down Expand Up @@ -114,9 +116,7 @@ func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []Snapsh
sealed, growing = current.Get(partitions...)
version = current.version
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
filterReadable := d.readableFilter(targetVersion)
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
Expand Down Expand Up @@ -157,9 +157,7 @@ func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed

if readable {
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
filterReadable := d.readableFilter(targetVersion)
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
Expand Down Expand Up @@ -382,6 +380,13 @@ func (d *distribution) genSnapshot() chan struct{} {
return last.cleared
}

func (d *distribution) readableFilter(targetVersion int64) func(entry SegmentEntry, _ int) bool {
return func(entry SegmentEntry, _ int) bool {
// segment L0 is not readable for now
return entry.Level != datapb.SegmentLevel_L0 && (entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion)
}
}

// getCleanup returns cleanup snapshots function.
func (d *distribution) getCleanup(version int64) snapshotCleanup {
return func() {
Expand Down

0 comments on commit bd222e5

Please sign in to comment.