Add hedged request to Store Gateway
Signed-off-by: SungJin1212 <[email protected]>
SungJin1212 committed Dec 3, 2024
1 parent bc1b5be commit dca3507
Showing 43 changed files with 2,165 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -17,6 +17,7 @@
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [FEATURE] Ruler: Add support for per-user external labels #6340
+ * [ENHANCEMENT] Store Gateway: Add a hedged request to reduce the tail latency. #6388
* [ENHANCEMENT] Ingester: Add metrics to track succeed/failed native histograms. #6370
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
17 changes: 17 additions & 0 deletions docs/blocks-storage/store-gateway.md
@@ -344,6 +344,23 @@ store_gateway:
# tenant(s) for processing will ignore them instead.
# CLI flag: -store-gateway.disabled-tenants
[disabled_tenants: <string> | default = ""]
+
+ hedged_request:
+ # If true, hedged requests are applied to object store calls. It can help
+ # with reducing tail latency.
+ # CLI flag: -store-gateway.hedged-request.enabled
+ [enabled: <boolean> | default = false]
+
+ # Maximum number of hedged requests allowed for each initial request. A high
+ # number can reduce latency but increase internal calls.
+ # CLI flag: -store-gateway.hedged-request.max-requests
+ [max_requests: <int> | default = 3]
+
+ # It is used to calculate a latency threshold to trigger hedged requests.
+ # For example, additional requests are triggered when the initial request
+ # response time exceeds the 90th percentile.
+ # CLI flag: -store-gateway.hedged-request.quantile
+ [quantile: <float> | default = 0.9]
```
### `blocks_storage_config`
17 changes: 17 additions & 0 deletions docs/configuration/config-file-reference.md
@@ -5664,6 +5664,23 @@ sharding_ring:
# tenant(s) for processing will ignore them instead.
# CLI flag: -store-gateway.disabled-tenants
[disabled_tenants: <string> | default = ""]
+
+ hedged_request:
+ # If true, hedged requests are applied to object store calls. It can help with
+ # reducing tail latency.
+ # CLI flag: -store-gateway.hedged-request.enabled
+ [enabled: <boolean> | default = false]
+
+ # Maximum number of hedged requests allowed for each initial request. A high
+ # number can reduce latency but increase internal calls.
+ # CLI flag: -store-gateway.hedged-request.max-requests
+ [max_requests: <int> | default = 3]
+
+ # It is used to calculate a latency threshold to trigger hedged requests. For
+ # example, additional requests are triggered when the initial request response
+ # time exceeds the 90th percentile.
+ # CLI flag: -store-gateway.hedged-request.quantile
+ [quantile: <float> | default = 0.9]
```

### `tracing_config`
2 changes: 2 additions & 0 deletions go.mod
@@ -116,9 +116,11 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
+ github.com/caio/go-tdigest v3.1.0+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
+ github.com/cristalhq/hedgedhttp v0.9.1 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dennwc/varint v1.0.0 // indirect
github.com/dgryski/go-metro v0.0.0-20200812162917-85c65e2d0165 // indirect
6 changes: 6 additions & 0 deletions go.sum
@@ -897,6 +897,8 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874 h1:N7oVaKyGp8bttX0bfZGmcGkjz7DLQXhAn3DNd3T0ous=
github.com/bradfitz/gomemcache v0.0.0-20230905024940-24af94b03874/go.mod h1:r5xuitiExdLAJ09PR7vBVENGvp4ZuTBeWTGtxuX3K+c=
+ github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds=
+ github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@@ -942,6 +944,8 @@ github.com/cortexproject/promqlsmith v0.0.0-20241121054008-8b48fe2471ef/go.mod h
github.com/cortexproject/weaveworks-common v0.0.0-20241129212437-96019edf21f1 h1:UoSixdl0sBUhfEOMpIGxFnJjp3/y/+nkw6Du7su05FE=
github.com/cortexproject/weaveworks-common v0.0.0-20241129212437-96019edf21f1/go.mod h1:7cl8fS/nivXe2DmBUUmr/3UGTJG2jVU2NRaIayR2Zjs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
+ github.com/cristalhq/hedgedhttp v0.9.1 h1:g68L9cf8uUyQKQJwciD0A1Vgbsz+QgCjuB1I8FAsCDs=
+ github.com/cristalhq/hedgedhttp v0.9.1/go.mod h1:XkqWU6qVMutbhW68NnzjWrGtH8NUx1UfYqGYtHVKIsI=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
@@ -1390,6 +1394,8 @@ github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6Fm
github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7c=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
+ github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 h1:X/79QL0b4YJVO5+OsPH9rF2u428CIrGL/jLmPsoOQQ4=
+ github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353/go.mod h1:N0SVk0uhy+E1PZ3C9ctsPRlvOPAFPkCNlcPBDkt0N3U=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
2 changes: 1 addition & 1 deletion integration/e2ecortex/storage.go
@@ -21,7 +21,7 @@ type S3Client struct {
}

