fix: [10kcp] Fix slow dist handle and slow observe (#38567)
1. Provide partition-level indexing in the collection target.
2. Make SegmentAction not wait for distribution.
3. Optimize logging to reduce CPU overhead.

issue: #37630

pr: #38566

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored Dec 18, 2024
1 parent 999437e commit ca234e7
Showing 7 changed files with 59 additions and 65 deletions.
38 changes: 29 additions & 9 deletions internal/querycoordv2/meta/target.go
@@ -28,36 +28,52 @@ import (
 
 // CollectionTarget collection target is immutable,
 type CollectionTarget struct {
-	segments   map[int64]*datapb.SegmentInfo
-	dmChannels map[string]*DmChannel
-	partitions typeutil.Set[int64] // stores target partitions info
-	version    int64
+	segments           map[int64]*datapb.SegmentInfo
+	partition2Segments map[int64][]*datapb.SegmentInfo
+	dmChannels         map[string]*DmChannel
+	partitions         typeutil.Set[int64] // stores target partitions info
+	version            int64
 }
 
 func NewCollectionTarget(segments map[int64]*datapb.SegmentInfo, dmChannels map[string]*DmChannel, partitionIDs []int64) *CollectionTarget {
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo, len(partitionIDs))
+	for _, segment := range segments {
+		partitionID := segment.GetPartitionID()
+		if _, ok := partition2Segments[partitionID]; !ok {
+			partition2Segments[partitionID] = make([]*datapb.SegmentInfo, 0)
+		}
+		partition2Segments[partitionID] = append(partition2Segments[partitionID], segment)
+	}
 	return &CollectionTarget{
-		segments:   segments,
-		dmChannels: dmChannels,
-		partitions: typeutil.NewSet(partitionIDs...),
-		version:    time.Now().UnixNano(),
+		segments:           segments,
+		partition2Segments: partition2Segments,
+		dmChannels:         dmChannels,
+		partitions:         typeutil.NewSet(partitionIDs...),
+		version:            time.Now().UnixNano(),
 	}
 }
 
 func FromPbCollectionTarget(target *querypb.CollectionTarget) *CollectionTarget {
 	segments := make(map[int64]*datapb.SegmentInfo)
 	dmChannels := make(map[string]*DmChannel)
+	partition2Segments := make(map[int64][]*datapb.SegmentInfo)
 	var partitions []int64
 
 	for _, t := range target.GetChannelTargets() {
 		for _, partition := range t.GetPartitionTargets() {
+			if _, ok := partition2Segments[partition.GetPartitionID()]; !ok {
+				partition2Segments[partition.GetPartitionID()] = make([]*datapb.SegmentInfo, 0, len(partition.GetSegments()))
+			}
 			for _, segment := range partition.GetSegments() {
-				segments[segment.GetID()] = &datapb.SegmentInfo{
+				info := &datapb.SegmentInfo{
					ID:            segment.GetID(),
					Level:         segment.GetLevel(),
					CollectionID:  target.GetCollectionID(),
					PartitionID:   partition.GetPartitionID(),
					InsertChannel: t.GetChannelName(),
 				}
+				segments[segment.GetID()] = info
+				partition2Segments[partition.GetPartitionID()] = append(partition2Segments[partition.GetPartitionID()], info)
 			}
 			partitions = append(partitions, partition.GetPartitionID())
 		}
@@ -137,6 +153,10 @@ func (p *CollectionTarget) GetAllSegments() map[int64]*datapb.SegmentInfo {
 	return p.segments
 }
 
+func (p *CollectionTarget) GetPartitionSegments(partitionID int64) []*datapb.SegmentInfo {
+	return p.partition2Segments[partitionID]
+}
+
 func (p *CollectionTarget) GetTargetVersion() int64 {
 	return p.version
 }
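The change above is fix 1 from the commit message: a secondary index built once at construction, replacing repeated linear scans. Because CollectionTarget is immutable, partition2Segments can never go stale. Below is a minimal, runnable sketch of the pattern; SegmentInfo and Target here are simplified stand-ins, not the actual Milvus structs:

```go
package main

import "fmt"

// SegmentInfo is a simplified stand-in for datapb.SegmentInfo.
type SegmentInfo struct {
	ID          int64
	PartitionID int64
}

// Target mirrors the two-index layout: by segment ID for point lookups,
// and by partition ID so per-partition reads avoid a full scan.
type Target struct {
	segments           map[int64]*SegmentInfo
	partition2Segments map[int64][]*SegmentInfo
}

// NewTarget builds both indexes once. The target is immutable afterwards,
// so the secondary index never needs maintenance.
func NewTarget(segments map[int64]*SegmentInfo) *Target {
	p2s := make(map[int64][]*SegmentInfo)
	for _, s := range segments {
		// append on a missing key appends to a nil slice, which is valid Go;
		// the explicit make in the real diff is a capacity hint.
		p2s[s.PartitionID] = append(p2s[s.PartitionID], s)
	}
	return &Target{segments: segments, partition2Segments: p2s}
}

// PartitionSegments costs O(result size) instead of the old
// O(all segments) filter over GetAllSegments.
func (t *Target) PartitionSegments(partitionID int64) []*SegmentInfo {
	return t.partition2Segments[partitionID]
}

func main() {
	t := NewTarget(map[int64]*SegmentInfo{
		1: {ID: 1, PartitionID: 100},
		2: {ID: 2, PartitionID: 100},
		3: {ID: 3, PartitionID: 200},
	})
	fmt.Println(len(t.PartitionSegments(100))) // 2
}
```

GetSealedSegmentsByPartition in target_manager.go (next file) is the hot call site that benefits from the new lookup.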
6 changes: 2 additions & 4 deletions internal/querycoordv2/meta/target_manager.go
@@ -468,10 +468,8 @@ func (mgr *TargetManager) GetSealedSegmentsByPartition(collectionID int64,
 	targets := mgr.getCollectionTarget(scope, collectionID)
 	for _, t := range targets {
 		segments := make(map[int64]*datapb.SegmentInfo)
-		for _, s := range t.GetAllSegments() {
-			if s.GetPartitionID() == partitionID {
-				segments[s.GetID()] = s
-			}
+		for _, s := range t.GetPartitionSegments(partitionID) {
+			segments[s.GetID()] = s
 		}
 
 		if len(segments) > 0 {
32 changes: 18 additions & 14 deletions internal/querycoordv2/observers/collection_observer.go
@@ -325,11 +325,6 @@ func (ob *CollectionObserver) observeChannelStatus(collectionID int64) (int, int
 }
 
 func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, partition *meta.Partition, replicaNum int32, channelTargetNum, subChannelCount int) bool {
-	log := log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).With(
-		zap.Int64("collectionID", partition.GetCollectionID()),
-		zap.Int64("partitionID", partition.GetPartitionID()),
-	)
-
 	segmentTargets := ob.targetMgr.GetSealedSegmentsByPartition(partition.GetCollectionID(), partition.GetPartitionID(), meta.NextTarget)
 
 	targetNum := len(segmentTargets) + channelTargetNum
@@ -338,7 +333,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		return false
 	}
 
-	log.RatedInfo(10, "partition targets",
+	log.Ctx(ctx).WithRateGroup("qcv2.observePartitionLoadStatus", 1, 60).RatedInfo(10, "partition targets",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int("segmentTargetNum", len(segmentTargets)),
 		zap.Int("channelTargetNum", channelTargetNum),
 		zap.Int("totalTargetNum", targetNum),
@@ -356,7 +353,9 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 		loadedCount += len(group)
 	}
 	if loadedCount > 0 {
-		log.Info("partition load progress",
+		log.Ctx(ctx).Info("partition load progress",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()),
 			zap.Int("subChannelCount", subChannelCount),
 			zap.Int("loadSegmentCount", loadedCount-subChannelCount))
 	}
@@ -370,30 +369,35 @@ func (ob *CollectionObserver) observePartitionLoadStatus(ctx context.Context, pa
 	ob.partitionLoadedCount[partition.GetPartitionID()] = loadedCount
 	if loadPercentage == 100 {
 		if !ob.targetObserver.Check(ctx, partition.GetCollectionID(), partition.PartitionID) {
-			log.Warn("failed to manual check current target, skip update load status")
+			log.Ctx(ctx).Warn("failed to manual check current target, skip update load status",
+				zap.Int64("collectionID", partition.GetCollectionID()),
+				zap.Int64("partitionID", partition.GetPartitionID()))
 			return false
 		}
 		delete(ob.partitionLoadedCount, partition.GetPartitionID())
 	}
 	err := ob.meta.CollectionManager.UpdatePartitionLoadPercent(partition.PartitionID, loadPercentage)
 	if err != nil {
-		log.Warn("failed to update partition load percentage")
+		log.Ctx(ctx).Warn("failed to update partition load percentage",
+			zap.Int64("collectionID", partition.GetCollectionID()),
+			zap.Int64("partitionID", partition.GetPartitionID()))
 	}
-	log.Info("partition load status updated",
+	log.Ctx(ctx).Info("partition load status updated",
+		zap.Int64("collectionID", partition.GetCollectionID()),
+		zap.Int64("partitionID", partition.GetPartitionID()),
 		zap.Int32("partitionLoadPercentage", loadPercentage),
 	)
 	eventlog.Record(eventlog.NewRawEvt(eventlog.Level_Info, fmt.Sprintf("partition %d load percentage update: %d", partition.PartitionID, loadPercentage)))
 	return true
 }
 
 func (ob *CollectionObserver) observeCollectionLoadStatus(ctx context.Context, collectionID int64) {
-	log := log.Ctx(ctx).With(zap.Int64("collectionID", collectionID))
-
 	collectionPercentage, err := ob.meta.CollectionManager.UpdateCollectionLoadPercent(collectionID)
 	if err != nil {
-		log.Warn("failed to update collection load percentage")
+		log.Ctx(ctx).Warn("failed to update collection load percentage", zap.Int64("collectionID", collectionID))
 	}
-	log.Info("collection load status updated",
+	log.Ctx(ctx).Info("collection load status updated",
+		zap.Int64("collectionID", collectionID),
 		zap.Int32("collectionLoadPercentage", collectionPercentage),
 	)
 	if collectionPercentage == 100 {
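This file carries fix 3, the logging optimization. The old code derived a decorated logger (rate group plus bound fields) at the top of a function that runs in a hot observation loop, paying the allocation and field-copy cost on every call, including early returns that log nothing. The commit moves field construction to the call sites that actually emit. A minimal sketch of the pattern, using plain zap in place of Milvus's own log wrapper (which adds rate grouping on top of zap); the observe function and its ready flag are illustrative:

```go
package main

import "go.uber.org/zap"

// observe stands in for a hot-path observer callback.
func observe(logger *zap.Logger, collectionID, partitionID int64, ready bool) {
	// Removed pattern: binding fields up front allocates a child logger on
	// every call, even when the function returns before logging anything.
	//
	//   log := logger.With(
	//       zap.Int64("collectionID", collectionID),
	//       zap.Int64("partitionID", partitionID),
	//   )

	if !ready {
		return // fast path: no log record, so no field construction either
	}

	// Applied pattern: pay for the fields only where a record is emitted.
	logger.Info("partition load progress",
		zap.Int64("collectionID", collectionID),
		zap.Int64("partitionID", partitionID),
	)
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()
	observe(logger, 1, 100, true)
}
```

The cost per call is small, but with tens of thousands of collections/partitions (the "[10kcp]" tag in the title) the up-front allocations add up on paths that usually log nothing.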
16 changes: 1 addition & 15 deletions internal/querycoordv2/task/action.go
@@ -118,21 +118,7 @@ func (action *SegmentAction) Scope() querypb.DataScope {
 func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
 	if action.Type() == ActionTypeGrow {
 		// rpc finished
-		if !action.rpcReturned.Load() {
-			return false
-		}
-
-		// segment found in leader view
-		views := distMgr.LeaderViewManager.GetByFilter(
-			meta.WithChannelName2LeaderView(action.Shard()),
-			meta.WithSegment2LeaderView(action.segmentID, false))
-		if len(views) == 0 {
-			return false
-		}
-
-		// segment found in dist
-		segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID()))
-		return len(segmentInTargetNode) > 0
+		return action.rpcReturned.Load()
 	} else if action.Type() == ActionTypeReduce {
 		// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
 		// loading segment replaces the node ID with the new one,
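This is fix 2: a Grow action now completes as soon as the load RPC returns, rather than additionally waiting for the segment to show up in the leader view and the distribution snapshot. Those views are refreshed asynchronously, so the old check could keep an already-successful task pinned in the scheduler for extra rounds; any divergence is presumably left to QueryCoord's periodic reconciliation, as the source does not state this explicitly. A sketch of the retained completion signal, with a stripped-down stand-in for task.SegmentAction:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// segmentAction keeps only the completion signal the commit retains:
// an atomic flag flipped when the load RPC returns.
type segmentAction struct {
	rpcReturned atomic.Bool
}

// onRPCDone is called once the segment-load RPC returns successfully.
func (a *segmentAction) onRPCDone() { a.rpcReturned.Store(true) }

// IsFinished no longer consults leader views or segment distribution,
// both of which lag behind the RPC result.
func (a *segmentAction) IsFinished() bool { return a.rpcReturned.Load() }

func main() {
	a := &segmentAction{}
	fmt.Println(a.IsFinished()) // false: RPC still in flight
	a.onRPCDone()
	fmt.Println(a.IsFinished()) // true: no distribution round-trip needed
}
```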
11 changes: 5 additions & 6 deletions internal/querycoordv2/task/scheduler.go
@@ -617,6 +617,8 @@ func (scheduler *taskScheduler) schedule(node int64) {
 		scheduler.remove(task)
 	}
 
+	scheduler.updateTaskMetrics()
+
 	log.Info("processed tasks",
 		zap.Int("toProcessNum", len(toProcess)),
 		zap.Int32("committedNum", commmittedNum.Load()),
@@ -668,10 +670,6 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
 // return true if the task should be executed,
 // false otherwise
 func (scheduler *taskScheduler) preProcess(task Task) bool {
-	log := log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).With(
-		zap.Int64("collectionID", task.CollectionID()),
-		zap.Int64("taskID", task.ID()),
-	)
 	if task.Status() != TaskStatusStarted {
 		return false
 	}
@@ -694,7 +692,9 @@ func (scheduler *taskScheduler) preProcess(task Task) bool {
 		}
 
 		if !ready {
-			log.RatedInfo(30, "Blocking reduce action in balance channel task")
+			log.Ctx(scheduler.ctx).WithRateGroup("qcv2.taskScheduler", 1, 60).RatedInfo(30, "Blocking reduce action in balance channel task",
+				zap.Int64("collectionID", task.CollectionID()),
+				zap.Int64("taskID", task.ID()))
 			break
 		}
 	}
@@ -813,7 +813,6 @@ func (scheduler *taskScheduler) remove(task Task) {
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 	}
 
-	scheduler.updateTaskMetrics()
 	log.Info("task removed")
 
 	if scheduler.meta.Exist(task.CollectionID()) {
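Besides the same lazy-logger change as in the observer, this file moves updateTaskMetrics out of remove() and into schedule(): the refresh now runs once per scheduling pass instead of once per removed task, amortizing what is presumably an O(tasks) recomputation. A minimal sketch of the batching (all types are illustrative stand-ins):

```go
package main

import "fmt"

type task struct{ id int64 }

// scheduler is a stand-in; metricUpdates counts how often the (assumed
// O(n)) gauge recomputation runs.
type scheduler struct {
	tasks         map[int64]task
	metricUpdates int
}

func (s *scheduler) updateTaskMetrics() { s.metricUpdates++ }

// remove no longer refreshes metrics itself, matching the deletion in
// the diff above.
func (s *scheduler) remove(t task) { delete(s.tasks, t.id) }

// schedule refreshes metrics exactly once per pass, after all removals.
func (s *scheduler) schedule(done []task) {
	for _, t := range done {
		s.remove(t)
	}
	s.updateTaskMetrics()
}

func main() {
	s := &scheduler{tasks: map[int64]task{1: {1}, 2: {2}, 3: {3}}}
	s.schedule([]task{{1}, {2}, {3}})
	fmt.Println(s.metricUpdates) // 1 refresh instead of 3
}
```

The test change in task_test.go below follows from fix 2: since a Grow action no longer waits on leader view and distribution, the test no longer needs to fabricate them.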
16 changes: 0 additions & 16 deletions internal/querycoordv2/task/task_test.go
@@ -1135,22 +1135,6 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
 	suite.dispatchAndWait(targetNode)
 	suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
 
-	// Process tasks done
-	// Dist contains channels, first task stale
-	view := &meta.LeaderView{
-		ID:           targetNode,
-		CollectionID: suite.collection,
-		Segments:     map[int64]*querypb.SegmentDist{},
-		Channel:      channel.ChannelName,
-	}
-	for _, segment := range suite.loadSegments[1:] {
-		view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
-	}
-	distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
-		return meta.SegmentFromInfo(info)
-	})
-	suite.dist.LeaderViewManager.Update(targetNode, view)
-	suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
 	segments = make([]*datapb.SegmentInfo, 0)
 	for _, segment := range suite.loadSegments[1:] {
 		segments = append(segments, &datapb.SegmentInfo{
5 changes: 4 additions & 1 deletion internal/querynodev2/services.go
@@ -1245,7 +1245,10 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
 			growingSegments[entry.SegmentID] = &msgpb.MsgPosition{}
 			continue
 		}
-		growingSegments[entry.SegmentID] = segment.StartPosition()
+		// QueryCoord only requires the timestamp from the position.
+		growingSegments[entry.SegmentID] = &msgpb.MsgPosition{
+			Timestamp: segment.StartPosition().GetTimestamp(),
+		}
 		numOfGrowingRows += segment.InsertCount()
 	}
}

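The last hunk trims the GetDataDistribution reply: each growing segment previously shipped its full start position, but QueryCoord only reads the timestamp, so the response now carries a position with just that field set. A sketch of the idea with a stand-in for msgpb.MsgPosition (the channel name and sizes below are illustrative):

```go
package main

import "fmt"

// position is a stand-in for msgpb.MsgPosition; in practice MsgID is an
// opaque byte slice that can dominate the encoded size.
type position struct {
	ChannelName string
	MsgID       []byte
	Timestamp   uint64
}

// slimPosition mirrors the change: copy only the field the receiver
// (QueryCoord) actually consumes.
func slimPosition(start *position) *position {
	return &position{Timestamp: start.Timestamp}
}

func main() {
	start := &position{ChannelName: "rootcoord-dml_0", MsgID: make([]byte, 64), Timestamp: 449296}
	slim := slimPosition(start)
	fmt.Println(slim.Timestamp, len(slim.MsgID)) // 449296 0
}
```

With thousands of growing segments reported per distribution round-trip, dropping the per-position byte payloads shrinks both serialization work and response size.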
