diff --git a/cmd/kubectl-testkube/commands/root.go b/cmd/kubectl-testkube/commands/root.go index 9e3bc87718..17464b48de 100644 --- a/cmd/kubectl-testkube/commands/root.go +++ b/cmd/kubectl-testkube/commands/root.go @@ -16,6 +16,7 @@ import ( "github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common/validator" "github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/pro" "github.com/kubeshop/testkube/cmd/kubectl-testkube/config" + "github.com/kubeshop/testkube/cmd/tcl/kubectl-testkube/devbox" "github.com/kubeshop/testkube/pkg/telemetry" "github.com/kubeshop/testkube/pkg/ui" ) @@ -65,6 +66,8 @@ func init() { RootCmd.AddCommand(NewDockerCmd()) RootCmd.AddCommand(pro.NewLoginCmd()) + RootCmd.AddCommand(devbox.NewDevBoxCommand()) + RootCmd.SetHelpCommand(NewHelpCmd()) } diff --git a/cmd/tcl/devbox-binary-storage/main.go b/cmd/tcl/devbox-binary-storage/main.go new file mode 100644 index 0000000000..b387472286 --- /dev/null +++ b/cmd/tcl/devbox-binary-storage/main.go @@ -0,0 +1,227 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package main + +import ( + "bytes" + "compress/gzip" + "crypto/sha256" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "path/filepath" + "strings" + "sync" + "syscall" + + "github.com/dustin/go-humanize" + + "github.com/kubeshop/testkube/cmd/tcl/kubectl-testkube/devbox/devutils" +) + +var ( + locks = make(map[string]*sync.RWMutex) + locksMu sync.Mutex + hashCache = make(map[string]string) +) + +func getLock(filePath string) *sync.RWMutex { + locksMu.Lock() + defer locksMu.Unlock() + if locks[filePath] == nil { + locks[filePath] = new(sync.RWMutex) + } + return locks[filePath] +} + +func rebuildHash(filePath string) { + hashCache[filePath] = "" + f, err := os.Open(filePath) + if err != nil { + return + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err == nil { + hashCache[filePath] = fmt.Sprintf("%x", h.Sum(nil)) + } +} + +func getHash(filePath string) string { + if hashCache[filePath] == "" { + rebuildHash(filePath) + } + return hashCache[filePath] +} + +func main() { + storagePath := "/storage" + http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + filePath := filepath.Clean(strings.TrimPrefix(r.URL.Path, "/")) + if filePath == "" { + w.WriteHeader(http.StatusNotFound) + return + } + localPath := filepath.Join(storagePath, filePath) + if r.Method == http.MethodGet { + getLock(filePath).RLock() + defer getLock(filePath).RUnlock() + + file, err := os.Open(localPath) + if err != nil { + w.WriteHeader(http.StatusNotFound) + return + } + stat, err := file.Stat() + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/octet-stream") + w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size())) + w.WriteHeader(http.StatusOK) + 
io.Copy(w, file) + return + } else if r.Method == http.MethodPost { + getLock(filePath).Lock() + defer getLock(filePath).Unlock() + + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed reading body", err) + return + } + if r.ContentLength != int64(len(body)) { + w.WriteHeader(http.StatusBadRequest) + return + } + if r.Header.Get("Content-Encoding") == "gzip" { + gz, err := gzip.NewReader(bytes.NewBuffer(body)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed reading body into gzip", err) + return + } + body, err = io.ReadAll(gz) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed reading back data from gzip stream", err) + return + } + } + + err = os.WriteFile(localPath, body, 0666) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed to write file", err) + return + } + + h := sha256.New() + if _, err := io.Copy(h, bytes.NewBuffer(body)); err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed to build hash", err) + } + hashCache[filePath] = fmt.Sprintf("%x", h.Sum(nil)) + + fmt.Println("saved file", filePath, humanize.Bytes(uint64(len(body)))) + w.WriteHeader(http.StatusOK) + return + } else if r.Method == http.MethodPatch { + getLock(filePath).Lock() + defer getLock(filePath).Unlock() + + body, err := io.ReadAll(r.Body) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed reading body", err) + return + } + if r.ContentLength != int64(len(body)) { + w.WriteHeader(http.StatusBadRequest) + return + } + if r.Header.Get("Content-Encoding") == "gzip" { + gz, err := gzip.NewReader(bytes.NewBuffer(body)) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed reading body into gzip", err) + return + } + body, err = io.ReadAll(gz) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + 
fmt.Println("failed reading back data from gzip stream", err) + return + } + } + + // Verify if patch can be applied + if r.Header.Get("X-Prev-Hash") != getHash(filePath) { + w.WriteHeader(http.StatusConflict) + return + } + + // Apply patch + prevFile, err := os.ReadFile(localPath) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed reading existing file", err) + return + } + patch := devutils.NewBinaryPatchFromBytes(body) + file := patch.Apply(prevFile) + + h := sha256.New() + if _, err := io.Copy(h, bytes.NewBuffer(file)); err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed to build hash", err) + return + } + + // Validate hash + nextHash := fmt.Sprintf("%x", h.Sum(nil)) + if r.Header.Get("X-Hash") != nextHash { + w.WriteHeader(http.StatusBadRequest) + fmt.Println("after applying patch result has different hash than expected", err) + return + } + fmt.Println("Expected hash", r.Header.Get("X-Hash"), "got", nextHash) + err = os.WriteFile(localPath, file, 0666) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + fmt.Println("failed to write file", err) + return + } + hashCache[filePath] = nextHash + w.WriteHeader(http.StatusOK) + return + } + w.WriteHeader(http.StatusMethodNotAllowed) + }) + + stopSignal := make(chan os.Signal, 1) + signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-stopSignal + os.Exit(0) + }() + + fmt.Println("Starting server...") + + panic(http.ListenAndServe(":8080", nil)) +} diff --git a/cmd/tcl/devbox-mutating-webhook/main.go b/cmd/tcl/devbox-mutating-webhook/main.go new file mode 100644 index 0000000000..4fa8a51509 --- /dev/null +++ b/cmd/tcl/devbox-mutating-webhook/main.go @@ -0,0 +1,185 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/wI2L/jsondiff" + admissionv1 "k8s.io/api/admission/v1" + corev1 "k8s.io/api/core/v1" + + "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants" +) + +func main() { + http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + }) + + http.HandleFunc("/mutate", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + if r.Header.Get("Content-Type") != "application/json" { + http.Error(w, "invalid content type", http.StatusBadRequest) + return + } + + initImage := os.Args[1] + toolkitImage := os.Args[2] + + buf := new(bytes.Buffer) + buf.ReadFrom(r.Body) + body := buf.Bytes() + + if len(body) == 0 { + http.Error(w, "invalid request", http.StatusBadRequest) + return + } + + var review admissionv1.AdmissionReview + if err := json.Unmarshal(body, &review); err != nil { + http.Error(w, fmt.Sprintf("invalid request: %s", err), http.StatusBadRequest) + return + } + + if review.Request == nil { + http.Error(w, "invalid request: empty", http.StatusBadRequest) + return + } + if review.Request.Kind.Kind != "Pod" { + http.Error(w, fmt.Sprintf("invalid resource: %s", review.Request.Kind.Kind), http.StatusBadRequest) + return + } + + pod := corev1.Pod{} + if err := json.Unmarshal(review.Request.Object.Raw, &pod); err != nil { + http.Error(w, fmt.Sprintf("invalid pod provided: %s", err), http.StatusBadRequest) + return + } + originalPod := pod.DeepCopy() + + // Apply changes + if pod.Labels[constants.ResourceIdLabelName] != "" { + usesToolkit := false + for _, c := range append(pod.Spec.InitContainers, pod.Spec.Containers...) 
{ + if c.Image == toolkitImage { + usesToolkit = true + } + } + + pod.Spec.Volumes = append(pod.Spec.Volumes, corev1.Volume{ + Name: "devbox", + VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}, + }) + + script := ` + set -e + /.tktw-bin/wget -O /.tk-devbox/init http://devbox-binary:8080/init || exit 1 + chmod 777 /.tk-devbox/init + ls -lah /.tk-devbox` + if usesToolkit { + script = ` + set -e + /.tktw-bin/wget -O /.tk-devbox/init http://devbox-binary:8080/init || exit 1 + /.tktw-bin/wget -O /.tk-devbox/toolkit http://devbox-binary:8080/toolkit || exit 1 + chmod 777 /.tk-devbox/init + chmod 777 /.tk-devbox/toolkit + ls -lah /.tk-devbox` + } + + pod.Spec.InitContainers = append([]corev1.Container{{ + Name: "devbox-init", + Image: initImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/sh", "-c"}, + Args: []string{script}, + }}, pod.Spec.InitContainers...) + + // TODO: Handle it better, to not be ambiguous + pod.Annotations[constants.SpecAnnotationName] = strings.ReplaceAll(pod.Annotations[constants.SpecAnnotationName], "\"/toolkit\"", "\"/.tk-devbox/toolkit\"") + pod.Annotations[constants.SpecAnnotationName] = strings.ReplaceAll(pod.Annotations[constants.SpecAnnotationName], "\"/.tktw/toolkit\"", "\"/.tk-devbox/toolkit\"") + + for i := range pod.Spec.InitContainers { + if (pod.Spec.InitContainers[i].Image == toolkitImage || pod.Spec.InitContainers[i].Image == initImage) && pod.Spec.InitContainers[i].Command[0] == "/init" { + pod.Spec.InitContainers[i].Command[0] = "/.tk-devbox/init" + } + if pod.Spec.InitContainers[i].Command[0] == "/.tktw/init" { + pod.Spec.InitContainers[i].Command[0] = "/.tk-devbox/init" + } + } + for i := range pod.Spec.Containers { + if (pod.Spec.Containers[i].Image == toolkitImage || pod.Spec.Containers[i].Image == initImage) && pod.Spec.Containers[i].Command[0] == "/init" { + pod.Spec.Containers[i].Command[0] = "/.tk-devbox/init" + } + if pod.Spec.Containers[i].Command[0] == "/.tktw/init" { 
+ pod.Spec.Containers[i].Command[0] = "/.tk-devbox/init" + } + } + + for i := range pod.Spec.InitContainers { + pod.Spec.InitContainers[i].VolumeMounts = append(pod.Spec.InitContainers[i].VolumeMounts, corev1.VolumeMount{ + Name: "devbox", + MountPath: "/.tk-devbox", + }) + } + for i := range pod.Spec.Containers { + pod.Spec.Containers[i].VolumeMounts = append(pod.Spec.Containers[i].VolumeMounts, corev1.VolumeMount{ + Name: "devbox", + MountPath: "/.tk-devbox", + }) + } + } + + patch, err := jsondiff.Compare(originalPod, pod) + if err != nil { + http.Error(w, fmt.Sprintf("failed to build patch for changes: %s", err), http.StatusInternalServerError) + return + } + + serializedPatch, err := json.Marshal(patch) + if err != nil { + http.Error(w, fmt.Sprintf("failed to serialize patch for changes: %s", err), http.StatusInternalServerError) + return + } + + review.Response = &admissionv1.AdmissionResponse{ + UID: review.Request.UID, + Allowed: true, + PatchType: common.Ptr(admissionv1.PatchTypeJSONPatch), + Patch: serializedPatch, + } + + serializedResponse, err := json.Marshal(review) + if err != nil { + http.Error(w, fmt.Sprintf("cannot marshal result: %s", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + fmt.Fprintf(w, "%s", serializedResponse) + }) + + stopSignal := make(chan os.Signal, 1) + signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-stopSignal + os.Exit(0) + }() + + fmt.Println("Starting server...") + + panic(http.ListenAndServeTLS(":8443", "/certs/tls.crt", "/certs/tls.key", nil)) +} diff --git a/cmd/tcl/kubectl-testkube/devbox/README.md b/cmd/tcl/kubectl-testkube/devbox/README.md new file mode 100644 index 0000000000..b41c8818f8 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/README.md @@ -0,0 +1,69 @@ +# Development Box - TCL Licensed + +This utility is used to help with development of the Agent features (like Test Workflows). 
 

## How it works

* It takes the current Testkube CLI credentials and creates a development environment inside
* It deploys the Agent into the current cluster
  * Test Triggers are disabled
  * Webhooks are disabled
  * Legacy Tests and Test Suites are disabled
  * It's not using the Helm Chart, so default templates are not available
* For live changes, it deploys the Interceptor and the Binary Storage into the current cluster
  * The Binary Storage stores the latest binaries for the Agent, Toolkit and Init Process
  * The Binary Storage is optimized for patching binaries with incremental builds (to avoid sending the whole binary when only a small part is changed)
  * The Interceptor loads the Toolkit and Init Process from the Object Storage into every Test Workflow Execution pod

## Usage

* Log in to the Testkube CLI, like `testkube login`
* Run `go run cmd/kubectl-testkube/main.go devbox`
  * It's worth creating an alias for that in your own `.bashrc` or `.bash_profile`
  * It's worth passing a devbox name, like `-n dawid`, so it's not using a random name

The CLI will print a dashboard link for the selected environment.

## Why?