func NewS3Client(cfg s3.Config) (*S3Client, error) {
- writer, err := s3.NewBucketClient(cfg, "test", log.NewNopLogger())
+ writer, err := s3.NewBucketClient(cfg, nil, "test", log.NewNopLogger())
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/alertmanager/alertstore/store.go
@@ -62,7 +62,7 @@ func NewAlertStore(ctx context.Context, cfg Config, cfgProvider bucket.TenantCon
return local.NewStore(cfg.Local)
}

- bucketClient, err := bucket.NewClient(ctx, cfg.Config, "alertmanager-storage", logger, reg)
+ bucketClient, err := bucket.NewClient(ctx, cfg.Config, nil, "alertmanager-storage", logger, reg)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/compactor/compactor.go
@@ -384,7 +384,7 @@ type Compactor struct {
// NewCompactor makes a new Compactor.
func NewCompactor(compactorCfg Config, storageCfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, registerer prometheus.Registerer, limits *validation.Overrides) (*Compactor, error) {
bucketClientFactory := func(ctx context.Context) (objstore.InstrumentedBucket, error) {
- return bucket.NewClient(ctx, storageCfg.Bucket, "compactor", logger, registerer)
+ return bucket.NewClient(ctx, storageCfg.Bucket, nil, "compactor", logger, registerer)
}

blocksGrouperFactory := compactorCfg.BlocksGrouperFactory
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
@@ -177,7 +177,7 @@ func (t *Cortex) initRuntimeConfig() (services.Service, error) {
}
}
}
- return bucket.NewClient(ctx, t.Cfg.RuntimeConfig.StorageConfig, "runtime-config", logger, registerer)
+ return bucket.NewClient(ctx, t.Cfg.RuntimeConfig.StorageConfig, nil, "runtime-config", logger, registerer)
}
serv, err := runtimeconfig.New(t.Cfg.RuntimeConfig, registerer, logger, bucketClientFactory)
if err == nil {
4 changes: 2 additions & 2 deletions pkg/ingester/ingester.go
@@ -693,7 +693,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
cfg.ingesterClientFactory = client.MakeIngesterClient
}

- bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", logger, registerer)
+ bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, nil, "ingester", logger, registerer)
if err != nil {
return nil, errors.Wrap(err, "failed to create the bucket client")
}
@@ -769,7 +769,7 @@ func New(cfg Config, limits *validation.Overrides, registerer prometheus.Registe
// this is a special version of ingester used by Flusher. This ingester is not ingesting anything, its only purpose is to react
// on Flush method and flush all opened TSDBs when called.
func NewForFlusher(cfg Config, limits *validation.Overrides, registerer prometheus.Registerer, logger log.Logger) (*Ingester, error) {
- bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, "ingester", logger, registerer)
+ bucketClient, err := bucket.NewClient(context.Background(), cfg.BlocksStorageConfig.Bucket, nil, "ingester", logger, registerer)
if err != nil {
return nil, errors.Wrap(err, "failed to create the bucket client")
}
2 changes: 1 addition & 1 deletion pkg/purger/tenant_deletion_api.go
@@ -119,7 +119,7 @@ func (api *TenantDeletionAPI) isBlocksForUserDeleted(ctx context.Context, userID
}

func createBucketClient(cfg cortex_tsdb.BlocksStorageConfig, logger log.Logger, reg prometheus.Registerer) (objstore.InstrumentedBucket, error) {
- bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, "purger", logger, reg)
+ bucketClient, err := bucket.NewClient(context.Background(), cfg.Bucket, nil, "purger", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "create bucket client")
}
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable.go
@@ -185,7 +185,7 @@ func NewBlocksStoreQueryable(
func NewBlocksStoreQueryableFromConfig(querierCfg Config, gatewayCfg storegateway.Config, storageCfg cortex_tsdb.BlocksStorageConfig, limits BlocksStoreLimits, logger log.Logger, reg prometheus.Registerer) (*BlocksStoreQueryable, error) {
var stores BlocksStoreSet

- bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, "querier", logger, reg)
+ bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, gatewayCfg.HedgedRequest.GetHedgedRoundTripper(), "querier", logger, reg)
if err != nil {
return nil, errors.Wrap(err, "failed to create bucket client")
}
2 changes: 1 addition & 1 deletion pkg/ruler/storage.go
@@ -31,7 +31,7 @@ func NewRuleStore(ctx context.Context, cfg rulestore.Config, cfgProvider bucket.
return local.NewLocalRulesClient(cfg.Local, loader)
}

