diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go
index 93f4425b84..17066d131f 100644
--- a/cmd/thanos/receive.go
+++ b/cmd/thanos/receive.go
@@ -831,6 +831,8 @@ type receiveConfig struct {
 	writeLimitsConfig *extflag.PathOrContent
 	storeRateLimits   store.SeriesSelectLimits
 	limitsConfigReloadTimer time.Duration
+
+	asyncForwardWorkerCount uint
 }
 
 func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
@@ -888,6 +890,7 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
 	cmd.Flag("receive.replica-header", "HTTP header specifying the replica number of a write request.").Default(receive.DefaultReplicaHeader).StringVar(&rc.replicaHeader)
 
+	cmd.Flag("receive.forward.async-workers", "Number of concurrent workers processing forwarding of remote-write requests.").Default("5").UintVar(&rc.asyncForwardWorkerCount)
 	compressionOptions := strings.Join([]string{snappy.Name, compressionNone}, ", ")
 	cmd.Flag("receive.grpc-compression", "Compression algorithm to use for gRPC requests to other receivers. Must be one of: "+compressionOptions).Default(snappy.Name).EnumVar(&rc.compression, snappy.Name, compressionNone)
 
diff --git a/docs/components/receive.md b/docs/components/receive.md
index a4e9c76b8f..c6f6b3a7ed 100644
--- a/docs/components/receive.md
+++ b/docs/components/receive.md
@@ -248,6 +248,14 @@ NOTE:
 - Thanos Receive performs best-effort limiting. In case meta-monitoring is down/unreachable, Thanos Receive will not impose limits and only log errors for meta-monitoring being unreachable. Similarly to when one receiver cannot be scraped.
 - Support for different limit configuration for different tenants is planned for the future.
 
+## Asynchronous workers
+
+Instead of spawning a new goroutine each time the Receiver forwards a request to another node, it spawns a fixed pool of goroutines (workers) that perform the forwarding. This avoids spawning potentially tens or even hundreds of thousands of goroutines when someone sends a large number of small requests.
+
+The number of workers is controlled by the `--receive.forward.async-workers=` flag.
+
+Check the metric `thanos_receive_forward_delay_seconds` to see whether you need to increase the number of forwarding workers.
+
 ## Flags
 
 ```$ mdox-exec="thanos receive --help"
@@ -308,6 +316,9 @@ Flags:
       --receive.default-tenant-id="default-tenant"
                                  Default tenant ID to use when none is provided
                                  via a header.
+      --receive.forward.async-workers=5
+                                 Number of concurrent workers processing
+                                 forwarding of remote-write requests.
       --receive.grpc-compression=snappy
                                  Compression algorithm to use for gRPC requests
                                  to other receivers. Must be one of: snappy,
diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go
index ba4b9b94ce..fc20a97ebf 100644
--- a/pkg/receive/handler.go
+++ b/pkg/receive/handler.go
@@ -32,6 +32,8 @@ import (
 	"github.com/prometheus/prometheus/model/relabel"
 	"github.com/prometheus/prometheus/storage"
 	"github.com/prometheus/prometheus/tsdb"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -74,26 +76,32 @@ var (
 	errInternal = errors.New("internal error")
 )
 
+type WriteableStoreAsyncClient interface {
+	storepb.WriteableStoreClient
+	RemoteWriteAsync(context.Context, *storepb.WriteRequest, endpointReplica, []int, chan writeResponse, func(error))
+}
+
 // Options for the web Handler.
 type Options struct {
-	Writer            *Writer
-	ListenAddress     string
-	Registry          *prometheus.Registry
-	TenantHeader      string
-	TenantField       string
-	DefaultTenantID   string
-	ReplicaHeader     string
-	Endpoint          string
-	ReplicationFactor uint64
-	ReceiverMode      ReceiverMode
-	Tracer            opentracing.Tracer
-	TLSConfig         *tls.Config
-	DialOpts          []grpc.DialOption
-	ForwardTimeout    time.Duration
-	MaxBackoff        time.Duration
-	RelabelConfigs    []*relabel.Config
-	TSDBStats         TSDBStats
-	Limiter           *Limiter
+	Writer                  *Writer
+	ListenAddress           string
+	Registry                *prometheus.Registry
+	TenantHeader            string
+	TenantField             string
+	DefaultTenantID         string
+	ReplicaHeader           string
+	Endpoint                string
+	ReplicationFactor       uint64
+	ReceiverMode            ReceiverMode
+	Tracer                  opentracing.Tracer
+	TLSConfig               *tls.Config
+	DialOpts                []grpc.DialOption
+	ForwardTimeout          time.Duration
+	MaxBackoff              time.Duration
+	RelabelConfigs          []*relabel.Config
+	TSDBStats               TSDBStats
+	Limiter                 *Limiter
+	AsyncForwardWorkerCount uint
 }
 
 // Handler serves a Prometheus remote write receiving HTTP endpoint.
@@ -129,6 +137,11 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 		registerer = o.Registry
 	}
 
+	workers := o.AsyncForwardWorkerCount
+	if workers == 0 {
+		workers = 1
+	}
+
 	h := &Handler{
 		logger: logger,
 		writer: o.Writer,
@@ -141,6 +154,14 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
 			Max:    o.MaxBackoff,
 			Jitter: true,
 		},
+			promauto.With(registerer).NewHistogram(
+				prometheus.HistogramOpts{
+					Name:    "thanos_receive_forward_delay_seconds",
+					Help:    "The delay between the time the request was received and the time it was forwarded to a worker. ",
+					Buckets: prometheus.ExponentialBuckets(0.001, 2, 16),
+				},
+			),
+			workers,
 			o.DialOpts...),
 		receiverMode: o.ReceiverMode,
 		Limiter:      o.Limiter,
@@ -423,12 +444,14 @@ type trackedSeries struct {
 type writeResponse struct {
 	seriesIDs []int
 	err       error
+	er        endpointReplica
 }
 
-func newWriteResponse(seriesIDs []int, err error) writeResponse {
+func newWriteResponse(seriesIDs []int, err error, er endpointReplica) writeResponse {
 	return writeResponse{
 		seriesIDs: seriesIDs,
 		err:       err,
+		er:        er,
 	}
 }
 
@@ -682,6 +705,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) e
 	maxBufferedResponses := len(localWrites) + len(remoteWrites)
 	responses := make(chan writeResponse, maxBufferedResponses)
 	wg := sync.WaitGroup{}
+	wg.Add(len(remoteWrites))
 
 	h.sendWrites(ctx, &wg, params, localWrites, remoteWrites, responses)
 
@@ -725,6 +749,7 @@ func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) e
 			for _, seriesID := range resp.seriesIDs {
 				seriesErrs[seriesID].Add(resp.err)
 			}
+			continue
 		}
 
 	// At the end, aggregate all errors if there are any and return them.
@@ -785,7 +810,7 @@ func (h *Handler) sendWrites(
 	params remoteWriteParams,
 	localWrites map[endpointReplica]trackedSeries,
 	remoteWrites map[endpointReplica]trackedSeries,
-	responses chan<- writeResponse,
+	responses chan writeResponse,
 ) {
 	// Do the writes to the local node first. This should be easy and fast.
 	for writeDestination := range localWrites {
@@ -796,11 +821,7 @@ func (h *Handler) sendWrites(
 
 	// Do the writes to remote nodes. Run them all in parallel.
 	for writeDestination := range remoteWrites {
-		wg.Add(1)
-		go func(writeDestination endpointReplica) {
-			defer wg.Done()
-			h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses)
-		}(writeDestination)
+		h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses, wg)
 	}
 }
 
@@ -823,10 +844,10 @@ func (h *Handler) sendLocalWrite(
 	if err != nil {
 		span.SetTag("error", true)
 		span.SetTag("error.msg", err.Error())
-		responses <- newWriteResponse(trackedSeries.seriesIDs, err)
+		responses <- newWriteResponse(trackedSeries.seriesIDs, err, writeDestination)
 		return
 	}
-	responses <- newWriteResponse(trackedSeries.seriesIDs, nil)
+	responses <- newWriteResponse(trackedSeries.seriesIDs, nil, writeDestination)
 }
 
 // sendRemoteWrite sends a write request to the remote node. It takes care of checking wether the endpoint is up or not
@@ -838,7 +859,8 @@ func (h *Handler) sendRemoteWrite(
 	endpointReplica endpointReplica,
 	trackedSeries trackedSeries,
 	alreadyReplicated bool,
-	responses chan<- writeResponse,
+	responses chan writeResponse,
+	wg *sync.WaitGroup,
 ) {
 	endpoint := endpointReplica.endpoint
 	cl, err := h.peers.getConnection(ctx, endpoint)
@@ -846,45 +868,36 @@ func (h *Handler) sendRemoteWrite(
 	if err != nil {
 		if errors.Is(err, errUnavailable) {
 			err = errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpointReplica)
 		}
-		responses <- newWriteResponse(trackedSeries.seriesIDs, err)
+		responses <- newWriteResponse(trackedSeries.seriesIDs, err, endpointReplica)
+		wg.Done()
 		return
 	}
 
-	span, spanCtx := tracing.StartSpan(ctx, "receive_forward")
 	// This is called "real" because it's 1-indexed.
 	realReplicationIndex := int64(endpointReplica.replica + 1)
-	span.SetTag("endpoint", endpointReplica.endpoint)
-	span.SetTag("replica", realReplicationIndex)
 	// Actually make the request against the endpoint we determined should handle these time series.
-	_, err = cl.RemoteWrite(spanCtx, &storepb.WriteRequest{
+	cl.RemoteWriteAsync(ctx, &storepb.WriteRequest{
 		Timeseries: trackedSeries.timeSeries,
 		Tenant:     tenant,
 		// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
 		Replica: realReplicationIndex,
-	})
-	if err != nil {
-		span.SetTag("error", true)
-		span.SetTag("error.msg", err.Error())
-		// Check if peer connection is unavailable, update the peer state to avoid spamming that peer.
-		if st, ok := status.FromError(err); ok {
-			if st.Code() == codes.Unavailable {
-				h.peers.markPeerUnavailable(endpoint)
+	}, endpointReplica, trackedSeries.seriesIDs, responses, func(err error) {
+		if err == nil {
+			h.forwardRequests.WithLabelValues(labelSuccess).Inc()
+			if !alreadyReplicated {
+				h.replications.WithLabelValues(labelSuccess).Inc()
+			}
+			h.peers.markPeerAvailable(endpoint)
+		} else {
+			// Check if peer connection is unavailable, update the peer state to avoid spamming that peer.
+			if st, ok := status.FromError(err); ok {
+				if st.Code() == codes.Unavailable {
+					h.peers.markPeerUnavailable(endpointReplica.endpoint)
+				}
 			}
 		}
-		h.forwardRequests.WithLabelValues(labelError).Inc()
-		if !alreadyReplicated {
-			h.replications.WithLabelValues(labelError).Inc()
-		}
-		responses <- newWriteResponse(trackedSeries.seriesIDs, err)
-		return
-	}
-	span.Finish()
-	h.forwardRequests.WithLabelValues(labelSuccess).Inc()
-	if !alreadyReplicated {
-		h.replications.WithLabelValues(labelSuccess).Inc()
-	}
-	responses <- newWriteResponse(trackedSeries.seriesIDs, nil)
-	h.peers.markPeerAvailable(endpoint)
+		wg.Done()
+	})
 }
 
 // writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success.
@@ -1186,30 +1199,145 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors {
 	return errs
 }
 
-func newPeerGroup(backoff backoff.Backoff, dialOpts ...grpc.DialOption) peersContainer {
+func (pw *peerWorker) initWorkers() {
+	pw.initWorkersOnce.Do(func() {
+		work := make(chan peerWorkItem)
+		pw.work = work
+
+		ctx, cancel := context.WithCancel(context.Background())
+		pw.turnOffGoroutines = cancel
+
+		for i := 0; i < int(pw.asyncWorkerCount); i++ {
+			go func() {
+				for {
+					select {
+					case <-ctx.Done():
+						return
+					case w := <-work:
+						pw.forwardDelay.Observe(time.Since(w.sendTime).Seconds())
+
+						tracing.DoInSpan(w.workItemCtx, "receive_forward", func(ctx context.Context) {
+							_, err := storepb.NewWriteableStoreClient(pw.cc).RemoteWrite(ctx, w.req)
+							w.workResult <- peerWorkResponse{
+								er:  w.er,
+								err: errors.Wrapf(err, "forwarding request to endpoint %v", w.er.endpoint),
+							}
+							if err != nil {
+								sp := trace.SpanFromContext(ctx)
+								sp.SetAttributes(attribute.Bool("error", true))
+								sp.SetAttributes(attribute.String("error.msg", err.Error()))
+							}
+							close(w.workResult)
+						}, opentracing.Tags{
+							"endpoint": w.er.endpoint,
+							"replica":  w.er.replica,
+						})
+
+					}
+				}
+			}()
+		}
+
+	})
+}
+
+func newPeerWorker(cc *grpc.ClientConn, forwardDelay prometheus.Histogram, asyncWorkerCount uint) *peerWorker {
+	return &peerWorker{
+		cc:               cc,
+		asyncWorkerCount: asyncWorkerCount,
+		forwardDelay:     forwardDelay,
+	}
+}
+
+type peerWorkItem struct {
+	cc          *grpc.ClientConn
+	req         *storepb.WriteRequest
+	workItemCtx context.Context
+
+	workResult chan peerWorkResponse
+	er         endpointReplica
+	sendTime   time.Time
+}
+
+func (pw *peerWorker) RemoteWrite(ctx context.Context, in *storepb.WriteRequest, opts ...grpc.CallOption) (*storepb.WriteResponse, error) {
+	pw.initWorkers()
+
+	w := peerWorkItem{
+		cc:          pw.cc,
+		req:         in,
+		workResult:  make(chan peerWorkResponse, 1),
+		workItemCtx: ctx,
+		sendTime:    time.Now(),
+	}
+
+	pw.work <- w
+	return nil, (<-w.workResult).err
+}
+
+type peerWorker struct {
+	cc *grpc.ClientConn
+
+	work              chan peerWorkItem
+	turnOffGoroutines func()
+
+	initWorkersOnce  sync.Once
+	asyncWorkerCount uint
+	forwardDelay     prometheus.Histogram
+}
+
+func newPeerGroup(backoff backoff.Backoff, forwardDelay prometheus.Histogram, asyncForwardWorkersCount uint, dialOpts ...grpc.DialOption) peersContainer {
 	return &peerGroup{
-		dialOpts:    dialOpts,
-		connections: map[string]*grpc.ClientConn{},
-		m:           sync.RWMutex{},
-		dialer:      grpc.DialContext,
-		peerStates:  make(map[string]*retryState),
-		expBackoff:  backoff,
+		dialOpts:                 dialOpts,
+		connections:              map[string]*peerWorker{},
+		m:                        sync.RWMutex{},
+		dialer:                   grpc.DialContext,
+		peerStates:               make(map[string]*retryState),
+		expBackoff:               backoff,
+		forwardDelay:             forwardDelay,
+		asyncForwardWorkersCount: asyncForwardWorkersCount,
 	}
 }
 
 type peersContainer interface {
 	close(string) error
-	getConnection(context.Context, string) (storepb.WriteableStoreClient, error)
+	getConnection(context.Context, string) (WriteableStoreAsyncClient, error)
 	markPeerUnavailable(string)
 	markPeerAvailable(string)
 	reset()
 }
 
+type peerWorkResponse struct {
+	er  endpointReplica
+	err error
+}
+
+func (p *peerWorker) RemoteWriteAsync(ctx context.Context, req *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responseWriter chan writeResponse, cb func(error)) {
+	p.initWorkers()
+
+	w := peerWorkItem{
+		cc:          p.cc,
+		req:         req,
+		workResult:  make(chan peerWorkResponse, 1),
+		workItemCtx: ctx,
+		er:          er,
+
+		sendTime: time.Now(),
+	}
+
+	p.work <- w
+	res := <-w.workResult
+
+	responseWriter <- newWriteResponse(seriesIDs, res.err, er)
+	cb(res.err)
+}
+
 type peerGroup struct {
-	dialOpts    []grpc.DialOption
-	connections map[string]*grpc.ClientConn
-	peerStates  map[string]*retryState
-	expBackoff  backoff.Backoff
+	dialOpts                 []grpc.DialOption
+	connections              map[string]*peerWorker
+	peerStates               map[string]*retryState
+	expBackoff               backoff.Backoff
+	forwardDelay             prometheus.Histogram
+	asyncForwardWorkersCount uint
 
 	m sync.RWMutex
 
@@ -1228,15 +1356,16 @@ func (p *peerGroup) close(addr string) error {
 		return nil
 	}
 
+	p.connections[addr].turnOffGoroutines()
 	delete(p.connections, addr)
-	if err := c.Close(); err != nil {
+	if err := c.cc.Close(); err != nil {
 		return fmt.Errorf("closing connection for %s", addr)
 	}
 
 	return nil
 }
 
-func (p *peerGroup) getConnection(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) {
+func (p *peerGroup) getConnection(ctx context.Context, addr string) (WriteableStoreAsyncClient, error) {
 	if !p.isPeerUp(addr) {
 		return nil, errUnavailable
 	}
@@ -1246,7 +1375,7 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (storepb.Wri
 	c, ok := p.connections[addr]
 	p.m.RUnlock()
 	if ok {
-		return storepb.NewWriteableStoreClient(c), nil
+		return c, nil
 	}
 
 	p.m.Lock()
@@ -1254,7 +1383,7 @@ func (p *peerGroup) getConnection(ctx context.Context, addr string) (storepb.Wri
 	// Make sure that another caller hasn't created the connection since obtaining the write lock.
 	c, ok = p.connections[addr]
 	if ok {
-		return storepb.NewWriteableStoreClient(c), nil
+		return c, nil
 	}
 
 	conn, err := p.dialer(ctx, addr, p.dialOpts...)
 	if err != nil {
@@ -1263,8 +1392,8 @@
 		return nil, errors.Wrap(dialError, errUnavailable.Error())
 	}
 
-	p.connections[addr] = conn
-	return storepb.NewWriteableStoreClient(conn), nil
+	p.connections[addr] = newPeerWorker(conn, p.forwardDelay, p.asyncForwardWorkersCount)
+	return p.connections[addr], nil
 }
 
 func (p *peerGroup) markPeerUnavailable(addr string) {
diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go
index a693843aff..7771776e0d 100644
--- a/pkg/receive/handler_test.go
+++ b/pkg/receive/handler_test.go
@@ -168,7 +168,7 @@ func (f *fakeAppender) AppendCTZeroSample(ref storage.SeriesRef, l labels.Labels
 }
 
 type fakePeersGroup struct {
-	clients map[string]storepb.WriteableStoreClient
+	clients map[string]WriteableStoreAsyncClient
 
 	closeCalled map[string]bool
 }
@@ -190,7 +190,7 @@ func (g *fakePeersGroup) close(addr string) error {
 	return nil
 }
 
-func (g *fakePeersGroup) getConnection(_ context.Context, addr string) (storepb.WriteableStoreClient, error) {
+func (g *fakePeersGroup) getConnection(_ context.Context, addr string) (WriteableStoreAsyncClient, error) {
 	c, ok := g.clients[addr]
 	if !ok {
 		return nil, fmt.Errorf("client %s not found", addr)
@@ -207,7 +207,7 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin
 		wOpts = &WriterOptions{}
 	)
 	fakePeers := &fakePeersGroup{
-		clients: map[string]storepb.WriteableStoreClient{},
+		clients: map[string]WriteableStoreAsyncClient{},
 	}
 
 	ag := addrGen{}
@@ -882,6 +882,16 @@ func (f *fakeRemoteWriteGRPCServer) RemoteWrite(ctx context.Context, in *storepb
 	return f.h.RemoteWrite(ctx, in)
 }
 
+func (f *fakeRemoteWriteGRPCServer) RemoteWriteAsync(ctx context.Context, in *storepb.WriteRequest, er endpointReplica, seriesIDs []int, responses chan writeResponse, cb func(error)) {
+	_, err := f.h.RemoteWrite(ctx, in)
+	responses <- writeResponse{
+		er:        er,
+		err:       err,
+		seriesIDs: seriesIDs,
+	}
+	cb(err)
+}
+
 func BenchmarkHandlerReceiveHTTP(b *testing.B) {
 	benchmarkHandlerMultiTSDBReceiveRemoteWrite(testutil.NewTB(b))
 }
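
For reference, the sketch below shows, in isolation, the bounded worker-pool pattern that `peerWorker.initWorkers` and `RemoteWriteAsync` implement in this change: a fixed number of goroutines drain a shared work channel, and the time each item spends queued is observed (in the Receiver this feeds the `thanos_receive_forward_delay_seconds` histogram). This is plain illustrative Go, not Thanos code; names such as `pool` and `workItem` are made up for the example.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type workItem struct {
	payload  string
	sendTime time.Time  // set when the item is enqueued, used to measure queueing delay
	done     chan error // result channel for the caller, buffered so the worker never blocks on it
}

type pool struct {
	work    chan workItem
	cancel  context.CancelFunc
	once    sync.Once
	workers int
}

// start lazily spins up exactly p.workers goroutines (analogous to initWorkers).
func (p *pool) start() {
	p.once.Do(func() {
		p.work = make(chan workItem)
		ctx, cancel := context.WithCancel(context.Background())
		p.cancel = cancel

		for i := 0; i < p.workers; i++ {
			go func() {
				for {
					select {
					case <-ctx.Done():
						return
					case w := <-p.work:
						// In the real handler this delay is what the
						// forward-delay histogram observes.
						delay := time.Since(w.sendTime)
						fmt.Printf("processed %q after %v in queue\n", w.payload, delay)
						w.done <- nil
						close(w.done)
					}
				}
			}()
		}
	})
}

// submit enqueues one item and waits for its result, like RemoteWrite above.
func (p *pool) submit(payload string) error {
	p.start()
	w := workItem{payload: payload, sendTime: time.Now(), done: make(chan error, 1)}
	p.work <- w
	return <-w.done
}

func main() {
	p := &pool{workers: 3}
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			_ = p.submit(fmt.Sprintf("request-%d", i))
		}(i)
	}
	wg.Wait()
	p.cancel() // analogous to turnOffGoroutines when a peer connection is closed
}
```

If the observed queueing delay keeps growing, the pool is undersized for the incoming request rate; in the Receiver that is the signal to raise `--receive.forward.async-workers`.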