From 921f2675a2c456352856fd2365e7cdc50bd068e5 Mon Sep 17 00:00:00 2001 From: Douglas Camata <159076+douglascamata@users.noreply.github.com> Date: Wed, 17 Jan 2024 14:31:47 +0100 Subject: [PATCH] Receive: refactor handler for improved readability and organization (#6898) * [wip] First checkpoint Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * [wip] Second checkpoint All tests passing, unit and e2e. Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Small random refactors Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add some useful trace tags Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Concurrent and traced local writes Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Improve variable names in remote writes Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Rename `newFanoutForward` function Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * More refactors Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix linting issue Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add a quorum test with sloppy quorum Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * [wip] Try to make retries work Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * [wip] Checkpoint: wait group still hanging Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Some refactors Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add some commented code so I don't lose it Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Adapt tests Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove sloppy quorum code Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Move some code around Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove even more leftover of sloppy quorum Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Extract a type to hold function params Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove unused struct field Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove useless variable Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove type that wasn't used enough Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Delete function to tighten up max buffered responses Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add comments to some functions Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix peer up check Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix size of replication tracking slices Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Rename context Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Don't do local writes concurrently Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove extra error logging Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix syntax after merge Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Add missing methods to peersContainer Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix handler test Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Reset peers state on hashring changes Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Handle PR comment regarding waitgroup Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Set span tags to help debug Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix concurrency issue We close the request as soon as quorum is reached and leave a few Go routines running to finish replication and so cleanups. This means that the context from the HTTP request is cancelled... which ends up also cancelling the pending replication requests. Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix request ID middleware Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Fix `distributeTimeseriesToReplicas` comment Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Extract var with 1-indexed replication index Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Rename methods in peersContainer interface Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Make peerGroup `getConnection` check if peers are up Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove yet one more not useful log Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> * Remove logger from `h.sendWrites` Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> --------- Signed-off-by: Douglas Camata <159076+douglascamata@users.noreply.github.com> Signed-off-by: hanyuting8 --- pkg/receive/handler.go | 581 +++++++++++++---------- pkg/receive/handler_test.go | 45 +- pkg/server/http/middleware/request_id.go | 4 +- 3 files changed, 365 insertions(+), 265 deletions(-) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c0c0fea21f..ba4b9b94ce 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -107,8 +107,6 @@ type Handler struct { mtx sync.RWMutex hashring Hashring peers peersContainer - expBackoff backoff.Backoff - peerStates map[string]*retryState receiverMode ReceiverMode forwardRequests *prometheus.CounterVec @@ -132,19 +130,20 @@ func NewHandler(logger log.Logger, o *Options) *Handler { } h := &Handler{ - logger: logger, - writer: o.Writer, - router: route.New(), - options: o, - peers: newPeerGroup(o.DialOpts...), + logger: logger, + writer: o.Writer, + router: route.New(), + options: o, + peers: newPeerGroup( + backoff.Backoff{ + Factor: 2, + Min: 100 * time.Millisecond, + Max: o.MaxBackoff, + Jitter: true, + }, + o.DialOpts...), receiverMode: o.ReceiverMode, - expBackoff: backoff.Backoff{ - Factor: 2, - Min: 100 * time.Millisecond, - Max: o.MaxBackoff, - Jitter: true, - }, - Limiter: o.Limiter, + Limiter: o.Limiter, forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -266,8 +265,7 @@ func (h *Handler) Hashring(hashring Hashring) { } h.hashring = hashring - h.expBackoff.Reset() - h.peerStates = make(map[string]*retryState) + h.peers.reset() } // getSortedStringSliceDiff returns items which are in slice1 but not in slice2. @@ -434,41 +432,10 @@ func newWriteResponse(seriesIDs []int, err error) writeResponse { } } -func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error { - tLogger := log.With(h.logger, "tenant", tenant) - - // This replica value is used to detect cycles in cyclic topologies. - // A non-zero value indicates that the request has already been replicated by a previous receive instance. - // For almost all users, this is only used in fully connected topologies of IngestorRouter instances. - // For acyclic topologies that use RouterOnly and IngestorOnly instances, this causes issues when replicating data. - // See discussion in: https://github.com/thanos-io/thanos/issues/4359. - if h.receiverMode == RouterOnly || h.receiverMode == IngestorOnly { - rep = 0 - } - - // The replica value in the header is one-indexed, thus we need >. - if rep > h.options.ReplicationFactor { - level.Error(tLogger).Log("err", errBadReplica, "msg", "write request rejected", - "request_replica", rep, "replication_factor", h.options.ReplicationFactor) - return errBadReplica - } - - r := replica{n: rep, replicated: rep != 0} - - // On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated. - if r.replicated { - r.n-- - } - - // Forward any time series as necessary. All time series - // destined for the local node will be written to the receiver. - // Time series will be replicated as necessary. - return h.forward(ctx, tenant, r, wreq) -} - func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { var err error span, ctx := tracing.StartSpan(r.Context(), "receive_http") + span.SetTag("receiver.mode", string(h.receiverMode)) defer span.Finish() tenant, err := tenancy.GetTenantFromHTTP(r, h.options.TenantHeader, h.options.DefaultTenantID, h.options.TenantField) @@ -479,6 +446,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } tLogger := log.With(h.logger, "tenant", tenant) + span.SetTag("tenant", tenant) writeGate := h.Limiter.WriteGate() tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) { @@ -585,8 +553,8 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } responseStatusCode := http.StatusOK - if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil { - level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) + if err := h.handleRequest(ctx, rep, tenant, &wreq); err != nil { + level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error()) switch errors.Cause(err) { case errNotReady: responseStatusCode = http.StatusServiceUnavailable @@ -606,6 +574,38 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples)) } +func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error { + tLogger := log.With(h.logger, "tenant", tenant) + + // This replica value is used to detect cycles in cyclic topologies. + // A non-zero value indicates that the request has already been replicated by a previous receive instance. + // For almost all users, this is only used in fully connected topologies of IngestorRouter instances. + // For acyclic topologies that use RouterOnly and IngestorOnly instances, this causes issues when replicating data. + // See discussion in: https://github.com/thanos-io/thanos/issues/4359. + if h.receiverMode == RouterOnly || h.receiverMode == IngestorOnly { + rep = 0 + } + + // The replica value in the header is one-indexed, thus we need >. + if rep > h.options.ReplicationFactor { + level.Error(tLogger).Log("err", errBadReplica, "msg", "write request rejected", + "request_replica", rep, "replication_factor", h.options.ReplicationFactor) + return errBadReplica + } + + r := replica{n: rep, replicated: rep != 0} + + // On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated. + if r.replicated { + r.n-- + } + + // Forward any time series as necessary. All time series + // destined for the local node will be written to the receiver. + // Time series will be replicated as necessary. + return h.forward(ctx, tenant, r, wreq) +} + // forward accepts a write request, batches its time series by // corresponding endpoint, and forwards them in parallel to the // correct endpoint. Requests destined for the local node are written @@ -635,55 +635,29 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p } } - wreqs := make(map[endpointReplica]trackedSeries) - for tsID, ts := range wreq.Timeseries { - for _, rn := range replicas { - endpoint, err := h.hashring.GetN(tenant, &ts, rn) - if err != nil { - h.mtx.RUnlock() - return err - } - key := endpointReplica{endpoint: endpoint, replica: rn} - writeTarget, ok := wreqs[key] - if !ok { - writeTarget = trackedSeries{ - seriesIDs: make([]int, 0), - timeSeries: make([]prompb.TimeSeries, 0), - } - } - writeTarget.timeSeries = append(wreqs[key].timeSeries, ts) - writeTarget.seriesIDs = append(wreqs[key].seriesIDs, tsID) - wreqs[key] = writeTarget - } + params := remoteWriteParams{ + tenant: tenant, + writeRequest: wreq, + replicas: replicas, + alreadyReplicated: r.replicated, } - h.mtx.RUnlock() - - return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), r.replicated) -} -// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. -func (h *Handler) writeQuorum() int { - return int((h.options.ReplicationFactor / 2) + 1) + return h.fanoutForward(ctx, params) } -func quorumReached(successes []int, successThreshold int) bool { - for _, success := range successes { - if success < successThreshold { - return false - } - } - - return true +type remoteWriteParams struct { + tenant string + writeRequest *prompb.WriteRequest + replicas []uint64 + alreadyReplicated bool } -// fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of -// requests succeeds or fails or if context is canceled. -func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error { - var errs writeErrors +func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) error { + ctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), ctx), h.options.ForwardTimeout) - fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) + var writeErrors writeErrors defer func() { - if errs.ErrOrNil() != nil { + if writeErrors.ErrOrNil() != nil { // NOTICE: The cancel function is not used on all paths intentionally, // if there is no error when quorum is reached, // let forward requests to optimistically run until timeout. @@ -691,140 +665,25 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e } }() - var tLogger log.Logger - { - logTags := []interface{}{"tenant", tenant} - if id, ok := middleware.RequestIDFromContext(pctx); ok { - logTags = append(logTags, "request-id", id) - } - tLogger = log.With(h.logger, logTags...) - } - - // NOTE(GiedriusS): First write locally because inside of the function we check if the local TSDB has cached strings. - // If not then it copies those strings. This is so that the memory allocated for the - // protobuf (except for the labels) can be deallocated. - // This causes a write to the labels field. When fanning out this request to other Receivers, the code calls - // Size() which reads those same fields. We would like to avoid adding locks around each string - // hence we need to write locally first. - var maxBufferedResponses = 0 - for writeTarget := range wreqs { - if writeTarget.endpoint != h.options.Endpoint { - continue - } - maxBufferedResponses++ + logTags := []interface{}{"tenant", params.tenant} + if id, ok := middleware.RequestIDFromContext(ctx); ok { + logTags = append(logTags, "request-id", id) } + requestLogger := log.With(h.logger, logTags...) - responses := make(chan writeResponse, maxBufferedResponses) - - var wg sync.WaitGroup - - for writeTarget := range wreqs { - if writeTarget.endpoint != h.options.Endpoint { - continue - } - // If the endpoint for the write request is the - // local node, then don't make a request but store locally. - // By handing replication to the local node in the same - // function as replication to other nodes, we can treat - // a failure to write locally as just another error that - // can be ignored if the replication factor is met. - var err error - - tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) { - err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{ - Timeseries: wreqs[writeTarget].timeSeries, - }) - }) - if err != nil { - level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", writeTarget.endpoint)) - continue - } - - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) + localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries) + if err != nil { + level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err) + return err } - for writeTarget := range wreqs { - if writeTarget.endpoint == h.options.Endpoint { - continue - } - wg.Add(1) - // Make a request to the specified endpoint. - go func(writeTarget endpointReplica) { - defer wg.Done() - - var ( - err error - cl storepb.WriteableStoreClient - ) - defer func() { - // This is an actual remote forward request so report metric here. - if err != nil { - h.forwardRequests.WithLabelValues(labelError).Inc() - if !seriesReplicated { - h.replications.WithLabelValues(labelError).Inc() - } - return - } - h.forwardRequests.WithLabelValues(labelSuccess).Inc() - if !seriesReplicated { - h.replications.WithLabelValues(labelSuccess).Inc() - } - }() - cl, err = h.peers.get(fctx, writeTarget.endpoint) - if err != nil { - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", writeTarget.endpoint)) - return - } - - h.mtx.RLock() - b, ok := h.peerStates[writeTarget.endpoint] - if ok { - if time.Now().Before(b.nextAllowed) { - h.mtx.RUnlock() - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", writeTarget.endpoint)) - return - } - } - h.mtx.RUnlock() - - // Create a span to track the request made to another receive node. - tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) { - // Actually make the request against the endpoint we determined should handle these time series. - _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{ - Timeseries: wreqs[writeTarget].timeSeries, - Tenant: tenant, - // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. - Replica: int64(writeTarget.replica + 1), - }) - }) - if err != nil { - // Check if peer connection is unavailable, don't attempt to send requests constantly. - if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unavailable { - h.mtx.Lock() - if b, ok := h.peerStates[writeTarget.endpoint]; ok { - b.attempt++ - dur := h.expBackoff.ForAttempt(b.attempt) - b.nextAllowed = time.Now().Add(dur) - level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur) - } else { - h.peerStates[writeTarget.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} - } - h.mtx.Unlock() - } - } - werr := errors.Wrapf(err, "forwarding request to endpoint %v", writeTarget.endpoint) - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, werr) - return - } - h.mtx.Lock() - delete(h.peerStates, writeTarget.endpoint) - h.mtx.Unlock() + // Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go + // asynchronously and with this capacity we will never block on writing to the channel. + maxBufferedResponses := len(localWrites) + len(remoteWrites) + responses := make(chan writeResponse, maxBufferedResponses) + wg := sync.WaitGroup{} - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) - }(writeTarget) - } + h.sendWrites(ctx, &wg, params, localWrites, remoteWrites, responses) go func() { wg.Wait() @@ -835,40 +694,42 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // This is needed if context is canceled or if we reached success of fail quorum faster. defer func() { go func() { - for wresp := range responses { - if wresp.err != nil { - level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err) + for resp := range responses { + if resp.err != nil { + level.Debug(requestLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", resp.err) } } }() }() quorum := h.writeQuorum() - if seriesReplicated { + if params.alreadyReplicated { quorum = 1 } - successes := make([]int, numSeries) - seriesErrs := newReplicationErrors(quorum, numSeries) + successes := make([]int, len(params.writeRequest.Timeseries)) + seriesErrs := newReplicationErrors(quorum, len(params.writeRequest.Timeseries)) for { select { - case <-fctx.Done(): - return fctx.Err() - case wresp, more := <-responses: - if !more { - for _, rerr := range seriesErrs { - errs.Add(rerr) + case <-ctx.Done(): + return ctx.Err() + case resp, hasMore := <-responses: + if !hasMore { + for _, seriesErr := range seriesErrs { + writeErrors.Add(seriesErr) } - return errs.ErrOrNil() + return writeErrors.ErrOrNil() } - if wresp.err != nil { - for _, tsID := range wresp.seriesIDs { - seriesErrs[tsID].Add(wresp.err) + if resp.err != nil { + // Track errors and successes on a per-series basis. + for _, seriesID := range resp.seriesIDs { + seriesErrs[seriesID].Add(resp.err) } continue } - for _, tsID := range wresp.seriesIDs { - successes[tsID]++ + // At the end, aggregate all errors if there are any and return them. + for _, seriesID := range resp.seriesIDs { + successes[seriesID]++ } if quorumReached(successes, quorum) { return nil @@ -877,6 +738,170 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e } } +// distributeTimeseriesToReplicas distributes the given timeseries from the tenant to different endpoints in a manner +// that achieves the replication factor indicated by replicas. +// The first return value are the series that should be written to the local node. The second return value are the +// series that should be written to remote nodes. +func (h *Handler) distributeTimeseriesToReplicas( + tenant string, + replicas []uint64, + timeseries []prompb.TimeSeries, +) (map[endpointReplica]trackedSeries, map[endpointReplica]trackedSeries, error) { + h.mtx.RLock() + defer h.mtx.RUnlock() + remoteWrites := make(map[endpointReplica]trackedSeries) + localWrites := make(map[endpointReplica]trackedSeries) + for tsIndex, ts := range timeseries { + for _, rn := range replicas { + endpoint, err := h.hashring.GetN(tenant, &ts, rn) + if err != nil { + return nil, nil, err + } + endpointReplica := endpointReplica{endpoint: endpoint, replica: rn} + var writeDestination = remoteWrites + if endpoint == h.options.Endpoint { + writeDestination = localWrites + } + writeableSeries, ok := writeDestination[endpointReplica] + if !ok { + writeableSeries = trackedSeries{ + seriesIDs: make([]int, 0), + timeSeries: make([]prompb.TimeSeries, 0), + } + } + writeableSeries.timeSeries = append(writeDestination[endpointReplica].timeSeries, ts) + writeableSeries.seriesIDs = append(writeDestination[endpointReplica].seriesIDs, tsIndex) + writeDestination[endpointReplica] = writeableSeries + } + } + return localWrites, remoteWrites, nil +} + +// sendWrites sends the local and remote writes to execute concurrently, controlling them through the provided sync.WaitGroup. +// The responses from the writes are sent to the responses channel. +func (h *Handler) sendWrites( + ctx context.Context, + wg *sync.WaitGroup, + params remoteWriteParams, + localWrites map[endpointReplica]trackedSeries, + remoteWrites map[endpointReplica]trackedSeries, + responses chan<- writeResponse, +) { + // Do the writes to the local node first. This should be easy and fast. + for writeDestination := range localWrites { + func(writeDestination endpointReplica) { + h.sendLocalWrite(ctx, writeDestination, params.tenant, localWrites[writeDestination], responses) + }(writeDestination) + } + + // Do the writes to remote nodes. Run them all in parallel. + for writeDestination := range remoteWrites { + wg.Add(1) + go func(writeDestination endpointReplica) { + defer wg.Done() + h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses) + }(writeDestination) + } +} + +// sendLocalWrite sends a write request to the local node. +// The responses are sent to the responses channel. +func (h *Handler) sendLocalWrite( + ctx context.Context, + writeDestination endpointReplica, + tenant string, + trackedSeries trackedSeries, + responses chan<- writeResponse, +) { + span, tracingCtx := tracing.StartSpan(ctx, "receive_local_tsdb_write") + defer span.Finish() + span.SetTag("endpoint", writeDestination.endpoint) + span.SetTag("replica", writeDestination.replica) + err := h.writer.Write(tracingCtx, tenant, &prompb.WriteRequest{ + Timeseries: trackedSeries.timeSeries, + }) + if err != nil { + span.SetTag("error", true) + span.SetTag("error.msg", err.Error()) + responses <- newWriteResponse(trackedSeries.seriesIDs, err) + return + } + responses <- newWriteResponse(trackedSeries.seriesIDs, nil) +} + +// sendRemoteWrite sends a write request to the remote node. It takes care of checking wether the endpoint is up or not +// in the peerGroup, correctly marking them as up or down when appropriate. +// The responses are sent to the responses channel. +func (h *Handler) sendRemoteWrite( + ctx context.Context, + tenant string, + endpointReplica endpointReplica, + trackedSeries trackedSeries, + alreadyReplicated bool, + responses chan<- writeResponse, +) { + endpoint := endpointReplica.endpoint + cl, err := h.peers.getConnection(ctx, endpoint) + if err != nil { + if errors.Is(err, errUnavailable) { + err = errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpointReplica) + } + responses <- newWriteResponse(trackedSeries.seriesIDs, err) + return + } + + span, spanCtx := tracing.StartSpan(ctx, "receive_forward") + // This is called "real" because it's 1-indexed. + realReplicationIndex := int64(endpointReplica.replica + 1) + span.SetTag("endpoint", endpointReplica.endpoint) + span.SetTag("replica", realReplicationIndex) + // Actually make the request against the endpoint we determined should handle these time series. + _, err = cl.RemoteWrite(spanCtx, &storepb.WriteRequest{ + Timeseries: trackedSeries.timeSeries, + Tenant: tenant, + // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. + Replica: realReplicationIndex, + }) + if err != nil { + span.SetTag("error", true) + span.SetTag("error.msg", err.Error()) + // Check if peer connection is unavailable, update the peer state to avoid spamming that peer. + if st, ok := status.FromError(err); ok { + if st.Code() == codes.Unavailable { + h.peers.markPeerUnavailable(endpoint) + } + } + h.forwardRequests.WithLabelValues(labelError).Inc() + if !alreadyReplicated { + h.replications.WithLabelValues(labelError).Inc() + } + responses <- newWriteResponse(trackedSeries.seriesIDs, err) + return + } + span.Finish() + h.forwardRequests.WithLabelValues(labelSuccess).Inc() + if !alreadyReplicated { + h.replications.WithLabelValues(labelSuccess).Inc() + } + responses <- newWriteResponse(trackedSeries.seriesIDs, nil) + h.peers.markPeerAvailable(endpoint) +} + +// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. +func (h *Handler) writeQuorum() int { + return int((h.options.ReplicationFactor / 2) + 1) +} + +func quorumReached(successes []int, successThreshold int) bool { + for _, success := range successes { + if success < successThreshold { + return false + } + } + + return true +} + // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) { span, ctx := tracing.StartSpan(ctx, "receive_grpc") @@ -1161,24 +1186,32 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors { return errs } -func newPeerGroup(dialOpts ...grpc.DialOption) peersContainer { +func newPeerGroup(backoff backoff.Backoff, dialOpts ...grpc.DialOption) peersContainer { return &peerGroup{ - dialOpts: dialOpts, - cache: map[string]*grpc.ClientConn{}, - m: sync.RWMutex{}, - dialer: grpc.DialContext, + dialOpts: dialOpts, + connections: map[string]*grpc.ClientConn{}, + m: sync.RWMutex{}, + dialer: grpc.DialContext, + peerStates: make(map[string]*retryState), + expBackoff: backoff, } } type peersContainer interface { close(string) error - get(context.Context, string) (storepb.WriteableStoreClient, error) + getConnection(context.Context, string) (storepb.WriteableStoreClient, error) + markPeerUnavailable(string) + markPeerAvailable(string) + reset() } type peerGroup struct { - dialOpts []grpc.DialOption - cache map[string]*grpc.ClientConn - m sync.RWMutex + dialOpts []grpc.DialOption + connections map[string]*grpc.ClientConn + peerStates map[string]*retryState + expBackoff backoff.Backoff + + m sync.RWMutex // dialer is used for testing. dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) @@ -1188,14 +1221,14 @@ func (p *peerGroup) close(addr string) error { p.m.Lock() defer p.m.Unlock() - c, ok := p.cache[addr] + c, ok := p.connections[addr] if !ok { // NOTE(GiedriusS): this could be valid case when the connection // was never established. return nil } - delete(p.cache, addr) + delete(p.connections, addr) if err := c.Close(); err != nil { return fmt.Errorf("closing connection for %s", addr) } @@ -1203,10 +1236,14 @@ func (p *peerGroup) close(addr string) error { return nil } -func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { +func (p *peerGroup) getConnection(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { + if !p.isPeerUp(addr) { + return nil, errUnavailable + } + // use a RLock first to prevent blocking if we don't need to. p.m.RLock() - c, ok := p.cache[addr] + c, ok := p.connections[addr] p.m.RUnlock() if ok { return storepb.NewWriteableStoreClient(c), nil @@ -1215,15 +1252,51 @@ func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStor p.m.Lock() defer p.m.Unlock() // Make sure that another caller hasn't created the connection since obtaining the write lock. - c, ok = p.cache[addr] + c, ok = p.connections[addr] if ok { return storepb.NewWriteableStoreClient(c), nil } conn, err := p.dialer(ctx, addr, p.dialOpts...) if err != nil { - return nil, errors.Wrap(err, "failed to dial peer") + p.markPeerUnavailable(addr) + dialError := errors.Wrap(err, "failed to dial peer") + return nil, errors.Wrap(dialError, errUnavailable.Error()) } - p.cache[addr] = conn + p.connections[addr] = conn return storepb.NewWriteableStoreClient(conn), nil } + +func (p *peerGroup) markPeerUnavailable(addr string) { + p.m.Lock() + defer p.m.Unlock() + + state, ok := p.peerStates[addr] + if !ok { + state = &retryState{attempt: -1} + } + state.attempt++ + state.nextAllowed = time.Now().Add(p.expBackoff.ForAttempt(state.attempt)) + p.peerStates[addr] = state +} + +func (p *peerGroup) markPeerAvailable(addr string) { + p.m.Lock() + defer p.m.Unlock() + delete(p.peerStates, addr) +} + +func (p *peerGroup) isPeerUp(addr string) bool { + p.m.RLock() + defer p.m.RUnlock() + state, ok := p.peerStates[addr] + if !ok { + return true + } + return time.Now().After(state.nextAllowed) +} + +func (p *peerGroup) reset() { + p.expBackoff.Reset() + p.peerStates = make(map[string]*retryState) +} diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 7eadfee930..a693843aff 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -44,6 +44,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extkingpin" + "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -172,6 +173,15 @@ type fakePeersGroup struct { closeCalled map[string]bool } +func (g *fakePeersGroup) markPeerUnavailable(s string) { +} + +func (g *fakePeersGroup) markPeerAvailable(s string) { +} + +func (g *fakePeersGroup) reset() { +} + func (g *fakePeersGroup) close(addr string) error { if g.closeCalled == nil { g.closeCalled = map[string]bool{} @@ -180,7 +190,7 @@ func (g *fakePeersGroup) close(addr string) error { return nil } -func (g *fakePeersGroup) get(_ context.Context, addr string) (storepb.WriteableStoreClient, error) { +func (g *fakePeersGroup) getConnection(_ context.Context, addr string) (storepb.WriteableStoreClient, error) { c, ok := g.clients[addr] if !ok { return nil, fmt.Errorf("client %s not found", addr) @@ -202,8 +212,9 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin ag := addrGen{} limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger(), 1*time.Second) + logger := logging.NewLogger("debug", "logfmt", "receive_test") for i := range appendables { - h := NewHandler(nil, &Options{ + h := NewHandler(logger, &Options{ TenantHeader: tenancy.DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, @@ -248,6 +259,7 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist replicationFactor uint64 wreq *prompb.WriteRequest appendables []*fakeAppendable + randomNode bool }{ { name: "size 1 success", @@ -605,17 +617,29 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist t.Fatalf("unable to create test handler: %v", err) } tenant := "test" - // Test from the point of view of every node - // so that we know status code does not depend - // on which node is erroring and which node is receiving. - for i, handler := range handlers { - // Test that the correct status is returned. + + if tc.randomNode { + handler := handlers[0] rec, err := makeRequest(handler, tenant, tc.wreq) if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", i+1, err) + t.Fatalf("handler: unexpectedly failed making HTTP request: %v", err) } if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i+1, tc.status, rec.Code, rec.Body.String()) + t.Errorf("handler: got unexpected HTTP status code: expected %d, got %d; body: %s", tc.status, rec.Code, rec.Body.String()) + } + } else { + // Test from the point of view of every node + // so that we know status code does not depend + // on which node is erroring and which node is receiving. + for i, handler := range handlers { + // Test that the correct status is returned. + rec, err := makeRequest(handler, tenant, tc.wreq) + if err != nil { + t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", i+1, err) + } + if rec.Code != tc.status { + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i+1, tc.status, rec.Code, rec.Body.String()) + } } } @@ -654,6 +678,9 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist // We have len(handlers) copies of each sample because the test case // is run once for each handler and they all use the same appender. expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) + if tc.randomNode { + expectedMin = len(ts.Samples) + } } if uint64(expectedMin) > got { t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) diff --git a/pkg/server/http/middleware/request_id.go b/pkg/server/http/middleware/request_id.go index f09496b71c..fddeabf932 100644 --- a/pkg/server/http/middleware/request_id.go +++ b/pkg/server/http/middleware/request_id.go @@ -33,8 +33,8 @@ func RequestID(h http.Handler) http.HandlerFunc { reqID := r.Header.Get("X-Request-ID") if reqID == "" { entropy := ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0) - reqID := ulid.MustNew(ulid.Timestamp(time.Now()), entropy) - r.Header.Set("X-Request-ID", reqID.String()) + reqID = ulid.MustNew(ulid.Timestamp(time.Now()), entropy).String() + r.Header.Set("X-Request-ID", reqID) } ctx := newContextWithRequestID(r.Context(), reqID) h.ServeHTTP(w, r.WithContext(ctx))