Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed kubectl dependencies from sidecars #39

Merged
merged 11 commits into from
Aug 9, 2023
2 changes: 0 additions & 2 deletions pkg/sidecars/docker/aux.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,8 @@ func mountEmptyDir(container v1.Container, pod v1.Pod, emptyDir string) string {
podVolumeSpec = &vol.VolumeSource
}
if podVolumeSpec != nil && podVolumeSpec.EmptyDir != nil {
// pod-global directory
edPath = filepath.Join(wd+"/"+commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/"+"emptyDirs/"+vol.Name)
log.G(Ctx).Info("-- Creating EmptyDir in " + edPath)
// mounted for every container
cmd := []string{"-p " + edPath}
shell := exec2.ExecTask{
Command: "mkdir",
Expand Down
129 changes: 51 additions & 78 deletions pkg/sidecars/slurm/aux.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func prepare_envs(container v1.Container) []string {
return env
}

func prepare_mounts(container v1.Container, pod *v1.Pod) []string {
func prepare_mounts(container v1.Container, pod *v1.Pod, data []commonIL.RetrievedPodData) []string {
log.G(Ctx).Info("-- Preparing mountpoints for " + container.Name)
mount := make([]string, 1)
mount = append(mount, "--bind")
Expand All @@ -64,54 +64,49 @@ func prepare_mounts(container v1.Container, pod *v1.Pod) []string {
log.G(Ctx).Info("-- Created directory " + commonIL.InterLinkConfigInst.DataRootFolder + strings.Join(pod_name[:len(pod_name)-1], "-"))
}

for _, mount_var := range container.VolumeMounts {
log.G(Ctx).Info("-- Processing mountpoint " + mount_var.Name)

var podVolumeSpec *v1.VolumeSource
path := ""
fmt.Print(path)

for _, vol := range pod.Spec.Volumes {

if vol.Name == mount_var.Name {
podVolumeSpec = &vol.VolumeSource
for _, podData := range data {
for _, cont := range podData.Containers {
for _, cfgMap := range cont.ConfigMaps {
if container.Name == cont.Name {
configMapsPaths, envs := mountConfigMaps(container, pod, cfgMap)
for i, path := range configMapsPaths {
if os.Getenv("SHARED_FS") != "true" {
dirs := strings.Split(path, ":")
splitDirs := strings.Split(dirs[0], "/")
dir := filepath.Join(splitDirs[:len(splitDirs)-1]...)
prefix += "\nmkdir -p " + dir + " && touch " + dirs[0] + " && echo $" + envs[i] + " > " + dirs[0]
} else {
mount_data += path
}
}
}
}

if podVolumeSpec != nil && podVolumeSpec.ConfigMap != nil {
configMapsPaths, envs := mountConfigMaps(container, pod)
for i, path := range configMapsPaths {
if os.Getenv("SHARED_FS") == "true" {
dirs := strings.Split(path, ":")
splitDirs := strings.Split(dirs[0], "/")
dir := filepath.Join(splitDirs[:len(splitDirs)-1]...)
prefix += "\nmkdir -p " + dir + " && touch " + dirs[0] + " && echo $" + envs[i] + " > " + dirs[0]
for _, secret := range cont.Secrets {
if container.Name == cont.Name {
secretsPaths, envs := mountSecrets(container, pod, secret)
for i, path := range secretsPaths {
if os.Getenv("SHARED_FS") != "true" {
dirs := strings.Split(path, ":")
splitDirs := strings.Split(dirs[0], "/")
dir := filepath.Join(splitDirs[:len(splitDirs)-1]...)
prefix += "\nmkdir -p " + dir + " && touch " + dirs[0] + " && echo $" + envs[i] + " > " + dirs[0]
} else {
mount_data += path
}
}
mount_data += path
}
}

} else if podVolumeSpec != nil && podVolumeSpec.Secret != nil {
secretsPaths, envs := mountSecrets(container, pod)
for i, path := range secretsPaths {
if os.Getenv("SHARED_FS") == "true" {
dirs := strings.Split(path, ":")
splitDirs := strings.Split(dirs[0], "/")
dir := filepath.Join(splitDirs[:len(splitDirs)-1]...)
prefix += "\nmkdir -p " + dir + " && touch " + dirs[0] + " && echo $" + envs[i] + " > " + dirs[0]
}
for _, emptyDir := range cont.EmptyDirs {
if container.Name == cont.Name {
path := mountEmptyDir(container, pod, emptyDir)
mount_data += path
}
} else if podVolumeSpec != nil && podVolumeSpec.EmptyDir != nil {
path := mountEmptyDir(container, pod)
mount_data += path

} else {
/* path = filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/", mount_var.Name)
path = (".knoc/" + strings.Join(pod_name, "-") + "/" + mount_var.Name + ":" + mount_var.MountPath + ",")
mount_data += path */
log.G(Ctx).Debug("\n*******************\n*To be implemented*\n*******************")
}
}
}

path_hardcoded := ("/cvmfs/grid.cern.ch/etc/grid-security:/etc/grid-security" + "," +
"/cvmfs:/cvmfs" + "," +
"/exa5/scratch/user/spigad" + "," +
Expand All @@ -126,6 +121,7 @@ func prepare_mounts(container v1.Container, pod *v1.Pod) []string {
func produce_slurm_script(container v1.Container, metadata metav1.ObjectMeta, command []string) string {
log.G(Ctx).Info("-- Creating file for the Slurm script")
path := "/tmp/" + container.Name + ".sh"
os.Remove(path)
f, err := os.Create(path)
postfix := ""

Expand Down Expand Up @@ -256,7 +252,7 @@ func delete_container(container v1.Container) {
exec.Command("rm", "-rf", commonIL.InterLinkConfigInst.DataRootFolder+container.Name)
}

func mountConfigMaps(container v1.Container, pod *v1.Pod) ([]string, []string) { //returns an array containing mount paths for configMaps
func mountConfigMaps(container v1.Container, pod *v1.Pod, cfgMap v1.ConfigMap) ([]string, []string) { //returns an array containing mount paths for configMaps
configMaps := make(map[string]string)
var configMapNamePaths []string
var envs []string
Expand All @@ -283,37 +279,26 @@ func mountConfigMaps(container v1.Container, pod *v1.Pod) ([]string, []string) {
podVolumeSpec = &vol.VolumeSource
}
if podVolumeSpec != nil && podVolumeSpec.ConfigMap != nil {
log.G(Ctx).Info("--- Retrieving ConfigMap " + podVolumeSpec.ConfigMap.Name)
cmvs := podVolumeSpec.ConfigMap
log.G(Ctx).Info("--- Mounting ConfigMap " + podVolumeSpec.ConfigMap.Name)
mode := os.FileMode(*podVolumeSpec.ConfigMap.DefaultMode)
podConfigMapDir := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/", "configMaps/", vol.Name)

configMap, err := Clientset.CoreV1().ConfigMaps(pod.Namespace).Get(cmvs.Name, metav1.GetOptions{})

if err != nil {
log.G(Ctx).Error(err)
}

if configMap.Data != nil {
for key := range configMap.Data {
configMaps[key] = configMap.Data[key]
if cfgMap.Data != nil {
for key := range cfgMap.Data {
configMaps[key] = cfgMap.Data[key]
path := filepath.Join(podConfigMapDir, key)
path += (":" + mountSpec.MountPath + "/" + key + ",")
configMapNamePaths = append(configMapNamePaths, path)

if os.Getenv("SHARED_FS") != "true" {
env := string(container.Name) + "_CFG_" + key
log.G(Ctx).Debug("---- Setting env " + env + " to mount the file later")
os.Setenv(env, configMap.Data[key])
os.Setenv(env, cfgMap.Data[key])
envs = append(envs, env)
}
}
}

if configMaps == nil {
continue
}

if os.Getenv("SHARED_FS") == "true" {
log.G(Ctx).Info("--- Shared FS enabled, files will be directly created before the job submission")
cmd = []string{"-p " + podConfigMapDir}
Expand Down Expand Up @@ -350,7 +335,7 @@ func mountConfigMaps(container v1.Container, pod *v1.Pod) ([]string, []string) {
return configMapNamePaths, envs
}

func mountSecrets(container v1.Container, pod *v1.Pod) ([]string, []string) { //returns an array containing mount paths for secrets
func mountSecrets(container v1.Container, pod *v1.Pod, secret v1.Secret) ([]string, []string) { //returns an array containing mount paths for secrets
secrets := make(map[string][]byte)
var secretNamePaths []string
var envs []string
Expand All @@ -377,17 +362,10 @@ func mountSecrets(container v1.Container, pod *v1.Pod) ([]string, []string) { //
podVolumeSpec = &vol.VolumeSource
}
if podVolumeSpec != nil && podVolumeSpec.Secret != nil {
log.G(Ctx).Info("--- Retrieving Secret " + podVolumeSpec.Secret.SecretName)
svs := podVolumeSpec.Secret
log.G(Ctx).Info("--- Mounting Secret " + podVolumeSpec.Secret.SecretName)
mode := os.FileMode(*podVolumeSpec.Secret.DefaultMode)
podSecretDir := filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/", "secrets/", vol.Name)

secret, err := Clientset.CoreV1().Secrets(pod.Namespace).Get(svs.SecretName, metav1.GetOptions{})

if err != nil {
log.G(Ctx).Error(err)
}

if secret.Data != nil {
for key := range secret.Data {
secrets[key] = secret.Data[key]
Expand All @@ -404,10 +382,6 @@ func mountSecrets(container v1.Container, pod *v1.Pod) ([]string, []string) { //
}
}

if secrets == nil {
continue
}

if os.Getenv("SHARED_FS") == "true" {
log.G(Ctx).Info("--- Shared FS enabled, files will be directly created before the job submission")
cmd = []string{"-p " + podSecretDir}
Expand All @@ -427,7 +401,7 @@ func mountSecrets(container v1.Container, pod *v1.Pod) ([]string, []string) { //
log.G(Ctx).Debug("--- Created folder " + podSecretDir)
}

log.G(Ctx).Debug("Writing Secret files")
log.G(Ctx).Debug("--- Writing Secret files")
for k, v := range secrets {
// TODO: Ensure that these files are deleted in failure cases
fullPath := filepath.Join(podSecretDir, k)
Expand All @@ -436,7 +410,7 @@ func mountSecrets(container v1.Container, pod *v1.Pod) ([]string, []string) { //
log.G(Ctx).Errorf("Could not write Secret file %s", fullPath)
os.Remove(fullPath)
} else {
log.G(Ctx).Debug("Written Secret file " + fullPath)
log.G(Ctx).Debug("--- Written Secret file " + fullPath)
}
}
}
Expand All @@ -447,11 +421,12 @@ func mountSecrets(container v1.Container, pod *v1.Pod) ([]string, []string) { //
return secretNamePaths, envs
}

func mountEmptyDir(container v1.Container, pod *v1.Pod) string {
func mountEmptyDir(container v1.Container, pod *v1.Pod, emptyDir string) string {
var edPath string
wd, _ := os.Getwd()

if commonIL.InterLinkConfigInst.ExportPodData {
cmd := []string{"-rf " + commonIL.InterLinkConfigInst.DataRootFolder + "emptyDirs"}
cmd := []string{"-rf " + wd + "/" + commonIL.InterLinkConfigInst.DataRootFolder + "emptyDirs"}
shell := exec2.ExecTask{
Command: "rm",
Args: cmd,
Expand All @@ -472,10 +447,8 @@ func mountEmptyDir(container v1.Container, pod *v1.Pod) string {
podVolumeSpec = &vol.VolumeSource
}
if podVolumeSpec != nil && podVolumeSpec.EmptyDir != nil {
// pod-global directory
edPath = filepath.Join(commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/"+"emptyDirs/"+vol.Name)
log.G(Ctx).Info("--- Creating EmptyDir in " + edPath)
// mounted for every container
edPath = filepath.Join(wd+"/"+commonIL.InterLinkConfigInst.DataRootFolder, pod.Namespace+"-"+string(pod.UID)+"/"+"emptyDirs/"+vol.Name)
log.G(Ctx).Info("-- Creating EmptyDir in " + edPath)
cmd := []string{"-p " + edPath}
shell := exec2.ExecTask{
Command: "mkdir",
Expand All @@ -487,7 +460,7 @@ func mountEmptyDir(container v1.Container, pod *v1.Pod) string {
if err != nil {
log.G(Ctx).Error(err)
} else {
log.G(Ctx).Debug("---- Created EmptyDir in " + edPath)
log.G(Ctx).Debug("-- Created EmptyDir in " + edPath)
}

edPath += (":" + mountSpec.MountPath + "/" + mountSpec.Name + ",")
Expand Down
30 changes: 13 additions & 17 deletions pkg/sidecars/slurm/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,30 @@ var JID []JidStruct

func SubmitHandler(w http.ResponseWriter, r *http.Request) {
log.G(Ctx).Info("Slurm Sidecar: received Submit call")
//var resp commonIL.StatusResponse

bodyBytes, err := ioutil.ReadAll(r.Body)
if err != nil {
log.G(Ctx).Error(err)
return
}

var req commonIL.Request
var req []commonIL.RetrievedPodData
json.Unmarshal(bodyBytes, &req)
if err != nil {
log.G(Ctx).Error(err)
return
}

for _, pod := range req.Pods {
for _, data := range req {
var metadata metav1.ObjectMeta
var containers []v1.Container

containers = pod.Spec.Containers
metadata = pod.ObjectMeta
containers = data.Pod.Spec.Containers
metadata = data.Pod.ObjectMeta

for _, container := range containers {
log.G(Ctx).Info("- Beginning script generation for container " + container.Name)
commstr1 := []string{"singularity", "exec"}

envs := prepare_envs(container)
image := ""
mounts := prepare_mounts(container, pod)
mounts := prepare_mounts(container, &data.Pod, req)
if strings.HasPrefix(container.Image, "/") {
if image_uri, ok := metadata.Annotations["slurm-job.knoc.io/image-root"]; ok {
image = image_uri + container.Image
Expand All @@ -65,14 +61,14 @@ func SubmitHandler(w http.ResponseWriter, r *http.Request) {
singularity_command = append(singularity_command, container.Args...)

path := produce_slurm_script(container, metadata, singularity_command)
out := slurm_batch_submit(path)
handle_jid(container, out, *pod)
/*out := */ slurm_batch_submit(path)
//handle_jid(container, out, data.Pod)

jid, err := os.ReadFile(commonIL.InterLinkConfigInst.DataRootFolder + container.Name + ".jid")
if err != nil {
log.G(Ctx).Error("Unable to read JID from file")
}
JID = append(JID, JidStruct{JID: string(jid), Pod: *pod})
JID = append(JID, JidStruct{JID: string(jid), Pod: data.Pod})
}
}

Expand All @@ -88,14 +84,14 @@ func StopHandler(w http.ResponseWriter, r *http.Request) {
return
}

var req commonIL.Request
var req []*v1.Pod
err = json.Unmarshal(bodyBytes, &req)
if err != nil {
log.G(Ctx).Error(err)
return
}

for _, pod := range req.Pods {
for _, pod := range req {
containers := pod.Spec.Containers

for _, container := range containers {
Expand All @@ -113,7 +109,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
return
}

var req commonIL.Request
var req []*v1.Pod
var resp commonIL.StatusResponse
json.Unmarshal(bodyBytes, &req)
if err != nil {
Expand All @@ -134,7 +130,7 @@ func StatusHandler(w http.ResponseWriter, r *http.Request) {
log.G(Ctx).Error("Unable to retrieve job status: " + execReturn.Stderr)
}

for _, pod := range req.Pods {
for _, pod := range req {
var flag = false
for _, jid := range JID {

Expand Down