Skip to content

Commit

Permalink
Use structured logging in marble-injector
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Weiße <[email protected]>
  • Loading branch information
daniel-weisse committed Nov 15, 2024
1 parent f16b632 commit 544da06
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 52 deletions.
9 changes: 5 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,11 @@ add_custom_target(sign-coordinator ALL DEPENDS coordinator-enclave.signed coordi
#

add_custom_target(marble-injector ALL
CGO_ENABLED=0
go build ${TRIMPATH}
-o ${CMAKE_BINARY_DIR}
-buildvcs=false
COMMAND
${CMAKE_COMMAND} -P ${CMAKE_SOURCE_DIR}/build_with_version.cmake
"go" "${PROJECT_VERSION}" "${CMAKE_BINARY_DIR}/marble-injector"
"main"
${TRIMPATH}
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/cmd/marble-injector
)

Expand Down
25 changes: 17 additions & 8 deletions cmd/marble-injector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,19 @@ import (
"crypto/tls"
"flag"
"fmt"
"log"
"net/http"

"github.com/edgelesssys/marblerun/injector"
"github.com/edgelesssys/marblerun/internal/logging"
"go.uber.org/zap"
)

// Version of the injector.
var Version = "0.0.0" // Don't touch! Automatically injected at build-time.

// GitCommit is the git commit hash.
var GitCommit = "0000000000000000000000000000000000000000" // Don't touch! Automatically injected at build-time.

func main() {
var certFile string
var keyFile string
Expand All @@ -30,12 +37,12 @@ func main() {

flag.Parse()

log := logging.New()
defer log.Sync() // flushes buffer, if any
log.Info("Starting marble-injector webhook", zap.String("version", Version), zap.String("commit", GitCommit))

mux := http.NewServeMux()
w := &injector.Mutator{
CoordAddr: addr,
DomainName: clusterDomain,
SGXResource: sgxResource,
}
w := injector.New(addr, clusterDomain, sgxResource, log)

mux.HandleFunc("/mutate", w.HandleMutate)

Expand All @@ -46,10 +53,12 @@ func main() {
TLSConfig: &tls.Config{
GetCertificate: loadWebhookCert(certFile, keyFile),
},
ErrorLog: logging.NewWrapper(log),
}

log.Println("Starting Server")
log.Fatal(s.ListenAndServeTLS("", ""))
log.Info("Starting Server")
err := s.ListenAndServeTLS("", "")
log.Fatal("Failed running server", zap.Error(err))
}

// loadWebhookCert loads the certificate and key file for the webhook server.
Expand Down
77 changes: 44 additions & 33 deletions injector/injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
"errors"
"fmt"
"io"
"log"
"net/http"
"strings"

"github.com/edgelesssys/marblerun/util/k8sutil"
"go.uber.org/zap"
v1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -35,51 +35,62 @@ const (

// Mutator struct.
type Mutator struct {
// CoordAddr contains the address of the MarbleRun coordinator
CoordAddr string
DomainName string
SGXResource string
// coordAddr contains the address of the MarbleRun coordinator
coordAddr string
domainName string
sgxResource string
log *zap.Logger
}

// New creates a new Mutator.
func New(coordAddr, domainName, sgxResource string, log *zap.Logger) *Mutator {
return &Mutator{
coordAddr: coordAddr,
domainName: domainName,
sgxResource: sgxResource,
log: log,
}
}

// HandleMutate handles mutate requests and injects sgx tolerations into the request.
func (m *Mutator) HandleMutate(w http.ResponseWriter, r *http.Request) {
log.Println("Handling mutate request, injecting sgx tolerations")
m.log.Info("Handling mutate request, injecting sgx tolerations")
body := checkRequest(w, r)
if body == nil {
// Error was already written to w
return
}

// mutate the request and add sgx tolerations to pod
mutatedBody, err := mutate(body, m.CoordAddr, m.DomainName, m.SGXResource)
mutatedBody, err := m.mutate(body)
if err != nil {
http.Error(w, fmt.Sprintf("unable to mutate request: %v", err), http.StatusInternalServerError)
http.Error(w, fmt.Sprintf("unable to mutate request: %s", err), http.StatusInternalServerError)
return
}

if _, err := w.Write(mutatedBody); err != nil {
http.Error(w, fmt.Sprintf("unable to write response: %v", err), http.StatusInternalServerError)
http.Error(w, fmt.Sprintf("unable to write response: %s", err), http.StatusInternalServerError)
return
}
}

// mutate handles the creation of json patches for pods.
func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, error) {
func (m *Mutator) mutate(body []byte) ([]byte, error) {
admReviewReq := v1.AdmissionReview{}
if err := json.Unmarshal(body, &admReviewReq); err != nil {
log.Println("Unable to mutate request: invalid admission review")
return nil, fmt.Errorf("invalid admission review: %v", err)
m.log.Error("Unable to mutate request: invalid admission review", zap.Error(err))
return nil, fmt.Errorf("invalid admission review: %w", err)
}

if admReviewReq.Request == nil {
log.Println("Unable to mutate request: empty admission review request")
m.log.Error("Unable to mutate request: empty admission review request")
return nil, errors.New("empty admission request")
}

var pod corev1.Pod
if err := json.Unmarshal(admReviewReq.Request.Object.Raw, &pod); err != nil {
log.Println("Unable to mutate request: invalid pod")
return nil, fmt.Errorf("invalid pod: %v", err)
m.log.Error("Unable to mutate request: invalid pod", zap.Error(err))
return nil, fmt.Errorf("invalid pod: %w", err)
}

// admission response
Expand All @@ -101,11 +112,11 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
marbleType, exists := pod.Labels[labelMarbleType]
if !exists {
// admission request was sent for a pod without marblerun/marbletype label, this should not happen
return generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Error: missing [%s] label, request denied", labelMarbleType))
return m.generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Missing [%s] label, request denied", labelMarbleType))
}
if len(marbleType) <= 0 {
// deny request if the label exists, but is empty
return generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Error: empty [%s] label, request denied", labelMarbleType))
return m.generateResponse(pod, admReviewReq, admReviewResponse, false, fmt.Sprintf("Empty [%s] label, request denied", labelMarbleType))
}

injectSgx := false
Expand All @@ -122,15 +133,15 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
newEnvVars := []corev1.EnvVar{
{
Name: envMarbleCoordinatorAddr,
Value: coordAddr,
Value: m.coordAddr,
},
{
Name: envMarbleType,
Value: marbleType,
},
{
Name: envMarbleDNSName,
Value: strings.ToLower(fmt.Sprintf("%s,%s.%s,%s.%s.svc.%s", marbleType, marbleType, namespace, marbleType, namespace, domainName)),
Value: strings.ToLower(fmt.Sprintf("%s,%s.%s,%s.%s.svc.%s", marbleType, marbleType, namespace, marbleType, namespace, m.domainName)),
},
}

Expand Down Expand Up @@ -185,19 +196,19 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
if container.Resources.Limits == nil {
container.Resources.Limits = make(map[corev1.ResourceName]resource.Quantity)
}
switch resourceKey {
switch m.sgxResource {
case k8sutil.IntelEpc.String():
// Intels device plugin offers 3 resources:
// epc : sets EPC for the container
// enclave : provides a handle to /dev/sgx_enclave
// provision : provides a handle to /dev/sgx_provision, this is not needed when the Marble utilises out-of-process quote-generation
setResourceLimit(container.Resources.Limits, k8sutil.IntelEpc, k8sutil.GetEPCResourceLimit(resourceKey))
setResourceLimit(container.Resources.Limits, k8sutil.IntelEpc, k8sutil.GetEPCResourceLimit(m.sgxResource))
setResourceLimit(container.Resources.Limits, k8sutil.IntelEnclave, "1")
setResourceLimit(container.Resources.Limits, k8sutil.IntelProvision, "1")
default:
// Azure and Alibaba Cloud plugins offer only 1 resource
// for custom plugins we can only inject the resource provided by the `resourceKey`
setResourceLimit(container.Resources.Limits, corev1.ResourceName(resourceKey), k8sutil.GetEPCResourceLimit(resourceKey))
setResourceLimit(container.Resources.Limits, corev1.ResourceName(m.sgxResource), k8sutil.GetEPCResourceLimit(m.sgxResource))
}
}

Expand Down Expand Up @@ -227,13 +238,13 @@ func mutate(body []byte, coordAddr, domainName, resourceKey string) ([]byte, err
// inject sgx tolerations if enabled
if injectSgx {
pod.Spec.Tolerations = append(pod.Spec.Tolerations, corev1.Toleration{
Key: resourceKey,
Key: m.sgxResource,
Operator: corev1.TolerationOpExists,
Effect: corev1.TaintEffectNoSchedule,
})
}

return generateResponse(pod, admReviewReq, admReviewResponse, true, fmt.Sprintf("Mutation request for pod of marble type [%s] successful", marbleType))
return m.generateResponse(pod, admReviewReq, admReviewResponse, true, fmt.Sprintf("Mutation request for pod of marble type [%s] successful", marbleType))
}

// checkRequest verifies the request used was POST and not empty.
Expand All @@ -251,7 +262,7 @@ func checkRequest(w http.ResponseWriter, r *http.Request) []byte {
body, err := io.ReadAll(r.Body)
defer r.Body.Close()
if err != nil {
http.Error(w, fmt.Sprintf("unable to read request: %v", err), http.StatusBadRequest)
http.Error(w, fmt.Sprintf("unable to read request: %s", err), http.StatusBadRequest)
return nil
}

Expand Down Expand Up @@ -279,16 +290,16 @@ func setResourceLimit(target map[corev1.ResourceName]resource.Quantity, key core
}

// generateResponse creates the admission response.
func generateResponse(pod corev1.Pod, request, response v1.AdmissionReview, allowed bool, message string) ([]byte, error) {
func (m *Mutator) generateResponse(pod corev1.Pod, request, response v1.AdmissionReview, allowed bool, message string) ([]byte, error) {
marshaledPod, err := json.Marshal(pod)
if err != nil {
log.Println("Error: unable to marshal patched pod")
return nil, fmt.Errorf("unable to marshal patched pod: %v", err)
m.log.Error("Unable to marshal patched pod", zap.Error(err))
return nil, fmt.Errorf("unable to marshal patched pod: %w", err)
}
resp := admission.PatchResponseFromRaw(request.Request.Object.Raw, marshaledPod)
if err := resp.Complete(admission.Request{AdmissionRequest: *request.Request}); err != nil {
log.Println("Error: patching failed")
return nil, fmt.Errorf("patching failed: %v", err)
m.log.Error("Patching failed", zap.Error(err))
return nil, fmt.Errorf("patching failed: %w", err)
}

response.Response = &resp.AdmissionResponse
Expand All @@ -306,11 +317,11 @@ func generateResponse(pod corev1.Pod, request, response v1.AdmissionReview, allo

bytes, err := json.Marshal(response)
if err != nil {
log.Println("Error: unable to marshal admission response")
return nil, fmt.Errorf("unable to marshal admission response: %v", err)
m.log.Error("Unable to marshal admission response", zap.Error(err))
return nil, fmt.Errorf("unable to marshal admission response: %w", err)
}

log.Println(message)
m.log.Info(message)

return bytes, nil
}
22 changes: 15 additions & 7 deletions injector/injector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"
v1 "k8s.io/api/admission/v1"
)

Expand Down Expand Up @@ -74,7 +75,8 @@ func TestMutatesValidRequest(t *testing.T) {
}`

// test if patch contains all desired values
response, err := mutate([]byte(rawJSON), "coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB")
m := New("coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB", zaptest.NewLogger(t))
response, err := m.mutate([]byte(rawJSON))
require.NoError(err, "failed to mutate request")

r := v1.AdmissionReview{}
Expand All @@ -94,7 +96,7 @@ func TestMutatesValidRequest(t *testing.T) {
assert.Contains(string(r.Response.Patch), `"path":"/spec/containers/0/resources","value":{}}`, "injected resources into the wrong pod")

// test if patch works without sgx values
response, err = mutate([]byte(strings.Replace(rawJSON, `"marblerun/resource-injection": "enabled"`, `"marblerun/resource-injection": "disabled"`, -1)), "coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB")
response, err = m.mutate([]byte(strings.Replace(rawJSON, `"marblerun/resource-injection": "enabled"`, `"marblerun/resource-injection": "disabled"`, -1)))
require.NoError(err, "failed to mutate request")
require.NoError(json.Unmarshal(response, &r), "failed to unmarshal response with error %s", err)
assert.NotContains(string(r.Response.Patch), `"op":"add","path":"/spec/containers/1/resources","value":{"limits":{"kubernetes.azure.com/sgx_epc_mem_in_MiB":"10"}}`, "patch contained sgx resources, but resources were not supposed to be set")
Expand Down Expand Up @@ -172,7 +174,8 @@ func TestPreSetValues(t *testing.T) {
}
}`

response, err := mutate([]byte(rawJSON), "coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB")
m := New("coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB", zaptest.NewLogger(t))
response, err := m.mutate([]byte(rawJSON))
require.NoError(err, "failed to mutate request")

r := v1.AdmissionReview{}
Expand Down Expand Up @@ -227,7 +230,8 @@ func TestRejectsUnsetMarbletype(t *testing.T) {
}
}`

response, err := mutate([]byte(rawJSON), "coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB")
m := New("coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB", zaptest.NewLogger(t))
response, err := m.mutate([]byte(rawJSON))
require.NoError(err, "failed to mutate request")

r := v1.AdmissionReview{}
Expand All @@ -240,7 +244,8 @@ func TestErrorsOnInvalid(t *testing.T) {

rawJSON := `This should return Error`

_, err := mutate([]byte(rawJSON), "coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB")
m := New("coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB", zaptest.NewLogger(t))
_, err := m.mutate([]byte(rawJSON))
require.Error(err, "did not fail on invalid request")
}

Expand All @@ -254,7 +259,9 @@ func TestErrorsOnInvalidPod(t *testing.T) {
"object": "invalid"
}
}`
_, err := mutate([]byte(rawJSON), "coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB")

m := New("coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB", zaptest.NewLogger(t))
_, err := m.mutate([]byte(rawJSON))
require.Error(err, "did not fail when sending invalid request")
}

Expand Down Expand Up @@ -321,7 +328,8 @@ func TestDoesNotCreateDoubleVolumeMounts(t *testing.T) {
}
}`

response, err := mutate([]byte(rawJSON), "coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB")
m := New("coordinator-mesh-api.marblerun:2001", "cluster.local", "kubernetes.azure.com/sgx_epc_mem_in_MiB", zaptest.NewLogger(t))
response, err := m.mutate([]byte(rawJSON))
require.NoError(err)

r := v1.AdmissionReview{}
Expand Down

0 comments on commit 544da06

Please sign in to comment.