Skip to content

Commit

Permalink
added rt
Browse files Browse the repository at this point in the history
Signed-off-by: Vanshikav123 <[email protected]>
  • Loading branch information
Vanshikav123 committed May 24, 2024
1 parent 1cffe47 commit 540c1e5
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 37 deletions.
6 changes: 3 additions & 3 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,16 +153,16 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.
if conf.MSIResource != "" {
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
}
return NewBucketWithConfig(logger, conf, component)
return NewBucketWithConfig(logger, conf, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := conf.validate(); err != nil {
return nil, err
}

containerClient, err := getContainerClient(conf)
containerClient, err := getContainerClient(conf, rt)
if err != nil {
return nil, err
}
Expand Down
16 changes: 12 additions & 4 deletions providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,19 @@ import (
// DirDelim is the delimiter used to model a directory structure in an object store bucket.
const DirDelim = "/"

func getContainerClient(conf Config) (*container.Client, error) {
dt, err := exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
func getContainerClient(conf Config, rt http.RoundTripper) (*container.Client, error) {
var dt http.RoundTripper
var err error

if rt != nil {
dt = rt
} else {
dt, err = exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
}
}

opt := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
Expand Down
11 changes: 8 additions & 3 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,11 @@ func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTr
return nil, errors.Wrap(err, "parsing cos configuration")
}

return NewBucketWithConfig(logger, config, component)
return NewBucketWithConfig(logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided cos config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate cos configuration")
}
Expand All @@ -128,7 +128,12 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
}
}
b := &cos.BaseURL{BucketURL: bucketURL}
tpt, _ := exthttp.DefaultTransport(config.HTTPConfig)
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
tpt, _ = exthttp.DefaultTransport(config.HTTPConfig)
}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretId,
Expand Down
19 changes: 9 additions & 10 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component st
if err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, config, component)
return NewBucketWithConfig(ctx, logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) {
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}
Expand All @@ -103,7 +103,7 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp

if !gc.UseGRPC {
var err error
opts, err = appendHttpOptions(gc, opts)
opts, err = appendHttpOptions(gc, opts, rt)
if err != nil {
return nil, err
}
Expand All @@ -112,25 +112,24 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
return newBucket(ctx, logger, gc, opts)
}

