diff --git a/datasetup/build_US_setup.R b/datasetup/build_US_setup.R
index 825e2ea05..0ba217a5e 100644
--- a/datasetup/build_US_setup.R
+++ b/datasetup/build_US_setup.R
@@ -153,6 +153,9 @@ if (length(config$subpop_setup$geodata) > 0) {
 # manually remove PR
 census_data <- census_data %>% filter(USPS != "PR")
+
+if(!dir.exists(dirname(config$subpop_setup$geodata))){
+  dir.create(dirname(config$subpop_setup$geodata))}
 write.csv(file = file.path(geodata_file), census_data, row.names=FALSE)
 print(paste("Wrote geodata file:", file.path(geodata_file)))
@@ -162,13 +165,13 @@ print(paste("Wrote geodata file:", file.path(geodata_file)))
 # MOBILITY DATA (COMMUTER DATA) ------------------------------------------------------------
-if(state_level & !file.exists(paste0(config$data_path, "/", config$subpop_setup$mobility))){
+if(state_level & !file.exists(paste0(config$subpop_setup$mobility))){
   warning(paste("State-level mobility files must be created manually because `build_US_setup.R` does not generate a state-level mobility file automatically. No valid mobility file named", paste0(config$data_path, "/", config$subpop_setup$mobility), "(specified in the config) currently exists. Please check again."))
-} else if(state_level & file.exists(paste0(config$data_path, "/", config$subpop_setup$mobility))){
+} else if(state_level & file.exists(paste0(config$subpop_setup$mobility))){
-  warning(paste("Using existing state-level mobility file named", paste0(config$data_path, "/", config$subpop_setup$mobility)))
+  warning(paste("Using existing state-level mobility file named", paste0(config$subpop_setup$mobility)))
 } else{
diff --git a/datasetup/build_covid_data.R b/datasetup/build_covid_data.R
index 1d22eac18..047d549c0 100644
--- a/datasetup/build_covid_data.R
+++ b/datasetup/build_covid_data.R
@@ -389,7 +389,8 @@ us_data <- us_data %>%
   # mutate(across(starts_with("incid"), ~ replace_na(.x, 0))) %>%
   mutate(across(starts_with("incid"), ~ as.numeric(.x)))
-
+if(!dir.exists(dirname(config$inference$gt_data_path))){
+  dir.create(dirname(config$inference$gt_data_path))}
 # Save
 write_csv(us_data, config$inference$gt_data_path)
diff --git a/datasetup/build_flu_data.R b/datasetup/build_flu_data.R
index 0ba058da9..56a4270e1 100644
--- a/datasetup/build_flu_data.R
+++ b/datasetup/build_flu_data.R
@@ -80,6 +80,8 @@ us_data <- us_data %>%
   filter(date >= lubridate::as_date(config$start_date) & date <= lubridate::as_date(end_date_)) %>%
   filter(!is.na(source))
+if(!dir.exists(dirname(config$inference$gt_data_path))){
+  dir.create(dirname(config$inference$gt_data_path))}
 write_csv(us_data, config$inference$gt_data_path)
diff --git a/datasetup/build_nonUS_setup.R b/datasetup/build_nonUS_setup.R
index 7af1ccf7b..c926dedf3 100644
--- a/datasetup/build_nonUS_setup.R
+++ b/datasetup/build_nonUS_setup.R
@@ -103,6 +103,8 @@ if(opt$w){
 }
 # Save population geodata
+if(!dir.exists(dirname(config$subpop_setup$geodata))){
+  dir.create(dirname(config$subpop_setup$geodata))}
 names(census_data) <- c("subpop","admin2","admin0","pop")
 write.csv(file = file.path('geodata.csv'), census_data,row.names=FALSE)
diff --git a/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R b/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R
index a8d7ddece..83137fef5 100644
--- a/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R
+++ b/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R
@@ -84,7 +84,6 @@ aggregate_and_calc_loc_likelihoods <- function(
   ## We use a data frame for debugging, only ll is used
   likelihood_data[[location]] <- dplyr::tibble(
     ll = this_location_log_likelihood,
-    filename = hosp_file,
     subpop = location,
     accept = 0, # acceptance decision (0/1) . Will be updated later when accept/reject decisions made
     accept_avg = 0, # running average acceptance decision
@@ -589,8 +588,8 @@ initialize_mcmc_first_block <- function(
   }
   # load and add to original seeding
-  seed_new <- readr::read_csv(global_files[["seed_filename"]])
-  added_seeding <- readr::read_csv(config$seeding$added_seeding$added_lambda_file)
+  seed_new <- readr::read_csv(global_files[["seed_filename"]],show_col_types = FALSE)
+  added_seeding <- readr::read_csv(config$seeding$added_seeding$added_lambda_file,show_col_types = FALSE)
   if (!is.null(config$seeding$added_seeding$fix_original_seeding) && config$seeding$added_seeding$fix_original_seeding){
@@ -626,19 +625,6 @@ initialize_mcmc_first_block <- function(
   }
   initial_init_file <- config$initial_conditions$initial_conditions_file
-  if (!file.exists(config$initial_conditions$initial_conditions_file)) {
-    stop("ERROR: Initial conditions file specified but does not exist.")
-  }
-  if (grepl(".csv", initial_init_file)){
-    initial_init <- readr::read_csv(initial_init_file)
-    arrow::write_parquet(initial_init, global_files[["init_filename"]])
-  }else{
-    err <- !(file.copy(initial_init_file, global_files[["init_filename"]]))
-    if (err != 0) {
-      stop("Could not copy initial conditions file")
-    }
-  }
-
 } else if (config$initial_conditions$method %in% c("InitialConditionsFolderDraw", "SetInitialConditionsFolderDraw")) {
   print("Initial conditions in inference has not been fully implemented yet for the 'folder draw' methods, and no copying to global or chimeric files is being done.")
@@ -647,21 +633,34 @@ initialize_mcmc_first_block <- function(
     stop("ERROR: Initial conditions file needs to be specified in the config under `initial_conditions:initial_conditions_file`")
   }
   initial_init_file <- global_files[[paste0(config$initial_conditions$initial_file_type, "_filename")]]
+  }
-  if (!file.exists(initial_init_file)) {
-    stop("ERROR: Initial conditions file specified but does not exist.")
-  }
-  if (grepl(".csv", initial_init_file)){
-    initial_init <- readr::read_csv(initial_init_file)
-    arrow::write_parquet(initial_init, global_files[["init_filename"]])
-  }else{
-    err <- !(file.copy(initial_init_file, global_files[["init_filename"]]))
-    if (err != 0) {
-      stop("Could not copy initial conditions file")
-    }
-  }
+  if (!file.exists(initial_init_file)) {
+    stop("ERROR: Initial conditions file specified but does not exist.")
+  }
+
+  if (grepl(".csv", initial_init_file)){
+    initial_init <- readr::read_csv(initial_init_file,show_col_types = FALSE)
+  }else{
+    initial_init <- arrow::read_parquet(initial_init_file)
   }
+
+  # if the initial conditions file contains a 'date' column, filter for config$start_date
+
+  if("date" %in% colnames(initial_init)){
+
+    initial_init <- initial_init %>%
+      dplyr::mutate(date = as.POSIXct(date, tz="UTC")) %>%
+      dplyr::filter(date == as.POSIXct(paste0(config$start_date, " 00:00:00"), tz="UTC"))
+
+    if (nrow(initial_init) == 0) {
+      stop("ERROR: Initial conditions file specified but does not contain the start date.")
+    }
+
+  }
+
+  arrow::write_parquet(initial_init, global_files[["init_filename"]])
 }
 }
diff --git a/flepimop/R_packages/inference/tests/testthat/test-aggregate_and_calc_loc_likelihoods.R b/flepimop/R_packages/inference/tests/testthat/test-aggregate_and_calc_loc_likelihoods.R
index a4f2a961e..8e9a4bc95 100644
--- a/flepimop/R_packages/inference/tests/testthat/test-aggregate_and_calc_loc_likelihoods.R
+++ b/flepimop/R_packages/inference/tests/testthat/test-aggregate_and_calc_loc_likelihoods.R
@@ -195,7 +195,7 @@ test_that("aggregate_and_calc_loc_likelihoods returns a likelihood per location
   expect_that(nrow(tmp), equals(length(stuff$all_locations)))
-  expect_that(sort(colnames(tmp)), equals(sort(c("ll","accept","accept_prob","accept_avg","filename",stuff$obs_subpop))))
+  expect_that(sort(colnames(tmp)), equals(sort(c("ll","accept","accept_prob","accept_avg",stuff$obs_subpop))))
 })
diff --git a/flepimop/main_scripts/inference_main.R b/flepimop/main_scripts/inference_main.R
index aa912822f..894ea3719 100644
--- a/flepimop/main_scripts/inference_main.R
+++ b/flepimop/main_scripts/inference_main.R
@@ -34,7 +34,12 @@ option_list = list(
   optparse::make_option(c("-r", "--rpath"), action="store", default=Sys.getenv("RSCRIPT_PATH","Rscript"), type = 'character', help = "path to R executable"),
   optparse::make_option(c("-R", "--is-resume"), action="store", default=Sys.getenv("RESUME_RUN",FALSE), type = 'logical', help = "Is this run a resume"),
   optparse::make_option(c("-I", "--is-interactive"), action="store", default=Sys.getenv("RUN_INTERACTIVE",Sys.getenv("INTERACTIVE_RUN", FALSE)), type = 'logical', help = "Is this run an interactive run"),
-  optparse::make_option(c("-L", "--reset_chimeric_on_accept"), action = "store", default = Sys.getenv("FLEPI_RESET_CHIMERICS", TRUE), type = 'logical', help = 'Should the chimeric parameters get reset to global parameters when a global acceptance occurs')
+  optparse::make_option(c("-L", "--reset_chimeric_on_accept"), action = "store", default = Sys.getenv("FLEPI_RESET_CHIMERICS", TRUE), type = 'logical', help = 'Should the chimeric parameters get reset to global parameters when a global acceptance occurs'),
+  optparse::make_option(c("-S","--save_seir"), action = "store", default = Sys.getenv("SAVE_SEIR", FALSE), type = 'logical', help = 'Should the SEIR output files be saved for each iteration'),
+  optparse::make_option(c("-H","--save_hosp"), action = "store", default = Sys.getenv("SAVE_HOSP", TRUE), type = 'logical', help = 'Should the HOSP output files be saved for each iteration'),
+  optparse::make_option(c("-M", "--memory_profiling"), action = "store", default = Sys.getenv("FLEPI_MEM_PROFILE", FALSE), type = 'logical', help = 'Should the memory profiling be run during iterations'),
+  optparse::make_option(c("-P", "--memory_profiling_iters"), action = "store", default = Sys.getenv("FLEPI_MEM_PROF_ITERS", 100), type = 'integer', help = 'If doing memory profiling, after every X iterations run the profiler'),
+  optparse::make_option(c("-g", "--subpop_len"), action="store", default=Sys.getenv("SUBPOP_LENGTH", 5), type='integer', help = "number of digits in subpop")
 )
 parser=optparse::OptionParser(option_list=option_list)
@@ -114,7 +119,7 @@ foreach(seir_modifiers_scenario = seir_modifiers_scenarios) %:%
   if (nchar(opt$ground_truth_start) > 0) {
     ground_truth_start_text <- c("--ground_truth_start", opt$ground_truth_start)
   }
-  if (nchar(opt$ground_truth_start) > 0) {
+  if (nchar(opt$ground_truth_end) > 0) {
     ground_truth_end_text <- c("--ground_truth_end", opt$ground_truth_end)
   }
@@ -139,6 +144,11 @@ foreach(seir_modifiers_scenario = seir_modifiers_scenarios) %:%
   "-R", opt[["is-resume"]],
   "-I", opt[["is-interactive"]],
   "-L", opt$reset_chimeric_on_accept,
+  "-S", opt$save_seir,
+  "-H", opt$save_hosp,
+  "-M", opt$memory_profiling,
+  "-P", opt$memory_profiling_iters,
+  "-g", opt$subpop_len,
   #paste("2>&1 | tee log_inference_slot_",config$name,"_",opt$run_id, "_", flepi_slot, ".txt", sep=""), # works on Mac only, not windows
   #paste("2>&1 | tee model_output/",config$name,"/",opt$run_id,"/log/log_inference_slot", flepi_slot, ".txt", sep=""), # doesn't work because config$name needs to be combined with scenarios to generate the folder name, and, because this command seems to only be able to pipe output to pre-existing folders
   sep = " ")
diff --git a/flepimop/main_scripts/inference_slot.R b/flepimop/main_scripts/inference_slot.R
index 0eb986ef0..a0bb3610a 100644
--- a/flepimop/main_scripts/inference_slot.R
+++ b/flepimop/main_scripts/inference_slot.R
@@ -48,6 +48,8 @@ option_list = list(
   optparse::make_option(c("-R", "--is-resume"), action="store", default=Sys.getenv("RESUME_RUN",FALSE), type = 'logical', help = "Is this run a resume"),
   optparse::make_option(c("-I", "--is-interactive"), action="store", default=Sys.getenv("RUN_INTERACTIVE",Sys.getenv("INTERACTIVE_RUN", FALSE)), type = 'logical', help = "Is this run an interactive run"),
   optparse::make_option(c("-L", "--reset_chimeric_on_accept"), action = "store", default = Sys.getenv("FLEPI_RESET_CHIMERICS", TRUE), type = 'logical', help = 'Should the chimeric parameters get reset to global parameters when a global acceptance occurs'),
+  optparse::make_option(c("-S","--save_seir"), action = "store", default = Sys.getenv("SAVE_SEIR", FALSE), type = 'logical', help = 'Should the SEIR output files be saved for each iteration'),
+  optparse::make_option(c("-H","--save_hosp"), action = "store", default = Sys.getenv("SAVE_HOSP", TRUE), type = 'logical', help = 'Should the HOSP output files be saved for each iteration'),
   optparse::make_option(c("-M", "--memory_profiling"), action = "store", default = Sys.getenv("FLEPI_MEM_PROFILE", FALSE), type = 'logical', help = 'Should the memory profiling be run during iterations'),
   optparse::make_option(c("-P", "--memory_profiling_iters"), action = "store", default = Sys.getenv("FLEPI_MEM_PROF_ITERS", 100), type = 'integer', help = 'If doing memory profiling, after every X iterations run the profiler'),
   optparse::make_option(c("-g", "--subpop_len"), action="store", default=Sys.getenv("SUBPOP_LENGTH", 5), type='integer', help = "number of digits in subpop")
@@ -358,6 +360,19 @@ if (!opt$reset_chimeric_on_accept) {
   warning("We recommend setting reset_chimeric_on_accept TRUE, since reseting chimeric chains on global acceptances more closely matches normal MCMC behaviour")
 }
+if(!opt$save_seir){
+  warning("To save space, intermediate SEIR files will not be saved for every iteration of the MCMC inference procedure. To save these files, set option save_seir TRUE.")
+}
+
+if(!opt$save_hosp){
+  warning("To save space, intermediate HOSP files will not be saved for every iteration of the MCMC inference procedure. To save these files, set option save_hosp TRUE.")
+}
+
+if(opt$memory_profiling){
+  print(paste("Inference will run memory profiling every",opt$memory_profiling_iters,"iterations"))
+}
+
+
 for(seir_modifiers_scenario in seir_modifiers_scenarios) {
   if (!is.null(config$seir_modifiers)){
@@ -452,6 +467,13 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
   # So far no acceptances have occurred
   last_accepted_index <- 0
+
+  # get filenames of last accepted files (copy these files when rejections occur)
+  last_accepted_global_files <- inference::create_filename_list(run_id=opt$run_id,
+                                                                prefix=setup_prefix,
+                                                                filepath_suffix=global_intermediate_filepath_suffix,
+                                                                filename_prefix=slotblock_filename_prefix,
+                                                                index=last_accepted_index)
   # Load files with the output of initialize_mcmc_first_block
@@ -655,9 +677,24 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
   if ((opt$this_block == 1) && (last_accepted_index == 0)) {
     print("by default because it's the first iteration of a block 1")
   }
+
+  # delete previously accepted files if using a space saving option
+  if(!opt$save_seir){
+    file.remove(last_accepted_global_files[['seir_filename']]) # remove previously accepted SEIR file
+  }
+  if(!opt$save_hosp){
+    file.remove(last_accepted_global_files[['hosp_filename']]) # remove previously accepted HOSP file
+  }
   # Update the index of the most recent globally accepted parameters
   last_accepted_index <- this_index
+
+  # update filenames of last accepted files
+  last_accepted_global_files <- inference::create_filename_list(run_id=opt$run_id,
+                                                                prefix=setup_prefix,
+                                                                filepath_suffix=global_intermediate_filepath_suffix,
+                                                                filename_prefix=slotblock_filename_prefix,
+                                                                index=last_accepted_index)
   if (opt$reset_chimeric_on_accept) {
     reset_chimeric_files <- TRUE # triggers globally accepted parameters to push back to chimeric
@@ -681,18 +718,23 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
   # File saving: If global reject occurs, remove "proposed" parameters from global files and instead replacing with the last accepted values
-  # get filenames of last accepted files
-  old_global_files <- inference::create_filename_list(run_id=opt$run_id,
-                                                      prefix=setup_prefix,
-                                                      filepath_suffix=global_intermediate_filepath_suffix,
-                                                      filename_prefix=slotblock_filename_prefix,
-                                                      index=last_accepted_index)
-
   # Update current global likelihood to last accepted one, and record some acceptance statistics
   # Replace current global files with last accepted values
+
+  # If save_seir = FALSE, don't copy intermediate SEIR files because they aren't being saved
+  # If save_hosp = FALSE, don't copy intermediate HOSP files because they aren't being saved
   for (type in names(this_global_files)) {
-    file.copy(old_global_files[[type]],this_global_files[[type]], overwrite = TRUE)
+    if((opt$save_seir | type!='seir_filename') & (opt$save_hosp | type!='hosp_filename')){
+      # copy unless (type is SEIR and save_seir = FALSE) or (type is HOSP and save_hosp = FALSE)
+      file.copy(last_accepted_global_files[[type]],this_global_files[[type]], overwrite = TRUE)
+    }
+  }
+  if(!opt$save_seir){
+    file.remove(this_global_files[['seir_filename']]) # remove proposed SEIR file
+  }
+  if(!opt$save_hosp){
+    file.remove(this_global_files[['hosp_filename']]) # remove proposed HOSP file
   }
   #acceptance probability for this iteration
@@ -803,7 +845,7 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
   arrow::write_parquet(new_hnpi,this_chimeric_files[['hnpi_filename']])
   arrow::write_parquet(chimeric_current_likelihood_data, this_chimeric_files[['llik_filename']])
-  print(paste("Current accepted index is ",last_accepted_index))
+  print(paste("Last accepted index is ",last_accepted_index))
   # set initial values to start next iteration
@@ -838,6 +880,7 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
   if (opt$memory_profiling){
     if (this_index %% opt$memory_profiling_iters == 0 | this_index == 1){
+      print('doing memory profiling')
       tot_objs_ <- as.numeric(object.size(x=lapply(ls(all.names = TRUE), get)) * 9.31e-10)
       tot_mem_ <- sum(gc()[,2]) / 1000
       curr_obj_sizes <- data.frame('object' = ls()) %>%
@@ -869,13 +912,17 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
     gc()
   }
+  # Ending this MCMC iteration
+
 }
-  # Ending this MCMC iteration
+  # Ending this MCMC chain (aka "slot")
 # Create "final" files after MCMC chain is completed
 # Will fail if unsuccessful
+  # moves the most recently globally accepted parameter values from global/intermediate file to global/final
+  # all file types
 print("Copying latest global files to final")
 cpy_res_global <- inference::perform_MCMC_step_copies_global(current_index = last_accepted_index,
                                                              slot = opt$this_slot,
@@ -885,7 +932,9 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
                                                              slotblock_filename_prefix = slotblock_filename_prefix,
                                                              slot_filename_prefix = slot_filename_prefix)
 if (!prod(unlist(cpy_res_global))) {stop("File copy failed:", paste(unlist(cpy_res_global),paste(names(cpy_res_global),"|")))}
+  # moves the most recent chimeric parameter values from chimeric/intermediate file to chimeric/final
+  # all file types except seir and hosp
 print("Copying latest chimeric files to final")
 cpy_res_chimeric <- inference::perform_MCMC_step_copies_chimeric(current_index = this_index,
                                                                  slot = opt$this_slot,
@@ -896,20 +945,26 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
                                                                  slot_filename_prefix = slot_filename_prefix)
 if (!prod(unlist(cpy_res_chimeric))) {stop("File copy failed:", paste(unlist(cpy_res_chimeric),paste(names(cpy_res_chimeric),"|")))}
-  #####Write currently accepted files to disk
-#NOTE: Don't understand why we write these files that don't have an iteration index
+  warning("Chimeric hosp and seir files not yet supported, just using the most recently globally accepted file of each type")
+
+  #NOTE: Don't understand why we write these files that don't have an iteration index. Not sure what they are used for.
 #files of the form ../chimeric/intermediate/{slot}.{block}.{run_id}.{variable}.parquet
 output_chimeric_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=chimeric_intermediate_filepath_suffix, filename_prefix=slot_filename_prefix, index=opt$this_block)
-  #files of the form .../global/intermediate/{slot}.{block}.{run_id}.{variable}.parquet
-  output_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix,filepath_suffix=global_intermediate_filepath_suffix,filename_prefix=slot_filename_prefix, index=opt$this_block)
-  warning("Chimeric hosp and seir files not yet supported, just using the most recently generated file of each type")
 #files of the form .../global/intermediate/{slot}.{block}.{iteration}.{run_id}.{variable}.parquet
-  this_index_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index)
+  this_index_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=last_accepted_index)
 # copy files from most recent global to end of block chimeric??
 file.copy(this_index_global_files[['hosp_filename']],output_chimeric_files[['hosp_filename']])
 file.copy(this_index_global_files[['seir_filename']],output_chimeric_files[['seir_filename']])
+
+  # if using space-saving options, delete the last accepted global intermediate files
+  if(!opt$save_seir){
+    file.remove(last_accepted_global_files[['seir_filename']]) # remove last accepted SEIR intermediate file
+  }
+  if(!opt$save_hosp){
+    file.remove(last_accepted_global_files[['hosp_filename']]) # remove last accepted HOSP intermediate file
+  }
 endTimeCount=difftime(Sys.time(), startTimeCount, units = "secs")
 print(paste("Time to run all MCMC iterations of slot ",opt$this_slot," is ",formatC(endTimeCount,digits=2,format="f")," seconds"))
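Note on the new space-saving options (illustrative, not part of the patch): the short R sketch below restates the copy/skip rule the patch applies around file.copy()/file.remove() for intermediate outputs, assuming the option defaults set above (SAVE_SEIR = FALSE, SAVE_HOSP = TRUE). The helper name should_copy_type() is hypothetical and exists only to make the gating explicit.

should_copy_type <- function(type, save_seir = FALSE, save_hosp = TRUE) {
  # Keep every file type except SEIR outputs when save_seir is FALSE
  # and HOSP outputs when save_hosp is FALSE.
  (save_seir || type != "seir_filename") && (save_hosp || type != "hosp_filename")
}

# With the defaults, SEIR intermediates are skipped (and later deleted),
# while HOSP and all other intermediates are kept:
should_copy_type("seir_filename")  # FALSE
should_copy_type("hosp_filename")  # TRUE
should_copy_type("llik_filename")  # TRUE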