fix: Fix slow dist handle and slow observe #38566

Open · wants to merge 6 commits into master · Changes from 1 commit
38 changes: 29 additions & 9 deletions internal/querycoordv2/meta/target.go

@@ -30,36 +30,52 @@ import (

 // CollectionTarget collection target is immutable,
 type CollectionTarget struct {
-	segments   map[int64]*datapb.SegmentInfo
-	dmChannels map[string]*DmChannel
-	partitions typeutil.Set[int64] // stores target partitions info
-	version    int64
+	segments           map[int64]*datapb.SegmentInfo
+	partition2Segments map[int64][]*datapb.SegmentInfo
+	dmChannels         map[string]*DmChannel
+	partitions         typeutil.Set[int64] // stores target partitions info
+	version            int64
 }

 func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
+	for _, segment := range segments {
+		partitionID := segment.GetPartitionID()
+		if _, ok := partition2Segments[partitionID]; !ok {
+			partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
+		}
+		partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
+	}
 	return &CollectionTarget{
-		segments:   segments,
-		dmChannels: dmChannels,
-		partitions: typeutil.NewSet(partitionIDs...),
-		version:    time.Now().UnixNano(),
+		segments:           segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitionIDs...),
+		version:            time.Now().UnixNano(),
 	}
 }

 func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
 	segments := make(map[int64]*datapb.SegmentInfo)
 	dmChannels := make(map[string]*DmChannel)
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo)
 	var partitions []int64

 	for _, t := range target.GetChannelTargets() {
 		for _, partition := range t.GetPartitionTargets() {
+			if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
+				partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
+			}
 			for _, segment := range partition.GetSegments() {
-				segments[segment.GetID()] = &datapb.SegmentInfo{
+				info := &datapb.SegmentInfo{
 					ID:            segment.GetID(),
 					Level:         segment.GetLevel(),
 					CollectionID:  target.GetCollectionID(),
 					PartitionID:   partition.GetPartitionID(),
 					InsertChannel: t.GetChannelName(),
 				}
+				segments[segment.GetID()] = info
+				partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
 			}
 			partitions = append(partitions, partition.GetPartitionID())
 		}
@@ -139,6 +155,10 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
 	return p.segments
 }

+func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
+	return p.partition2Segments[partitionID]
+}
+
 func (p *CollectionTarget) GetTargetVersion() int64 {
 	return p.version
 }
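The change in target.go adds an inverted index, partition2Segments, built once in the constructors, so per-partition lookups stop scanning every segment in the collection. A self-contained sketch of the pattern, with SegmentInfo as a simplified stand-in for datapb.SegmentInfo:

```go
package main

import "fmt"

// SegmentInfo is a simplified stand-in for datapb.SegmentInfo.
type SegmentInfo struct {
	ID          int64
	PartitionID int64
}

// target mirrors the PR's idea: keep the flat segment map and add an
// inverted index from partition ID to its segments, built once up front.
type target struct {
	segments           map[int64]*SegmentInfo
	partition2Segments map[int64][]*SegmentInfo
}

func newTarget(segments map[int64]*SegmentInfo) *target {
	p2s := make(map[int64][]*SegmentInfo)
	for _, s := range segments {
		p2s[s.PartitionID] = append(p2s[s.PartitionID], s)
	}
	return &target{segments: segments, partition2Segments: p2s}
}

// getPartitionSegments is an O(1) map access; the pre-PR equivalent was a
// scan over all segments with a PartitionID filter on every call.
func (t *target) getPartitionSegments(partitionID int64) []*SegmentInfo {
	return t.partition2Segments[partitionID]
}

func main() {
	t := newTarget(map[int64]*SegmentInfo{
		1: {ID: 1, PartitionID: 100},
		2: {ID: 2, PartitionID: 100},
		3: {ID: 3, PartitionID: 200},
	})
	fmt.Println(len(t.getPartitionSegments(100))) // 2
}
```

Since CollectionTarget is documented as immutable, the index never needs maintenance after construction, which is what makes this trade (one extra map, one extra pass at build time) safe.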
6 changes: 2 additions & 4 deletions internal/querycoordv2/meta/target_manager.go

@@ -467,10 +467,8 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(ctx context.Context, coll
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		segments := make(map[int64]*datapb.SegmentInfo)
-		for _, s := range t.GetAllSegments() {
-			if s.GetPartitionID() == partitionID {
-				segments[s.GetID()] = s
-			}
+		for _, s := range t.GetPartitionSegments(partitionID) {
+			segments[s.GetID()] = s
 		}

 		if len(segments) > 0 {
32 changes: 18 additions & 14 deletions internal/querycoordv2/observers/collection_observer.go

@@ -325,11 +325,6 @@ func (ob *CollectionObserver) observeChannelStatus(ctx context.Context, collecti
 }

 func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
-	log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
-		zap.Int64("collectionID", partition.GetCollectionID()),
-		zap.Int64("partitionID", partition.GetPartitionID()),
-	)
-
 	segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(ctx, partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)

 	targetNum := len(segmentTargets) + channelTargetNum
@@ -338,7 +333,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		return false
 	}

-	log.RatedInfo(10, "partition targets",
+	log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int("segmentTargetNum", len(segmentTargets)),
 		zap.Int("channelTargetNum", channelTargetNum),
 		zap.Int("totalTargetNum", targetNum),
@@ -356,7 +353,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		loadedCount += len(group)
 	}
 	if loadedCount > 0 {
-		log.Info("partition load progress",
+		log.Ctx(ctx).Info("partition load progress",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()),
 			zap.Int("subChannelCount", subChannelCount),
 			zap.Int("loadSegmentCount", loadedCount-subChannelCount))
 	}
@@ -370,30 +369,35 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 	ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
 	if loadPercentage == 100 {
 		if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
-			log.Warn("failed to manual check current target, skip update load status")
+			log.Ctx(ctx).Warn("failed to manual check current target, skip update load status",
+				zap.Int64("collectionID", partition.GetCollectionID()),
+				zap.Int64("partitionID", partition.GetPartitionID()))
 			return false
 		}
 		delete(ob.partitionLoadedCount, partition.GetPartitionID())
 	}
 	err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(ctx, partition.PartitionID, loadPercentage)
 	if err != nil {
-		log.Warn("failed to update partition load percentage")
+		log.Ctx(ctx).Warn("failed to update partition load percentage",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()))
 	}
-	log.Info("partition load status updated",
+	log.Ctx(ctx).Info("partition load status updated",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int32("partitionLoadPercentage", loadPercentage),
 	)
 	eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
 	return true
 }

 func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
-	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
-
 	collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(ctx, collectionID)
 	if err != nil {
-		log.Warn("failed to update collection load percentage")
+		log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID))
 	}
-	log.Info("collection load status updated",
+	log.Ctx(ctx).Info("collection load status updated",
+		zap.Int64("collectionID", collectionID),
 		zap.Int32("collectionLoadPercentage", collectionPercentage),
 	)
 	if collectionPercentage == 100 {
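All of the collection_observer.go edits follow one pattern: these functions are polled on a timer, so the rate-limited, field-annotated logger is no longer built unconditionally at the top; it is constructed inline only on the branches that actually log. A minimal sketch of the idea, with a counter standing in for the allocation cost of binding zap fields (not the real zap internals):

```go
package main

import "fmt"

var loggerBuilds int

// buildLogger stands in for log.Ctx(ctx).WithRateGroup(...).With(...):
// each call allocates a logger with pre-bound fields.
func buildLogger(collectionID int64) func(msg string) {
	loggerBuilds++
	return func(msg string) { fmt.Printf("collection=%d %s\n", collectionID, msg) }
}

// observe is the hot, frequently polled function. After the PR's pattern,
// the logger is built only on the path that logs.
func observe(collectionID int64, shouldLog bool) {
	if shouldLog {
		buildLogger(collectionID)("partition load progress")
	}
}

func main() {
	for i := 0; i < 100; i++ {
		observe(42, i == 99) // only the last poll logs
	}
	fmt.Println(loggerBuilds) // 1 instead of 100
}
```

The cost is some repetition of the zap fields at each call site, which the diff accepts to keep the common no-log path allocation-free.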
16 changes: 1 addition & 15 deletions internal/querycoordv2/task/action.go

@@ -118,21 +118,7 @@ func (action *SegmentAction) GetScope() querypb.DataScope {
 func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
 	if action.Type() == ActionTypeGrow {
 		// rpc finished
-		if !action.rpcReturned.Load() {
-			return false
-		}
-
-		// segment found in leader view
-		views := distMgr.LeaderViewManager.GetByFilter(
-			meta.WithChannelName2LeaderView(action.Shard),
-			meta.WithSegment2LeaderView(action.SegmentID, false))
-		if len(views) == 0 {
-			return false
-		}
-
-		// segment found in dist
-		segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID))
-		return len(segmentInTargetNode) > 0
+		return action.rpcReturned.Load()
 	} else if action.Type() == ActionTypeReduce {
 		// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
 		// loading segment replaces the node ID with the new one,
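With this change a grow action is finished as soon as its load RPC returns; the per-poll leader-view and distribution lookups are gone from the check. A sketch of the simplified contract (names hypothetical; the PR leaves dist and leader-view convergence to be observed elsewhere rather than inside IsFinished):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// growAction sketches the post-PR completion check for ActionTypeGrow:
// the only condition is whether the load RPC has returned.
type growAction struct {
	rpcReturned atomic.Bool
}

func (a *growAction) IsFinished() bool {
	return a.rpcReturned.Load()
}

func main() {
	a := &growAction{}
	fmt.Println(a.IsFinished()) // false: RPC still in flight
	a.rpcReturned.Store(true)   // the load RPC completes
	fmt.Println(a.IsFinished()) // true: no dist or leader-view scan needed
}
```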
11 changes: 5 additions & 6 deletions internal/querycoordv2/task/scheduler.go

@@ -685,6 +685,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		scheduler.remove(task)
 	}

+	scheduler.updateTaskMetrics()
+
 	log.Info("processed tasks",
 		zap.Int("toProcessNum", len(toProcess)),
 		zap.Int32("committedNum", commmittedNum.Load()),
@@ -736,10 +738,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
 // return true if the task should be executed,
 // false otherwise
 func (scheduler *taskScheduler) preProcess(task Task) bool {
-	log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With(
-		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("taskID", task.ID()),
-	)
 	if task.Status() != TaskStatusStarted {
 		return false
 	}
@@ -762,7 +760,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 		}

 		if !ready {
-			log.RatedInfo(30, "Blocking reduce action in balance channel task")
+			log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task",
+				zap.Int64("collectionID", task.CollectionID()),
+				zap.Int64("taskID", task.ID()))
 			break
 		}
 	}
@@ -881,7 +881,6 @@ func (scheduler *taskScheduler) remove(task Task) {
log = log.With(zap.Int64("segmentID", task.SegmentID()))
}

scheduler.updateTaskMetrics()
log.Info("task removed")

if scheduler.meta.Exist(task.Context(), task.CollectionID()) {
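The scheduler edit moves updateTaskMetrics out of remove, where it ran once per removed task, to the end of a schedule pass, so one batch of removals triggers one metrics refresh. A toy sketch with a counter in place of the real metrics update:

```go
package main

import "fmt"

type scheduler struct {
	tasks         map[int64]struct{}
	metricUpdates int
}

func (s *scheduler) updateTaskMetrics() { s.metricUpdates++ }

// remove no longer refreshes metrics; before the PR it called
// updateTaskMetrics once per removed task.
func (s *scheduler) remove(id int64) {
	delete(s.tasks, id)
}

// schedule refreshes metrics once after processing the whole batch.
func (s *scheduler) schedule(done []int64) {
	for _, id := range done {
		s.remove(id)
	}
	s.updateTaskMetrics()
}

func main() {
	s := &scheduler{tasks: map[int64]struct{}{1: {}, 2: {}, 3: {}}}
	s.schedule([]int64{1, 2, 3})
	fmt.Println(s.metricUpdates) // 1 instead of 3
}
```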
16 changes: 0 additions & 16 deletions internal/querycoordv2/task/task_test.go

@@ -1141,22 +1141,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 	suite.dispatchAndWait(targetNode)
 	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)

-	// Process tasks done
-	// Dist contains channels, first task stale
-	view := &meta.LeaderView{
-		ID:           targetNode,
-		CollectionID: suite.collection,
-		Segments:     map[int64]*querypb.SegmentDist{},
-		Channel:      channel.ChannelName,
-	}
-	for _, segment := range suite.loadSegments[1:] {
-		view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
-	}
-	distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
-		return meta.SegmentFromInfo(info)
-	})
-	suite.dist.LeaderViewManager.Update(targetNode, view)
-	suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
 	segments = make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments[1:] {
 		segments = append(segments, &datapb.SegmentInfo{
5 changes: 4 additions & 1 deletion internal/querynodev2/services.go

@@ -1210,7 +1210,10 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
growingSegments[entry.SegmentID] = &msgpb.MsgPosition{}
continue
}
growingSegments[entry.SegmentID] = segment.StartPosition()
// QueryCoord only requires the timestamp from the position.
growingSegments[entry.SegmentID] = &msgpb.MsgPosition{
Timestamp: segment.StartPosition().GetTimestamp(),
}
numOfGrowingRows += segment.InsertCount()
}
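The services.go change trims the GetDataDistribution response: instead of returning each growing segment's full start position, the node copies out only the timestamp, which is the one field QueryCoord reads. A rough sketch of the difference, with msgPosition as a simplified stand-in for msgpb.MsgPosition and illustrative values throughout:

```go
package main

import "fmt"

// msgPosition is a simplified stand-in for msgpb.MsgPosition, which also
// carries a channel name, message ID bytes, and more.
type msgPosition struct {
	ChannelName string
	MsgID       []byte
	Timestamp   uint64
}

func main() {
	start := &msgPosition{
		ChannelName: "by-dev-rootcoord-dml_0", // hypothetical channel name
		MsgID:       make([]byte, 64),
		Timestamp:   449296048841818113,
	}

	// Post-PR: ship only the consumed field, so the per-segment payload in
	// the distribution response stays small on nodes with many growing segments.
	slim := &msgPosition{Timestamp: start.Timestamp}
	fmt.Println(slim.Timestamp)
}
```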
