Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support read ahead configs through mount options. #416

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/csi_driver/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ FROM distroless-$TARGETARCH AS output-image
# Copy the mount/umount binaries
COPY --from=debian /bin/mount /bin/mount
COPY --from=debian /bin/umount /bin/umount
COPY --from=debian /bin/mountpoint /bin/mountpoint
COPY --from=debian /bin/cat /bin/cat

# Copy shared libraries into distroless base.
COPY --from=debian ${LD_LINUX_FILE} ${LIB_DIR}
Expand All @@ -68,6 +70,8 @@ COPY --from=debian /usr/bin/ldd /usr/bin/ldd
SHELL ["/bin/bash", "-c"]
RUN if ldd /bin/mount | grep "not found"; then echo "!!! Missing deps for mount command !!!" && exit 1; fi
RUN if ldd /bin/umount | grep "not found"; then echo "!!! Missing deps for umount command !!!" && exit 1; fi
RUN if ldd /bin/mountpoint | grep "not found"; then echo "!!! Missing deps for mountpoint command !!!" && exit 1; fi
RUN if ldd /bin/cat | grep "not found"; then echo "!!! Missing deps for cat command !!!" && exit 1; fi

# Final build stage, create the real Docker image with ENTRYPOINT
FROM output-image
Expand Down
6 changes: 6 additions & 0 deletions deploy/base/node/node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ spec:
mountPath: /csi
- name: fuse-socket-dir
mountPath: /sockets
- name: host-sysfs
mountPath: /sys
- name: csi-driver-registrar
securityContext:
readOnlyRootFilesystem: true
Expand Down Expand Up @@ -121,6 +123,10 @@ spec:
type: DirectoryOrCreate
- name: fuse-socket-dir
emptyDir: {}
- name: host-sysfs
hostPath:
path: /sys
type: Directory
# https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
# See "special case". This will tolerate everything. Node component should
# be scheduled on all nodes.
Expand Down
110 changes: 106 additions & 4 deletions pkg/csi_mounter/csi_mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
"fmt"
"net"
"os"
"os/exec"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"syscall"
Expand All @@ -38,7 +41,18 @@ import (
"k8s.io/mount-utils"
)

const socketName = "socket"
const (
socketName = "socket"
readAheadKBMountFlagRegexPattern = "^read_ahead_kb=(.+)$"
maxRatioMountFlagRegexPattern = "^max_ratio=(.+)$"
readAheadKBMountFlag = "read_ahead_kb"
maxRatioMountFlag = "max_ratio"
)

var (
readAheadKBMountFlagRegex = regexp.MustCompile(readAheadKBMountFlagRegexPattern)
maxRatioMountFlagRegex = regexp.MustCompile(maxRatioMountFlagRegexPattern)
)

