
fix: Segment unbalance after many load/release cycles (#36537)
issue: #36536
QueryCoord uses `segmentTaskDelta/channelTaskDelta` to measure each
QueryNode's executing workload in the scheduler. We maintain
`segmentTaskDelta/channelTaskDelta` through `scheduler.Add(task)` and
`scheduler.remove(task)`, but `scheduler.remove(task)` has been called in
unexpected ways, producing wrong `segmentTaskDelta/channelTaskDelta`
values that skew the segment-assignment logic and cause segment
unbalance.
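
To make the failure mode concrete, here is a minimal standalone Go sketch
(toy types and names, not the actual Milvus code) of how incremental
bookkeeping drifts once `remove` runs more often than `Add`:

    package main

    import "fmt"

    // toyScheduler maintains a per-node workload counter incrementally,
    // loosely mirroring the old delta bookkeeping (hypothetical names).
    type toyScheduler struct {
        delta map[int64]int // nodeID -> executing row-count delta
    }

    func (s *toyScheduler) add(nodeID int64, rows int)    { s.delta[nodeID] += rows }
    func (s *toyScheduler) remove(nodeID int64, rows int) { s.delta[nodeID] -= rows }

    func main() {
        s := &toyScheduler{delta: map[int64]int{}}
        s.add(1, 100)
        s.remove(1, 100)
        s.remove(1, 100)        // an unexpected extra remove, as in the bug report
        fmt.Println(s.delta[1]) // prints -100: the counter is permanently skewed
    }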

This PR computes `segmentTaskDelta/channelTaskDelta` on access instead,
so stale values can no longer affect segment assignment.
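
A sketch of that compute-on-access pattern, under the same toy types
(hypothetical names, not the PR's code): derive the delta from the live
task set on every read, so the value cannot drift however `remove` is
called:

    package main

    import "fmt"

    type toyTask struct {
        nodeID int64
        rows   int
    }

    type toyScheduler struct {
        tasks map[int64]toyTask // taskID -> task; the single source of truth
    }

    // deltaFor recomputes the node's executing workload on access instead of
    // maintaining a running counter, so stray removals cannot corrupt it.
    func (s *toyScheduler) deltaFor(nodeID int64) int {
        sum := 0
        for _, t := range s.tasks {
            if t.nodeID == nodeID {
                sum += t.rows
            }
        }
        return sum
    }

    func main() {
        s := &toyScheduler{tasks: map[int64]toyTask{7: {nodeID: 1, rows: 100}}}
        fmt.Println(s.deltaFor(1)) // 100 while the task is in flight
        delete(s.tasks, 7)
        delete(s.tasks, 7)         // a repeated removal is a harmless no-op
        fmt.Println(s.deltaFor(1)) // 0: the derived value cannot drift
    }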

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 authored Sep 26, 2024
1 parent 0799d92 commit 5dfa1c3
Showing 1 changed file with 50 additions and 76 deletions.
internal/querycoordv2/task/scheduler.go (+50 −76)
@@ -167,11 +167,6 @@ type taskScheduler struct {
 	channelTasks map[replicaChannelIndex]Task
 	processQueue *taskQueue
 	waitQueue    *taskQueue
-
-	// executing task delta changes on node: nodeID -> collectionID -> delta changes
-	// delta changes measure by segment row count and channel num
-	segmentExecutingTaskDelta map[int64]map[int64]int
-	channelExecutingTaskDelta map[int64]map[int64]int
 }
 
 func NewScheduler(ctx context.Context,
@@ -198,13 +193,11 @@ func NewScheduler(ctx context.Context,
 		cluster: cluster,
 		nodeMgr: nodeMgr,
 
-		tasks:                     make(UniqueSet),
-		segmentTasks:              make(map[replicaSegmentIndex]Task),
-		channelTasks:              make(map[replicaChannelIndex]Task),
-		processQueue:              newTaskQueue(),
-		waitQueue:                 newTaskQueue(),
-		segmentExecutingTaskDelta: make(map[int64]map[int64]int),
-		channelExecutingTaskDelta: make(map[int64]map[int64]int),
+		tasks:        make(UniqueSet),
+		segmentTasks: make(map[replicaSegmentIndex]Task),
+		channelTasks: make(map[replicaChannelIndex]Task),
+		processQueue: newTaskQueue(),
+		waitQueue:    newTaskQueue(),
 	}
 }
 
@@ -217,8 +210,6 @@ func (scheduler *taskScheduler) Stop() {
 	for nodeID, executor := range scheduler.executors {
 		executor.Stop()
 		delete(scheduler.executors, nodeID)
-		delete(scheduler.segmentExecutingTaskDelta, nodeID)
-		delete(scheduler.channelExecutingTaskDelta, nodeID)
 	}
 
 	for _, task := range scheduler.segmentTasks {
@@ -244,8 +235,6 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) {
 		scheduler.cluster,
 		scheduler.nodeMgr)
 
-	scheduler.segmentExecutingTaskDelta[nodeID] = make(map[int64]int)
-	scheduler.channelExecutingTaskDelta[nodeID] = make(map[int64]int)
 	scheduler.executors[nodeID] = executor
 	executor.Start(scheduler.ctx)
 	log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID))
@@ -259,8 +248,6 @@ func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) {
 	if ok {
 		executor.Stop()
 		delete(scheduler.executors, nodeID)
-		delete(scheduler.segmentExecutingTaskDelta, nodeID)
-		delete(scheduler.channelExecutingTaskDelta, nodeID)
 		log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID))
 	}
 }
@@ -293,52 +280,11 @@ func (scheduler *taskScheduler) Add(task Task) error {
 	}
 
 	scheduler.updateTaskMetrics()
-	scheduler.updateTaskDelta(task)
 
 	log.Ctx(task.Context()).Info("task added", zap.String("task", task.String()))
 	task.RecordStartTs()
 	return nil
 }
 
-func (scheduler *taskScheduler) updateTaskDelta(task Task) {
-	var delta int
-	var deltaMap map[int64]map[int64]int
-	switch task := task.(type) {
-	case *SegmentTask:
-		// skip growing segment's count, cause doesn't know realtime row number of growing segment
-		if task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Historical {
-			segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst)
-			if segment != nil {
-				delta = int(segment.GetNumOfRows())
-			}
-		}
-
-		deltaMap = scheduler.segmentExecutingTaskDelta
-
-	case *ChannelTask:
-		delta = 1
-		deltaMap = scheduler.channelExecutingTaskDelta
-	}
-
-	// turn delta to negative when try to remove task
-	if task.Status() == TaskStatusSucceeded || task.Status() == TaskStatusFailed || task.Status() == TaskStatusCanceled {
-		delta = -delta
-	}
-
-	if delta != 0 {
-		for _, action := range task.Actions() {
-			if deltaMap[action.Node()] == nil {
-				deltaMap[action.Node()] = make(map[int64]int)
-			}
-			if action.Type() == ActionTypeGrow {
-				deltaMap[action.Node()][task.CollectionID()] += delta
-			} else if action.Type() == ActionTypeReduce {
-				deltaMap[action.Node()][task.CollectionID()] -= delta
-			}
-		}
-	}
-}
-
 func (scheduler *taskScheduler) updateTaskMetrics() {
 	segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0
 	channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0
@@ -533,34 +479,63 @@ func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64)
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.segmentExecutingTaskDelta)
+	targetActions := make([]Action, 0)
+	for _, t := range scheduler.segmentTasks {
+		if collectionID != -1 && collectionID != t.CollectionID() {
+			continue
+		}
+		for _, action := range t.Actions() {
+			if action.Node() == nodeID {
+				targetActions = append(targetActions, action)
+			}
+		}
+	}
+
+	return scheduler.calculateTaskDelta(collectionID, targetActions)
 }
 
 func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int {
 	scheduler.rwmutex.RLock()
 	defer scheduler.rwmutex.RUnlock()
 
-	return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.channelExecutingTaskDelta)
-}
-
-func (scheduler *taskScheduler) calculateTaskDelta(nodeID, collectionID int64, deltaMap map[int64]map[int64]int) int {
-	if nodeID == -1 && collectionID == -1 {
-		return 0
-	}
-
-	sum := 0
-	for nid, nInfo := range deltaMap {
-		if nid != nodeID && -1 != nodeID {
+	targetActions := make([]Action, 0)
+	for _, t := range scheduler.channelTasks {
+		if collectionID != -1 && collectionID != t.CollectionID() {
 			continue
 		}
 
-		for cid, cInfo := range nInfo {
-			if cid == collectionID || -1 == collectionID {
-				sum += cInfo
+		for _, action := range t.Actions() {
+			if action.Node() == nodeID {
+				targetActions = append(targetActions, action)
 			}
 		}
 	}
 
+	return scheduler.calculateTaskDelta(collectionID, targetActions)
+}
+
+func (scheduler *taskScheduler) calculateTaskDelta(collectionID int64, targetActions []Action) int {
+	sum := 0
+	for _, action := range targetActions {
+		delta := 0
+		if action.Type() == ActionTypeGrow {
+			delta = 1
+		} else if action.Type() == ActionTypeReduce {
+			delta = -1
+		}
+
+		switch action := action.(type) {
+		case *SegmentAction:
+			// skip growing segment's count, cause doesn't know realtime row number of growing segment
+			if action.Scope() == querypb.DataScope_Historical {
+				segment := scheduler.targetMgr.GetSealedSegment(collectionID, action.segmentID, meta.NextTargetFirst)
+				if segment != nil {
+					sum += int(segment.GetNumOfRows()) * delta
+				}
+			}
+		case *ChannelAction:
+			sum += delta
+		}
+	}
 	return sum
 }

@@ -836,7 +811,6 @@ func (scheduler *taskScheduler) remove(task Task) {
 		log = log.With(zap.Int64("segmentID", task.SegmentID()))
 	}
 
-	scheduler.updateTaskDelta(task)
 	scheduler.updateTaskMetrics()
 	log.Info("task removed")
 
