Skip to content

Commit

Permalink
Receive: fix issue-7248 with parallel receive_forward (#7267)
Browse files Browse the repository at this point in the history
* Receive: fix issue-7248 by introducing a worker pool

Signed-off-by: Yi Jin <[email protected]>

* fix unit test bug

Signed-off-by: Yi Jin <[email protected]>

* fix CLI flags not being passed into the receive handler

Signed-off-by: Yi Jin <[email protected]>

* address comments

Signed-off-by: Yi Jin <[email protected]>

* init context in constructor

Signed-off-by: Yi Jin <[email protected]>

---------

Signed-off-by: Yi Jin <[email protected]>
  • Loading branch information
jnyi authored Apr 11, 2024
1 parent 652e8cc commit 140bc87
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 96 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#7224](https://github.com/thanos-io/thanos/pull/7224) Query-frontend: Add Redis username to the client configuration.
- [#7220](https://github.com/thanos-io/thanos/pull/7220) Store Gateway: Fix lazy expanded postings caching partial expanded postings and bug of estimating remove postings with non existent value. Added `PromQLSmith` based fuzz test to improve correctness.
- [#7244](https://github.com/thanos-io/thanos/pull/7244) Query: Fix Internal Server Error unknown targetHealth: "unknown" when trying to open the targets page.
- [#7248](https://github.com/thanos-io/thanos/pull/7248) Receive: Fix RemoteWriteAsync being executed sequentially, which caused high latency in the ingestion path.
- [#7271](https://github.com/thanos-io/thanos/pull/7271) Query: fixing dedup iterator when working on mixed sample types.

### Added
Expand Down
2 changes: 2 additions & 0 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,8 @@ func runReceive(
MaxBackoff: time.Duration(*conf.maxBackoff),
TSDBStats: dbs,
Limiter: limiter,

AsyncForwardWorkerCount: conf.asyncForwardWorkerCount,
})

grpcProbe := prober.NewGRPC()
Expand Down
74 changes: 74 additions & 0 deletions pkg/pool/worker_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
"context"
"sync"
)

// Work is a unit of work to be run by a pool worker, comparable to a Java
// Runnable. It takes no arguments and returns nothing, so any inputs and
// results must be carried by the closure itself.
type Work func()

// WorkerPool is a fixed-size pool of reusable goroutines, similar to a Java
// ThreadPool.
type WorkerPool interface {
	// Init starts the worker goroutines. Calling it more than once has no
	// further effect.
	Init()

	// Go hands work to the pool, starting the workers first if needed. It
	// blocks while the pool's queue is full and returns as soon as the work
	// has been queued; it does not wait for the work to run. Once the pool
	// has been closed Go never blocks, and the submitted work may be
	// discarded without being run.
	Go(work Work)

	// Close signals all workers to stop. It returns immediately: it neither
	// waits for in-flight work to finish nor runs work that is still queued.
	Close()

	// Size returns the number of workers in the pool.
	Size() int
}

// workerPool is the channel-backed implementation of WorkerPool.
type workerPool struct {
	// initOnce guarantees the workers are spawned exactly once, no matter how
	// often Init (or Go, which calls Init) is invoked.
	initOnce sync.Once

	// ctx is cancelled by Close; both the workers and Go watch it.
	ctx    context.Context
	cancel context.CancelFunc

	// workCh queues pending work. Its capacity equals the worker count and is
	// what Size reports.
	workCh chan Work
}

// NewWorkerPool returns a WorkerPool with the requested number of workers.
// A request for zero workers is treated as one: a pool without workers would
// never consume its queue, so every call to Go would block forever.
// The workers are started lazily by Init or by the first call to Go.
func NewWorkerPool(workers uint) WorkerPool {
	if workers == 0 {
		workers = 1
	}

	ctx, cancel := context.WithCancel(context.Background())
	return &workerPool{
		ctx:    ctx,
		cancel: cancel,
		workCh: make(chan Work, workers),
	}
}

// Init spawns Size worker goroutines on its first call and is a no-op on
// every later call.
func (p *workerPool) Init() {
	p.initOnce.Do(func() {
		for i := 0; i < p.Size(); i++ {
			go p.run()
		}
	})
}

// run is the loop executed by each worker: it runs queued work until the pool
// is closed.
func (p *workerPool) run() {
	for {
		select {
		case <-p.ctx.Done():
			// TODO: exhaust workCh before exit
			return
		case work := <-p.workCh:
			work()
		}
	}
}

// Go queues work for execution, starting the workers if that has not happened
// yet. It blocks only while the queue is full and the pool is still open.
func (p *workerPool) Go(work Work) {
	p.Init()

	// Also watch the pool context: after Close nothing consumes workCh any
	// more, so an unconditional send would block the caller forever once the
	// buffer is full.
	select {
	case <-p.ctx.Done():
	case p.workCh <- work:
	}
}

// Close cancels the pool context, which makes every worker return after the
// work it is currently running. It does not wait for the workers.
func (p *workerPool) Close() {
	p.cancel()
}

// Size returns the number of workers, which is the capacity of the work queue.
func (p *workerPool) Size() int {
	return cap(p.workCh)
}
35 changes: 35 additions & 0 deletions pkg/pool/worker_pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

// TestGo submits three times as many jobs as the pool has workers and checks
// that every job ran and that the pool reports its configured size.
func TestGo(t *testing.T) {
	const workers uint = 5
	const jobs = int(workers * 3)

	wp := NewWorkerPool(workers)
	wp.Init()
	defer wp.Close()

	var (
		lock    sync.Mutex
		pending sync.WaitGroup
		ran     uint32
	)

	pending.Add(jobs)
	for n := 0; n < jobs; n++ {
		wp.Go(func() {
			lock.Lock()
			defer lock.Unlock()
			ran++
			pending.Done()
		})
	}
	pending.Wait()

	require.Equal(t, uint32(jobs), ran)
	require.Equal(t, int(workers), wp.Size())
}
127 changes: 31 additions & 96 deletions pkg/receive/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/thanos-io/thanos/pkg/logging"

extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
"github.com/thanos-io/thanos/pkg/pool"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/server/http/middleware"
"github.com/thanos-io/thanos/pkg/store/labelpb"
Expand Down Expand Up @@ -144,6 +145,7 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
if workers == 0 {
workers = 1
}
level.Info(logger).Log("msg", "Starting receive handler with async forward workers", "workers", workers)

h := &Handler{
logger: logger,
Expand Down Expand Up @@ -1286,90 +1288,23 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
return errs
}

func (pw *peerWorker) initWorkers() {
pw.initWorkersOnce.Do(func() {
work := make(chan peerWorkItem)
pw.work = work

ctx, cancel := context.WithCancel(context.Background())
pw.turnOffGoroutines = cancel

for i := 0; i < int(pw.asyncWorkerCount); i++ {
go func() {
for {
select {
case <-ctx.Done():
return
case w := <-work:
pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())

tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
w.workResult <- peerWorkResponse{
er: w.er,
err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
}
if err != nil {
sp := trace.SpanFromContext(ctx)
sp.SetAttributes(attribute.Bool("error", true))
sp.SetAttributes(attribute.String("error.msg", err.Error()))
}
close(w.workResult)
}, opentracing.Tags{
"endpoint": w.er.endpoint,
"replica": w.er.replica,
})

}
}
}()
}

})
}

