Skip to content

Commit

Permalink
fix: Support get segments from current segments view (#38512)
Browse files Browse the repository at this point in the history
issue: #38511

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 18, 2024
1 parent 06d410b commit a348122
Show file tree
Hide file tree
Showing 8 changed files with 532 additions and 97 deletions.
154 changes: 121 additions & 33 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/retry"
Expand All @@ -42,6 +43,15 @@ type Handler interface {
CheckShouldDropChannel(ch string) bool
FinishDropChannel(ch string, collectionID int64) error
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView
}

type SegmentsView struct {
FlushedSegmentIDs []int64
GrowingSegmentIDs []int64
DroppedSegmentIDs []int64
L0SegmentIDs []int64
ImportingSegmentIDs []int64
}

// ServerHandler is a helper of Server
Expand Down Expand Up @@ -107,27 +117,26 @@ func (h *ServerHandler) GetDataVChanPositions(channel RWChannel, partitionID Uni
// dropped segmentIDs ---> dropped segments
// level zero segmentIDs ---> L0 segments
func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs ...UniqueID) *datapb.VchannelInfo {
partStatsVersionsMap := make(map[int64]int64)
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
if len(validPartitions) <= 0 {
collInfo, err := h.s.handler.GetCollection(h.s.ctx, channel.GetCollectionID())
if err != nil || collInfo == nil {
log.Warn("collectionInfo is nil")
return nil
filterWithPartition := len(validPartitions) > 0
validPartitionsMap := make(map[int64]bool)
partStatsVersions := h.s.meta.partitionStatsMeta.GetChannelPartitionsStatsVersion(channel.GetCollectionID(), channel.GetName())
partStatsVersionsMap := make(map[int64]int64)
if filterWithPartition {
for _, partitionID := range validPartitions {
partStatsVersionsMap[partitionID] = partStatsVersions[partitionID]
validPartitionsMap[partitionID] = true
}
validPartitions = collInfo.Partitions
}
for _, partitionID := range validPartitions {
currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName())
partStatsVersionsMap[partitionID] = currentPartitionStatsVersion
validPartitionsMap[common.AllPartitionsID] = true
} else {
partStatsVersionsMap = partStatsVersions
}

var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
newFlushedIDs = make(typeutil.UniqueSet)
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
)

// cannot use GetSegmentsByChannel since dropped segments are needed here
Expand All @@ -138,6 +147,9 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)

for _, s := range segments {
if filterWithPartition && !validPartitionsMap[s.GetPartitionID()] {
continue
}
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
}
Expand Down Expand Up @@ -182,6 +194,41 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Retrieve unIndexed expected result:
// unIndexed: c, d
// ================================================

segmentIndexed := func(segID UniqueID) bool {
return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64()
}

flushedIDs, droppedIDs = retrieveSegment(validSegmentInfos, flushedIDs, droppedIDs, segmentIndexed)

log.Info("GetQueryVChanPositions",
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("result flushed", len(flushedIDs)),
zap.Int("result growing", len(growingIDs)),
zap.Int("result L0", len(levelZeroIDs)),
zap.Any("partition stats", partStatsVersionsMap),
)

return &datapb.VchannelInfo{
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
SeekPosition: h.GetChannelSeekPosition(channel, partitionIDs...),
FlushedSegmentIds: flushedIDs.Collect(),
UnflushedSegmentIds: growingIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
LevelZeroSegmentIds: levelZeroIDs.Collect(),
PartitionStatsVersions: partStatsVersionsMap,
}
}

func retrieveSegment(validSegmentInfos map[int64]*SegmentInfo,
flushedIDs, droppedIDs typeutil.UniqueSet,
segmentIndexed func(segID UniqueID) bool,
) (typeutil.UniqueSet, typeutil.UniqueSet) {
newFlushedIDs := make(typeutil.UniqueSet)

isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
Expand All @@ -192,7 +239,6 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
}

var compactionFromExist func(segID UniqueID) bool

compactionFromExist = func(segID UniqueID) bool {
compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
Expand All @@ -209,10 +255,6 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
return false
}

segmentIndexed := func(segID UniqueID) bool {
return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64()
}

retrieve := func() bool {
continueRetrieve := false
for id := range flushedIDs {
Expand All @@ -239,27 +281,73 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
newFlushedIDs = make(typeutil.UniqueSet)
}

flushedIDs = newFlushedIDs
return newFlushedIDs, droppedIDs
}

log.Info("GetQueryVChanPositions",
func (h *ServerHandler) GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView {
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
filterWithPartition := len(validPartitions) > 0
validPartitionsMap := make(map[int64]bool)
validPartitionsMap[common.AllPartitionsID] = true
for _, partitionID := range validPartitions {
validPartitionsMap[partitionID] = true
}

var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
importingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
)

// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())

validSegmentInfos := make(map[int64]*SegmentInfo)
for _, s := range segments {
if filterWithPartition && !validPartitionsMap[s.GetPartitionID()] {
continue
}
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
}

validSegmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID())
case s.GetState() == commonpb.SegmentState_Importing:
importingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
case s.GetState() == commonpb.SegmentState_Growing:
growingIDs.Insert(s.GetID())
default:
flushedIDs.Insert(s.GetID())
}
}

flushedIDs, droppedIDs = retrieveSegment(validSegmentInfos, flushedIDs, droppedIDs, func(segID UniqueID) bool {
return true
})

log.Ctx(ctx).Info("GetCurrentSegmentsView",
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("result flushed", len(flushedIDs)),
zap.Int("result growing", len(growingIDs)),
zap.Int("result importing", len(importingIDs)),
zap.Int("result L0", len(levelZeroIDs)),
zap.Any("partition stats", partStatsVersionsMap),
)

return &datapb.VchannelInfo{
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
SeekPosition: h.GetChannelSeekPosition(channel, partitionIDs...),
FlushedSegmentIds: flushedIDs.Collect(),
UnflushedSegmentIds: growingIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
LevelZeroSegmentIds: levelZeroIDs.Collect(),
PartitionStatsVersions: partStatsVersionsMap,
return &SegmentsView{
FlushedSegmentIDs: flushedIDs.Collect(),
GrowingSegmentIDs: growingIDs.Collect(),
DroppedSegmentIDs: droppedIDs.Collect(),
L0SegmentIDs: levelZeroIDs.Collect(),
ImportingSegmentIDs: importingIDs.Collect(),
}
}

Expand Down
Loading

0 comments on commit a348122

Please sign in to comment.