diff --git a/pkg/pool/worker_pool.go b/pkg/pool/worker_pool.go index 891bfae29fc..d921b3bbbc0 100644 --- a/pkg/pool/worker_pool.go +++ b/pkg/pool/worker_pool.go @@ -3,11 +3,6 @@ package pool -import ( - "context" - "sync" -) - // Work is a unit of item to be worked on, like Java Runnable. type Work func() @@ -27,44 +22,31 @@ type WorkerPool interface { } type workerPool struct { - sync.Once - workCh chan Work - cancel context.CancelFunc + workCh chan struct{} + cancel chan struct{} } func NewWorkerPool(workers uint) WorkerPool { return &workerPool{ - workCh: make(chan Work, workers), + workCh: make(chan struct{}, workers), } } func (p *workerPool) Init() { - p.Do(func() { - ctx, cancel := context.WithCancel(context.Background()) - p.cancel = cancel - - for i := 0; i < cap(p.workCh); i++ { - go func() { - for { - select { - case <-ctx.Done(): - return - case work := <-p.workCh: - work() - } - } - }() - } - }) + for i := 0; i < cap(p.workCh); i++ { + p.workCh <- struct{}{} + } } func (p *workerPool) Go(work Work) { - p.Init() - p.workCh <- work + token := <-p.workCh + go func() { + work() + p.workCh <- token + }() } func (p *workerPool) Close() { - p.cancel() } func (p *workerPool) Size() int { diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c2a0536fc4c..c72a78cb8ca 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -1190,9 +1190,11 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors { } func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker { + workerPool := pool.NewWorkerPool(asyncWorkerCount) + workerPool.Init() return &peerWorker{ cc: cc, - wp: pool.NewWorkerPool(asyncWorkerCount), + wp: workerPool, forwardDelay: forwardDelay, } }