Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support get segments from current segments view #38512

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 121 additions & 33 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/retry"
Expand All @@ -42,6 +43,15 @@
CheckShouldDropChannel(ch string) bool
FinishDropChannel(ch string, collectionID int64) error
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView
}

type SegmentsView struct {
FlushedSegmentIDs []int64
GrowingSegmentIDs []int64
DroppedSegmentIDs []int64
L0SegmentIDs []int64
ImportingSegmentIDs []int64
}

// ServerHandler is a helper of Server
Expand Down Expand Up @@ -107,27 +117,26 @@
// dropped segmentIDs ---> dropped segments
// level zero segmentIDs ---> L0 segments
func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs ...UniqueID) *datapb.VchannelInfo {
partStatsVersionsMap := make(map[int64]int64)
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
if len(validPartitions) <= 0 {
collInfo, err := h.s.handler.GetCollection(h.s.ctx, channel.GetCollectionID())
if err != nil || collInfo == nil {
log.Warn("collectionInfo is nil")
return nil
filterWithPartition := len(validPartitions) > 0
validPartitionsMap := make(map[int64]bool)
partStatsVersions := h.s.meta.partitionStatsMeta.GetChannelPartitionsStatsVersion(channel.GetCollectionID(), channel.GetName())
partStatsVersionsMap := make(map[int64]int64)
if filterWithPartition {
for _, partitionID := range validPartitions {
partStatsVersionsMap[partitionID] = partStatsVersions[partitionID]
validPartitionsMap[partitionID] = true
}
validPartitions = collInfo.Partitions
}
for _, partitionID := range validPartitions {
currentPartitionStatsVersion := h.s.meta.partitionStatsMeta.GetCurrentPartitionStatsVersion(channel.GetCollectionID(), partitionID, channel.GetName())
partStatsVersionsMap[partitionID] = currentPartitionStatsVersion
validPartitionsMap[common.AllPartitionsID] = true
} else {
partStatsVersionsMap = partStatsVersions
}

var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
newFlushedIDs = make(typeutil.UniqueSet)
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
)

// cannot use GetSegmentsByChannel since dropped segments are needed here
Expand All @@ -138,6 +147,9 @@
indexed := typeutil.NewUniqueSet(lo.Map(indexedSegments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)

for _, s := range segments {
if filterWithPartition && !validPartitionsMap[s.GetPartitionID()] {
continue
}
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
}
Expand Down Expand Up @@ -182,6 +194,41 @@
// Retrieve unIndexed expected result:
// unIndexed: c, d
// ================================================

segmentIndexed := func(segID UniqueID) bool {
return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64()
}

flushedIDs, droppedIDs = retrieveSegment(validSegmentInfos, flushedIDs, droppedIDs, segmentIndexed)

log.Info("GetQueryVChanPositions",
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("result flushed", len(flushedIDs)),
zap.Int("result growing", len(growingIDs)),
zap.Int("result L0", len(levelZeroIDs)),
zap.Any("partition stats", partStatsVersionsMap),
)

return &datapb.VchannelInfo{
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
SeekPosition: h.GetChannelSeekPosition(channel, partitionIDs...),
FlushedSegmentIds: flushedIDs.Collect(),
UnflushedSegmentIds: growingIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
LevelZeroSegmentIds: levelZeroIDs.Collect(),
PartitionStatsVersions: partStatsVersionsMap,
}
}

func retrieveSegment(validSegmentInfos map[int64]*SegmentInfo,
flushedIDs, droppedIDs typeutil.UniqueSet,
segmentIndexed func(segID UniqueID) bool,
) (typeutil.UniqueSet, typeutil.UniqueSet) {
newFlushedIDs := make(typeutil.UniqueSet)

isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
Expand All @@ -192,7 +239,6 @@
}

var compactionFromExist func(segID UniqueID) bool

compactionFromExist = func(segID UniqueID) bool {
compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
Expand All @@ -209,10 +255,6 @@
return false
}

segmentIndexed := func(segID UniqueID) bool {
return indexed.Contain(segID) || validSegmentInfos[segID].GetNumOfRows() < Params.DataCoordCfg.MinSegmentNumRowsToEnableIndex.GetAsInt64()
}

retrieve := func() bool {
continueRetrieve := false
for id := range flushedIDs {
Expand All @@ -239,27 +281,73 @@
newFlushedIDs = make(typeutil.UniqueSet)
}

flushedIDs = newFlushedIDs
return newFlushedIDs, droppedIDs
}

log.Info("GetQueryVChanPositions",
func (h *ServerHandler) GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView {
czs007 marked this conversation as resolved.
Show resolved Hide resolved
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
filterWithPartition := len(validPartitions) > 0
validPartitionsMap := make(map[int64]bool)
validPartitionsMap[common.AllPartitionsID] = true
for _, partitionID := range validPartitions {
validPartitionsMap[partitionID] = true
}

var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
importingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
)

// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())

validSegmentInfos := make(map[int64]*SegmentInfo)
for _, s := range segments {
if filterWithPartition && !validPartitionsMap[s.GetPartitionID()] {
continue
}
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue

Check warning on line 313 in internal/datacoord/handler.go

View check run for this annotation

Codecov / codecov/patch

internal/datacoord/handler.go#L313

Added line #L313 was not covered by tests
}

validSegmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID())
case s.GetState() == commonpb.SegmentState_Importing:
importingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
case s.GetState() == commonpb.SegmentState_Growing:
growingIDs.Insert(s.GetID())
default:
flushedIDs.Insert(s.GetID())
}
}

flushedIDs, droppedIDs = retrieveSegment(validSegmentInfos, flushedIDs, droppedIDs, func(segID UniqueID) bool {
return true
})

log.Ctx(ctx).Info("GetCurrentSegmentsView",
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("result flushed", len(flushedIDs)),
zap.Int("result growing", len(growingIDs)),
zap.Int("result importing", len(importingIDs)),
zap.Int("result L0", len(levelZeroIDs)),
zap.Any("partition stats", partStatsVersionsMap),
)

return &datapb.VchannelInfo{
CollectionID: channel.GetCollectionID(),
ChannelName: channel.GetName(),
SeekPosition: h.GetChannelSeekPosition(channel, partitionIDs...),
FlushedSegmentIds: flushedIDs.Collect(),
UnflushedSegmentIds: growingIDs.Collect(),
DroppedSegmentIds: droppedIDs.Collect(),
LevelZeroSegmentIds: levelZeroIDs.Collect(),
PartitionStatsVersions: partStatsVersionsMap,
return &SegmentsView{
FlushedSegmentIDs: flushedIDs.Collect(),
GrowingSegmentIDs: growingIDs.Collect(),
DroppedSegmentIDs: droppedIDs.Collect(),
L0SegmentIDs: levelZeroIDs.Collect(),
ImportingSegmentIDs: importingIDs.Collect(),
}
}

Expand Down
Loading
Loading