Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: extend kubernetes connection #1208

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions connection/cnrm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package connection

import (
"encoding/base64"
"fmt"

"github.com/flanksource/duty/context"
dutyKube "github.com/flanksource/duty/kubernetes"
container "google.golang.org/api/container/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

// +kubebuilder:object:generate=true
type CNRMConnection struct {
GKE GKEConnection `json:"gke" yaml:"gke"`

ClusterResource string `json:"clusterResource"`
ClusterResourceNamespace string `json:"clusterResourceNamespace"`
}

func (t *CNRMConnection) Populate(ctx ConnectionContext) error {
return t.GKE.Populate(ctx)
}

func (t *CNRMConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) {
cnrmCluster, restConfig, err := t.GKE.KubernetesClient(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to create kubernetes client for GKE: %w", err)
}

containerResourceKubeClient, err := dutyKube.NewKubeClient(cnrmCluster, restConfig).
GetClientByGroupVersionKind("container.cnrm.cloud.google.com", "v1beta1", "ContainerCluster")
if err != nil {
return nil, nil, err
}

obj, err := containerResourceKubeClient.Namespace(t.ClusterResourceNamespace).Get(ctx, t.ClusterResource, metav1.GetOptions{})
if err != nil {
return nil, nil, err
}

clusterResourceRestConfig, err := t.createRestConfigForClusteResource(ctx, obj)
if err != nil {
return nil, nil, err
}

clientset, err := kubernetes.NewForConfig(clusterResourceRestConfig)
if err != nil {
return nil, nil, err
}

return clientset, restConfig, nil
}

func (t *CNRMConnection) createRestConfigForClusteResource(ctx context.Context, clusterObj *unstructured.Unstructured) (*rest.Config, error) {
endpoint, found, err := unstructured.NestedString(clusterObj.Object, "status", "endpoint")
if err != nil || !found {
return nil, fmt.Errorf("failed to extract cluster endpoint from cluster resource: %w", err)
}

caCertB64, found, err := unstructured.NestedString(clusterObj.Object, "spec", "masterAuth", "clusterCaCertificate")
if err != nil || !found {
return nil, fmt.Errorf("failed to extract cluster CA certificate from cluster resource: %w", err)
}

ca, err := base64.URLEncoding.DecodeString(caCertB64)
if err != nil {
return nil, fmt.Errorf("unable to decode cluster CA certificate: %w", err)
}

token, err := t.GKE.Token(ctx, container.CloudPlatformScope)
if err != nil {
return nil, fmt.Errorf("failed to get token for gke: %w", err)
}

return &rest.Config{
Host: endpoint,
TLSClientConfig: rest.TLSClientConfig{
CAData: ca,
},
BearerToken: token.AccessToken,
}, nil
}
22 changes: 22 additions & 0 deletions connection/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package connection

import (
"encoding/json"
"fmt"
"time"

"github.com/patrickmn/go-cache"
)

// caches cloud tokens. eg: EKS token, GKE token, ...
var tokenCache = cache.New(time.Hour, time.Hour)

func tokenCacheKey(cloud string, cred any, identifiers string) string {
switch v := cred.(type) {
case string:
return fmt.Sprintf("%s-%s-%s", cloud, v, identifiers)
default:
m, _ := json.Marshal(v)
return fmt.Sprintf("%s-%s-%s", cloud, m, identifiers)
}
}
118 changes: 118 additions & 0 deletions connection/eks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package connection

import (
gocontext "context"
"encoding/base64"
"fmt"
"net/http"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
signerv4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/service/eks"
"github.com/flanksource/duty/context"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

const (
clusterIDHeader = "x-k8s-aws-id"
emptyStringSha256 = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
v1Prefix = "k8s-aws-v1."
)

// +kubebuilder:object:generate=true
type EKSConnection struct {
AWSConnection `json:",inline" yaml:",inline"`

Cluster string `json:"cluster"`
}

func (t *EKSConnection) Populate(ctx ConnectionContext) error {
return t.AWSConnection.Populate(ctx)
}

func (t *EKSConnection) KubernetesClient(ctx context.Context) (kubernetes.Interface, *rest.Config, error) {
awsConfig, err := t.AWSConnection.Client(ctx)
if err != nil {
return nil, nil, err
}

eksEndpoint, ca, err := eksClusterDetails(ctx, t.Cluster, awsConfig)
if err != nil {
return nil, nil, err
}

token, err := getEKSToken(ctx, t.Cluster, awsConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to get token for EKS: %w", err)
}

restConfig := &rest.Config{
Host: eksEndpoint,
BearerToken: token,
TLSClientConfig: rest.TLSClientConfig{
CAData: ca,
},
}

clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, nil, err
}

return clientset, restConfig, nil
}

func eksClusterDetails(ctx gocontext.Context, clusterName string, conf aws.Config) (string, []byte, error) {
eksClient := eks.NewFromConfig(conf)
cluster, err := eksClient.DescribeCluster(ctx, &eks.DescribeClusterInput{Name: &clusterName})
if err != nil {
return "", nil, fmt.Errorf("unable to get cluster info: %v", err)
}

ca, err := base64.URLEncoding.DecodeString(*cluster.Cluster.CertificateAuthority.Data)
if err != nil {
return "", nil, fmt.Errorf("unable to presign URL: %v", err)
}

return *cluster.Cluster.Endpoint, ca, nil
}

func getEKSToken(ctx gocontext.Context, cluster string, conf aws.Config) (string, error) {
cred, err := conf.Credentials.Retrieve(ctx)
if err != nil {
return "", fmt.Errorf("failed to retrive credentials from aws config: %w", err)
}

cacheKey := tokenCacheKey("eks", cred, cluster)
if found, ok := tokenCache.Get(cacheKey); ok {
return found.(string), nil
}

signedURI, err := getSignedSTSURI(ctx, cluster, cred)
if err != nil {
return "", fmt.Errorf("failed to get signed URI: %w", err)
}

token := v1Prefix + base64.RawURLEncoding.EncodeToString([]byte(signedURI))
tokenCache.Set(cacheKey, token, time.Minute*15)
adityathebe marked this conversation as resolved.
Show resolved Hide resolved
return token, nil
}

func getSignedSTSURI(ctx gocontext.Context, cluster string, cred aws.Credentials) (string, error) {
request, err := http.NewRequest(http.MethodGet, "https://sts.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15", nil)
if err != nil {
return "", err
}

request.Header.Add(clusterIDHeader, cluster)
request.Header.Add("X-Amz-Expires", "0")
signer := signerv4.NewSigner()
signedURI, _, err := signer.PresignHTTP(ctx, cred, request, emptyStringSha256, "sts", "us-east-1", time.Now())
if err != nil {
return "", err
}

return signedURI, nil
}
10 changes: 7 additions & 3 deletions connection/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,14 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx
return nil, err
} else if found {
connections.Kubernetes = &KubernetesConnection{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeconfig, &connections.Kubernetes.KubeConfig); err != nil {
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(kubeconfig, &connections.Kubernetes.Kubeconfig); err != nil {
return nil, err
}

if err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace)); err != nil {
if clientset, restConfig, err := connections.Kubernetes.Populate(ctx.WithNamespace(scraperNamespace)); err != nil {
return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err)
} else {
ctx = ctx.WithKubernetes(clientset, restConfig)
}

