From 4ea9cafcbd8dc0880c79506845cc6f8ddfeb22a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Giedrius=20Statkevi=C4=8Dius?= Date: Wed, 10 Jan 2024 14:13:41 +0200 Subject: [PATCH] receive: use async remote writing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of spawning new goroutines for each peer that we want to remote write to, spawn a fixed number of worker goroutines and then schedule work on top of them. This has reduced the number of goroutines in our case by about 10x-20x, and the 99th percentile of forwarding latency dropped from ~30s to just a few hundred milliseconds. Signed-off-by: Giedrius Statkevičius --- cmd/thanos/receive.go | 3 + docs/components/receive.md | 10 ++ pkg/receive/handler.go | 275 +++++++++++++++++++++++++++--------- pkg/receive/handler_test.go | 16 ++- 4 files changed, 231 insertions(+), 73 deletions(-) diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index c3b1574a44f..d8859daae2d 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -261,6 +261,7 @@ func runReceive( MaxBackoff: time.Duration(*conf.maxBackoff), TSDBStats: dbs, Limiter: limiter, + AsyncWorkerCount: conf.asyncWorkerCount, }) grpcProbe := prober.NewGRPC() @@ -837,6 +838,7 @@ type receiveConfig struct { writeLimitsConfig *extflag.PathOrContent storeRateLimits store.SeriesSelectLimits limitsConfigReloadTimer time.Duration + asyncWorkerCount uint } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -894,6 +896,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).StringVar(&rc.replicaHeader) + cmd.Flag("receive.async-workers", "Number of concurrent workers processing incoming remote-write requests.").Default("5").UintVar(&rc.asyncWorkerCount) compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ") cmd.Flag("receive.grpc-compression", "Compression algorithm to use for gRPC requests to other receivers. Must be one of: "+compressionOptions).Default(snappy.Name).EnumVar(&rc.compression, snappy.Name, compressionNone) diff --git a/docs/components/receive.md b/docs/components/receive.md index a4e9c76b8f8..9a3f468886e 100644 --- a/docs/components/receive.md +++ b/docs/components/receive.md @@ -248,6 +248,14 @@ NOTE: - Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits and only log errors for meta-monitoring being unreachable. Similarly to when one receiver cannot be scraped. - Support for different limit configuration for different tenants is planned for the future. +## Asynchronous workers + +Instead of spawning a new goroutine each time the Receiver forwards a request to another node, it spawns a fixed number of goroutines (workers) that perform the work. This avoids spawning potentially tens or even hundreds of thousands of goroutines when clients send a large number of small requests. + +The number of workers is controlled by the `--receive.async-workers` flag. + +Check the `thanos_receive_forward_delay_seconds` metric to see whether the number of workers needs to be increased. + ## Flags ```$ mdox-exec="thanos receive --help" @@ -305,6 +313,8 @@ Flags: Path to YAML file that contains object store configuration. See format details: https://thanos.io/tip/thanos/storage.md/#configuration + --receive.async-workers=5 Number of concurrent workers processing + incoming remote-write requests. 
--receive.default-tenant-id="default-tenant" Default tenant ID to use when none is provided via a header. diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c0c0fea21fe..0ba70f31a92 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -16,6 +16,7 @@ import ( "sort" "strconv" "sync" + "sync/atomic" "time" "github.com/go-kit/log" @@ -94,6 +95,7 @@ type Options struct { RelabelConfigs []*relabel.Config TSDBStats TSDBStats Limiter *Limiter + AsyncWorkerCount uint } // Handler serves a Prometheus remote write receiving HTTP endpoint. @@ -117,6 +119,7 @@ type Handler struct { writeSamplesTotal *prometheus.HistogramVec writeTimeseriesTotal *prometheus.HistogramVec + forwardDelay prometheus.Histogram Limiter *Limiter } @@ -131,12 +134,16 @@ func NewHandler(logger log.Logger, o *Options) *Handler { registerer = o.Registry } + workers := o.AsyncWorkerCount + if workers == 0 { + workers = 1 + } + h := &Handler{ logger: logger, writer: o.Writer, router: route.New(), options: o, - peers: newPeerGroup(o.DialOpts...), receiverMode: o.ReceiverMode, expBackoff: backoff.Backoff{ Factor: 2, @@ -151,6 +158,13 @@ func NewHandler(logger log.Logger, o *Options) *Handler { Help: "The number of forward requests.", }, []string{"result"}, ), + forwardDelay: promauto.With(registerer).NewHistogram( + prometheus.HistogramOpts{ + Name: "thanos_receive_forward_delay_seconds", + Help: "The delay between the time the request was received and the time it was forwarded to a worker. ", + Buckets: prometheus.ExponentialBuckets(0.001, 2, 16), + }, + ), replications: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_replications_total", @@ -183,6 +197,8 @@ func NewHandler(logger log.Logger, o *Options) *Handler { ), } + h.peers = newPeerGroup(workers, h.forwardDelay, o.DialOpts...) + h.forwardRequests.WithLabelValues(labelSuccess) h.forwardRequests.WithLabelValues(labelError) h.replications.WithLabelValues(labelSuccess) @@ -425,12 +441,14 @@ type trackedSeries struct { type writeResponse struct { seriesIDs []int err error + er endpointReplica } -func newWriteResponse(seriesIDs []int, err error) writeResponse { +func newWriteResponse(seriesIDs []int, err error, er endpointReplica) writeResponse { return writeResponse{ seriesIDs: seriesIDs, err: err, + er: er, } } @@ -679,7 +697,10 @@ func quorumReached(successes []int, successThreshold int) bool { // fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of // requests succeeds or fails or if context is canceled. func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error { - var errs writeErrors + var ( + errs writeErrors + waitingFor atomic.Int32 + ) fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) defer func() { @@ -706,17 +727,7 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // This causes a write to the labels field. When fanning out this request to other Receivers, the code calls // Size() which reads those same fields. We would like to avoid adding locks around each string // hence we need to write locally first. 
- var maxBufferedResponses = 0 - for writeTarget := range wreqs { - if writeTarget.endpoint != h.options.Endpoint { - continue - } - maxBufferedResponses++ - } - - responses := make(chan writeResponse, maxBufferedResponses) - - var wg sync.WaitGroup + responses := make(chan writeResponse, len(wreqs)) for writeTarget := range wreqs { if writeTarget.endpoint != h.options.Endpoint { @@ -737,24 +748,40 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e }) if err != nil { level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", writeTarget.endpoint)) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", writeTarget.endpoint), writeTarget) continue } - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil, writeTarget) + } + + reduceWaitingFor := func() { + n := waitingFor.Add(-1) + if n == 0 { + close(responses) + } + } + + for writeTarget := range wreqs { + if writeTarget.endpoint == h.options.Endpoint { + continue + } + waitingFor.Add(1) + } + + if waitingFor.Load() == 0 { + close(responses) } + for writeTarget := range wreqs { if writeTarget.endpoint == h.options.Endpoint { continue } - wg.Add(1) - // Make a request to the specified endpoint. - go func(writeTarget endpointReplica) { - defer wg.Done() + func() { var ( err error - cl storepb.WriteableStoreClient + cl WriteableStoreAsyncClient ) defer func() { // This is an actual remote forward request so report metric here. @@ -773,7 +800,8 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e cl, err = h.peers.get(fctx, writeTarget.endpoint) if err != nil { - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", writeTarget.endpoint)) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", writeTarget.endpoint), writeTarget) + reduceWaitingFor() return } @@ -782,55 +810,27 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e if ok { if time.Now().Before(b.nextAllowed) { h.mtx.RUnlock() - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", writeTarget.endpoint)) + responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", writeTarget.endpoint), writeTarget) + reduceWaitingFor() return } } h.mtx.RUnlock() - // Create a span to track the request made to another receive node. - tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) { + tracing.DoInSpan(fctx, "receive_async_write", func(ctx context.Context) { // Actually make the request against the endpoint we determined should handle these time series. - _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{ + cl.RemoteWriteAsync(ctx, &storepb.WriteRequest{ Timeseries: wreqs[writeTarget].timeSeries, Tenant: tenant, // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. Replica: int64(writeTarget.replica + 1), + }, writeTarget, wreqs[writeTarget].seriesIDs, responses, func() { + reduceWaitingFor() }) }) - if err != nil { - // Check if peer connection is unavailable, don't attempt to send requests constantly. 
- if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unavailable { - h.mtx.Lock() - if b, ok := h.peerStates[writeTarget.endpoint]; ok { - b.attempt++ - dur := h.expBackoff.ForAttempt(b.attempt) - b.nextAllowed = time.Now().Add(dur) - level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur) - } else { - h.peerStates[writeTarget.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} - } - h.mtx.Unlock() - } - } - werr := errors.Wrapf(err, "forwarding request to endpoint %v", writeTarget.endpoint) - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, werr) - return - } - h.mtx.Lock() - delete(h.peerStates, writeTarget.endpoint) - h.mtx.Unlock() - - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) - }(writeTarget) + }() } - go func() { - wg.Wait() - close(responses) - }() - // At the end, make sure to exhaust the channel, letting remaining unnecessary requests finish asynchronously. // This is needed if context is canceled or if we reached success of fail quorum faster. defer func() { @@ -865,8 +865,29 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e for _, tsID := range wresp.seriesIDs { seriesErrs[tsID].Add(wresp.err) } + + // Check if peer connection is unavailable, don't attempt to send requests constantly. + if st, ok := status.FromError(wresp.err); ok { + if st.Code() == codes.Unavailable { + h.mtx.Lock() + if b, ok := h.peerStates[wresp.er.endpoint]; ok { + b.attempt++ + dur := h.expBackoff.ForAttempt(b.attempt) + b.nextAllowed = time.Now().Add(dur) + level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur) + } else { + h.peerStates[wresp.er.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} + } + h.mtx.Unlock() + } + } continue } + + h.mtx.Lock() + delete(h.peerStates, wresp.er.endpoint) + h.mtx.Unlock() + for _, tsID := range wresp.seriesIDs { successes[tsID]++ } @@ -1161,27 +1182,37 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors { return errs } -func newPeerGroup(dialOpts ...grpc.DialOption) peersContainer { +func newPeerGroup(asyncWorkerCount uint, forwardDelay prometheus.Histogram, dialOpts ...grpc.DialOption) peersContainer { return &peerGroup{ - dialOpts: dialOpts, - cache: map[string]*grpc.ClientConn{}, - m: sync.RWMutex{}, - dialer: grpc.DialContext, + dialOpts: dialOpts, + cache: map[string]*peerWorker{}, + m: sync.RWMutex{}, + dialer: grpc.DialContext, + asyncWorkerCount: asyncWorkerCount, + forwardDelay: forwardDelay, } } +type WriteableStoreAsyncClient interface { + storepb.WriteableStoreClient + RemoteWriteAsync(context.Context, *storepb.WriteRequest, endpointReplica, []int, chan writeResponse, func()) +} + type peersContainer interface { close(string) error - get(context.Context, string) (storepb.WriteableStoreClient, error) + get(context.Context, string) (WriteableStoreAsyncClient, error) } type peerGroup struct { dialOpts []grpc.DialOption - cache map[string]*grpc.ClientConn + cache map[string]*peerWorker m sync.RWMutex // dialer is used for testing. 
dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) + + asyncWorkerCount uint + forwardDelay prometheus.Histogram } func (p *peerGroup) close(addr string) error { @@ -1195,21 +1226,47 @@ func (p *peerGroup) close(addr string) error { return nil } + p.cache[addr].turnOffGoroutines() delete(p.cache, addr) - if err := c.Close(); err != nil { + if err := c.cc.Close(); err != nil { return fmt.Errorf("closing connection for %s", addr) } return nil } -func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { +type peerWorkResponse struct { + er endpointReplica + err error +} + +func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func()) { + p.initWorkers() + + w := peerWorkItem{ + cc: p.cc, + req: req, + workResult: make(chan peerWorkResponse, 1), + ctx: ctx, + er: er, + + sendTime: time.Now(), + } + + p.work <- w + res := <-w.workResult + + responseWriter <- newWriteResponse(seriesIDs, res.err, er) + cb() +} + +func (p *peerGroup) get(ctx context.Context, addr string) (WriteableStoreAsyncClient, error) { // use a RLock first to prevent blocking if we don't need to. p.m.RLock() c, ok := p.cache[addr] p.m.RUnlock() if ok { - return storepb.NewWriteableStoreClient(c), nil + return c, nil } p.m.Lock() @@ -1217,13 +1274,91 @@ func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStor // Make sure that another caller hasn't created the connection since obtaining the write lock. c, ok = p.cache[addr] if ok { - return storepb.NewWriteableStoreClient(c), nil + return c, nil } conn, err := p.dialer(ctx, addr, p.dialOpts...) if err != nil { return nil, errors.Wrap(err, "failed to dial peer") } - p.cache[addr] = conn - return storepb.NewWriteableStoreClient(conn), nil + p.cache[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncWorkerCount) + return p.cache[addr], nil +} + +type peerWorker struct { + cc *grpc.ClientConn + + work chan peerWorkItem + turnOffGoroutines func() + + initWorkersOnce sync.Once + asyncWorkerCount uint + forwardDelay prometheus.Histogram +} + +func (pw *peerWorker) initWorkers() { + pw.initWorkersOnce.Do(func() { + work := make(chan peerWorkItem) + pw.work = work + + ctx, cancel := context.WithCancel(context.Background()) + pw.turnOffGoroutines = cancel + + for i := 0; i < int(pw.asyncWorkerCount); i++ { + go func() { + for { + select { + case <-ctx.Done(): + return + case w := <-work: + pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds()) + + tracing.DoInSpan(w.ctx, "receive_forward", func(ctx context.Context) { + _, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req) + w.workResult <- peerWorkResponse{ + er: w.er, + err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint), + } + close(w.workResult) + }) + + } + } + }() + } + + }) +} + +func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker { + return &peerWorker{ + cc: cc, + asyncWorkerCount: asyncWorkerCount, + forwardDelay: forwardDelay, + } +} + +type peerWorkItem struct { + cc *grpc.ClientConn + req *storepb.WriteRequest + ctx context.Context + + workResult chan peerWorkResponse + er endpointReplica + sendTime time.Time +} + +func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) { + pw.initWorkers() + + w := peerWorkItem{ + cc: 
pw.cc, + req: in, + workResult: make(chan peerWorkResponse, 1), + ctx: ctx, + sendTime: time.Now(), + } + + pw.work <- w + return nil, (<-w.workResult).err } diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index de511dc8b69..a919e300bca 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -167,7 +167,7 @@ func (f *fakeAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels } type fakePeersGroup struct { - clients map[string]storepb.WriteableStoreClient + clients map[string]WriteableStoreAsyncClient closeCalled map[string]bool } @@ -180,7 +180,7 @@ func (g *fakePeersGroup) close(addr string) error { return nil } -func (g *fakePeersGroup) get(_ context.Context, addr string) (storepb.WriteableStoreClient, error) { +func (g *fakePeersGroup) get(_ context.Context, addr string) (WriteableStoreAsyncClient, error) { c, ok := g.clients[addr] if !ok { return nil, fmt.Errorf("client %s not found", addr) @@ -197,7 +197,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin wOpts = &WriterOptions{} ) fakePeers := &fakePeersGroup{ - clients: map[string]storepb.WriteableStoreClient{}, + clients: map[string]WriteableStoreAsyncClient{}, } ag := addrGen{} @@ -855,6 +855,16 @@ func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb return f.h.RemoteWrite(ctx, in) } +func (f *fakeRemoteWriteGRPCServer) RemoteWriteAsync(ctx context.Context, in *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responses chan writeResponse, cb func()) { + _, err := f.h.RemoteWrite(ctx, in) + responses <- writeResponse{ + er: er, + err: err, + seriesIDs: seriesIDs, + } + cb() +} + func BenchmarkHandlerReceiveHTTP(b *testing.B) { benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b)) }
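
For readers unfamiliar with the pattern this patch applies, the standalone sketch below (not part of the patch; `workItem`, `workerPool`, `newWorkerPool`, and `forward` are illustrative names, not Thanos APIs) shows the bounded worker-pool idea: a fixed number of goroutines drain a shared work channel instead of one goroutine being spawned per forwarded request, and the gap between enqueueing a work item and a worker picking it up corresponds to what `thanos_receive_forward_delay_seconds` measures in the real handler.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// workItem is an illustrative stand-in for a queued remote-write forward.
type workItem struct {
	payload  string
	enqueued time.Time
	result   chan error
}

// workerPool runs a fixed number of goroutines that drain a shared channel,
// mirroring the peerWorker approach: work is scheduled, not spawned per request.
type workerPool struct {
	work chan workItem
	stop context.CancelFunc
}

func newWorkerPool(workers int, forward func(context.Context, string) error) *workerPool {
	ctx, cancel := context.WithCancel(context.Background())
	p := &workerPool{work: make(chan workItem), stop: cancel}
	for i := 0; i < workers; i++ {
		go func() {
			for {
				select {
				case <-ctx.Done():
					return
				case w := <-p.work:
					// The enqueue-to-pickup delay is what a histogram such as
					// thanos_receive_forward_delay_seconds would observe.
					_ = time.Since(w.enqueued)
					w.result <- forward(ctx, w.payload)
				}
			}
		}()
	}
	return p
}

// submit enqueues one item and waits for its result.
func (p *workerPool) submit(payload string) error {
	w := workItem{payload: payload, enqueued: time.Now(), result: make(chan error, 1)}
	p.work <- w
	return <-w.result
}

func main() {
	// Five workers, matching the patch's default for --receive.async-workers.
	pool := newWorkerPool(5, func(_ context.Context, _ string) error {
		time.Sleep(10 * time.Millisecond) // simulate a remote-write RPC to a peer
		return nil
	})
	defer pool.stop()

	var wg sync.WaitGroup
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			if err := pool.submit(fmt.Sprintf("request-%d", i)); err != nil {
				fmt.Println("forward failed:", err)
			}
		}(i)
	}
	wg.Wait()
	fmt.Println("all requests forwarded through a fixed pool of 5 workers")
}
```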