Skip to content

Commit

Permalink
enhance: Avoid to iterate whole segment list for each task's process (#…
Browse files Browse the repository at this point in the history
…33943)

when querycoord process segment task, it will try to iterate whole
segment list to checke whether segment is loaded, which cost too much
cpu if there has thousands of segments.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Jun 19, 2024
1 parent 298e50b commit 0294595
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 24 deletions.
17 changes: 0 additions & 17 deletions internal/querycoordv2/meta/segment_dist_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,20 +227,3 @@ func (m *SegmentDistManager) GetByFilter(filters ...SegmentDistFilter) []*Segmen
}
return ret
}

// return node list which contains the given segmentID
func (m *SegmentDistManager) GetSegmentDist(segmentID int64) []int64 {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()

ret := make([]int64, 0)
for nodeID, segments := range m.segments {
for _, segment := range segments.segments {
if segment.GetID() == segmentID {
ret = append(ret, nodeID)
break
}
}
}
return ret
}
17 changes: 13 additions & 4 deletions internal/querycoordv2/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,20 @@ func (action *SegmentAction) Scope() querypb.DataScope {

func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool {
if action.Type() == ActionTypeGrow {
// rpc finished
if !action.rpcReturned.Load() {
return false
}

// segment found in leader view
views := distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(action.segmentID, false))
nodeSegmentDist := distMgr.SegmentDistManager.GetSegmentDist(action.SegmentID())
return len(views) > 0 &&
lo.Contains(nodeSegmentDist, action.Node()) &&
action.rpcReturned.Load()
if len(views) == 0 {
return false
}

// segment found in dist
segmentInTargetNode := distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(action.Node()), meta.WithSegmentID(action.SegmentID()))
return len(segmentInTargetNode) > 0
} else if action.Type() == ActionTypeReduce {
// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
// loading segment replaces the node ID with the new one,
Expand Down
8 changes: 5 additions & 3 deletions internal/querycoordv2/task/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,11 @@ func (scheduler *taskScheduler) preAdd(task Task) error {

if taskType == TaskTypeMove {
views := scheduler.distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(task.SegmentID(), false))
nodeSegmentDist := scheduler.distMgr.SegmentDistManager.GetSegmentDist(task.SegmentID())
if len(views) == 0 ||
!lo.Contains(nodeSegmentDist, task.Actions()[1].Node()) {
if len(views) == 0 {
return merr.WrapErrServiceInternal("segment's delegator not found, stop balancing")
}
segmentInTargetNode := scheduler.distMgr.SegmentDistManager.GetByFilter(meta.WithNodeID(task.Actions()[1].Node()), meta.WithSegmentID(task.SegmentID()))
if len(segmentInTargetNode) == 0 {
return merr.WrapErrServiceInternal("source segment released, stop balancing")
}
}
Expand Down

0 comments on commit 0294595

Please sign in to comment.