Skip to content

Commit

Permalink
change the worker pool type
Browse files Browse the repository at this point in the history
Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi committed Apr 9, 2024
1 parent a5f8b31 commit 500a641
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 30 deletions.
40 changes: 11 additions & 29 deletions pkg/pool/worker_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,6 @@

package pool

import (
"context"
"sync"
)

// Work is a unit of item to be worked on, like Java Runnable.
type Work func()

Expand All @@ -27,44 +22,31 @@ type WorkerPool interface {
}

type workerPool struct {
sync.Once
workCh chan Work
cancel context.CancelFunc
workCh chan struct{}
cancel chan struct{}

Check failure on line 26 in pkg/pool/worker_pool.go

View workflow job for this annotation

GitHub Actions / Linters (Static Analysis) for Go

field `cancel` is unused (unused)
}

func NewWorkerPool(workers uint) WorkerPool {
return &workerPool{
workCh: make(chan Work, workers),
workCh: make(chan struct{}, workers),
}
}

func (p *workerPool) Init() {
p.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
p.cancel = cancel

for i := 0; i < cap(p.workCh); i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case work := <-p.workCh:
work()
}
}
}()
}
})
for i := 0; i < cap(p.workCh); i++ {
p.workCh <- struct{}{}
}
}

func (p *workerPool) Go(work Work) {
p.Init()
p.workCh <- work
token := <-p.workCh
go func() {
work()
p.workCh <- token
}()
}

func (p *workerPool) Close() {
p.cancel()
}

func (p *workerPool) Size() int {
Expand Down
4 changes: 3 additions & 1 deletion pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1190,9 +1190,11 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
}

func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
workerPool := pool.NewWorkerPool(asyncWorkerCount)
workerPool.Init()
return &peerWorker{
cc: cc,
wp: pool.NewWorkerPool(asyncWorkerCount),
wp: workerPool,
forwardDelay: forwardDelay,
}
}
Expand Down

0 comments on commit 500a641

Please sign in to comment.