Skip to content

Commit

Permalink
Add support for ephemeral and PV backed volumes.
Browse files Browse the repository at this point in the history
  • Loading branch information
hime committed Oct 16, 2024
1 parent 76e8a58 commit 70cae14
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 16 deletions.
4 changes: 4 additions & 0 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ func main() {
// Setup Informer
informerFactory := informers.NewSharedInformerFactory(client, resyncDuration)
nodeLister := informerFactory.Core().V1().Nodes().Lister()
pvcLister := informerFactory.Core().V1().PersistentVolumeClaims().Lister()
pvLister := informerFactory.Core().V1().PersistentVolumes().Lister()

informerFactory.Start(context.Done())
informerFactory.WaitForCacheSync(context.Done())
Expand Down Expand Up @@ -141,6 +143,8 @@ func main() {
Config: c,
Decoder: admission.NewDecoder(runtime.NewScheme()),
NodeLister: nodeLister,
PvLister: pvLister,
PvcLister: pvcLister,
ServerVersion: serverVersion,
},
})
Expand Down
79 changes: 79 additions & 0 deletions pkg/webhook/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2024 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
"errors"

corev1 "k8s.io/api/core/v1"
)

func (si *SidecarInjector) IsPreprovisionCSIVolume(csiDriver string, pvc *corev1.PersistentVolumeClaim) (bool, *corev1.PersistentVolume, error) {
// IsPreprovisionCSIVolume checks whether the volume is a pre-provisioned volume for the desired csiDriver.
if csiDriver == "" {
return false, nil, errors.New("failed to check IsPreprovisionCSIVolume, csiDriver is empty")
}

if pvc == nil {
return false, nil, errors.New("failed to check IsPreprovisionCSIVolume, pvsi is nil")
}

if pvc.Spec.VolumeName == "" {
return false, nil, nil
}

// GetPV returns an error if pvc.Spec.VolumeName is not empty and the associated PV object is not found in the API server.
pv, err := si.GetPV(pvc.Spec.VolumeName)
if err != nil {
return false, nil, err // no additional context needed for error.
}

if pv.Spec.CSI == nil {
return false, nil, nil
}

// PV is using dynamic mounting. See details: https://cloud.google.com/storage/docs/gcsfuse-mount#dynamic-mount
if pv.Spec.CSI.VolumeHandle == "_" {
return false, nil, nil
}

if pv.Spec.CSI.Driver == csiDriver {
return true, pv, nil
}

// Returns false when PV - PVC pair was created for a different csi driver or different storage type.
return false, nil, nil
}

func (si *SidecarInjector) GetPV(name string) (*corev1.PersistentVolume, error) {
pv, err := si.PvLister.Get(name)
if err != nil {
return nil, err
}

return pv, nil
}

func (si *SidecarInjector) GetPVC(namespace, name string) (*corev1.PersistentVolumeClaim, error) {
pvc, err := si.PvcLister.PersistentVolumeClaims(namespace).Get(name)
if err != nil {
return nil, err
}

return pvc, nil
}
6 changes: 3 additions & 3 deletions pkg/webhook/injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,15 @@ func injectSidecarContainer(pod *corev1.Pod, config *Config, supportsNativeSidec
}
}

