Skip to content

Commit

Permalink
Merge pull request #227 from HopkinsIDD/add-examples
Browse files Browse the repository at this point in the history
add the flepiMoP sample repository and run integrations against it
  • Loading branch information
jcblemai authored Jun 7, 2024
2 parents 47b7c85 + 51f5c39 commit 798ca1a
Show file tree
Hide file tree
Showing 110 changed files with 28,503 additions and 1,928 deletions.
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ jobs:
cd flepimop/gempyor_pkg
pytest -s
shell: bash
- name: Run gempyor-cli integration tests from examples
run: |
source /var/python/3.10/virtualenv/bin/activate
cd examples
pytest -s
shell: bash
- name: Run flepicommon tests
run: |
setwd("flepimop/R_packages/flepicommon")
Expand Down
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ model_output/
.Rhistory
.Rapp.history

*.pdf
*.h5

# Session Data files
.RData

Expand Down
5 changes: 3 additions & 2 deletions batch/SLURM_inference_job.run
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ if [[ $FLEPI_CONTINUATION == "TRUE" ]]; then
extension='$extension'))")
# in filename is always a seir file
export IN_FILENAME=$(python -c "from gempyor import file_paths; print(file_paths.create_file_name(run_id='$FLEPI_CONTINUATION_RUN_ID',
prefix='$FLEPI_PREFIX/$FLEPI_CONTINUATION_RUN_ID'
inference_filepath_suffix='/global/final',
prefix='$FLEPI_PREFIX/$FLEPI_CONTINUATION_RUN_ID',
inference_filepath_suffix='global/final',
index=$FLEPI_SLOT_INDEX,
ftype='seir',
extension='$extension'))")
Expand All @@ -128,6 +128,7 @@ if [[ $FLEPI_CONTINUATION == "TRUE" ]]; then
else
echo "CONTINUATION: Could not copy file of type $filetype ($IN_FILENAME -> $INIT_FILENAME)"
fi
#Rscript R/scripts/init_R18_resume_highrisk.R --config $CONFIG_PATH --init_filename_raw $INIT_FILENAME --init_filename_post $INIT_FILENAME
#Rscript $FLEPI_PATH/flepimop/main_scripts/seir_init_immuneladder.R --res_config config_SMH_R17_noBoo_lowIE_phase2_blk2.yml
#Rscript $FLEPI_PATH/preprocessing/seir_init_immuneladder_r17phase3_preOm.R --res_config config_SMH_R17_noBoo_lowIE_phase2_blk2.yml
fi
Expand Down
44 changes: 17 additions & 27 deletions batch/inference_job_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from datetime import datetime, timezone, date
import yaml
from gempyor import file_paths
import gempyor.utils


