diff --git a/batch/AWS_inference_runner.sh b/batch/AWS_inference_runner.sh index 6fe4aeaef..c337cdb65 100755 --- a/batch/AWS_inference_runner.sh +++ b/batch/AWS_inference_runner.sh @@ -3,7 +3,7 @@ set -x # Expected environment variables from AWS Batch env -# S3_MODEL_DATA_PATH location in S3 with the code, data, and dvc pipeline to run +# S3_MODEL_PROJECT_PATH location in S3 with the code, data, and dvc pipeline to run # DVC_OUTPUTS the names of the directories with outputs to save in S3, separated by a space # SIMS_PER_JOB is the number of sims to run per job # JOB_NAME the name of the job @@ -40,7 +40,7 @@ aws configure set default.s3.multipart_chunksize 8MB # Copy the complete model + data package from S3 and # install the local R packages -aws s3 cp --quiet $S3_MODEL_DATA_PATH model_data.tar.gz +aws s3 cp --quiet $S3_MODEL_PROJECT_PATH model_data.tar.gz mkdir model_data tar -xzf model_data.tar.gz -C model_data # chadi: removed v(erbose) option here as it floods the log with data we have anyway from the s3 bucket cd model_data diff --git a/batch/AWS_postprocess_runner.sh b/batch/AWS_postprocess_runner.sh index be0ffbd57..271cc932e 100644 --- a/batch/AWS_postprocess_runner.sh +++ b/batch/AWS_postprocess_runner.sh @@ -3,7 +3,7 @@ set -x # Expected environment variables from AWS Batch env -# S3_MODEL_DATA_PATH location in S3 with the code, data, and dvc pipeline to run +# S3_MODEL_PROJECT_PATH location in S3 with the code, data, and dvc pipeline to run # DVC_OUTPUTS the names of the directories with outputs to save in S3, separated by a space # SIMS_PER_JOB is the number of sims to run per job # JOB_NAME the name of the job @@ -34,7 +34,7 @@ aws configure set default.s3.multipart_chunksize 8MB # Copy the complete model + data package from S3 and # install the local R packages -aws s3 cp --quiet $S3_MODEL_DATA_PATH model_data.tar.gz +aws s3 cp --quiet $S3_MODEL_PROJECT_PATH model_data.tar.gz mkdir model_data tar -xzf model_data.tar.gz -C model_data # chadi: removed v(erbose) option here as it floods the log with data we have anyway from the s3 bucket cd model_data diff --git a/batch/AWS_scenario_runner.sh b/batch/AWS_scenario_runner.sh index 26635f973..8e57ec000 100755 --- a/batch/AWS_scenario_runner.sh +++ b/batch/AWS_scenario_runner.sh @@ -3,7 +3,7 @@ set -x # Expected environment variables from AWS Batch env -# S3_MODEL_DATA_PATH location in S3 with the code, data, and dvc pipeline to run +# S3_MODEL_PROJECT_PATH location in S3 with the code, data, and dvc pipeline to run # DVC_TARGET the name of the dvc file in the model that should be reproduced locally. 
# DVC_OUTPUTS the names of the directories with outputs to save in S3, separated by a space # S3_RESULTS_PATH location in S3 to store the results @@ -24,7 +24,7 @@ aws configure set default.s3.multipart_chunksize 8MB # Copy the complete model + data package from S3 and # install the local R packages -aws s3 cp --quiet $S3_MODEL_DATA_PATH model_data.tar.gz +aws s3 cp --quiet $S3_MODEL_PROJECT_PATH model_data.tar.gz mkdir model_data tar -xvzf model_data.tar.gz -C model_data cd model_data diff --git a/batch/SLURM_inference_job.run b/batch/SLURM_inference_job.run index a5ed1d94e..7b100db2b 100644 --- a/batch/SLURM_inference_job.run +++ b/batch/SLURM_inference_job.run @@ -7,7 +7,7 @@ set -x -cd $DATA_PATH +cd $PROJECT_PATH FLEPI_SLOT_INDEX=${SLURM_ARRAY_TASK_ID} diff --git a/batch/inference_job_launcher.py b/batch/inference_job_launcher.py index 0eb83f0d0..5e7d072de 100755 --- a/batch/inference_job_launcher.py +++ b/batch/inference_job_launcher.py @@ -55,7 +55,7 @@ def user_confirmation(question="Continue?", default=False): "--data-path", "--data-path", "data_path", - envvar="DATA_PATH", + envvar="PROJECT_PATH", type=click.Path(exists=True), default=".", help="path to the data directory", @@ -673,12 +673,12 @@ def launch(self, job_name, config_file, seir_modifiers_scenarios, outcome_modifi ## TODO: check how each of these variables are used downstream base_env_vars = [ {"name": "BATCH_SYSTEM", "value": self.batch_system}, - {"name": "S3_MODEL_DATA_PATH", "value": f"s3://{self.s3_bucket}/{job_name}.tar.gz"}, + {"name": "S3_MODEL_PROJECT_PATH", "value": f"s3://{self.s3_bucket}/{job_name}.tar.gz"}, {"name": "DVC_OUTPUTS", "value": " ".join(self.outputs)}, {"name": "S3_RESULTS_PATH", "value": s3_results_path}, {"name": "FS_RESULTS_PATH", "value": fs_results_path}, {"name": "S3_UPLOAD", "value": str(self.s3_upload).lower()}, - {"name": "DATA_PATH", "value": str(self.data_path)}, + {"name": "PROJECT_PATH", "value": str(self.data_path)}, {"name": "FLEPI_PATH", "value": str(self.flepi_path)}, {"name": "CONFIG_PATH", "value": config_file}, {"name": "FLEPI_NUM_SLOTS", "value": str(self.num_jobs)}, diff --git a/batch/scenario_job.py b/batch/scenario_job.py index 3953e092d..1a7bb8f38 100755 --- a/batch/scenario_job.py +++ b/batch/scenario_job.py @@ -213,7 +213,7 @@ def launch_job_inner( results_path = f"s3://{s3_output_bucket}/{job_name}" env_vars = [ {"name": "CONFIG_PATH", "value": config_file}, - {"name": "S3_MODEL_DATA_PATH", "value": model_data_path}, + {"name": "S3_MODEL_PROJECT_PATH", "value": model_data_path}, {"name": "DVC_TARGET", "value": dvc_target}, {"name": "DVC_OUTPUTS", "value": " ".join(dvc_outputs)}, {"name": "S3_RESULTS_PATH", "value": results_path}, diff --git a/flepimop/R_packages/flepicommon/NAMESPACE b/flepimop/R_packages/flepicommon/NAMESPACE index 426c104b2..276edf955 100644 --- a/flepimop/R_packages/flepicommon/NAMESPACE +++ b/flepimop/R_packages/flepicommon/NAMESPACE @@ -24,6 +24,7 @@ export(load_config) export(load_geodata_file) export(prettyprint_optlist) export(read_file_of_type) +export(read_parquet_with_check) export(run_id) import(covidcast) import(doParallel) diff --git a/flepimop/R_packages/flepicommon/R/DataUtils.R b/flepimop/R_packages/flepicommon/R/DataUtils.R index 56b17017d..26cb4f6b9 100755 --- a/flepimop/R_packages/flepicommon/R/DataUtils.R +++ b/flepimop/R_packages/flepicommon/R/DataUtils.R @@ -38,7 +38,7 @@ load_geodata_file <- function(filename, if(state_name) { utils::data(fips_us_county, package = "flepicommon") # 
arrow::read_parquet("datasetup/usdata/fips_us_county.parquet") - geodata <- fips_us_county %>% + geodata <- fips_us_county %>% dplyr::distinct(state, state_name) %>% dplyr::rename(USPS = state) %>% dplyr::rename(state = state_name) %>% @@ -53,6 +53,31 @@ load_geodata_file <- function(filename, +#' Read parquet files with check for existence to understand errors +#' +#' @param file The file to read +#' +#' @return +#' @export +#' +#' @examples +read_parquet_with_check <- function(file){ + if(!file.exists(file)){ + stop(paste("File",file,"does not exist")) + } + arrow::read_parquet(file) +} + + + + + + + + + + + #' Depracated function that returns a function to read files of a specific type (or automatically detected type based on extension) #' @param extension The file extension to read files of @@ -113,11 +138,11 @@ read_file_of_type <- function(extension,...){ # ##' @importFrom cdlTools fips census2010FIPS stateNames # ##' # download_USAFacts_data <- function(filename, url, value_col_name, incl_unassigned = FALSE){ -# +# # dir.create(dirname(filename), showWarnings = FALSE, recursive = TRUE) # message(paste("Downloading", url, "to", filename)) # download.file(url, filename, "auto") -# +# # usafacts_data <- readr::read_csv(filename) # names(usafacts_data) <- stringr::str_to_lower(names(usafacts_data)) # usafacts_data <- dplyr::select(usafacts_data, -statefips,-`county name`) %>% # drop statefips columns @@ -141,13 +166,13 @@ read_file_of_type <- function(extension,...){ # date_func <- ifelse(any(grepl("^\\d\\d\\d\\d",col_names)),lubridate::ymd, lubridate::mdy) # usafacts_data <- tidyr::pivot_longer(usafacts_data, tidyselect::all_of(date_cols), names_to="Update", values_to=value_col_name) # usafacts_data <- dplyr::mutate(usafacts_data, Update=date_func(Update), FIPS=sprintf("%05d", FIPS)) -# +# # validation_date <- Sys.getenv("VALIDATION_DATE") # if ( validation_date != '' ) { # print(paste("(DataUtils.R) Limiting USAFacts data to:", validation_date, sep=" ")) # usafacts_data <- dplyr::filter(usafacts_data, Update < validation_date ) # } -# +# # return(usafacts_data) # } diff --git a/flepimop/R_packages/flepicommon/data/state_fips_abbr.rda b/flepimop/R_packages/flepicommon/data/state_fips_abbr.rda new file mode 100644 index 000000000..7e99ad47a Binary files /dev/null and b/flepimop/R_packages/flepicommon/data/state_fips_abbr.rda differ diff --git a/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R b/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R index 24a7835f9..9d01bf483 100644 --- a/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R +++ b/flepimop/R_packages/inference/R/inference_slot_runner_funcs.R @@ -614,8 +614,6 @@ initialize_mcmc_first_block <- function( } - - ## initial conditions (init) if (!is.null(config$initial_conditions)){ @@ -633,19 +631,18 @@ initialize_mcmc_first_block <- function( } if (grepl(".csv", initial_init_file)){ initial_init <- readr::read_csv(initial_init_file) - config$initial_conditions$initial_conditions_file <- gsub(".csv", ".parquet", config$initial_conditions$initial_conditions_file) - arrow::write_parquet(initial_init, config$initial_conditions$initial_conditions_file) + arrow::write_parquet(initial_init, global_files[["init_filename"]]) + }else{ + err <- !(file.copy(initial_init_file, global_files[["init_filename"]])) + if (err != 0) { + stop("Could not copy initial conditions file") + } } - err <- !(file.copy(config$initial_conditions$initial_conditions_file, global_files[["init_filename"]])) - if (err != 0) { - 
stop("Could not copy initial conditions file") - } } else if (config$initial_conditions$method %in% c("InitialConditionsFolderDraw", "SetInitialConditionsFolderDraw")) { print("Initial conditions in inference has not been fully implemented yet for the 'folder draw' methods, and no copying to global or chimeric files is being done.") - if (is.null(config$initial_conditions$initial_file_type)) { stop("ERROR: Initial conditions file needs to be specified in the config under `initial_conditions:initial_conditions_file`") } @@ -654,15 +651,14 @@ initialize_mcmc_first_block <- function( if (!file.exists(initial_init_file)) { stop("ERROR: Initial conditions file specified but does not exist.") } - if (grepl(".csv", initial_init_file)){ + if (grepl(".csv", initial_init_file)){ initial_init <- readr::read_csv(initial_init_file) - initial_init_file <- gsub(".csv", ".parquet", initial_init_file) - arrow::write_parquet(initial_init, initial_init_file) - } - - err <- !(file.copy(initial_init_file, global_files[["init_filename"]])) - if (err != 0) { - stop("Could not copy initial conditions file") + arrow::write_parquet(initial_init, global_files[["init_filename"]]) + }else{ + err <- !(file.copy(initial_init_file, global_files[["init_filename"]])) + if (err != 0) { + stop("Could not copy initial conditions file") + } } } diff --git a/flepimop/gempyor_pkg/docs/integration_benchmark.ipynb b/flepimop/gempyor_pkg/docs/integration_benchmark.ipynb index ee936683b..9058802ba 100644 --- a/flepimop/gempyor_pkg/docs/integration_benchmark.ipynb +++ b/flepimop/gempyor_pkg/docs/integration_benchmark.ipynb @@ -31,14 +31,14 @@ "Run once\n", "```python\n", "export FLEPI_PATH=$(pwd)/flepiMoP\n", - "export DATA_PATH=$(pwd)/COVID19_USA\n", + "export PROJECT_PATH=$(pwd)/COVID19_USA\n", "conda activate covidSProd6\n", "cd $FLEPI_PATH\n", "Rscript build/local_install.R\n", "python setup.py develop --no-deps\n", "git lfs install\n", "git lfs pull\n", - "cd $DATA_PATH\n", + "cd $PROJECT_PATH\n", "git restore data/\n", "export CONFIG_PATH=config_smh_r11_optsev_highie_base_deathscases_blk1.yml\n", "Rscript $FLEPI_PATH/datasetup/build_US_setup.R\n", diff --git a/flepimop/gempyor_pkg/src/gempyor/seeding_ic.py b/flepimop/gempyor_pkg/src/gempyor/seeding_ic.py index 21f2fd3ea..fa2f3d162 100644 --- a/flepimop/gempyor_pkg/src/gempyor/seeding_ic.py +++ b/flepimop/gempyor_pkg/src/gempyor/seeding_ic.py @@ -482,7 +482,7 @@ def get_from_config(self, sim_id: int, setup) -> np.ndarray: else: raise ValueError( f"Initial Conditions: Could not set compartment {comp_name} (id: {comp_idx}) in subpop {pl} (id: {pl_idx}). The data from the init file is {states_pl}. \n \ - Use 'allow_missing_compartments' to default to 0 for compartments without initial conditions" + Use 'allow_missing_compartments' to default to 0 for compartments without initial conditions" ) if "rest" in str(ic_df_compartment_val).strip().lower(): rests.append([comp_idx, pl_idx]) diff --git a/flepimop/main_scripts/inference_slot.R b/flepimop/main_scripts/inference_slot.R index 07b5c7c28..5554052ee 100644 --- a/flepimop/main_scripts/inference_slot.R +++ b/flepimop/main_scripts/inference_slot.R @@ -1,6 +1,6 @@ # About -## This script runs a single slot (MCMC chain) of an inference run. It can be called directly, but is often called from inference_main.R if multiple slots are run. +## This script runs a single slot (MCMC chain) of an inference run. It can be called directly, but is often called from inference_main.R if multiple slots are run. 
# Run Options --------------------------------------------------------------------- @@ -27,7 +27,7 @@ required_packages <- c("dplyr", "magrittr", "xts", "zoo", "stringr") # There are multiple ways to specify options when inference_slot.R is run, which take the following precedence: # 1) (optional) options called along with the script at the command line (ie > Rscript inference_main.R -c my_config.yml) # 2) (optional) environmental variables set by the user (ie user could set > export CONFIG_PATH = ~/flepimop_sample/my_config.yml to not have to specify it each time the script is run) -# If neither are specified, then a default value is used, given by the second argument of Sys.getenv() commands below. +# If neither are specified, then a default value is used, given by the second argument of Sys.getenv() commands below. # *3) For some options, a default doesn't exist, and the value specified in the config will be used if the option is not specified at the command line or by an environmental variable (iterations_per_slot, slots) option_list = list( @@ -74,7 +74,7 @@ flepicommon::prettyprint_optlist(opt) # load Python to use via R -reticulate::use_python(Sys.which(opt$python), required = TRUE) +reticulate::use_python(Sys.which(opt$python), required = TRUE) # Load gempyor module gempyor <- reticulate::import("gempyor") @@ -252,9 +252,9 @@ if ("priors" %in% names(config$inference)) { # ~ WITH Inference ---------------------------------------------------- if (config$inference$do_inference){ - + ## Load ground truth data - + obs <- suppressMessages( readr::read_csv(config$inference$gt_data_path, col_types = readr::cols(date = readr::col_date(), @@ -264,18 +264,18 @@ if (config$inference$do_inference){ dplyr::filter(subpop %in% subpops_, date >= gt_start_date, date <= gt_end_date) %>% dplyr::right_join(tidyr::expand_grid(subpop = unique(.$subpop), date = unique(.$date))) %>% dplyr::mutate_if(is.numeric, dplyr::coalesce, 0) - + subpopnames <- unique(obs[[obs_subpop]]) - - + + ## Compute statistics - + ## for each subpopulation, processes the data as specified in config - finds data types of interest, aggregates by specified period (e.g. week), deals with zeros and NAs, etc data_stats <- lapply( subpopnames, function(x) { df <- obs[obs[[obs_subpop]] == x, ] - inference::getStats( + inference::getStats( df, "date", "data_var", @@ -285,48 +285,48 @@ if (config$inference$do_inference){ ) }) %>% set_names(subpopnames) - - + + # function to calculate the likelihood when comparing simulation output (sim_hosp) to ground truth data likelihood_calculation_fun <- function(sim_hosp){ - + sim_hosp <- dplyr::filter(sim_hosp,sim_hosp$time >= min(obs$date),sim_hosp$time <= max(obs$date)) lhs <- unique(sim_hosp[[obs_subpop]]) rhs <- unique(names(data_stats)) all_locations <- rhs[rhs %in% lhs] - + ## No references to config$inference$statistics inference::aggregate_and_calc_loc_likelihoods( all_locations = all_locations, # technically different modeled_outcome = sim_hosp, # simulation output obs_subpop = obs_subpop, targets_config = config[["inference"]][["statistics"]], - obs = obs, + obs = obs, ground_truth_data = data_stats, hosp_file = first_global_files[['llik_filename']], hierarchical_stats = hierarchical_stats, defined_priors = defined_priors, geodata = geodata, - snpi = arrow::read_parquet(first_global_files[['snpi_filename']]), - hnpi = arrow::read_parquet(first_global_files[['hnpi_filename']]), - hpar = 
dplyr::mutate(arrow::read_parquet(first_global_files[['hpar_filename']]),parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')), + snpi = flepicommon::read_parquet_with_check(first_global_files[['snpi_filename']]), + hnpi = flepicommon::read_parquet_with_check(first_global_files[['hnpi_filename']]), + hpar = dplyr::mutate(flepicommon::read_parquet_with_check(first_global_files[['hpar_filename']]),parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')), start_date = gt_start_date, end_date = gt_end_date ) } print("Running WITH inference") - - + + # ~ WITHOUT Inference --------------------------------------------------- - + } else { - + subpopnames <- obs_subpop - + likelihood_calculation_fun <- function(sim_hosp){ - + all_locations <- unique(sim_hosp[[obs_subpop]]) - + ## No references to config$inference$statistics inference::aggregate_and_calc_loc_likelihoods( all_locations = all_locations, # technically different @@ -339,9 +339,10 @@ if (config$inference$do_inference){ hierarchical_stats = hierarchical_stats, defined_priors = defined_priors, geodata = geodata, - snpi = arrow::read_parquet(first_global_files[['snpi_filename']]), - hnpi = arrow::read_parquet(first_global_files[['hnpi_filename']]), - hpar = dplyr::mutate(arrow::read_parquet(first_global_files[['hpar_filename']]),parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')), + snpi = flepicommon::read_parquet_with_check(first_global_files[['snpi_filename']]), + hnpi = flepicommon::read_parquet_with_check(first_global_files[['hnpi_filename']]), + hpar = dplyr::mutate(flepicommon::read_parquet_with_check(first_global_files[['hpar_filename']]), + parameter=paste(quantity,!!rlang::sym(obs_subpop),outcome,sep='_')), start_date = gt_start_date, end_date = gt_end_date ) @@ -363,23 +364,23 @@ if (!opt$reset_chimeric_on_accept) { } for(seir_modifiers_scenario in seir_modifiers_scenarios) { - + if (!is.null(config$seir_modifiers)){ print(paste0("Running seir modifier scenario: ", seir_modifiers_scenario)) } else { print(paste0("No seir modifier scenarios")) seir_modifiers_scenario <- NULL } - + for(outcome_modifiers_scenario in outcome_modifiers_scenarios) { - + if (!is.null(config$outcome_modifiers)){ print(paste0("Running outcome modifier scenario: ", outcome_modifiers_scenario)) } else { print(paste0("No outcome modifier scenarios")) outcome_modifiers_scenario <- NULL } - + # If no seir or outcome scenarios, instead pass py_none() to Gempyor (which assigns no value to the scenario) if (is.null(seir_modifiers_scenario)){ seir_modifiers_scenario <- reticulate::py_none() @@ -387,20 +388,20 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { if (is.null(outcome_modifiers_scenario)){ outcome_modifiers_scenario <- reticulate::py_none() } - + reset_chimeric_files <- FALSE # this turns on whenever a global acceptance occurs - + ## Set up first iteration of chain ---------- - + ### Create python simulator object - + # Create parts of filename pieces for simulation to save output with # flepicommon::create_prefix is roughly equivalent to paste(...) 
with some specific formatting rule chimeric_intermediate_filepath_suffix <- flepicommon::create_prefix(prefix="",'chimeric','intermediate',sep='/',trailing_separator='') global_intermediate_filepath_suffix <- flepicommon::create_prefix(prefix="",'global','intermediate',sep='/',trailing_separator='') slot_filename_prefix <- flepicommon::create_prefix(slot=list(opt$this_slot,"%09d"), sep='.', trailing_separator='.') slotblock_filename_prefix <- flepicommon::create_prefix(slot=list(opt$this_slot,"%09d"), block=list(opt$this_block,"%09d"), sep='.', trailing_separator='.') - + ## python configuration: build simulator model specified in config tryCatch({ gempyor_inference_runner <- gempyor$GempyorSimulator( @@ -421,8 +422,8 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { }) setup_prefix <- gempyor_inference_runner$modinf$get_setup_name() # file name piece of the form [config$name]_[seir_modifier_scenario]_[outcome_modifier_scenario] print("gempyor_inference_runner created successfully.") - - + + # Get names of files where output from the initial simulation will be saved ## {prefix}/{run_id}/{type}/{suffix}/{prefix}.{index = block-1}.{run_id}.{type}.{ext} ## N.B.: prefix should end in "{slot}." NOTE: Potential problem. Prefix is {slot}.{block} but then "index" includes block also?? @@ -432,12 +433,12 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { filename_prefix=slotblock_filename_prefix, index=opt$this_block - 1) first_chimeric_files <- inference::create_filename_list(run_id=opt$run_id, - prefix=setup_prefix, + prefix=setup_prefix, filepath_suffix=chimeric_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=opt$this_block - 1) - - + + print("RUNNING: MCMC initialization for the first block") # Output saved to files of the form {setup_prefix}/{run_id}/{type}/global/intermediate/{slotblock_filename_prefix}.(block-1).{run_id}.{type}.{ext} # also copied into the /chimeric/ version, which are referenced by first_global_files and first_chimeric_files @@ -446,71 +447,72 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { block = opt$this_block, setup_prefix = setup_prefix, global_intermediate_filepath_suffix = global_intermediate_filepath_suffix, - chimeric_intermediate_filepath_suffix = chimeric_intermediate_filepath_suffix, + chimeric_intermediate_filepath_suffix = chimeric_intermediate_filepath_suffix, filename_prefix = slotblock_filename_prefix, # might be wrong, maybe should just be slot_filename_prefix gempyor_inference_runner = gempyor_inference_runner, likelihood_calculation_function = likelihood_calculation_fun, is_resume = opt[['is-resume']] ) print("First MCMC block initialized successfully.") - + # So far no acceptances have occurred last_accepted_index <- 0 - + # Load files with the output of initialize_mcmc_first_block - + # load those files (chimeric currently identical to global) - initial_spar <- arrow::read_parquet(first_chimeric_files[['spar_filename']]) - initial_hpar <- arrow::read_parquet(first_chimeric_files[['hpar_filename']]) - initial_snpi <- arrow::read_parquet(first_chimeric_files[['snpi_filename']]) - initial_hnpi <- arrow::read_parquet(first_chimeric_files[['hnpi_filename']]) + initial_spar <- flepicommon::read_parquet_with_check(first_chimeric_files[['spar_filename']]) + initial_hpar <- flepicommon::read_parquet_with_check(first_chimeric_files[['hpar_filename']]) + initial_snpi <- flepicommon::read_parquet_with_check(first_chimeric_files[['snpi_filename']]) + initial_hnpi <- 
flepicommon::read_parquet_with_check(first_chimeric_files[['hnpi_filename']]) if (!is.null(config$initial_conditions)){ - initial_init <- arrow::read_parquet(first_chimeric_files[['init_filename']]) + initial_init <- flepicommon::read_parquet_with_check(first_chimeric_files[['init_filename']]) } if (!is.null(config$seeding)){ seeding_col_types <- NULL suppressMessages(initial_seeding <- readr::read_csv(first_chimeric_files[['seed_filename']], col_types=seeding_col_types)) - + if (opt$stoch_traj_flag) { initial_seeding$amount <- as.integer(round(initial_seeding$amount)) } }else{ initial_seeding <- NULL } - chimeric_current_likelihood_data <- arrow::read_parquet(first_chimeric_files[['llik_filename']]) - global_current_likelihood_data <- arrow::read_parquet(first_global_files[['llik_filename']]) # they are the same ... don't need to load both - + chimeric_current_likelihood_data <- flepicommon::read_parquet_with_check(first_chimeric_files[['llik_filename']]) + global_current_likelihood_data <- flepicommon::read_parquet_with_check(first_global_files[['llik_filename']]) # they are the same ... don't need to load both + + #####Get the full likelihood (WHY IS THIS A DATA FRAME) # Compute total loglik for each sim global_current_likelihood_total <- sum(global_current_likelihood_data$ll) - + #####LOOP NOTES ### this_index is the current MCMC iteration ### last_accepted_index is the index of the most recent globally accepted iteration - + startTimeCount=Sys.time() - + ## Loop over simulations in this block -------------------------------------------- - + # keep track of running average global acceptance rate, since old global likelihood data not kept in memory. Each geoID has same value for acceptance rate in global case, so we just take the 1st entry old_avg_global_accept_rate <- global_current_likelihood_data$accept_avg[1] old_avg_chimeric_accept_rate <- chimeric_current_likelihood_data$accept_avg - + for (this_index in seq_len(opt$iterations_per_slot)) { - + print(paste("Running iteration", this_index)) - + startTimeCountEach = Sys.time() - + ## Create filenames to save output from each iteration this_global_files <- inference::create_filename_list(run_id=opt$run_id, prefix = setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index) this_chimeric_files <- inference::create_filename_list(run_id=opt$run_id, prefix = setup_prefix, filepath_suffix=chimeric_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index) - + ### Perturb accepted parameters to get proposed parameters ---- - + # since the first iteration is accepted by default, we don't perturb it, so proposed = initial if ((opt$this_block == 1) && (last_accepted_index == 0)) { - + proposed_spar <- initial_spar proposed_hpar <- initial_hpar proposed_snpi <- initial_snpi @@ -521,19 +523,19 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { if (!is.null(config$seeding)){ proposed_seeding <- initial_seeding } - + }else{ # perturb each parameter type - + proposed_spar <- initial_spar # currently no function to perturb proposed_hpar <- inference::perturb_hpar(initial_hpar, config$outcomes$outcomes) # NOTE: Deprecated?? ?no scenarios possible right now? 
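The perturb_* kernels called in this block live elsewhere in the inference package and are unchanged by this diff. Purely as an illustration of the proposal step, a hypothetical random-walk perturbation (not the actual inference::perturb_snpi() implementation, which also takes per-modifier settings from the config, as the calls below show) could look like:

```r
# Hypothetical sketch of an MCMC proposal kernel: jitter a numeric
# parameter column with Gaussian noise. Illustrative only.
perturb_sketch <- function(values, sd = 0.05) {
  values + stats::rnorm(length(values), mean = 0, sd = sd)
}
```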
- + if (!is.null(config$seir_modifiers$modifiers)){ proposed_snpi <- inference::perturb_snpi(initial_snpi, config$seir_modifiers$modifiers) } if (!is.null(config$outcome_modifiers$modifiers)){ proposed_hnpi <- inference::perturb_hnpi(initial_hnpi, config$outcome_modifiers$modifiers) } - + if (!is.null(config$seeding)){ proposed_seeding <- inference::perturb_seeding( seeding = initial_seeding, @@ -552,10 +554,10 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { proposed_init <- initial_init } } - + } - - # Write proposed parameters to files for other code to read. + + # Write proposed parameters to files for other code to read. # Temporarily stored in global files, which are eventually overwritten with global accepted values arrow::write_parquet(proposed_spar,this_global_files[['spar_filename']]) arrow::write_parquet(proposed_hpar,this_global_files[['hpar_filename']]) @@ -567,9 +569,9 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { if (!is.null(config$initial_conditions)){ arrow::write_parquet(proposed_init, this_global_files[['init_filename']]) } - + ## Run the simulator with proposed parameters ------------------- - + # create simulator tryCatch({ gempyor_inference_runner$one_simulation( @@ -582,12 +584,11 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { for(m in reticulate::py_last_error()) cat(m) stop("GempyorSimulator failed to run... stopping") }) - + # run if (config$inference$do_inference){ sim_hosp <- flepicommon::read_file_of_type(gsub(".*[.]","",this_global_files[['hosp_filename']]))(this_global_files[['hosp_filename']]) %>% dplyr::filter(date >= min(obs$date),date <= max(obs$date)) - lhs <- unique(sim_hosp[[obs_subpop]]) rhs <- unique(names(data_stats)) all_locations <- rhs[rhs %in% lhs] @@ -597,7 +598,7 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { obs <- sim_hosp data_stats <- sim_hosp } - + ## Compare model output to data and calculate likelihood ---- proposed_likelihood_data <- inference::aggregate_and_calc_loc_likelihoods( all_locations = all_locations, @@ -619,12 +620,12 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { start_date = gt_start_date, end_date = gt_end_date ) - + rm(sim_hosp) - + # write proposed likelihood to global file arrow::write_parquet(proposed_likelihood_data, this_global_files[['llik_filename']]) - + ## UNCOMMENT TO DEBUG # print('current global likelihood') # print(global_current_likelihood_data) @@ -632,113 +633,113 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { # print(chimeric_current_likelihood_data) #print('proposed likelihood') #print(proposed_likelihood_data) - + ## Compute total loglik for each sim proposed_likelihood_total <- sum(proposed_likelihood_data$ll) ## For logging print(paste("Current likelihood",formatC(global_current_likelihood_total,digits=2,format="f"),"Proposed likelihood", formatC(proposed_likelihood_total,digits=2,format="f"))) - - + + ## Global likelihood acceptance or rejection decision ----------- - - # Compare total likelihood (product of all subpopulations) in current vs proposed likelihood. + + # Compare total likelihood (product of all subpopulations) in current vs proposed likelihood. # Accept if MCMC acceptance decision = 1 or it's the first iteration of the first block # note - we already have a catch for the first block thing earlier (we set proposed = initial likelihood) - shouldn't need 2! 
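Assuming inference::iterateAccept() implements the standard Metropolis rule, which is consistent with the accept_prob bookkeeping computed below as exp(min(0, ll_prop - ll_curr)), the acceptance decision on the log-likelihood scale reduces to this sketch:

```r
# Metropolis acceptance on the log scale: accept with probability
# min(1, exp(ll_prop - ll_curr)). A sketch, not the packaged implementation.
iterate_accept_sketch <- function(ll_curr, ll_prop) {
  stats::runif(1) < exp(min(0, ll_prop - ll_curr))
}
```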
global_accept <- ifelse( #same value for all subpopulations - inference::iterateAccept(global_current_likelihood_total, proposed_likelihood_total) || + inference::iterateAccept(global_current_likelihood_total, proposed_likelihood_total) || ((last_accepted_index == 0) && (opt$this_block == 1)),1,0 ) - + # only do global accept if all subpopulations accepted? - if (global_accept == 1 | config$inference$do_inference == FALSE) { - + if (global_accept == 1 | config$inference$do_inference == FALSE) { + print("**** GLOBAL ACCEPT (Recording) ****") - + if ((opt$this_block == 1) && (last_accepted_index == 0)) { print("by default because it's the first iteration of block 1") } else { gempyor_inference_runner$write_last_seir(sim_id2write=this_index) } - + # Update the index of the most recent globally accepted parameters last_accepted_index <- this_index - + if (opt$reset_chimeric_on_accept) { reset_chimeric_files <- TRUE # triggers globally accepted parameters to be pushed back to chimeric } - + # Update current global likelihood to proposed likelihood and record some acceptance statistics - + #acceptance probability for this iteration - this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total))) - + this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total))) + global_current_likelihood_data <- proposed_likelihood_data # this is used for next iteration global_current_likelihood_total <- proposed_likelihood_total # this is used for next iteration - + global_current_likelihood_data$accept <- 1 # global acceptance decision (0/1), same for each geoID global_current_likelihood_data$accept_prob <- this_accept_prob - + # File saving: If global accept occurs, the global parameter files are already correct as they contain the proposed values - + } else { print("**** GLOBAL REJECT (Recording) ****") - + # File saving: If global reject occurs, remove "proposed" parameters from global files and instead replace them with the last accepted values - + # get filenames of last accepted files - old_global_files <- inference::create_filename_list(run_id=opt$run_id, - prefix=setup_prefix, + old_global_files <- inference::create_filename_list(run_id=opt$run_id, + prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, - filename_prefix=slotblock_filename_prefix, + filename_prefix=slotblock_filename_prefix, index=last_accepted_index) - + # Update current global likelihood to last accepted one, and record some acceptance statistics - + # Replace current global files with last accepted values for (type in names(this_global_files)) { - file.copy(old_global_files[[type]],this_global_files[[type]], overwrite = TRUE) + file.copy(old_global_files[[type]],this_global_files[[type]], overwrite = TRUE) } - + #acceptance probability for this iteration - this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total))) - + this_accept_prob <- exp(min(c(0, proposed_likelihood_total - global_current_likelihood_total))) + #NOTE: Don't technically need the next 2 lines, as the values saved to memory are last accepted values, but confusing to track these variable names if we skip this - global_current_likelihood_data <- arrow::read_parquet(this_global_files[['llik_filename']]) + global_current_likelihood_data <- flepicommon::read_parquet_with_check(this_global_files[['llik_filename']]) global_current_likelihood_total <- sum(global_current_likelihood_data$ll) - + global_current_likelihood_data$accept <- 0 # global acceptance 
decision (0/1), same for each geoID global_current_likelihood_data$accept_prob <- this_accept_prob - + } # Calculate more acceptance statistics for the global chain. Same value to each subpopulation effective_index <- (opt$this_block - 1) * opt$iterations_per_slot + this_index # index after all blocks - avg_global_accept_rate <- ((effective_index-1)*old_avg_global_accept_rate + global_accept)/(effective_index) + avg_global_accept_rate <- ((effective_index-1)*old_avg_global_accept_rate + global_accept)/(effective_index) global_current_likelihood_data$accept_avg <-avg_global_accept_rate # update running average acceptance probability old_avg_global_accept_rate <- avg_global_accept_rate # keep track, since old global likelihood data not kept in memory - - # print(paste("Average global acceptance rate: ",formatC(100*avg_global_accept_rate,digits=2,format="f"),"%")) - + + # print(paste("Average global acceptance rate: ",formatC(100*avg_global_accept_rate,digits=2,format="f"),"%")) + # Update global likelihood files arrow::write_parquet(global_current_likelihood_data, this_global_files[['llik_filename']]) # update likelihood saved to file ## Chimeric likelihood acceptance or rejection decisions (one round) --------------------------------------------------------------------------- - + if (!reset_chimeric_files) { # will make separate acceptance decision for each subpop - + # "Chimeric" means GeoID-specific print("Making chimeric acceptance decision") - + if (is.null(config$initial_conditions)){ initial_init <- NULL proposed_init <- NULL - } + } if (is.null(config$seeding)){ initial_seeding <- NULL proposed_seeding <- NULL } - + chimeric_acceptance_list <- inference::accept_reject_proposals( # need to rename this function!! init_orig = initial_init, init_prop = proposed_init, @@ -753,7 +754,7 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { orig_lls = chimeric_current_likelihood_data, prop_lls = proposed_likelihood_data ) - + # Update accepted parameters to start next simulation if (!is.null(config$initial_conditions)){ new_init <- chimeric_acceptance_list$init @@ -766,9 +767,9 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { new_snpi <- chimeric_acceptance_list$snpi new_hnpi <- chimeric_acceptance_list$hnpi chimeric_current_likelihood_data <- chimeric_acceptance_list$ll - + } else { # Proposed values were globally accepted and will be copied to chimeric - + print("Resetting chimeric values to global due to global acceptance") if (!is.null(config$initial_conditions)){ new_init <- proposed_init @@ -781,20 +782,20 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { new_snpi <- proposed_snpi new_hnpi <- proposed_hnpi chimeric_current_likelihood_data <- proposed_likelihood_data - + reset_chimeric_files <- FALSE - + chimeric_current_likelihood_data$accept <- 1 } - + # Calculate acceptance statistics of the chimeric chain - + effective_index <- (opt$this_block - 1) * opt$iterations_per_slot + this_index avg_chimeric_accept_rate <- ((effective_index - 1) * old_avg_chimeric_accept_rate + chimeric_current_likelihood_data$accept) / (effective_index) # running average acceptance rate chimeric_current_likelihood_data$accept_avg <- avg_chimeric_accept_rate chimeric_current_likelihood_data$accept_prob <- exp(min(c(0, proposed_likelihood_data$ll - chimeric_current_likelihood_data$ll))) #acceptance probability - old_avg_chimeric_accept_rate <- avg_chimeric_accept_rate - + old_avg_chimeric_accept_rate <- avg_chimeric_accept_rate + ## Write accepted chimeric parameters to file 
if (!is.null(config$seeding)){ readr::write_csv(new_seeding,this_chimeric_files[['seed_filename']]) @@ -807,10 +808,10 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { arrow::write_parquet(new_snpi,this_chimeric_files[['snpi_filename']]) arrow::write_parquet(new_hnpi,this_chimeric_files[['hnpi_filename']]) arrow::write_parquet(chimeric_current_likelihood_data, this_chimeric_files[['llik_filename']]) - + print(paste("Current accepted index is ",last_accepted_index)) - - + + # set initial values to start next iteration if (!is.null(config$initial_conditions)){ initial_init <- new_init @@ -822,7 +823,7 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { initial_hpar <- new_hpar initial_snpi <- new_snpi initial_hnpi <- new_hnpi - + # remove "new" and "proposed" values from memory rm(proposed_spar, proposed_hpar, proposed_snpi,proposed_hnpi) rm(new_spar, new_hpar, new_snpi,new_hnpi) @@ -834,14 +835,14 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { rm(proposed_seeding) rm(new_seeding) } - + endTimeCountEach=difftime(Sys.time(), startTimeCountEach, units = "secs") print(paste("Time to run MCMC iteration",this_index,"of slot",opt$this_slot," is ",formatC(endTimeCountEach,digits=2,format="f")," seconds")) - + # memory profiler to diagnose memory creep - + if (opt$memory_profiling){ - + if (this_index %% opt$memory_profiling_iters == 0 | this_index == 1){ tot_objs_ <- as.numeric(object.size(x=lapply(ls(all.names = TRUE), get)) * 9.31e-10) tot_mem_ <- sum(gc()[,2]) / 1000 @@ -857,27 +858,27 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { size = c(tot_mem_, tot_objs_), unit = c("Gb", "Gb"), .before = 1) - - this_global_memprofile <- inference::create_filename_list(run_id=opt$run_id, - prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, - filename_prefix=slotblock_filename_prefix, index=this_index,types = "memprof", + + this_global_memprofile <- inference::create_filename_list(run_id=opt$run_id, + prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, + filename_prefix=slotblock_filename_prefix, index=this_index,types = "memprof", extensions = "parquet") arrow::write_parquet(curr_obj_sizes, this_global_memprofile[['memprof_filename']]) rm(curr_obj_sizes) } - + } - + ## Run garbage collector to clear memory and prevent memory leakage # gc_after_a_number <- 1 ## # Garbage collection every 1 iteration if (this_index %% 1 == 0){ gc() } - + } - + # Ending this MCMC iteration - + # Create "final" files after MCMC chain is completed # Will fail if unsuccessful # moves the most recently globally accepted parameter values from global/intermediate file to global/final @@ -900,24 +901,24 @@ for(seir_modifiers_scenario in seir_modifiers_scenarios) { slotblock_filename_prefix = slotblock_filename_prefix, slot_filename_prefix = slot_filename_prefix) if (!prod(unlist(cpy_res_chimeric))) {stop("File copy failed:", paste(unlist(cpy_res_chimeric),paste(names(cpy_res_chimeric),"|")))} - + #####Write currently accepted files to disk #NOTE: Don't understand why we write these files that don't have an iteration index #files of the form ../chimeric/intermediate/{slot}.{block}.{run_id}.{variable}.parquet output_chimeric_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=chimeric_intermediate_filepath_suffix, filename_prefix=slot_filename_prefix, index=opt$this_block) #files of the form .../global/intermediate/{slot}.{block}.{run_id}.{variable}.parquet output_global_files <- 
inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix,filepath_suffix=global_intermediate_filepath_suffix,filename_prefix=slot_filename_prefix, index=opt$this_block) - + warning("Chimeric hosp and seir files not yet supported, just using the most recently generated file of each type") #files of the form .../global/intermediate/{slot}.{block}.{iteration}.{run_id}.{variable}.parquet this_index_global_files <- inference::create_filename_list(run_id=opt$run_id,prefix=setup_prefix, filepath_suffix=global_intermediate_filepath_suffix, filename_prefix=slotblock_filename_prefix, index=this_index) - + # copy files from most recent global to end of block chimeric?? file.copy(this_index_global_files[['hosp_filename']],output_chimeric_files[['hosp_filename']]) file.copy(this_index_global_files[['seir_filename']],output_chimeric_files[['seir_filename']]) - + endTimeCount=difftime(Sys.time(), startTimeCount, units = "secs") print(paste("Time to run all MCMC iterations of slot ",opt$this_slot," is ",formatC(endTimeCount,digits=2,format="f")," seconds")) - + } } diff --git a/postprocessing/model_output_notebook.Rmd b/postprocessing/model_output_notebook.Rmd index 9b80b60c0..a09d6e019 100644 --- a/postprocessing/model_output_notebook.Rmd +++ b/postprocessing/model_output_notebook.Rmd @@ -8,7 +8,7 @@ output: number_sections: TRUE keep_tex: FALSE params: - opt: !r option_list = list(optparse::make_option(c("-c", "--config"), action="store", default=Sys.getenv("CONFIG_PATH", Sys.getenv("CONFIG_PATH")), type='character', help="path to the config file"), optparse::make_option(c("-d", "--data_path"), action="store", default=Sys.getenv("DATA_PATH", Sys.getenv("DATA_PATH")), type='character', help="path to the data repo"), optparse::make_option(c("-u","--run-id"), action="store", dest = "run_id", type='character', help="Unique identifier for this run", default = Sys.getenv("FLEPI_RUN_INDEX",flepicommon::run_id())), optparse::make_option(c("-R", "--results-path"), action="store", dest = "results_path", type='character', help="Path for model output", default = Sys.getenv("FS_RESULTS_PATH", Sys.getenv("FS_RESULTS_PATH")))) + opt: !r option_list = list(optparse::make_option(c("-c", "--config"), action="store", default=Sys.getenv("CONFIG_PATH", Sys.getenv("CONFIG_PATH")), type='character', help="path to the config file"), optparse::make_option(c("-d", "--data_path"), action="store", default=Sys.getenv("PROJECT_PATH", Sys.getenv("PROJECT_PATH")), type='character', help="path to the data repo"), optparse::make_option(c("-u","--run-id"), action="store", dest = "run_id", type='character', help="Unique identifier for this run", default = Sys.getenv("FLEPI_RUN_INDEX",flepicommon::run_id())), optparse::make_option(c("-R", "--results-path"), action="store", dest = "results_path", type='character', help="Path for model output", default = Sys.getenv("FS_RESULTS_PATH", Sys.getenv("FS_RESULTS_PATH")))) --- ```{r setup, include=FALSE} diff --git a/postprocessing/plot_predictions.R b/postprocessing/plot_predictions.R index c1f571c7a..5397966f5 100644 --- a/postprocessing/plot_predictions.R +++ b/postprocessing/plot_predictions.R @@ -23,9 +23,10 @@ proj_data <- data_comb # STATE DATA -------------------------------------------------------------- +utils::data(fips_us_county, package = "flepicommon") # State Data # -state_cw <- arrow::read_parquet("datasetup/usdata/fips_us_county.parquet") %>% +state_cw <- fips_us_county %>% dplyr::distinct(state, state_code) %>% dplyr::select(USPS = state, location = state_code) %>% 
dplyr::mutate(location = str_pad(location, 2, side = "left", pad = "0")) %>% diff --git a/postprocessing/run_sim_processing_SLURM.R b/postprocessing/run_sim_processing_SLURM.R index fc75bd0f7..085cecc47 100644 --- a/postprocessing/run_sim_processing_SLURM.R +++ b/postprocessing/run_sim_processing_SLURM.R @@ -13,7 +13,7 @@ options(readr.num_columns = 0) option_list = list( optparse::make_option(c("-c", "--config"), action="store", default=Sys.getenv("CONFIG_PATH", Sys.getenv("CONFIG_PATH")), type='character', help="path to the config file"), optparse::make_option(c("-u","--run-id"), action="store", dest = "run_id", type='character', help="Unique identifier for this run", default = Sys.getenv("FLEPI_RUN_INDEX",covidcommon::run_id())), - optparse::make_option(c("-d", "--data-path"), action="store", dest = "data_path", default=Sys.getenv("DATA_PATH", Sys.getenv("DATA_PATH")), type='character', help="path to data repo"), + optparse::make_option(c("-d", "--data-path"), action="store", dest = "data_path", default=Sys.getenv("PROJECT_PATH", Sys.getenv("PROJECT_PATH")), type='character', help="path to data repo"), optparse::make_option(c("-r","--run-processing"), action="store", dest = "run_processing", default=Sys.getenv("PROCESS",FALSE), type='logical', help = "Process the run if true"), optparse::make_option(c("-P", "--results-path"), action="store", dest = "results_path", type='character', help="Path for model output", default = Sys.getenv("FS_RESULTS_PATH", Sys.getenv("FS_RESULTS_PATH"))), optparse::make_option(c("-F","--full-fit"), action="store", dest = "full_fit", default=Sys.getenv("FULL_FIT",FALSE), type='logical', help = "Process full fit"), @@ -37,7 +37,7 @@ if(opt$config == ""){ if(opt$data_path == ""){ optparse::print_help(parser) stop(paste( - "Please specify a data path -d option or DATA_PATH environment variable." + "Please specify a data path -d option or PROJECT_PATH environment variable." 
)) } diff --git a/postprocessing/sim_processing_source.R b/postprocessing/sim_processing_source.R index 9997d3822..001cdec08 100644 --- a/postprocessing/sim_processing_source.R +++ b/postprocessing/sim_processing_source.R @@ -59,7 +59,7 @@ combine_and_format_sims <- function(outcome_vars = "incid", # pull out just the total outcomes of interest cols_aggr <- expand_grid(a="incid",b=outcomes_) %>% mutate(d=paste0(a,b)) %>% pull(d) cols_aggr <- cols_aggr[cols_aggr %in% colnames(res_subpop_all)] - cols_aggr <- "incidH_14to15" + if(!keep_all_compartments & !keep_variant_compartments & !keep_vacc_compartments){ res_subpop_all <- res_subpop_all %>% # select(time, subpop, outcome_modifiers_scenario, sim_num, all_of(cols_aggr)) @@ -591,14 +591,15 @@ reichify_cum_ests <- function(cum_ests, cum_var="cumH", point_est=0.5, opt){ outcome_short <- recode(cum_var, "cumI"="inf", "cumC"="case", "cumH"="hosp", "cumD"="death") - + + utils::data(state_fips_abbr, package = "flepicommon") cum_ests <- cum_ests %>% filter(quantile!="data") %>% filter(time>opt$forecast_date) %>% mutate(forecast_date=opt$forecast_date) %>% rename(target_end_date=time) %>% dplyr::select(-location) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location = ifelse(USPS=="US", "US", location)) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% rename(value=!!sym(cum_var)) %>% @@ -668,11 +669,13 @@ get_weekly_incid <- function(res_state, outcomes){ reichify_inc_ests <- function(weekly_inc_outcome, opt){ + utils::data(state_fips_abbr, package = "flepicommon") + weekly_inc_outcome <- weekly_inc_outcome %>% pivot_wider(names_from = quantile, names_prefix = "quant_", values_from = outcome) %>% mutate(forecast_date=opt$forecast_date) %>% rename(target_end_date=time) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% mutate(ahead=round(as.numeric(target_end_date - forecast_date)/7)) %>% mutate(target = recode(outcome_name, "incidI"="inf", "incidC"="case", "incidH"="hosp", "incidD"="death")) %>% @@ -706,12 +709,12 @@ format_daily_outcomes <- function(daily_inc_outcome, point_est=0.5, opt){ if (cum_outcomes){ daily_inc_outcome <- daily_inc_outcome %>% mutate(outcome_name = gsub("cum", "incid", outcome_name)) } - + utils::data(state_fips_abbr, package = "flepicommon") daily_inc_outcome <- daily_inc_outcome %>% pivot_wider(names_from = quantile, names_prefix = "quant_", values_from = outcome) %>% mutate(forecast_date = opt$forecast_date) %>% rename(target_end_date = time) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% mutate(ahead = round(as.numeric(target_end_date - forecast_date))) %>% mutate(target = recode(outcome_name, "incidI"="inf", "incidC"="case", "incidH"="hosp", "incidD"="death")) %>% @@ -753,11 +756,12 @@ format_weekly_outcomes <- function(weekly_inc_outcome, point_est=0.5, opt){ weekly_inc_outcome <- weekly_inc_outcome %>% mutate(outcome_name = gsub("cum", "incid", outcome_name)) } + utils::data(state_fips_abbr, package = "flepicommon") weekly_inc_outcome <- weekly_inc_outcome %>% pivot_wider(names_from = quantile, names_prefix = "quant_", values_from = outcome) %>% 
mutate(forecast_date=opt$forecast_date) %>% rename(target_end_date=time) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% mutate(ahead=round(as.numeric(target_end_date - forecast_date)/7)) %>% mutate(target = recode(outcome_name, "incidI"="inf", "incidC"="case", "incidH"="hosp", "incidD"="death")) %>% @@ -810,11 +814,13 @@ get_weekly_incid2 <- function(res_state, point_est=0.5, outcome_var="incidI", op if(opt$reichify) { + utils::data(state_fips_abbr, package = "flepicommon") + weekly_inc_outcome <- weekly_inc_outcome %>% pivot_wider(names_from = quantile, names_prefix = "quant_", values_from = !!sym(outcome_var)) %>% mutate(forecast_date=opt$forecast_date) %>% rename(target_end_date=time) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% mutate(ahead=round(as.numeric(target_end_date - forecast_date)/7))%>% mutate(target=sprintf(paste0("%d wk ahead inc ", outcome_short), ahead)) %>% @@ -1503,7 +1509,8 @@ process_sims <- function( # SAVE REPLICATES ----------------------------------------------- if (save_reps) { - + + utils::data(state_fips_abbr, package = "flepicommon") weekly_reps <- weekly_incid_sims %>% mutate(time = lubridate::as_date(time)) %>% # filter(time >= lubridate::as_date(projection_date) & time <= lubridate::as_date(end_date)) %>% @@ -1514,7 +1521,7 @@ process_sims <- function( scenario_id = scenario_id, scenario_name=scenario_name) %>% mutate(model_projection_date=opt$forecast_date) %>% rename(target_end_date=time) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% mutate(ahead=round(as.numeric(target_end_date - model_projection_date)/7)) %>% mutate(target = recode(outcome_name, "incidI"="inf", "incidC"="case", "incidH"="hosp", "incidD"="death")) %>% @@ -1561,6 +1568,7 @@ process_sims <- function( mutate(cum_peak_prob = cumsum(prob_peak)) %>% ungroup() + utils::data(state_fips_abbr, package = "flepicommon") peak_timing <- peak_timing %>% mutate(time = lubridate::as_date(time)) %>% filter(time >= lubridate::as_date(projection_date) & time <= lubridate::as_date(end_date)) %>% @@ -1570,7 +1578,7 @@ process_sims <- function( scenario_id = scenario_id, scenario_name=scenario_name) %>% mutate(model_projection_date=opt$forecast_date) %>% rename(target_end_date=time) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% mutate(ahead=round(as.numeric(target_end_date - model_projection_date)/7)) %>% mutate(target = recode(outcome_name, "incidI"="inf", "incidC"="case", "incidH"="hosp", "incidD"="death")) %>% @@ -1584,7 +1592,7 @@ process_sims <- function( location = USPS, value=cum_peak_prob, age_group) # PEAK SIZE - + utils::data(state_fips_abbr, package = "flepicommon") peak_size <- weekly_incid_sims %>% filter(outcome_name=="incidH") %>% group_by(USPS, sim_num, outcome_name) %>% @@ -1598,7 +1606,7 @@ process_sims <- function( unnest(x) %>% pivot_wider(names_from = quantile, names_prefix = "quant_", values_from = outcome) %>% 
mutate(forecast_date=opt$forecast_date) %>% - dplyr::left_join(arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet")) %>% + dplyr::left_join(state_fips_abbr) %>% mutate(location=stringr::str_pad(location, width=2, side="left", pad="0")) %>% mutate(target = recode(outcome_name, "incidI"="inf", "incidC"="case", "incidH"="hosp", "incidD"="death")) %>% mutate(target = paste0("peak size ", target)) %>%
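Across these postprocessing hunks, repeated arrow::read_parquet("datasetup/usdata/state_fips_abbr.parquet") reads are replaced with the state_fips_abbr dataset now shipped in flepicommon (the new data/state_fips_abbr.rda above), so postprocessing no longer depends on the working directory containing datasetup/. A sketch of the resulting lookup pattern, assuming the dataset carries the USPS and location columns implied by the surrounding dplyr code:

```r
# Load the packaged crosswalk and join it onto an outcome table.
# The toy data frame and the (USPS, location) column names are assumptions
# inferred from the surrounding code, not verified against the .rda contents.
utils::data(state_fips_abbr, package = "flepicommon")

df <- data.frame(USPS = c("MD", "CA"))       # stand-in for a forecast table
df <- dplyr::left_join(df, state_fips_abbr)  # joins on the shared USPS column
df$location <- stringr::str_pad(df$location, width = 2, side = "left", pad = "0")
```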