From 444809a1c9aa47c1674970d199312229ca38a38b Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Fri, 29 Nov 2024 09:26:45 +0545 Subject: [PATCH 1/5] feat: extend kubernetes connection * support sourcing rest config from EKS via aws creds --- connection/environment.go | 8 +- connection/kubernetes.go | 133 +++++++++++++++++++++++++--- connection/zz_generated.deepcopy.go | 27 +++++- go.mod | 10 ++- go.sum | 22 +++-- 5 files changed, 175 insertions(+), 25 deletions(-) diff --git a/connection/environment.go b/connection/environment.go index 9e3e9037..9419938e 100644 --- a/connection/environment.go +++ b/connection/environment.go @@ -108,8 +108,10 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx return nil, err } - if err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace)); err != nil { + if clientSet, restConfig, err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace)); err != nil { return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err) + } else { + ctx = ctx.WithKubernetes(clientSet, restConfig) } break @@ -120,8 +122,10 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx if connections.Kubernetes != nil { if lo.FromPtr(connections.FromConfigItem) == "" { - if err := connections.Kubernetes.Populate(ctx); err != nil { + if clientSet, restConfig, err := connections.Kubernetes.Populate(ctx); err != nil { return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err) + } else { + ctx = ctx.WithKubernetes(clientSet, restConfig) } } diff --git a/connection/kubernetes.go b/connection/kubernetes.go index e403014d..a50be40f 100644 --- a/connection/kubernetes.go +++ b/connection/kubernetes.go @@ -1,16 +1,42 @@ package connection import ( + gocontext "context" + "encoding/base64" "fmt" + "net/http" + "time" + "github.com/aws/aws-sdk-go-v2/aws" + signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/service/eks" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/flanksource/duty/context" + dutyKubernetes "github.com/flanksource/duty/kubernetes" "github.com/flanksource/duty/models" "github.com/flanksource/duty/types" ) +const ( + clusterIDHeader = "x-k8s-aws-id" + emptyStringSha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + v1Prefix = "k8s-aws-v1." +) + +// +kubebuilder:object:generate=true +type EKSConnection struct { + AWSConnection `json:",inline" yaml:",inline"` + + Cluster string `json:"cluster"` +} + // +kubebuilder:object:generate=true type KubernetesConnection struct { - ConnectionName string `json:"connection,omitempty"` - KubeConfig types.EnvVar `json:"kubeconfig,omitempty"` + ConnectionName string `json:"connection,omitempty"` + KubeConfig *types.EnvVar `json:"kubeconfig,omitempty"` + EKS *EKSConnection `json:"eks,omitempty"` } func (t KubernetesConnection) ToModel() models.Connection { @@ -20,23 +46,110 @@ func (t KubernetesConnection) ToModel() models.Connection { } } -func (t *KubernetesConnection) Populate(ctx ConnectionContext) error { +func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { if t.ConnectionName != "" { connection, err := ctx.HydrateConnectionByURL(t.ConnectionName) if err != nil { - return err + return nil, nil, err } else if connection == nil { - return fmt.Errorf("connection[%s] not found", t.ConnectionName) + return nil, nil, fmt.Errorf("connection[%s] not found", t.ConnectionName) } t.KubeConfig.ValueStatic = connection.Certificate } - if v, err := ctx.GetEnvValueFromCache(t.KubeConfig, ctx.GetNamespace()); err != nil { - return err - } else { - t.KubeConfig.ValueStatic = v + if t.KubeConfig != nil { + if v, err := ctx.GetEnvValueFromCache(*t.KubeConfig, ctx.GetNamespace()); err != nil { + return nil, nil, err + } else { + t.KubeConfig.ValueStatic = v + } + + return dutyKubernetes.NewClientFromPathOrConfig(ctx.Logger, t.KubeConfig.ValueStatic) + } + + if t.EKS != nil { + if err := t.EKS.Populate(ctx); err != nil { + return nil, nil, err + } + + awsConfig, err := t.EKS.AWSConnection.Client(ctx) + if err != nil { + return nil, nil, err + } + + eksEndpoint, ca, err := eksClusterDetails(ctx, t.EKS.Cluster, awsConfig) + if err != nil { + return nil, nil, err + } + + token, err := getEKSToken(ctx, t.EKS.Cluster, awsConfig) + if err != nil { + return nil, nil, fmt.Errorf("failed to get token for EKS: %w", err) + } + + restConfig := &rest.Config{ + Host: eksEndpoint, + BearerToken: token, + TLSClientConfig: rest.TLSClientConfig{ + CAData: ca, + }, + } + + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, nil, err + } + + return clientset, restConfig, nil + } + + return nil, nil, nil +} + +func eksClusterDetails(ctx gocontext.Context, clusterName string, conf aws.Config) (string, []byte, error) { + eksClient := eks.NewFromConfig(conf) + cluster, err := eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{Name: &clusterName}) + if err != nil { + return "", nil, fmt.Errorf("unable to get cluster info: %v", err) + } + + ca, err := base64.URLEncoding.DecodeString(*cluster.Cluster.CertificateAuthority.Data) + if err != nil { + return "", nil, fmt.Errorf("unable to presign URL: %v", err) + } + + return *cluster.Cluster.Endpoint, ca, nil +} + +func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config) (string, error) { + cred, err := conf.Credentials.Retrieve(ctx) + if err != nil { + return "", fmt.Errorf("failed to retrive credentials from aws config: %w", err) + } + + signedURI, err := getSignedURI(ctx, cluster, cred) + if err != nil { + return "", fmt.Errorf("failed to get signed URI: %w", err) + } + + token := v1Prefix + base64.RawURLEncoding.EncodeToString([]byte(signedURI)) + return token, nil +} + +func getSignedURI(ctx gocontext.Context, cluster string, cred aws.Credentials) (string, error) { + request, err := http.NewRequest(http.MethodGet, "https://sts.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15", nil) + if err != nil { + return "", err + } + + request.Header.Add(clusterIDHeader, cluster) + request.Header.Add("X-Amz-Expires", "0") + signer := signerv4.NewSigner() + signedURI, _, err := signer.PresignHTTP(ctx, cred, request, emptyStringSha256, "sts", "us-east-1", time.Now()) + if err != nil { + return "", err } - return nil + return signedURI, nil } diff --git a/connection/zz_generated.deepcopy.go b/connection/zz_generated.deepcopy.go index e71332d1..eabf0f8f 100644 --- a/connection/zz_generated.deepcopy.go +++ b/connection/zz_generated.deepcopy.go @@ -51,6 +51,22 @@ func (in *AzureConnection) DeepCopy() *AzureConnection { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *EKSConnection) DeepCopyInto(out *EKSConnection) { + *out = *in + in.AWSConnection.DeepCopyInto(&out.AWSConnection) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new EKSConnection. +func (in *EKSConnection) DeepCopy() *EKSConnection { + if in == nil { + return nil + } + out := new(EKSConnection) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExecConnections) DeepCopyInto(out *ExecConnections) { *out = *in @@ -184,7 +200,16 @@ func (in *HTTPConnection) DeepCopy() *HTTPConnection { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *KubernetesConnection) DeepCopyInto(out *KubernetesConnection) { *out = *in - in.KubeConfig.DeepCopyInto(&out.KubeConfig) + if in.KubeConfig != nil { + in, out := &in.KubeConfig, &out.KubeConfig + *out = new(types.EnvVar) + (*in).DeepCopyInto(*out) + } + if in.EKS != nil { + in, out := &in.EKS, &out.EKS + *out = new(EKSConnection) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubernetesConnection. diff --git a/go.mod b/go.mod index da1fa05f..cc5c3b2a 100644 --- a/go.mod +++ b/go.mod @@ -12,9 +12,10 @@ require ( github.com/TomOnTime/utfutil v0.0.0-20230223141146-125e65197b36 github.com/WinterYukky/gorm-extra-clause-plugin v0.2.0 github.com/asecurityteam/rolling v2.0.4+incompatible - github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2 v1.32.5 github.com/aws/aws-sdk-go-v2/config v1.27.29 github.com/aws/aws-sdk-go-v2/credentials v1.17.29 + github.com/aws/aws-sdk-go-v2/service/eks v1.52.1 github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 github.com/eko/gocache/lib/v4 v4.1.6 github.com/eko/gocache/store/go_cache/v4 v4.2.2 @@ -96,8 +97,8 @@ require ( github.com/apparentlymart/go-textseg/v15 v15.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 // indirect @@ -107,7 +108,7 @@ require ( github.com/aws/aws-sdk-go-v2/service/s3 v1.48.0 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 // indirect - github.com/aws/smithy-go v1.20.4 // indirect + github.com/aws/smithy-go v1.22.1 // indirect github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bmatcuk/doublestar/v4 v4.6.1 // indirect @@ -169,6 +170,7 @@ require ( github.com/jeremywohl/flatten v0.0.0-20180923035001-588fe0d4c603 // indirect github.com/jinzhu/inflection v1.0.0 // indirect github.com/jinzhu/now v1.1.5 // indirect + github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/kevinburke/ssh_config v1.2.0 // indirect github.com/kr/fs v0.1.0 // indirect diff --git a/go.sum b/go.sum index 173875a9..09726729 100644 --- a/go.sum +++ b/go.sum @@ -668,8 +668,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asecurityteam/rolling v2.0.4+incompatible h1:WOSeokINZT0IDzYGc5BVcjLlR9vPol08RvI2GAsmB0s= github.com/asecurityteam/rolling v2.0.4+incompatible/go.mod h1:2D4ba5ZfYCWrIMleUgTvc8pmLExEuvu3PDwl+vnG58Q= -github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= -github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2 v1.32.5 h1:U8vdWJuY7ruAkzaOdD7guwJjD06YSKmnKCJs7s3IkIo= +github.com/aws/aws-sdk-go-v2 v1.32.5/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4 h1:OCs21ST2LrepDfD3lwlQiOqIGp6JiEUqG84GzTDoyJs= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.5.4/go.mod h1:usURWEKSNNAcAZuzRn/9ZYPT8aZQkR7xcCtunK/LkJo= github.com/aws/aws-sdk-go-v2/config v1.27.29 h1:+ZPKb3u9Up4KZWLGTtpTmC5T3XmRD1ZQ8XQjRCHUvJw= @@ -680,14 +680,16 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 h1:yjwoSyDZF8Jth+mUk5lSPJ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12/go.mod h1:fuR57fAgMk7ot3WcNQfb6rSEn+SUffl7ri+aa8uKysI= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.7 h1:FnLf60PtjXp8ZOzQfhJVsqF0OtYKQZWQfqOLshh8YXg= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.15.7/go.mod h1:tDVvl8hyU6E9B8TrnNrZQEVkQlB8hjJwcgpPhgtlnNg= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24 h1:4usbeaes3yJnCFC7kfeyhkdkPtoRYPa/hTmCqMpKpLI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.24/go.mod h1:5CI1JemjVwde8m2WG3cz23qHKPOxbpkq0HaoreEgLIY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24 h1:N1zsICrQglfzaBnrfM0Ys00860C+QFwu6u/5+LomP+o= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.24/go.mod h1:dCn9HbJ8+K31i8IQ8EWmWj0EiIk0+vKiHNMxTTYveAg= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10 h1:5oE2WzJE56/mVveuDZPJESKlg/00AaS2pY2QZcnxg4M= github.com/aws/aws-sdk-go-v2/internal/v4a v1.2.10/go.mod h1:FHbKWQtRBYUz4vO5WBWjzMD2by126ny5y/1EoaWoLfI= +github.com/aws/aws-sdk-go-v2/service/eks v1.52.1 h1:XqyUdJbXQxY48CbBtN9a51HoTQy/kTIwrWiruRDsydk= +github.com/aws/aws-sdk-go-v2/service/eks v1.52.1/go.mod h1:WTfZ/+I7aSMEna6iYm1Kjne9A8f1MyxXNfp6hCa1+Bk= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.2.10 h1:L0ai8WICYHozIKK+OtPzVJBugL7culcuM4E4JOpIEm8= @@ -704,8 +706,8 @@ github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 h1:SKvPgvdvmiTWoi0GAJ7AsJfO github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5/go.mod h1:20sz31hv/WsPa3HhU3hfrIet2kxM4Pe0r20eBZ20Tac= github.com/aws/aws-sdk-go-v2/service/sts v1.30.5 h1:OMsEmCyz2i89XwRwPouAJvhj81wINh+4UK+k/0Yo/q8= github.com/aws/aws-sdk-go-v2/service/sts v1.30.5/go.mod h1:vmSqFK+BVIwVpDAGZB3CoCXHzurt4qBE8lf+I/kRTh0= -github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= -github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro= +github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk= github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -1106,6 +1108,10 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= From 872d21a9a695444acfadb96120ae03b990369eb8 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 3 Dec 2024 17:54:26 +0545 Subject: [PATCH 2/5] feat: kubernetes connection from CNRM & GKE --- connection/cnrm.go | 86 ++++++++++++++++ connection/eks.go | 112 +++++++++++++++++++++ connection/environment.go | 10 +- connection/gcp.go | 55 +++------- connection/gcs.go | 45 +++++++++ connection/gke.go | 110 ++++++++++++++++++++ connection/kubernetes.go | 150 +++++++++------------------- connection/zz_generated.deepcopy.go | 64 +++++++++++- go.mod | 2 +- 9 files changed, 479 insertions(+), 155 deletions(-) create mode 100644 connection/cnrm.go create mode 100644 connection/eks.go create mode 100644 connection/gke.go diff --git a/connection/cnrm.go b/connection/cnrm.go new file mode 100644 index 00000000..1d6f31b3 --- /dev/null +++ b/connection/cnrm.go @@ -0,0 +1,86 @@ +package connection + +import ( + "encoding/base64" + "fmt" + + "github.com/flanksource/duty/context" + dutyKube "github.com/flanksource/duty/kubernetes" + container "google.golang.org/api/container/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +// +kubebuilder:object:generate=true +type CNRMConnection struct { + GKE GKEConnection `json:"gke" yaml:"gke"` + + ClusterResource string `json:"clusterResource"` + ClusterResourceNamespace string `json:"clusterResourceNamespace"` +} + +func (t *CNRMConnection) Populate(ctx ConnectionContext) error { + return t.GKE.Populate(ctx) +} + +func (t *CNRMConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { + cnrmCluster, restConfig, err := t.GKE.KubernetesClient(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to create kubernetes client for GKE: %w", err) + } + + containerResourceKubeClient, err := dutyKube.NewKubeClient(cnrmCluster, restConfig). + GetClientByGroupVersionKind("container.cnrm.cloud.google.com", "v1beta1", "ContainerCluster") + if err != nil { + return nil, nil, err + } + + obj, err := containerResourceKubeClient.Namespace(t.ClusterResourceNamespace).Get(ctx, t.ClusterResource, metav1.GetOptions{}) + if err != nil { + return nil, nil, err + } + + clusterResourceRestConfig, err := t.createRestConfigForClusteResource(ctx, obj) + if err != nil { + return nil, nil, err + } + + clientset, err := kubernetes.NewForConfig(clusterResourceRestConfig) + if err != nil { + return nil, nil, err + } + + return clientset, restConfig, nil +} + +func (t *CNRMConnection) createRestConfigForClusteResource(ctx context.Context, clusterObj *unstructured.Unstructured) (*rest.Config, error) { + endpoint, found, err := unstructured.NestedString(clusterObj.Object, "status", "endpoint") + if err != nil || !found { + return nil, fmt.Errorf("failed to extract cluster endpoint from cluster resource: %w", err) + } + + caCertB64, found, err := unstructured.NestedString(clusterObj.Object, "spec", "masterAuth", "clusterCaCertificate") + if err != nil || !found { + return nil, fmt.Errorf("failed to extract cluster CA certificate from cluster resource: %w", err) + } + + ca, err := base64.URLEncoding.DecodeString(caCertB64) + if err != nil { + return nil, fmt.Errorf("unable to decode cluster CA certificate: %w", err) + } + + token, err := t.GKE.Token(ctx, container.CloudPlatformScope) + if err != nil { + return nil, fmt.Errorf("failed to get token for gke: %w", err) + } + + return &rest.Config{ + Host: endpoint, + TLSClientConfig: rest.TLSClientConfig{ + CAData: ca, + }, + BearerToken: token.AccessToken, + }, nil +} diff --git a/connection/eks.go b/connection/eks.go new file mode 100644 index 00000000..57753e06 --- /dev/null +++ b/connection/eks.go @@ -0,0 +1,112 @@ +package connection + +import ( + gocontext "context" + "encoding/base64" + "fmt" + "net/http" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" + "github.com/aws/aws-sdk-go-v2/service/eks" + "github.com/flanksource/duty/context" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +const ( + clusterIDHeader = "x-k8s-aws-id" + emptyStringSha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" + v1Prefix = "k8s-aws-v1." +) + +// +kubebuilder:object:generate=true +type EKSConnection struct { + AWSConnection `json:",inline" yaml:",inline"` + + Cluster string `json:"cluster"` +} + +func (t *EKSConnection) Populate(ctx ConnectionContext) error { + return t.AWSConnection.Populate(ctx) +} + +func (t *EKSConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { + awsConfig, err := t.AWSConnection.Client(ctx) + if err != nil { + return nil, nil, err + } + + eksEndpoint, ca, err := eksClusterDetails(ctx, t.Cluster, awsConfig) + if err != nil { + return nil, nil, err + } + + token, err := getEKSToken(ctx, t.Cluster, awsConfig) + if err != nil { + return nil, nil, fmt.Errorf("failed to get token for EKS: %w", err) + } + + restConfig := &rest.Config{ + Host: eksEndpoint, + BearerToken: token, + TLSClientConfig: rest.TLSClientConfig{ + CAData: ca, + }, + } + + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, nil, err + } + + return clientset, restConfig, nil +} + +func eksClusterDetails(ctx gocontext.Context, clusterName string, conf aws.Config) (string, []byte, error) { + eksClient := eks.NewFromConfig(conf) + cluster, err := eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{Name: &clusterName}) + if err != nil { + return "", nil, fmt.Errorf("unable to get cluster info: %v", err) + } + + ca, err := base64.URLEncoding.DecodeString(*cluster.Cluster.CertificateAuthority.Data) + if err != nil { + return "", nil, fmt.Errorf("unable to presign URL: %v", err) + } + + return *cluster.Cluster.Endpoint, ca, nil +} + +func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config) (string, error) { + cred, err := conf.Credentials.Retrieve(ctx) + if err != nil { + return "", fmt.Errorf("failed to retrive credentials from aws config: %w", err) + } + + signedURI, err := getSignedSTSURI(ctx, cluster, cred) + if err != nil { + return "", fmt.Errorf("failed to get signed URI: %w", err) + } + + token := v1Prefix + base64.RawURLEncoding.EncodeToString([]byte(signedURI)) + return token, nil +} + +func getSignedSTSURI(ctx gocontext.Context, cluster string, cred aws.Credentials) (string, error) { + request, err := http.NewRequest(http.MethodGet, "https://sts.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15", nil) + if err != nil { + return "", err + } + + request.Header.Add(clusterIDHeader, cluster) + request.Header.Add("X-Amz-Expires", "0") + signer := signerv4.NewSigner() + signedURI, _, err := signer.PresignHTTP(ctx, cred, request, emptyStringSha256, "sts", "us-east-1", time.Now()) + if err != nil { + return "", err + } + + return signedURI, nil +} diff --git a/connection/environment.go b/connection/environment.go index 9419938e..8e026405 100644 --- a/connection/environment.go +++ b/connection/environment.go @@ -104,14 +104,14 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx return nil, err } else if found { connections.Kubernetes = &KubernetesConnection{} - if err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeconfig, &connections.Kubernetes.KubeConfig); err != nil { + if err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeconfig, &connections.Kubernetes.Kubeconfig); err != nil { return nil, err } - if clientSet, restConfig, err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace)); err != nil { + if clientset, restConfig, err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace)); err != nil { return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err) } else { - ctx = ctx.WithKubernetes(clientSet, restConfig) + ctx = ctx.WithKubernetes(clientset, restConfig) } break @@ -122,10 +122,10 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx if connections.Kubernetes != nil { if lo.FromPtr(connections.FromConfigItem) == "" { - if clientSet, restConfig, err := connections.Kubernetes.Populate(ctx); err != nil { + if clientset, restConfig, err := connections.Kubernetes.Populate(ctx); err != nil { return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err) } else { - ctx = ctx.WithKubernetes(clientSet, restConfig) + ctx = ctx.WithKubernetes(clientset, restConfig) } } diff --git a/connection/gcp.go b/connection/gcp.go index aab723ee..7e045af9 100644 --- a/connection/gcp.go +++ b/connection/gcp.go @@ -1,15 +1,12 @@ package connection import ( - "crypto/tls" - "net/http" - - gcs "cloud.google.com/go/storage" + "golang.org/x/oauth2" + "golang.org/x/oauth2/google" "github.com/flanksource/commons/utils" "github.com/flanksource/duty/context" "github.com/flanksource/duty/types" - "google.golang.org/api/option" ) // +kubebuilder:object:generate=true @@ -23,51 +20,27 @@ type GCPConnection struct { SkipTLSVerify bool `yaml:"skipTLSVerify,omitempty" json:"skipTLSVerify,omitempty"` } -func (conn *GCPConnection) Client(ctx context.Context) (*gcs.Client, error) { - conn = conn.Validate() - var client *gcs.Client - var err error - - var clientOpts []option.ClientOption - - if conn.Endpoint != "" { - clientOpts = append(clientOpts, option.WithEndpoint(conn.Endpoint)) - } - - if conn.SkipTLSVerify { - insecureHTTPClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - }, - } - - clientOpts = append(clientOpts, option.WithHTTPClient(insecureHTTPClient)) +func (g *GCPConnection) Validate() *GCPConnection { + if g == nil { + return &GCPConnection{} } - if conn.Credentials != nil && !conn.Credentials.IsEmpty() { - credential, err := ctx.GetEnvValueFromCache(*conn.Credentials, ctx.GetNamespace()) - if err != nil { - return nil, err - } - clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(credential))) - } else { - clientOpts = append(clientOpts, option.WithoutAuthentication()) - } + return g +} - client, err = gcs.NewClient(ctx.Context, clientOpts...) +func (g *GCPConnection) Token(ctx context.Context, scopes ...string) (*oauth2.Token, error) { + creds, err := google.CredentialsFromJSON(ctx, []byte(g.Credentials.ValueStatic), scopes...) if err != nil { return nil, err } - return client, nil -} - -func (g *GCPConnection) Validate() *GCPConnection { - if g == nil { - return &GCPConnection{} + tokenSource := creds.TokenSource + token, err := tokenSource.Token() + if err != nil { + return nil, err } - return g + return token, nil } // HydrateConnection attempts to find the connection by name diff --git a/connection/gcs.go b/connection/gcs.go index dc844ee8..f4c16f77 100644 --- a/connection/gcs.go +++ b/connection/gcs.go @@ -1,7 +1,13 @@ package connection import ( + "crypto/tls" + "net/http" + + gcs "cloud.google.com/go/storage" + "github.com/flanksource/duty/context" "github.com/flanksource/duty/types" + "google.golang.org/api/option" ) // +kubebuilder:object:generate=true @@ -18,6 +24,45 @@ func (g *GCSConnection) Validate() *GCSConnection { return g } +func (g *GCSConnection) Client(ctx context.Context) (*gcs.Client, error) { + g = g.Validate() + var client *gcs.Client + var err error + + var clientOpts []option.ClientOption + + if g.Endpoint != "" { + clientOpts = append(clientOpts, option.WithEndpoint(g.Endpoint)) + } + + if g.SkipTLSVerify { + insecureHTTPClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + + clientOpts = append(clientOpts, option.WithHTTPClient(insecureHTTPClient)) + } + + if g.Credentials != nil && !g.Credentials.IsEmpty() { + credential, err := ctx.GetEnvValueFromCache(*g.Credentials, ctx.GetNamespace()) + if err != nil { + return nil, err + } + clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(credential))) + } else { + clientOpts = append(clientOpts, option.WithoutAuthentication()) + } + + client, err = gcs.NewClient(ctx.Context, clientOpts...) + if err != nil { + return nil, err + } + + return client, nil +} + // HydrateConnection attempts to find the connection by name // and populate the endpoint and credentials. func (g *GCSConnection) HydrateConnection(ctx ConnectionContext) error { diff --git a/connection/gke.go b/connection/gke.go new file mode 100644 index 00000000..062e8d0a --- /dev/null +++ b/connection/gke.go @@ -0,0 +1,110 @@ +package connection + +import ( + "crypto/tls" + "encoding/base64" + "fmt" + "net/http" + + "github.com/flanksource/duty/context" + container "google.golang.org/api/container/v1" + "google.golang.org/api/option" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +// +kubebuilder:object:generate=true +type GKEConnection struct { + GCPConnection `json:",inline" yaml:",inline"` + + ProjectID string `json:"projectID"` + Zone string `json:"zone"` + Cluster string `json:"cluster"` +} + +func (t *GKEConnection) Populate(ctx ConnectionContext) error { + return t.GCPConnection.HydrateConnection(ctx) +} + +func (t *GKEConnection) Validate() *GKEConnection { + if t == nil { + return &GKEConnection{} + } + + return t +} + +func (t *GKEConnection) Client(ctx context.Context) (*container.Service, error) { + t = t.Validate() + + var clientOpts []option.ClientOption + + if t.Endpoint != "" { + clientOpts = append(clientOpts, option.WithEndpoint(t.Endpoint)) + } + + if t.SkipTLSVerify { + insecureHTTPClient := &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + } + + clientOpts = append(clientOpts, option.WithHTTPClient(insecureHTTPClient)) + } + + if t.Credentials != nil && !t.Credentials.IsEmpty() { + credential, err := ctx.GetEnvValueFromCache(*t.Credentials, ctx.GetNamespace()) + if err != nil { + return nil, err + } + clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(credential))) + } else { + clientOpts = append(clientOpts, option.WithoutAuthentication()) + } + + svc, err := container.NewService(ctx, clientOpts...) + if err != nil { + return nil, err + } + + return svc, nil +} + +func (t *GKEConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { + containerService, err := t.Client(ctx) + if err != nil { + return nil, nil, err + } + + clusterName := fmt.Sprintf("projects/%s/locations/%s/clusters/%s", t.ProjectID, t.Zone, t.Cluster) + cluster, err := containerService.Projects.Locations.Clusters.Get(clusterName).Context(ctx).Do() + if err != nil { + return nil, nil, fmt.Errorf("failed to get cluster: %w", err) + } + + token, err := t.GCPConnection.Token(ctx, container.CloudPlatformScope) + if err != nil { + return nil, nil, fmt.Errorf("failed to get token for gke: %w", err) + } + + ca, err := base64.URLEncoding.DecodeString(cluster.MasterAuth.ClusterCaCertificate) + if err != nil { + return nil, nil, fmt.Errorf("unable to decode cluster CA certificate: %w", err) + } + + restConfig := &rest.Config{ + Host: "https://" + cluster.Endpoint, + BearerToken: token.AccessToken, + TLSClientConfig: rest.TLSClientConfig{ + CAData: ca, + }, + } + + clientset, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, nil, err + } + + return clientset, restConfig, nil +} diff --git a/connection/kubernetes.go b/connection/kubernetes.go index a50be40f..56eee01b 100644 --- a/connection/kubernetes.go +++ b/connection/kubernetes.go @@ -1,15 +1,8 @@ package connection import ( - gocontext "context" - "encoding/base64" "fmt" - "net/http" - "time" - "github.com/aws/aws-sdk-go-v2/aws" - signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4" - "github.com/aws/aws-sdk-go-v2/service/eks" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -19,34 +12,14 @@ import ( "github.com/flanksource/duty/types" ) -const ( - clusterIDHeader = "x-k8s-aws-id" - emptyStringSha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855" - v1Prefix = "k8s-aws-v1." -) - -// +kubebuilder:object:generate=true -type EKSConnection struct { - AWSConnection `json:",inline" yaml:",inline"` - - Cluster string `json:"cluster"` -} - // +kubebuilder:object:generate=true -type KubernetesConnection struct { - ConnectionName string `json:"connection,omitempty"` - KubeConfig *types.EnvVar `json:"kubeconfig,omitempty"` - EKS *EKSConnection `json:"eks,omitempty"` -} - -func (t KubernetesConnection) ToModel() models.Connection { - return models.Connection{ - Type: models.ConnectionTypeKubernetes, - Certificate: t.KubeConfig.ValueStatic, - } +type KubeconfigConnection struct { + // Connection name to populate kubeconfig + ConnectionName string `json:"connection,omitempty"` + Kubeconfig *types.EnvVar `json:"kubeconfig,omitempty"` } -func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { +func (t *KubeconfigConnection) Populate(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { if t.ConnectionName != "" { connection, err := ctx.HydrateConnectionByURL(t.ConnectionName) if err != nil { @@ -55,101 +28,68 @@ func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interfa return nil, nil, fmt.Errorf("connection[%s] not found", t.ConnectionName) } - t.KubeConfig.ValueStatic = connection.Certificate + t.Kubeconfig.ValueStatic = connection.Certificate } - if t.KubeConfig != nil { - if v, err := ctx.GetEnvValueFromCache(*t.KubeConfig, ctx.GetNamespace()); err != nil { + if t.Kubeconfig != nil { + if v, err := ctx.GetEnvValueFromCache(*t.Kubeconfig, ctx.GetNamespace()); err != nil { return nil, nil, err } else { - t.KubeConfig.ValueStatic = v - } - - return dutyKubernetes.NewClientFromPathOrConfig(ctx.Logger, t.KubeConfig.ValueStatic) - } - - if t.EKS != nil { - if err := t.EKS.Populate(ctx); err != nil { - return nil, nil, err - } - - awsConfig, err := t.EKS.AWSConnection.Client(ctx) - if err != nil { - return nil, nil, err - } - - eksEndpoint, ca, err := eksClusterDetails(ctx, t.EKS.Cluster, awsConfig) - if err != nil { - return nil, nil, err - } - - token, err := getEKSToken(ctx, t.EKS.Cluster, awsConfig) - if err != nil { - return nil, nil, fmt.Errorf("failed to get token for EKS: %w", err) - } - - restConfig := &rest.Config{ - Host: eksEndpoint, - BearerToken: token, - TLSClientConfig: rest.TLSClientConfig{ - CAData: ca, - }, - } - - clientset, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, nil, err + t.Kubeconfig.ValueStatic = v } - return clientset, restConfig, nil + return dutyKubernetes.NewClientFromPathOrConfig(ctx.Logger, t.Kubeconfig.ValueStatic) } return nil, nil, nil } -func eksClusterDetails(ctx gocontext.Context, clusterName string, conf aws.Config) (string, []byte, error) { - eksClient := eks.NewFromConfig(conf) - cluster, err := eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{Name: &clusterName}) - if err != nil { - return "", nil, fmt.Errorf("unable to get cluster info: %v", err) - } +// +kubebuilder:object:generate=true +type KubernetesConnection struct { + KubeconfigConnection `json:",inline"` - ca, err := base64.URLEncoding.DecodeString(*cluster.Cluster.CertificateAuthority.Data) - if err != nil { - return "", nil, fmt.Errorf("unable to presign URL: %v", err) - } + EKS *EKSConnection `json:"eks,omitempty"` + GKE *GKEConnection `json:"gke,omitempty"` + CNRM *CNRMConnection `json:"cnrm,omitempty"` +} - return *cluster.Cluster.Endpoint, ca, nil +func (t KubernetesConnection) ToModel() models.Connection { + return models.Connection{ + Type: models.ConnectionTypeKubernetes, + Certificate: t.Kubeconfig.ValueStatic, + } } -func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config) (string, error) { - cred, err := conf.Credentials.Retrieve(ctx) - if err != nil { - return "", fmt.Errorf("failed to retrive credentials from aws config: %w", err) +func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { + if clientset, restConfig, err := t.KubeconfigConnection.Populate(ctx); err != nil { + return nil, nil, nil + } else if clientset != nil { + return clientset, restConfig, nil } - signedURI, err := getSignedURI(ctx, cluster, cred) - if err != nil { - return "", fmt.Errorf("failed to get signed URI: %w", err) + if t.GKE != nil { + if err := t.GKE.Populate(ctx); err != nil { + return nil, nil, err + } + + return t.GKE.KubernetesClient(ctx) } - token := v1Prefix + base64.RawURLEncoding.EncodeToString([]byte(signedURI)) - return token, nil -} + if t.EKS != nil { + if err := t.EKS.Populate(ctx); err != nil { + return nil, nil, err + } -func getSignedURI(ctx gocontext.Context, cluster string, cred aws.Credentials) (string, error) { - request, err := http.NewRequest(http.MethodGet, "https://sts.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15", nil) - if err != nil { - return "", err + return t.EKS.KubernetesClient(ctx) } - request.Header.Add(clusterIDHeader, cluster) - request.Header.Add("X-Amz-Expires", "0") - signer := signerv4.NewSigner() - signedURI, _, err := signer.PresignHTTP(ctx, cred, request, emptyStringSha256, "sts", "us-east-1", time.Now()) - if err != nil { - return "", err + if t.CNRM != nil { + if err := t.CNRM.Populate(ctx); err != nil { + return nil, nil, err + } + + return t.CNRM.KubernetesClient(ctx) } - return signedURI, nil + return nil, nil, nil } diff --git a/connection/zz_generated.deepcopy.go b/connection/zz_generated.deepcopy.go index eabf0f8f..63300d64 100644 --- a/connection/zz_generated.deepcopy.go +++ b/connection/zz_generated.deepcopy.go @@ -51,6 +51,22 @@ func (in *AzureConnection) DeepCopy() *AzureConnection { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *CNRMConnection) DeepCopyInto(out *CNRMConnection) { + *out = *in + in.GKE.DeepCopyInto(&out.GKE) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new CNRMConnection. +func (in *CNRMConnection) DeepCopy() *CNRMConnection { + if in == nil { + return nil + } + out := new(CNRMConnection) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *EKSConnection) DeepCopyInto(out *EKSConnection) { *out = *in @@ -143,6 +159,22 @@ func (in *GCSConnection) DeepCopy() *GCSConnection { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GKEConnection) DeepCopyInto(out *GKEConnection) { + *out = *in + in.GCPConnection.DeepCopyInto(&out.GCPConnection) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GKEConnection. +func (in *GKEConnection) DeepCopy() *GKEConnection { + if in == nil { + return nil + } + out := new(GKEConnection) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GitConnection) DeepCopyInto(out *GitConnection) { *out = *in @@ -198,18 +230,44 @@ func (in *HTTPConnection) DeepCopy() *HTTPConnection { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *KubernetesConnection) DeepCopyInto(out *KubernetesConnection) { +func (in *KubeconfigConnection) DeepCopyInto(out *KubeconfigConnection) { *out = *in - if in.KubeConfig != nil { - in, out := &in.KubeConfig, &out.KubeConfig + if in.Kubeconfig != nil { + in, out := &in.Kubeconfig, &out.Kubeconfig *out = new(types.EnvVar) (*in).DeepCopyInto(*out) } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubeconfigConnection. +func (in *KubeconfigConnection) DeepCopy() *KubeconfigConnection { + if in == nil { + return nil + } + out := new(KubeconfigConnection) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *KubernetesConnection) DeepCopyInto(out *KubernetesConnection) { + *out = *in + in.KubeconfigConnection.DeepCopyInto(&out.KubeconfigConnection) if in.EKS != nil { in, out := &in.EKS, &out.EKS *out = new(EKSConnection) (*in).DeepCopyInto(*out) } + if in.GKE != nil { + in, out := &in.GKE, &out.GKE + *out = new(GKEConnection) + (*in).DeepCopyInto(*out) + } + if in.CNRM != nil { + in, out := &in.CNRM, &out.CNRM + *out = new(CNRMConnection) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new KubernetesConnection. diff --git a/go.mod b/go.mod index cc5c3b2a..a7dd29e0 100644 --- a/go.mod +++ b/go.mod @@ -66,6 +66,7 @@ require ( go.opentelemetry.io/otel/trace v1.24.0 golang.org/x/crypto v0.27.0 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 + golang.org/x/oauth2 v0.21.0 golang.org/x/sync v0.8.0 google.golang.org/api v0.169.0 google.golang.org/grpc v1.65.0 @@ -238,7 +239,6 @@ require ( go.starlark.net v0.0.0-20231121155337-90ade8b19d09 // indirect golang.org/x/mod v0.20.0 // indirect golang.org/x/net v0.29.0 // indirect - golang.org/x/oauth2 v0.21.0 // indirect golang.org/x/sys v0.25.0 // indirect golang.org/x/term v0.24.0 // indirect golang.org/x/text v0.18.0 // indirect From 21206414ad56eba827bf51c8d27249ece94ec079 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Tue, 3 Dec 2024 18:58:33 +0545 Subject: [PATCH 3/5] feat: cache cloud tokens --- connection/common.go | 22 ++++++++++++++++++++++ connection/eks.go | 6 ++++++ connection/gcp.go | 10 ++++++++++ 3 files changed, 38 insertions(+) create mode 100644 connection/common.go diff --git a/connection/common.go b/connection/common.go new file mode 100644 index 00000000..a0b45f7b --- /dev/null +++ b/connection/common.go @@ -0,0 +1,22 @@ +package connection + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/patrickmn/go-cache" +) + +// caches cloud tokens. eg: EKS token, GKE token, ... +var tokenCache = cache.New(time.Hour, time.Hour) + +func tokenCacheKey(cloud string, cred any, identifiers string) string { + switch v := cred.(type) { + case string: + return fmt.Sprintf("%s-%s-%s", cloud, v, identifiers) + default: + m, _ := json.Marshal(v) + return fmt.Sprintf("%s-%s-%s", cloud, m, identifiers) + } +} diff --git a/connection/eks.go b/connection/eks.go index 57753e06..4451a339 100644 --- a/connection/eks.go +++ b/connection/eks.go @@ -85,12 +85,18 @@ func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config) (string return "", fmt.Errorf("failed to retrive credentials from aws config: %w", err) } + cacheKey := tokenCacheKey("eks", cred, cluster) + if found, ok := tokenCache.Get(cacheKey); ok { + return found.(string), nil + } + signedURI, err := getSignedSTSURI(ctx, cluster, cred) if err != nil { return "", fmt.Errorf("failed to get signed URI: %w", err) } token := v1Prefix + base64.RawURLEncoding.EncodeToString([]byte(signedURI)) + tokenCache.Set(cacheKey, token, time.Minute*15) return token, nil } diff --git a/connection/gcp.go b/connection/gcp.go index 7e045af9..e8bc26d9 100644 --- a/connection/gcp.go +++ b/connection/gcp.go @@ -1,9 +1,13 @@ package connection import ( + "strings" + "time" + "golang.org/x/oauth2" "golang.org/x/oauth2/google" + "github.com/flanksource/commons/hash" "github.com/flanksource/commons/utils" "github.com/flanksource/duty/context" "github.com/flanksource/duty/types" @@ -29,6 +33,11 @@ func (g *GCPConnection) Validate() *GCPConnection { } func (g *GCPConnection) Token(ctx context.Context, scopes ...string) (*oauth2.Token, error) { + cacheKey := tokenCacheKey("gcp", hash.Sha256Hex(g.Credentials.ValueStatic), strings.Join(scopes, ",")) + if found, ok := tokenCache.Get(cacheKey); ok { + return found.(*oauth2.Token), nil + } + creds, err := google.CredentialsFromJSON(ctx, []byte(g.Credentials.ValueStatic), scopes...) if err != nil { return nil, err @@ -40,6 +49,7 @@ func (g *GCPConnection) Token(ctx context.Context, scopes ...string) (*oauth2.To return nil, err } + tokenCache.Set(cacheKey, token, time.Until(token.Expiry)) return token, nil } From 646f4e891dab4428ace354d3659130f296e1eaa0 Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 4 Dec 2024 17:26:50 +0545 Subject: [PATCH 4/5] fix: google cloud connection --- connection/aws.go | 6 +++--- connection/gcp.go | 9 +++++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/connection/aws.go b/connection/aws.go index 4f678e56..ba97c2fd 100644 --- a/connection/aws.go +++ b/connection/aws.go @@ -95,19 +95,19 @@ func (t *AWSConnection) Populate(ctx ConnectionContext) error { } if accessKey, err := ctx.GetEnvValueFromCache(t.AccessKey, ctx.GetNamespace()); err != nil { - return fmt.Errorf("could not parse AWS access key id: %v", err) + return fmt.Errorf("could not get AWS access key id from env var: %w", err) } else { t.AccessKey.ValueStatic = accessKey } if secretKey, err := ctx.GetEnvValueFromCache(t.SecretKey, ctx.GetNamespace()); err != nil { - return fmt.Errorf("could not parse AWS secret access key: %w", err) + return fmt.Errorf("could not get AWS secret access key from env var: %w", err) } else { t.SecretKey.ValueStatic = secretKey } if sessionToken, err := ctx.GetEnvValueFromCache(t.SessionToken, ctx.GetNamespace()); err != nil { - return fmt.Errorf("could not parse AWS session token: %w", err) + return fmt.Errorf("could not get AWS session token from env var: %w", err) } else { t.SessionToken.ValueStatic = sessionToken } diff --git a/connection/gcp.go b/connection/gcp.go index e8bc26d9..61b122b3 100644 --- a/connection/gcp.go +++ b/connection/gcp.go @@ -1,6 +1,7 @@ package connection import ( + "fmt" "strings" "time" @@ -66,6 +67,14 @@ func (g *GCPConnection) HydrateConnection(ctx ConnectionContext) error { g.Endpoint = connection.URL } + if g.Credentials != nil { + if cred, err := ctx.GetEnvValueFromCache(*g.Credentials, ctx.GetNamespace()); err != nil { + return fmt.Errorf("could get gcloud credentials from env var: %w", err) + } else { + g.Credentials.ValueStatic = cred + } + } + return nil } From d558164592a840d0d6cb55eb99735c2ad5cc6a1b Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 4 Dec 2024 17:44:55 +0545 Subject: [PATCH 5/5] feat: add an option to create kuberenetes clients with fresh/cached token. --- connection/cnrm.go | 10 +++++----- connection/common.go | 7 +++++++ connection/eks.go | 15 +++++++++------ connection/environment.go | 4 ++-- connection/gcp.go | 10 ++++++---- connection/gke.go | 4 ++-- connection/kubernetes.go | 8 ++++---- 7 files changed, 35 insertions(+), 23 deletions(-) diff --git a/connection/cnrm.go b/connection/cnrm.go index 1d6f31b3..77b667b9 100644 --- a/connection/cnrm.go +++ b/connection/cnrm.go @@ -25,8 +25,8 @@ func (t *CNRMConnection) Populate(ctx ConnectionContext) error { return t.GKE.Populate(ctx) } -func (t *CNRMConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { - cnrmCluster, restConfig, err := t.GKE.KubernetesClient(ctx) +func (t *CNRMConnection) KubernetesClient(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) { + cnrmCluster, restConfig, err := t.GKE.KubernetesClient(ctx, freshToken) if err != nil { return nil, nil, fmt.Errorf("failed to create kubernetes client for GKE: %w", err) } @@ -42,7 +42,7 @@ func (t *CNRMConnection) KubernetesClient(ctx context.Context) (kubernetes.Inter return nil, nil, err } - clusterResourceRestConfig, err := t.createRestConfigForClusteResource(ctx, obj) + clusterResourceRestConfig, err := t.createRestConfigForClusteResource(ctx, freshToken, obj) if err != nil { return nil, nil, err } @@ -55,7 +55,7 @@ func (t *CNRMConnection) KubernetesClient(ctx context.Context) (kubernetes.Inter return clientset, restConfig, nil } -func (t *CNRMConnection) createRestConfigForClusteResource(ctx context.Context, clusterObj *unstructured.Unstructured) (*rest.Config, error) { +func (t *CNRMConnection) createRestConfigForClusteResource(ctx context.Context, freshToken bool, clusterObj *unstructured.Unstructured) (*rest.Config, error) { endpoint, found, err := unstructured.NestedString(clusterObj.Object, "status", "endpoint") if err != nil || !found { return nil, fmt.Errorf("failed to extract cluster endpoint from cluster resource: %w", err) @@ -71,7 +71,7 @@ func (t *CNRMConnection) createRestConfigForClusteResource(ctx context.Context, return nil, fmt.Errorf("unable to decode cluster CA certificate: %w", err) } - token, err := t.GKE.Token(ctx, container.CloudPlatformScope) + token, err := t.GKE.Token(ctx, freshToken, container.CloudPlatformScope) if err != nil { return nil, fmt.Errorf("failed to get token for gke: %w", err) } diff --git a/connection/common.go b/connection/common.go index a0b45f7b..6cfa9cf6 100644 --- a/connection/common.go +++ b/connection/common.go @@ -8,6 +8,13 @@ import ( "github.com/patrickmn/go-cache" ) +// tokenSafetyMargin is the duration to deduct when caching tokens. +// Example: if a token expires in 15 minutes then we cache it with a +// TTL of 15 minutes - tokenSafetyMargin = 10 minutes. +// +// This is to prevent using a token that immediately expires. +const tokenSafetyMargin = time.Minute * 5 + // caches cloud tokens. eg: EKS token, GKE token, ... var tokenCache = cache.New(time.Hour, time.Hour) diff --git a/connection/eks.go b/connection/eks.go index 4451a339..681650d5 100644 --- a/connection/eks.go +++ b/connection/eks.go @@ -32,7 +32,7 @@ func (t *EKSConnection) Populate(ctx ConnectionContext) error { return t.AWSConnection.Populate(ctx) } -func (t *EKSConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { +func (t *EKSConnection) KubernetesClient(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) { awsConfig, err := t.AWSConnection.Client(ctx) if err != nil { return nil, nil, err @@ -43,7 +43,7 @@ func (t *EKSConnection) KubernetesClient(ctx context.Context) (kubernetes.Interf return nil, nil, err } - token, err := getEKSToken(ctx, t.Cluster, awsConfig) + token, err := getEKSToken(ctx, t.Cluster, awsConfig, freshToken) if err != nil { return nil, nil, fmt.Errorf("failed to get token for EKS: %w", err) } @@ -79,15 +79,17 @@ func eksClusterDetails(ctx gocontext.Context, clusterName string, conf aws.Confi return *cluster.Cluster.Endpoint, ca, nil } -func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config) (string, error) { +func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config, freshToken bool) (string, error) { cred, err := conf.Credentials.Retrieve(ctx) if err != nil { return "", fmt.Errorf("failed to retrive credentials from aws config: %w", err) } cacheKey := tokenCacheKey("eks", cred, cluster) - if found, ok := tokenCache.Get(cacheKey); ok { - return found.(string), nil + if !freshToken { + if found, ok := tokenCache.Get(cacheKey); ok { + return found.(string), nil + } } signedURI, err := getSignedSTSURI(ctx, cluster, cred) @@ -96,7 +98,8 @@ func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config) (string } token := v1Prefix + base64.RawURLEncoding.EncodeToString([]byte(signedURI)) - tokenCache.Set(cacheKey, token, time.Minute*15) + tokenTTL := time.Minute * 15 + tokenCache.Set(cacheKey, token, tokenTTL-tokenSafetyMargin) return token, nil } diff --git a/connection/environment.go b/connection/environment.go index 8e026405..47313e98 100644 --- a/connection/environment.go +++ b/connection/environment.go @@ -108,7 +108,7 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx return nil, err } - if clientset, restConfig, err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace)); err != nil { + if clientset, restConfig, err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace), true); err != nil { return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err) } else { ctx = ctx.WithKubernetes(clientset, restConfig) @@ -122,7 +122,7 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx if connections.Kubernetes != nil { if lo.FromPtr(connections.FromConfigItem) == "" { - if clientset, restConfig, err := connections.Kubernetes.Populate(ctx); err != nil { + if clientset, restConfig, err := connections.Kubernetes.Populate(ctx, true); err != nil { return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err) } else { ctx = ctx.WithKubernetes(clientset, restConfig) diff --git a/connection/gcp.go b/connection/gcp.go index 61b122b3..d59e1d0b 100644 --- a/connection/gcp.go +++ b/connection/gcp.go @@ -33,10 +33,12 @@ func (g *GCPConnection) Validate() *GCPConnection { return g } -func (g *GCPConnection) Token(ctx context.Context, scopes ...string) (*oauth2.Token, error) { +func (g *GCPConnection) Token(ctx context.Context, freshToken bool, scopes ...string) (*oauth2.Token, error) { cacheKey := tokenCacheKey("gcp", hash.Sha256Hex(g.Credentials.ValueStatic), strings.Join(scopes, ",")) - if found, ok := tokenCache.Get(cacheKey); ok { - return found.(*oauth2.Token), nil + if !freshToken { + if found, ok := tokenCache.Get(cacheKey); ok { + return found.(*oauth2.Token), nil + } } creds, err := google.CredentialsFromJSON(ctx, []byte(g.Credentials.ValueStatic), scopes...) @@ -50,7 +52,7 @@ func (g *GCPConnection) Token(ctx context.Context, scopes ...string) (*oauth2.To return nil, err } - tokenCache.Set(cacheKey, token, time.Until(token.Expiry)) + tokenCache.Set(cacheKey, token, time.Until(token.Expiry)-tokenSafetyMargin) return token, nil } diff --git a/connection/gke.go b/connection/gke.go index 062e8d0a..50946478 100644 --- a/connection/gke.go +++ b/connection/gke.go @@ -71,7 +71,7 @@ func (t *GKEConnection) Client(ctx context.Context) (*container.Service, error) return svc, nil } -func (t *GKEConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { +func (t *GKEConnection) KubernetesClient(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) { containerService, err := t.Client(ctx) if err != nil { return nil, nil, err @@ -83,7 +83,7 @@ func (t *GKEConnection) KubernetesClient(ctx context.Context) (kubernetes.Interf return nil, nil, fmt.Errorf("failed to get cluster: %w", err) } - token, err := t.GCPConnection.Token(ctx, container.CloudPlatformScope) + token, err := t.GCPConnection.Token(ctx, freshToken, container.CloudPlatformScope) if err != nil { return nil, nil, fmt.Errorf("failed to get token for gke: %w", err) } diff --git a/connection/kubernetes.go b/connection/kubernetes.go index 56eee01b..bc1649d8 100644 --- a/connection/kubernetes.go +++ b/connection/kubernetes.go @@ -60,7 +60,7 @@ func (t KubernetesConnection) ToModel() models.Connection { } } -func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interface, *rest.Config, error) { +func (t *KubernetesConnection) Populate(ctx context.Context, freshToken bool) (kubernetes.Interface, *rest.Config, error) { if clientset, restConfig, err := t.KubeconfigConnection.Populate(ctx); err != nil { return nil, nil, nil } else if clientset != nil { @@ -72,7 +72,7 @@ func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interfa return nil, nil, err } - return t.GKE.KubernetesClient(ctx) + return t.GKE.KubernetesClient(ctx, freshToken) } if t.EKS != nil { @@ -80,7 +80,7 @@ func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interfa return nil, nil, err } - return t.EKS.KubernetesClient(ctx) + return t.EKS.KubernetesClient(ctx, freshToken) } if t.CNRM != nil { @@ -88,7 +88,7 @@ func (t *KubernetesConnection) Populate(ctx context.Context) (kubernetes.Interfa return nil, nil, err } - return t.CNRM.KubernetesClient(ctx) + return t.CNRM.KubernetesClient(ctx, freshToken) } return nil, nil, nil