Skip to content

Commit

Permalink
fix(CSI-213): initial version of nfsMount.doMount()
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeyberezansky committed Aug 4, 2024
1 parent 954669b commit fbbd1da
Showing 1 changed file with 12 additions and 54 deletions.
66 changes: 12 additions & 54 deletions pkg/wekafs/nfsmount.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,71 +117,29 @@ func (m *nfsMount) ensureMountIpAddress(ctx context.Context, apiClient *apiclien

func (m *nfsMount) doMount(ctx context.Context, apiClient *apiclient.ApiClient, mountOptions MountOptions) error {
logger := log.Ctx(ctx).With().Str("mount_point", m.mountPoint).Str("filesystem", m.fsName).Logger()
mountToken := ""
var mountOptionsSensitive []string
var localContainerName string
if err := os.MkdirAll(m.mountPoint, DefaultVolumePermissions); err != nil {
return err
}
if !m.isInDevMode() {
pattern := "/proc/wekafs/*/queue"
containerPaths, err := filepath.Glob(pattern)
if err != nil {
logger.Error().Err(err).Msg("Failed to fetch WekaFS containers on host, cannot mount filesystem without Weka container")
return err
} else if len(containerPaths) == 0 {
logger.Error().Err(err).Msg("Failed to find active Weka container, cannot mount filesystem")
return errors.New("could not perform a mount since active Weka container was not found on host")
}

if apiClient == nil {
// this flow is relevant only for legacy volumes, will not work with SCMC
logger.Trace().Msg("No API client for mount, not requesting mount token")
} else {
if mountToken, err = apiClient.GetMountTokenForFilesystemName(ctx, m.fsName); err != nil {
return err
}
mountOptionsSensitive = append(mountOptionsSensitive, fmt.Sprintf("token=%s", mountToken))
logger.Trace().Msg("No API client for mount, cannot proceed")
return errors.New("no API client for mount, cannot do NFS mount")
}

// if needed, add containerName to the mount string
if apiClient != nil && len(containerPaths) > 1 {
if apiClient.SupportsMultipleClusters() {
localContainerName = apiClient.Credentials.LocalContainerName
if localContainerName != "" {
logger.Info().Str("local_container_name", localContainerName).Msg("Local container name set by secret")
} else {
container, err := apiClient.GetLocalContainer(ctx, m.allowProtocolContainers)
if err != nil || container == nil {
logger.Warn().Err(err).Msg("Failed to determine local container, assuming default")
} else {
localContainerName = container.ContainerName
}

}
if localContainerName != "" {
for _, p := range containerPaths {
containerName := filepath.Base(filepath.Dir(p))
if localContainerName == containerName {
mountOptions.customOptions["container_name"] = mountOption{
option: "container_name",
value: localContainerName,
}
break
}
}

} else {
err = errors.New("mount failed, local container name not specified and could not be determined automatically, refer to documentation on handling multiple clusters clients with Kubernetes")
logger.Error().Err(err).Msg("Failed to mount")
return err
}
}
if err := m.ensureMountIpAddress(ctx, apiClient); err != nil {
logger.Error().Err(err).Msg("Failed to get mount IP address")
return err
}

logger.Trace().Strs("mount_options", m.mountOptions.Strings()).
Fields(mountOptions).Msg("Performing mount")
return m.kMounter.MountSensitive(m.fsName, m.mountPoint, "wekafs", mountOptions.Strings(), mountOptionsSensitive)
mountTarget := m.mountIpAddress + ":/" + m.fsName
logger.Trace().
Strs("mount_options", m.mountOptions.Strings()).
Str("mount_target", mountTarget).
Fields(mountOptions).
Msg("Performing mount")
return m.kMounter.MountSensitive(mountTarget, m.mountPoint, "nfs", mountOptions.Strings(), mountOptionsSensitive)
} else {
fakePath := filepath.Join(m.debugPath, m.fsName)
if err := os.MkdirAll(fakePath, DefaultVolumePermissions); err != nil {
Expand Down

0 comments on commit fbbd1da

Please sign in to comment.