From c4681d606dba197cde323375846334f1562c9843 Mon Sep 17 00:00:00 2001 From: Matheus Pimenta Date: Fri, 2 Aug 2024 13:29:34 +0100 Subject: [PATCH] Add proxy support for GCS buckets Signed-off-by: Matheus Pimenta --- api/v1beta2/bucket_types.go | 2 +- .../source.toolkit.fluxcd.io_buckets.yaml | 2 +- docs/api/v1beta2/source.md | 4 +- docs/spec/v1beta2/buckets.md | 2 +- go.mod | 2 +- internal/controller/bucket_controller.go | 21 ++-- internal/controller/bucket_controller_test.go | 51 ++++++++- pkg/gcp/gcp.go | 100 +++++++++++++++--- pkg/gcp/gcp_test.go | 100 ++++++++++++++++-- tests/listener/listener.go | 43 ++++++++ tests/proxy/proxy.go | 45 ++++++++ 11 files changed, 337 insertions(+), 35 deletions(-) create mode 100644 tests/listener/listener.go create mode 100644 tests/proxy/proxy.go diff --git a/api/v1beta2/bucket_types.go b/api/v1beta2/bucket_types.go index 928a61373..d30417fba 100644 --- a/api/v1beta2/bucket_types.go +++ b/api/v1beta2/bucket_types.go @@ -103,7 +103,7 @@ type BucketSpec struct { // ProxySecretRef specifies the Secret containing the proxy configuration // to use while communicating with the Bucket server. // - // Only supported for the generic provider. + // Only supported for the `generic` and `gcp` providers. // +optional ProxySecretRef *meta.LocalObjectReference `json:"proxySecretRef,omitempty"` diff --git a/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml b/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml index 5411f06b0..cc3358890 100644 --- a/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml +++ b/config/crd/bases/source.toolkit.fluxcd.io_buckets.yaml @@ -397,7 +397,7 @@ spec: to use while communicating with the Bucket server. - Only supported for the generic provider. + Only supported for the `generic` and `gcp` providers. properties: name: description: Name of the referent. diff --git a/docs/api/v1beta2/source.md b/docs/api/v1beta2/source.md index 451d83611..dd02a3992 100644 --- a/docs/api/v1beta2/source.md +++ b/docs/api/v1beta2/source.md @@ -202,7 +202,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference (Optional)

ProxySecretRef specifies the Secret containing the proxy configuration to use while communicating with the Bucket server.

-

Only supported for the generic provider.

+

Only supported for the generic and gcp providers.

@@ -1568,7 +1568,7 @@ github.com/fluxcd/pkg/apis/meta.LocalObjectReference (Optional)

ProxySecretRef specifies the Secret containing the proxy configuration to use while communicating with the Bucket server.

-

Only supported for the generic provider.

+

Only supported for the generic and gcp providers.

diff --git a/docs/spec/v1beta2/buckets.md b/docs/spec/v1beta2/buckets.md index 630f9f5e5..da51a56e3 100644 --- a/docs/spec/v1beta2/buckets.md +++ b/docs/spec/v1beta2/buckets.md @@ -837,7 +837,7 @@ The Secret can contain three keys: - `password`, to specify the password to use if the proxy server is protected by basic authentication. This is an optional key. -This API is only supported for the `generic` [provider](#provider). +This API is only supported for the `generic` and `gcp` [providers](#provider). Example: diff --git a/go.mod b/go.mod index b8330eb4a..a9de17dd8 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/pflag v1.0.5 golang.org/x/crypto v0.22.0 + golang.org/x/oauth2 v0.19.0 golang.org/x/sync v0.7.0 google.golang.org/api v0.177.0 gotest.tools v2.2.0+incompatible @@ -360,7 +361,6 @@ require ( golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/net v0.24.0 // indirect - golang.org/x/oauth2 v0.19.0 // indirect golang.org/x/sys v0.19.0 // indirect golang.org/x/term v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/internal/controller/bucket_controller.go b/internal/controller/bucket_controller.go index 656e5d704..9934a7a11 100644 --- a/internal/controller/bucket_controller.go +++ b/internal/controller/bucket_controller.go @@ -431,6 +431,12 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial // Return error as the world as observed may change return sreconcile.ResultEmpty, e } + proxyURL, err := r.getProxyURL(ctx, obj) + if err != nil { + e := serror.NewGeneric(err, sourcev1.AuthenticationFailedReason) + conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) + return sreconcile.ResultEmpty, e + } // Construct provider client var provider BucketProvider @@ -441,7 +447,14 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e) return sreconcile.ResultEmpty, e } - if provider, err = gcp.NewClient(ctx, secret); err != nil { + var opts []gcp.Option + if secret != nil { + opts = append(opts, gcp.WithSecret(secret)) + } + if proxyURL != nil { + opts = append(opts, gcp.WithProxyURL(proxyURL)) + } + if provider, err = gcp.NewClient(ctx, opts...); err != nil { e := serror.NewGeneric(err, "ClientError") conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e) return sreconcile.ResultEmpty, e @@ -469,12 +482,6 @@ func (r *BucketReconciler) reconcileSource(ctx context.Context, sp *patch.Serial conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, "%s", e) return sreconcile.ResultEmpty, e } - proxyURL, err := r.getProxyURL(ctx, obj) - if err != nil { - e := serror.NewGeneric(err, sourcev1.AuthenticationFailedReason) - conditions.MarkTrue(obj, sourcev1.FetchFailedCondition, e.Reason, e.Error()) - return sreconcile.ResultEmpty, e - } var opts []minio.Option if secret != nil { opts = append(opts, minio.WithSecret(secret)) diff --git a/internal/controller/bucket_controller_test.go b/internal/controller/bucket_controller_test.go index 11c99613f..aad937aad 100644 --- a/internal/controller/bucket_controller_test.go +++ b/internal/controller/bucket_controller_test.go @@ -445,7 +445,7 @@ func TestBucketReconciler_reconcileSource_generic(t *testing.T) { assertConditions []metav1.Condition }{ { - name: "Reconciles GCS source", + name: "Reconciles generic source", bucketName: "dummy", bucketObjects: []*s3mock.Object{ { @@ -933,6 +933,49 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), }, }, + { + name: "Observes non-existing proxySecretRef", + bucketName: "dummy", + beforeFunc: func(obj *bucketv1.Bucket) { + obj.Spec.ProxySecretRef = &meta.LocalObjectReference{ + Name: "dummy", + } + conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") + conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") + }, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: index.NewDigester(), + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "failed to get secret '/dummy': secrets \"dummy\" not found"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), + *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), + }, + }, + { + name: "Observes invalid proxySecretRef", + bucketName: "dummy", + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dummy", + }, + }, + beforeFunc: func(obj *bucketv1.Bucket) { + obj.Spec.ProxySecretRef = &meta.LocalObjectReference{ + Name: "dummy", + } + conditions.MarkReconciling(obj, meta.ProgressingReason, "foo") + conditions.MarkUnknown(obj, meta.ReadyCondition, "foo", "bar") + }, + want: sreconcile.ResultEmpty, + wantErr: true, + assertIndex: index.NewDigester(), + assertConditions: []metav1.Condition{ + *conditions.TrueCondition(sourcev1.FetchFailedCondition, sourcev1.AuthenticationFailedReason, "invalid proxy secret '/dummy': key 'address' is missing"), + *conditions.TrueCondition(meta.ReconcilingCondition, meta.ProgressingReason, "foo"), + *conditions.UnknownCondition(meta.ReadyCondition, "foo", "bar"), + }, + }, { name: "Observes non-existing bucket name", bucketName: "dummy", @@ -1178,7 +1221,11 @@ func TestBucketReconciler_reconcileSource_gcs(t *testing.T) { sp := patch.NewSerialPatcher(obj, r.Client) got, err := r.reconcileSource(context.TODO(), sp, obj, index, tmpDir) - g.Expect(err != nil).To(Equal(tt.wantErr)) + if tt.wantErr { + g.Expect(err).To(HaveOccurred()) + } else { + g.Expect(err).ToNot(HaveOccurred()) + } g.Expect(got).To(Equal(tt.want)) g.Expect(index.Index()).To(Equal(tt.assertIndex.Index())) diff --git a/pkg/gcp/gcp.go b/pkg/gcp/gcp.go index 77011fada..b4c422b32 100644 --- a/pkg/gcp/gcp.go +++ b/pkg/gcp/gcp.go @@ -21,13 +21,17 @@ import ( "errors" "fmt" "io" + "net/http" + "net/url" "os" "path/filepath" gcpstorage "cloud.google.com/go/storage" "github.com/go-logr/logr" + "golang.org/x/oauth2/google" "google.golang.org/api/iterator" "google.golang.org/api/option" + htransport "google.golang.org/api/transport/http" corev1 "k8s.io/api/core/v1" ctrl "sigs.k8s.io/controller-runtime" ) @@ -48,24 +52,64 @@ type GCSClient struct { *gcpstorage.Client } -// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application +// Option is a functional option for configuring the GCS client. +type Option func(*options) + +// WithSecret sets the secret to use for authenticating with GCP. +func WithSecret(secret *corev1.Secret) Option { + return func(o *options) { + o.secret = secret + } +} + +// WithProxyURL sets the proxy URL to use for the GCS client. +func WithProxyURL(proxyURL *url.URL) Option { + return func(o *options) { + o.proxyURL = proxyURL + } +} + +type options struct { + secret *corev1.Secret + proxyURL *url.URL + + newHTTPClient func(context.Context, *options) (*http.Client, error) // test-only +} + +func newOptions() *options { + return &options{ + newHTTPClient: newHTTPClient, + } +} + +// NewClient creates a new GCP storage client. The Client will automatically look for the Google Application // Credential environment variable or look for the Google Application Credential file. -func NewClient(ctx context.Context, secret *corev1.Secret) (*GCSClient, error) { - c := &GCSClient{} - if secret != nil { - client, err := gcpstorage.NewClient(ctx, option.WithCredentialsJSON(secret.Data["serviceaccount"])) - if err != nil { - return nil, err - } - c.Client = client - } else { - client, err := gcpstorage.NewClient(ctx) +func NewClient(ctx context.Context, opts ...Option) (*GCSClient, error) { + o := newOptions() + for _, opt := range opts { + opt(o) + } + secret := o.secret + proxyURL := o.proxyURL + + var gcpOpts []option.ClientOption + switch { + case proxyURL != nil: + httpClient, err := o.newHTTPClient(ctx, o) if err != nil { return nil, err } - c.Client = client + gcpOpts = append(gcpOpts, option.WithHTTPClient(httpClient)) + case secret != nil: + gcpOpts = append(gcpOpts, option.WithCredentialsJSON(secret.Data["serviceaccount"])) } - return c, nil + + client, err := gcpstorage.NewClient(ctx, gcpOpts...) + if err != nil { + return nil, err + } + + return &GCSClient{client}, nil } // ValidateSecret validates the credential secret. The provided Secret may @@ -197,3 +241,33 @@ func (c *GCSClient) Close(ctx context.Context) { func (c *GCSClient) ObjectIsNotFound(err error) bool { return errors.Is(err, gcpstorage.ErrObjectNotExist) } + +// newHTTPClient creates a new HTTP client for interacting with Google Cloud APIs. +func newHTTPClient(ctx context.Context, o *options) (*http.Client, error) { + secret := o.secret + proxyURL := o.proxyURL + + var creds *google.Credentials + var err error + if secret != nil { + creds, err = google.CredentialsFromJSON(ctx, secret.Data["serviceaccount"], gcpstorage.ScopeReadOnly) + if err != nil { + return nil, fmt.Errorf("failed to create Google credentials from secret: %w", err) + } + } else { + creds, err = google.FindDefaultCredentials(ctx, gcpstorage.ScopeReadOnly) + if err != nil { + return nil, fmt.Errorf("failed to find Google default credentials: %w", err) + } + } + + baseTransport := http.DefaultTransport.(*http.Transport).Clone() + if proxyURL != nil { + baseTransport.Proxy = http.ProxyURL(proxyURL) + } + transport, err := htransport.NewTransport(ctx, baseTransport, option.WithCredentials(creds)) + if err != nil { + return nil, fmt.Errorf("failed to create Google HTTP transport: %w", err) + } + return &http.Client{Transport: transport}, nil +} diff --git a/pkg/gcp/gcp_test.go b/pkg/gcp/gcp_test.go index 9ccf0c645..3db20978a 100644 --- a/pkg/gcp/gcp_test.go +++ b/pkg/gcp/gcp_test.go @@ -26,19 +26,20 @@ import ( "net" "net/http" "net/http/httptest" + "net/url" "os" "path/filepath" "testing" "time" gcpstorage "cloud.google.com/go/storage" + testproxy "github.com/fluxcd/source-controller/tests/proxy" "google.golang.org/api/googleapi" + "google.golang.org/api/option" raw "google.golang.org/api/storage/v1" "gotest.tools/assert" corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "google.golang.org/api/option" ) const ( @@ -46,10 +47,12 @@ const ( objectName string = "test.yaml" objectGeneration int64 = 3 objectEtag string = "bFbHCDvedeecefdgmfmhfuRxBdcedGe96S82XJOAXxjJpk=" + envGCSHost string = "STORAGE_EMULATOR_HOST" ) var ( hc *http.Client + host string client *gcpstorage.Client close func() err error @@ -76,7 +79,7 @@ var ( ) func TestMain(m *testing.M) { - hc, close = newTestServer(func(w http.ResponseWriter, r *http.Request) { + hc, host, close = newTestServer(func(w http.ResponseWriter, r *http.Request) { io.Copy(io.Discard, r.Body) switch r.RequestURI { case fmt.Sprintf("/storage/v1/b/%s?alt=json&prettyPrint=false&projection=full", bucketName): @@ -140,12 +143,94 @@ func TestMain(m *testing.M) { } func TestNewClientWithSecretErr(t *testing.T) { - gcpClient, err := NewClient(context.Background(), secret.DeepCopy()) + gcpClient, err := NewClient(context.Background(), WithSecret(secret.DeepCopy())) t.Log(err) assert.Error(t, err, "dialing: invalid character 'e' looking for beginning of value") assert.Assert(t, gcpClient == nil) } +func TestNewClientWithProxyErr(t *testing.T) { + tests := []struct { + name string + opts []Option + err string + }{ + { + name: "invalid secret", + opts: []Option{WithSecret(secret.DeepCopy())}, + err: "failed to create Google credentials from secret: invalid character 'e' looking for beginning of value", + }, + { + name: "attempts default credentials", + err: "failed to find Google default credentials: google: could not find default credentials. See https://cloud.google.com/docs/authentication/external/set-up-adc for more information", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + opts := append([]Option{WithProxyURL(&url.URL{})}, tt.opts...) + gcpClient, err := NewClient(context.Background(), opts...) + assert.Error(t, err, tt.err) + assert.Assert(t, gcpClient == nil) + }) + } +} + +func TestProxy(t *testing.T) { + proxyAddr, proxyPort := testproxy.New(t) + + err := os.Setenv(envGCSHost, fmt.Sprintf("https://%s", host)) + assert.NilError(t, err) + defer func() { + err := os.Unsetenv(envGCSHost) + assert.NilError(t, err) + }() + + tests := []struct { + name string + proxyURL *url.URL + err string + }{ + { + name: "valid proxy", + proxyURL: &url.URL{Scheme: "http", Host: proxyAddr}, + }, + { + name: "invalid proxy", + proxyURL: &url.URL{Scheme: "http", Host: fmt.Sprintf("localhost:%d", proxyPort+1)}, + err: "connection refused", + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + opts := []Option{WithProxyURL(tt.proxyURL)} + opts = append(opts, func(o *options) { + o.newHTTPClient = func(ctx context.Context, o *options) (*http.Client, error) { + transport := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + Proxy: http.ProxyURL(tt.proxyURL), + } + return &http.Client{Transport: transport}, nil + } + }) + gcpClient, err := NewClient(context.Background(), opts...) + assert.NilError(t, err) + assert.Assert(t, gcpClient != nil) + gcpClient.Client.SetRetry(gcpstorage.WithMaxAttempts(1)) + exists, err := gcpClient.BucketExists(context.Background(), bucketName) + if tt.err != "" { + assert.ErrorContains(t, err, tt.err) + } else { + assert.NilError(t, err) + assert.Assert(t, exists) + } + }) + } +} + func TestBucketExists(t *testing.T) { gcpClient := &GCSClient{ Client: client, @@ -272,16 +357,17 @@ func TestValidateSecret(t *testing.T) { } } -func newTestServer(handler func(w http.ResponseWriter, r *http.Request)) (*http.Client, func()) { +func newTestServer(handler func(w http.ResponseWriter, r *http.Request)) (*http.Client, string, func()) { ts := httptest.NewTLSServer(http.HandlerFunc(handler)) + host := ts.Listener.Addr().String() tlsConf := &tls.Config{InsecureSkipVerify: true} tr := &http.Transport{ TLSClientConfig: tlsConf, DialTLS: func(netw, addr string) (net.Conn, error) { - return tls.Dial("tcp", ts.Listener.Addr().String(), tlsConf) + return tls.Dial("tcp", host, tlsConf) }, } - return &http.Client{Transport: tr}, func() { + return &http.Client{Transport: tr}, host, func() { tr.CloseIdleConnections() ts.Close() } diff --git a/tests/listener/listener.go b/tests/listener/listener.go new file mode 100644 index 000000000..822880f98 --- /dev/null +++ b/tests/listener/listener.go @@ -0,0 +1,43 @@ +/* +Copyright 2024 The Flux authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testlistener + +import ( + "net" + "strconv" + "strings" + "testing" + + "gotest.tools/assert" +) + +// New creates a TCP listener on a random port and returns +// the listener, the address and the port of this listener. +// It also registers a cleanup function to close the listener +// when the test ends. +func New(t *testing.T) (net.Listener, string, int) { + t.Helper() + + lis, err := net.Listen("tcp", ":0") + assert.NilError(t, err) + t.Cleanup(func() { lis.Close() }) + + addr := lis.Addr().String() + addrParts := strings.Split(addr, ":") + portStr := addrParts[len(addrParts)-1] + port, err := strconv.Atoi(portStr) + assert.NilError(t, err) + + return lis, addr, port +} diff --git a/tests/proxy/proxy.go b/tests/proxy/proxy.go new file mode 100644 index 000000000..64d35c2dc --- /dev/null +++ b/tests/proxy/proxy.go @@ -0,0 +1,45 @@ +/* +Copyright 2024 The Flux authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testproxy + +import ( + "net/http" + "testing" + + "github.com/elazarl/goproxy" + + testlistener "github.com/fluxcd/source-controller/tests/listener" +) + +// New creates a new goproxy server on a random port and returns +// the address and the port of this server. It also registers a +// cleanup functions to close the server and the listener when +// the test ends. +func New(t *testing.T) (string, int) { + t.Helper() + + lis, addr, port := testlistener.New(t) + + handler := goproxy.NewProxyHttpServer() + handler.Verbose = true + + server := &http.Server{ + Addr: addr, + Handler: handler, + } + go server.Serve(lis) + t.Cleanup(func() { server.Close() }) + + return addr, port +}