// Mounter provides the Cloud Storage FUSE CSI implementation of mount.Interface
// for the linux platform.
Expand Down Expand Up @@ -68,7 +82,10 @@ func (m *Mounter) Mount(source string, target string, fstype string, options []s
m.mux.Lock()
defer m.mux.Unlock()

csiMountOptions, sidecarMountOptions := prepareMountOptions(options)
csiMountOptions, sidecarMountOptions, sysfsBDI, err := prepareMountOptions(options)
if err != nil {
return err
}

// Prepare sidecar mounter MountConfig
mc := sidecarmounter.MountConfig{
Expand Down Expand Up @@ -96,6 +113,17 @@ func (m *Mounter) Mount(source string, target string, fstype string, options []s
return fmt.Errorf("failed to mount the fuse filesystem: %w", err)
}

if len(sysfsBDI) != 0 {
go func() {
// updateSysfsConfig may hang until the file descriptor (fd) is either consumed or canceled.
// It will succeed once dfuse finishes the mount process, or it will fail if dfuse fails
// or the mount point is cleaned up due to mounting failures.
if err := updateSysfsConfig(target, sysfsBDI); err != nil {
klog.Errorf("%v failed to update read_ahead_kb or max_ratio: %v", logPrefix, err)
}
}()
}

listener, err := m.createSocket(target, logPrefix)
if err != nil {
// If mount failed at this step,
Expand Down Expand Up @@ -125,6 +153,53 @@ func (m *Mounter) Mount(source string, target string, fstype string, options []s
return nil
}

// updateSysfsConfig modifies the kernel page cache settings based on the read_ahead_kb or max_ratio provided in the mountOption,
// and verifies that the values are successfully updated after the operation completes.
func updateSysfsConfig(targetMountPath string, sysfsBDI map[string]int64) error {
// Command will hang until mount completes.
cmd := exec.Command("mountpoint", "-d", targetMountPath)
output, err := cmd.CombinedOutput()
if err != nil {
klog.Errorf("Error executing mountpoint command on target path %s: %v", targetMountPath, err)
var exitError *exec.ExitError
if errors.As(err, &exitError) {
klog.Errorf("Exit code: %d", exitError.ExitCode())
}

return err
}

outputStr := strings.TrimSpace(string(output))
klog.Infof("Output of mountpoint for target mount path %s: %s", targetMountPath, output)

for key, value := range sysfsBDI {
writeErr := func(targetDevice, key string) error {
// Update the target value.
sysfsBDIPath := filepath.Join("/sys/class/bdi/", targetDevice, key)
file, err := os.OpenFile(sysfsBDIPath, os.O_WRONLY|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to open file %q: %w", sysfsBDIPath, err)
}
defer file.Close()

_, err = file.Write([]byte(fmt.Sprintf("%d\n", value)))

if err != nil {
return fmt.Errorf("failed to write to file %q: %w", "echo", err)
}

klog.Infof("Updated %s to %d", sysfsBDIPath, value)

return nil
}(outputStr, key)
if writeErr != nil {
return err
}
}

return nil
}

func (m *Mounter) UnmountWithForce(target string, umountTimeout time.Duration) error {
m.cleanupSocket(target)

Expand Down Expand Up @@ -216,7 +291,7 @@ func startAcceptConn(l net.Listener, logPrefix string, msg []byte, fd int, cance
klog.V(4).Infof("%v exiting the listener goroutine.", logPrefix)
}

func prepareMountOptions(options []string) ([]string, []string) {
func prepareMountOptions(options []string) ([]string, []string, map[string]int64, error) {
allowedOptions := map[string]bool{
"exec": true,
"noexec": true,
Expand Down Expand Up @@ -248,6 +323,7 @@ func prepareMountOptions(options []string) ([]string, []string) {
}
}

sysfsBDI := make(map[string]int64)
for _, o := range optionSet.List() {
if strings.HasPrefix(o, "o=") {
v := o[2:]
Expand All @@ -258,7 +334,33 @@ func prepareMountOptions(options []string) ([]string, []string) {
}
optionSet.Delete(o)
}

if readAheadKB := readAheadKBMountFlagRegex.FindStringSubmatch(o); len(readAheadKB) == 2 {
// There is only one matching pattern in readAheadKBMountFlagRegex
// If found, it will be at index 1
readAheadKBInt, err := strconv.ParseInt(readAheadKB[1], 10, 0)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid read_ahead_kb mount flag %q: %w", o, err)
}
if readAheadKBInt < 0 {
return nil, nil, nil, fmt.Errorf("invalid negative value for read_ahead_kb mount flag: %q", o)
}
sysfsBDI[readAheadKBMountFlag] = readAheadKBInt
optionSet.Delete(o)
}

if maxRatio := maxRatioMountFlagRegex.FindStringSubmatch(o); len(maxRatio) == 2 {
maxRatioInt, err := strconv.ParseInt(maxRatio[1], 10, 0)
if err != nil {
return nil, nil, nil, fmt.Errorf("invalid max_ratio mount flag %q: %w", o, err)
}
if maxRatioInt < 0 || maxRatioInt > 100 {
return nil, nil, nil, fmt.Errorf("invalid value for max_ratio mount flag: %q", o)
}
sysfsBDI[maxRatioMountFlag] = maxRatioInt
optionSet.Delete(o)
}
}

return csiMountOptions, optionSet.List()
return csiMountOptions, optionSet.List(), sysfsBDI, nil
}
76 changes: 68 additions & 8 deletions pkg/csi_mounter/csi_mounter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,44 +42,104 @@ func TestPrepareMountArgs(t *testing.T) {
inputMountOptions []string
expecteCsiMountOptions []string
expecteSidecarMountOptions []string
expectedSysfsBDI map[string]int64
expectErr bool
}{
{
name: "should return valid options correctly with empty input",
inputMountOptions: []string{},
expecteCsiMountOptions: defaultCsiMountOptions,
expecteSidecarMountOptions: []string{},
expectedSysfsBDI: map[string]int64{},
},
{
name: "should return valid options correctly with CSI mount options only",
inputMountOptions: []string{"ro", "o=noexec", "o=noatime", "o=invalid"},
expecteCsiMountOptions: append(defaultCsiMountOptions, "ro", "noexec", "noatime"),
expecteSidecarMountOptions: []string{},
expectedSysfsBDI: map[string]int64{},
},
{
name: "should return valid options correctly with sidecar mount options only",
inputMountOptions: []string{"implicit-dirs", "max-conns-per-host=10"},
expecteCsiMountOptions: defaultCsiMountOptions,
expecteSidecarMountOptions: []string{"implicit-dirs", "max-conns-per-host=10"},
expectedSysfsBDI: map[string]int64{},
},
{
name: "should return valid options correctly with CSI and sidecar mount options",
inputMountOptions: []string{"ro", "implicit-dirs", "max-conns-per-host=10", "o=noexec", "o=noatime", "o=invalid"},
expecteCsiMountOptions: append(defaultCsiMountOptions, "ro", "noexec", "noatime"),
expecteSidecarMountOptions: []string{"implicit-dirs", "max-conns-per-host=10"},
expectedSysfsBDI: map[string]int64{},
},
{
name: "should return valid options correctly with CSI and sidecar mount options with read ahead configs",
inputMountOptions: []string{"ro", "implicit-dirs", "max-conns-per-host=10", "o=noexec", "o=noatime", "o=invalid", "read_ahead_kb=4096", "max_ratio=100"},
expecteCsiMountOptions: append(defaultCsiMountOptions, "ro", "noexec", "noatime"),
expecteSidecarMountOptions: []string{"implicit-dirs", "max-conns-per-host=10"},
expectedSysfsBDI: map[string]int64{"read_ahead_kb": 4096, "max_ratio": 100},
},
{
name: "invalid read ahead - not int",
inputMountOptions: append(defaultCsiMountOptions, "read_ahead_kb=abc"),
expectErr: true,
},
{
name: "invalid read ahead - negative",
inputMountOptions: append(defaultCsiMountOptions, "read_ahead_kb=-1"),
expectErr: true,
},
{
name: "invalid max ratio - not int",
inputMountOptions: append(defaultCsiMountOptions, "max_ratio=abc"),
expectErr: true,
},
{
name: "invalid max ratio - negative",
inputMountOptions: append(defaultCsiMountOptions, "max_ratio=-1"),
expectErr: true,
},
{
name: "invalid max ratio - greater than 100",
inputMountOptions: append(defaultCsiMountOptions, "max_ratio=101"),
expectErr: true,
},
}

for _, tc := range testCases {
t.Logf("test case: %s", tc.name)
t.Run(tc.name, func(t *testing.T) {
t.Parallel()
t.Logf("test case: %s", tc.name)

c, s, sysfsBDI, err := prepareMountOptions(tc.inputMountOptions)

if tc.expectErr && err == nil {
t.Errorf("test %q failed: expected an error, but got nil", tc.name)

return
}
if !tc.expectErr && err != nil {
t.Errorf("test %q failed: unexpected error: %v", tc.name, err)

return
}
if tc.expectErr {
return
}

if !reflect.DeepEqual(countOptionOccurrence(c), countOptionOccurrence(tc.expecteCsiMountOptions)) {
t.Errorf("Got options %v, but expected %v", c, tc.expecteCsiMountOptions)
}

c, s := prepareMountOptions(tc.inputMountOptions)
if !reflect.DeepEqual(countOptionOccurrence(c), countOptionOccurrence(tc.expecteCsiMountOptions)) {
t.Errorf("Got options %v, but expected %v", c, tc.expecteCsiMountOptions)
}
if !reflect.DeepEqual(countOptionOccurrence(s), countOptionOccurrence(tc.expecteSidecarMountOptions)) {
t.Errorf("Got options %v, but expected %v", s, tc.expecteSidecarMountOptions)
}

if !reflect.DeepEqual(countOptionOccurrence(s), countOptionOccurrence(tc.expecteSidecarMountOptions)) {
t.Errorf("Got options %v, but expected %v", s, tc.expecteSidecarMountOptions)
}
if !reflect.DeepEqual(sysfsBDI, tc.expectedSysfsBDI) {
t.Errorf("Got sysfsBDI %v, expected %v", sysfsBDI, tc.expectedSysfsBDI)
}
})
}
}

Expand Down
Loading