func injectMetadataPrefetchSidecarContainer(pod *corev1.Pod, config *Config, supportsNativeSidecar bool) {
func (si *SidecarInjector) injectMetadataPrefetchSidecarContainer(pod *corev1.Pod, config *Config, supportsNativeSidecar bool) {
var containerSpec corev1.Container
var index int

if supportsNativeSidecar {
containerSpec = GetNativeMetadataPrefetchSidecarContainerSpec(pod, config)
containerSpec = si.GetNativeMetadataPrefetchSidecarContainerSpec(pod, config)
index = getInjectIndexAfterContainer(pod.Spec.InitContainers, SidecarContainerName)
} else {
containerSpec = GetMetadataPrefetchSidecarContainerSpec(pod, config)
containerSpec = si.GetMetadataPrefetchSidecarContainerSpec(pod, config)
index = getInjectIndexAfterContainer(pod.Spec.Containers, SidecarContainerName)
}

Expand Down
4 changes: 3 additions & 1 deletion pkg/webhook/mutatingwebhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type SidecarInjector struct {
Config *Config
Decoder admission.Decoder
NodeLister listersv1.NodeLister
PvcLister listersv1.PersistentVolumeClaimLister
PvLister listersv1.PersistentVolumeLister
ServerVersion *version.Version
}

Expand Down Expand Up @@ -112,7 +114,7 @@ func (si *SidecarInjector) Handle(_ context.Context, req admission.Request) admi
LogPodMutation(pod, config)

// Inject metadata prefetch sidecar.
injectMetadataPrefetchSidecarContainer(pod, config, supportsNativeSidecar)
si.injectMetadataPrefetchSidecarContainer(pod, config, supportsNativeSidecar)

marshaledPod, err := json.Marshal(pod)
if err != nil {
Expand Down
26 changes: 14 additions & 12 deletions pkg/webhook/sidecar_spec.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func GetSidecarContainerSpec(c *Config) corev1.Container {
return container
}

func GetNativeMetadataPrefetchSidecarContainerSpec(pod *corev1.Pod, c *Config) corev1.Container {
container := GetMetadataPrefetchSidecarContainerSpec(pod, c)
func (si *SidecarInjector) GetNativeMetadataPrefetchSidecarContainerSpec(pod *corev1.Pod, c *Config) corev1.Container {
container := si.GetMetadataPrefetchSidecarContainerSpec(pod, c)
container.Env = append(container.Env, corev1.EnvVar{Name: "NATIVE_SIDECAR", Value: "TRUE"})
container.RestartPolicy = ptr.To(corev1.ContainerRestartPolicyAlways)

return container
}

func GetMetadataPrefetchSidecarContainerSpec(pod *corev1.Pod, c *Config) corev1.Container {
func (si *SidecarInjector) GetMetadataPrefetchSidecarContainerSpec(pod *corev1.Pod, c *Config) corev1.Container {
limits, requests := prepareResourceList(c)

// The sidecar container follows Restricted Pod Security Standard,
Expand Down Expand Up @@ -163,17 +163,19 @@ func GetMetadataPrefetchSidecarContainerSpec(pod *corev1.Pod, c *Config) corev1.
}

for _, v := range pod.Spec.Volumes {
if v.CSI == nil {
// We don't log because it can generate lots of trash.
continue
}
if v.CSI.Driver == "gcsfuse.csi.storage.gke.io" {
enableMetaPrefetch, err := ParseBool(v.CSI.VolumeAttributes["gcsfuseMetadataPrefetchOnMount"])
if err != nil {
klog.Errorf("failed to parse bool %v", enableMetaPrefetch)

if b, volumeAttributes, _ := si.isGcsFuseCSIVolume(v, pod.Namespace); b {
enableMetaPrefetchRaw, ok := volumeAttributes["gcsfuseMetadataPrefetchOnMount"]
// We disable metadata prefetch by default, so we
// skip injection of volume mount when not set.
if !ok {
continue
}

enableMetaPrefetch, err := ParseBool(enableMetaPrefetchRaw)
if err != nil {
klog.Errorf(`failed to determine if metadata prefetch is needed for volume "%s": %v`, v.Name, err)
}

if enableMetaPrefetch {
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{Name: v.Name, MountPath: filepath.Join("/volumes/", v.Name), ReadOnly: true})
}
Expand Down
64 changes: 64 additions & 0 deletions pkg/webhook/volumes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2022 Google LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package webhook

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

const (
gcsFuseCsiDriverName = "gcsfuse.csi.storage.gke.io"
)

func (si *SidecarInjector) isGcsFuseCSIVolume(volume corev1.Volume, namespace string) (bool, map[string]string, error) {
// Check if it is ephemeral volume.
if volume.CSI != nil {
if volume.CSI.Driver == gcsFuseCsiDriverName {
return true, volume.CSI.VolumeAttributes, nil
}

return false, nil, nil
}

// Check if it's a persistent volume.
pvc := volume.PersistentVolumeClaim
if pvc == nil {
return false, nil, nil
}
pvcName := pvc.ClaimName
pvcObj, err := si.GetPVC(namespace, pvcName)
if err != nil {
return false, nil, err
}

// Check if the PVC is a preprovisioned parallelstore volume.
isPreprovisionGcsFuseCsi, pv, err := si.IsPreprovisionCSIVolume(gcsFuseCsiDriverName, pvcObj)
if err != nil {
klog.Warningf("unable to determine if PVC %s/%s is a pre-provisioned gcsfuse volume: %v", namespace, pvcName, err)

return false, nil, nil
}
if isPreprovisionGcsFuseCsi {
return true, pv.Spec.CSI.VolumeAttributes, nil
}

klog.Infof("PVC %s/%s is not referring to a pre-provisioned gcsfuse volume", namespace, pvcName)

return false, nil, nil
}

0 comments on commit 70cae14

Please sign in to comment.