diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index c0c0fea21f..ba4b9b94ce 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -107,8 +107,6 @@ type Handler struct { mtx sync.RWMutex hashring Hashring peers peersContainer - expBackoff backoff.Backoff - peerStates map[string]*retryState receiverMode ReceiverMode forwardRequests *prometheus.CounterVec @@ -132,19 +130,20 @@ func NewHandler(logger log.Logger, o *Options) *Handler { } h := &Handler{ - logger: logger, - writer: o.Writer, - router: route.New(), - options: o, - peers: newPeerGroup(o.DialOpts...), + logger: logger, + writer: o.Writer, + router: route.New(), + options: o, + peers: newPeerGroup( + backoff.Backoff{ + Factor: 2, + Min: 100 * time.Millisecond, + Max: o.MaxBackoff, + Jitter: true, + }, + o.DialOpts...), receiverMode: o.ReceiverMode, - expBackoff: backoff.Backoff{ - Factor: 2, - Min: 100 * time.Millisecond, - Max: o.MaxBackoff, - Jitter: true, - }, - Limiter: o.Limiter, + Limiter: o.Limiter, forwardRequests: promauto.With(registerer).NewCounterVec( prometheus.CounterOpts{ Name: "thanos_receive_forward_requests_total", @@ -266,8 +265,7 @@ func (h *Handler) Hashring(hashring Hashring) { } h.hashring = hashring - h.expBackoff.Reset() - h.peerStates = make(map[string]*retryState) + h.peers.reset() } // getSortedStringSliceDiff returns items which are in slice1 but not in slice2. @@ -434,41 +432,10 @@ func newWriteResponse(seriesIDs []int, err error) writeResponse { } } -func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error { - tLogger := log.With(h.logger, "tenant", tenant) - - // This replica value is used to detect cycles in cyclic topologies. - // A non-zero value indicates that the request has already been replicated by a previous receive instance. - // For almost all users, this is only used in fully connected topologies of IngestorRouter instances. - // For acyclic topologies that use RouterOnly and IngestorOnly instances, this causes issues when replicating data. - // See discussion in: https://github.com/thanos-io/thanos/issues/4359. - if h.receiverMode == RouterOnly || h.receiverMode == IngestorOnly { - rep = 0 - } - - // The replica value in the header is one-indexed, thus we need >. - if rep > h.options.ReplicationFactor { - level.Error(tLogger).Log("err", errBadReplica, "msg", "write request rejected", - "request_replica", rep, "replication_factor", h.options.ReplicationFactor) - return errBadReplica - } - - r := replica{n: rep, replicated: rep != 0} - - // On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated. - if r.replicated { - r.n-- - } - - // Forward any time series as necessary. All time series - // destined for the local node will be written to the receiver. - // Time series will be replicated as necessary. 
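// Illustrative sketch (not part of the patch): how the jittered exponential backoff that
// NewHandler above now hands to newPeerGroup behaves per attempt. This assumes the
// github.com/jpillora/backoff package, which provides a Backoff type with these fields;
// the 30s Max below is an arbitrary stand-in for o.MaxBackoff.
package main

import (
	"fmt"
	"time"

	"github.com/jpillora/backoff"
)

func main() {
	b := backoff.Backoff{
		Factor: 2,
		Min:    100 * time.Millisecond,
		Max:    30 * time.Second, // stand-in for o.MaxBackoff
		Jitter: true,
	}
	// Without jitter the delays grow 100ms, 200ms, 400ms, ... capped at Max; with Jitter
	// enabled each delay is drawn at random between Min and that bound to avoid thundering herds.
	for attempt := 0.0; attempt < 5; attempt++ {
		fmt.Printf("attempt %.0f -> wait %v\n", attempt, b.ForAttempt(attempt))
	}
}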
- return h.forward(ctx, tenant, r, wreq) -} - func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { var err error span, ctx := tracing.StartSpan(r.Context(), "receive_http") + span.SetTag("receiver.mode", string(h.receiverMode)) defer span.Finish() tenant, err := tenancy.GetTenantFromHTTP(r, h.options.TenantHeader, h.options.DefaultTenantID, h.options.TenantField) @@ -479,6 +446,7 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } tLogger := log.With(h.logger, "tenant", tenant) + span.SetTag("tenant", tenant) writeGate := h.Limiter.WriteGate() tracing.DoInSpan(r.Context(), "receive_write_gate_ismyturn", func(ctx context.Context) { @@ -585,8 +553,8 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { } responseStatusCode := http.StatusOK - if err = h.handleRequest(ctx, rep, tenant, &wreq); err != nil { - level.Debug(tLogger).Log("msg", "failed to handle request", "err", err) + if err := h.handleRequest(ctx, rep, tenant, &wreq); err != nil { + level.Debug(tLogger).Log("msg", "failed to handle request", "err", err.Error()) switch errors.Cause(err) { case errNotReady: responseStatusCode = http.StatusServiceUnavailable @@ -606,6 +574,38 @@ func (h *Handler) receiveHTTP(w http.ResponseWriter, r *http.Request) { h.writeSamplesTotal.WithLabelValues(strconv.Itoa(responseStatusCode), tenant).Observe(float64(totalSamples)) } +func (h *Handler) handleRequest(ctx context.Context, rep uint64, tenant string, wreq *prompb.WriteRequest) error { + tLogger := log.With(h.logger, "tenant", tenant) + + // This replica value is used to detect cycles in cyclic topologies. + // A non-zero value indicates that the request has already been replicated by a previous receive instance. + // For almost all users, this is only used in fully connected topologies of IngestorRouter instances. + // For acyclic topologies that use RouterOnly and IngestorOnly instances, this causes issues when replicating data. + // See discussion in: https://github.com/thanos-io/thanos/issues/4359. + if h.receiverMode == RouterOnly || h.receiverMode == IngestorOnly { + rep = 0 + } + + // The replica value in the header is one-indexed, thus we need >. + if rep > h.options.ReplicationFactor { + level.Error(tLogger).Log("err", errBadReplica, "msg", "write request rejected", + "request_replica", rep, "replication_factor", h.options.ReplicationFactor) + return errBadReplica + } + + r := replica{n: rep, replicated: rep != 0} + + // On the wire, format is 1-indexed and in-code is 0-indexed, so we decrement the value if it was already replicated. + if r.replicated { + r.n-- + } + + // Forward any time series as necessary. All time series + // destined for the local node will be written to the receiver. + // Time series will be replicated as necessary. + return h.forward(ctx, tenant, r, wreq) +} + // forward accepts a write request, batches its time series by // corresponding endpoint, and forwards them in parallel to the // correct endpoint. 
Requests destined for the local node are written @@ -635,55 +635,29 @@ func (h *Handler) forward(ctx context.Context, tenant string, r replica, wreq *p } } - wreqs := make(map[endpointReplica]trackedSeries) - for tsID, ts := range wreq.Timeseries { - for _, rn := range replicas { - endpoint, err := h.hashring.GetN(tenant, &ts, rn) - if err != nil { - h.mtx.RUnlock() - return err - } - key := endpointReplica{endpoint: endpoint, replica: rn} - writeTarget, ok := wreqs[key] - if !ok { - writeTarget = trackedSeries{ - seriesIDs: make([]int, 0), - timeSeries: make([]prompb.TimeSeries, 0), - } - } - writeTarget.timeSeries = append(wreqs[key].timeSeries, ts) - writeTarget.seriesIDs = append(wreqs[key].seriesIDs, tsID) - wreqs[key] = writeTarget - } + params := remoteWriteParams{ + tenant: tenant, + writeRequest: wreq, + replicas: replicas, + alreadyReplicated: r.replicated, } - h.mtx.RUnlock() - - return h.fanoutForward(ctx, tenant, wreqs, len(wreq.Timeseries), r.replicated) -} -// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. -func (h *Handler) writeQuorum() int { - return int((h.options.ReplicationFactor / 2) + 1) + return h.fanoutForward(ctx, params) } -func quorumReached(successes []int, successThreshold int) bool { - for _, success := range successes { - if success < successThreshold { - return false - } - } - - return true +type remoteWriteParams struct { + tenant string + writeRequest *prompb.WriteRequest + replicas []uint64 + alreadyReplicated bool } -// fanoutForward fans out concurrently given set of write requests. It returns status immediately when quorum of -// requests succeeds or fails or if context is canceled. -func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[endpointReplica]trackedSeries, numSeries int, seriesReplicated bool) error { - var errs writeErrors +func (h *Handler) fanoutForward(ctx context.Context, params remoteWriteParams) error { + ctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), ctx), h.options.ForwardTimeout) - fctx, cancel := context.WithTimeout(tracing.CopyTraceContext(context.Background(), pctx), h.options.ForwardTimeout) + var writeErrors writeErrors defer func() { - if errs.ErrOrNil() != nil { + if writeErrors.ErrOrNil() != nil { // NOTICE: The cancel function is not used on all paths intentionally, // if there is no error when quorum is reached, // let forward requests to optimistically run until timeout. @@ -691,140 +665,25 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e } }() - var tLogger log.Logger - { - logTags := []interface{}{"tenant", tenant} - if id, ok := middleware.RequestIDFromContext(pctx); ok { - logTags = append(logTags, "request-id", id) - } - tLogger = log.With(h.logger, logTags...) - } - - // NOTE(GiedriusS): First write locally because inside of the function we check if the local TSDB has cached strings. - // If not then it copies those strings. This is so that the memory allocated for the - // protobuf (except for the labels) can be deallocated. - // This causes a write to the labels field. When fanning out this request to other Receivers, the code calls - // Size() which reads those same fields. We would like to avoid adding locks around each string - // hence we need to write locally first. 
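// Illustrative sketch (not part of the patch): fanoutForward above derives its context from
// context.Background() rather than from the incoming request context, so a client disconnect
// cannot abort in-flight replication; only ForwardTimeout bounds it. In the patch,
// tracing.CopyTraceContext additionally carries the trace span into that detached context.
// The helper and names below are hypothetical.
package main

import (
	"context"
	"fmt"
	"time"
)

func forwardDetached(reqCtx context.Context, forwardTimeout time.Duration, do func(context.Context) error) error {
	// Detach from reqCtx on purpose; the real code also copies the trace span over from reqCtx.
	fctx, cancel := context.WithTimeout(context.Background(), forwardTimeout)
	// The handler intentionally cancels only on error so successful forwards may keep running;
	// for this self-contained sketch a plain defer is enough.
	defer cancel()
	return do(fctx)
}

func main() {
	reqCtx, cancelReq := context.WithCancel(context.Background())
	cancelReq() // simulate the client going away immediately
	err := forwardDetached(reqCtx, time.Second, func(ctx context.Context) error {
		return ctx.Err() // nil: the detached context is unaffected by the request cancellation
	})
	fmt.Println("forward error:", err)
}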
- var maxBufferedResponses = 0 - for writeTarget := range wreqs { - if writeTarget.endpoint != h.options.Endpoint { - continue - } - maxBufferedResponses++ + logTags := []interface{}{"tenant", params.tenant} + if id, ok := middleware.RequestIDFromContext(ctx); ok { + logTags = append(logTags, "request-id", id) } + requestLogger := log.With(h.logger, logTags...) - responses := make(chan writeResponse, maxBufferedResponses) - - var wg sync.WaitGroup - - for writeTarget := range wreqs { - if writeTarget.endpoint != h.options.Endpoint { - continue - } - // If the endpoint for the write request is the - // local node, then don't make a request but store locally. - // By handing replication to the local node in the same - // function as replication to other nodes, we can treat - // a failure to write locally as just another error that - // can be ignored if the replication factor is met. - var err error - - tracing.DoInSpan(fctx, "receive_tsdb_write", func(_ context.Context) { - err = h.writer.Write(fctx, tenant, &prompb.WriteRequest{ - Timeseries: wreqs[writeTarget].timeSeries, - }) - }) - if err != nil { - level.Debug(tLogger).Log("msg", "local tsdb write failed", "err", err.Error()) - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "store locally for endpoint %v", writeTarget.endpoint)) - continue - } - - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) + localWrites, remoteWrites, err := h.distributeTimeseriesToReplicas(params.tenant, params.replicas, params.writeRequest.Timeseries) + if err != nil { + level.Error(requestLogger).Log("msg", "failed to distribute timeseries to replicas", "err", err) + return err } - for writeTarget := range wreqs { - if writeTarget.endpoint == h.options.Endpoint { - continue - } - wg.Add(1) - // Make a request to the specified endpoint. - go func(writeTarget endpointReplica) { - defer wg.Done() - - var ( - err error - cl storepb.WriteableStoreClient - ) - defer func() { - // This is an actual remote forward request so report metric here. - if err != nil { - h.forwardRequests.WithLabelValues(labelError).Inc() - if !seriesReplicated { - h.replications.WithLabelValues(labelError).Inc() - } - return - } - h.forwardRequests.WithLabelValues(labelSuccess).Inc() - if !seriesReplicated { - h.replications.WithLabelValues(labelSuccess).Inc() - } - }() - cl, err = h.peers.get(fctx, writeTarget.endpoint) - if err != nil { - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(err, "get peer connection for endpoint %v", writeTarget.endpoint)) - return - } - - h.mtx.RLock() - b, ok := h.peerStates[writeTarget.endpoint] - if ok { - if time.Now().Before(b.nextAllowed) { - h.mtx.RUnlock() - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", writeTarget.endpoint)) - return - } - } - h.mtx.RUnlock() - - // Create a span to track the request made to another receive node. - tracing.DoInSpan(fctx, "receive_forward", func(ctx context.Context) { - // Actually make the request against the endpoint we determined should handle these time series. - _, err = cl.RemoteWrite(ctx, &storepb.WriteRequest{ - Timeseries: wreqs[writeTarget].timeSeries, - Tenant: tenant, - // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. - Replica: int64(writeTarget.replica + 1), - }) - }) - if err != nil { - // Check if peer connection is unavailable, don't attempt to send requests constantly. 
- if st, ok := status.FromError(err); ok { - if st.Code() == codes.Unavailable { - h.mtx.Lock() - if b, ok := h.peerStates[writeTarget.endpoint]; ok { - b.attempt++ - dur := h.expBackoff.ForAttempt(b.attempt) - b.nextAllowed = time.Now().Add(dur) - level.Debug(tLogger).Log("msg", "target unavailable backing off", "for", dur) - } else { - h.peerStates[writeTarget.endpoint] = &retryState{nextAllowed: time.Now().Add(h.expBackoff.ForAttempt(0))} - } - h.mtx.Unlock() - } - } - werr := errors.Wrapf(err, "forwarding request to endpoint %v", writeTarget.endpoint) - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, werr) - return - } - h.mtx.Lock() - delete(h.peerStates, writeTarget.endpoint) - h.mtx.Unlock() + // Prepare a buffered channel to receive the responses from the local and remote writes. Remote writes will all go + // asynchronously and with this capacity we will never block on writing to the channel. + maxBufferedResponses := len(localWrites) + len(remoteWrites) + responses := make(chan writeResponse, maxBufferedResponses) + wg := sync.WaitGroup{} - responses <- newWriteResponse(wreqs[writeTarget].seriesIDs, nil) - }(writeTarget) - } + h.sendWrites(ctx, &wg, params, localWrites, remoteWrites, responses) go func() { wg.Wait() @@ -835,40 +694,42 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e // This is needed if context is canceled or if we reached success of fail quorum faster. defer func() { go func() { - for wresp := range responses { - if wresp.err != nil { - level.Debug(tLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", wresp.err) + for resp := range responses { + if resp.err != nil { + level.Debug(requestLogger).Log("msg", "request failed, but not needed to achieve quorum", "err", resp.err) } } }() }() quorum := h.writeQuorum() - if seriesReplicated { + if params.alreadyReplicated { quorum = 1 } - successes := make([]int, numSeries) - seriesErrs := newReplicationErrors(quorum, numSeries) + successes := make([]int, len(params.writeRequest.Timeseries)) + seriesErrs := newReplicationErrors(quorum, len(params.writeRequest.Timeseries)) for { select { - case <-fctx.Done(): - return fctx.Err() - case wresp, more := <-responses: - if !more { - for _, rerr := range seriesErrs { - errs.Add(rerr) + case <-ctx.Done(): + return ctx.Err() + case resp, hasMore := <-responses: + if !hasMore { + for _, seriesErr := range seriesErrs { + writeErrors.Add(seriesErr) } - return errs.ErrOrNil() + return writeErrors.ErrOrNil() } - if wresp.err != nil { - for _, tsID := range wresp.seriesIDs { - seriesErrs[tsID].Add(wresp.err) + if resp.err != nil { + // Track errors and successes on a per-series basis. + for _, seriesID := range resp.seriesIDs { + seriesErrs[seriesID].Add(resp.err) } continue } - for _, tsID := range wresp.seriesIDs { - successes[tsID]++ + // At the end, aggregate all errors if there are any and return them. + for _, seriesID := range resp.seriesIDs { + successes[seriesID]++ } if quorumReached(successes, quorum) { return nil @@ -877,6 +738,170 @@ func (h *Handler) fanoutForward(pctx context.Context, tenant string, wreqs map[e } } +// distributeTimeseriesToReplicas distributes the given timeseries from the tenant to different endpoints in a manner +// that achieves the replication factor indicated by replicas. +// The first return value are the series that should be written to the local node. The second return value are the +// series that should be written to remote nodes. 
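// Illustrative sketch (not part of the patch): the shape of the two maps returned by
// distributeTimeseriesToReplicas. The endpoints, the local node ("node-a:10901") and the
// series-to-node assignment are hypothetical; in the real code hashring.GetN decides them,
// and trackedSeries also carries the time series themselves.
package main

import "fmt"

type endpointReplica struct {
	endpoint string
	replica  uint64
}

type trackedSeries struct {
	seriesIDs []int
}

func main() {
	// Replication factor 2, i.e. replicas 0 and 1 for every series. Everything that hashes
	// to the local endpoint lands in localWrites, regardless of the replica number.
	localWrites := map[endpointReplica]trackedSeries{
		{endpoint: "node-a:10901", replica: 0}: {seriesIDs: []int{0}},
		{endpoint: "node-a:10901", replica: 1}: {seriesIDs: []int{1}},
	}
	remoteWrites := map[endpointReplica]trackedSeries{
		{endpoint: "node-b:10901", replica: 1}: {seriesIDs: []int{0}},
		{endpoint: "node-c:10901", replica: 0}: {seriesIDs: []int{1}},
	}
	fmt.Println("local:", localWrites)
	fmt.Println("remote:", remoteWrites)
}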
+func (h *Handler) distributeTimeseriesToReplicas( + tenant string, + replicas []uint64, + timeseries []prompb.TimeSeries, +) (map[endpointReplica]trackedSeries, map[endpointReplica]trackedSeries, error) { + h.mtx.RLock() + defer h.mtx.RUnlock() + remoteWrites := make(map[endpointReplica]trackedSeries) + localWrites := make(map[endpointReplica]trackedSeries) + for tsIndex, ts := range timeseries { + for _, rn := range replicas { + endpoint, err := h.hashring.GetN(tenant, &ts, rn) + if err != nil { + return nil, nil, err + } + endpointReplica := endpointReplica{endpoint: endpoint, replica: rn} + var writeDestination = remoteWrites + if endpoint == h.options.Endpoint { + writeDestination = localWrites + } + writeableSeries, ok := writeDestination[endpointReplica] + if !ok { + writeableSeries = trackedSeries{ + seriesIDs: make([]int, 0), + timeSeries: make([]prompb.TimeSeries, 0), + } + } + writeableSeries.timeSeries = append(writeDestination[endpointReplica].timeSeries, ts) + writeableSeries.seriesIDs = append(writeDestination[endpointReplica].seriesIDs, tsIndex) + writeDestination[endpointReplica] = writeableSeries + } + } + return localWrites, remoteWrites, nil +} + +// sendWrites sends the local and remote writes to execute concurrently, controlling them through the provided sync.WaitGroup. +// The responses from the writes are sent to the responses channel. +func (h *Handler) sendWrites( + ctx context.Context, + wg *sync.WaitGroup, + params remoteWriteParams, + localWrites map[endpointReplica]trackedSeries, + remoteWrites map[endpointReplica]trackedSeries, + responses chan<- writeResponse, +) { + // Do the writes to the local node first. This should be easy and fast. + for writeDestination := range localWrites { + func(writeDestination endpointReplica) { + h.sendLocalWrite(ctx, writeDestination, params.tenant, localWrites[writeDestination], responses) + }(writeDestination) + } + + // Do the writes to remote nodes. Run them all in parallel. + for writeDestination := range remoteWrites { + wg.Add(1) + go func(writeDestination endpointReplica) { + defer wg.Done() + h.sendRemoteWrite(ctx, params.tenant, writeDestination, remoteWrites[writeDestination], params.alreadyReplicated, responses) + }(writeDestination) + } +} + +// sendLocalWrite sends a write request to the local node. +// The responses are sent to the responses channel. +func (h *Handler) sendLocalWrite( + ctx context.Context, + writeDestination endpointReplica, + tenant string, + trackedSeries trackedSeries, + responses chan<- writeResponse, +) { + span, tracingCtx := tracing.StartSpan(ctx, "receive_local_tsdb_write") + defer span.Finish() + span.SetTag("endpoint", writeDestination.endpoint) + span.SetTag("replica", writeDestination.replica) + err := h.writer.Write(tracingCtx, tenant, &prompb.WriteRequest{ + Timeseries: trackedSeries.timeSeries, + }) + if err != nil { + span.SetTag("error", true) + span.SetTag("error.msg", err.Error()) + responses <- newWriteResponse(trackedSeries.seriesIDs, err) + return + } + responses <- newWriteResponse(trackedSeries.seriesIDs, nil) +} + +// sendRemoteWrite sends a write request to the remote node. It takes care of checking whether the endpoint is up or not + in the peerGroup, correctly marking it as up or down when appropriate. + // The responses are sent to the responses channel.
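// Illustrative sketch (not part of the patch): the error classification used inside
// sendRemoteWrite below. Only gRPC errors with code Unavailable cause the peer to be put
// into backoff via markPeerUnavailable; here that call is stubbed with a print and the
// peer address is hypothetical.
package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

func classifyForwardError(err error, markPeerUnavailable func(addr string)) {
	if st, ok := status.FromError(err); ok && st.Code() == codes.Unavailable {
		markPeerUnavailable("node-b:10901")
	}
}

func main() {
	err := status.Error(codes.Unavailable, "connection refused")
	classifyForwardError(err, func(addr string) { fmt.Println("backing off peer", addr) })
}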
+func (h *Handler) sendRemoteWrite( + ctx context.Context, + tenant string, + endpointReplica endpointReplica, + trackedSeries trackedSeries, + alreadyReplicated bool, + responses chan<- writeResponse, +) { + endpoint := endpointReplica.endpoint + cl, err := h.peers.getConnection(ctx, endpoint) + if err != nil { + if errors.Is(err, errUnavailable) { + err = errors.Wrapf(errUnavailable, "backing off forward request for endpoint %v", endpointReplica) + } + responses <- newWriteResponse(trackedSeries.seriesIDs, err) + return + } + + span, spanCtx := tracing.StartSpan(ctx, "receive_forward") + // This is called "real" because it's 1-indexed. + realReplicationIndex := int64(endpointReplica.replica + 1) + span.SetTag("endpoint", endpointReplica.endpoint) + span.SetTag("replica", realReplicationIndex) + // Actually make the request against the endpoint we determined should handle these time series. + _, err = cl.RemoteWrite(spanCtx, &storepb.WriteRequest{ + Timeseries: trackedSeries.timeSeries, + Tenant: tenant, + // Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated. + Replica: realReplicationIndex, + }) + if err != nil { + span.SetTag("error", true) + span.SetTag("error.msg", err.Error()) + // Check if peer connection is unavailable, update the peer state to avoid spamming that peer. + if st, ok := status.FromError(err); ok { + if st.Code() == codes.Unavailable { + h.peers.markPeerUnavailable(endpoint) + } + } + h.forwardRequests.WithLabelValues(labelError).Inc() + if !alreadyReplicated { + h.replications.WithLabelValues(labelError).Inc() + } + responses <- newWriteResponse(trackedSeries.seriesIDs, err) + return + } + span.Finish() + h.forwardRequests.WithLabelValues(labelSuccess).Inc() + if !alreadyReplicated { + h.replications.WithLabelValues(labelSuccess).Inc() + } + responses <- newWriteResponse(trackedSeries.seriesIDs, nil) + h.peers.markPeerAvailable(endpoint) +} + +// writeQuorum returns minimum number of replicas that has to confirm write success before claiming replication success. +func (h *Handler) writeQuorum() int { + return int((h.options.ReplicationFactor / 2) + 1) +} + +func quorumReached(successes []int, successThreshold int) bool { + for _, success := range successes { + if success < successThreshold { + return false + } + } + + return true +} + // RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore. 
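// Illustrative sketch (not part of the patch): the quorum arithmetic from writeQuorum and
// quorumReached above. With integer division, replication factors 1..5 give write quorums
// of 1, 2, 2, 3, 3, and a request only succeeds once every series has reached that count.
package main

import "fmt"

func writeQuorum(replicationFactor uint64) int {
	return int((replicationFactor / 2) + 1)
}

func quorumReached(successes []int, successThreshold int) bool {
	for _, success := range successes {
		if success < successThreshold {
			return false
		}
	}
	return true
}

func main() {
	for _, rf := range []uint64{1, 2, 3, 4, 5} {
		fmt.Printf("replication factor %d -> write quorum %d\n", rf, writeQuorum(rf))
	}
	// Two series with quorum 2: the second series has only one confirmed write so far,
	// so fanoutForward keeps waiting for more responses.
	fmt.Println(quorumReached([]int{2, 1}, 2)) // false
}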
func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) { span, ctx := tracing.StartSpan(ctx, "receive_grpc") @@ -1161,24 +1186,32 @@ func newReplicationErrors(threshold, numErrors int) []*replicationErrors { return errs } -func newPeerGroup(dialOpts ...grpc.DialOption) peersContainer { +func newPeerGroup(backoff backoff.Backoff, dialOpts ...grpc.DialOption) peersContainer { return &peerGroup{ - dialOpts: dialOpts, - cache: map[string]*grpc.ClientConn{}, - m: sync.RWMutex{}, - dialer: grpc.DialContext, + dialOpts: dialOpts, + connections: map[string]*grpc.ClientConn{}, + m: sync.RWMutex{}, + dialer: grpc.DialContext, + peerStates: make(map[string]*retryState), + expBackoff: backoff, } } type peersContainer interface { close(string) error - get(context.Context, string) (storepb.WriteableStoreClient, error) + getConnection(context.Context, string) (storepb.WriteableStoreClient, error) + markPeerUnavailable(string) + markPeerAvailable(string) + reset() } type peerGroup struct { - dialOpts []grpc.DialOption - cache map[string]*grpc.ClientConn - m sync.RWMutex + dialOpts []grpc.DialOption + connections map[string]*grpc.ClientConn + peerStates map[string]*retryState + expBackoff backoff.Backoff + + m sync.RWMutex // dialer is used for testing. dialer func(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) @@ -1188,14 +1221,14 @@ func (p *peerGroup) close(addr string) error { p.m.Lock() defer p.m.Unlock() - c, ok := p.cache[addr] + c, ok := p.connections[addr] if !ok { // NOTE(GiedriusS): this could be valid case when the connection // was never established. return nil } - delete(p.cache, addr) + delete(p.connections, addr) if err := c.Close(); err != nil { return fmt.Errorf("closing connection for %s", addr) } @@ -1203,10 +1236,14 @@ func (p *peerGroup) close(addr string) error { return nil } -func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { +func (p *peerGroup) getConnection(ctx context.Context, addr string) (storepb.WriteableStoreClient, error) { + if !p.isPeerUp(addr) { + return nil, errUnavailable + } + // use a RLock first to prevent blocking if we don't need to. p.m.RLock() - c, ok := p.cache[addr] + c, ok := p.connections[addr] p.m.RUnlock() if ok { return storepb.NewWriteableStoreClient(c), nil @@ -1215,15 +1252,51 @@ func (p *peerGroup) get(ctx context.Context, addr string) (storepb.WriteableStor p.m.Lock() defer p.m.Unlock() // Make sure that another caller hasn't created the connection since obtaining the write lock. - c, ok = p.cache[addr] + c, ok = p.connections[addr] if ok { return storepb.NewWriteableStoreClient(c), nil } conn, err := p.dialer(ctx, addr, p.dialOpts...) 
if err != nil { - return nil, errors.Wrap(err, "failed to dial peer") + p.markPeerUnavailable(addr) + dialError := errors.Wrap(err, "failed to dial peer") + return nil, errors.Wrap(dialError, errUnavailable.Error()) } - p.cache[addr] = conn + p.connections[addr] = conn return storepb.NewWriteableStoreClient(conn), nil } + +func (p *peerGroup) markPeerUnavailable(addr string) { + p.m.Lock() + defer p.m.Unlock() + + state, ok := p.peerStates[addr] + if !ok { + state = &retryState{attempt: -1} + } + state.attempt++ + state.nextAllowed = time.Now().Add(p.expBackoff.ForAttempt(state.attempt)) + p.peerStates[addr] = state +} + +func (p *peerGroup) markPeerAvailable(addr string) { + p.m.Lock() + defer p.m.Unlock() + delete(p.peerStates, addr) +} + +func (p *peerGroup) isPeerUp(addr string) bool { + p.m.RLock() + defer p.m.RUnlock() + state, ok := p.peerStates[addr] + if !ok { + return true + } + return time.Now().After(state.nextAllowed) +} + +func (p *peerGroup) reset() { + p.expBackoff.Reset() + p.peerStates = make(map[string]*retryState) +} diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index de511dc8b6..1ac535862e 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -44,6 +44,7 @@ import ( "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/extkingpin" + "github.com/thanos-io/thanos/pkg/logging" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/store/labelpb" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -172,6 +173,15 @@ type fakePeersGroup struct { closeCalled map[string]bool } +func (g *fakePeersGroup) markPeerUnavailable(s string) { +} + +func (g *fakePeersGroup) markPeerAvailable(s string) { +} + +func (g *fakePeersGroup) reset() { +} + func (g *fakePeersGroup) close(addr string) error { if g.closeCalled == nil { g.closeCalled = map[string]bool{} @@ -180,7 +190,7 @@ func (g *fakePeersGroup) close(addr string) error { return nil } -func (g *fakePeersGroup) get(_ context.Context, addr string) (storepb.WriteableStoreClient, error) { +func (g *fakePeersGroup) getConnection(_ context.Context, addr string) (storepb.WriteableStoreClient, error) { c, ok := g.clients[addr] if !ok { return nil, fmt.Errorf("client %s not found", addr) @@ -202,8 +212,9 @@ func newTestHandlerHashring(appendables []*fakeAppendable, replicationFactor uin ag := addrGen{} limiter, _ := NewLimiter(NewNopConfig(), nil, RouterIngestor, log.NewNopLogger(), 1*time.Second) + logger := logging.NewLogger("debug", "logfmt", "receive_test") for i := range appendables { - h := NewHandler(nil, &Options{ + h := NewHandler(logger, &Options{ TenantHeader: tenancy.DefaultTenantHeader, ReplicaHeader: DefaultReplicaHeader, ReplicationFactor: replicationFactor, @@ -248,6 +259,7 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist replicationFactor uint64 wreq *prompb.WriteRequest appendables []*fakeAppendable + randomNode bool }{ { name: "size 1 success", @@ -605,17 +617,29 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist t.Fatalf("unable to create test handler: %v", err) } tenant := "test" - // Test from the point of view of every node - // so that we know status code does not depend - // on which node is erroring and which node is receiving. - for i, handler := range handlers { - // Test that the correct status is returned. 
+ + if tc.randomNode { + handler := handlers[0] rec, err := makeRequest(handler, tenant, tc.wreq) if err != nil { - t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", i+1, err) + t.Fatalf("handler: unexpectedly failed making HTTP request: %v", err) } if rec.Code != tc.status { - t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i+1, tc.status, rec.Code, rec.Body.String()) + t.Errorf("handler: got unexpected HTTP status code: expected %d, got %d; body: %s", tc.status, rec.Code, rec.Body.String()) + } + } else { + // Test from the point of view of every node + // so that we know status code does not depend + // on which node is erroring and which node is receiving. + for i, handler := range handlers { + // Test that the correct status is returned. + rec, err := makeRequest(handler, tenant, tc.wreq) + if err != nil { + t.Fatalf("handler %d: unexpectedly failed making HTTP request: %v", i+1, err) + } + if rec.Code != tc.status { + t.Errorf("handler %d: got unexpected HTTP status code: expected %d, got %d; body: %s", i+1, tc.status, rec.Code, rec.Body.String()) + } } } @@ -654,6 +678,9 @@ func testReceiveQuorum(t *testing.T, hashringAlgo HashringAlgorithm, withConsist // We have len(handlers) copies of each sample because the test case // is run once for each handler and they all use the same appender. expectedMin = int((tc.replicationFactor/2)+1) * len(ts.Samples) + if tc.randomNode { + expectedMin = len(ts.Samples) + } } if uint64(expectedMin) > got { t.Errorf("handler: %d, labels %q: expected minimum of %d samples, got %d", j, lset.String(), expectedMin, got) diff --git a/pkg/server/http/middleware/request_id.go b/pkg/server/http/middleware/request_id.go index f09496b71c..fddeabf932 100644 --- a/pkg/server/http/middleware/request_id.go +++ b/pkg/server/http/middleware/request_id.go @@ -33,8 +33,8 @@ func RequestID(h http.Handler) http.HandlerFunc { reqID := r.Header.Get("X-Request-ID") if reqID == "" { entropy := ulid.Monotonic(rand.New(rand.NewSource(time.Now().UnixNano())), 0) - reqID := ulid.MustNew(ulid.Timestamp(time.Now()), entropy) - r.Header.Set("X-Request-ID", reqID.String()) + reqID = ulid.MustNew(ulid.Timestamp(time.Now()), entropy).String() + r.Header.Set("X-Request-ID", reqID) } ctx := newContextWithRequestID(r.Context(), reqID) h.ServeHTTP(w, r.WithContext(ctx))
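// Illustrative sketch (not part of the patch): the request_id.go hunk above fixes a variable
// shadowing bug. Declaring reqID with ':=' inside the if block created a new variable, so the
// outer reqID later stored in the request context stayed empty; assigning with '=' updates the
// outer one. The generateID helper is a hypothetical stand-in for the ULID generation.
package main

import "fmt"

func generateID() string { return "generated-ulid" }

func main() {
	reqID := "" // e.g. the X-Request-ID header was not set
	if reqID == "" {
		reqID := generateID() // BUG: ':=' declares a new, shadowed reqID
		_ = reqID
	}
	fmt.Printf("with shadowing: %q\n", reqID) // prints ""

	if reqID == "" {
		reqID = generateID() // FIX: '=' assigns to the outer reqID
	}
	fmt.Printf("with assignment: %q\n", reqID) // prints "generated-ulid"
}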