Skip to content

Commit

Permalink
Update qsub_parallel to include rabbitmq submission.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dongchen Zhang committed Dec 9, 2023
1 parent ca435b6 commit 1341ada
Showing 1 changed file with 87 additions and 65 deletions.
152 changes: 87 additions & 65 deletions base/remote/R/qsub_parallel.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
folder <- NULL
run_list <- readLines(con = file.path(settings$rundir, "runs.txt"))
is_local <- PEcAn.remote::is.localhost(settings$host)
is_qsub <- !is.null(settings$host$qsub)
is_rabbitmq <- !is.null(settings$host$rabbitmq)
# loop through runs and either call start run, or launch job on remote machine
# parallel submit jobs
cores <- parallel::detectCores()
Expand All @@ -35,25 +37,32 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
PEcAn.logger::logger.info("Submitting jobs!")
# if we want to submit jobs separately.
if(is.null(files)){
jobids <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(files))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
qsub <- settings$host$qsub
qsub <- gsub("@NAME@", paste0("PEcAn-", run_id_string), qsub)
qsub <- gsub("@STDOUT@", file.path(settings$host$outdir, run_id_string, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(settings$host$outdir, run_id_string, "stderr.log"), qsub)
qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
# start the actual model run
cmd <- qsub[[1]]
if(PEcAn.remote::is.localhost(settings$host)){
out <- system2(cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stdout = TRUE, stderr = TRUE)
}else{
out <- PEcAn.remote::remote.execute.cmd(settings$host, cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stderr = TRUE)
if (is_qsub) {
jobids <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(run_list))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
qsub <- settings$host$qsub
qsub <- gsub("@NAME@", paste0("PEcAn-", run_id_string), qsub)
qsub <- gsub("@STDOUT@", file.path(settings$host$outdir, run_id_string, "stdout.log"), qsub)
qsub <- gsub("@STDERR@", file.path(settings$host$outdir, run_id_string, "stderr.log"), qsub)
qsub <- strsplit(qsub, " (?=([^\"']*\"[^\"']*\")*[^\"']*$)", perl = TRUE)
# start the actual model run
cmd <- qsub[[1]]
if(PEcAn.remote::is.localhost(settings$host)){
out <- system2(cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stdout = TRUE, stderr = TRUE)
}else{
out <- PEcAn.remote::remote.execute.cmd(settings$host, cmd, file.path(settings$host$rundir, run_id_string, "job.sh"), stderr = TRUE)
}
jobid <- PEcAn.remote::qsub_get_jobid(
out = out[length(out)],
qsub.jobid = settings$host$qsub.jobid,
stop.on.error = TRUE)
return(jobid)
}
} else if (is_rabbitmq) {
out <- foreach::foreach(run = run_list, .packages="Kendall", .options.snow=opts, settings = rep(settings, length(run_list))) %dopar% {
run_id_string <- format(run, scientific = FALSE)
PEcAn.remote::start_rabbitmq(file.path(settings$host$rundir, run_id_string), settings$host$rabbitmq$uri, settings$host$rabbitmq$queue)
}
jobid <- PEcAn.remote::qsub_get_jobid(
out = out[length(out)],
qsub.jobid = settings$host$qsub.jobid,
stop.on.error = TRUE)
return(jobid)
}
}else{
# if we want to submit merged job files.
Expand Down Expand Up @@ -92,69 +101,82 @@ qsub_parallel <- function(settings, files = NULL, prefix = "sipnet.out", sleep =
## setup progressbar
folders <- file.path(settings$host$outdir, run_list)
L_folder <- length(folders)
L_jobid <- length(jobids)

pb <- utils::txtProgressBar(min = 0, max = L_folder, style = 3)
pb1 <- utils::txtProgressBar(min = 0, max = L_jobid, style = 3)
pbi <- pbi1 <- 0
pbi <- 0
# Here we not only detect whether the target files have been generated;
# we also detect whether the jobs still exist on the server.
if (hybrid) {
while ((L_folder - length(folders)) < L_folder &
(L_jobid - length(jobids)) < L_jobid) {
if (is_rabbitmq) {
while ((L_folder - length(folders)) < L_folder) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
}
} %>% unlist()
folders <- folders[which(!folders %in% completed_folders)]

#or we can try detect if the jobs are still on the server.
#specify the host and qstat arguments for the future_map function.
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

#compare two progresses and set the maximum progress for the progress bar.
pbi <- L_folder - length(folders)
utils::setTxtProgressBar(pb, pbi)

pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
}
} else {
#special case that only detect the job ids on the server.
while ((L_jobid - length(jobids)) < L_jobid) {
#detect if the jobs are still on the server.
#specify the host and qstat arguments for the future_map function.
Sys.sleep(sleep)
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

#compare two progresses and set the maximum progress for the progress bar.
pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
L_jobid <- length(jobids)
pb1 <- utils::txtProgressBar(min = 0, max = L_jobid, style = 3)
pb1 <- 0
if (hybrid) {
while ((L_folder - length(folders)) < L_folder &
(L_jobid - length(jobids)) < L_jobid) {
Sys.sleep(sleep)
completed_folders <- foreach::foreach(folder = folders) %dopar% {
if(file.exists(file.path(folder, prefix))){
return(folder)
}
} %>% unlist()
folders <- folders[which(!folders %in% completed_folders)]

# Alternatively, we can try to detect whether the jobs are still on the server.
# Specify the host and qstat arguments for the future_map function.
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

# Update both progress bars: completed output folders and completed jobs.
pbi <- L_folder - length(folders)
utils::setTxtProgressBar(pb, pbi)

pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
}
} else {
# Special case that only checks the job ids on the server.
while ((L_jobid - length(jobids)) < L_jobid) {
#detect if the jobs are still on the server.
#specify the host and qstat arguments for the future_map function.
Sys.sleep(sleep)
host <- settings$host
qstat <- host$qstat
completed_jobs <- jobids %>% furrr::future_map(function(id) {
if (PEcAn.remote::qsub_run_finished(
run = id,
host = host,
qstat = qstat)) {
return(id)
}
}) %>% unlist()
jobids <- jobids[which(!jobids %in% completed_jobs)]

# Update the job progress bar with the number of completed jobs.
pbi1 <- L_jobid - length(jobids)
utils::setTxtProgressBar(pb1, pbi1)
}
}
}

close(pb)
parallel::stopCluster(cl)
PEcAn.logger::logger.info("Completed!")
Expand Down

0 comments on commit 1341ada

Please sign in to comment.