func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOption, error) {
func appendHttpOptions(gc Config, opts []option.ClientOption, rt http.RoundTripper) ([]option.ClientOption, error) {
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
if gc.HTTPConfig.Transport != nil {
rt = gc.HTTPConfig.Transport
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
var err error
rt, err = exthttp.DefaultTransport(gc.HTTPConfig)
tpt, err = exthttp.DefaultTransport(gc.HTTPConfig)
if err != nil {
return nil, err
}
}

// GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call
// htransport.NewTransport namely the scopes that are then used for OAth authentication. So to build our own
// http client we need to se those defaults
opts = append(opts, option.WithScopes(storage.ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"))
gRT, err := htransport.NewTransport(context.Background(), rt, opts...)
gRT, err := htransport.NewTransport(context.Background(), tpt, opts...)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion providers/gcs/gcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestNewBucketWithConfig_ShouldCreateGRPC(t *testing.T) {
err = os.Setenv("STORAGE_EMULATOR_HOST_GRPC", svr.Addr)
testutil.Ok(t, err)

bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket")
bkt, err := NewBucketWithConfig(context.Background(), log.NewNopLogger(), cfg, "test-bucket", http.DefaultTransport)
testutil.Ok(t, err)

// Check if the bucket is created.
Expand Down
9 changes: 8 additions & 1 deletion providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,15 @@ func NewBucket(logger log.Logger, ociConfig []byte, rt http.RoundTripper) (*Buck
return nil, errors.Wrapf(err, "unable to create ObjectStorage client with the given oci configurations")
}

var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
tpt = CustomTransport(config)
}

httpClient := http.Client{
Transport: CustomTransport(config),
Transport: tpt,
Timeout: config.HTTPConfig.ClientTimeout,
}
client.HTTPClient = &httpClient
Expand Down
14 changes: 7 additions & 7 deletions providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTr
return nil, err
}

return NewBucketWithConfig(logger, config, component)
return NewBucketWithConfig(logger, config, component, rt)
}

type overrideSignerType struct {
Expand All @@ -201,7 +201,7 @@ func (s *overrideSignerType) Retrieve() (credentials.Value, error) {
}

// NewBucketWithConfig returns a new Bucket using the provided s3 config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
var chain []credentials.Provider

// TODO(bwplotka): Don't do flags as they won't scale, use actual params like v2, v4 instead
Expand Down Expand Up @@ -244,12 +244,12 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B

// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
if config.HTTPConfig.Transport != nil {
rt = config.HTTPConfig.Transport
var tpt http.RoundTripper
if rt != nil {
tpt = rt
} else {
var err error
rt, err = exthttp.DefaultTransport(config.HTTPConfig)
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, err
}
Expand All @@ -259,7 +259,7 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
Creds: credentials.NewChainCredentials(chain),
Secure: !config.Insecure,
Region: config.Region,
Transport: rt,
Transport: tpt,
BucketLookup: config.BucketLookupType.MinioType(),
})
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions providers/s3/s3_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package s3_test
import (
"bytes"
"context"
"net/http"
"strings"
"testing"

Expand Down Expand Up @@ -37,6 +38,7 @@ func BenchmarkUpload(b *testing.B) {
log.NewNopLogger(),
e2ethanos.NewS3Config(bucket, m.Endpoint("https"), m.Dir()),
"test-feed",
http.DefaultTransport,
)
testutil.Ok(b, err)

Expand Down
16 changes: 8 additions & 8 deletions providers/s3/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) {
// Default config should return no SSE config.
cfg := DefaultConfig
cfg.Endpoint = endpoint
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)

sse, err := bkt.getServerSideEncryption(context.Background())
Expand All @@ -335,7 +335,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) {
cfg = DefaultConfig
cfg.Endpoint = endpoint
cfg.SSEConfig = SSEConfig{Type: SSES3}
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.Background())
Expand All @@ -351,7 +351,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) {
Type: SSEKMS,
KMSKeyID: "key",
}
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.Background())
Expand All @@ -375,7 +375,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) {
KMSKeyID: "key",
KMSEncryptionContext: map[string]string{"foo": "bar"},
}
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.Background())
Expand All @@ -396,7 +396,7 @@ func TestBucket_getServerSideEncryption(t *testing.T) {
override, err := encrypt.NewSSEKMS("test", nil)
testutil.Ok(t, err)

bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)

sse, err = bkt.getServerSideEncryption(context.WithValue(context.Background(), sseConfigKey, override))
Expand All @@ -423,7 +423,7 @@ func TestBucket_Get_ShouldReturnErrorIfServerTruncateResponse(t *testing.T) {
cfg.AccessKey = "test"
cfg.SecretKey = "test"

bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)

reader, err := bkt.Get(context.Background(), "test")
Expand All @@ -448,7 +448,7 @@ func TestParseConfig_CustomStorageClass(t *testing.T) {
cfg.Endpoint = endpoint
storageClass := "STANDARD_IA"
cfg.PutUserMetadata[testCase.storageClassKey] = storageClass
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)
testutil.Equals(t, storageClass, bkt.storageClass)
})
Expand All @@ -458,7 +458,7 @@ func TestParseConfig_CustomStorageClass(t *testing.T) {
func TestParseConfig_DefaultStorageClassIsZero(t *testing.T) {
cfg := DefaultConfig
cfg.Endpoint = endpoint
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test")
bkt, err := NewBucketWithConfig(log.NewNopLogger(), cfg, "test", http.DefaultTransport)
testutil.Ok(t, err)
testutil.Equals(t, "", bkt.storageClass)
}

0 comments on commit 540c1e5

Please sign in to comment.