Skip to content

Commit

Permalink
feat: add devbox command (#5971)
Browse files Browse the repository at this point in the history
* feat: add initial `testkube devbox` command
* feat: clean up development tool a bit
* fix: GZip the binaries before sending
* fix: small issues with devbox, add dashboard link, add README
* feat: parallelize devbox better
* chore: add links for CRD Sync workflows and templates
* chore: adjust devbox messages
* chore: reduce size of binaries for devbox
* fix: reduce Init Process size from 35MB to 5MB
* chore: round time in devbox
* fix: generate properly slug for devbox environment
* chore: add option to open dashboard
* fix: delete debug
* chore: avoid logs for canceled operation
* chore: increase timeout for build bucket
* fix: restarting pod
* fix: restarting pod
* chore: clean messages and add transfer size
* feat: add basic binary storage to avoid transferring too much data in devbox
* feat: make BinaryPatch more stable
* fix: make binary patch more stable
* chore: adjust a bit binary patch constants
* chore: reuse buffer better
* fix: corner cases where binary patch was stuck
* fix: avoid unnecessary container for agent in devbox
  • Loading branch information
rangoo94 authored Oct 31, 2024
1 parent 7510bab commit 23a55eb
Show file tree
Hide file tree
Showing 24 changed files with 3,725 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cmd/kubectl-testkube/commands/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/common/validator"
"github.com/kubeshop/testkube/cmd/kubectl-testkube/commands/pro"
"github.com/kubeshop/testkube/cmd/kubectl-testkube/config"
"github.com/kubeshop/testkube/cmd/tcl/kubectl-testkube/devbox"
"github.com/kubeshop/testkube/pkg/telemetry"
"github.com/kubeshop/testkube/pkg/ui"
)
Expand Down Expand Up @@ -65,6 +66,8 @@ func init() {
RootCmd.AddCommand(NewDockerCmd())
RootCmd.AddCommand(pro.NewLoginCmd())

RootCmd.AddCommand(devbox.NewDevBoxCommand())

RootCmd.SetHelpCommand(NewHelpCmd())
}

Expand Down
227 changes: 227 additions & 0 deletions cmd/tcl/devbox-binary-storage/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2024 Testkube.
//
// Licensed as a Testkube Pro file under the Testkube Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/kubeshop/testkube/blob/main/licenses/TCL.txt

package main

import (
"bytes"
"compress/gzip"
"crypto/sha256"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"syscall"

"github.com/dustin/go-humanize"

"github.com/kubeshop/testkube/cmd/tcl/kubectl-testkube/devbox/devutils"
)

var (
locks = make(map[string]*sync.RWMutex)
locksMu sync.Mutex
hashCache = make(map[string]string)
)

func getLock(filePath string) *sync.RWMutex {
locksMu.Lock()
defer locksMu.Unlock()
if locks[filePath] == nil {
locks[filePath] = new(sync.RWMutex)
}
return locks[filePath]
}

func rebuildHash(filePath string) {
hashCache[filePath] = ""
f, err := os.Open(filePath)
if err != nil {
return
}
defer f.Close()

h := sha256.New()
if _, err := io.Copy(h, f); err == nil {
hashCache[filePath] = fmt.Sprintf("%x", h.Sum(nil))
}
}

func getHash(filePath string) string {
if hashCache[filePath] == "" {
rebuildHash(filePath)
}
return hashCache[filePath]
}

func main() {
storagePath := "/storage"
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})

http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
filePath := filepath.Clean(strings.TrimPrefix(r.URL.Path, "/"))
if filePath == "" {
w.WriteHeader(http.StatusNotFound)
return
}
localPath := filepath.Join(storagePath, filePath)
if r.Method == http.MethodGet {
getLock(filePath).RLock()
defer getLock(filePath).RUnlock()

file, err := os.Open(localPath)
if err != nil {
w.WriteHeader(http.StatusNotFound)
return
}
stat, err := file.Stat()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", fmt.Sprintf("%d", stat.Size()))
w.WriteHeader(http.StatusOK)
io.Copy(w, file)
return
} else if r.Method == http.MethodPost {
getLock(filePath).Lock()
defer getLock(filePath).Unlock()

body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed reading body", err)
return
}
if r.ContentLength != int64(len(body)) {
w.WriteHeader(http.StatusBadRequest)
return
}
if r.Header.Get("Content-Encoding") == "gzip" {
gz, err := gzip.NewReader(bytes.NewBuffer(body))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed reading body into gzip", err)
return
}
body, err = io.ReadAll(gz)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed reading back data from gzip stream", err)
return
}
}

err = os.WriteFile(localPath, body, 0666)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed to write file", err)
return
}

h := sha256.New()
if _, err := io.Copy(h, bytes.NewBuffer(body)); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed to build hash", err)
}
hashCache[filePath] = fmt.Sprintf("%x", h.Sum(nil))

fmt.Println("saved file", filePath, humanize.Bytes(uint64(len(body))))
w.WriteHeader(http.StatusOK)
return
} else if r.Method == http.MethodPatch {
getLock(filePath).Lock()
defer getLock(filePath).Unlock()

body, err := io.ReadAll(r.Body)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed reading body", err)
return
}
if r.ContentLength != int64(len(body)) {
w.WriteHeader(http.StatusBadRequest)
return
}
if r.Header.Get("Content-Encoding") == "gzip" {
gz, err := gzip.NewReader(bytes.NewBuffer(body))
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed reading body into gzip", err)
return
}
body, err = io.ReadAll(gz)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed reading back data from gzip stream", err)
return
}
}

// Verify if patch can be applied
if r.Header.Get("X-Prev-Hash") != getHash(filePath) {
w.WriteHeader(http.StatusConflict)
return
}

// Apply patch
prevFile, err := os.ReadFile(localPath)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed reading existing file", err)
return
}
patch := devutils.NewBinaryPatchFromBytes(body)
file := patch.Apply(prevFile)

h := sha256.New()
if _, err := io.Copy(h, bytes.NewBuffer(file)); err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed to build hash", err)
return
}

// Validate hash
nextHash := fmt.Sprintf("%x", h.Sum(nil))
if r.Header.Get("X-Hash") != nextHash {
w.WriteHeader(http.StatusBadRequest)
fmt.Println("after applying patch result has different hash than expected", err)
return
}
fmt.Println("Expected hash", r.Header.Get("X-Hash"), "got", nextHash)
err = os.WriteFile(localPath, file, 0666)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
fmt.Println("failed to write file", err)
return
}
hashCache[filePath] = nextHash
w.WriteHeader(http.StatusOK)
return
}
w.WriteHeader(http.StatusMethodNotAllowed)
})

stopSignal := make(chan os.Signal, 1)
signal.Notify(stopSignal, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-stopSignal
os.Exit(0)
}()

fmt.Println("Starting server...")

panic(http.ListenAndServe(":8080", nil))
}
Loading

0 comments on commit 23a55eb

Please sign in to comment.