Skip to content

Commit

Permalink
fix: Avoid segment lack caused by deduplicate segment task (#34782)
Browse files Browse the repository at this point in the history
issue: #34781

when a balance-segment task hasn't finished yet, the query coord may find 2 loaded
copies of a segment; it will then generate a task to deduplicate, which may
cancel the balance task. At that point the old copy has already been released,
while the new copy isn't ready yet but gets canceled, so searches fail due to the
missing segment.

this PR sets the deduplicate segment task's priority to low, to avoid the balance
segment task being canceled by the deduplicate task.

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 authored Jul 22, 2024
1 parent 52fa668 commit 40e39ef
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 12 deletions.
8 changes: 7 additions & 1 deletion internal/querycoordv2/checkers/segment_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,9 @@ func (c *SegmentChecker) Check(ctx context.Context) []task.Task {
released := utils.FilterReleased(segments, collectionIDs)
reduceTasks := c.createSegmentReduceTasks(ctx, released, meta.NilReplica, querypb.DataScope_Historical)
task.SetReason("collection released", reduceTasks...)
task.SetPriority(task.TaskPriorityNormal, reduceTasks...)
results = append(results, reduceTasks...)
task.SetPriority(task.TaskPriorityNormal, results...)

return results
}

Expand All @@ -114,24 +115,29 @@ func (c *SegmentChecker) checkReplica(ctx context.Context, replica *meta.Replica
// loadCtx := trace.ContextWithSpan(context.Background(), c.meta.GetCollection(replica.CollectionID).LoadSpan)
tasks := c.createSegmentLoadTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), lacks, replica)
task.SetReason("lacks of segment", tasks...)
task.SetPriority(task.TaskPriorityNormal, tasks...)
ret = append(ret, tasks...)

redundancies = c.filterSegmentInUse(replica, redundancies)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
task.SetReason("segment not exists in target", tasks...)
task.SetPriority(task.TaskPriorityNormal, tasks...)
ret = append(ret, tasks...)

// compare inner dists to find repeated loaded segments
redundancies = c.findRepeatedSealedSegments(replica.GetID())
redundancies = c.filterExistedOnLeader(replica, redundancies)
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Historical)
task.SetReason("redundancies of segment", tasks...)
// set deduplicate task priority to low, to avoid deduplicate task cancel balance task
task.SetPriority(task.TaskPriorityLow, tasks...)
ret = append(ret, tasks...)

// compare with target to find the lack and redundancy of segments
_, redundancies = c.getGrowingSegmentDiff(replica.GetCollectionID(), replica.GetID())
tasks = c.createSegmentReduceTasks(c.getTraceCtx(ctx, replica.GetCollectionID()), redundancies, replica, querypb.DataScope_Streaming)
task.SetReason("streaming segment not exists in target", tasks...)
task.SetPriority(task.TaskPriorityNormal, tasks...)
ret = append(ret, tasks...)

return ret
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/segment_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,7 @@ func (suite *SegmentCheckerTestSuite) TestReleaseRepeatedSegments() {
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.EqualValues(1, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

// test less version exist on leader
checker.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{1: 1}, map[int64]*meta.Segment{}))
Expand Down
24 changes: 14 additions & 10 deletions internal/querycoordv2/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *Server) balanceSegments(ctx context.Context,
actions = append(actions, releaseAction)
}

task, err := task.NewSegmentTask(s.ctx,
t, err := task.NewSegmentTask(s.ctx,
Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
utils.ManualBalance,
collectionID,
Expand All @@ -140,13 +140,15 @@ func (s *Server) balanceSegments(ctx context.Context,
)
continue
}
task.SetReason("manual balance")
err = s.taskScheduler.Add(task)
t.SetReason("manual balance")
// set manual balance to normal, to avoid manual balance be canceled by other segment task
t.SetPriority(task.TaskPriorityNormal)
err = s.taskScheduler.Add(t)
if err != nil {
task.Cancel(err)
t.Cancel(err)
return err
}
tasks = append(tasks, task)
tasks = append(tasks, t)
}

if sync {
Expand Down Expand Up @@ -198,7 +200,7 @@ func (s *Server) balanceChannels(ctx context.Context,
releaseAction := task.NewChannelAction(plan.From, task.ActionTypeReduce, plan.Channel.GetChannelName())
actions = append(actions, releaseAction)
}
task, err := task.NewChannelTask(s.ctx,
t, err := task.NewChannelTask(s.ctx,
Params.QueryCoordCfg.ChannelTaskTimeout.GetAsDuration(time.Millisecond),
utils.ManualBalance,
collectionID,
Expand All @@ -215,13 +217,15 @@ func (s *Server) balanceChannels(ctx context.Context,
)
continue
}
task.SetReason("manual balance")
err = s.taskScheduler.Add(task)
t.SetReason("manual balance")
// set manual balance channel to high, to avoid manual balance be canceled by other channel task
t.SetPriority(task.TaskPriorityHigh)
err = s.taskScheduler.Add(t)
if err != nil {
task.Cancel(err)
t.Cancel(err)
return err
}
tasks = append(tasks, task)
tasks = append(tasks, t)
}

if sync {
Expand Down

0 comments on commit 40e39ef

Please sign in to comment.