Skip to content

Commit

Permalink
Merge pull request #205 from HopkinsIDD/breaking-improvements-fixsaving
Browse files Browse the repository at this point in the history
breaking-improvements-fixsaving
  • Loading branch information
alsnhll authored Apr 22, 2024
2 parents 8d49518 + 841221c commit da23f2b
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 47 deletions.
55 changes: 27 additions & 28 deletions flepimop/R_packages/inference/R/inference_slot_runner_funcs.R
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ aggregate_and_calc_loc_likelihoods <- function(
## We use a data frame for debugging, only ll is used
likelihood_data[[location]] <- dplyr::tibble(
ll = this_location_log_likelihood,
filename = hosp_file,
subpop = location,
accept = 0, # acceptance decision (0/1) . Will be updated later when accept/reject decisions made
accept_avg = 0, # running average acceptance decision
Expand Down Expand Up @@ -589,8 +588,8 @@ initialize_mcmc_first_block <- function(
}

# load and add to original seeding
seed_new <- readr::read_csv(global_files[["seed_filename"]])
added_seeding <- readr::read_csv(config$seeding$added_seeding$added_lambda_file)
seed_new <- readr::read_csv(global_files[["seed_filename"]],show_col_types = FALSE)
added_seeding <- readr::read_csv(config$seeding$added_seeding$added_lambda_file,show_col_types = FALSE)

if (!is.null(config$seeding$added_seeding$fix_original_seeding) &&
config$seeding$added_seeding$fix_original_seeding){
Expand Down Expand Up @@ -626,19 +625,6 @@ initialize_mcmc_first_block <- function(
}
initial_init_file <- config$initial_conditions$initial_conditions_file

if (!file.exists(config$initial_conditions$initial_conditions_file)) {
stop("ERROR: Initial conditions file specified but does not exist.")
}
if (grepl(".csv", initial_init_file)){
initial_init <- readr::read_csv(initial_init_file)
arrow::write_parquet(initial_init, global_files[["init_filename"]])
}else{
err <- !(file.copy(initial_init_file, global_files[["init_filename"]]))
if (err != 0) {
stop("Could not copy initial conditions file")
}
}

} else if (config$initial_conditions$method %in% c("InitialConditionsFolderDraw", "SetInitialConditionsFolderDraw")) {
print("Initial conditions in inference has not been fully implemented yet for the 'folder draw' methods,
and no copying to global or chimeric files is being done.")
Expand All @@ -647,21 +633,34 @@ initialize_mcmc_first_block <- function(
stop("ERROR: Initial conditions file needs to be specified in the config under `initial_conditions:initial_conditions_file`")
}
initial_init_file <- global_files[[paste0(config$initial_conditions$initial_file_type, "_filename")]]
}

if (!file.exists(initial_init_file)) {
stop("ERROR: Initial conditions file specified but does not exist.")
}
if (grepl(".csv", initial_init_file)){
initial_init <- readr::read_csv(initial_init_file)
arrow::write_parquet(initial_init, global_files[["init_filename"]])
}else{
err <- !(file.copy(initial_init_file, global_files[["init_filename"]]))
if (err != 0) {
stop("Could not copy initial conditions file")
}
}

if (!file.exists(initial_init_file)) {
stop("ERROR: Initial conditions file specified but does not exist.")
}

if (grepl(".csv", initial_init_file)){
initial_init <- readr::read_csv(initial_init_file,show_col_types = FALSE)
}else{
initial_init <- arrow::read_parquet(initial_init_file)
}

# if the initial conditions file contains a 'date' column, filter for config$start_date

if("date" %in% colnames(initial_init)){

initial_init <- initial_init %>%
dplyr::mutate(date = as.POSIXct(date, tz="UTC")) %>%
dplyr::filter(date == as.POSIXct(paste0(config$start_date, " 00:00:00"), tz="UTC"))

if (nrow(initial_init) == 0) {
stop("ERROR: Initial conditions file specified but does not contain the start date.")
}

}

arrow::write_parquet(initial_init, global_files[["init_filename"]])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ test_that("aggregate_and_calc_loc_likelihoods returns a likelihood per location


expect_that(nrow(tmp), equals(length(stuff$all_locations)))
expect_that(sort(colnames(tmp)), equals(sort(c("ll","accept","accept_prob","accept_avg","filename",stuff$obs_subpop))))
expect_that(sort(colnames(tmp)), equals(sort(c("ll","accept","accept_prob","accept_avg",stuff$obs_subpop))))

})

Expand Down
14 changes: 12 additions & 2 deletions flepimop/main_scripts/inference_main.R
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ option_list = list(
optparse::make_option(c("-r", "--rpath"), action="store", default=Sys.getenv("RSCRIPT_PATH","Rscript"), type = 'character', help = "path to R executable"),
optparse::make_option(c("-R", "--is-resume"), action="store", default=Sys.getenv("RESUME_RUN",FALSE), type = 'logical', help = "Is this run a resume"),
optparse::make_option(c("-I", "--is-interactive"), action="store", default=Sys.getenv("RUN_INTERACTIVE",Sys.getenv("INTERACTIVE_RUN", FALSE)), type = 'logical', help = "Is this run an interactive run"),
optparse::make_option(c("-L", "--reset_chimeric_on_accept"), action = "store", default = Sys.getenv("FLEPI_RESET_CHIMERICS", TRUE), type = 'logical', help = 'Should the chimeric parameters get reset to global parameters when a global acceptance occurs')
optparse::make_option(c("-L", "--reset_chimeric_on_accept"), action = "store", default = Sys.getenv("FLEPI_RESET_CHIMERICS", TRUE), type = 'logical', help = 'Should the chimeric parameters get reset to global parameters when a global acceptance occurs'),
optparse::make_option(c("-S","--save_seir"), action = "store", default = Sys.getenv("SAVE_SEIR", FALSE), type = 'logical', help = 'Should the SEIR output files be saved for each iteration'),
optparse::make_option(c("-H","--save_hosp"), action = "store", default = Sys.getenv("SAVE_HOSP", TRUE), type = 'logical', help = 'Should the HOSP output files be saved for each iteration'),
optparse::make_option(c("-M", "--memory_profiling"), action = "store", default = Sys.getenv("FLEPI_MEM_PROFILE", FALSE), type = 'logical', help = 'Should the memory profiling be run during iterations'),
optparse::make_option(c("-P", "--memory_profiling_iters"), action = "store", default = Sys.getenv("FLEPI_MEM_PROF_ITERS", 100), type = 'integer', help = 'If doing memory profiling, after every X iterations run the profiler'),
optparse::make_option(c("-g", "--subpop_len"), action="store", default=Sys.getenv("SUBPOP_LENGTH", 5), type='integer', help = "number of digits in subpop")
)

parser=optparse::OptionParser(option_list=option_list)
Expand Down Expand Up @@ -114,7 +119,7 @@ foreach(seir_modifiers_scenario = seir_modifiers_scenarios) %:%
if (nchar(opt$ground_truth_start) > 0) {
ground_truth_start_text <- c("--ground_truth_start", opt$ground_truth_start)
}
if (nchar(opt$ground_truth_start) > 0) {
if (nchar(opt$ground_truth_end) > 0) {
ground_truth_end_text <- c("--ground_truth_end", opt$ground_truth_end)
}

Expand All @@ -139,6 +144,11 @@ foreach(seir_modifiers_scenario = seir_modifiers_scenarios) %:%
"-R", opt[["is-resume"]],
"-I", opt[["is-interactive"]],
"-L", opt$reset_chimeric_on_accept,
"-S", opt$save_seir,
"-H", opt$save_hosp,
"-M", opt$memory_profiling,
"-P", opt$memory_profiling_iters,
"-g", opt$subpop_len,
#paste("2>&1 | tee log_inference_slot_",config$name,"_",opt$run_id, "_", flepi_slot, ".txt", sep=""), # works on Mac only, not windows
#paste("2>&1 | tee model_output/",config$name,"/",opt$run_id,"/log/log_inference_slot", flepi_slot, ".txt", sep=""), # doesn't work because config$name needs to be combined with scenarios to generate the folder name, and, because this command seems to only be able to pipe output to pre-existing folders
sep = " ")
Expand Down
87 changes: 71 additions & 16 deletions flepimop/main_scripts/inference_slot.R
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ option_list = list(
optparse::make_option(c("-R", "--is-resume"), action="store", default=Sys.getenv("RESUME_RUN",FALSE), type = 'logical', help = "Is this run a resume"),
optparse::make_option(c("-I", "--is-interactive"), action="store", default=Sys.getenv("RUN_INTERACTIVE",Sys.getenv("INTERACTIVE_RUN", FALSE)), type = 'logical', help = "Is this run an interactive run"),
optparse::make_option(c("-L", "--reset_chimeric_on_accept"), action = "store", default = Sys.getenv("FLEPI_RESET_CHIMERICS", TRUE), type = 'logical', help = 'Should the chimeric parameters get reset to global parameters when a global acceptance occurs'),
optparse::make_option(c("-S","--save_seir"), action = "store", default = Sys.getenv("SAVE_SEIR", FALSE), type = 'logical', help = 'Should the SEIR output files be saved for each iteration'),
optparse::make_option(c("-H","--save_hosp"), action = "store", default = Sys.getenv("SAVE_HOSP", TRUE), type = 'logical', help = 'Should the HOSP output files be saved for each iteration'),
optparse::make_option(c("-M", "--memory_profiling"), action = "store", default = Sys.getenv("FLEPI_MEM_PROFILE", FALSE), type = 'logical', help = 'Should the memory profiling be run during iterations'),
optparse::make_option(c("-P", "--memory_profiling_iters"), action = "store", default = Sys.getenv("FLEPI_MEM_PROF_ITERS", 100), type = 'integer', help = 'If doing memory profiling, after every X iterations run the profiler'),
optparse::make_option(c("-g", "--subpop_len"), action="store", default=Sys.getenv("SUBPOP_LENGTH", 5), type='integer', help = "number of digits in subpop")
Expand Down Expand Up @@ -363,6 +365,19 @@ if (!opt$reset_chimeric_on_accept) {
warning("We recommend setting reset_chimeric_on_accept TRUE, since reseting chimeric chains on global acceptances more closely matches normal MCMC behaviour")
}

if(!opt$save_seir){
warning("To save space, intermediate SEIR files will not be saved for every iteration of the MCMC inference procedure. To save these files, set option save_seir TRUE.")
}

if(!opt$save_hosp){
warning("To save space, intermediate HOSP files will not be saved for every iteration of the MCMC inference procedure. To save these files, set option save_hosp TRUE.")
}

if(opt$memory_profiling){
print(paste("Inference will run memory profiling every",opt$memory_profiling_iters,"iterations"))
}


for(seir_modifiers_scenario in seir_modifiers_scenarios) {

if (!is.null(config$seir_modifiers)){
Expand Down Expand Up @@ -457,6 +472,13 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {

# So far no acceptances have occurred
last_accepted_index <- 0

# get filenames of last accepted files (copy these files when rejections occur)
last_accepted_global_files <- inference::create_filename_list(run_id=opt$run_id,
prefix=setup_prefix,
filepath_suffix=global_intermediate_filepath_suffix,
filename_prefix=slotblock_filename_prefix,
index=last_accepted_index)

# Load files with the output of initialize_mcmc_first_block

Expand Down Expand Up @@ -660,9 +682,24 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
if ((opt$this_block == 1) && (last_accepted_index == 0)) {
print("by default because it's the first iteration of a block 1")
}

# delete previously accepted files if using a space saving option
if(!opt$save_seir){
file.remove(last_accepted_global_files[['seir_filename']]) # remove proposed SEIR file
}
if(!opt$save_hosp){
file.remove(last_accepted_global_files[['hosp_filename']]) # remove proposed HOSP file
}

# Update the index of the most recent globally accepted parameters
last_accepted_index <- this_index

# update filenames of last accepted files
last_accepted_global_files <- inference::create_filename_list(run_id=opt$run_id,
prefix=setup_prefix,
filepath_suffix=global_intermediate_filepath_suffix,
filename_prefix=slotblock_filename_prefix,
index=last_accepted_index)

if (opt$reset_chimeric_on_accept) {
reset_chimeric_files <- TRUE # triggers globally accepted parameters to push back to chimeric
Expand All @@ -686,18 +723,23 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {

# File saving: If global reject occurs, remove "proposed" parameters from global files and instead replacing with the last accepted values

# get filenames of last accepted files
old_global_files <- inference::create_filename_list(run_id=opt$run_id,
prefix=setup_prefix,
filepath_suffix=global_intermediate_filepath_suffix,
filename_prefix=slotblock_filename_prefix,
index=last_accepted_index)

# Update current global likelihood to last accepted one, and record some acceptance statistics

# Replace current global files with last accepted values

# If save_seir = FALSE, don't copy intermediate SEIR files because they aren't being saved
# If save_hosp = FALSE, don't copy intermediate HOSP files because they aren't being saved
for (type in names(this_global_files)) {
file.copy(old_global_files[[type]],this_global_files[[type]], overwrite = TRUE)
if((!opt$save_seir & type!='seir_filename') & (!opt$save_hosp & type!='hosp_filename')){
# copy if (save_seir = FALSE OR type is not SEIR) AND (save_hosp = FALSE OR type is not HOSP)
file.copy(last_accepted_global_files[[type]],this_global_files[[type]], overwrite = TRUE)
}
}
if(!opt$save_seir){
file.remove(this_global_files[['seir_filename']]) # remove proposed SEIR file
}
if(!opt$save_hosp){
file.remove(this_global_files[['hosp_filename']]) # remove proposed HOSP file
}

#acceptance probability for this iteration
Expand Down Expand Up @@ -808,7 +850,7 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
arrow::write_parquet(new_hnpi,this_chimeric_files[['hnpi_filename']])
arrow::write_parquet(chimeric_current_likelihood_data, this_chimeric_files[['llik_filename']])

print(paste("Current accepted index is ",last_accepted_index))
print(paste("Last accepted index is ",last_accepted_index))


# set initial values to start next iteration
Expand Down Expand Up @@ -843,6 +885,7 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
if (opt$memory_profiling){

if (this_index %% opt$memory_profiling_iters == 0 | this_index == 1){
print('doing memory profiling')
tot_objs_ <- as.numeric(object.size(x=lapply(ls(all.names = TRUE), get)) * 9.31e-10)
tot_mem_ <- sum(gc()[,2]) / 1000
curr_obj_sizes <- data.frame('object' = ls()) %>%
Expand Down Expand Up @@ -874,13 +917,17 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
gc()
}

# Ending this MCMC iteration

}

# Ending this MCMC iteration
# Ending this MCMC chain (aka "slot")

# Create "final" files after MCMC chain is completed
# Will fail if unsuccessful

# moves the most recently globally accepted parameter values from global/intermediate file to global/final
# all file types
print("Copying latest global files to final")
cpy_res_global <- inference::perform_MCMC_step_copies_global(current_index = last_accepted_index,
slot = opt$this_slot,
Expand All @@ -890,7 +937,9 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
slotblock_filename_prefix = slotblock_filename_prefix,
slot_filename_prefix = slot_filename_prefix)
if (!prod(unlist(cpy_res_global))) {stop("File copy failed:", paste(unlist(cpy_res_global),paste(names(cpy_res_global),"|")))}

# moves the most recent chimeric parameter values from chimeric/intermediate file to chimeric/final
# all file types except seir and hosp
print("Copying latest chimeric files to final")
cpy_res_chimeric <- inference::perform_MCMC_step_copies_chimeric(current_index = this_index,
slot = opt$this_slot,
Expand All @@ -901,20 +950,26 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
slot_filename_prefix = slot_filename_prefix)
if (!prod(unlist(cpy_res_chimeric))) {stop("File copy failed:", paste(unlist(cpy_res_chimeric),paste(names(cpy_res_chimeric),"|")))}

#####Write currently accepted files to disk
#NOTE: Don't understand why we write these files that don't have an iteration index
warning("Chimeric hosp and seir files not yet supported, just using the most recently globally accepted file of each type")

#NOTE: Don't understand why we write these files that don't have an iteration index. Not sure what used for
#files of the form ../chimeric/intermediate/{slot}.{block}.{run_id}.{variable}.parquet
output_chimeric_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=chimeric_intermediate_filepath_suffix, filename_prefix=slot_filename_prefix, index=opt$this_block)
#files of the form .../global/intermediate/{slot}.{block}.{run_id}.{variable}.parquet
output_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix,filepath_suffix=global_intermediate_filepath_suffix,filename_prefix=slot_filename_prefix, index=opt$this_block)

warning("Chimeric hosp and seir files not yet supported, just using the most recently generated file of each type")
#files of the form .../global/intermediate/{slot}.{block}.{iteration}.{run_id}.{variable}.parquet
this_index_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index)
this_index_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=last_accepted_index)

# copy files from most recent global to end of block chimeric??
file.copy(this_index_global_files[['hosp_filename']],output_chimeric_files[['hosp_filename']])
file.copy(this_index_global_files[['seir_filename']],output_chimeric_files[['seir_filename']])

# if using space-saving options, delete the last accepted global intermediate giles
if(!opt$save_seir){
file.remove(last_accepted_global_files[['seir_filename']]) # remove proposed SEIR file
}
if(!opt$save_hosp){
file.remove(last_accepted_global_files[['hosp_filename']]) # remove proposed HOSP file
}

endTimeCount=difftime(Sys.time(), startTimeCount, units = "secs")
print(paste("Time to run all MCMC iterations of slot ",opt$this_slot," is ",formatC(endTimeCount,digits=2,format="f")," seconds"))
Expand Down

0 comments on commit da23f2b

Please sign in to comment.