def user_confirmation(question="Continue?", default=False):
Expand All @@ -36,7 +37,7 @@ def user_confirmation(question="Continue?", default=False):
@click.option(
"-c",
"--config",
"config_file",
"config_filepath",
envvar="CONFIG_PATH",
type=click.Path(exists=True),
required=True,
Expand Down Expand Up @@ -272,7 +273,7 @@ def user_confirmation(question="Continue?", default=False):
)
def launch_batch(
batch_system,
config_file,
config_filepath,
flepi_path,
data_path,
run_id,
Expand Down Expand Up @@ -301,7 +302,7 @@ def launch_batch(
continuation_run_id,
):
config = None
with open(config_file) as f:
with open(config_filepath) as f:
config = yaml.full_load(f)

# A unique name for this job run, based on the config name and current time
Expand All @@ -327,7 +328,7 @@ def launch_batch(
)
return 1
else:
print(f"WARNING: no inference section found in {config_file}!")
print(f"WARNING: no inference section found in {config_filepath}!")

if "s3://" in str(restart_from_location): # ugly hack: str because it might be None
restart_from_run_id = aws_countfiles_autodetect_runid(
Expand Down Expand Up @@ -402,7 +403,7 @@ def launch_batch(
if "scenarios" in config["outcome_modifiers"]:
outcome_modifiers_scenarios = config["outcome_modifiers"]["scenarios"]

handler.launch(job_name, config_file, seir_modifiers_scenarios, outcome_modifiers_scenarios)
handler.launch(job_name, config_filepath, seir_modifiers_scenarios, outcome_modifiers_scenarios)

# Set job_name as environmental variable so it can be pulled for pushing to git
os.environ["job_name"] = job_name
Expand Down Expand Up @@ -655,7 +656,7 @@ def save_file(self, source, destination, remove_source=False, prefix=""):
if remove_source:
os.remove(source)

def launch(self, job_name, config_file, seir_modifiers_scenarios, outcome_modifiers_scenarios):
def launch(self, job_name, config_filepath, seir_modifiers_scenarios, outcome_modifiers_scenarios):
s3_results_path = f"s3://{self.s3_bucket}/{job_name}"

if self.batch_system == "slurm":
Expand All @@ -682,7 +683,7 @@ def launch(self, job_name, config_file, seir_modifiers_scenarios, outcome_modifi
{"name": "S3_UPLOAD", "value": str(self.s3_upload).lower()},
{"name": "PROJECT_PATH", "value": str(self.data_path)},
{"name": "FLEPI_PATH", "value": str(self.flepi_path)},
{"name": "CONFIG_PATH", "value": config_file},
{"name": "CONFIG_PATH", "value": config_filepath},
{"name": "FLEPI_NUM_SLOTS", "value": str(self.num_jobs)},
{
"name": "FLEPI_MAX_STACK_SIZE",
Expand All @@ -703,7 +704,7 @@ def launch(self, job_name, config_file, seir_modifiers_scenarios, outcome_modifi
{"name": "FLEPI_MEM_PROF_ITERS", "value": str(os.getenv("FLEPI_MEM_PROF_ITERS", default="50"))},
{"name": "SLACK_CHANNEL", "value": str(self.slack_channel)},
]
with open(config_file) as f:
with open(config_filepath) as f:
config = yaml.full_load(f)

for ctr, (s, d) in enumerate(itertools.product(seir_modifiers_scenarios, outcome_modifiers_scenarios)):
Expand Down Expand Up @@ -798,31 +799,20 @@ def launch(self, job_name, config_file, seir_modifiers_scenarios, outcome_modifi
print("slurm command to be run >>>>>>>> ")
print(command)
print(" <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< ")
import shlex # using shlex to split the command because it's not obvious https://docs.python.org/3/library/subprocess.html#subprocess.Popen

sr = subprocess.Popen(shlex.split(command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = sr.communicate()
if sr.returncode != 0:
print(f"sbatch command failed with returncode {sr.returncode}")
print("sbatch command failed with stdout and stderr:")
print("stdout: ", stdout)
print("stderr: ", stderr)
raise Exception("sbatch command failed")

returncode, stdout, stderr = gempyor.utils.command_safe_run(
command, command_name="sbatch", fail_on_fail=True
)
slurm_job_id = stdout.decode().split(" ")[-1][:-1]
print(f">>> SUCCESS SCHEDULING JOB. Slurm job id is {slurm_job_id}")

postprod_command = f"""sbatch {export_str} --dependency=afterany:{slurm_job_id} --mem={12000}M --time={60} --job-name=post-{cur_job_name} --output=log_postprod_{self.run_id}_{cur_job_name}.txt {os.path.dirname(os.path.realpath(__file__))}/SLURM_postprocess_runner.run"""
print("post-processing command to be run >>>>>>>> ")
print(postprod_command)
print(" <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< ")
sr = subprocess.Popen(shlex.split(postprod_command), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = sr.communicate()
if sr.returncode != 0:
print(f"sbatch command failed with returncode {sr.returncode}")
print("sbatch command failed with stdout and stderr:")
print("stdout: ", stdout)
print("stderr: ", stderr)
raise Exception("sbatch command failed")
returncode, stdout, stderr = gempyor.utils.command_safe_run(
postprod_command, command_name="sbatch postprod", fail_on_fail=True
)
postprod_job_id = stdout.decode().split(" ")[-1][:-1]
print(f">>> SUCCESS SCHEDULING POST-PROCESSING JOB. Slurm job id is {postprod_job_id}")

Expand Down Expand Up @@ -915,7 +905,7 @@ def launch(self, job_name, config_file, seir_modifiers_scenarios, outcome_modifi
if self.continuation:
print(f" >> Continuing from run id is {self.continuation_run_id} located in {self.continuation_location}")
print(f" >> Run id is {self.run_id}")
print(f" >> config is {config_file.split('/')[-1]}")
print(f" >> config is {config_filepath.split('/')[-1]}")
flepimop_branch = subprocess.getoutput(f"cd {self.flepi_path}; git rev-parse --abbrev-ref HEAD")
data_branch = subprocess.getoutput(f"cd {self.data_path}; git rev-parse --abbrev-ref HEAD")
data_hash = subprocess.getoutput(f"cd {self.data_path}; git rev-parse HEAD")
Expand Down
22 changes: 11 additions & 11 deletions batch/scenario_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
@click.option(
"-c",
"--config",
"config_file",
"config_filepath",
envvar="CONFIG_PATH",
type=click.Path(exists=True),
required=True,
Expand Down Expand Up @@ -109,7 +109,7 @@
help="The amount of RAM in megabytes needed per CPU running simulations",
)
def launch_batch(
config_file,
config_filepath,
num_jobs,
slots_per_job,
dvc_target,
Expand All @@ -122,26 +122,26 @@ def launch_batch(
memory,
):
config = None
with open(config_file) as f:
with open(config_filepath) as f:
config = yaml.full_load(f)

# A unique name for this job run, based on the config name and current time
job_name = f"{config['name']}-{int(time.time())}"

# Update and save the config file with the number of sims to run
print(f"Updating {config_file} to run {slots_per_job} slots...")
print(f"Updating {config_filepath} to run {slots_per_job} slots...")
config["nslots"] = slots_per_job

if parallelize_scenarios:
seir_modifiers_scenarios = config["seir_modifiers"]["scenarios"]
for s in seir_modifiers_scenarios:
seir_modifiers_scenario_job_name = f"{job_name}_{s}"
config["interventions"]["scenarios"] = [s]
with open(config_file, "w") as f:
with open(config_filepath, "w") as f:
yaml.dump(config, f, sort_keys=False)
launch_job_inner(
seir_modifiers_scenario_job_name,
config_file,
config_filepath,
num_jobs,
slots_per_job,
dvc_target,
Expand All @@ -153,14 +153,14 @@ def launch_batch(
memory,
)
config["interventions"]["scenarios"] = seir_modifiers_scenarios
with open(config_file, "w") as f:
with open(config_filepath, "w") as f:
yaml.dump(config, f, sort_keys=False)
else:
with open(config_file, "w") as f:
with open(config_filepath, "w") as f:
yaml.dump(config, f, sort_keys=False)
launch_job_inner(
job_name,
config_file,
config_filepath,
num_jobs,
slots_per_job,
dvc_target,
Expand All @@ -179,7 +179,7 @@ def launch_batch(

def launch_job_inner(
job_name,
config_file,
config_filepath,
num_jobs,
slots_per_job,
dvc_target,
Expand Down Expand Up @@ -212,7 +212,7 @@ def launch_job_inner(
model_data_path = f"s3://{s3_input_bucket}/{tarfile_name}"
results_path = f"s3://{s3_output_bucket}/{job_name}"
env_vars = [
{"name": "CONFIG_PATH", "value": config_file},
{"name": "CONFIG_PATH", "value": config_filepath},
{"name": "S3_MODEL_PROJECT_PATH", "value": model_data_path},
{"name": "DVC_TARGET", "value": dvc_target},
{"name": "DVC_OUTPUTS", "value": " ".join(dvc_outputs)},
Expand Down
6 changes: 6 additions & 0 deletions datasetup/build_covid_data.R
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ us_data <- us_data %>%




# ~ Manually fix weird MAINE point -------------------------------------------------------------

# if source == ME , dates are between 2023-09-26 and 2023-09-30, set incidH to NA
us_data <- us_data %>% mutate(incidH = ifelse(source == "ME" & date >= "2023-09-26" & date <= "2023-09-30", NA, incidH))

# ~ Fix non-numeric -------------------------------------------------------------
# -- leave NAs so its not assuming an NA is a 0 and fitting to it

Expand Down
Loading

0 comments on commit 798ca1a

Please sign in to comment.