Skip to content

Commit

Permalink
distributor: allow enforcing maximum buffer size for write request po…
Browse files Browse the repository at this point in the history
…ol (#8082)

This commit adds a new experimental CLI parameter, -distributor.max-request-pool-buffer-size, that enforces a maximum size in bytes for the pooled buffers used to decompress request bodies in both the Prometheus and OTLP handlers. It defaults to 0, which means no maximum is enforced (the same behavior as before this change).

Additionally, a new metric named cortex_distributor_uncompressed_request_body_size_bytes has been added to provide visibility into the size of uncompressed request bodies, so that an informed decision can be made when setting the maximum described above.

---------

Signed-off-by: Miguel Ángel Ortuño <[email protected]>
  • Loading branch information
ortuman authored May 10, 2024
1 parent 271b5ee commit d7987c9
Show file tree
Hide file tree
Showing 18 changed files with 232 additions and 105 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* [ENHANCEMENT] Querying: Remove OpEmptyMatch from regex concatenations. #8012
* [ENHANCEMENT] Store-gateway: add `-blocks-storage.bucket-store.max-concurrent-queue-timeout`. When set, queries at the store-gateway's query gate will not wait longer than that to execute. If a query reaches the wait timeout, then the querier will retry the blocks on a different store-gateway. If all store-gateways are unavailable, then the query will fail with `err-mimir-store-consistency-check-failed`. #7777
* [ENHANCEMENT] Ingester: Optimize querying with regexp matchers. #8106
* [ENHANCEMENT] Distributor: Introduce `-distributor.max-request-pool-buffer-size` to allow configuring the maximum size of the request pool buffers. #8082
* [BUGFIX] Rules: improve error handling when querier is local to the ruler. #7567
* [BUGFIX] Querier, store-gateway: Protect against panics raised during snappy encoding. #7520
* [BUGFIX] Ingester: Prevent timely compaction of empty blocks. #7624
Expand Down
11 changes: 11 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,17 @@
"fieldType": "int",
"fieldCategory": "advanced"
},
{
"kind": "field",
"name": "max_request_pool_buffer_size",
"required": false,
"desc": "Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.",
"fieldValue": null,
"fieldDefaultValue": 0,
"fieldFlag": "distributor.max-request-pool-buffer-size",
"fieldType": "int",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "remote_timeout",
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,8 @@ Usage of ./cmd/mimir/mimir:
[experimental] Maximum number of exemplars per series per request. 0 to disable limit in request. The exceeding exemplars are dropped.
-distributor.max-recv-msg-size int
Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected. (default 104857600)
-distributor.max-request-pool-buffer-size int
[experimental] Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.
-distributor.metric-relabeling-enabled
[experimental] Enable metric relabeling for the tenant. This configuration option can be used to forcefully disable metric relabeling on a per-tenant basis. (default true)
-distributor.otel-metric-suffixes-enabled
Expand Down
2 changes: 2 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ The following features are currently experimental:
- `-distributor.retry-after-header.max-backoff-exponent`
- Limit exemplars per series per request
- `-distributor.max-exemplars-per-series-per-request`
- Enforce a maximum pool buffer size for write requests
- `-distributor.max-request-pool-buffer-size`
- Hash ring
- Disabling ring heartbeat timeouts
- `-distributor.ring.heartbeat-timeout=0`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,11 @@ ha_tracker:
# CLI flag: -distributor.max-recv-msg-size
[max_recv_msg_size: <int> | default = 104857600]
# (experimental) Max size of the pooled buffers used for marshaling write
# requests. If 0, no max size is enforced.
# CLI flag: -distributor.max-request-pool-buffer-size
[max_request_pool_buffer_size: <int> | default = 0]
# (advanced) Timeout for downstream ingesters.
# CLI flag: -distributor.remote-timeout
[remote_timeout: <duration> | default = 2s]
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ const OTLPPushEndpoint = "/otlp/v1/metrics"
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, reg prometheus.Registerer, limits *validation.Overrides) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, reg, d.PushWithMiddlewares, a.logger), true, false, "POST")
a.RegisterRoute(PrometheusPushEndpoint, distributor.Handler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, a.logger), true, false, "POST")
a.RegisterRoute(OTLPPushEndpoint, distributor.OTLPHandler(pushConfig.MaxRecvMsgSize, d.RequestBufferPool, a.sourceIPs, a.cfg.SkipLabelNameValidationHeader, a.cfg.EnableOtelMetadataStorage, limits, pushConfig.RetryConfig, d.PushWithMiddlewares, d.PushMetrics, reg, a.logger), true, false, "POST")

