From fcce2d439fc48c73321f0d863b4db11d1d9fb995 Mon Sep 17 00:00:00 2001
From: Shaun Truelove
Date: Mon, 25 Mar 2024 10:58:14 -0400
Subject: [PATCH] add a new function to check if parquet file exists before
 pulling it. If it does not, return an error printing the name of the file.

---
 flepimop/R_packages/flepicommon/NAMESPACE     |   1 +
 flepimop/R_packages/flepicommon/R/DataUtils.R |  35 +-
 flepimop/main_scripts/inference_slot.R        | 309 +++++++++---------
 3 files changed, 186 insertions(+), 159 deletions(-)

diff --git a/flepimop/R_packages/flepicommon/NAMESPACE b/flepimop/R_packages/flepicommon/NAMESPACE
index 426c104b2..276edf955 100644
--- a/flepimop/R_packages/flepicommon/NAMESPACE
+++ b/flepimop/R_packages/flepicommon/NAMESPACE
@@ -24,6 +24,7 @@ export(load_config)
 export(load_geodata_file)
 export(prettyprint_optlist)
 export(read_file_of_type)
+export(read_parquet_with_check)
 export(run_id)
 import(covidcast)
 import(doParallel)
diff --git a/flepimop/R_packages/flepicommon/R/DataUtils.R b/flepimop/R_packages/flepicommon/R/DataUtils.R
index 56b17017d..26cb4f6b9 100755
--- a/flepimop/R_packages/flepicommon/R/DataUtils.R
+++ b/flepimop/R_packages/flepicommon/R/DataUtils.R
@@ -38,7 +38,7 @@ load_geodata_file <- function(filename,
 
   if(state_name) {
     utils::data(fips_us_county, package = "flepicommon") # arrow::read_parquet("datasetup/usdata/fips_us_county.parquet")
-    geodata <- fips_us_county %>% 
+    geodata <- fips_us_county %>%
       dplyr::distinct(state, state_name) %>%
       dplyr::rename(USPS = state) %>%
       dplyr::rename(state = state_name) %>%
@@ -53,6 +53,31 @@
 
 
 
+#' Read a parquet file, first checking that it exists so errors are understandable
+#'
+#' @param file The file to read
+#'
+#' @return A data frame with the contents of the parquet file
+#' @export
+#'
+#' @examples \dontrun{read_parquet_with_check("model_output/spar.parquet")}
+read_parquet_with_check <- function(file){
+    if(!file.exists(file)){
+        stop(paste("File", file, "does not exist"))
+    }
+    arrow::read_parquet(file)
+}
+
+
+
+
+
+
+
+
+
+
+
 
 #' Deprecated function that returns a function to read files of a specific type (or automatically detected type based on extension)
 #' @param extension The file extension to read files of
@@ -113,11 +138,11 @@ read_file_of_type <- function(extension,...){
 # ##' @importFrom cdlTools fips census2010FIPS stateNames
 # ##'
 # download_USAFacts_data <- function(filename, url, value_col_name, incl_unassigned = FALSE){
-# 
+#
 #   dir.create(dirname(filename), showWarnings = FALSE, recursive = TRUE)
 #   message(paste("Downloading", url, "to", filename))
 #   download.file(url, filename, "auto")
-# 
+#
 #   usafacts_data <- readr::read_csv(filename)
 #   names(usafacts_data) <- stringr::str_to_lower(names(usafacts_data))
 #   usafacts_data <- dplyr::select(usafacts_data, -statefips,-`county name`) %>% # drop statefips columns
@@ -141,13 +166,13 @@
 #   date_func <- ifelse(any(grepl("^\\d\\d\\d\\d",col_names)),lubridate::ymd, lubridate::mdy)
 #   usafacts_data <- tidyr::pivot_longer(usafacts_data, tidyselect::all_of(date_cols), names_to="Update", values_to=value_col_name)
 #   usafacts_data <- dplyr::mutate(usafacts_data, Update=date_func(Update), FIPS=sprintf("%05d", FIPS))
-# 
+#
 #   validation_date <- Sys.getenv("VALIDATION_DATE")
 #   if ( validation_date != '' ) {
 #     print(paste("(DataUtils.R) Limiting USAFacts data to:", validation_date, sep=" "))
 #     usafacts_data <- dplyr::filter(usafacts_data, Update < validation_date )
 #   }
-# 
+#
 #   return(usafacts_data)
 # }
 
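For context, the failure mode the new helper addresses looks roughly like this
(a minimal sketch, not part of the patch; the file path is made up):

    # arrow::read_parquet() on a missing file raises a low-level Arrow IOError
    # that can be hard to trace back to a specific path in a long inference log.
    # read_parquet_with_check() fails fast and names the file instead:
    flepicommon::read_parquet_with_check("model_output/missing_file.parquet")
    #> Error: File model_output/missing_file.parquet does not exist
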
diff --git a/flepimop/main_scripts/inference_slot.R b/flepimop/main_scripts/inference_slot.R
index e86243b12..4d0f06d36 100755
--- a/flepimop/main_scripts/inference_slot.R
+++ b/flepimop/main_scripts/inference_slot.R
@@ -1,6 +1,6 @@
 # About
-## This script runs a single slot (MCMC chain) of an inference run. It can be called directly, but is often called from inference_main.R if multiple slots are run. 
+## This script runs a single slot (MCMC chain) of an inference run. It can be called directly, but is often called from inference_main.R if multiple slots are run.
 
 
 # Run Options ---------------------------------------------------------------------
 
@@ -27,7 +27,7 @@ required_packages <- c("dplyr", "magrittr", "xts", "zoo", "stringr")
 # There are multiple ways to specify options when inference_slot.R is run, which take the following precedence:
 # 1) (optional) options called along with the script at the command line (i.e. > Rscript inference_main.R -c my_config.yml)
 # 2) (optional) environmental variables set by the user (i.e. the user could set > export CONFIG_PATH=~/flepimop_sample/my_config.yml to not have to specify it each time the script is run)
-# If neither are specified, then a default value is used, given by the second argument of Sys.getenv() commands below. 
+# If neither are specified, then a default value is used, given by the second argument of Sys.getenv() commands below.
 # *3) For some options, a default doesn't exist, and the value specified in the config will be used if the option is not specified at the command line or by an environmental variable (iterations_per_slot, slots)
 
 option_list = list(
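The precedence chain described in the comments above is the usual optparse
default falling back to Sys.getenv(); a minimal sketch of the pattern for the
config option (the exact flags and defaults in the script may differ):

    # command-line value wins; else the CONFIG_PATH env var; else "" default
    optparse::make_option(
        c("-c", "--config"),
        type = "character",
        default = Sys.getenv("CONFIG_PATH", ""),
        help = "path to the configuration file"
    )
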
@@ -74,7 +74,7 @@ flepicommon::prettyprint_optlist(opt)
 
 
 # load Python to use via R
-reticulate::use_python(Sys.which(opt$python), required = TRUE) 
+reticulate::use_python(Sys.which(opt$python), required = TRUE)
 
 # Load gempyor module
 gempyor <- reticulate::import("gempyor")
@@ -252,9 +252,9 @@ if ("priors" %in% names(config$inference)) {
 # ~ WITH Inference ----------------------------------------------------
 
 if (config$inference$do_inference){
-  
+
   ## Load ground truth data
-  
+
   obs <- suppressMessages(
     readr::read_csv(config$inference$gt_data_path,
                     col_types = readr::cols(date = readr::col_date(),
@@ -264,18 +264,18 @@ if (config$inference$do_inference){
     dplyr::filter(subpop %in% subpops_, date >= gt_start_date, date <= gt_end_date) %>%
     dplyr::right_join(tidyr::expand_grid(subpop = unique(.$subpop), date = unique(.$date))) %>%
     dplyr::mutate_if(is.numeric, dplyr::coalesce, 0)
-  
+
   subpopnames <- unique(obs[[obs_subpop]])
-  
-  
+
+
   ## Compute statistics
-  
+
   ## for each subpopulation, processes the data as specified in config - finds data types of interest, aggregates by specified period (e.g. week), deals with zeros and NAs, etc.
   data_stats <- lapply(
     subpopnames,
     function(x) {
       df <- obs[obs[[obs_subpop]] == x, ]
-      inference::getStats( 
+      inference::getStats(
         df,
         "date",
         "data_var",
@@ -285,48 +285,48 @@ if (config$inference$do_inference){
       )
     }) %>%
     set_names(subpopnames)
-  
-  
+
+
   # function to calculate the likelihood when comparing simulation output (sim_hosp) to ground truth data
   likelihood_calculation_fun <- function(sim_hosp){
-    
+
     sim_hosp <- dplyr::filter(sim_hosp,sim_hosp$time >= min(obs$date),sim_hosp$time <= max(obs$date))
     lhs <- unique(sim_hosp[[obs_subpop]])
     rhs <- unique(names(data_stats))
    all_locations <- rhs[rhs %in% lhs]
-    
+
    ## No references to config$inference$statistics
    inference::aggregate_and_calc_loc_likelihoods(
      all_locations = all_locations, # technically different
      modeled_outcome = sim_hosp, # simulation output
      obs_subpop = obs_subpop,
      targets_config = config[["inference"]][["statistics"]],
-      obs = obs, 
+      obs = obs,
      ground_truth_data = data_stats,
      hosp_file = first_global_files[['llik_filename']],
      hierarchical_stats = hierarchical_stats,
      defined_priors = defined_priors,
      geodata = geodata,
-      snpi = arrow::read_parquet(first_global_files[['snpi_filename']]),
-      hnpi = arrow::read_parquet(first_global_files[['hnpi_filename']]),
-      hpar = dplyr::mutate(arrow::read_parquet(first_global_files[['hpar_filename']]),parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')),
+      snpi = flepicommon::read_parquet_with_check(first_global_files[['snpi_filename']]),
+      hnpi = flepicommon::read_parquet_with_check(first_global_files[['hnpi_filename']]),
+      hpar = dplyr::mutate(flepicommon::read_parquet_with_check(first_global_files[['hpar_filename']]),parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')),
      start_date = gt_start_date,
      end_date = gt_end_date
    )
  }
  print("Running WITH inference")
-  
-  
+
+
  # ~ WITHOUT Inference ---------------------------------------------------
-  
+
 } else {
-  
+
  subpopnames <- obs_subpop
-  
+
  likelihood_calculation_fun <- function(sim_hosp){
-    
+
    all_locations <- unique(sim_hosp[[obs_subpop]])
-    
+
    ## No references to config$inference$statistics
    inference::aggregate_and_calc_loc_likelihoods(
      all_locations = all_locations, # technically different
@@ -339,9 +339,10 @@ if (config$inference$do_inference){
      hierarchical_stats = hierarchical_stats,
      defined_priors = defined_priors,
      geodata = geodata,
-      snpi = arrow::read_parquet(first_global_files[['snpi_filename']]),
-      hnpi = arrow::read_parquet(first_global_files[['hnpi_filename']]),
-      hpar = dplyr::mutate(arrow::read_parquet(first_global_files[['hpar_filename']]),parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')),
+      snpi = flepicommon::read_parquet_with_check(first_global_files[['snpi_filename']]),
+      hnpi = flepicommon::read_parquet_with_check(first_global_files[['hnpi_filename']]),
+      hpar = dplyr::mutate(flepicommon::read_parquet_with_check(first_global_files[['hpar_filename']]),
+                           parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')),
      start_date = gt_start_date,
      end_date = gt_end_date
    )
@@ -363,23 +364,23 @@ if (!opt$reset_chimeric_on_accept) {
 }
 
 for(seir_modifiers_scenario in seir_modifiers_scenarios) {
-  
+
  if (!is.null(config$seir_modifiers)){
    print(paste0("Running seir modifier scenario: ", seir_modifiers_scenario))
  } else {
    print(paste0("No seir modifier scenarios"))
    seir_modifiers_scenario <- NULL
  }
-  
+
  for(outcome_modifiers_scenario in outcome_modifiers_scenarios) {
-    
+
    if (!is.null(config$outcome_modifiers)){
      print(paste0("Running outcome modifier scenario: ", outcome_modifiers_scenario))
    } else {
      print(paste0("No outcome modifier scenarios"))
      outcome_modifiers_scenario <- NULL
    }
-    
+
    # If no seir or outcome scenarios, instead pass py_none() to Gempyor (which assigns no value to the scenario)
    if (is.null(seir_modifiers_scenario)){
      seir_modifiers_scenario <- reticulate::py_none()
@@ -387,20 +388,20 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) {
    if (is.null(outcome_modifiers_scenario)){
      outcome_modifiers_scenario <- reticulate::py_none()
    }
-    
+
    reset_chimeric_files <- FALSE # this turns on whenever a global acceptance occurs
-    
+
    ## Set up first iteration of chain ----------
-    
+
    ### Create python simulator object
-    
+
    # Create parts of filename pieces for simulation to save output with
    # flepicommon::create_prefix is roughly equivalent to paste(...) with some specific formatting rules
    chimeric_intermediate_filepath_suffix <- flepicommon::create_prefix(prefix="",'chimeric','intermediate',sep='/',trailing_separator='')
    global_intermediate_filepath_suffix <- flepicommon::create_prefix(prefix="",'global','intermediate',sep='/',trailing_separator='')
    slot_filename_prefix <- flepicommon::create_prefix(slot=list(opt$this_slot,"%09d"), sep='.', trailing_separator='.')
    slotblock_filename_prefix <- flepicommon::create_prefix(slot=list(opt$this_slot,"%09d"), block=list(opt$this_block,"%09d"), sep='.', trailing_separator='.')
-    
+
    ## python configuration: build simulator model specified in config
    tryCatch({
      gempyor_inference_runner <- gempyor$GempyorSimulator(
@@ -421,8 +422,8 @@
    })
    setup_prefix <- gempyor_inference_runner$modinf$get_setup_name() # file name piece of the form [config$name]_[seir_modifier_scenario]_[outcome_modifier_scenario]
    print("gempyor_inference_runner created successfully.")
-    
-    
+
+
    # Get names of files where output from the initial simulation will be saved
    ## {prefix}/{run_id}/{type}/{suffix}/{prefix}.{index = block-1}.{run_id}.{type}.{ext}
    ## N.B.: prefix should end in "{slot}." NOTE: Potential problem. Prefix is {slot}.{block} but then "index" includes block also??
@@ -432,12 +433,12 @@ first_global_files <- inference::create_filename_list(run_id=opt$run_id,
                                                          filepath_suffix=global_intermediate_filepath_suffix,
                                                          filename_prefix=slotblock_filename_prefix,
                                                          index=opt$this_block - 1)
    first_chimeric_files <- inference::create_filename_list(run_id=opt$run_id,
-                                                            prefix=setup_prefix, 
+                                                            prefix=setup_prefix,
                                                             filepath_suffix=chimeric_intermediate_filepath_suffix,
                                                             filename_prefix=slotblock_filename_prefix,
                                                             index=opt$this_block - 1)
-    
+
    print("RUNNING: MCMC initialization for the first block")
    # Output saved to files of the form {setup_prefix}/{run_id}/{type}/global/intermediate/{slotblock_filename_prefix}.(block-1).{run_id}.{type}.{ext}
    # also copied into the /chimeric/ version, which are referenced by first_global_files and first_chimeric_files
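To make the file-naming comments above concrete: with slot 1 and block 2 (so
index = block - 1 = 1) and a run_id of 20240325_105814, the chimeric llik file
would land at a path shaped roughly like the following (illustrative values
only; the leading prefix comes from the config/setup name):

    # {prefix}/{run_id}/{type}/{suffix}/{prefix}.{index}.{run_id}.{type}.{ext}
    # <setup_prefix>/20240325_105814/llik/chimeric/intermediate/
    #   000000001.000000002.000000001.20240325_105814.llik.parquet
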
@@ -446,71 +447,71 @@
      block = opt$this_block,
      setup_prefix = setup_prefix,
      global_intermediate_filepath_suffix = global_intermediate_filepath_suffix,
-      chimeric_intermediate_filepath_suffix = chimeric_intermediate_filepath_suffix, 
+      chimeric_intermediate_filepath_suffix = chimeric_intermediate_filepath_suffix,
      filename_prefix = slotblock_filename_prefix, # might be wrong, maybe should just be slot_filename_prefix
      gempyor_inference_runner = gempyor_inference_runner,
      likelihood_calculation_function = likelihood_calculation_fun,
      is_resume = opt[['is-resume']]
    )
    print("First MCMC block initialized successfully.")
-    
+
    # So far no acceptances have occurred
    last_accepted_index <- 0
-    
+
    # Load files with the output of initialize_mcmc_first_block
-    
+
    # load those files (chimeric currently identical to global)
-    initial_spar <- arrow::read_parquet(first_chimeric_files[['spar_filename']])
-    initial_hpar <- arrow::read_parquet(first_chimeric_files[['hpar_filename']])
-    initial_snpi <- arrow::read_parquet(first_chimeric_files[['snpi_filename']])
-    initial_hnpi <- arrow::read_parquet(first_chimeric_files[['hnpi_filename']])
+    initial_spar <- flepicommon::read_parquet_with_check(first_chimeric_files[['spar_filename']])
+    initial_hpar <- flepicommon::read_parquet_with_check(first_chimeric_files[['hpar_filename']])
+    initial_snpi <- flepicommon::read_parquet_with_check(first_chimeric_files[['snpi_filename']])
+    initial_hnpi <- flepicommon::read_parquet_with_check(first_chimeric_files[['hnpi_filename']])
    if (!is.null(config$initial_conditions)){
-      initial_init <- arrow::read_parquet(first_chimeric_files[['init_filename']])
+      initial_init <- flepicommon::read_parquet_with_check(first_chimeric_files[['init_filename']])
    }
    if (!is.null(config$seeding)){
      seeding_col_types <- NULL
      suppressMessages(initial_seeding <- readr::read_csv(first_chimeric_files[['seed_filename']], col_types=seeding_col_types))
-      
+
      if (opt$stoch_traj_flag) {
        initial_seeding$amount <- as.integer(round(initial_seeding$amount))
      }
    }else{
      initial_seeding <- NULL
    }
-    chimeric_current_likelihood_data <- arrow::read_parquet(first_chimeric_files[['llik_filename']])
-    global_current_likelihood_data <- arrow::read_parquet(first_global_files[['llik_filename']]) # they are the same ... don't need to load both
-    
+    chimeric_current_likelihood_data <- flepicommon::read_parquet_with_check(first_chimeric_files[['llik_filename']])
+    global_current_likelihood_data <- flepicommon::read_parquet_with_check(first_global_files[['llik_filename']]) # they are the same ... don't need to load both
+
    #####Get the full likelihood (WHY IS THIS A DATA FRAME)
    # Compute total loglik for each sim
    global_current_likelihood_total <- sum(global_current_likelihood_data$ll)
-    
+
    #####LOOP NOTES
    ### this_index is the current MCMC iteration
    ### last_accepted_index is the index of the most recent globally accepted iteration
-    
+
    startTimeCount=Sys.time()
-    
+
    ## Loop over simulations in this block --------------------------------------------
-    
+
    # keep track of running average global acceptance rate, since old global likelihood data not kept in memory. Each geoID has same value for acceptance rate in global case, so we just take the 1st entry
    old_avg_global_accept_rate <- global_current_likelihood_data$accept_avg[1]
    old_avg_chimeric_accept_rate <- chimeric_current_likelihood_data$accept_avg
-    
+
    for (this_index in seq_len(opt$iterations_per_slot)) {
-      
+
      print(paste("Running iteration", this_index))
-      
+
      startTimeCountEach = Sys.time()
-      
+
      ## Create filenames to save output from each iteration
      this_global_files <- inference::create_filename_list(run_id=opt$run_id, prefix = setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index)
      this_chimeric_files <- inference::create_filename_list(run_id=opt$run_id, prefix = setup_prefix, filepath_suffix=chimeric_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index)
-      
+
      ### Perturb accepted parameters to get proposed parameters ----
-      
+
      # since the first iteration is accepted by default, we don't perturb it, so proposed = initial
      if ((opt$this_block == 1) && (last_accepted_index == 0)) {
-        
+
        proposed_spar <- initial_spar
        proposed_hpar <- initial_hpar
        proposed_snpi <- initial_snpi
@@ -521,19 +522,19 @@
        if (!is.null(config$seeding)){
          proposed_seeding <- initial_seeding
        }
-        
+
      }else{ # perturb each parameter type
-        
+
        proposed_spar <- initial_spar # currently no function to perturb
        proposed_hpar <- inference::perturb_hpar(initial_hpar, config$outcomes$outcomes) # NOTE: Deprecated? No scenarios possible right now?
-        
+
        if (!is.null(config$seir_modifiers$modifiers)){
          proposed_snpi <- inference::perturb_snpi(initial_snpi, config$seir_modifiers$modifiers)
        }
        if (!is.null(config$outcome_modifiers$modifiers)){
          proposed_hnpi <- inference::perturb_hnpi(initial_hnpi, config$outcome_modifiers$modifiers)
        }
-        
+
        if (!is.null(config$seeding)){
          proposed_seeding <- inference::perturb_seeding(
            seeding = initial_seeding,
@@ -552,10 +553,10 @@
          proposed_init <- initial_init
        }
      }
-      
+
    }
-    
-    # Write proposed parameters to files for other code to read. 
+
+    # Write proposed parameters to files for other code to read.
    # Temporarily stored in global files, which are eventually overwritten with global accepted values
    arrow::write_parquet(proposed_spar,this_global_files[['spar_filename']])
    arrow::write_parquet(proposed_hpar,this_global_files[['hpar_filename']])
@@ -567,9 +568,9 @@
    if (!is.null(config$initial_conditions)){
      arrow::write_parquet(proposed_init, this_global_files[['init_filename']])
    }
-    
+
    ## Run the simulator with proposed parameters -------------------
-    
+
    # create simulator
    tryCatch({
      gempyor_inference_runner$one_simulation(
@@ -582,12 +583,12 @@
      for(m in reticulate::py_last_error()) cat(m)
      stop("GempyorSimulator failed to run... stopping")
    })
-    
+
    # run
    if (config$inference$do_inference){
      sim_hosp <- flepicommon::read_file_of_type(gsub(".*[.]","",this_global_files[['hosp_filename']]))(this_global_files[['hosp_filename']]) %>%
        dplyr::filter(time >= min(obs$date),time <= max(obs$date))
-      
+
      lhs <- unique(sim_hosp[[obs_subpop]])
      rhs <- unique(names(data_stats))
      all_locations <- rhs[rhs %in% lhs]
@@ -597,7 +598,7 @@
      obs <- sim_hosp
      data_stats <- sim_hosp
    }
-    
+
    ## Compare model output to data and calculate likelihood ----
    proposed_likelihood_data <- inference::aggregate_and_calc_loc_likelihoods(
      all_locations = all_locations,
@@ -619,12 +620,12 @@
      start_date = gt_start_date,
      end_date = gt_end_date
    )
-    
+
    rm(sim_hosp)
-    
+
    # write proposed likelihood to global file
    arrow::write_parquet(proposed_likelihood_data, this_global_files[['llik_filename']])
-    
+
    ## UNCOMMENT TO DEBUG
    # print('current global likelihood')
    # print(global_current_likelihood_data)
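The hunk below implements a standard Metropolis accept/reject step on total
log-likelihood: accept with probability exp(min(0, ll_proposed - ll_current)).
A minimal sketch of that decision, assuming inference::iterateAccept follows
this form:

    iterate_accept_sketch <- function(ll_current, ll_proposed) {
        # always accept uphill moves; accept downhill moves with
        # probability exp(ll_proposed - ll_current)
        log(runif(1)) < min(0, ll_proposed - ll_current)
    }
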
@@ -632,111 +633,111 @@
 # print(chimeric_current_likelihood_data)
 #print('proposed likelihood')
 #print(proposed_likelihood_data)
-    
+
 ## Compute total loglik for each sim
 proposed_likelihood_total <- sum(proposed_likelihood_data$ll)
 ## For logging
 print(paste("Current likelihood",formatC(global_current_likelihood_total,digits=2,format="f"),"Proposed likelihood", formatC(proposed_likelihood_total,digits=2,format="f")))
-    
-    
+
+
 ## Global likelihood acceptance or rejection decision -----------
-    
-# Compare total likelihood (product of all subpopulations) in current vs proposed likelihood. 
+
+# Compare total likelihood (product of all subpopulations) in current vs proposed likelihood.
 # Accept if MCMC acceptance decision = 1 or it's the first iteration of the first block
 # note - we already have a catch for the first block thing earlier (we set proposed = initial likelihood) - shouldn't need 2!
 global_accept <- ifelse( #same value for all subpopulations
-  inference::iterateAccept(global_current_likelihood_total, proposed_likelihood_total) || 
+  inference::iterateAccept(global_current_likelihood_total, proposed_likelihood_total) ||
    ((last_accepted_index == 0) && (opt$this_block == 1)),1,0
 )
-    
+
 # only do global accept if all subpopulations accepted?
-if (global_accept == 1 | config$inference$do_inference == FALSE) { 
-    
+if (global_accept == 1 | config$inference$do_inference == FALSE) {
+
  print("**** GLOBAL ACCEPT (Recording) ****")
-    
+
  if ((opt$this_block == 1) && (last_accepted_index == 0)) {
    print("by default because it's the first iteration of a block 1")
  }
-    
+
  # Update the index of the most recent globally accepted parameters
  last_accepted_index <- this_index
-    
+
  if (opt$reset_chimeric_on_accept) {
    reset_chimeric_files <- TRUE # triggers globally accepted parameters to push back to chimeric
  }
-    
+
  # Update current global likelihood to proposed likelihood and record some acceptance statistics
-    
+
  #acceptance probability for this iteration
-  this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total))) 
-    
+  this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total)))
+
  global_current_likelihood_data <- proposed_likelihood_data # this is used for next iteration
  global_current_likelihood_total <- proposed_likelihood_total # this is used for next iteration
-    
+
  global_current_likelihood_data$accept <- 1 # global acceptance decision (0/1), same for each geoID
  global_current_likelihood_data$accept_prob <- this_accept_prob
-    
+
  # File saving: If global accept occurs, the global parameter files are already correct as they contain the proposed values
-    
+
 } else {
  print("**** GLOBAL REJECT (Recording) ****")
-    
+
  # File saving: If global reject occurs, remove "proposed" parameters from global files and instead replace them with the last accepted values
-    
+
  # get filenames of last accepted files
-  old_global_files <- inference::create_filename_list(run_id=opt$run_id, 
-                                                      prefix=setup_prefix, 
+  old_global_files <- inference::create_filename_list(run_id=opt$run_id,
+                                                      prefix=setup_prefix,
                                                       filepath_suffix=global_intermediate_filepath_suffix,
-                                                      filename_prefix=slotblock_filename_prefix, 
+                                                      filename_prefix=slotblock_filename_prefix,
                                                       index=last_accepted_index)
-    
+
  # Update current global likelihood to last accepted one, and record some acceptance statistics
-    
+
  # Replace current global files with last accepted values
  for (type in names(this_global_files)) {
-    file.copy(old_global_files[[type]],this_global_files[[type]], overwrite = TRUE) 
+    file.copy(old_global_files[[type]],this_global_files[[type]], overwrite = TRUE)
  }
-    
+
  #acceptance probability for this iteration
-  this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total))) 
-    
+  this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total)))
+
  #NOTE: Don't technically need the next 2 lines, as the values saved to memory are last accepted values, but confusing to track these variable names if we skip this
-  global_current_likelihood_data <- arrow::read_parquet(this_global_files[['llik_filename']])
+  global_current_likelihood_data <- flepicommon::read_parquet_with_check(this_global_files[['llik_filename']])
  global_current_likelihood_total <- sum(global_current_likelihood_data$ll)
-    
+
  global_current_likelihood_data$accept <- 0 # global acceptance decision (0/1), same for each geoID
 global_current_likelihood_data$accept_prob <- this_accept_prob
-    
+
 }
 
 # Calculate more acceptance statistics for the global chain. Same value for each subpopulation
 effective_index <- (opt$this_block - 1) * opt$iterations_per_slot + this_index # index after all blocks
-avg_global_accept_rate <- ((effective_index-1)*old_avg_global_accept_rate + global_accept)/(effective_index) 
+avg_global_accept_rate <- ((effective_index-1)*old_avg_global_accept_rate + global_accept)/(effective_index)
 global_current_likelihood_data$accept_avg <- avg_global_accept_rate # update running average acceptance probability
 old_avg_global_accept_rate <- avg_global_accept_rate # keep track, since old global likelihood data not kept in memory
-
-# print(paste("Average global acceptance rate: ",formatC(100*avg_global_accept_rate,digits=2,format="f"),"%")) 
-
+
+# print(paste("Average global acceptance rate: ",formatC(100*avg_global_accept_rate,digits=2,format="f"),"%"))
+
 # Update global likelihood files
 arrow::write_parquet(global_current_likelihood_data, this_global_files[['llik_filename']]) # update likelihood saved to file
 
 ## Chimeric likelihood acceptance or rejection decisions (one round) ---------------------------------------------------------------------------
-    
+
 if (!reset_chimeric_files) { # will make separate acceptance decision for each subpop
-    
+
  # "Chimeric" means GeoID-specific
  print("Making chimeric acceptance decision")
-    
+
  if (is.null(config$initial_conditions)){
    initial_init <- NULL
    proposed_init <- NULL
-  } 
+  }
  if (is.null(config$seeding)){
    initial_seeding <- NULL
    proposed_seeding <- NULL
  }
-    
+
  chimeric_acceptance_list <- inference::accept_reject_proposals( # need to rename this function!!
    init_orig = initial_init,
    init_prop = proposed_init,
@@ -751,7 +752,7 @@
    orig_lls = chimeric_current_likelihood_data,
    prop_lls = proposed_likelihood_data
  )
-    
+
  # Update accepted parameters to start next simulation
  if (!is.null(config$initial_conditions)){
    new_init <- chimeric_acceptance_list$init
@@ -764,9 +765,9 @@
  new_snpi <- chimeric_acceptance_list$snpi
  new_hnpi <- chimeric_acceptance_list$hnpi
  chimeric_current_likelihood_data <- chimeric_acceptance_list$ll
-    
+
 } else { # Proposed values were globally accepted and will be copied to chimeric
-    
+
  print("Resetting chimeric values to global due to global acceptance")
  if (!is.null(config$initial_conditions)){
    new_init <- proposed_init
@@ -779,20 +780,20 @@
  new_snpi <- proposed_snpi
  new_hnpi <- proposed_hnpi
  chimeric_current_likelihood_data <- proposed_likelihood_data
-    
+
  reset_chimeric_files <- FALSE
-    
+
  chimeric_current_likelihood_data$accept <- 1
 }
-    
+
 # Calculate acceptance statistics of the chimeric chain
-    
+
 effective_index <- (opt$this_block - 1) * opt$iterations_per_slot + this_index
 avg_chimeric_accept_rate <- ((effective_index - 1) * old_avg_chimeric_accept_rate + chimeric_current_likelihood_data$accept) / (effective_index) # running average acceptance rate
 chimeric_current_likelihood_data$accept_avg <- avg_chimeric_accept_rate
 chimeric_current_likelihood_data$accept_prob <- exp(min(c(0, proposed_likelihood_data$ll - chimeric_current_likelihood_data$ll))) #acceptance probability
-old_avg_chimeric_accept_rate <- avg_chimeric_accept_rate 
-
+old_avg_chimeric_accept_rate <- avg_chimeric_accept_rate
+
 ## Write accepted chimeric parameters to file
 if (!is.null(config$seeding)){
   readr::write_csv(new_seeding,this_chimeric_files[['seed_filename']])
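Both chains update their running acceptance rates incrementally rather than
storing the full history; the updates above are the standard online mean, as
in this sketch:

    # after n decisions, avg_n = ((n - 1) * avg_{n-1} + accept_n) / n,
    # e.g. avg = 0.25 over 4 iterations followed by an acceptance:
    ((5 - 1) * 0.25 + 1) / 5   # 0.4
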
@@ -805,10 +806,10 @@
 arrow::write_parquet(new_snpi,this_chimeric_files[['snpi_filename']])
 arrow::write_parquet(new_hnpi,this_chimeric_files[['hnpi_filename']])
 arrow::write_parquet(chimeric_current_likelihood_data, this_chimeric_files[['llik_filename']])
-    
+
 print(paste("Current accepted index is ",last_accepted_index))
-    
-    
+
+
 # set initial values to start next iteration
 if (!is.null(config$initial_conditions)){
   initial_init <- new_init
@@ -820,7 +821,7 @@
 initial_hpar <- new_hpar
 initial_snpi <- new_snpi
 initial_hnpi <- new_hnpi
-    
+
 # remove "new" and "proposed" values from memory
 rm(proposed_spar, proposed_hpar, proposed_snpi,proposed_hnpi)
 rm(new_spar, new_hpar, new_snpi,new_hnpi)
@@ -832,14 +833,14 @@
   rm(proposed_seeding)
   rm(new_seeding)
 }
-    
+
 endTimeCountEach=difftime(Sys.time(), startTimeCountEach, units = "secs")
 print(paste("Time to run MCMC iteration",this_index,"of slot",opt$this_slot," is ",formatC(endTimeCountEach,digits=2,format="f")," seconds"))
-    
+
 # memory profiler to diagnose memory creep
-    
+
 if (opt$memory_profiling){
-    
+
  if (this_index %% opt$memory_profiling_iters == 0 | this_index == 1){
    tot_objs_ <- as.numeric(object.size(x=lapply(ls(all.names = TRUE), get)) * 9.31e-10)
    tot_mem_ <- sum(gc()[,2]) / 1000
@@ -855,27 +856,27 @@
          size = c(tot_mem_, tot_objs_),
          unit = c("Gb", "Gb"),
          .before = 1)
-    
-    this_global_memprofile <- inference::create_filename_list(run_id=opt$run_id, 
-                                                              prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, 
-                                                              filename_prefix=slotblock_filename_prefix, index=this_index,types = "memprof", 
+
+    this_global_memprofile <- inference::create_filename_list(run_id=opt$run_id,
+                                                              prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix,
+                                                              filename_prefix=slotblock_filename_prefix, index=this_index,types = "memprof",
                                                               extensions = "parquet")
    arrow::write_parquet(curr_obj_sizes, this_global_memprofile[['memprof_filename']])
    rm(curr_obj_sizes)
  }
-    
+
 }
-    
+
 ## Run garbage collector to clear memory and prevent memory leakage
 # gc_after_a_number <- 1 ## # Garbage collection every 1 iteration
 if (this_index %% 1 == 0){
   gc()
 }
-    
+
 }
-    
+
 # End of the MCMC iteration loop
-    
+
 # Create "final" files after the MCMC chain is completed
 # Will fail if unsuccessful
 # moves the most recently globally accepted parameter values from global/intermediate file to global/final
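On the unit conversions in the memory profiler above: object.size() reports
bytes, so multiplying by 9.31e-10 (= 1/2^30) converts bytes to Gb, and the
second column of gc()'s matrix is in Mb, so dividing by 1000 gives approximate
Gb:

    1 / 2^30                # 9.313226e-10, bytes -> Gb
    sum(gc()[, 2]) / 1000   # total Mb in use -> approx. Gb
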
@@ -898,24 +899,24 @@
                                          slotblock_filename_prefix = slotblock_filename_prefix,
                                          slot_filename_prefix = slot_filename_prefix)
 if (!prod(unlist(cpy_res_chimeric))) {stop("File copy failed:", paste(unlist(cpy_res_chimeric),paste(names(cpy_res_chimeric),"|")))}
-    
+
 #####Write currently accepted files to disk
 #NOTE: Don't understand why we write these files that don't have an iteration index
 #files of the form ../chimeric/intermediate/{slot}.{block}.{run_id}.{variable}.parquet
 output_chimeric_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=chimeric_intermediate_filepath_suffix, filename_prefix=slot_filename_prefix, index=opt$this_block)
 #files of the form .../global/intermediate/{slot}.{block}.{run_id}.{variable}.parquet
 output_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix,filepath_suffix=global_intermediate_filepath_suffix,filename_prefix=slot_filename_prefix, index=opt$this_block)
-    
+
 warning("Chimeric hosp and seir files not yet supported, just using the most recently generated file of each type")
 #files of the form .../global/intermediate/{slot}.{block}.{iteration}.{run_id}.{variable}.parquet
 this_index_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index)
-    
+
 # copy files from most recent global to end-of-block chimeric??
 file.copy(this_index_global_files[['hosp_filename']],output_chimeric_files[['hosp_filename']])
 file.copy(this_index_global_files[['seir_filename']],output_chimeric_files[['seir_filename']])
-    
+
 endTimeCount=difftime(Sys.time(), startTimeCount, units = "secs")
 print(paste("Time to run all MCMC iterations of slot ",opt$this_slot," is ",formatC(endTimeCount,digits=2,format="f")," seconds"))
-    
+
 }
 }
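A quick way to exercise the new helper end to end (a testthat-style sketch, not
included in the patch; assumes the patched flepicommon is installed and uses
temporary/made-up file names):

    library(testthat)

    test_that("read_parquet_with_check reads existing files and names missing ones", {
        f <- tempfile(fileext = ".parquet")
        arrow::write_parquet(data.frame(x = 1:3), f)
        expect_equal(nrow(flepicommon::read_parquet_with_check(f)), 3)
        expect_error(flepicommon::read_parquet_with_check("no_such_file.parquet"),
                     "no_such_file.parquet")
    })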