diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go
index 4edbda87599..b6735181e45 100644
--- a/pkg/distributor/distributor.go
+++ b/pkg/distributor/distributor.go
@@ -374,10 +374,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
 		}, []string{"user"}),
 		validateMetrics: validation.NewValidateMetrics(reg),
+		asyncExecutor:   util.NewNoOpExecutor(),
 	}
 
 	if cfg.NumPushWorkers > 0 {
-		d.asyncExecutor = util.NewWorkerPool(cfg.NumPushWorkers)
+		d.asyncExecutor = util.NewWorkerPool("distributor", cfg.NumPushWorkers, reg)
 	}
 
 	promauto.With(reg).NewGauge(prometheus.GaugeOpts{
diff --git a/pkg/util/worker_pool.go b/pkg/util/worker_pool.go
index 65bbb81f237..59cb5302649 100644
--- a/pkg/util/worker_pool.go
+++ b/pkg/util/worker_pool.go
@@ -1,6 +1,11 @@
 package util
 
-import "sync"
+import (
+	"sync"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
 
 // This code was based on: https://github.com/grpc/grpc-go/blob/66ba4b264d26808cb7af3c86eee66e843472915e/server.go
@@ -13,10 +18,13 @@ const serverWorkerResetThreshold = 1 << 16
 
 type AsyncExecutor interface {
 	Submit(f func())
+	Stop()
 }
 
 type noOpExecutor struct{}
 
+func (n noOpExecutor) Stop() {}
+
 func NewNoOpExecutor() AsyncExecutor {
 	return &noOpExecutor{}
 }
@@ -28,11 +36,19 @@
 type workerPoolExecutor struct {
 	serverWorkerChannel chan func()
 	closeOnce           sync.Once
+
+	fallbackTotal prometheus.Counter
 }
 
-func NewWorkerPool(numWorkers int) AsyncExecutor {
+func NewWorkerPool(name string, numWorkers int, reg prometheus.Registerer) AsyncExecutor {
 	wp := &workerPoolExecutor{
 		serverWorkerChannel: make(chan func()),
+		fallbackTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{
+			Namespace:   "cortex",
+			Name:        "worker_pool_fallback_total",
+			Help:        "The total number of additional goroutines that needed to be created to run jobs.",
+			ConstLabels: prometheus.Labels{"name": name},
+		}),
 	}
 
 	for i := 0; i < numWorkers; i++ {
@@ -52,6 +68,7 @@ func (s *workerPoolExecutor) Submit(f func()) {
 	select {
 	case s.serverWorkerChannel <- f:
 	default:
+		s.fallbackTotal.Inc()
 		go f()
 	}
 }
diff --git a/pkg/util/worker_pool_test.go b/pkg/util/worker_pool_test.go
new file mode 100644
index 00000000000..9c0b1fe65e2
--- /dev/null
+++ b/pkg/util/worker_pool_test.go
@@ -0,0 +1,86 @@
+package util
+
+import (
+	"bytes"
+	"sync"
+	"testing"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/testutil"
+	"github.com/stretchr/testify/require"
+)
+
+func TestNewWorkerPool_CreateMultiplePoolsWithSameRegistry(t *testing.T) {
+	reg := prometheus.NewPedanticRegistry()
+	wp1 := NewWorkerPool("test1", 100, reg)
+	defer wp1.Stop()
+	wp2 := NewWorkerPool("test2", 100, reg)
+	defer wp2.Stop()
+}
+
+func TestWorkerPool_TestMetric(t *testing.T) {
+	reg := prometheus.NewPedanticRegistry()
+	workerPool := NewWorkerPool("test1", 1, reg)
+	defer workerPool.Stop()
+
+	require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+		# HELP cortex_worker_pool_fallback_total The total number of additional goroutines that needed to be created to run jobs.
+		# TYPE cortex_worker_pool_fallback_total counter
+		cortex_worker_pool_fallback_total{name="test1"} 0
+`), "cortex_worker_pool_fallback_total"))
+
+	wg := &sync.WaitGroup{}
+	wg.Add(1)
+
+	// Block the only worker
+	workerPool.Submit(func() {
+		wg.Wait()
+	})
+
+	// Create an extra job to increment the fallback metric
+	workerPool.Submit(func() {})
+	require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewBufferString(`
+		# HELP cortex_worker_pool_fallback_total The total number of additional goroutines that needed to be created to run jobs.
+		# TYPE cortex_worker_pool_fallback_total counter
+		cortex_worker_pool_fallback_total{name="test1"} 1
+`), "cortex_worker_pool_fallback_total"))
+
+	wg.Done()
+}
+
+func TestWorkerPool_ShouldFallbackWhenAllWorkersAreBusy(t *testing.T) {
+	reg := prometheus.NewPedanticRegistry()
+	numberOfWorkers := 10
+	workerPool := NewWorkerPool("test1", numberOfWorkers, reg)
+	defer workerPool.Stop()
+
+	m := sync.Mutex{}
+	blockerWg := sync.WaitGroup{}
+	blockerWg.Add(numberOfWorkers)
+
+	// Let's lock all submitted jobs
+	m.Lock()
+
+	for i := 0; i < numberOfWorkers; i++ {
+		workerPool.Submit(func() {
+			defer blockerWg.Done()
+			m.Lock()
+			m.Unlock()
+		})
+	}
+
+	// At this point all workers should be busy. Let's try to submit a new job
+	wg := sync.WaitGroup{}
+	wg.Add(1)
+	workerPool.Submit(func() {
+		defer wg.Done()
+	})
+
+	// Make sure the last job ran to the end
+	wg.Wait()
+
+	// Let's release the blocked jobs
+	m.Unlock()
+
+	blockerWg.Wait()
+}
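Reviewer note: a minimal usage sketch of the API after this change, not part of the diff. The pool name "example" and the github.com/cortexproject/cortex module path are assumptions for illustration. When every worker is busy, Submit falls back to spawning a one-off goroutine and increments cortex_worker_pool_fallback_total{name="example"}.

package main

import (
	"fmt"
	"sync"

	"github.com/cortexproject/cortex/pkg/util" // assumed module path
	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	reg := prometheus.NewRegistry()

	// "example" becomes the const label on cortex_worker_pool_fallback_total.
	pool := util.NewWorkerPool("example", 2, reg)
	defer pool.Stop()

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		i := i
		wg.Add(1)
		// With only 2 workers, some of these submissions may take the
		// fallback path and bump the counter.
		pool.Submit(func() {
			defer wg.Done()
			fmt.Println("job", i)
		})
	}
	wg.Wait()
}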