a.indexPage.AddLinks(defaultWeight, "Distributor", []IndexPageLink{
{Desc: "Ring status", Path: "/distributor/ring"},
Expand Down
61 changes: 59 additions & 2 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,13 @@ type Distributor struct {
exemplarValidationMetrics *exemplarValidationMetrics
metadataValidationMetrics *metadataValidationMetrics

// Metrics to be passed to distributor push handlers
PushMetrics *PushMetrics

PushWithMiddlewares PushFunc

RequestBufferPool util.Pool

// Pool of []byte used when marshalling write requests.
writeRequestBytePool sync.Pool

Expand All @@ -177,8 +182,9 @@ type Config struct {
RetryConfig RetryConfig `yaml:"retry_after_header"`
HATrackerConfig HATrackerConfig `yaml:"ha_tracker"`

MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"`
RemoteTimeout time.Duration `yaml:"remote_timeout" category:"advanced"`
MaxRecvMsgSize int `yaml:"max_recv_msg_size" category:"advanced"`
MaxRequestPoolBufferSize int `yaml:"max_request_pool_buffer_size" category:"experimental"`
RemoteTimeout time.Duration `yaml:"remote_timeout" category:"advanced"`

// Distributors ring
DistributorRing RingConfig `yaml:"ring"`
Expand Down Expand Up @@ -225,6 +231,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
cfg.RetryConfig.RegisterFlags(f)

f.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "Max message size in bytes that the distributors will accept for incoming push requests to the remote write API. If exceeded, the request will be rejected.")
f.IntVar(&cfg.MaxRequestPoolBufferSize, "distributor.max-request-pool-buffer-size", 0, "Max size of the pooled buffers used for marshaling write requests. If 0, no max size is enforced.")
f.DurationVar(&cfg.RemoteTimeout, "distributor.remote-timeout", 2*time.Second, "Timeout for downstream ingesters.")
f.BoolVar(&cfg.WriteRequestsBufferPoolingEnabled, "distributor.write-requests-buffer-pooling-enabled", true, "Enable pooling of buffers used for marshaling write requests.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "distributor.limit-inflight-requests-using-grpc-method-limiter", true, "When enabled, in-flight write requests limit is checked as soon as the gRPC request is received, before the request is decoded and parsed.")
Expand All @@ -251,6 +258,44 @@ const (
limitLabel = "limit"
)

// PushMetrics groups the per-user metrics that are handed to the distributor
// push handlers.
type PushMetrics struct {
// otlpRequestCounter counts incoming OTLP requests, labelled by "user".
otlpRequestCounter *prometheus.CounterVec
// uncompressedBodySize is a histogram of uncompressed request body sizes
// in bytes, labelled by "user".
uncompressedBodySize *prometheus.HistogramVec
}

// newPushMetrics creates the push-handler metrics through promauto using the
// given registerer and returns them wrapped in a PushMetrics.
func newPushMetrics(reg prometheus.Registerer) *PushMetrics {
	factory := promauto.With(reg)

	// Counter of OTLP requests, one series per tenant.
	otlpRequests := factory.NewCounterVec(
		prometheus.CounterOpts{
			Name: "cortex_distributor_otlp_requests_total",
			Help: "The total number of OTLP requests that have come in to the distributor.",
		},
		[]string{"user"},
	)

	// Histogram of uncompressed body sizes, one series per tenant. Only the
	// native-histogram options are set here.
	bodySizes := factory.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:                            "cortex_distributor_uncompressed_request_body_size_bytes",
			Help:                            "Size of uncompressed request body in bytes.",
			NativeHistogramBucketFactor:     1.1,
			NativeHistogramMinResetDuration: time.Hour,
			NativeHistogramMaxBucketNumber:  100,
		},
		[]string{"user"},
	)

	return &PushMetrics{
		otlpRequestCounter:   otlpRequests,
		uncompressedBodySize: bodySizes,
	}
}

// IncOTLPRequest increments the OTLP request counter for the given user.
// It is a no-op when called on a nil *PushMetrics.
func (m *PushMetrics) IncOTLPRequest(user string) {
	if m == nil {
		return
	}
	counter := m.otlpRequestCounter.WithLabelValues(user)
	counter.Inc()
}

// ObserveUncompressedBodySize records size (in bytes) in the uncompressed
// request body histogram for the given user. It is a no-op when called on a
// nil *PushMetrics.
func (m *PushMetrics) ObserveUncompressedBodySize(user string, size float64) {
	if m == nil {
		return
	}
	observer := m.uncompressedBodySize.WithLabelValues(user)
	observer.Observe(size)
}

// deleteUserMetrics removes the series labelled with the given user from every
// metric held by PushMetrics.
//
// Like the other PushMetrics methods, it is a no-op on a nil receiver, so a
// caller holding a nil *PushMetrics (which the push handlers accept) does not
// panic.
func (m *PushMetrics) deleteUserMetrics(user string) {
	if m == nil {
		return
	}
	m.otlpRequestCounter.DeleteLabelValues(user)
	m.uncompressedBodySize.DeleteLabelValues(user)
}

// New constructs a new Distributor
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, activeGroupsCleanupService *util.ActiveGroupsCleanupService, ingestersRing ring.ReadRing, partitionsRing *ring.PartitionInstanceRing, canJoinDistributorsRing bool, reg prometheus.Registerer, log log.Logger) (*Distributor, error) {
clientMetrics := ingester_client.NewMetrics(reg)
Expand All @@ -270,10 +315,18 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
subservices := []services.Service(nil)
subservices = append(subservices, haTracker)

var requestBufferPool util.Pool
if cfg.MaxRequestPoolBufferSize > 0 {
requestBufferPool = util.NewBucketedBufferPool(1<<10, cfg.MaxRequestPoolBufferSize, 4)
} else {
requestBufferPool = util.NewBufferPool()
}

d := &Distributor{
cfg: cfg,
log: log,
ingestersRing: ingestersRing,
RequestBufferPool: requestBufferPool,
partitionsRing: partitionsRing,
ingesterPool: NewPool(cfg.PoolConfig, ingestersRing, cfg.IngesterClientFactory, log),
healthyInstancesCount: atomic.NewUint32(0),
Expand Down Expand Up @@ -376,6 +429,8 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Name: "cortex_distributor_hash_collisions_total",
Help: "Number of times a hash collision was detected when de-duplicating samples.",
}),

PushMetrics: newPushMetrics(reg),
}

