diff --git a/pkg/pool/worker_pool.go b/pkg/pool/worker_pool.go new file mode 100644 index 00000000000..891bfae29fc --- /dev/null +++ b/pkg/pool/worker_pool.go @@ -0,0 +1,72 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package pool + +import ( + "context" + "sync" +) + +// Work is a unit of item to be worked on, like Java Runnable. +type Work func() + +// WorkerPool is a pool of goroutines that are reusable, similar to Java ThreadPool. +type WorkerPool interface { + // Init initializes the worker pool. + Init() + + // Go waits until the next worker becomes available and executes the given work. + Go(work Work) + + // Close cancels all workers and waits for them to finish. + Close() + + // Size returns the number of workers in the pool. + Size() int +} + +type workerPool struct { + sync.Once + workCh chan Work + cancel context.CancelFunc +} + +func NewWorkerPool(workers uint) WorkerPool { + return &workerPool{ + workCh: make(chan Work, workers), + } +} + +func (p *workerPool) Init() { + p.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + p.cancel = cancel + + for i := 0; i < cap(p.workCh); i++ { + go func() { + for { + select { + case <-ctx.Done(): + return + case work := <-p.workCh: + work() + } + } + }() + } + }) +} + +func (p *workerPool) Go(work Work) { + p.Init() + p.workCh <- work +} + +func (p *workerPool) Close() { + p.cancel() +} + +func (p *workerPool) Size() int { + return cap(p.workCh) +} diff --git a/pkg/pool/worker_pool_test.go b/pkg/pool/worker_pool_test.go new file mode 100644 index 00000000000..3828ec6d6b2 --- /dev/null +++ b/pkg/pool/worker_pool_test.go @@ -0,0 +1,35 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package pool + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGo(t *testing.T) { + var expectedWorksDone uint32 + var workerPoolSize uint + var mu sync.Mutex + workerPoolSize = 5 + p := NewWorkerPool(workerPoolSize) + p.Init() + defer p.Close() + + var wg sync.WaitGroup + for i := 0; i < int(workerPoolSize*3); i++ { + wg.Add(1) + p.Go(func() { + mu.Lock() + mu.Unlock() + expectedWorksDone++ + wg.Done() + }) + } + wg.Wait() + require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone) + require.Equal(t, int(workerPoolSize), p.Size()) +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 5756296db31..dd59645ca69 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -8,6 +8,8 @@ import ( "context" "crypto/tls" "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "io" stdlog "log" "math" @@ -32,8 +34,6 @@ import ( "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -43,6 +43,7 @@ import ( "github.com/thanos-io/thanos/pkg/logging" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/server/http/middleware" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -1188,90 +1189,23 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors { return errs } -func (pw *peerWorker) initWorkers() { - pw.initWorkersOnce.Do(func() { - work := make(chan peerWorkItem) - pw.work = work - - ctx, cancel := context.WithCancel(context.Background()) - pw.turnOffGoroutines = cancel - - for i := 0; i < int(pw.asyncWorkerCount); i++ { - go func() { - for { - select { - case <-ctx.Done(): - return - case w := <-work: - pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds()) - - tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) { - _, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req) - w.workResult <- peerWorkResponse{ - er: w.er, - err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint), - } - if err != nil { - sp := trace.SpanFromContext(ctx) - sp.SetAttributes(attribute.Bool("error", true)) - sp.SetAttributes(attribute.String("error.msg", err.Error())) - } - close(w.workResult) - }, opentracing.Tags{ - "endpoint": w.er.endpoint, - "replica": w.er.replica, - }) - - } - } - }() - } - - }) -} - func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker { return &peerWorker{ - cc: cc, - asyncWorkerCount: asyncWorkerCount, - forwardDelay: forwardDelay, + cc: cc, + wp: pool.NewWorkerPool(asyncWorkerCount), + forwardDelay: forwardDelay, } } -type peerWorkItem struct { - cc *grpc.ClientConn - req *storepb.WriteRequest - workItemCtx context.Context - - workResult chan peerWorkResponse - er endpointReplica - sendTime time.Time -} - func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { - pw.initWorkers() - - w := peerWorkItem{ - cc: pw.cc, - req: in, - workResult: make(chan peerWorkResponse, 1), - workItemCtx: ctx, - sendTime: time.Now(), - } - - pw.work <- w - return nil, (<-w.workResult).err + return storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in) } type peerWorker struct { cc *grpc.ClientConn + wp pool.WorkerPool - work chan peerWorkItem - turnOffGoroutines func() - - initWorkersOnce sync.Once - asyncWorkerCount uint - forwardDelay prometheus.Histogram + forwardDelay prometheus.Histogram } func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer { @@ -1295,29 +1229,29 @@ type peersContainer interface { reset() } -type peerWorkResponse struct { - er endpointReplica - err error -} - func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) { - p.initWorkers() - - w := peerWorkItem{ - cc: p.cc, - req: req, - workResult: make(chan peerWorkResponse, 1), - workItemCtx: ctx, - er: er, - - sendTime: time.Now(), - } - - p.work <- w - res := <-w.workResult - - responseWriter <- newWriteResponse(seriesIDs, res.err, er) - cb(res.err) + now := time.Now() + p.wp.Go(func() { + p.forwardDelay.Observe(time.Since(now).Seconds()) + + tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) { + _, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req) + responseWriter <- newWriteResponse( + seriesIDs, + errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint), + er, + ) + if err != nil { + sp := trace.SpanFromContext(ctx) + sp.SetAttributes(attribute.Bool("error", true)) + sp.SetAttributes(attribute.String("error.msg", err.Error())) + } + cb(err) + }, opentracing.Tags{ + "endpoint": er.endpoint, + "replica": er.replica, + }) + }) } type peerGroup struct { @@ -1345,7 +1279,7 @@ func (p *peerGroup) close(addr string) error { return nil } - p.connections[addr].turnOffGoroutines() + p.connections[addr].wp.Close() delete(p.connections, addr) if err := c.cc.Close(); err != nil { return fmt.Errorf("closing connection for %s", addr)