Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Hedged-request: Handling in HTTP Client Configuration #119

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ Generally, you have two ways of using `objstore` module:

First is to import the provider you want e.g. [`github.com/thanos-io/objstore/providers/s3`](providers/s3) and instantiate it with available constructor (e.g. `NewBucket`).

The second option is to use the factory `NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string)` that will instantiate the object storage client based on YAML file provided. The YAML file has generally the format like this:
The second option is to use the factory `NewBucket(logger log.Logger, confContentYaml []byte, reg prometheus.Registerer, component string, rt http.RoundTripper)` that will instantiate the object storage client based on YAML file provided. The YAML file has generally the format like this:

```yaml
type: <PROVIDER_TYPE>
Expand All @@ -114,6 +114,8 @@ config:

The exact option depends on provider and are in sections below.

`NewBucket` function now accepts an `http.RoundTripper` parameter allows clients to provide a custom transport for HTTP requests. This change allows the use of various HTTP clients, including hedged HTTP transports.

> NOTE: All code snippets are auto-generated from code and up-to-date.

Check out the [Thanos documentation](https://thanos.io/tip/thanos/storage.md/) to see how Thanos uses this module.
Expand Down
15 changes: 8 additions & 7 deletions client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"context"
"fmt"
"net/http"
"strings"

"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -49,7 +50,7 @@ type BucketConfig struct {

// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, component string) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand All @@ -64,25 +65,25 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string) (obj
var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, component)
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt)
case string(S3):
bucket, err = s3.NewBucket(logger, config, component)
bucket, err = s3.NewBucket(logger, config, component, rt)
case string(AZURE):
bucket, err = azure.NewBucket(logger, config, component)
bucket, err = azure.NewBucket(logger, config, component, rt)
case string(SWIFT):
bucket, err = swift.NewContainer(logger, config)
Copy link
Member

@GiedriusS GiedriusS May 31, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not supported by all providers? 🤔 I checked a few like swift/bos and I don't see a reason why this is not implemented there.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not supported by all providers? 🤔 I checked a few like swift/bos and I don't see a reason why this is not implemented there.

i added this parameter to the providers which had http.Client , swift does not uses NewBucket but bos/obs can have this parameter i think also i see that obs makes new ObsClient and bos uses NewClient which makes a BOS client...wdyt?

case string(COS):
bucket, err = cos.NewBucket(logger, config, component)
bucket, err = cos.NewBucket(logger, config, component, rt)
case string(ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component)
case string(FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
case string(BOS):
bucket, err = bos.NewBucket(logger, config, component)
case string(OCI):
bucket, err = oci.NewBucket(logger, config)
bucket, err = oci.NewBucket(logger, config, rt)
case string(OBS):
bucket, err = obs.NewBucket(logger, config)
bucket, err = obs.NewBucket(logger, config, rt)
default:
return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type)
}
Expand Down
7 changes: 4 additions & 3 deletions client/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"context"
"fmt"
"net/http"
"os"

"github.com/go-kit/log"
Expand All @@ -23,7 +24,7 @@ func ExampleBucket() {
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", http.DefaultTransport)
if err != nil {
panic(err)
}
Expand All @@ -46,7 +47,7 @@ func ExampleTracingBucketUsingOpenTracing() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", http.DefaultTransport)
if err != nil {
panic(err)
}
Expand All @@ -72,7 +73,7 @@ func ExampleTracingBucketUsingOpenTelemetry() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", http.DefaultTransport)
if err != nil {
panic(err)
}
Expand Down
11 changes: 6 additions & 5 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure
import (
"context"
"io"
"net/http"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -143,7 +144,7 @@ type Bucket struct {
}

// NewBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
conf, err := parseConfig(azureConfig)
if err != nil {
Expand All @@ -152,16 +153,16 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket
if conf.MSIResource != "" {
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
}
return NewBucketWithConfig(logger, conf, component)
return NewBucketWithConfig(logger, conf, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := conf.validate(); err != nil {
return nil, err
}

containerClient, err := getContainerClient(conf)
containerClient, err := getContainerClient(conf, rt)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -352,7 +353,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err
if err != nil {
return nil, nil, err
}
bkt, err := NewBucket(log.NewNopLogger(), bc, component)
bkt, err := NewBucket(log.NewNopLogger(), bc, component, http.DefaultTransport)
if err != nil {
t.Errorf("Cannot create Azure storage container:")
return nil, nil, err
Expand Down
16 changes: 12 additions & 4 deletions providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@ import (
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

func getContainerClient(conf Config) (*container.Client, error) {
dt, err := exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
func getContainerClient(conf Config, rt http.RoundTripper) (*container.Client, error) {
var dt http.RoundTripper
var err error

if rt != nil {
dt = rt
} else {
dt, err = exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
}
}

opt := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
Expand Down
17 changes: 11 additions & 6 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket using the provided cos configuration.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -105,11 +105,11 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error
return nil, errors.Wrap(err, "parsing cos configuration")
}

return NewBucketWithConfig(logger, config, component)
return NewBucketWithConfig(logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided cos config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate cos configuration")
}
Expand All @@ -128,7 +128,12 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
}
}
b := &cos.BaseURL{BucketURL: bucketURL}
tpt, _ := exthttp.DefaultTransport(config.HTTPConfig)
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
tpt, _ = exthttp.DefaultTransport(config.HTTPConfig)
}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretId,
Expand Down Expand Up @@ -485,7 +490,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand All @@ -506,7 +511,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand Down
23 changes: 11 additions & 12 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,16 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, config, component)
return NewBucketWithConfig(ctx, logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) {
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}
Expand All @@ -103,7 +103,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp

if !gc.UseGRPC {
var err error
opts, err = appendHttpOptions(gc, opts)
opts, err = appendHttpOptions(gc, opts, rt)
if err != nil {
return nil, err
}
Expand All @@ -112,25 +112,24 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
return newBucket(ctx, logger, gc, opts)
}

func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOption, error) {
func appendHttpOptions(gc Config, opts []option.ClientOption, rt http.RoundTripper) ([]option.ClientOption, error) {
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
if gc.HTTPConfig.Transport != nil {
rt = gc.HTTPConfig.Transport
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
var err error
rt, err = exthttp.DefaultTransport(gc.HTTPConfig)
tpt, err = exthttp.DefaultTransport(gc.HTTPConfig)
if err != nil {
return nil, err
}
}

// GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call
// htransport.NewTransport namely the scopes that are then used for OAth authentication. So to build our own
// http client we need to se those defaults
opts = append(opts, option.WithScopes(storage.ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"))
gRT, err := htransport.NewTransport(context.Background(), rt, opts...)
gRT, err := htransport.NewTransport(context.Background(), tpt, opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -302,7 +301,7 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error
return nil, nil, err
}

b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test", http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion providers/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestNewBucketWithConfig_ShouldCreateGRPC(t *testing.T) {
err = os.Setenv("STORAGE_EMULATOR_HOST_GRPC", svr.Addr)
testutil.Ok(t, err)

bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket")
bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", http.DefaultTransport)
testutil.Ok(t, err)

// Check if the bucket is created.
Expand Down
27 changes: 19 additions & 8 deletions providers/obs/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"io"
"math"
"net/http"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -74,13 +75,13 @@ type Bucket struct {
name string
}

func NewBucket(logger log.Logger, conf []byte) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, rt http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, errors.Wrap(err, "parsing cos configuration")
}

return NewBucketWithConfig(logger, config)
return NewBucketWithConfig(logger, config, rt)
}

func parseConfig(conf []byte) (Config, error) {
Expand All @@ -92,17 +93,27 @@ func parseConfig(conf []byte) (Config, error) {
return config, nil
}

func NewBucketWithConfig(logger log.Logger, config Config) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, rt http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate obs config err")
}
var tpt *http.Transport
var err error

rt, err := exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, errors.Wrap(err, "get http transport err")
if rt != nil {
var ok bool
tpt, ok = rt.(*http.Transport)
if !ok {
return nil, errors.New("provided RoundTripper is not an *http.Transport")
}
} else {
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, errors.Wrap(err, "get http transport err")
}
}

client, err := obs.New(config.AccessKey, config.SecretKey, config.Endpoint, obs.WithHttpTransport(rt))
client, err := obs.New(config.AccessKey, config.SecretKey, config.Endpoint, obs.WithHttpTransport(tpt))
if err != nil {
return nil, errors.Wrap(err, "initialize obs client err")
}
Expand Down Expand Up @@ -369,7 +380,7 @@ func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool, location
if err != nil {
return nil, nil, err
}
b, err := NewBucket(log.NewNopLogger(), bc)
b, err := NewBucket(log.NewNopLogger(), bc, http.DefaultTransport)
if err != nil {
return nil, nil, err
}
Expand Down
Loading
Loading