func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
return &peerWorker{
cc: cc,
asyncWorkerCount: asyncWorkerCount,
forwardDelay: forwardDelay,
cc: cc,
wp: pool.NewWorkerPool(asyncWorkerCount),
forwardDelay: forwardDelay,
}
}

type peerWorkItem struct {
cc *grpc.ClientConn
req *storepb.WriteRequest
workItemCtx context.Context

workResult chan peerWorkResponse
er endpointReplica
sendTime time.Time
}

func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
pw.initWorkers()

w := peerWorkItem{
cc: pw.cc,
req: in,
workResult: make(chan peerWorkResponse, 1),
workItemCtx: ctx,
sendTime: time.Now(),
}

pw.work <- w
return nil, (<-w.workResult).err
return storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in)
}

type peerWorker struct {
cc *grpc.ClientConn
wp pool.WorkerPool

work chan peerWorkItem
turnOffGoroutines func()

initWorkersOnce sync.Once
asyncWorkerCount uint
forwardDelay prometheus.Histogram
forwardDelay prometheus.Histogram
}

func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
Expand All @@ -1393,29 +1328,29 @@ type peersContainer interface {
reset()
}

type peerWorkResponse struct {
er endpointReplica
err error
}

func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
p.initWorkers()

w := peerWorkItem{
cc: p.cc,
req: req,
workResult: make(chan peerWorkResponse, 1),
workItemCtx: ctx,
er: er,

sendTime: time.Now(),
}

p.work <- w
res := <-w.workResult

responseWriter <- newWriteResponse(seriesIDs, res.err, er)
cb(res.err)
now := time.Now()
p.wp.Go(func() {
p.forwardDelay.Observe(time.Since(now).Seconds())

tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
_, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req)
responseWriter <- newWriteResponse(
seriesIDs,
errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint),
er,
)
if err != nil {
sp := trace.SpanFromContext(ctx)
sp.SetAttributes(attribute.Bool("error", true))
sp.SetAttributes(attribute.String("error.msg", err.Error()))
}
cb(err)
}, opentracing.Tags{
"endpoint": er.endpoint,
"replica": er.replica,
})
})
}

type peerGroup struct {
Expand Down Expand Up @@ -1443,7 +1378,7 @@ func (p *peerGroup) close(addr string) error {
return nil
}

p.connections[addr].turnOffGoroutines()
p.connections[addr].wp.Close()
delete(p.connections, addr)
if err := c.cc.Close(); err != nil {
return fmt.Errorf("closing connection for %s", addr)
Expand Down

0 comments on commit 140bc87

Please sign in to comment.