break
Expand All @@ -120,8 +122,10 @@ func SetupConnection(ctx context.Context, connections ExecConnections, cmd *osEx

if connections.Kubernetes != nil {
if lo.FromPtr(connections.FromConfigItem) == "" {
if err := connections.Kubernetes.Populate(ctx); err != nil {
if clientset, restConfig, err := connections.Kubernetes.Populate(ctx); err != nil {
return nil, fmt.Errorf("failed to hydrate kubernetes connection: %w", err)
} else {
ctx = ctx.WithKubernetes(clientset, restConfig)
}
}

Expand Down
59 changes: 21 additions & 38 deletions connection/gcp.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package connection

import (
"crypto/tls"
"net/http"
"strings"
"time"

gcs "cloud.google.com/go/storage"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"

"github.com/flanksource/commons/hash"
"github.com/flanksource/commons/utils"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/types"
"google.golang.org/api/option"
)

// +kubebuilder:object:generate=true
Expand All @@ -23,51 +24,33 @@ type GCPConnection struct {
SkipTLSVerify bool `yaml:"skipTLSVerify,omitempty" json:"skipTLSVerify,omitempty"`
}

func (conn *GCPConnection) Client(ctx context.Context) (*gcs.Client, error) {
conn = conn.Validate()
var client *gcs.Client
var err error

var clientOpts []option.ClientOption

if conn.Endpoint != "" {
clientOpts = append(clientOpts, option.WithEndpoint(conn.Endpoint))
func (g *GCPConnection) Validate() *GCPConnection {
if g == nil {
return &GCPConnection{}
}

if conn.SkipTLSVerify {
insecureHTTPClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
}

clientOpts = append(clientOpts, option.WithHTTPClient(insecureHTTPClient))
}
return g
}

if conn.Credentials != nil && !conn.Credentials.IsEmpty() {
credential, err := ctx.GetEnvValueFromCache(*conn.Credentials, ctx.GetNamespace())
if err != nil {
return nil, err
}
clientOpts = append(clientOpts, option.WithCredentialsJSON([]byte(credential)))
} else {
clientOpts = append(clientOpts, option.WithoutAuthentication())
func (g *GCPConnection) Token(ctx context.Context, scopes ...string) (*oauth2.Token, error) {
cacheKey := tokenCacheKey("gcp", hash.Sha256Hex(g.Credentials.ValueStatic), strings.Join(scopes, ","))
if found, ok := tokenCache.Get(cacheKey); ok {
return found.(*oauth2.Token), nil
}

client, err = gcs.NewClient(ctx.Context, clientOpts...)
creds, err := google.CredentialsFromJSON(ctx, []byte(g.Credentials.ValueStatic), scopes...)
if err != nil {
return nil, err
}

return client, nil
}

func (g *GCPConnection) Validate() *GCPConnection {
if g == nil {
return &GCPConnection{}
tokenSource := creds.TokenSource
token, err := tokenSource.Token()
if err != nil {
return nil, err
}

return g
tokenCache.Set(cacheKey, token, time.Until(token.Expiry))
return token, nil
}

// HydrateConnection attempts to find the connection by name
Expand Down
Loading
Loading