Receive: fix issue-7248 by introducing a worker pool
Signed-off-by: Yi Jin <[email protected]>
jnyi committed Apr 9, 2024
1 parent 6b3aa32 commit a3ab1c8
Showing 3 changed files with 139 additions and 98 deletions.
72 changes: 72 additions & 0 deletions pkg/pool/worker_pool.go
@@ -0,0 +1,72 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
	"context"
	"sync"
)

// Work is a unit of work to be executed, similar to a Java Runnable.
type Work func()

// WorkerPool is a pool of reusable goroutines, similar to a Java ThreadPool.
type WorkerPool interface {
	// Init initializes the worker pool.
	Init()

	// Go enqueues the given work; it blocks when the work queue (sized to the worker count) is full.
	Go(work Work)

	// Close cancels all workers. It does not wait for in-flight work to finish.
	Close()

	// Size returns the number of workers in the pool.
	Size() int
}

type workerPool struct {
	sync.Once
	workCh chan Work
	cancel context.CancelFunc
}

func NewWorkerPool(workers uint) WorkerPool {
	return &workerPool{
		workCh: make(chan Work, workers),
	}
}

func (p *workerPool) Init() {
	p.Do(func() {
		ctx, cancel := context.WithCancel(context.Background())
		p.cancel = cancel

		for i := 0; i < cap(p.workCh); i++ {
			go func() {
				for {
					select {
					case <-ctx.Done():
						return
					case work := <-p.workCh:
						work()
					}
				}
			}()
		}
	})
}

func (p *workerPool) Go(work Work) {
	p.Init()
	p.workCh <- work
}

func (p *workerPool) Close() {
	p.cancel()
}

func (p *workerPool) Size() int {
	return cap(p.workCh)
}
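For orientation, a minimal usage sketch of the new pool, outside this commit (package main and the item count are illustrative). Go blocks once all workers are busy and the bounded queue is full, and Close does not drain queued work, so callers should wait for completion before closing:

package main

import (
	"fmt"
	"sync"

	"github.com/thanos-io/thanos/pkg/pool"
)

func main() {
	wp := pool.NewWorkerPool(4) // four reusable worker goroutines
	wp.Init()                   // idempotent; Go also calls it
	defer wp.Close()            // stops workers; does not drain queued work

	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		i := i // capture loop variable for the closure
		wg.Add(1)
		wp.Go(func() {
			defer wg.Done()
			fmt.Println("processed item", i)
		})
	}
	wg.Wait() // ensure all work finished before Close runs
}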
35 changes: 35 additions & 0 deletions pkg/pool/worker_pool_test.go
@@ -0,0 +1,35 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package pool

import (
	"sync"
	"testing"

	"github.com/stretchr/testify/require"
)

func TestGo(t *testing.T) {
	var expectedWorksDone uint32
	var workerPoolSize uint
	var mu sync.Mutex
	workerPoolSize = 5
	p := NewWorkerPool(workerPoolSize)
	p.Init()
	defer p.Close()

	var wg sync.WaitGroup
	for i := 0; i < int(workerPoolSize*3); i++ {
		wg.Add(1)
		p.Go(func() {
			mu.Lock()
			defer mu.Unlock()
			expectedWorksDone++
			wg.Done()
		})
	}
	wg.Wait()
	require.Equal(t, uint32(workerPoolSize*3), expectedWorksDone)
	require.Equal(t, int(workerPoolSize), p.Size())
}
130 changes: 32 additions & 98 deletions pkg/receive/handler.go
@@ -8,6 +8,8 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"io"
 	stdlog "log"
 	"math"
@@ -32,8 +34,6 @@ import (
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -43,6 +43,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/logging"

 	extpromhttp "github.com/thanos-io/thanos/pkg/extprom/http"
+	"github.com/thanos-io/thanos/pkg/pool"
 	"github.com/thanos-io/thanos/pkg/runutil"
 	"github.com/thanos-io/thanos/pkg/server/http/middleware"
 	"github.com/thanos-io/thanos/pkg/store/labelpb"
@@ -1188,90 +1189,23 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
 	return errs
 }

-func (pw *peerWorker) initWorkers() {
-	pw.initWorkersOnce.Do(func() {
-		work := make(chan peerWorkItem)
-		pw.work = work
-
-		ctx, cancel := context.WithCancel(context.Background())
-		pw.turnOffGoroutines = cancel
-
-		for i := 0; i < int(pw.asyncWorkerCount); i++ {
-			go func() {
-				for {
-					select {
-					case <-ctx.Done():
-						return
-					case w := <-work:
-						pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())
-
-						tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
-							_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
-							w.workResult <- peerWorkResponse{
-								er:  w.er,
-								err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
-							}
-							if err != nil {
-								sp := trace.SpanFromContext(ctx)
-								sp.SetAttributes(attribute.Bool("error", true))
-								sp.SetAttributes(attribute.String("error.msg", err.Error()))
-							}
-							close(w.workResult)
-						}, opentracing.Tags{
-							"endpoint": w.er.endpoint,
-							"replica":  w.er.replica,
-						})
-
-					}
-				}
-			}()
-		}
-
-	})
-}
-
 func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
 	return &peerWorker{
-		cc:               cc,
-		asyncWorkerCount: asyncWorkerCount,
-		forwardDelay:     forwardDelay,
+		cc:           cc,
+		wp:           pool.NewWorkerPool(asyncWorkerCount),
+		forwardDelay: forwardDelay,
 	}
 }

-type peerWorkItem struct {
-	cc          *grpc.ClientConn
-	req         *storepb.WriteRequest
-	workItemCtx context.Context
-
-	workResult chan peerWorkResponse
-	er         endpointReplica
-	sendTime   time.Time
-}
-
 func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
-	pw.initWorkers()
-
-	w := peerWorkItem{
-		cc:          pw.cc,
-		req:         in,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		sendTime:    time.Now(),
-	}
-
-	pw.work <- w
-	return nil, (<-w.workResult).err
+	return storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, in)
 }

 type peerWorker struct {
 	cc *grpc.ClientConn
+	wp pool.WorkerPool

-	work              chan peerWorkItem
-	turnOffGoroutines func()
-
-	initWorkersOnce  sync.Once
-	asyncWorkerCount uint
-	forwardDelay     prometheus.Histogram
+	forwardDelay prometheus.Histogram
 }

 func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
@@ -1295,29 +1229,29 @@ type peersContainer interface {
 	reset()
 }

-type peerWorkResponse struct {
-	er  endpointReplica
-	err error
-}
-
 func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
-	p.initWorkers()
-
-	w := peerWorkItem{
-		cc:          p.cc,
-		req:         req,
-		workResult:  make(chan peerWorkResponse, 1),
-		workItemCtx: ctx,
-		er:          er,
-
-		sendTime: time.Now(),
-	}
-
-	p.work <- w
-	res := <-w.workResult
-
-	responseWriter <- newWriteResponse(seriesIDs, res.err, er)
-	cb(res.err)
+	now := time.Now()
+	p.wp.Go(func() {
+		p.forwardDelay.Observe(time.Since(now).Seconds())
+
+		tracing.DoInSpan(ctx, "receive_forward", func(ctx context.Context) {
+			_, err := storepb.NewWriteableStoreClient(p.cc).RemoteWrite(ctx, req)
+			responseWriter <- newWriteResponse(
+				seriesIDs,
+				errors.Wrapf(err, "forwarding request to endpoint %v", er.endpoint),
+				er,
+			)
+			if err != nil {
+				sp := trace.SpanFromContext(ctx)
+				sp.SetAttributes(attribute.Bool("error", true))
+				sp.SetAttributes(attribute.String("error.msg", err.Error()))
+			}
+			cb(err)
+		}, opentracing.Tags{
+			"endpoint": er.endpoint,
+			"replica":  er.replica,
+		})
+	})
 }

 type peerGroup struct {
@@ -1345,7 +1279,7 @@ func (p *peerGroup) close(addr string) error {
 		return nil
 	}

-	p.connections[addr].turnOffGoroutines()
+	p.connections[addr].wp.Close()
 	delete(p.connections, addr)
 	if err := c.cc.Close(); err != nil {
 		return fmt.Errorf("closing connection for %s", addr)
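The net effect of the handler change: RemoteWrite is now a plain synchronous gRPC call, and RemoteWriteAsync hands the shared worker pool a closure that captures req, responseWriter, and cb, replacing the removed peerWorkItem/peerWorkResponse plumbing where every forward carried its own result channel. A standalone sketch of the two shapes under hypothetical names (workItem, jobs, and done are not from the commit):

package main

import "fmt"

// workItem mimics the removed peerWorkItem: each unit of work carries a
// private result channel that the submitter blocks on.
type workItem struct {
	input  int
	result chan error
}

func main() {
	// Old shape: send a struct to the worker, then block on its result channel.
	work := make(chan workItem)
	go func() {
		w := <-work
		w.result <- fmt.Errorf("processed %d", w.input) // report back through the item
	}()
	w := workItem{input: 1, result: make(chan error, 1)}
	work <- w
	fmt.Println("old:", <-w.result)

	// New shape: the closure is the work item; results leave through whatever
	// it captured (here a channel standing in for the cb callback), so no
	// per-item response struct is needed.
	jobs := make(chan func())
	go func() { (<-jobs)() }()
	done := make(chan error, 1)
	jobs <- func() { done <- fmt.Errorf("processed %d", 1) }
	fmt.Println("new:", <-done)
}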
