Skip to content

Commit

Permalink
Remove mutex from ReusableGoroutinesPool (#610)
Browse files Browse the repository at this point in the history
I find it unsettling that we need to acquire a mutex (a concurrency
primitive) to send data to a channel (another concurrency primitive).

I think this achieves the same effect (we can stop the worker
goroutines in a safe way) without having to mix them and leaving the
main code path clear (just try to send a job, or run a new goroutine).

Signed-off-by: Oleg Zaytsev <[email protected]>
  • Loading branch information
colega authored Oct 14, 2024
1 parent 619c421 commit d3f80b0
Showing 1 changed file with 11 additions and 22 deletions.
33 changes: 11 additions & 22 deletions concurrency/worker.go
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
package concurrency

import (
"sync"
)

// NewReusableGoroutinesPool creates a new worker pool with the given size.
// These workers will run the workloads passed through Go() calls.
// If all workers are busy, Go() will spawn a new goroutine to run the workload.
func NewReusableGoroutinesPool(size int) *ReusableGoroutinesPool {
p := &ReusableGoroutinesPool{
jobs: make(chan func()),
jobs: make(chan func()),
closed: make(chan struct{}),
}
for i := 0; i < size; i++ {
go func() {
for f := range p.jobs {
f()
for {
select {
case f := <-p.jobs:
f()
case <-p.closed:
return
}
}
}()
}
return p
}

type ReusableGoroutinesPool struct {
jobsMu sync.RWMutex
closed bool
jobs chan func()
closed chan struct{}
}

// Go will run the given function in a worker of the pool.
// If all workers are busy, Go() will spawn a new goroutine to run the workload.
func (p *ReusableGoroutinesPool) Go(f func()) {
p.jobsMu.RLock()
defer p.jobsMu.RUnlock()

// If the pool is closed, run the function in a new goroutine.
if p.closed {
go f()
return
}

select {
case p.jobs <- f:
default:
Expand All @@ -51,8 +43,5 @@ func (p *ReusableGoroutinesPool) Go(f func()) {
// Close does NOT wait for all jobs to finish, it is the caller's responsibility to ensure that in the provided workloads.
// Close is intended to be used in tests to ensure that no goroutines are leaked.
func (p *ReusableGoroutinesPool) Close() {
p.jobsMu.Lock()
defer p.jobsMu.Unlock()
p.closed = true
close(p.jobs)
close(p.closed)
}

0 comments on commit d3f80b0

Please sign in to comment.