// Initialize expected rejected request labels
Expand Down Expand Up @@ -592,6 +647,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) {
d.nonHASamples.DeleteLabelValues(userID)
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)

d.PushMetrics.deleteUserMetrics(userID)

filter := prometheus.Labels{"user": userID}
d.dedupedSamples.DeletePartialMatch(filter)
d.discardedSamplesTooManyHaClusters.DeletePartialMatch(filter)
Expand Down
37 changes: 17 additions & 20 deletions pkg/distributor/otel.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
prometheustranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheus"
Expand All @@ -44,23 +43,20 @@ const (
// OTLPHandler is an http.Handler accepting OTLP write requests.
func OTLPHandler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
allowSkipLabelNameValidation bool,
enableOtelMetadataStorage bool,
limits *validation.Overrides,
retryCfg RetryConfig,
reg prometheus.Registerer,
push PushFunc,
pushMetrics *PushMetrics,
reg prometheus.Registerer,
logger log.Logger,
) http.Handler {
discardedDueToOtelParseError := validation.DiscardedSamplesCounter(reg, otelParseError)

otlpRequestsCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_otlp_requests_total",
Help: "The total number of OTLP requests that have come in to the distributor.",
}, []string{"user"})

return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error {
return handler(maxRecvMsgSize, requestBufferPool, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error {
contentType := r.Header.Get("Content-Type")
contentEncoding := r.Header.Get("Content-Encoding")
var compression util.CompressionType
Expand All @@ -73,27 +69,27 @@ func OTLPHandler(
return httpgrpc.Errorf(http.StatusUnsupportedMediaType, "unsupported compression: %s. Only \"gzip\" or no compression supported", contentEncoding)
}

var decoderFunc func(io.ReadCloser) (pmetricotlp.ExportRequest, error)
var decoderFunc func(io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error)
switch contentType {
case pbContentType:
decoderFunc = func(reader io.ReadCloser) (pmetricotlp.ExportRequest, error) {
decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
exportReq := pmetricotlp.NewExportRequest()
unmarshaler := otlpProtoUnmarshaler{
request: &exportReq,
}
err := util.ParseProtoReader(ctx, reader, int(r.ContentLength), maxRecvMsgSize, buffers, unmarshaler, compression)
protoBodySize, err := util.ParseProtoReader(ctx, reader, int(r.ContentLength), maxRecvMsgSize, buffers, unmarshaler, compression)
var tooLargeErr util.MsgSizeTooLargeErr
if errors.As(err, &tooLargeErr) {
return exportReq, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
actual: tooLargeErr.Actual,
limit: tooLargeErr.Limit,
}.Error())
}
return exportReq, err
return exportReq, protoBodySize, err
}

case jsonContentType:
decoderFunc = func(reader io.ReadCloser) (pmetricotlp.ExportRequest, error) {
decoderFunc = func(reader io.ReadCloser) (req pmetricotlp.ExportRequest, uncompressedBodySize int, err error) {
exportReq := pmetricotlp.NewExportRequest()
sz := int(r.ContentLength)
if sz > 0 {
Expand All @@ -105,23 +101,23 @@ func OTLPHandler(
var err error
reader, err = gzip.NewReader(reader)
if err != nil {
return exportReq, errors.Wrap(err, "create gzip reader")
return exportReq, 0, errors.Wrap(err, "create gzip reader")
}
}

reader = http.MaxBytesReader(nil, reader, int64(maxRecvMsgSize))
if _, err := buf.ReadFrom(reader); err != nil {
if util.IsRequestBodyTooLarge(err) {
return exportReq, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
return exportReq, 0, httpgrpc.Errorf(http.StatusRequestEntityTooLarge, distributorMaxWriteMessageSizeErr{
actual: -1,
limit: maxRecvMsgSize,
}.Error())
}

return exportReq, errors.Wrap(err, "read write request")
return exportReq, 0, errors.Wrap(err, "read write request")
}

return exportReq, exportReq.UnmarshalJSON(buf.Bytes())
return exportReq, buf.Len(), exportReq.UnmarshalJSON(buf.Bytes())
}

default:
Expand All @@ -142,7 +138,7 @@ func OTLPHandler(
spanLogger.SetTag("content_encoding", contentEncoding)
spanLogger.SetTag("content_length", r.ContentLength)

otlpReq, err := decoderFunc(r.Body)
otlpReq, uncompressedBodySize, err := decoderFunc(r.Body)
if err != nil {
return err
}
Expand All @@ -155,7 +151,8 @@ func OTLPHandler(
}
addSuffixes := limits.OTelMetricSuffixesEnabled(tenantID)

otlpRequestsCounter.WithLabelValues(tenantID).Inc()
pushMetrics.IncOTLPRequest(tenantID)
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(uncompressedBodySize))

metrics, err := otelMetricsToTimeseries(tenantID, addSuffixes, discardedDueToOtelParseError, logger, otlpReq.Metrics())
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions pkg/distributor/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -67,7 +66,7 @@ func BenchmarkOTLPHandler(b *testing.B) {
validation.NewMockTenantLimits(map[string]*validation.Limits{}),
)
require.NoError(b, err)
handler := OTLPHandler(100000, nil, false, true, limits, RetryConfig{}, prometheus.NewPedanticRegistry(), pushFunc, log.NewNopLogger())
handler := OTLPHandler(100000, nil, nil, false, true, limits, RetryConfig{}, pushFunc, nil, nil, log.NewNopLogger())

b.Run("protobuf", func(b *testing.B) {
req := createOTLPProtoRequest(b, exportReq, false)
Expand Down
27 changes: 16 additions & 11 deletions pkg/distributor/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
package distributor

import (
"bytes"
"context"
"errors"
"flag"
"fmt"
"math/rand"
"net/http"
"strconv"
"sync"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -38,11 +36,6 @@ type PushFunc func(ctx context.Context, req *Request) error
type parserFunc func(ctx context.Context, r *http.Request, maxSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, logger log.Logger) error

var (
bufferPool = sync.Pool{
New: func() any {
return bytes.NewBuffer(make([]byte, 0, 256*1024))
},
}
errRetryBaseLessThanOneSecond = errors.New("retry base duration should not be less than 1 second")
errNonPositiveMaxBackoffExponent = errors.New("max backoff exponent should be a positive value")
)
Expand Down Expand Up @@ -78,19 +71,30 @@ func (cfg *RetryConfig) Validate() error {
// Handler is a http.Handler which accepts WriteRequests.
func Handler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
allowSkipLabelNameValidation bool,
limits *validation.Overrides,
retryCfg RetryConfig,
push PushFunc,
pushMetrics *PushMetrics,
logger log.Logger,
) http.Handler {
return handler(maxRecvMsgSize, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, _ log.Logger) error {
err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, buffers, req, util.RawSnappy)
return handler(maxRecvMsgSize, requestBufferPool, sourceIPs, allowSkipLabelNameValidation, limits, retryCfg, push, logger, func(ctx context.Context, r *http.Request, maxRecvMsgSize int, buffers *util.RequestBuffers, req *mimirpb.PreallocWriteRequest, _ log.Logger) error {
protoBodySize, err := util.ParseProtoReader(ctx, r.Body, int(r.ContentLength), maxRecvMsgSize, buffers, req, util.RawSnappy)
if errors.Is(err, util.MsgSizeTooLargeErr{}) {
err = distributorMaxWriteMessageSizeErr{actual: int(r.ContentLength), limit: maxRecvMsgSize}
}
return err
if err != nil {
return err
}
tenantID, err := tenant.TenantID(ctx)
if err != nil {
return err
}
pushMetrics.ObserveUncompressedBodySize(tenantID, float64(protoBodySize))

return nil
})
}

Expand All @@ -108,6 +112,7 @@ func (e distributorMaxWriteMessageSizeErr) Error() string {

func handler(
maxRecvMsgSize int,
requestBufferPool util.Pool,
sourceIPs *middleware.SourceIPExtractor,
allowSkipLabelNameValidation bool,
limits *validation.Overrides,
Expand All @@ -126,7 +131,7 @@ func handler(
}
}
supplier := func() (*mimirpb.WriteRequest, func(), error) {
rb := util.NewRequestBuffers(&bufferPool)
rb := util.NewRequestBuffers(requestBufferPool)
var req mimirpb.PreallocWriteRequest
if err := parser(ctx, r, maxRecvMsgSize, rb, &req, logger); err != nil {
// Check for httpgrpc error, default to client error if parsing failed
Expand Down
Loading

0 comments on commit d7987c9

Please sign in to comment.