Skip to content

Commit

Permalink
Native sidecar suppport for k8s >= 1.29
Browse files Browse the repository at this point in the history
  • Loading branch information
hime committed Feb 22, 2024
1 parent efe0b79 commit e214623
Show file tree
Hide file tree
Showing 10 changed files with 559 additions and 50 deletions.
19 changes: 15 additions & 4 deletions cmd/sidecar_mounter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
Expand Down Expand Up @@ -121,12 +122,12 @@ func main() {
signal.Notify(c, syscall.SIGTERM)
klog.Info("waiting for SIGTERM signal...")

// Monitor the exit file.
// If the exit file is detected, send a syscall.SIGTERM signal to the signal channel.
go func() {
// Function that monitors the exit file used in regular sidecar containers.
monitorExitFile := func() {
ticker := time.NewTicker(5 * time.Second)
for {
<-ticker.C
// If exit file is detected, send a syscall.SIGTERM signal to the signal channel.
if _, err := os.Stat(*volumeBasePath + "/exit"); err == nil {
klog.Info("all the other containers terminated in the Pod, exiting the sidecar container.")

Expand All @@ -143,7 +144,17 @@ func main() {
return
}
}
}()
}

envVar := os.Getenv("NATIVE_SIDECAR")
isNativeSidecar, err := strconv.ParseBool(envVar)
if envVar != "" && err != nil {
klog.Warningf(`env variable "%s" could not be converted to boolean`, envVar)
}
// When the pod contains a regular container, we monitor for the exit file.
if !isNativeSidecar {
go monitorExitFile()
}

<-c // blocking the process
klog.Info("received SIGTERM signal, waiting for all the gcsfuse processes exit...")
Expand Down
33 changes: 28 additions & 5 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ package main
import (
"flag"
"net/http"
"time"

"github.com/go-logr/logr"
wh "github.com/googlecloudplatform/gcs-fuse-csi-driver/pkg/webhook"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/log"
Expand Down Expand Up @@ -65,9 +68,28 @@ func main() {
// Load webhook config
c := wh.LoadConfig(*sidecarImage, *imagePullPolicy, *cpuRequest, *cpuLimit, *memoryRequest, *memoryLimit, *ephemeralStorageRequest, *ephemeralStorageLimit)

// Load config for manager, informers, listers
kubeConfig := config.GetConfigOrDie()

// Setup client
client, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
klog.Fatalf("Unable to get clientset: %v", err)
}

// Setup stop channel
stopCh := signals.SetupSignalHandler()

// Setup Informer
informerFactory := informers.NewSharedInformerFactory(client, time.Duration(1))
nodeLister := informerFactory.Core().V1().Nodes().Lister()

informerFactory.Start(stopCh.Done())
informerFactory.WaitForCacheSync(stopCh.Done())

// Setup a Manager
klog.Info("Setting up manager.")
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{
mgr, err := manager.New(kubeConfig, manager.Options{
HealthProbeBindAddress: *healthProbeBindAddress,
ReadinessEndpointName: "/readyz",
WebhookServer: webhook.NewServer(webhook.Options{
Expand All @@ -94,14 +116,15 @@ func main() {
klog.Info("Registering webhooks to the webhook server.")
hookServer.Register("/inject", &webhook.Admission{
Handler: &wh.SidecarInjector{
Client: mgr.GetClient(),
Config: c,
Decoder: admission.NewDecoder(runtime.NewScheme()),
Client: mgr.GetClient(),
Config: c,
Decoder: admission.NewDecoder(runtime.NewScheme()),
NodeLister: nodeLister,
},
})

klog.Info("Starting manager.")
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
if err := mgr.Start(stopCh); err != nil {
klog.Fatalf("Unable to run manager: %v", err)
}
}
2 changes: 2 additions & 0 deletions deploy/base/webhook/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ spec:
runAsGroup: 2079
seccompProfile:
type: RuntimeDefault
priorityClassName: csi-gcp-gcs-webhook
serviceAccount: gcsfusecsi-webhook-sa
containers:
- name: gcs-fuse-csi-driver-webhook
securityContext:
Expand Down
3 changes: 2 additions & 1 deletion deploy/base/webhook/kustomization.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ kind: Kustomization
namespace: gcs-fuse-csi-driver
resources:
- deployment.yaml
- mutatingwebhook.yaml
- mutatingwebhook.yaml
- webhook_setup.yaml
50 changes: 50 additions & 0 deletions deploy/base/webhook/webhook_setup.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Copyright 2018 The Kubernetes Authors.
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
##### Webhook Service Account, Roles, RoleBindings
apiVersion: v1
kind: ServiceAccount
metadata:
name: gcsfusecsi-webhook-sa
---
apiVersion: scheduling.k8s.io/v1
kind: PriorityClass
metadata:
name: csi-gcp-gcs-webhook
value: 900001000
globalDefault: false
description: "This priority class should be used for the Cloud Storage FUSE CSI driver webhook deployment only."
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: gcs-fuse-csi-webhook-role
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get","list","watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: gcs-fuse-csi-webhook-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: gcs-fuse-csi-webhook-role
subjects:
- kind: ServiceAccount
name: gcsfusecsi-webhook-sa
20 changes: 15 additions & 5 deletions pkg/csi_driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
mount "k8s.io/mount-utils"
)
Expand Down Expand Up @@ -157,7 +158,7 @@ func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish
// Since the webhook mutating ordering is not definitive,
// the sidecar position is not checked in the ValidatePodHasSidecarContainerInjected func.
shouldInjectedByWebhook := strings.ToLower(pod.Annotations[webhook.AnnotationGcsfuseVolumeEnableKey]) == "true"
sidecarInjected := webhook.ValidatePodHasSidecarContainerInjected(pod, false)
sidecarInjected, isInitContainer := webhook.ValidatePodHasSidecarContainerInjected(pod, false)
if !sidecarInjected {
if shouldInjectedByWebhook {
return nil, status.Error(codes.Internal, "the webhook failed to inject the sidecar container into the Pod spec")
Expand All @@ -175,9 +176,10 @@ func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish
// Check if the sidecar container is still required,
// if not, put an exit file to the emptyDir path to
// notify the sidecar container to exit.
// This check will be unnecessary when the Kubernetes sidecar container feature is adopted.
if err := putExitFile(pod, emptyDirBasePath); err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
if !isInitContainer {
if err := putExitFile(pod, emptyDirBasePath); err != nil {
return nil, status.Errorf(codes.Internal, err.Error())
}
}

// Check if there is any error from the sidecar container
Expand Down Expand Up @@ -211,8 +213,16 @@ func (s *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublish
return nil, status.Errorf(code, "the sidecar container failed with error: %v", errMsgStr)
}

var list []corev1.ContainerStatus
// Use ContainerStatuses or InitContainerStatuses
if isInitContainer {
list = pod.Status.InitContainerStatuses
} else {
list = pod.Status.ContainerStatuses
}

// Check if the sidecar container terminated
for _, cs := range pod.Status.ContainerStatuses {
for _, cs := range list {
if cs.Name != webhook.SidecarContainerName {
continue
}
Expand Down
82 changes: 75 additions & 7 deletions pkg/webhook/mutatingwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ import (
"fmt"
"net/http"
"strings"
"unicode"

v1 "k8s.io/api/admission/v1"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/util/parsers"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -46,8 +49,9 @@ const (
type SidecarInjector struct {
Client client.Client
// default sidecar container config values, can be overwritten by the pod annotations
Config *Config
Decoder *admission.Decoder
Config *Config
Decoder *admission.Decoder
NodeLister listersv1.NodeLister
}

// Handle injects a gcsfuse sidecar container and a emptyDir to incoming qualified pods.
Expand All @@ -60,7 +64,7 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi
return admission.Errored(http.StatusBadRequest, err)
}

if req.Operation != v1.Create {
if req.Operation != admissionv1.Create {
return admission.Allowed(fmt.Sprintf("No injection required for operation %v.", req.Operation))
}

Expand All @@ -78,7 +82,8 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi
return admission.Errored(http.StatusBadRequest, fmt.Errorf("the acceptable values for %q are 'True', 'true', 'false' or 'False'", AnnotationGcsfuseVolumeEnableKey))
}

if ValidatePodHasSidecarContainerInjected(pod, true) {
sidecarInjected, _ := ValidatePodHasSidecarContainerInjected(pod, true)
if sidecarInjected {
return admission.Allowed("The sidecar container was injected, no injection required.")
}

Expand All @@ -97,8 +102,20 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi

klog.Infof("mutating Pod: Name %q, GenerateName %q, Namespace %q, sidecar image %q, CPU request %q, CPU limit %q, memory request %q, memory limit %q, ephemeral storage request %q, ephemeral storage limit %q",
pod.Name, pod.GenerateName, pod.Namespace, config.ContainerImage, &config.CPURequest, &config.CPULimit, &config.MemoryRequest, &config.MemoryLimit, &config.EphemeralStorageRequest, &config.EphemeralStorageLimit)
// the gcsfuse sidecar container has to before the containers that consume the gcsfuse volume
pod.Spec.Containers = append([]corev1.Container{GetSidecarContainerSpec(config)}, pod.Spec.Containers...)

// Check support for native sidecar.
supportsNativeSidecar, err := si.supportsNativeSidecar()
if err != nil {
return admission.Errored(http.StatusInternalServerError, fmt.Errorf("failed to verify native sidecar support: %w", err))
}

// Inject container.
if supportsNativeSidecar {
pod.Spec.InitContainers = append([]corev1.Container{GetNativeSidecarContainerSpec(config)}, pod.Spec.InitContainers...)
} else {
pod.Spec.Containers = append([]corev1.Container{GetSidecarContainerSpec(config)}, pod.Spec.Containers...)
}

pod.Spec.Volumes = append(GetSidecarContainerVolumeSpec(pod.Spec.Volumes), pod.Spec.Volumes...)
marshaledPod, err := json.Marshal(pod)
if err != nil {
Expand Down Expand Up @@ -174,3 +191,54 @@ func parseSidecarContainerImage(pod *corev1.Pod) (string, error) {

return image, nil
}

func (si *SidecarInjector) supportsNativeSidecar() (bool, error) {
list, err := si.NodeLister.List(labels.Everything())
if err != nil {
return false, fmt.Errorf("failed to get cluster nodes: %w", err)
}

supportsNativeSidecar := true
for _, node := range list {
version, err := getRelease(node.Status.NodeInfo.KubeletVersion)
if version < "1.29" || err != nil {
if err != nil {
klog.Errorf(`invalid node gke version: could not get node "%s" k8s release from version "%s": "%v"`, node.Name, version, err)
}
supportsNativeSidecar = false

break
}
}

return supportsNativeSidecar, nil
}

func getRelease(version string) (string, error) {
// GKE version format example.
// - "1.23.4-gke.1234"
if len(version) > 0 && strings.ToLower(version[:1]) == "v" {
version = version[1:]
}
if version == "" || version == "-" {
return "", fmt.Errorf(`the k8s version "%s" provided is not valid`, version)
}

k8sVersion := strings.Split(version, "-")
k8sBreakadown := strings.Split(k8sVersion[0], ".")

for _, numbers := range k8sBreakadown {
for _, digit := range numbers {
if !unicode.IsDigit(digit) {
return "", fmt.Errorf(`character "%c" in "%s" is not a number`, digit, version)
}
}
}

// Check if it contains at least two numbers like ["1", "28", ...] to make "1.28"
if len(k8sBreakadown) < 2 {
return "", fmt.Errorf(`the k8s version "%s" provided in "%s" is not valid`, k8sVersion[0], version)
}

return k8sBreakadown[0] + "." + k8sBreakadown[1], nil
}
Loading

0 comments on commit e214623

Please sign in to comment.