Reject series/write requests when max pending request limit is hit #114

Merged: 2 commits on Dec 17, 2024

18 changes: 17 additions & 1 deletion cmd/thanos/receive.go
@@ -268,7 +268,19 @@ func runReceive(
return errors.Wrap(err, "parse limit configuration")
}
}
limiter, err := receive.NewLimiter(conf.writeLimitsConfig, reg, receiveMode, log.With(logger, "component", "receive-limiter"), conf.limitsConfigReloadTimer)
if conf.maxPendingGrpcWriteRequests > 0 {
level.Info(logger).Log("msg", "set max pending gRPC write requests in limiter", "max_pending_requests", conf.maxPendingGrpcWriteRequests)
}
limiter, err := receive.NewLimiterWithOptions(
conf.writeLimitsConfig,
@jnyi (Collaborator) commented on Dec 16, 2024:
Consider reusing writeLimitsConfig so there are fewer interface changes?

@hczhu-db (Collaborator, Author) commented on Dec 17, 2024:
I agree it would be ideal to have this config field in writeLimitsConfig, but DB pods don't load writeLimitsConfig at all; Pantheon-writer pods load that config. I'll have to keep it this way.
The interface is not changed: receive.NewLimiter() stays the same. I added another function, receive.NewLimiterWithOptions().

@jnyi (Collaborator):
Got it, that's fair. Do you want to add some unit tests for the limiter to test the load-shedding behavior?

reg,
receiveMode,
log.With(logger, "component", "receive-limiter"),
conf.limitsConfigReloadTimer,
receive.LimiterOptions{
MaxPendingRequests: int32(conf.maxPendingGrpcWriteRequests),
},
)
if err != nil {
return errors.Wrap(err, "creating limiter")
}
@@ -408,6 +420,7 @@ func runReceive(
store.LazyRetrieval,
options...,
)

mts := store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, conf.storeRateLimits)
rw := store.ReadWriteTSDBStore{
StoreServer: mts,
@@ -974,6 +987,7 @@ type receiveConfig struct {
topMetricsMinimumCardinality uint64
topMetricsUpdateInterval time.Duration
matcherConverterCacheCapacity int
maxPendingGrpcWriteRequests int

featureList *[]string
}
@@ -1138,6 +1152,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) {
Default("5m").DurationVar(&rc.topMetricsUpdateInterval)
cmd.Flag("receive.store-matcher-converter-cache-capacity", "The number of label matchers to cache in the matcher converter for the Store API. Set to 0 to disable to cache. Default is 0.").
Default("0").IntVar(&rc.matcherConverterCacheCapacity)
cmd.Flag("receive.max-pending-grcp-write-requests", "Reject right away gRPC write requests when this number of requests are pending. Value 0 disables this feature.").
Default("0").IntVar(&rc.maxPendingGrpcWriteRequests)
rc.featureList = cmd.Flag("enable-feature", "Comma separated experimental feature names to enable. The current list of features is "+metricNamesFilter+".").Default("").Strings()
}

19 changes: 6 additions & 13 deletions pkg/receive/handler.go
@@ -36,7 +36,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"golang.org/x/exp/slices"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -142,9 +141,6 @@ type Handler struct {
writeTimeseriesTotal *prometheus.HistogramVec
writeE2eLatency *prometheus.HistogramVec

pendingWriteRequests prometheus.Gauge
pendingWriteRequestsCounter atomic.Int32

Limiter *Limiter
}

@@ -235,12 +231,6 @@ func NewHandler(logger log.Logger, o *Options) *Handler {
Buckets: []float64{1, 5, 10, 20, 30, 40, 50, 60, 90, 120, 300, 600, 900, 1200, 1800, 3600},
}, []string{"code", "tenant", "rollup"},
),
pendingWriteRequests: promauto.With(registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_pending_write_requests",
Help: "The number of pending write requests.",
},
),
}

h.forwardRequests.WithLabelValues(labelSuccess)
@@ -1083,12 +1073,15 @@ func quorumReached(successes []int, successThreshold int) bool {

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
func (h *Handler) RemoteWrite(ctx context.Context, r *storepb.WriteRequest) (*storepb.WriteResponse, error) {
if h.Limiter.ShouldRejectNewRequest() {
Collaborator:
Nice, let's add a unit test for this behavior?

@hczhu-db (Collaborator, Author):
I can do that in a follow-up PR while testing it in Dev. It's quite tricky to write a unit test for such a feature, and I want to see how useful it is in Dev before spending time on it.

return nil, status.Error(codes.ResourceExhausted, "too many pending write requests")
}
// NB: ShouldRejectNewRequest() increments the number of pending requests only when it returns false.
defer h.Limiter.DecrementPendingRequests()

span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

h.pendingWriteRequests.Set(float64(h.pendingWriteRequestsCounter.Add(1)))
defer h.pendingWriteRequestsCounter.Add(-1)

_, err := h.handleRequest(ctx, uint64(r.Replica), r.Tenant, &prompb.WriteRequest{Timeseries: r.Timeseries})
if err != nil {
level.Debug(h.logger).Log("msg", "failed to handle request", "err", err)
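With this change, an overloaded receiver answers RemoteWrite with a gRPC ResourceExhausted status instead of queuing more work. A minimal sketch of how a forwarding client might detect that status and back off is shown below; the forwardWithBackoff helper, its retry policy, and the receiveclient package name are illustrative assumptions, not code from this PR.

package receiveclient

import (
    "context"
    "log"
    "time"

    "github.com/thanos-io/thanos/pkg/store/storepb"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

// forwardWithBackoff forwards one write request and backs off briefly whenever
// the receiver sheds load with codes.ResourceExhausted. Hypothetical helper.
func forwardWithBackoff(ctx context.Context, client storepb.WriteableStoreClient, req *storepb.WriteRequest) error {
    const maxAttempts = 3
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        _, err := client.RemoteWrite(ctx, req)
        if err == nil {
            return nil
        }
        if status.Code(err) != codes.ResourceExhausted {
            // Not a load-shedding rejection; surface the error immediately.
            return err
        }
        log.Printf("receiver rejected write (attempt %d/%d): %v", attempt, maxAttempts, err)
        time.Sleep(time.Duration(attempt) * 100 * time.Millisecond)
    }
    return status.Error(codes.ResourceExhausted, "receiver still overloaded after retries")
}
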
81 changes: 75 additions & 6 deletions pkg/receive/limiter.go
@@ -18,6 +18,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"

"go.uber.org/atomic"
)

// Limiter is responsible for managing the configuration and initialization of
@@ -35,6 +37,19 @@ type Limiter struct {
configReloadFailedCounter prometheus.Counter
receiverMode ReceiverMode
configReloadTimer time.Duration

// Reject a request if this limit is reached.
// This field is set at instance creation and never changes afterwards.
// So it's safe to read it without a lock.
maxPendingRequests int32
maxPendingRequestLimitHit prometheus.Counter
pendingRequests atomic.Int32
pendingRequestsGauge prometheus.Gauge
}

type LimiterOptions struct {
// Value 0 disables the max pending request limiting behavior.
MaxPendingRequests int32
}

// headSeriesLimiter encompasses active/head series limiting logic.
@@ -62,16 +77,50 @@ func (l *Limiter) HeadSeriesLimiter() headSeriesLimiter {
return l.headSeriesLimiter
}

func (l *Limiter) ShouldRejectNewRequest() bool {
// maxPendingRequests is set when the limiter instance is created and never changes afterwards,
// so it's safe to read it without a lock.
if l.maxPendingRequests > 0 && l.pendingRequests.Load() >= l.maxPendingRequests {
if l.maxPendingRequestLimitHit != nil {
l.maxPendingRequestLimitHit.Inc()
}
return true
}
newValue := l.pendingRequests.Add(1)
if l.pendingRequestsGauge != nil {
l.pendingRequestsGauge.Set(float64(newValue))
}
return false
}

func (l *Limiter) DecrementPendingRequests() {
newValue := l.pendingRequests.Add(-1)
if l.pendingRequestsGauge != nil {
l.pendingRequestsGauge.Set(float64(newValue))
}
}

// NewLimiter creates a new *Limiter given a configuration and prometheus
// registerer.
func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMode, logger log.Logger, configReloadTimer time.Duration) (*Limiter, error) {
return NewLimiterWithOptions(configFile, reg, r, logger, configReloadTimer, LimiterOptions{})
}

func NewLimiterWithOptions(
configFile fileContent,
reg prometheus.Registerer,
r ReceiverMode,
logger log.Logger,
configReloadTimer time.Duration,
opts LimiterOptions) (*Limiter, error) {
limiter := &Limiter{
writeGate: gate.NewNoop(),
requestLimiter: &noopRequestLimiter{},
headSeriesLimiter: NewNopSeriesLimit(),
logger: logger,
receiverMode: r,
configReloadTimer: configReloadTimer,
writeGate: gate.NewNoop(),
requestLimiter: &noopRequestLimiter{},
headSeriesLimiter: NewNopSeriesLimit(),
logger: logger,
receiverMode: r,
configReloadTimer: configReloadTimer,
maxPendingRequests: opts.MaxPendingRequests,
}

if reg != nil {
@@ -92,6 +141,26 @@ func NewLimiter(configFile fileContent, reg prometheus.Registerer, r ReceiverMod
Help: "How many times the limit configuration failed to reload.",
},
)
limiter.maxPendingRequestLimitHit = promauto.With(limiter.registerer).NewCounter(
prometheus.CounterOpts{
Name: "thanos_receive_max_pending_write_request_limit_hit_total",
Help: "Number of times the max pending write request limit was hit",
},
)
limiter.pendingRequestsGauge = promauto.With(limiter.registerer).NewGauge(
prometheus.GaugeOpts{
Name: "thanos_receive_pending_write_requests",
Help: "Number of pending write requests",
},
)
}

if configFile == nil {
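The review thread earlier in this diff asks about unit tests for the load-shedding behavior. A minimal sketch of such a test might look like the following; it assumes a nil limits config file and registerer are acceptable (both are nil-checked in NewLimiterWithOptions) and that the RouterIngestor receiver mode constant is available. The test is illustrative and not part of this PR.

package receive

import (
    "testing"
    "time"

    "github.com/go-kit/log"
)

func TestLimiterPendingRequestLoadShedding(t *testing.T) {
    // Allow at most 2 pending requests; a nil config file and registerer keep
    // the limiter in its no-op configuration for everything else.
    limiter, err := NewLimiterWithOptions(
        nil,
        nil,
        RouterIngestor,
        log.NewNopLogger(),
        time.Minute,
        LimiterOptions{MaxPendingRequests: 2},
    )
    if err != nil {
        t.Fatalf("creating limiter: %v", err)
    }

    // The first two requests are admitted and tracked as pending.
    if limiter.ShouldRejectNewRequest() {
        t.Fatal("first request should be admitted")
    }
    if limiter.ShouldRejectNewRequest() {
        t.Fatal("second request should be admitted")
    }
    // The third request exceeds MaxPendingRequests and is shed.
    if !limiter.ShouldRejectNewRequest() {
        t.Fatal("third request should be rejected")
    }
    // Finishing one request frees a slot again.
    limiter.DecrementPendingRequests()
    if limiter.ShouldRejectNewRequest() {
        t.Fatal("request after DecrementPendingRequests should be admitted")
    }
}
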
27 changes: 27 additions & 0 deletions pkg/store/limiter.go
@@ -11,6 +11,8 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/thanos-io/thanos/pkg/extkingpin"
"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -108,11 +110,14 @@ func NewBytesLimiterFactory(limit units.Base2Bytes) BytesLimiterFactory {
type SeriesSelectLimits struct {
SeriesPerRequest uint64
SamplesPerRequest uint64
PendingRequests int32
}

func (l *SeriesSelectLimits) RegisterFlags(cmd extkingpin.FlagClause) {
cmd.Flag("store.limits.request-series", "The maximum series allowed for a single Series request. The Series call fails if this limit is exceeded. 0 means no limit.").Default("0").Uint64Var(&l.SeriesPerRequest)
cmd.Flag("store.limits.request-samples", "The maximum samples allowed for a single Series request, The Series call fails if this limit is exceeded. 0 means no limit. NOTE: For efficiency the limit is internally implemented as 'chunks limit' considering each chunk contains a maximum of 120 samples.").Default("0").Uint64Var(&l.SamplesPerRequest)
cmd.Flag("store.limits.pending-requests", "Reject gRPC series requests right away when this number of requests are pending. Value 0 disables this feature.").
Default("0").Int32Var(&l.PendingRequests)
}

var _ storepb.StoreServer = &limitedStoreServer{}
@@ -123,6 +128,13 @@ type limitedStoreServer struct {
newSeriesLimiter SeriesLimiterFactory
newSamplesLimiter ChunksLimiterFactory
failedRequestsCounter *prometheus.CounterVec

// This is a read-only field once it's set.
// Value 0 disables the feature.
maxPendingRequests int32
pendingRequests atomic.Int32
maxPendingRequestLimitHit prometheus.Counter
Collaborator:
LGTM. Shall we consider adding an alert around this metric?

@hczhu-db (Collaborator, Author):
Definitely, once the counter is there.

pendingRequestsGauge prometheus.Gauge
}

// NewLimitedStoreServer creates a new limitedStoreServer.
@@ -135,10 +147,25 @@ func NewLimitedStoreServer(store storepb.StoreServer, reg prometheus.Registerer,
Name: "thanos_store_selects_dropped_total",
Help: "Number of select queries that were dropped due to configured limits.",
}, []string{"reason"}),
pendingRequestsGauge: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_store_server_pending_series_requests",
Help: "Number of pending series requests",
}),
maxPendingRequestLimitHit: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_store_server_hit_max_pending_series_requests_total",
Help: "Number of pending series requests that hit the max pending request limit",
}),
maxPendingRequests: selectLimits.PendingRequests,
}
}

func (s *limitedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
if s.maxPendingRequests > 0 && s.pendingRequests.Load() >= s.maxPendingRequests {
return status.Error(codes.ResourceExhausted, "too many pending series requests")
}
s.pendingRequestsGauge.Set(float64(s.pendingRequests.Add(1)))
defer s.pendingRequests.Add(-1)

seriesLimiter := s.newSeriesLimiter(s.failedRequestsCounter.WithLabelValues("series"))
chunksLimiter := s.newSamplesLimiter(s.failedRequestsCounter.WithLabelValues("chunks"))
limitedSrv := newLimitedServer(srv, seriesLimiter, chunksLimiter)
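For comparison with the receive.go wiring earlier in this diff, here is a minimal sketch of how the new PendingRequests limit could be plugged into a store gRPC server; the newLimitedStore helper, the example package name, and the limit of 100 are illustrative assumptions rather than code from this PR.

package example

import (
    "github.com/prometheus/client_golang/prometheus"

    "github.com/thanos-io/thanos/pkg/store"
    "github.com/thanos-io/thanos/pkg/store/storepb"
)

// newLimitedStore wraps an inner StoreServer so that Series calls beyond 100
// concurrently pending requests are rejected with ResourceExhausted.
func newLimitedStore(reg prometheus.Registerer, proxy storepb.StoreServer) storepb.StoreServer {
    limits := store.SeriesSelectLimits{
        SeriesPerRequest:  0,   // 0 disables the per-request series limit
        SamplesPerRequest: 0,   // 0 disables the per-request samples limit
        PendingRequests:   100, // shed load beyond 100 pending Series requests
    }
    return store.NewLimitedStoreServer(store.NewInstrumentedStoreServer(reg, proxy), reg, limits)
}
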
12 changes: 0 additions & 12 deletions pkg/store/telemetry.go
@@ -11,8 +11,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/thanos-io/thanos/pkg/store/storepb"

"go.uber.org/atomic"
)

// seriesStatsAggregator aggregates results from fanned-out queries into a histogram given their
@@ -157,12 +155,8 @@ type instrumentedStoreServer struct {
storepb.StoreServer
seriesRequested prometheus.Histogram
chunksRequested prometheus.Histogram

pendingRequests prometheus.Gauge
pendingRequestsCounter atomic.Int32
}

// NewInstrumentedStoreServer creates a new instrumentedStoreServer.
func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreServer) storepb.StoreServer {
return &instrumentedStoreServer{
StoreServer: store,
@@ -176,17 +170,11 @@ func NewInstrumentedStoreServer(reg prometheus.Registerer, store storepb.StoreSe
Help: "Number of requested chunks for Series calls",
Buckets: []float64{1, 100, 1000, 10000, 100000, 10000000, 100000000, 1000000000},
}),
pendingRequests: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_store_server_pending_series_requests",
Help: "Number of pending series requests",
}),
}
}

func (s *instrumentedStoreServer) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
instrumented := newInstrumentedServer(srv)
s.pendingRequests.Set(float64(s.pendingRequestsCounter.Add(1)))
defer s.pendingRequestsCounter.Add(-1)

if err := s.StoreServer.Series(req, instrumented); err != nil {
return err