- bucketClient, err := bucket.NewClient(ctx, cfg.Config, "ruler-storage", logger, reg)
+ bucketClient, err := bucket.NewClient(ctx, cfg.Config, nil, "ruler-storage", logger, reg)
if err != nil {
return nil, err
}
6 changes: 4 additions & 2 deletions pkg/storage/bucket/azure/bucket_client.go
@@ -1,6 +1,8 @@
package azure

import (
+ "net/http"
+
"github.com/go-kit/log"
"github.com/prometheus/common/model"
"github.com/thanos-io/objstore"
@@ -9,7 +11,7 @@ import (
yaml "gopkg.in/yaml.v2"
)

- func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
+ func NewBucketClient(cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger) (objstore.Bucket, error) {
bucketConfig := azure.Config{
StorageAccountName: cfg.StorageAccountName,
StorageAccountKey: cfg.StorageAccountKey.Value,
@@ -37,5 +39,5 @@ func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucke
return nil, err
}

- return azure.NewBucket(logger, serialized, name, nil)
+ return azure.NewBucket(logger, serialized, name, hedgedRoundTripper)
}
11 changes: 6 additions & 5 deletions pkg/storage/bucket/client.go
@@ -5,6 +5,7 @@ import (
"errors"
"flag"
"fmt"
+ "net/http"
"strings"

"github.com/go-kit/log"
@@ -103,17 +104,17 @@ func (cfg *Config) Validate() error {
}

// NewClient creates a new bucket client based on the configured backend
- func NewClient(ctx context.Context, cfg Config, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) {
+ func NewClient(ctx context.Context, cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger, reg prometheus.Registerer) (bucket objstore.InstrumentedBucket, err error) {
var client objstore.Bucket
switch cfg.Backend {
case S3:
- client, err = s3.NewBucketClient(cfg.S3, name, logger)
+ client, err = s3.NewBucketClient(cfg.S3, hedgedRoundTripper, name, logger)
case GCS:
- client, err = gcs.NewBucketClient(ctx, cfg.GCS, name, logger)
+ client, err = gcs.NewBucketClient(ctx, cfg.GCS, hedgedRoundTripper, name, logger)
case Azure:
- client, err = azure.NewBucketClient(cfg.Azure, name, logger)
+ client, err = azure.NewBucketClient(cfg.Azure, hedgedRoundTripper, name, logger)
case Swift:
- client, err = swift.NewBucketClient(cfg.Swift, name, logger)
+ client, err = swift.NewBucketClient(cfg.Swift, hedgedRoundTripper, name, logger)
case Filesystem:
client, err = filesystem.NewBucketClient(cfg.Filesystem)
default:
2 changes: 1 addition & 1 deletion pkg/storage/bucket/client_test.go
@@ -87,7 +87,7 @@ func TestNewClient(t *testing.T) {
require.NoError(t, err)

// Instance a new bucket client from the config
- bucketClient, err := NewClient(context.Background(), cfg, "test", util_log.Logger, nil)
+ bucketClient, err := NewClient(context.Background(), cfg, nil, "test", util_log.Logger, nil)
require.Equal(t, testData.expectedErr, err)

if testData.expectedErr == nil {
5 changes: 3 additions & 2 deletions pkg/storage/bucket/gcs/bucket_client.go
@@ -2,6 +2,7 @@ package gcs

import (
"context"
+ "net/http"

"github.com/go-kit/log"
"github.com/thanos-io/objstore"
@@ -10,7 +11,7 @@ import (
)

// NewBucketClient creates a new GCS bucket client
- func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
+ func NewBucketClient(ctx context.Context, cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger) (objstore.Bucket, error) {
bucketConfig := gcs.Config{
Bucket: cfg.BucketName,
ServiceAccount: cfg.ServiceAccount.Value,
@@ -23,5 +24,5 @@ func NewBucketClient(ctx context.Context, cfg Config, name string, logger log.Lo
return nil, err
}

- return gcs.NewBucket(ctx, logger, serialized, name, nil)
+ return gcs.NewBucket(ctx, logger, serialized, name, hedgedRoundTripper)
}
43 changes: 43 additions & 0 deletions pkg/storage/bucket/hedged_request.go
@@ -0,0 +1,43 @@
package bucket

import (
"errors"
"flag"
"net/http"

"github.com/thanos-io/thanos/pkg/exthttp"
)

var (
errInvalidQuantile = errors.New("invalid hedged request quantile, it must be between 0 and 1")
)

type HedgedRequestConfig struct {
Enabled bool `yaml:"enabled"`
MaxRequests uint `yaml:"max_requests"`
Quantile float64 `yaml:"quantile"`
}

func (cfg *HedgedRequestConfig) RegisterFlagsWithPrefix(f *flag.FlagSet, prefix string) {
f.BoolVar(&cfg.Enabled, prefix+"hedged-request.enabled", false, "If true, hedged requests are applied to object store calls. It can help with reducing tail latency.")
f.UintVar(&cfg.MaxRequests, prefix+"hedged-request.max-requests", 3, "Maximum number of hedged requests allowed for each initial request. A high number can reduce latency but increase internal calls.")
f.Float64Var(&cfg.Quantile, prefix+"hedged-request.quantile", 0.9, "It is used to calculate a latency threshold to trigger hedged requests. For example, additional requests are triggered when the initial request response time exceeds the 90th percentile.")
}

func (cfg *HedgedRequestConfig) GetHedgedRoundTripper() func(rt http.RoundTripper) http.RoundTripper {
return exthttp.CreateHedgedTransportWithConfig(exthttp.CustomBucketConfig{
HedgingConfig: exthttp.HedgingConfig{
Enabled: cfg.Enabled,
UpTo: cfg.MaxRequests,
Quantile: cfg.Quantile,
},
})
}

func (cfg *HedgedRequestConfig) Validate() error {
if cfg.Quantile > 1 || cfg.Quantile < 0 {
return errInvalidQuantile
}

return nil
}
36 changes: 36 additions & 0 deletions pkg/storage/bucket/hedged_request_test.go
@@ -0,0 +1,36 @@
package bucket

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestHedgedRequest_Validate(t *testing.T) {
t.Parallel()
tests := map[string]struct {
cfg *HedgedRequestConfig
expected error
}{
"should fail if hedged request quantile is less than 0": {
cfg: &HedgedRequestConfig{
Quantile: -0.1,
},
expected: errInvalidQuantile,
},
"should fail if hedged request quantile is more than 1": {
cfg: &HedgedRequestConfig{
Quantile: 1.1,
},
expected: errInvalidQuantile,
},
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
err := testData.cfg.Validate()
require.Equal(t, testData.expected, err)
})
}

}
5 changes: 3 additions & 2 deletions pkg/storage/bucket/s3/bucket_client.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "net/http"
"time"

"github.com/go-kit/log"
@@ -21,13 +22,13 @@ var defaultRetryMinBackoff = 5 * time.Second
var defaultRetryMaxBackoff = 1 * time.Minute

// NewBucketClient creates a new S3 bucket client
- func NewBucketClient(cfg Config, name string, logger log.Logger) (objstore.Bucket, error) {
+ func NewBucketClient(cfg Config, hedgedRoundTripper func(rt http.RoundTripper) http.RoundTripper, name string, logger log.Logger) (objstore.Bucket, error) {
s3Cfg, err := newS3Config(cfg)
if err != nil {
return nil, err
}

- bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name, nil)
+ bucket, err := s3.NewBucketWithConfig(logger, s3Cfg, name, hedgedRoundTripper)
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion pkg/storage/bucket/sse_bucket_client_test.go
@@ -57,7 +57,7 @@ func TestSSEBucketClient_Upload_ShouldInjectCustomSSEConfig(t *testing.T) {
BucketLookupType: s3.BucketPathLookup,
}

- s3Client, err := s3.NewBucketClient(s3Cfg, "test", log.NewNopLogger())
+ s3Client, err := s3.NewBucketClient(s3Cfg, nil, "test", log.NewNopLogger())
require.NoError(t, err)

// Configure the config provider with NO KMS key ID.