Skip to content

Commit

Permalink
Refine QueryCoord stopping (#27371)
Browse files (browse the repository at this point in the history)
Signed-off-by: yah01 <[email protected]>
  • Loading branch information
yah01 authored Sep 27, 2023
1 parent b80a3e1 commit a8ce1b6
Show file tree
Hide file tree
Showing 27 changed files with 306 additions and 299 deletions.
12 changes: 6 additions & 6 deletions internal/querycoordv2/balance/mock_balancer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 10 additions & 11 deletions internal/querycoordv2/checkers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
)

type CheckerController struct {
stopCh chan struct{}
cancel context.CancelFunc
manualCheckChs map[string]chan struct{}
meta *meta.Meta
dist *meta.DistributionManager
Expand Down Expand Up @@ -87,7 +87,6 @@ func NewCheckerController(
}

return &CheckerController{
stopCh: make(chan struct{}),
manualCheckChs: manualCheckChs,
meta: meta,
dist: dist,
Expand All @@ -98,9 +97,12 @@ func NewCheckerController(
}
}

func (controller *CheckerController) Start(ctx context.Context) {
func (controller *CheckerController) Start() {
ctx, cancel := context.WithCancel(context.Background())
controller.cancel = cancel

for checkerType := range controller.checkers {
go controller.StartChecker(ctx, checkerType)
go controller.startChecker(ctx, checkerType)
}
}

Expand All @@ -119,19 +121,14 @@ func getCheckerInterval(checkerType string) time.Duration {
}
}

func (controller *CheckerController) StartChecker(ctx context.Context, checkerType string) {
func (controller *CheckerController) startChecker(ctx context.Context, checkerType string) {
interval := getCheckerInterval(checkerType)
ticker := time.NewTicker(interval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Info("Checker stopped due to context canceled",
zap.String("type", checkerType))
return

case <-controller.stopCh:
log.Info("Checker stopped",
zap.String("type", checkerType))
return
Expand All @@ -149,7 +146,9 @@ func (controller *CheckerController) StartChecker(ctx context.Context, checkerTy

func (controller *CheckerController) Stop() {
controller.stopOnce.Do(func() {
close(controller.stopCh)
if controller.cancel != nil {
controller.cancel()
}
})
}

Expand Down
4 changes: 1 addition & 3 deletions internal/querycoordv2/checkers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package checkers

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -127,8 +126,7 @@ func (suite *CheckerControllerSuite) TestBasic() {

suite.balancer.EXPECT().AssignSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil)
suite.balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything).Return(nil)
ctx := context.Background()
suite.controller.Start(ctx)
suite.controller.Start()
defer suite.controller.Stop()

suite.Eventually(func() bool {
Expand Down
3 changes: 2 additions & 1 deletion internal/querycoordv2/dist/dist_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ func (dc *ControllerImpl) SyncAll(ctx context.Context) {
func (dc *ControllerImpl) Stop() {
dc.mu.Lock()
defer dc.mu.Unlock()
for _, h := range dc.handlers {
for nodeID, h := range dc.handlers {
h.stop()
delete(dc.handlers, nodeID)
}
}

Expand Down
4 changes: 4 additions & 0 deletions internal/querycoordv2/dist/dist_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,10 @@ func (dh *distHandler) stop() {
dh.stopOnce.Do(func() {
close(dh.c)
dh.wg.Wait()

// clear dist
dh.dist.ChannelDistManager.Update(dh.nodeID)
dh.dist.SegmentDistManager.Update(dh.nodeID)
})
}

Expand Down
8 changes: 4 additions & 4 deletions internal/querycoordv2/dist/mock_controller.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions internal/querycoordv2/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ func (suite *JobSuite) SetupTest() {
suite.dist,
suite.broker,
)
suite.targetObserver.Start(context.Background())
suite.targetObserver.Start()
suite.scheduler = NewScheduler()

suite.scheduler.Start(context.Background())
suite.scheduler.Start()
meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()

suite.nodeMgr.Add(session.NewNodeInfo(1000, "localhost"))
Expand Down
92 changes: 40 additions & 52 deletions internal/querycoordv2/job/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
type jobQueue chan Job

type Scheduler struct {
stopCh chan struct{}
cancel context.CancelFunc
wg sync.WaitGroup

processors *typeutil.ConcurrentSet[int64] // Collections of having processor
Expand All @@ -49,73 +49,64 @@ type Scheduler struct {

// NewScheduler returns a pointer to a Scheduler whose fields are initialized
// as follows: stopCh is a fresh unbuffered signal channel, processors is an
// empty concurrent set of int64, queues is an empty map from int64 to jobQueue,
// and waitQueue is a jobQueue buffered to waitQueueCap.
//
// NOTE(review): this text comes from a diff view that shows removed and added
// lines without markers; the stopCh initialization looks like it belongs to the
// pre-change version of the function — confirm against the repository.
func NewScheduler() *Scheduler {
return &Scheduler{
stopCh: make(chan struct{}),
processors: typeutil.NewConcurrentSet[int64](),
queues: make(map[int64]jobQueue),
waitQueue: make(jobQueue, waitQueueCap),
}
}

func (scheduler *Scheduler) Start(ctx context.Context) {
scheduler.schedule(ctx)
func (scheduler *Scheduler) Start() {
ctx, cancel := context.WithCancel(context.Background())
scheduler.cancel = cancel

scheduler.wg.Add(1)
go func() {
defer scheduler.wg.Done()
scheduler.schedule(ctx)
}()
}

func (scheduler *Scheduler) Stop() {
scheduler.stopOnce.Do(func() {
close(scheduler.stopCh)
if scheduler.cancel != nil {
scheduler.cancel()
}
scheduler.wg.Wait()
})
}

func (scheduler *Scheduler) schedule(ctx context.Context) {
scheduler.wg.Add(1)
go func() {
defer scheduler.wg.Done()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("JobManager stopped due to context canceled")
return

case <-scheduler.stopCh:
log.Info("JobManager stopped")
for _, queue := range scheduler.queues {
close(queue)
}
return
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("JobManager stopped")
for _, queue := range scheduler.queues {
close(queue)
}
return

case job := <-scheduler.waitQueue:
queue, ok := scheduler.queues[job.CollectionID()]
if !ok {
queue = make(jobQueue, collectionQueueCap)
scheduler.queues[job.CollectionID()] = queue
}
queue <- job
scheduler.startProcessor(job.CollectionID(), queue)

case <-ticker.C:
for collection, queue := range scheduler.queues {
if len(queue) > 0 {
scheduler.startProcessor(collection, queue)
} else {
// Release resource if no job for the collection
close(queue)
delete(scheduler.queues, collection)
}
case job := <-scheduler.waitQueue:
queue, ok := scheduler.queues[job.CollectionID()]
if !ok {
queue = make(jobQueue, collectionQueueCap)
scheduler.queues[job.CollectionID()] = queue
}
queue <- job
scheduler.startProcessor(job.CollectionID(), queue)

case <-ticker.C:
for collection, queue := range scheduler.queues {
if len(queue) > 0 {
scheduler.startProcessor(collection, queue)
} else {
// Release resource if no job for the collection
close(queue)
delete(scheduler.queues, collection)
}
}
}
}()
}

// isStopped reports whether a receive from scheduler.stopCh can proceed
// immediately. It returns true in that case and false otherwise.
//
// NOTE(review): this function appears to sit on the removed side of the diff
// shown on this page — confirm against the repository before relying on it.
func (scheduler *Scheduler) isStopped() bool {
select {
case <-scheduler.stopCh:
return true
// The default case keeps the select from blocking while stopCh is not ready.
default:
return false
}
}

Expand All @@ -124,9 +115,6 @@ func (scheduler *Scheduler) Add(job Job) {
}

func (scheduler *Scheduler) startProcessor(collection int64, queue jobQueue) {
if scheduler.isStopped() {
return
}
if !scheduler.processors.Insert(collection) {
return
}
Expand Down
34 changes: 17 additions & 17 deletions internal/querycoordv2/meta/mock_broker.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit a8ce1b6

Please sign in to comment.