It's a fast way to get live changes during development:
* the initial deployment takes up to 60 seconds
* continuous deployments take 1-10 seconds (depending on the changes and network bandwidth)
* the Execution performance is not much worse (it's just running a single container beforehand, which only fetches up to 100MB from the local Object Storage)

## Parameters

The most important parameters are `-n, --name` for a static devbox name,
and `-s, --sync` for synchronising Test Workflow and Test Workflow Template CRDs from the file system.
+ +```shell +Usage: + testkube devbox [flags] + +Aliases: + devbox, dev + +Flags: + --agent-image string base agent image (default "kubeshop/testkube-api-server:latest") + --init-image string base init image (default "kubeshop/testkube-tw-init:latest") + -n, --name string devbox name (default "1730107481990508000") + -s, --sync strings synchronise resources at paths + --toolkit-image string base toolkit image (default "kubeshop/testkube-tw-toolkit:latest") + -o, --open open dashboard in browser +``` + +## Example + +```shell +# Initialize alias +tk() { + cd ~/projects/testkube + go run cmd/kubectl-testkube/main.go $@ +} + +# Select the proper cluster to deploy the devbox +kubectx cloud-dev + +# Run development box, synchronising all the Test Workflows from 'test' directory in Testkube repository +tk devbox -n dawid -s test +``` diff --git a/cmd/tcl/kubectl-testkube/devbox/command.go b/cmd/tcl/kubectl-testkube/devbox/command.go new file mode 100644 index 0000000000..80122c636b --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/command.go @@ -0,0 +1,611 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devbox + +import ( + "context" + "fmt" + "os" + "os/signal" + "path/filepath" + "strings" + "sync" + "syscall" + "time" + + "github.com/dustin/go-humanize" + "github.com/gookit/color" + "github.com/pkg/errors" + "github.com/pterm/pterm" + "github.com/savioxavier/termlink" + openurl "github.com/skratchdot/open-golang/open" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" + + "github.com/kubeshop/testkube/cmd/kubectl-testkube/config" + "github.com/kubeshop/testkube/cmd/tcl/kubectl-testkube/devbox/devutils" + "github.com/kubeshop/testkube/pkg/cloud/client" + "github.com/kubeshop/testkube/pkg/mapper/testworkflows" + "github.com/kubeshop/testkube/pkg/ui" +) + +const ( + InterceptorMainPath = "cmd/tcl/devbox-mutating-webhook/main.go" + BinaryStorageMainPath = "cmd/tcl/devbox-binary-storage/main.go" + AgentMainPath = "cmd/api-server/main.go" + ToolkitMainPath = "cmd/testworkflow-toolkit/main.go" + InitProcessMainPath = "cmd/testworkflow-init/main.go" +) + +func NewDevBoxCommand() *cobra.Command { + var ( + rawDevboxName string + open bool + baseAgentImage string + baseInitImage string + baseToolkitImage string + syncResources []string + ) + + cmd := &cobra.Command{ + Use: "devbox", + Hidden: true, + Aliases: []string{"dev"}, + Run: func(cmd *cobra.Command, args []string) { + ctx, ctxCancel := context.WithCancel(context.Background()) + stopSignal := make(chan os.Signal, 1) + signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-stopSignal + ctxCancel() + }() + + startTs := time.Now() + + // Find repository root + rootDir := devutils.FindDirContaining(InterceptorMainPath, AgentMainPath, ToolkitMainPath, InitProcessMainPath) + if rootDir == "" { + ui.Fail(errors.New("testkube repository not found")) + } + + // Connect to cluster + cluster, err := devutils.NewCluster() + if err != nil { + ui.Fail(err) + } + + // Connect to 
Testkube + cfg, err := config.Load() + if err != nil { + pterm.Error.Printfln("Failed to load config file: %s", err.Error()) + return + } + cloud, err := devutils.NewCloud(cfg.CloudContext, cmd) + if err != nil { + pterm.Error.Printfln("Failed to connect to Cloud: %s", err.Error()) + return + } + + // Detect obsolete devbox environments + if obsolete := cloud.ListObsolete(); len(obsolete) > 0 { + count := 0 + for _, env := range obsolete { + err := cloud.DeleteEnvironment(env.Id) + if err != nil { + fmt.Printf("Failed to delete obsolete devbox environment (%s): %s\n", env.Name, err.Error()) + continue + } + cluster.Namespace(env.Name).Destroy() + count++ + } + fmt.Printf("Deleted %d/%d obsolete devbox environments\n", count, len(obsolete)) + } + + // Initialize bare cluster resources + namespace := cluster.Namespace(fmt.Sprintf("devbox-%s", rawDevboxName)) + interceptorPod := namespace.Pod("devbox-interceptor") + agentPod := namespace.Pod("devbox-agent") + binaryStoragePod := namespace.Pod("devbox-binary") + + // Initialize binaries + interceptorBin := devutils.NewBinary(InterceptorMainPath, cluster.OperatingSystem(), cluster.Architecture()) + binaryStorageBin := devutils.NewBinary(BinaryStorageMainPath, cluster.OperatingSystem(), cluster.Architecture()) + agentBin := devutils.NewBinary(AgentMainPath, cluster.OperatingSystem(), cluster.Architecture()) + toolkitBin := devutils.NewBinary(ToolkitMainPath, cluster.OperatingSystem(), cluster.Architecture()) + initProcessBin := devutils.NewBinary(InitProcessMainPath, cluster.OperatingSystem(), cluster.Architecture()) + + // Initialize clean up + defer interceptorBin.Close() + defer binaryStorageBin.Close() + defer agentBin.Close() + defer toolkitBin.Close() + defer initProcessBin.Close() + + // Initialize wrappers over cluster resources + interceptor := devutils.NewInterceptor(interceptorPod, baseInitImage, baseToolkitImage, interceptorBin) + agent := devutils.NewAgent(agentPod, cloud, baseAgentImage, baseInitImage, 
baseToolkitImage) + binaryStorage := devutils.NewBinaryStorage(binaryStoragePod, binaryStorageBin) + var env *client.Environment + + // Cleanup + cleanupCh := make(chan struct{}) + var cleanupMu sync.Mutex + cleanup := func() { + cleanupMu.Lock() + + interceptorBin.Close() + binaryStorageBin.Close() + agentBin.Close() + toolkitBin.Close() + initProcessBin.Close() + + fmt.Println("Deleting namespace...") + if err := namespace.Destroy(); err != nil { + fmt.Println("Failed to destroy namespace:", err.Error()) + } + if env != nil && env.Id != "" { + fmt.Println("Deleting environment...") + if err = cloud.DeleteEnvironment(env.Id); err != nil { + fmt.Println("Failed to delete environment:", err.Error()) + } + } + } + go func() { + <-ctx.Done() + cleanup() + close(cleanupCh) + }() + + fail := func(err error) { + fmt.Println("Error:", err.Error()) + cleanup() + os.Exit(1) + } + + // Create environment in the Cloud + fmt.Println("Creating environment in Cloud...") + env, err = cloud.CreateEnvironment(namespace.Name()) + if err != nil { + fail(errors.Wrap(err, "failed to create Cloud environment")) + } + + // Create namespace + fmt.Println("Creating namespace...") + if err = namespace.Create(); err != nil { + fail(errors.Wrap(err, "failed to create namespace")) + } + + g, _ := errgroup.WithContext(ctx) + binaryStorageReadiness := make(chan struct{}) + + // Deploying interceptor + g.Go(func() error { + fmt.Println("[Interceptor] Building...") + its := time.Now() + _, err := interceptorBin.Build(ctx) + if err != nil { + color.Red.Printf("[Interceptor] Build failed in %s. 
Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Interceptor] Built in %s.\n", time.Since(its).Truncate(time.Millisecond)) + } + fmt.Println("[Interceptor] Deploying...") + if err = interceptor.Create(ctx); err != nil { + fail(errors.Wrap(err, "failed to create interceptor")) + } + fmt.Println("[Interceptor] Waiting for readiness...") + if err = interceptor.WaitForReady(ctx); err != nil { + fail(errors.Wrap(err, "failed to create interceptor")) + } + fmt.Println("[Interceptor] Enabling...") + if err = interceptor.Enable(ctx); err != nil { + fail(errors.Wrap(err, "failed to enable interceptor")) + } + fmt.Println("[Interceptor] Ready") + return nil + }) + + // Deploying binary storage + g.Go(func() error { + fmt.Println("[Binary Storage] Building...") + its := time.Now() + _, err := binaryStorageBin.Build(ctx) + if err != nil { + color.Red.Printf("[Binary Storage] Build failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Binary Storage] Built in %s.\n", time.Since(its).Truncate(time.Millisecond)) + } + fmt.Println("[Binary Storage] Deploying...") + if err = binaryStorage.Create(ctx); err != nil { + fail(errors.Wrap(err, "failed to create binary storage")) + } + fmt.Println("[Binary Storage] Waiting for readiness...") + if err = binaryStorage.WaitForReady(ctx); err != nil { + fail(errors.Wrap(err, "failed to create binary storage")) + } + fmt.Println("[Binary Storage] Ready") + close(binaryStorageReadiness) + return nil + }) + + // Deploying the Agent + g.Go(func() error { + fmt.Println("[Agent] Building...") + its := time.Now() + _, err := agentBin.Build(ctx) + if err != nil { + color.Red.Printf("[Agent] Build failed in %s. 
Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Agent] Built in %s (size: %s).\n", time.Since(its).Truncate(time.Millisecond), agentBin.Size()) + } + <-binaryStorageReadiness + fmt.Println("[Agent] Uploading...") + its = time.Now() + _, size, err := binaryStorage.Upload(ctx, "testkube-api-server", agentBin) + if err != nil { + color.Red.Printf("[Agent] Upload failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Agent] Uploaded %s in %s.\n", humanize.Bytes(uint64(size)), time.Since(its).Truncate(time.Millisecond)) + } + fmt.Println("[Agent] Deploying...") + if err = agent.Create(ctx, env); err != nil { + fail(errors.Wrap(err, "failed to create agent")) + } + fmt.Println("[Agent] Waiting for readiness...") + if err = agent.WaitForReady(ctx); err != nil { + fail(errors.Wrap(err, "failed to create agent")) + } + fmt.Println("[Agent] Ready...") + return nil + }) + + // Building Toolkit + g.Go(func() error { + fmt.Println("[Toolkit] Building...") + its := time.Now() + _, err := toolkitBin.Build(ctx) + if err != nil { + color.Red.Printf("[Toolkit] Build failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Toolkit] Built in %s (size: %s).\n", time.Since(its).Truncate(time.Millisecond), toolkitBin.Size()) + } + <-binaryStorageReadiness + fmt.Println("[Toolkit] Uploading...") + its = time.Now() + _, size, err := binaryStorage.Upload(ctx, "toolkit", toolkitBin) + if err != nil { + color.Red.Printf("[Toolkit] Upload failed in %s. 
Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Toolkit] Uploaded %s in %s.\n", humanize.Bytes(uint64(size)), time.Since(its).Truncate(time.Millisecond)) + } + return nil + }) + + // Building Init Process + g.Go(func() error { + fmt.Println("[Init Process] Building...") + its := time.Now() + _, err := initProcessBin.Build(ctx) + if err != nil { + color.Red.Printf("[Init Process] Build failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Init Process] Built in %s (size: %s).\n", time.Since(its).Truncate(time.Millisecond), initProcessBin.Size()) + } + <-binaryStorageReadiness + fmt.Println("[Init Process] Uploading...") + its = time.Now() + _, size, err := binaryStorage.Upload(ctx, "init", initProcessBin) + if err != nil { + color.Red.Printf("[Init Process] Upload failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + } else { + fmt.Printf("[Init Process] Uploaded %s in %s.\n", humanize.Bytes(uint64(size)), time.Since(its).Truncate(time.Millisecond)) + } + return nil + }) + + g.Wait() + + // Live synchronisation + fmt.Println("Creating file system watcher...") + goWatcher, err := devutils.NewFsWatcher(rootDir) + if err != nil { + fail(errors.Wrap(err, "failed to watch Testkube repository")) + } + + if len(syncResources) > 0 { + fmt.Println("Loading Test Workflows and Templates...") + sync := devutils.NewCRDSync() + + // Initial run + for _, path := range syncResources { + _ = sync.Load(path) + } + fmt.Printf("Started synchronising %d Test Workflows and %d Templates...\n", sync.WorkflowsCount(), sync.TemplatesCount()) + + // Propagate changes from FS to CRDSync + yamlWatcher, err := devutils.NewFsWatcher(syncResources...) 
+ if err != nil { + fail(errors.Wrap(err, "failed to watch for YAML changes")) + } + go func() { + for { + if ctx.Err() != nil { + break + } + file, err := yamlWatcher.Next(ctx) + if !strings.HasSuffix(file, ".yml") && !strings.HasSuffix(file, ".yaml") { + continue + } + if err == nil { + _ = sync.Load(file) + } + } + }() + + workflowLabel := func(name string) string { + if !termlink.SupportsHyperlinks() { + return name + } + return name + " " + termlink.ColorLink("(open)", cloud.DashboardUrl(env.Slug, fmt.Sprintf("dashboard/test-workflows/%s", name)), "magenta") + } + + templateLabel := func(name string) string { + if !termlink.SupportsHyperlinks() { + return name + } + return termlink.Link(name, cloud.DashboardUrl(env.Slug, fmt.Sprintf("dashboard/test-workflow-templates/%s", name))) + } + + // Propagate changes from CRDSync to Cloud + go func() { + parallel := make(chan struct{}, 10) + process := func(update *devutils.CRDSyncUpdate) { + parallel <- struct{}{} + switch update.Op { + case devutils.CRDSyncUpdateOpCreate: + client, err := cloud.Client(env.Id) + if err != nil { + fail(errors.Wrap(err, "failed to create cloud client")) + } + if update.Template != nil { + update.Template.Spec.Events = nil // ignore Cronjobs + _, err := client.CreateTestWorkflowTemplate(*testworkflows.MapTemplateKubeToAPI(update.Template)) + if err != nil { + fmt.Printf("CRD Sync: creating template: %s: error: %s\n", templateLabel(update.Template.Name), err.Error()) + } else { + fmt.Println("CRD Sync: created template:", templateLabel(update.Template.Name)) + } + } else { + update.Workflow.Spec.Events = nil // ignore Cronjobs + _, err := client.CreateTestWorkflow(*testworkflows.MapKubeToAPI(update.Workflow)) + if err != nil { + fmt.Printf("CRD Sync: creating workflow: %s: error: %s\n", workflowLabel(update.Workflow.Name), err.Error()) + } else { + fmt.Println("CRD Sync: created workflow:", workflowLabel(update.Workflow.Name)) + } + } + case devutils.CRDSyncUpdateOpUpdate: + client, err 
:= cloud.Client(env.Id) + if err != nil { + fail(errors.Wrap(err, "failed to create cloud client")) + } + if update.Template != nil { + update.Template.Spec.Events = nil // ignore Cronjobs + _, err := client.UpdateTestWorkflowTemplate(*testworkflows.MapTemplateKubeToAPI(update.Template)) + if err != nil { + fmt.Printf("CRD Sync: updating template: %s: error: %s\n", templateLabel(update.Template.Name), err.Error()) + } else { + fmt.Println("CRD Sync: updated template:", templateLabel(update.Template.Name)) + } + } else { + update.Workflow.Spec.Events = nil + _, err := client.UpdateTestWorkflow(*testworkflows.MapKubeToAPI(update.Workflow)) + if err != nil { + fmt.Printf("CRD Sync: updating workflow: %s: error: %s\n", workflowLabel(update.Workflow.Name), err.Error()) + } else { + fmt.Println("CRD Sync: updated workflow:", workflowLabel(update.Workflow.Name)) + } + } + case devutils.CRDSyncUpdateOpDelete: + client, err := cloud.Client(env.Id) + if err != nil { + fail(errors.Wrap(err, "failed to create cloud client")) + } + if update.Template != nil { + err := client.DeleteTestWorkflowTemplate(update.Template.Name) + if err != nil { + fmt.Printf("CRD Sync: deleting template: %s: error: %s\n", templateLabel(update.Template.Name), err.Error()) + } else { + fmt.Println("CRD Sync: deleted template:", templateLabel(update.Template.Name)) + } + } else { + err := client.DeleteTestWorkflow(update.Workflow.Name) + if err != nil { + fmt.Printf("CRD Sync: deleting workflow: %s: error: %s\n", workflowLabel(update.Workflow.Name), err.Error()) + } else { + fmt.Println("CRD Sync: deleted workflow:", workflowLabel(update.Workflow.Name)) + } + } + } + <-parallel + } + for { + if ctx.Err() != nil { + break + } + update, err := sync.Next(ctx) + if err != nil { + continue + } + go process(update) + } + }() + } + + fmt.Println("Waiting for file changes...") + + rebuild := func(ctx context.Context) { + g, _ := errgroup.WithContext(ctx) + ts := time.Now() + 
fmt.Println(color.Yellow.Render("Rebuilding applications...")) + g.Go(func() error { + its := time.Now() + _, err := agentBin.Build(ctx) + if ctx.Err() != nil { + return nil + } + if err != nil { + color.Red.Printf(" Agent: build failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + return err + } + fmt.Printf(" Agent: built in %s (size: %s).\n", time.Since(its).Truncate(time.Millisecond), agentBin.Size()) + + its = time.Now() + cached, size, err := binaryStorage.Upload(ctx, "testkube-api-server", agentBin) + if ctx.Err() != nil { + return nil + } + if err != nil { + color.Red.Printf(" Agent: upload failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + return err + } + if cached { + fmt.Printf(" Agent: no changes.\n") + } else { + fmt.Printf(" Agent: uploaded %s in %s.\n", humanize.Bytes(uint64(size)), time.Since(its).Truncate(time.Millisecond)) + + // Restart only if it has changes + err := agentPod.Restart(ctx) + if ctx.Err() != nil { + return nil + } + if err == nil { + fmt.Printf(" Agent: restarted. Waiting for readiness...\n") + _ = agentPod.RefreshData(ctx) + err = agentPod.WaitForReady(ctx) + if ctx.Err() != nil { + return nil + } + if err == nil { + fmt.Printf(" Agent: ready again\n") + } else { + fail(errors.Wrap(err, "failed to wait for agent pod readiness")) + } + } else { + fmt.Printf(" Agent: restart failed: %s\n", err.Error()) + } + } + return nil + }) + g.Go(func() error { + its := time.Now() + _, err := toolkitBin.Build(ctx) + if ctx.Err() != nil { + return nil + } + if err != nil { + color.Red.Printf(" Toolkit: build failed in %s. 
Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + return err + } + fmt.Printf(" Toolkit: built in %s (size: %s).\n", time.Since(its).Truncate(time.Millisecond), toolkitBin.Size()) + + its = time.Now() + cached, size, err := binaryStorage.Upload(ctx, "toolkit", toolkitBin) + if ctx.Err() != nil { + return nil + } + if err != nil { + color.Red.Printf(" Toolkit: upload failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + return err + } + if cached { + fmt.Printf(" Toolkit: no changes.\n") + } else { + fmt.Printf(" Toolkit: uploaded %s in %s.\n", humanize.Bytes(uint64(size)), time.Since(its).Truncate(time.Millisecond)) + } + return nil + }) + g.Go(func() error { + its := time.Now() + _, err := initProcessBin.Build(ctx) + if ctx.Err() != nil { + return nil + } + if err != nil { + color.Red.Printf(" Init Process: build failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + return err + } + fmt.Printf(" Init Process: built in %s (size: %s).\n", time.Since(its).Truncate(time.Millisecond), initProcessBin.Size()) + + its = time.Now() + cached, size, err := binaryStorage.Upload(ctx, "init", initProcessBin) + if ctx.Err() != nil { + return nil + } + if err != nil { + color.Red.Printf(" Init Process: upload failed in %s. Error: %s\n", time.Since(its).Truncate(time.Millisecond), err) + return err + } + if cached { + fmt.Printf(" Init Process: no changes.\n") + } else { + fmt.Printf(" Init Process: uploaded %s in %s.\n", humanize.Bytes(uint64(size)), time.Since(its).Truncate(time.Millisecond)) + } + return nil + }) + err = g.Wait() + if ctx.Err() == nil { + color.Green.Println("Applications updated in", time.Since(ts).Truncate(time.Millisecond)) + } + } + + color.Green.Println("Development box is ready. 
Took", time.Since(startTs).Truncate(time.Millisecond)) + if termlink.SupportsHyperlinks() { + fmt.Println("Dashboard:", termlink.Link(cloud.DashboardUrl(env.Slug, "dashboard/test-workflows"), cloud.DashboardUrl(env.Slug, "dashboard/test-workflows"))) + } else { + fmt.Println("Dashboard:", cloud.DashboardUrl(env.Slug, "dashboard/test-workflows")) + } + if open { + openurl.Run(cloud.DashboardUrl(env.Slug, "dashboard/test-workflows")) + } + + _, rebuildCtxCancel := context.WithCancel(ctx) + for { + if ctx.Err() != nil { + break + } + file, err := goWatcher.Next(ctx) + if err != nil { + fmt.Println("file system watcher error:", err.Error()) + } else if strings.HasSuffix(file, ".go") { + relPath, _ := filepath.Rel(rootDir, file) + if relPath == "" { + relPath = file + } + fmt.Printf("%s changed\n", relPath) + rebuildCtxCancel() + var rebuildCtx context.Context + rebuildCtx, rebuildCtxCancel = context.WithCancel(ctx) + go rebuild(rebuildCtx) + } + } + rebuildCtxCancel() + + <-cleanupCh + }, + } + + cmd.Flags().StringVarP(&rawDevboxName, "name", "n", fmt.Sprintf("%d", time.Now().UnixNano()), "devbox name") + cmd.Flags().StringSliceVarP(&syncResources, "sync", "s", nil, "synchronise resources at paths") + cmd.Flags().BoolVarP(&open, "open", "o", false, "open dashboard in browser") + cmd.Flags().StringVar(&baseInitImage, "init-image", "kubeshop/testkube-tw-init:latest", "base init image") + cmd.Flags().StringVar(&baseToolkitImage, "toolkit-image", "kubeshop/testkube-tw-toolkit:latest", "base toolkit image") + cmd.Flags().StringVar(&baseAgentImage, "agent-image", "kubeshop/testkube-api-server:latest", "base agent image") + + return cmd +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/agent.go b/cmd/tcl/kubectl-testkube/devbox/devutils/agent.go new file mode 100644 index 0000000000..c0440f33c6 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/agent.go @@ -0,0 +1,124 @@ +// Copyright 2024 Testkube. 
+// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "context" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + + "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/cloud/client" +) + +type Agent struct { + pod *PodObject + cloud *cloudObj + agentImage string + initProcessImage string + toolkitImage string +} + +func NewAgent(pod *PodObject, cloud *cloudObj, agentImage, initProcessImage, toolkitImage string) *Agent { + return &Agent{ + pod: pod, + cloud: cloud, + agentImage: agentImage, + initProcessImage: initProcessImage, + toolkitImage: toolkitImage, + } +} + +func (r *Agent) Create(ctx context.Context, env *client.Environment) error { + tlsInsecure := "false" + if r.cloud.AgentInsecure() { + tlsInsecure = "true" + } + err := r.pod.Create(ctx, &corev1.Pod{ + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: common.Ptr(int64(1)), + Volumes: []corev1.Volume{ + {Name: "tmp", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + {Name: "nats", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + {Name: "devbox", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + }, + ServiceAccountName: "devbox-account", + Containers: []corev1.Container{ + { + Name: "server", + Image: r.agentImage, + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/sh", "-c"}, + Args: []string{` + wget -O /.tk-devbox/testkube-api-server http://devbox-binary:8080/testkube-api-server || exit 1 + chmod 777 /.tk-devbox/testkube-api-server + exec /.tk-devbox/testkube-api-server`}, + Env: []corev1.EnvVar{ + {Name: "NATS_EMBEDDED", Value: "true"}, + {Name: "APISERVER_PORT", Value: "8088"}, + 
{Name: "APISERVER_FULLNAME", Value: "devbox-agent"}, + {Name: "DISABLE_TEST_TRIGGERS", Value: "true"}, + {Name: "DISABLE_WEBHOOKS", Value: "true"}, + {Name: "DISABLE_DEPRECATED_TESTS", Value: "true"}, + {Name: "TESTKUBE_ANALYTICS_ENABLED", Value: "false"}, + {Name: "TESTKUBE_NAMESPACE", Value: r.pod.Namespace()}, + {Name: "JOB_SERVICE_ACCOUNT_NAME", Value: "devbox-account"}, + {Name: "TESTKUBE_ENABLE_IMAGE_DATA_PERSISTENT_CACHE", Value: "true"}, + {Name: "TESTKUBE_IMAGE_DATA_PERSISTENT_CACHE_KEY", Value: "testkube-image-cache"}, + {Name: "TESTKUBE_TW_TOOLKIT_IMAGE", Value: r.toolkitImage}, + {Name: "TESTKUBE_TW_INIT_IMAGE", Value: r.initProcessImage}, + {Name: "TESTKUBE_PRO_API_KEY", Value: env.AgentToken}, + {Name: "TESTKUBE_PRO_ORG_ID", Value: env.OrganizationId}, + {Name: "TESTKUBE_PRO_ENV_ID", Value: env.Id}, + {Name: "TESTKUBE_PRO_URL", Value: r.cloud.AgentURI()}, + {Name: "TESTKUBE_PRO_TLS_INSECURE", Value: tlsInsecure}, + {Name: "TESTKUBE_PRO_TLS_SKIP_VERIFY", Value: "true"}, + }, + VolumeMounts: []corev1.VolumeMount{ + {Name: "tmp", MountPath: "/tmp"}, + {Name: "nats", MountPath: "/app/nats"}, + {Name: "devbox", MountPath: "/.tk-devbox"}, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromInt32(8088), + Scheme: corev1.URISchemeHTTP, + }, + }, + PeriodSeconds: 1, + }, + }, + }, + }, + }) + if err != nil { + return err + } + err = r.pod.WaitForContainerStarted(ctx) + if err != nil { + return err + } + return r.pod.CreateService(ctx, corev1.ServicePort{ + Name: "api", + Protocol: "TCP", + Port: 8088, + TargetPort: intstr.FromInt32(8088), + }) +} + +func (r *Agent) WaitForReady(ctx context.Context) error { + return r.pod.WaitForReady(ctx) +} + +func (r *Agent) Restart(ctx context.Context) error { + return r.pod.Restart(ctx) +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/binary.go b/cmd/tcl/kubectl-testkube/devbox/devutils/binary.go new file mode 100644 
index 0000000000..fa84239a85 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/binary.go @@ -0,0 +1,171 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "context" + "crypto/sha256" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/dustin/go-humanize" + + "github.com/kubeshop/testkube/pkg/tmp" +) + +type Binary struct { + mainPath string + outputPath string + alternatingOutputPath string + operatingSystem string + procArchitecture string + + prevHash string + hash string + buildMu sync.RWMutex +} + +func NewBinary(mainPath, operatingSystem, procArchitecture string) *Binary { + return &Binary{ + mainPath: mainPath, + outputPath: tmp.Name(), + alternatingOutputPath: tmp.Name(), + operatingSystem: operatingSystem, + procArchitecture: procArchitecture, + } +} + +func (b *Binary) updateHash() error { + f, err := os.Open(b.outputPath) + if err != nil { + return fmt.Errorf("failed to get hash: reading binary: %s", err.Error()) + } + defer f.Close() + + h := sha256.New() + if _, err := io.Copy(h, f); err != nil { + return fmt.Errorf("failed to get hash: %s", err.Error()) + } + + b.prevHash = b.hash + b.hash = fmt.Sprintf("%x", h.Sum(nil)) + return nil +} + +func (b *Binary) Hash() string { + b.buildMu.RLock() + defer b.buildMu.RUnlock() + return b.hash +} + +func (b *Binary) Path() string { + b.buildMu.RLock() + defer b.buildMu.RUnlock() + return b.outputPath +} + +func (b *Binary) Size() string { + b.buildMu.RLock() + defer b.buildMu.RUnlock() + stat, err := os.Stat(b.outputPath) + if err != nil { + return "" + } + return humanize.Bytes(uint64(stat.Size())) +} + +func (b *Binary) patch() ([]byte, error) { + prevFile, prevErr 
:= os.ReadFile(b.alternatingOutputPath) + if prevErr != nil { + return nil, prevErr + } + currentFile, currentErr := os.ReadFile(b.outputPath) + if currentErr != nil { + return nil, currentErr + } + // In 1.5 second either it will optimize, or just pass it down + return NewBinaryPatchFor(prevFile, currentFile, 1500*time.Millisecond).Bytes(), nil +} + +func (b *Binary) Build(ctx context.Context) (string, error) { + b.buildMu.Lock() + defer b.buildMu.Unlock() + + cmd := exec.Command( + "go", "build", + "-o", b.alternatingOutputPath, + fmt.Sprintf("-ldflags=%s", strings.Join([]string{ + "-X github.com/kubeshop/testkube/internal/app/api/v1.SlackBotClientID=", + "-X github.com/kubeshop/testkube/internal/app/api/v1.SlackBotClientSecret=", + "-X github.com/kubeshop/testkube/pkg/telemetry.TestkubeMeasurementID=", + "-X github.com/kubeshop/testkube/pkg/telemetry.TestkubeMeasurementSecret=", + "-X github.com/kubeshop/testkube/internal/pkg/api.Version=devbox", + "-X github.com/kubeshop/testkube/internal/pkg/api.Commit=000000000", + "-s", + "-w", + "-v", + }, " ")), + "./main.go", + ) + cmd.Dir = filepath.Dir(b.mainPath) + cmd.Env = append(os.Environ(), + fmt.Sprintf("GOOS=%s", b.operatingSystem), + fmt.Sprintf("GOARCH=%s", b.procArchitecture), + ) + r, w := io.Pipe() + cmd.Stdout = w + cmd.Stderr = w + var buf []byte + var bufMu sync.Mutex + go func() { + bufMu.Lock() + defer bufMu.Unlock() + buf, _ = io.ReadAll(r) + }() + + go func() { + <-ctx.Done() + if cmd.Process != nil { + cmd.Process.Kill() + } + }() + + err := cmd.Run() + w.Close() + if err != nil { + bufMu.Lock() + defer bufMu.Unlock() + if ctx.Err() != nil { + return "", ctx.Err() + } + return "", fmt.Errorf("failed to build: %s: %s", err.Error(), string(buf)) + } + + // Switch paths + p := b.alternatingOutputPath + b.alternatingOutputPath = b.outputPath + b.outputPath = p + + err = b.updateHash() + if err != nil { + return "", err + } + return b.hash, err +} + +func (b *Binary) Close() { + os.Remove(b.outputPath) + 
os.Remove(b.alternatingOutputPath) +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/binarypatch.go b/cmd/tcl/kubectl-testkube/devbox/devutils/binarypatch.go new file mode 100644 index 0000000000..fcbd1ef45b --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/binarypatch.go @@ -0,0 +1,385 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "bytes" + "encoding/binary" + "math" + "slices" + "time" +) + +type BinaryPatchOpType = byte + +const ( + BinaryPatchAddOpType BinaryPatchOpType = 1 + BinaryPatchOriginalOpType BinaryPatchOpType = 2 +) + +// BinaryPatch is helper to avoid sending the whole binaries. +// It's optimized for fast analysis to send it ASAP, +// so the resulting patch may be bigger than it's needed. +// It's working nicely for incremental builds though. 
+type BinaryPatch struct { + buf *bytes.Buffer +} + +type BinaryPatchThreshold struct { + Duration time.Duration + Minimum float64 +} + +func NewBinaryPatch() *BinaryPatch { + return &BinaryPatch{ + buf: bytes.NewBuffer(nil), + } +} + +func NewBinaryPatchFromBytes(data []byte) *BinaryPatch { + return &BinaryPatch{ + buf: bytes.NewBuffer(data), + } +} + +func NewBinaryPatchFor(originalFile, currentFile []byte, maxDuration time.Duration) *BinaryPatch { + p := NewBinaryPatch() + p.Read(originalFile, currentFile, maxDuration) + return p +} + +func (p *BinaryPatch) Bytes() []byte { + return p.buf.Bytes() +} + +func (p *BinaryPatch) Load(data []byte) { + p.buf = bytes.NewBuffer(data) +} + +func (p *BinaryPatch) Read(originalFile, currentFile []byte, maxDuration time.Duration) { + skew := uint32(50) + minReuse := uint32(20) + reasonableReuse := uint32(128) + step := skew / 2 + + ops := &BinaryPatchOpList{} + + ts := time.Now() + + originalMarkers := make([][]uint32, math.MaxUint16+1) + originalIterations := uint32(len(originalFile)) - skew + + for i := skew; i < originalIterations; { + if originalFile[i] == 0 { + i++ + continue + } + // Approximate the marker + marker := uint16((int(originalFile[i-(skew/4)])+int(originalFile[i-(skew/2)]))/2) | uint16((int(originalFile[i+(skew/4)])+int(originalFile[i+(skew/2)]))/2)<<8 + originalMarkers[marker] = append(originalMarkers[marker], i) + i++ + } + + // Delete most popular characters to avoid problems with too many iterations + sizes := make([]int, len(originalMarkers)) + for i := 0; i < len(originalMarkers); i++ { + sizes[i] = len(originalMarkers[i]) + } + slices.Sort(sizes) + total := 0 + for i := range originalMarkers { + total += len(originalMarkers[i]) + } + current := total + clearTopMarkers := func(percentage int) { + percentage = max(100-max(0, percentage), 0) + keep := total * percentage / 100 + i := 0 + for ; i < len(sizes) && keep > 0; i++ { + keep -= sizes[i] + } + if i == len(sizes) { + i-- + } + maxMarkersCount := 
sizes[i] + for j := i + 1; j < len(sizes); j++ { + sizes[j] = 0 + } + + for i := uint16(0); i < math.MaxUint16; i++ { + if len(originalMarkers[i]) > maxMarkersCount { + current -= len(originalMarkers[i]) + originalMarkers[i] = nil + } + } + } + clearTopMarkers(10) + + ciMax := uint32(len(currentFile) - 1) + oiMax := uint32(len(originalFile) - 1) + + lastCi := uint32(0) + iterations := uint32(0) + speedUps := 0 + + maxIndex := uint32(len(currentFile)) - skew + tt := time.Now().Add(200 * time.Millisecond) +loop: + for ci := skew / 2; ci < maxIndex; { + if currentFile[ci] == 0 { + ci++ + continue + } + + // Find most unique marker in current step + marker := uint16((int(currentFile[ci-(skew/4)])+int(currentFile[ci-(skew/2)]))/2) | uint16((int(currentFile[ci+(skew/4)])+int(currentFile[ci+(skew/2)]))/2)<<8 + bestCi := ci + markerLen := len(originalMarkers[marker]) + for i := uint32(0); i < step && ci+i <= maxIndex; i++ { + if currentFile[ci+i] == 0 { + continue + } + currentMarker := uint16((int(currentFile[ci+i-(skew/4)])+int(currentFile[ci+i-(skew/2)]))/2) | uint16((int(currentFile[ci+i+(skew/4)])+int(currentFile[ci+i+(skew/2)]))/2)<<8 + currentMarkerLen := len(originalMarkers[currentMarker]) + if currentMarkerLen != 0 && currentMarkerLen < markerLen { + marker = currentMarker + markerLen = currentMarkerLen + bestCi = ci + i + } + } + ci = bestCi + + iterations++ + + if maxDuration != 0 && iterations%1000 == 0 && time.Since(ts) > maxDuration { + break + } + + if time.Since(tt) > 30*time.Millisecond { + speedUps++ + if speedUps == 2 { + step = skew * 3 / 4 + } + if speedUps == 5 { + step = skew + } + clearTopMarkers(20 + speedUps*5) + tt = time.Now() + } + + lastOR := uint32(0) + nextCL := uint32(0) + nextCR := uint32(0) + nextOL := uint32(0) + for _, oi := range originalMarkers[marker] { + if lastOR >= oi || + currentFile[ci] != originalFile[oi] || + currentFile[ci+1] != originalFile[oi+1] || + currentFile[ci-skew/2] != originalFile[oi-skew/2] || + 
currentFile[ci+skew/2] != originalFile[oi+skew/2] { + continue + } + // Validate exact range + l, r := uint32(0), uint32(0) + for ; oi+r < oiMax && ci+r < ciMax && originalFile[oi+r+1] == currentFile[ci+r+1]; r++ { + } + for ; ci-l > 0 && oi-l > 0 && originalFile[oi-l-1] == currentFile[ci-l-1]; l++ { + } + lastOR = oi + r + // Determine if it's nice + if l+r+1 >= minReuse && nextCR-nextCL < r+l { + nextCL = ci - l + nextCR = ci + r + nextOL = oi - l + } + if l+r > reasonableReuse { + break + } + } + + if nextCL != 0 || nextCR != 0 { + addLength := int32(nextCL) - int32(lastCi) + if addLength < 0 { + ops.Cut(uint32(-addLength)) + } else { + ops.Add(currentFile[lastCi:nextCL]) + } + lastCi = nextCR + 1 + ops.Original(nextOL, nextCR-nextCL+1) + ci = lastCi + step + continue loop + } + + ci += step + } + + ops.Add(currentFile[lastCi:]) + + p.buf = bytes.NewBuffer(ops.Bytes()) +} + +func (p *BinaryPatch) Len() int { + return p.buf.Len() +} + +func (p *BinaryPatch) Apply(original []byte) []byte { + patch := p.buf.Bytes() + size := binary.LittleEndian.Uint32(patch[0:4]) + result := make([]byte, size) + resultIndex := uint32(0) + for i := 4; i < len(patch); { + switch patch[i] { + case BinaryPatchOriginalOpType: + index := binary.LittleEndian.Uint32(patch[i+1 : i+5]) + count := binary.LittleEndian.Uint32(patch[i+5 : i+9]) + copy(result[resultIndex:], original[index:index+count]) + resultIndex += count + i += 9 + case BinaryPatchAddOpType: + count := binary.LittleEndian.Uint32(patch[i+1 : i+5]) + copy(result[resultIndex:], patch[i+5:i+5+int(count)]) + i += 5 + int(count) + resultIndex += count + } + } + return result +} + +type BinaryPatchOp struct { + op BinaryPatchOpType + val1 uint32 + val2 uint32 + content []byte +} + +func (b *BinaryPatchOp) Cut(bytesCount uint32) (nextOp *BinaryPatchOp, left uint32) { + if bytesCount == 0 { + return b, 0 + } + switch b.op { + case BinaryPatchOriginalOpType: + if bytesCount >= b.val2 { + return nil, bytesCount - b.val2 + } + b.val2 -= 
bytesCount + return b, 0 + case BinaryPatchAddOpType: + size := uint32(len(b.content)) + if bytesCount >= size { + return nil, bytesCount - size + } + b.content = b.content[0 : size-bytesCount] + return b, 0 + } + return nil, bytesCount +} + +func (b *BinaryPatchOp) TargetSize() uint32 { + switch b.op { + case BinaryPatchOriginalOpType: + return b.val2 + case BinaryPatchAddOpType: + return uint32(len(b.content)) + } + return 0 +} + +func (b *BinaryPatchOp) PatchSize() uint32 { + switch b.op { + case BinaryPatchOriginalOpType: + return 9 // byte + uint32 + uint32 + case BinaryPatchAddOpType: + return 5 + uint32(len(b.content)) // byte + uint32 + []byte(content) + } + return 0 +} + +type BinaryPatchOpList struct { + ops []BinaryPatchOp + count int +} + +func (b *BinaryPatchOpList) TargetSize() uint32 { + total := uint32(0) + for i := 0; i < b.count; i++ { + total += b.ops[i].TargetSize() + } + return total +} + +func (b *BinaryPatchOpList) PatchSize() uint32 { + total := uint32(4) // uint32 for file size + for i := 0; i < b.count; i++ { + total += b.ops[i].PatchSize() + } + return total +} + +func (b *BinaryPatchOpList) Cut(bytesCount uint32) uint32 { + var next *BinaryPatchOp + for i := b.count - 1; bytesCount > 0 && i >= 0; i-- { + next, bytesCount = b.ops[i].Cut(bytesCount) + if next == nil { + b.count-- + b.ops[i] = BinaryPatchOp{} + } + } + return bytesCount +} + +func (b *BinaryPatchOpList) Bytes() []byte { + targetSize := b.TargetSize() + + // Prepare buffer for the patch + result := make([]byte, b.PatchSize()) + binary.LittleEndian.PutUint32(result, targetSize) + resultIndex := 4 + + // Include all patches + for i := 0; i < b.count; i++ { + switch b.ops[i].op { + case BinaryPatchOriginalOpType: + result[resultIndex] = BinaryPatchOriginalOpType + binary.LittleEndian.PutUint32(result[resultIndex+1:], b.ops[i].val1) + binary.LittleEndian.PutUint32(result[resultIndex+5:], b.ops[i].val2) + resultIndex += 9 + case BinaryPatchAddOpType: + result[resultIndex] = 
BinaryPatchAddOpType + binary.LittleEndian.PutUint32(result[resultIndex+1:], uint32(len(b.ops[i].content))) + copy(result[resultIndex+5:], b.ops[i].content) + resultIndex += 5 + len(b.ops[i].content) + } + } + return result +} + +func (b *BinaryPatchOpList) Original(index, bytesCount uint32) { + if bytesCount == 0 { + return + } + + b.append(BinaryPatchOp{op: BinaryPatchOriginalOpType, val1: index, val2: bytesCount}) +} + +func (b *BinaryPatchOpList) Add(bytesArr []byte) { + if len(bytesArr) == 0 { + return + } + + b.append(BinaryPatchOp{op: BinaryPatchAddOpType, content: bytesArr}) +} + +func (b *BinaryPatchOpList) append(op BinaryPatchOp) { + // Grow if needed + if len(b.ops) <= b.count { + b.ops = append(b.ops, make([]BinaryPatchOp, 100)...) + } + b.ops[b.count] = op + b.count++ +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/binarystorage.go b/cmd/tcl/kubectl-testkube/devbox/devutils/binarystorage.go new file mode 100644 index 0000000000..a2e4c4941f --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/binarystorage.go @@ -0,0 +1,298 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "bytes" + "compress/gzip" + "context" + "crypto/tls" + "fmt" + "io" + "net/http" + "os" + "sync" + "time" + + errors2 "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" + + "github.com/kubeshop/testkube/cmd/testworkflow-toolkit/artifacts" + "github.com/kubeshop/testkube/internal/common" +) + +type BinaryStorage struct { + pod *PodObject + binary *Binary + localPort int + hashes map[string]string + hashMu sync.RWMutex +} + +func NewBinaryStorage(pod *PodObject, binary *Binary) *BinaryStorage { + return &BinaryStorage{ + pod: pod, + binary: binary, + hashes: make(map[string]string), + } +} + +func (r *BinaryStorage) Create(ctx context.Context) error { + if r.binary.Hash() == "" { + return errors2.New("binary storage server binary is not built") + } + + // Deploy Pod + err := r.pod.Create(ctx, &corev1.Pod{ + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: common.Ptr(int64(1)), + Volumes: []corev1.Volume{ + {Name: "server", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + {Name: "storage", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + }, + Containers: []corev1.Container{ + { + Name: "binary-storage", + Image: "busybox:1.36.1-musl", + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/sh", "-c", fmt.Sprintf("while [ ! 
-f /app/server-ready ]; do sleep 1; done\n/app/server")}, + VolumeMounts: []corev1.VolumeMount{ + {Name: "server", MountPath: "/app"}, + {Name: "storage", MountPath: "/storage"}, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromInt32(8080), + Scheme: corev1.URISchemeHTTP, + }, + }, + PeriodSeconds: 1, + }, + }, + }, + }, + }) + if err != nil { + return err + } + + // Wait for the container to be started + err = r.pod.WaitForContainerStarted(ctx) + if err != nil { + return err + } + + // Deploy Service + err = r.pod.CreateService(ctx, corev1.ServicePort{ + Name: "api", + Protocol: "TCP", + Port: 8080, + TargetPort: intstr.FromInt32(8080), + }) + if err != nil { + return err + } + + // TODO: Move transfer utilities to *PodObject + // Apply the binary + req := r.pod.ClientSet().CoreV1().RESTClient(). + Post(). + Resource("pods"). + Name(r.pod.Name()). + Namespace(r.pod.Namespace()). + SubResource("exec"). 
+ VersionedParams(&corev1.PodExecOptions{ + Container: "binary-storage", + Command: []string{"tar", "-xzf", "-", "-C", "/app"}, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(r.pod.RESTConfig(), "POST", req.URL()) + if err != nil { + return errors2.Wrap(err, "failed to create spdy executor") + } + + os.WriteFile("/tmp/flag", []byte{1}, 0777) + flagFile, err := os.Open("/tmp/flag") + if err != nil { + return errors2.Wrap(err, "failed to open flag file") + } + defer flagFile.Close() + flagFileStat, err := flagFile.Stat() + if err != nil { + return err + } + + file, err := os.Open(r.binary.Path()) + if err != nil { + return err + } + defer file.Close() + fileStat, err := file.Stat() + if err != nil { + return err + } + + tarStream := artifacts.NewTarStream() + go func() { + defer tarStream.Close() + tarStream.Add("server", file, fileStat) + tarStream.Add("server-ready", flagFile, flagFileStat) + }() + + reader, writer := io.Pipe() + var buf []byte + var bufMu sync.Mutex + go func() { + bufMu.Lock() + defer bufMu.Unlock() + buf, _ = io.ReadAll(reader) + }() + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: tarStream, + Stdout: writer, + Stderr: writer, + Tty: false, + }) + if err != nil { + writer.Close() + bufMu.Lock() + defer bufMu.Unlock() + return fmt.Errorf("failed to stream binary: %s: %s", err.Error(), string(buf)) + } + writer.Close() + + err = r.pod.WaitForReady(ctx) + if err != nil { + return err + } + + r.localPort = GetFreePort() + err = r.pod.Forward(ctx, 8080, r.localPort, true) + if err != nil { + return err + } + + return nil +} + +func (r *BinaryStorage) WaitForReady(ctx context.Context) error { + return r.pod.WaitForReady(ctx) +} + +func (r *BinaryStorage) Is(path string, hash string) bool { + r.hashMu.RLock() + defer r.hashMu.RUnlock() + return r.hashes[path] == hash +} + +func (r *BinaryStorage) SetHash(path string, hash string) { + 
r.hashMu.Lock() + defer r.hashMu.Unlock() + r.hashes[path] = hash +} + +func (r *BinaryStorage) Upload(ctx context.Context, name string, binary *Binary) (cached bool, size int, err error) { + binary.buildMu.RLock() + defer binary.buildMu.RUnlock() + if binary.hash != "" && r.Is(name, binary.hash) { + return true, 0, nil + } + for i := 0; i < 5; i++ { + size, err = r.upload(ctx, name, binary) + if err == nil { + return + } + if ctx.Err() != nil { + return false, 0, err + } + time.Sleep(time.Duration(100*(i+1)) * time.Millisecond) + } + return false, size, err +} + +func (r *BinaryStorage) upload(ctx context.Context, name string, binary *Binary) (int, error) { + file, err := os.Open(binary.outputPath) + if err != nil { + return 0, err + } + defer file.Close() + + tr := http.DefaultTransport.(*http.Transport).Clone() + tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} + client := &http.Client{Transport: tr} + + if binary.hash != "" && binary.prevHash != "" && r.Is(name, binary.prevHash) { + contents, err := binary.patch() + if err == nil { + gzipContents := bytes.NewBuffer(nil) + gz := gzip.NewWriter(gzipContents) + io.Copy(gz, bytes.NewBuffer(contents)) + gz.Flush() + gz.Close() + + gzipContentsLen := gzipContents.Len() + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, fmt.Sprintf("http://localhost:%d/%s", r.localPort, name), gzipContents) + if err != nil { + if ctx.Err() != nil { + return 0, err + } + fmt.Printf("error while sending %s patch, fallback to full stream: %s\n", name, err) + } else { + req.ContentLength = int64(gzipContentsLen) + req.Header.Set("Content-Encoding", "gzip") + req.Header.Set("X-Prev-Hash", binary.prevHash) + req.Header.Set("X-Hash", binary.hash) + res, err := client.Do(req) + if err != nil { + fmt.Printf("error while sending %s patch, fallback to full stream: %s\n", name, err) + } else if res.StatusCode != http.StatusOK { + b, _ := io.ReadAll(res.Body) + fmt.Printf("error while sending %s patch, fallback to full 
stream: status code: %s, message: %s\n", name, res.Status, string(b)) + } else { + r.SetHash(name, binary.hash) + return gzipContentsLen, nil + } + } + } + } + + buf := bytes.NewBuffer(nil) + gz := gzip.NewWriter(buf) + io.Copy(gz, file) + gz.Flush() + gz.Close() + bufLen := buf.Len() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://localhost:%d/%s", r.localPort, name), buf) + if err != nil { + return bufLen, err + } + req.ContentLength = int64(bufLen) + req.Header.Set("Content-Encoding", "gzip") + + res, err := client.Do(req) + if err != nil { + return bufLen, err + } + if res.StatusCode != http.StatusOK { + b, _ := io.ReadAll(res.Body) + return bufLen, fmt.Errorf("failed saving file: status code: %d / message: %s", res.StatusCode, string(b)) + } + r.SetHash(name, binary.hash) + return bufLen, nil +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/certificates.go b/cmd/tcl/kubectl-testkube/devbox/devutils/certificates.go new file mode 100644 index 0000000000..7d63b11da1 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/certificates.go @@ -0,0 +1,89 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "bytes" + "crypto/rand" + "crypto/rsa" + "crypto/x509" + "crypto/x509/pkix" + "encoding/pem" + "math/big" + "time" +) + +type CertificateSet struct { + CaPEM []byte + CrtPEM []byte + KeyPEM []byte +} + +func CreateCertificate(cert x509.Certificate) (result CertificateSet, err error) { + // Build CA + ca := &x509.Certificate{ + SerialNumber: big.NewInt(11111), + Subject: pkix.Name{ + Organization: []string{"Kubeshop"}, + Country: []string{"US"}, + Province: []string{""}, + Locality: []string{"Wilmington"}, + StreetAddress: []string{"Orange St"}, + PostalCode: []string{"19801"}, + }, + NotBefore: time.Now(), + NotAfter: time.Now().AddDate(10, 0, 0), + IsCA: true, + ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth}, + KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign, + BasicConstraintsValid: true, + } + caPrivKey, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return result, err + } + caBytes, err := x509.CreateCertificate(rand.Reader, ca, ca, &caPrivKey.PublicKey, caPrivKey) + if err != nil { + return result, err + } + caPEM := new(bytes.Buffer) + pem.Encode(caPEM, &pem.Block{Type: "CERTIFICATE", Bytes: caBytes}) + caPrivKeyPEM := new(bytes.Buffer) + pem.Encode(caPrivKeyPEM, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(caPrivKey)}) + + // Build the direct certificate + cert.NotBefore = ca.NotBefore + cert.NotAfter = ca.NotAfter + cert.SerialNumber = big.NewInt(11111) + cert.Subject = ca.Subject + cert.SubjectKeyId = []byte{1, 2, 3, 4, 6} + cert.ExtKeyUsage = []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth} + cert.KeyUsage = x509.KeyUsageDigitalSignature + + certPrivKey, err := rsa.GenerateKey(rand.Reader, 4096) + if err != nil { + return result, err + } + certBytes, err := 
x509.CreateCertificate(rand.Reader, &cert, ca, &certPrivKey.PublicKey, caPrivKey) + if err != nil { + return result, err + } + certPEM := new(bytes.Buffer) + pem.Encode(certPEM, &pem.Block{ + Type: "CERTIFICATE", + Bytes: certBytes, + }) + certPrivKeyPEM := new(bytes.Buffer) + pem.Encode(certPrivKeyPEM, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(certPrivKey)}) + + result.CaPEM = caPEM.Bytes() + result.CrtPEM = certPEM.Bytes() + result.KeyPEM = certPrivKeyPEM.Bytes() + return result, nil +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/cloud.go b/cmd/tcl/kubectl-testkube/devbox/devutils/cloud.go new file mode 100644 index 0000000000..69b6f77a17 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/cloud.go @@ -0,0 +1,181 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "errors" + "fmt" + "regexp" + "strings" + "sync" + "time" + + "github.com/spf13/cobra" + + common2 "github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common" + "github.com/kubeshop/testkube/cmd/kubectl-testkube/config" + client2 "github.com/kubeshop/testkube/pkg/api/v1/client" + "github.com/kubeshop/testkube/pkg/cloud/client" +) + +type cloudObj struct { + cfg config.CloudContext + envClient *client.EnvironmentsClient + list []client.Environment + + clientMu sync.Mutex + client client2.Client + clientTs time.Time + + cmd *cobra.Command +} + +func NewCloud(cfg config.CloudContext, cmd *cobra.Command) (*cloudObj, error) { + if cfg.ApiKey == "" || cfg.OrganizationId == "" || cfg.OrganizationName == "" { + return nil, errors.New("login to the organization first") + } + if strings.HasPrefix(cfg.AgentUri, "https://") { + cfg.AgentUri = 
strings.TrimPrefix(cfg.AgentUri, "https://") + if !regexp.MustCompile(`:\d+$`).MatchString(cfg.AgentUri) { + cfg.AgentUri += ":443" + } + } else if strings.HasPrefix(cfg.AgentUri, "http://") { + cfg.AgentUri = strings.TrimPrefix(cfg.AgentUri, "http://") + if !regexp.MustCompile(`:\d+$`).MatchString(cfg.AgentUri) { + cfg.AgentUri += ":80" + } + } + // TODO: FIX THAT + if strings.HasPrefix(cfg.AgentUri, "api.") { + cfg.AgentUri = "agent." + strings.TrimPrefix(cfg.AgentUri, "api.") + } + envClient := client.NewEnvironmentsClient(cfg.ApiUri, cfg.ApiKey, cfg.OrganizationId) + obj := &cloudObj{ + cfg: cfg, + envClient: envClient, + cmd: cmd, + } + + err := obj.UpdateList() + if err != nil { + return nil, err + } + return obj, nil +} + +func (c *cloudObj) List() []client.Environment { + return c.list +} + +func (c *cloudObj) ListObsolete() []client.Environment { + obsolete := make([]client.Environment, 0) + for _, env := range c.list { + if !env.Connected { + obsolete = append(obsolete, env) + } + } + return obsolete +} + +func (c *cloudObj) UpdateList() error { + list, err := c.envClient.List() + if err != nil { + return err + } + result := make([]client.Environment, 0) + for i := range list { + if strings.HasPrefix(list[i].Name, "devbox-") { + result = append(result, list[i]) + } + } + c.list = result + return nil +} + +func (c *cloudObj) Client(environmentId string) (client2.Client, error) { + c.clientMu.Lock() + defer c.clientMu.Unlock() + + if c.client == nil || c.clientTs.Add(5*time.Minute).Before(time.Now()) { + common2.GetClient(c.cmd) // refresh token + var err error + c.client, err = client2.GetClient(client2.ClientCloud, client2.Options{ + Insecure: c.AgentInsecure(), + ApiUri: c.ApiURI(), + CloudApiKey: c.ApiKey(), + CloudOrganization: c.cfg.OrganizationId, + CloudEnvironment: environmentId, + CloudApiPathPrefix: fmt.Sprintf("/organizations/%s/environments/%s/agent", c.cfg.OrganizationId, environmentId), + }) + if err != nil { + return nil, err + } + 
c.clientTs = time.Now() + } + return c.client, nil +} + +func (c *cloudObj) AgentURI() string { + return c.cfg.AgentUri +} + +func (c *cloudObj) AgentInsecure() bool { + return strings.HasPrefix(c.cfg.ApiUri, "http://") +} + +func (c *cloudObj) ApiURI() string { + return c.cfg.ApiUri +} + +func (c *cloudObj) ApiKey() string { + return c.cfg.ApiKey +} + +func (c *cloudObj) ApiInsecure() bool { + return strings.HasPrefix(c.cfg.ApiUri, "http://") +} + +func (c *cloudObj) DashboardUrl(id, path string) string { + return strings.TrimSuffix(fmt.Sprintf("%s/organization/%s/environment/%s/", c.cfg.UiUri, c.cfg.OrganizationId, id)+strings.TrimPrefix(path, "/"), "/") +} + +func (c *cloudObj) CreateEnvironment(name string) (*client.Environment, error) { + env, err := c.envClient.Create(client.Environment{ + Name: name, + Owner: c.cfg.OrganizationId, + OrganizationId: c.cfg.OrganizationId, + }) + if err != nil { + return nil, err + } + // TODO: POST request is not returning slug - if it will, delete the fallback path + if env.Slug != "" { + c.list = append(c.list, env) + } else { + err = c.UpdateList() + if err != nil { + return nil, err + } + for i := range c.list { + if c.list[i].Id == env.Id { + env = c.list[i] + break + } + } + } + // Hack to build proper URLs even when slug is missing + if env.Slug == "" { + env.Slug = env.Id + } + return &env, nil +} + +func (c *cloudObj) DeleteEnvironment(id string) error { + return c.envClient.Delete(id) +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/cluster.go b/cmd/tcl/kubectl-testkube/devbox/devutils/cluster.go new file mode 100644 index 0000000000..19450acd94 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/cluster.go @@ -0,0 +1,74 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "strings" + + "github.com/pkg/errors" + "k8s.io/apimachinery/pkg/version" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/kubeshop/testkube/pkg/k8sclient" +) + +type ClusterObject struct { + cfg *rest.Config + clientSet *kubernetes.Clientset + versionInfo *version.Info +} + +func NewCluster() (*ClusterObject, error) { + config, err := rest.InClusterConfig() + if err != nil { + config, err = k8sclient.GetK8sClientConfig() + if err != nil { + return nil, errors.Wrap(err, "failed to get Kubernetes config") + } + } + clientSet, err := kubernetes.NewForConfig(config) + if err != nil { + return nil, errors.Wrap(err, "failed to create Kubernetes client") + } + info, err := clientSet.ServerVersion() + if err != nil { + return nil, errors.Wrap(err, "failed to get Kubernetes cluster details") + } + + return &ClusterObject{ + clientSet: clientSet, + versionInfo: info, + cfg: config, + }, nil +} + +func (c *ClusterObject) ClientSet() *kubernetes.Clientset { + return c.clientSet +} + +func (c *ClusterObject) Config() *rest.Config { + return c.cfg +} + +func (c *ClusterObject) Namespace(name string) *NamespaceObject { + return NewNamespace(c.clientSet, c.cfg, name) +} + +func (c *ClusterObject) Host() string { + return c.cfg.Host +} + +func (c *ClusterObject) OperatingSystem() string { + return strings.Split(c.versionInfo.Platform, "/")[0] +} + +func (c *ClusterObject) Architecture() string { + return strings.Split(c.versionInfo.Platform, "/")[1] +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/crdsync.go b/cmd/tcl/kubectl-testkube/devbox/devutils/crdsync.go new file mode 100644 index 0000000000..44c28a3db0 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/crdsync.go @@ -0,0 +1,249 @@ +// Copyright 2024 Testkube. 
+// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "bytes" + "context" + "encoding/json" + "io" + "io/fs" + "os" + "path/filepath" + "strings" + "sync" + + "github.com/pkg/errors" + "gopkg.in/yaml.v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + testworkflowsv1 "github.com/kubeshop/testkube-operator/api/testworkflows/v1" + "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/testworkflows/executionworker/controller/store" +) + +type CRDSyncWorkflow struct { + Workflow testworkflowsv1.TestWorkflow + SourcePath string +} + +type CRDSyncTemplate struct { + Template testworkflowsv1.TestWorkflowTemplate + SourcePath string +} + +type CRDSyncUpdateOp string + +const ( + CRDSyncUpdateOpCreate CRDSyncUpdateOp = "create" + CRDSyncUpdateOpUpdate CRDSyncUpdateOp = "update" + CRDSyncUpdateOpDelete CRDSyncUpdateOp = "delete" +) + +type CRDSyncUpdate struct { + Template *testworkflowsv1.TestWorkflowTemplate + Workflow *testworkflowsv1.TestWorkflow + Op CRDSyncUpdateOp +} + +type CRDSync struct { + workflows []CRDSyncWorkflow + templates []CRDSyncTemplate + updates []CRDSyncUpdate + mu sync.Mutex + emitter store.Update +} + +// TODO: optimize for duplicates +func NewCRDSync() *CRDSync { + return &CRDSync{ + workflows: make([]CRDSyncWorkflow, 0), + templates: make([]CRDSyncTemplate, 0), + updates: make([]CRDSyncUpdate, 0), + emitter: store.NewUpdate(), + } +} + +func (c *CRDSync) WorkflowsCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.workflows) +} + +func (c *CRDSync) TemplatesCount() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.templates) +} + +func (c *CRDSync) Next(ctx context.Context) (*CRDSyncUpdate, error) { + for { + if ctx.Err() != nil { + return 
nil, ctx.Err() + } + c.mu.Lock() + if len(c.updates) > 0 { + next := c.updates[0] + c.updates = c.updates[1:] + c.mu.Unlock() + return &next, nil + } + ch := c.emitter.Next() + c.mu.Unlock() + select { + case <-ctx.Done(): + case <-ch: + } + } +} + +func (c *CRDSync) processWorkflow(sourcePath string, workflow testworkflowsv1.TestWorkflow) error { + for i := range c.workflows { + if c.workflows[i].Workflow.Name == workflow.Name { + v1, _ := json.Marshal(c.workflows[i].Workflow) + v2, _ := json.Marshal(workflow) + c.workflows[i].SourcePath = sourcePath + if !bytes.Equal(v1, v2) { + c.workflows[i].Workflow = workflow + c.updates = append(c.updates, CRDSyncUpdate{Workflow: &workflow, Op: CRDSyncUpdateOpUpdate}) + } + return nil + } + } + c.workflows = append(c.workflows, CRDSyncWorkflow{SourcePath: sourcePath, Workflow: workflow}) + c.updates = append(c.updates, CRDSyncUpdate{Workflow: &workflow, Op: CRDSyncUpdateOpCreate}) + return nil +} + +func (c *CRDSync) processTemplate(sourcePath string, template testworkflowsv1.TestWorkflowTemplate) error { + for i := range c.templates { + if c.templates[i].Template.Name == template.Name { + v1, _ := json.Marshal(c.templates[i].Template) + v2, _ := json.Marshal(template) + if !bytes.Equal(v1, v2) { + c.templates[i].SourcePath = sourcePath + c.templates[i].Template = template + c.updates = append(c.updates, CRDSyncUpdate{Template: &template, Op: CRDSyncUpdateOpUpdate}) + return nil + } + } + } + c.templates = append(c.templates, CRDSyncTemplate{SourcePath: sourcePath, Template: template}) + c.updates = append(c.updates, CRDSyncUpdate{Template: &template, Op: CRDSyncUpdateOpCreate}) + return nil +} + +func (c *CRDSync) deleteFile(path string) error { + for i := 0; i < len(c.templates); i++ { + if c.templates[i].SourcePath == path { + c.updates = append(c.updates, CRDSyncUpdate{ + Template: &testworkflowsv1.TestWorkflowTemplate{ObjectMeta: metav1.ObjectMeta{Name: c.templates[i].Template.Name}}, + Op: CRDSyncUpdateOpDelete, + }) + 
c.templates = append(c.templates[:i], c.templates[i+1:]...) + i-- + } + } + for i := 0; i < len(c.workflows); i++ { + if c.workflows[i].SourcePath == path { + c.updates = append(c.updates, CRDSyncUpdate{ + Template: &testworkflowsv1.TestWorkflowTemplate{ObjectMeta: metav1.ObjectMeta{Name: c.templates[i].Template.Name}}, + Op: CRDSyncUpdateOpDelete, + }) + c.workflows = append(c.workflows[:i], c.workflows[i+1:]...) + i-- + } + } + return nil +} + +func (c *CRDSync) loadFile(path string) error { + // Ignore non-YAML files + if !strings.HasSuffix(path, ".yml") && !strings.HasSuffix(path, ".yaml") { + return nil + } + + defer c.emitter.Emit() + + // Parse the YAML file + file, err := os.Open(path) + if err != nil { + c.deleteFile(path) + return nil + } + + // TODO: Handle deleted entries + decoder := yaml.NewDecoder(file) + for { + var obj map[string]interface{} + err := decoder.Decode(&obj) + if errors.Is(err, io.EOF) { + break + } + if err != nil { + break + } + + if obj["kind"] == nil || !(obj["kind"].(string) == "TestWorkflow" || obj["kind"].(string) == "TestWorkflowTemplate") { + continue + } + + if obj["kind"].(string) == "TestWorkflow" { + bytes, _ := yaml.Marshal(obj) + tw := testworkflowsv1.TestWorkflow{} + err := common.DeserializeCRD(&tw, bytes) + if tw.Name == "" { + continue + } + if err != nil { + continue + } + c.processWorkflow(path, tw) + } else if obj["kind"].(string) == "TestWorkflowTemplate" { + bytes, _ := yaml.Marshal(obj) + tw := testworkflowsv1.TestWorkflowTemplate{} + err := common.DeserializeCRD(&tw, bytes) + if tw.Name == "" { + continue + } + if err != nil { + continue + } + c.processTemplate(path, tw) + } + } + file.Close() + return nil +} + +func (c *CRDSync) Load(path string) error { + c.mu.Lock() + defer c.mu.Unlock() + + path, err := filepath.Abs(path) + if err != nil { + return err + } + + stat, err := os.Stat(path) + if err != nil { + return err + } + + if !stat.IsDir() { + return c.loadFile(path) + } + + return filepath.Walk(path, 
func(path string, info fs.FileInfo, err error) error { + if info.IsDir() { + return nil + } + return c.loadFile(path) + }) +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/find.go b/cmd/tcl/kubectl-testkube/devbox/devutils/find.go new file mode 100644 index 0000000000..14804be6a6 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/find.go @@ -0,0 +1,34 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "os" + "path/filepath" +) + +func FindDirContaining(paths ...string) string { + cwd, _ := os.Getwd() + + // Find near in the tree + current := filepath.Clean(filepath.Join(cwd, "testkube", "dummy")) +loop: + for current != filepath.Clean(filepath.Join(cwd, "..")) { + current = filepath.Dir(current) + for _, path := range paths { + expected := filepath.Clean(filepath.Join(current, path)) + _, err := os.Stat(expected) + if err != nil { + continue loop + } + } + return current + } + return "" +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/forwarding.go b/cmd/tcl/kubectl-testkube/devbox/devutils/forwarding.go new file mode 100644 index 0000000000..499b0b2ee5 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/forwarding.go @@ -0,0 +1,173 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "net" + "net/http" + "net/url" + "strings" + "sync" + "time" + + "github.com/pkg/errors" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/portforward" + "k8s.io/client-go/transport/spdy" +) + +func GetFreePort() int { + var a *net.TCPAddr + var err error + if a, err = net.ResolveTCPAddr("tcp", ":0"); err == nil { + var l *net.TCPListener + if l, err = net.ListenTCP("tcp", a); err == nil { + defer l.Close() + return l.Addr().(*net.TCPAddr).Port + } + } + panic(err) +} + +// TODO: Support context +func ForwardPod(config *rest.Config, namespace, podName string, clusterPort, localPort int, ping bool) error { + middlewarePort := GetFreePort() + transport, upgrader, err := spdy.RoundTripperFor(config) + if err != nil { + return err + } + path := fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/portforward", namespace, podName) + hostIP := strings.TrimPrefix(strings.TrimPrefix(config.Host, "http://"), "https://") + serverURL := url.URL{Scheme: "https", Path: path, Host: hostIP} + dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, http.MethodPost, &serverURL) + stopChan, readyChan := make(chan struct{}, 1), make(chan struct{}, 1) + out, errOut := new(bytes.Buffer), new(bytes.Buffer) + forwarder, err := portforward.New(dialer, []string{fmt.Sprintf("%d:%d", middlewarePort, clusterPort)}, stopChan, readyChan, out, errOut) + if err != nil { + return err + } + go func() { + for { + if err = forwarder.ForwardPorts(); err != nil { + fmt.Println(errors.Wrap(err, "warn: forwarder: closed")) + time.Sleep(50 * time.Millisecond) + readyChan = make(chan struct{}, 1) + forwarder, err = portforward.New(dialer, []string{fmt.Sprintf("%d:%d", middlewarePort, clusterPort)}, stopChan, readyChan, out, errOut) + go func(readyChan chan struct{}) { + <-readyChan + 
fmt.Println("forwarder: reconnected") + }(readyChan) + } + } + }() + + // Hack to handle Kubernetes Port Forwarding issue. + // Stream through a different server, to ensure that both connections are fully read, with no broken pipe. + // @see {@link https://github.com/kubernetes/kubernetes/issues/74551} + ln, err := net.Listen("tcp", fmt.Sprintf(":%d", localPort)) + if err != nil { + return err + } + go func() { + defer ln.Close() + for { + conn, err := ln.Accept() + if err == nil { + go func(conn net.Conn) { + defer conn.Close() + open, err := net.Dial("tcp", fmt.Sprintf(":%d", middlewarePort)) + if err != nil { + return + } + defer open.Close() + var wg sync.WaitGroup + wg.Add(2) + go func() { + io.Copy(open, conn) + wg.Done() + }() + go func() { + io.Copy(conn, open) + wg.Done() + }() + wg.Wait() + + // Read all before closing + io.ReadAll(conn) + io.ReadAll(open) + }(conn) + } + } + }() + + if ping { + go func() { + for { + http.NewRequest(http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d", localPort), nil) + time.Sleep(4 * time.Second) + } + }() + } + + return nil +} + +func ProxySSL(sourcePort, sslPort int) error { + set, err := CreateCertificate(x509.Certificate{ + IPAddresses: []net.IP{net.ParseIP("0.0.0.0"), net.ParseIP("127.0.0.1"), net.IPv6loopback}, + }) + if err != nil { + return err + } + crt, err := tls.X509KeyPair(set.CrtPEM, set.KeyPEM) + if err != nil { + return err + } + ln, err := tls.Listen("tcp", fmt.Sprintf(":%d", sslPort), &tls.Config{ + Certificates: []tls.Certificate{crt}, + InsecureSkipVerify: true, + }) + if err != nil { + return err + } + go func() { + defer ln.Close() + + for { + conn, err := ln.Accept() + if err == nil { + go func(conn net.Conn) { + defer conn.Close() + open, err := net.Dial("tcp", fmt.Sprintf(":%d", sourcePort)) + if err != nil { + return + } + defer open.Close() + var wg sync.WaitGroup + wg.Add(2) + go func() { + io.Copy(open, conn) + wg.Done() + }() + go func() { + io.Copy(conn, open) + wg.Done() + }() + wg.Wait() + 
}(conn) + } + } + }() + return nil +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/fswatcher.go b/cmd/tcl/kubectl-testkube/devbox/devutils/fswatcher.go new file mode 100644 index 0000000000..c12a38e885 --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/fswatcher.go @@ -0,0 +1,106 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "context" + "io" + "io/fs" + "os" + "path/filepath" + + "github.com/fsnotify/fsnotify" +) + +type FsWatcher struct { + watcher *fsnotify.Watcher +} + +// TODO: support masks like **/*.go +func NewFsWatcher(paths ...string) (*FsWatcher, error) { + fsWatcher, err := fsnotify.NewWatcher() + if err != nil { + return nil, err + } + w := &FsWatcher{ + watcher: fsWatcher, + } + for i := range paths { + if err = w.add(paths[i]); err != nil { + fsWatcher.Close() + return nil, err + } + } + return w, nil +} + +func (w *FsWatcher) Close() error { + return w.watcher.Close() +} + +func (w *FsWatcher) addRecursive(dirPath string) error { + if err := w.watcher.Add(dirPath); err != nil { + return err + } + return filepath.WalkDir(dirPath, func(path string, d fs.DirEntry, err error) error { + if err != nil || !d.IsDir() { + return nil + } + if filepath.Base(path)[0] == '.' 
{ + // Ignore dot-files + return nil + } + if path == dirPath { + return nil + } + return w.addRecursive(path) + }) +} + +func (w *FsWatcher) add(path string) error { + path, err := filepath.Abs(path) + if err != nil { + return err + } + return w.addRecursive(path) +} + +func (w *FsWatcher) Next(ctx context.Context) (string, error) { + for { + select { + case <-ctx.Done(): + return "", ctx.Err() + case event, ok := <-w.watcher.Events: + if !ok { + return "", io.EOF + } + fileinfo, err := os.Stat(event.Name) + if err != nil { + continue + } + if fileinfo.IsDir() { + if event.Has(fsnotify.Create) { + if err = w.addRecursive(event.Name); err != nil { + return "", err + } + } + continue + } + if !event.Has(fsnotify.Create) && !event.Has(fsnotify.Write) && !event.Has(fsnotify.Remove) { + continue + } + return event.Name, nil + case err, ok := <-w.watcher.Errors: + if !ok { + return "", io.EOF + } + return "", err + } + } +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/interceptor.go b/cmd/tcl/kubectl-testkube/devbox/devutils/interceptor.go new file mode 100644 index 0000000000..e63ab1978b --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/interceptor.go @@ -0,0 +1,291 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "context" + "crypto/x509" + "fmt" + "io" + "os" + "sync" + + "github.com/kballard/go-shellquote" + errors2 "github.com/pkg/errors" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/remotecommand" + + "github.com/kubeshop/testkube/cmd/testworkflow-toolkit/artifacts" + "github.com/kubeshop/testkube/internal/common" + "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants" +) + +type Interceptor struct { + pod *PodObject + caPem []byte + initProcessImageName string + toolkitImageName string + binary *Binary +} + +func NewInterceptor(pod *PodObject, initProcessImageName, toolkitImageName string, binary *Binary) *Interceptor { + return &Interceptor{ + pod: pod, + initProcessImageName: initProcessImageName, + toolkitImageName: toolkitImageName, + binary: binary, + } +} + +func (r *Interceptor) Create(ctx context.Context) error { + if r.binary.Hash() == "" { + return errors2.New("interceptor binary is not built") + } + + certSet, err := CreateCertificate(x509.Certificate{ + DNSNames: []string{ + fmt.Sprintf("%s.%s", r.pod.Name(), r.pod.Namespace()), + fmt.Sprintf("%s.%s.svc", r.pod.Name(), r.pod.Namespace()), + }, + }) + if err != nil { + return err + } + + // Deploy certificate + certSecretName := fmt.Sprintf("%s-cert", r.pod.Name()) + _, err = r.pod.ClientSet().CoreV1().Secrets(r.pod.Namespace()).Create(ctx, &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: certSecretName, + }, + Data: map[string][]byte{ + "ca.crt": certSet.CaPEM, + "tls.crt": certSet.CrtPEM, + "tls.key": certSet.KeyPEM, + }, + }, metav1.CreateOptions{}) + if err != nil { + if 
!errors.IsAlreadyExists(err) { + return err + } + secret, err := r.pod.ClientSet().CoreV1().Secrets(r.pod.Namespace()).Get(ctx, certSecretName, metav1.GetOptions{}) + if err != nil { + return err + } + certSet.CaPEM = secret.Data["ca.crt"] + certSet.CrtPEM = secret.Data["tls.crt"] + certSet.KeyPEM = secret.Data["tls.key"] + } + r.caPem = certSet.CaPEM + + // Deploy Pod + err = r.pod.Create(ctx, &corev1.Pod{ + Spec: corev1.PodSpec{ + TerminationGracePeriodSeconds: common.Ptr(int64(1)), + Volumes: []corev1.Volume{ + {Name: "server", VolumeSource: corev1.VolumeSource{EmptyDir: &corev1.EmptyDirVolumeSource{}}}, + {Name: "certs", VolumeSource: corev1.VolumeSource{Secret: &corev1.SecretVolumeSource{ + SecretName: certSecretName, + }}}, + }, + Containers: []corev1.Container{ + { + Name: "interceptor", + Image: "busybox:1.36.1-musl", + ImagePullPolicy: corev1.PullIfNotPresent, + Command: []string{"/bin/sh", "-c", fmt.Sprintf("while [ ! -f /app/server-ready ]; do sleep 1; done\n/app/server %s", shellquote.Join(r.initProcessImageName, r.toolkitImageName))}, + VolumeMounts: []corev1.VolumeMount{ + {Name: "server", MountPath: "/app"}, + {Name: "certs", MountPath: "/certs"}, + }, + ReadinessProbe: &corev1.Probe{ + ProbeHandler: corev1.ProbeHandler{ + HTTPGet: &corev1.HTTPGetAction{ + Path: "/health", + Port: intstr.FromInt32(8443), + Scheme: corev1.URISchemeHTTPS, + }, + }, + PeriodSeconds: 1, + }, + }, + }, + }, + }) + if err != nil { + return err + } + + // Wait for the container to be started + err = r.pod.WaitForContainerStarted(ctx) + if err != nil { + return err + } + + // Deploy Service + err = r.pod.CreateService(ctx, corev1.ServicePort{ + Name: "api", + Protocol: "TCP", + Port: 8443, + TargetPort: intstr.FromInt32(8443), + }) + if err != nil { + return err + } + + // TODO: Move transfer utilities to *PodObject + // Apply the binary + req := r.pod.ClientSet().CoreV1().RESTClient(). + Post(). + Resource("pods"). + Name(r.pod.Name()). + Namespace(r.pod.Namespace()). 
+ SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: "interceptor", + Command: []string{"tar", "-xzf", "-", "-C", "/app"}, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(r.pod.RESTConfig(), "POST", req.URL()) + if err != nil { + return errors2.Wrap(err, "failed to create spdy executor") + } + + os.WriteFile("/tmp/flag", []byte{1}, 0777) + flagFile, err := os.Open("/tmp/flag") + if err != nil { + return errors2.Wrap(err, "failed to open flag file") + } + defer flagFile.Close() + flagFileStat, err := flagFile.Stat() + if err != nil { + return err + } + + file, err := os.Open(r.binary.Path()) + if err != nil { + return err + } + defer file.Close() + fileStat, err := file.Stat() + if err != nil { + return err + } + + tarStream := artifacts.NewTarStream() + go func() { + defer tarStream.Close() + tarStream.Add("server", file, fileStat) + tarStream.Add("server-ready", flagFile, flagFileStat) + }() + + reader, writer := io.Pipe() + var buf []byte + var bufMu sync.Mutex + go func() { + bufMu.Lock() + defer bufMu.Unlock() + buf, _ = io.ReadAll(reader) + }() + err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{ + Stdin: tarStream, + Stdout: writer, + Stderr: writer, + Tty: false, + }) + if err != nil { + writer.Close() + bufMu.Lock() + defer bufMu.Unlock() + return fmt.Errorf("failed to stream binary: %s: %s", err.Error(), string(buf)) + } + writer.Close() + + return nil +} + +func (r *Interceptor) WaitForReady(ctx context.Context) error { + return r.pod.WaitForReady(ctx) +} + +func (r *Interceptor) Enable(ctx context.Context) error { + _ = r.Disable() + + _, err := r.pod.ClientSet().AdmissionregistrationV1().MutatingWebhookConfigurations().Create(ctx, &admissionregistrationv1.MutatingWebhookConfiguration{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-webhook-%s", r.pod.Name(), r.pod.Namespace()), + Labels: map[string]string{ + 
"testkube.io/devbox-name": r.pod.Namespace(), + }, + }, + Webhooks: []admissionregistrationv1.MutatingWebhook{ + { + Name: "devbox.kb.io", + ClientConfig: admissionregistrationv1.WebhookClientConfig{ + Service: &admissionregistrationv1.ServiceReference{ + Name: r.pod.Name(), + Namespace: r.pod.Namespace(), + Path: common.Ptr("/mutate"), + Port: common.Ptr(int32(8443)), + }, + CABundle: r.caPem, + }, + Rules: []admissionregistrationv1.RuleWithOperations{ + { + Rule: admissionregistrationv1.Rule{ + APIGroups: []string{""}, + APIVersions: []string{"v1"}, + Resources: []string{"pods"}, + Scope: common.Ptr(admissionregistrationv1.NamespacedScope), + }, + Operations: []admissionregistrationv1.OperationType{ + admissionregistrationv1.Create, + }, + }, + }, + NamespaceSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "kubernetes.io/metadata.name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{r.pod.Namespace()}, + }, + }, + }, + ObjectSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: constants.ResourceIdLabelName, + Operator: metav1.LabelSelectorOpExists, + }, + }, + }, + SideEffects: common.Ptr(admissionregistrationv1.SideEffectClassNone), + AdmissionReviewVersions: []string{"v1"}, + }, + }, + }, metav1.CreateOptions{}) + return err +} + +func (r *Interceptor) Disable() (err error) { + return r.pod.ClientSet().AdmissionregistrationV1().MutatingWebhookConfigurations().Delete( + context.Background(), + fmt.Sprintf("%s-webhook-%s", r.pod.Name(), r.pod.Namespace()), + metav1.DeleteOptions{}) +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/namespace.go b/cmd/tcl/kubectl-testkube/devbox/devutils/namespace.go new file mode 100644 index 0000000000..8f0845ff4a --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/namespace.go @@ -0,0 +1,187 @@ +// Copyright 2024 Testkube. 
+// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" + k8serrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/kubeshop/testkube/internal/common" +) + +var ( + ErrNotDevboxNamespace = errors.New("selected namespace exists and is not devbox") +) + +type NamespaceObject struct { + name string + clientSet *kubernetes.Clientset + restConfig *rest.Config + namespace *corev1.Namespace +} + +func NewNamespace(kubeClient *kubernetes.Clientset, kubeRestConfig *rest.Config, name string) *NamespaceObject { + return &NamespaceObject{ + name: name, + clientSet: kubeClient, + restConfig: kubeRestConfig, + } +} + +func (n *NamespaceObject) Name() string { + return n.name +} + +func (n *NamespaceObject) ServiceAccountName() string { + return "devbox-account" +} + +func (n *NamespaceObject) Pod(name string) *PodObject { + return NewPod(n.clientSet, n.restConfig, n.name, name) +} + +func (n *NamespaceObject) create() error { + for { + namespace, err := n.clientSet.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: n.name, + Labels: map[string]string{ + "testkube.io/devbox": "namespace", + }, + }, + }, metav1.CreateOptions{}) + if err != nil { + if strings.Contains(err.Error(), "being deleted") { + time.Sleep(200 * time.Millisecond) + continue + } + if k8serrors.IsAlreadyExists(err) { + namespace, err = n.clientSet.CoreV1().Namespaces().Get(context.Background(), n.name, metav1.GetOptions{}) + if err != nil { + return err + } + if 
namespace.Labels["testkube.io/devbox"] != "namespace" { + return ErrNotDevboxNamespace + } + err = n.clientSet.CoreV1().Namespaces().Delete(context.Background(), n.name, metav1.DeleteOptions{ + GracePeriodSeconds: common.Ptr(int64(0)), + PropagationPolicy: common.Ptr(metav1.DeletePropagationForeground), + }) + if err != nil { + return err + } + continue + } + return errors.Wrap(err, "failed to create namespace") + } + n.namespace = namespace + return nil + } +} + +func (n *NamespaceObject) createServiceAccount() error { + // Create service account + serviceAccount, err := n.clientSet.CoreV1().ServiceAccounts(n.name).Create(context.Background(), &corev1.ServiceAccount{ + ObjectMeta: metav1.ObjectMeta{Name: n.ServiceAccountName()}, + }, metav1.CreateOptions{}) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return errors.Wrap(err, "failed to create service account") + } + + // Create service account role + role, err := n.clientSet.RbacV1().Roles(n.name).Create(context.Background(), &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: "devbox-account-role", + }, + Rules: []rbacv1.PolicyRule{ + { + Verbs: []string{"get", "watch", "list", "create", "delete", "deletecollection"}, + APIGroups: []string{"batch"}, + Resources: []string{"jobs"}, + }, + { + Verbs: []string{"get", "watch", "list", "create", "patch", "update", "delete", "deletecollection"}, + APIGroups: []string{""}, + Resources: []string{"pods", "persistentvolumeclaims", "secrets", "configmaps"}, + }, + { + Verbs: []string{"get", "watch", "list"}, + APIGroups: []string{""}, + Resources: []string{"pods/log", "events"}, + }, + { + Verbs: []string{"get", "watch", "list", "create", "patch", "update", "delete", "deletecollection"}, + APIGroups: []string{"testworkflows.testkube.io"}, + Resources: []string{"testworkflows", "testworkflows/status", "testworkflowtemplates"}, + }, + }, + }, metav1.CreateOptions{}) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return errors.Wrap(err, "failed to create 
roles") + } + + // Create service account role binding + _, err = n.clientSet.RbacV1().RoleBindings(n.name).Create(context.Background(), &rbacv1.RoleBinding{ + ObjectMeta: metav1.ObjectMeta{Name: "devbox-account-rb"}, + Subjects: []rbacv1.Subject{ + { + Kind: "ServiceAccount", + Name: serviceAccount.Name, + Namespace: n.name, + }, + }, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: role.Name, + }, + }, metav1.CreateOptions{}) + if err != nil && !k8serrors.IsAlreadyExists(err) { + return errors.Wrap(err, "failed to create role bindings") + } + return nil +} + +func (n *NamespaceObject) Create() error { + if n.namespace != nil { + return nil + } + + if err := n.create(); err != nil { + return err + } + if err := n.createServiceAccount(); err != nil { + return err + } + return nil +} + +func (n *NamespaceObject) Destroy() error { + err := n.clientSet.CoreV1().Namespaces().Delete(context.Background(), n.name, metav1.DeleteOptions{ + GracePeriodSeconds: common.Ptr(int64(0)), + PropagationPolicy: common.Ptr(metav1.DeletePropagationForeground), + }) + if err != nil && !k8serrors.IsNotFound(err) { + return err + } + err = n.clientSet.AdmissionregistrationV1().MutatingWebhookConfigurations().DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("testkube.io/devbox-name=%s", n.name), + }) + return err +} diff --git a/cmd/tcl/kubectl-testkube/devbox/devutils/pods.go b/cmd/tcl/kubectl-testkube/devbox/devutils/pods.go new file mode 100644 index 0000000000..c923083d6c --- /dev/null +++ b/cmd/tcl/kubectl-testkube/devbox/devutils/pods.go @@ -0,0 +1,228 @@ +// Copyright 2024 Testkube. +// +// Licensed as a Testkube Pro file under the Testkube Community +// License (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt + +package devutils + +import ( + "context" + "fmt" + "sync" + "time" + + errors2 "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + + "github.com/kubeshop/testkube/internal/common" +) + +var ( + ErrPodNotFound = errors2.New("pod not found") +) + +type PodObject struct { + name string + namespace string + pod *corev1.Pod + service *corev1.Service + clientSet *kubernetes.Clientset + restConfig *rest.Config + + mu sync.Mutex +} + +func NewPod(kubeClient *kubernetes.Clientset, kubeRestConfig *rest.Config, namespace, name string) *PodObject { + return &PodObject{ + name: name, + namespace: namespace, + clientSet: kubeClient, + restConfig: kubeRestConfig, + } +} + +func (p *PodObject) Name() string { + return p.name +} + +func (p *PodObject) Namespace() string { + return p.namespace +} + +func (p *PodObject) Selector() metav1.LabelSelector { + return metav1.LabelSelector{ + MatchLabels: map[string]string{ + "testkube.io/devbox": p.name, + }, + } +} + +func (p *PodObject) ClientSet() *kubernetes.Clientset { + return p.clientSet +} + +func (p *PodObject) RESTConfig() *rest.Config { + return p.restConfig +} + +func (p *PodObject) Create(ctx context.Context, request *corev1.Pod) error { + p.mu.Lock() + defer p.mu.Unlock() + if p.pod != nil { + return nil + } + return p.create(ctx, request) +} + +func (p *PodObject) create(ctx context.Context, request *corev1.Pod) error { + request = request.DeepCopy() + request.Name = p.name + request.Namespace = p.namespace + request.ResourceVersion = "" + if len(request.Labels) == 0 { + request.Labels = make(map[string]string) + } + request.Labels["testkube.io/devbox"] = p.name + + pod, err := p.clientSet.CoreV1().Pods(p.namespace).Create(ctx, request, metav1.CreateOptions{}) + if 
errors.IsAlreadyExists(err) { + err = p.clientSet.CoreV1().Pods(p.namespace).Delete(ctx, request.Name, metav1.DeleteOptions{ + GracePeriodSeconds: common.Ptr(int64(0)), + PropagationPolicy: common.Ptr(metav1.DeletePropagationForeground), + }) + if err != nil && !errors.IsNotFound(err) { + return errors2.Wrap(err, "failed to delete existing pod") + } + pod, err = p.clientSet.CoreV1().Pods(p.namespace).Create(context.Background(), request, metav1.CreateOptions{}) + } + if err != nil { + return errors2.Wrap(err, "failed to create pod") + } + p.pod = pod + return nil +} + +func (p *PodObject) Pod() *corev1.Pod { + return p.pod +} + +func (p *PodObject) RefreshData(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + pods, err := p.clientSet.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: fmt.Sprintf("testkube.io/devbox=%s", p.name), + }) + if err != nil { + return err + } + if len(pods.Items) == 0 { + p.pod = nil + return ErrPodNotFound + } + p.pod = &pods.Items[0] + return nil +} + +func (p *PodObject) Watch(ctx context.Context) error { + panic("not implemented") +} + +func (p *PodObject) Restart(ctx context.Context) error { + p.mu.Lock() + defer p.mu.Unlock() + + pod := p.pod + if pod == nil { + return ErrPodNotFound + } + p.pod = nil + _ = p.clientSet.CoreV1().Pods(p.namespace).Delete(context.Background(), p.name, metav1.DeleteOptions{ + GracePeriodSeconds: common.Ptr(int64(0)), + PropagationPolicy: common.Ptr(metav1.DeletePropagationForeground), + }) + return p.create(context.Background(), pod) +} + +func (p *PodObject) WaitForReady(ctx context.Context) error { + for { + if p.pod != nil && len(p.pod.Status.ContainerStatuses) > 0 && p.pod.Status.ContainerStatuses[0].Ready { + return nil + } + time.Sleep(300 * time.Millisecond) + err := p.RefreshData(ctx) + if err != nil { + return err + } + } +} + +func (p *PodObject) WaitForContainerStarted(ctx context.Context) (err error) { + for { + if p.pod != nil && 
len(p.pod.Status.ContainerStatuses) > 0 && p.pod.Status.ContainerStatuses[0].Started != nil && *p.pod.Status.ContainerStatuses[0].Started { + return nil + } + time.Sleep(300 * time.Millisecond) + err := p.RefreshData(ctx) + if err != nil { + return err + } + } +} + +func (p *PodObject) ClusterIP() string { + if p.pod == nil { + return "" + } + return p.pod.Status.PodIP +} + +func (p *PodObject) ClusterAddress() string { + if p.service == nil { + return p.ClusterIP() + } + return fmt.Sprintf("%s.%s.svc", p.service.Name, p.service.Namespace) +} + +func (p *PodObject) CreateService(ctx context.Context, ports ...corev1.ServicePort) error { + request := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: p.name, + }, + Spec: corev1.ServiceSpec{ + Type: corev1.ServiceTypeClusterIP, + Selector: map[string]string{ + "testkube.io/devbox": p.name, + }, + Ports: ports, + }, + } + + svc, err := p.clientSet.CoreV1().Services(p.namespace).Create(ctx, request, metav1.CreateOptions{}) + if errors.IsAlreadyExists(err) { + err = p.clientSet.CoreV1().Services(p.namespace).Delete(ctx, request.Name, metav1.DeleteOptions{}) + if err != nil && !errors.IsNotFound(err) { + return errors2.Wrap(err, "failed to delete existing service") + } + svc, err = p.clientSet.CoreV1().Services(p.namespace).Create(ctx, request, metav1.CreateOptions{}) + } + if err != nil { + return err + } + p.service = svc + return nil +} + +func (p *PodObject) Forward(_ context.Context, clusterPort, localPort int, ping bool) error { + if p.pod == nil { + return ErrPodNotFound + } + return ForwardPod(p.restConfig, p.pod.Namespace, p.pod.Name, clusterPort, localPort, ping) +} diff --git a/cmd/testworkflow-init/commands/setup.go b/cmd/testworkflow-init/commands/setup.go index 4cd0b9ec1c..004b2eda41 100644 --- a/cmd/testworkflow-init/commands/setup.go +++ b/cmd/testworkflow-init/commands/setup.go @@ -9,10 +9,14 @@ import ( "github.com/kubeshop/testkube/cmd/testworkflow-init/data" 
"github.com/kubeshop/testkube/cmd/testworkflow-init/output" "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/action/actiontypes/lite" - constants2 "github.com/kubeshop/testkube/pkg/testworkflows/testworkflowprocessor/constants" "github.com/kubeshop/testkube/pkg/version" ) +// Moved from testworkflowprocessor/constants to reduce init process size +const ( + defaultInitImageBusyboxBinaryPath = "/.tktw-bin" +) + func Setup(config lite.ActionSetup) error { stdout := output.Std stdoutUnsafe := stdout.Direct() @@ -50,7 +54,7 @@ func Setup(config lite.ActionSetup) error { if config.CopyBinaries { // Use `cp` on the whole directory, as it has plenty of files, which lead to the same FS block. // Copying individual files will lead to high FS usage - err := exec.Command("cp", "-rf", constants2.DefaultInitImageBusyboxBinaryPath, data.InternalBinPath).Run() + err := exec.Command("cp", "-rf", defaultInitImageBusyboxBinaryPath, data.InternalBinPath).Run() if err != nil { stdoutUnsafe.Error(" error\n") stdoutUnsafe.Errorf(" failed to copy the binaries: %s\n", err.Error()) diff --git a/cmd/testworkflow-toolkit/env/client.go b/cmd/testworkflow-toolkit/env/client.go index 0f1b244af9..1031cb444c 100644 --- a/cmd/testworkflow-toolkit/env/client.go +++ b/cmd/testworkflow-toolkit/env/client.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "math" + "net/url" + "strconv" corev1 "k8s.io/api/core/v1" @@ -68,12 +70,20 @@ func ImageInspector() imageinspector.Inspector { } func Testkube() client.Client { + uri, err := url.Parse(config2.Config().Worker.Connection.LocalApiUrl) + host := config.APIServerName + port := config.APIServerPort + if err == nil { + host = uri.Hostname() + portStr, _ := strconv.ParseInt(uri.Port(), 10, 32) + port = int(portStr) + } if config2.UseProxy() { - return client.NewProxyAPIClient(Kubernetes(), client.NewAPIConfig(config2.Namespace(), config.APIServerName, config.APIServerPort)) + return client.NewProxyAPIClient(Kubernetes(), 
client.NewAPIConfig(config2.Namespace(), host, port)) } httpClient := phttp.NewClient(true) sseClient := phttp.NewSSEClient(true) - return client.NewDirectAPIClient(httpClient, sseClient, fmt.Sprintf("http://%s:%d", config.APIServerName, config.APIServerPort), "") + return client.NewDirectAPIClient(httpClient, sseClient, fmt.Sprintf("http://%s:%d", host, port), "") } func Cloud(ctx context.Context) (cloudexecutor.Executor, cloud.TestKubeCloudAPIClient) { diff --git a/go.mod b/go.mod index 259032470e..9cee87d6da 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,7 @@ require ( github.com/dustinkirkland/golang-petname v0.0.0-20191129215211-8e5a1ed0cff0 github.com/fasthttp/websocket v1.5.0 github.com/fluxcd/pkg/apis/event v0.2.0 + github.com/fsnotify/fsnotify v1.6.0 github.com/gabriel-vasile/mimetype v1.4.1 github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 github.com/gofiber/adaptor/v2 v2.1.29 @@ -63,6 +64,7 @@ require ( github.com/stretchr/testify v1.9.0 github.com/valyala/fasthttp v1.51.0 github.com/vektah/gqlparser/v2 v2.5.2-0.20230422221642-25e09f9d292d + github.com/wI2L/jsondiff v0.6.0 go.mongodb.org/mongo-driver v1.11.3 go.uber.org/zap v1.26.0 golang.org/x/exp v0.0.0-20230905200255-921286631fa9 @@ -118,7 +120,6 @@ require ( github.com/evanphx/json-patch/v5 v5.7.0 // indirect github.com/fatih/color v1.15.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-errors/errors v1.5.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -184,6 +185,7 @@ require ( github.com/rs/xid v1.4.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 // indirect + github.com/savioxavier/termlink v1.4.1 // indirect github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 // indirect github.com/segmentio/backo-go v1.0.0 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect @@ -192,6 
+194,10 @@ require ( github.com/sirupsen/logrus v1.9.3 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/thlib/go-timezone-local v0.0.0-20210907160436-ef149e42d28e // indirect + github.com/tidwall/gjson v1.17.1 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/tklauser/go-sysconf v0.3.12 // indirect github.com/tklauser/numcpus v0.6.1 // indirect github.com/urfave/cli/v2 v2.24.4 // indirect diff --git a/go.sum b/go.sum index c2c4e83355..cd0cd7ac9e 100644 --- a/go.sum +++ b/go.sum @@ -556,6 +556,8 @@ github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/santhosh-tekuri/jsonschema/v5 v5.0.0 h1:TToq11gyfNlrMFZiYujSekIsPd9AmsA2Bj/iv+s4JHE= github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H/cDPdvJ/SZJQLWWXWGrZ0= +github.com/savioxavier/termlink v1.4.1 h1:pFcd+XH8iQjL+2mB4buCDUo+CMt5kKsr8jGG+VLfYAg= +github.com/savioxavier/termlink v1.4.1/go.mod h1:5T5ePUlWbxCHIwyF8/Ez1qufOoGM89RCg9NvG+3G3gc= github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899 h1:Orn7s+r1raRTBKLSc9DmbktTT04sL+vkzsbRD2Q8rOI= github.com/savsgio/gotils v0.0.0-20211223103454-d0aaa54c5899/go.mod h1:oejLrk1Y/5zOF+c/aHtXqn3TFlzzbAgPWg8zBiAHDas= github.com/segmentio/analytics-go/v3 v3.2.1 h1:G+f90zxtc1p9G+WigVyTR0xNfOghOGs/PYAlljLOyeg= @@ -608,8 +610,17 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/thlib/go-timezone-local v0.0.0-20210907160436-ef149e42d28e h1:BuzhfgfWQbX0dWzYzT1zsORLnHRv3bcRcsaUk0VmXA8= github.com/thlib/go-timezone-local v0.0.0-20210907160436-ef149e42d28e/go.mod h1:/Tnicc6m/lsJE0irFMA0LfIwTBo4QP7A8IfyIv4zZKI= -github.com/tidwall/pretty v1.0.0 
h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.17.1 h1:wlYEnwqAHgzmhNUFfw7Xalt2JzQvsMx2Se4PcoFCT/U= +github.com/tidwall/gjson v1.17.1/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFAEVmqU= github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= @@ -629,6 +640,8 @@ github.com/vbatts/tar-split v0.11.3 h1:hLFqsOLQ1SsppQNTMpkpPXClLDfC2A3Zgy9OUU+RV github.com/vbatts/tar-split v0.11.3/go.mod h1:9QlHN18E+fEH7RdG+QAJJcuya3rqT7eXSTY7wGrAokY= github.com/vektah/gqlparser/v2 v2.5.2-0.20230422221642-25e09f9d292d h1:ibuD+jp4yLoOY4w8+5+2fDq0ufJ/noPn/cPntJMWB1E= github.com/vektah/gqlparser/v2 v2.5.2-0.20230422221642-25e09f9d292d/go.mod h1:mPgqFBu/woKTVYWyNk8cO3kh4S/f4aRFZrvOnp3hmCs= +github.com/wI2L/jsondiff v0.6.0 h1:zrsH3FbfVa3JO9llxrcDy/XLkYPLgoMX6Mz3T2PP2AI= +github.com/wI2L/jsondiff v0.6.0/go.mod h1:D6aQ5gKgPF9g17j+E9N7aasmU1O+XvfmWm1y8UMmNpw= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod 
h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= diff --git a/pkg/cloud/client/environments.go b/pkg/cloud/client/environments.go index e081eb98fb..3191a3bc22 100644 --- a/pkg/cloud/client/environments.go +++ b/pkg/cloud/client/environments.go @@ -18,6 +18,7 @@ func NewEnvironmentsClient(baseUrl, token, orgID string) *EnvironmentsClient { type Environment struct { Name string `json:"name"` Id string `json:"id"` + Slug string `json:"slug,omitempty"` Connected bool `json:"connected"` Owner string `json:"owner"` InstallCommand string `json:"installCommand,omitempty"`