From 140bc871930973b94f72dd134644d2a0a0d490de Mon Sep 17 00:00:00 2001 From: Yi Jin <96499497+jnyi@users.noreply.github.com> Date: Thu, 11 Apr 2024 11:02:29 -0700 Subject: [PATCH] Receive: fix issue-7248 with parallel receive_forward (#7267) * Receive: fix issue-7248 by introducing a worker pool Signed-off-by: Yi Jin * fix unit test bug Signed-off-by: Yi Jin * fix CLI flags not pass into the receive handler Signed-off-by: Yi Jin * address comments Signed-off-by: Yi Jin * init context in constructor Signed-off-by: Yi Jin --------- Signed-off-by: Yi Jin --- CHANGELOG.md | 1 + cmd/thanos/receive.go | 2 + pkg/pool/worker_pool.go | 74 ++++++++++++++++++++ pkg/pool/worker_pool_test.go | 35 ++++++++++ pkg/receive/handler.go | 127 +++++++++-------------------------- 5 files changed, 143 insertions(+), 96 deletions(-) create mode 100644 pkg/pool/worker_pool.go create mode 100644 pkg/pool/worker_pool_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 5d41c83c25..72bc41ecc2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#7224](https://github.com/thanos-io/thanos/pull/7224) Query-frontend: Add Redis username to the client configuration. - [#7220](https://github.com/thanos-io/thanos/pull/7220) Store Gateway: Fix lazy expanded postings caching partial expanded postings and bug of estimating remove postings with non existent value. Added `PromQLSmith` based fuzz test to improve correctness. - [#7244](https://github.com/thanos-io/thanos/pull/7244) Query: Fix Internal Server Error unknown targetHealth: "unknown" when trying to open the targets page. +- [#7248](https://github.com/thanos-io/thanos/pull/7248) Receive: Fix RemoteWriteAsync being executed sequentially, causing high latency in the ingestion path. - [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types. 
### Added diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 4ced493c94..1a193d30bd 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -256,6 +256,8 @@ func runReceive( MaxBackoff: time.Duration(*conf.maxBackoff), TSDBStats: dbs, Limiter: limiter, + + AsyncForwardWorkerCount: conf.asyncForwardWorkerCount, }) grpcProbe := prober.NewGRPC() diff --git a/pkg/pool/worker_pool.go b/pkg/pool/worker_pool.go new file mode 100644 index 0000000000..6e3e77d7ab --- /dev/null +++ b/pkg/pool/worker_pool.go @@ -0,0 +1,74 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package pool + +import ( + "context" + "sync" +) + +// Work is a unit of item to be worked on, like Java Runnable. +type Work func() + +// WorkerPool is a pool of goroutines that are reusable, similar to Java ThreadPool. +type WorkerPool interface { + // Init initializes the worker pool. + Init() + + // Go waits until the next worker becomes available and executes the given work. + Go(work Work) + + // Close cancels all workers and waits for them to finish. + Close() + + // Size returns the number of workers in the pool. 
+ Size() int +} + +type workerPool struct { + sync.Once + ctx context.Context + workCh chan Work + cancel context.CancelFunc +} + +func NewWorkerPool(workers uint) WorkerPool { + ctx, cancel := context.WithCancel(context.Background()) + return &workerPool{ + ctx: ctx, + cancel: cancel, + workCh: make(chan Work, workers), + } +} + +func (p *workerPool) Init() { + p.Do(func() { + for i := 0; i < p.Size(); i++ { + go func() { + for { + select { + case <-p.ctx.Done(): + // TODO: exhaust workCh before exit + return + case work := <-p.workCh: + work() + } + } + }() + } + }) +} + +func (p *workerPool) Go(work Work) { + p.Init() + p.workCh <- work +} + +func (p *workerPool) Close() { + p.cancel() +} + +func (p *workerPool) Size() int { + return cap(p.workCh) +} diff --git a/pkg/pool/worker_pool_test.go b/pkg/pool/worker_pool_test.go new file mode 100644 index 0000000000..bbd73ec6b4 --- /dev/null +++ b/pkg/pool/worker_pool_test.go @@ -0,0 +1,35 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package pool + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestGo(t *testing.T) { + var expectedWorksDone uint32 + var workerPoolSize uint + var mu sync.Mutex + workerPoolSize = 5 + p := NewWorkerPool(workerPoolSize) + p.Init() + defer p.Close() + + var wg sync.WaitGroup + for i := 0; i < int(workerPoolSize*3); i++ { + wg.Add(1) + p.Go(func() { + mu.Lock() + defer mu.Unlock() + expectedWorksDone++ + wg.Done() + }) + } + wg.Wait() + require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone) + require.Equal(t, int(workerPoolSize), p.Size()) +} diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index 5f24c3f747..208c01b131 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -44,6 +44,7 @@ import ( "github.com/thanos-io/thanos/pkg/logging" extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http" + "github.com/thanos-io/thanos/pkg/pool" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/server/http/middleware" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -144,6 +145,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler { if workers == 0 { workers = 1 } + level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers) h := &Handler{ logger: logger, @@ -1286,90 +1288,23 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors { return errs } -func (pw *peerWorker) initWorkers() { - pw.initWorkersOnce.Do(func() { - work := make(chan peerWorkItem) - pw.work = work - - ctx, cancel := context.WithCancel(context.Background()) - pw.turnOffGoroutines = cancel - - for i := 0; i < int(pw.asyncWorkerCount); i++ { - go func() { - for { - select { - case <-ctx.Done(): - return - case w := <-work: - pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds()) - - tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) { - _, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req) - 
w.workResult <- peerWorkResponse{ - er: w.er, - err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint), - } - if err != nil { - sp := trace.SpanFromContext(ctx) - sp.SetAttributes(attribute.Bool("error", true)) - sp.SetAttributes(attribute.String("error.msg", err.Error())) - } - close(w.workResult) - }, opentracing.Tags{ - "endpoint": w.er.endpoint, - "replica": w.er.replica, - }) - - } - } - }() - } - - }) -} - func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker { return &peerWorker{ - cc: cc, - asyncWorkerCount: asyncWorkerCount, - forwardDelay: forwardDelay, + cc: cc, + wp: pool.NewWorkerPool(asyncWorkerCount), + forwardDelay: forwardDelay, } } -type peerWorkItem struct { - cc *grpc.ClientConn - req *storepb.WriteRequest - workItemCtx context.Context - - workResult chan peerWorkResponse - er endpointReplica - sendTime time.Time -} - func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { - pw.initWorkers() - - w := peerWorkItem{ - cc: pw.cc, - req: in, - workResult: make(chan peerWorkResponse, 1), - workItemCtx: ctx, - sendTime: time.Now(), - } - - pw.work <- w - return nil, (<-w.workResult).err + return storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in) } type peerWorker struct { cc *grpc.ClientConn + wp pool.WorkerPool - work chan peerWorkItem - turnOffGoroutines func() - - initWorkersOnce sync.Once - asyncWorkerCount uint - forwardDelay prometheus.Histogram + forwardDelay prometheus.Histogram } func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer { @@ -1393,29 +1328,29 @@ type peersContainer interface { reset() } -type peerWorkResponse struct { - er endpointReplica - err error -} - func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, 
seriesIDs []int, responseWriter chan writeResponse, cb func(error)) { - p.initWorkers() - - w := peerWorkItem{ - cc: p.cc, - req: req, - workResult: make(chan peerWorkResponse, 1), - workItemCtx: ctx, - er: er, - - sendTime: time.Now(), - } - - p.work <- w - res := <-w.workResult - - responseWriter <- newWriteResponse(seriesIDs, res.err, er) - cb(res.err) + now := time.Now() + p.wp.Go(func() { + p.forwardDelay.Observe(time.Since(now).Seconds()) + + tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) { + _, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req) + responseWriter <- newWriteResponse( + seriesIDs, + errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint), + er, + ) + if err != nil { + sp := trace.SpanFromContext(ctx) + sp.SetAttributes(attribute.Bool("error", true)) + sp.SetAttributes(attribute.String("error.msg", err.Error())) + } + cb(err) + }, opentracing.Tags{ + "endpoint": er.endpoint, + "replica": er.replica, + }) + }) } type peerGroup struct { @@ -1443,7 +1378,7 @@ func (p *peerGroup) close(addr string) error { return nil } - p.connections[addr].turnOffGoroutines() + p.connections[addr].wp.Close() delete(p.connections, addr) if err := c.cc.Close(); err != nil { return fmt.Errorf("closing connection for %s", addr)