From c8fce036a951b1b47bc09ac2f5edc79d8a63c73b Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Wed, 17 Apr 2024 14:37:38 -0400 Subject: [PATCH 01/13] add function to generate filename for resume files --- flepimop/gempyor_pkg/src/gempyor/utils.py | 21 +++++++++++++++++++ .../gempyor_pkg/tests/utils/test_utils.py | 14 +++++++++++++ 2 files changed, 35 insertions(+) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 905584cf5..9d515cf17 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -1,3 +1,4 @@ +import os import datetime import functools import numbers @@ -9,6 +10,7 @@ import scipy.stats import sympy.parsing.sympy_parser import logging +from gempyor import file_paths logger = logging.getLogger(__name__) @@ -287,3 +289,22 @@ def bash(command): print("------------") print(f"lsblk: {bash('lsblk')}") print("END AWS DIAGNOSIS ================================") + +def create_resume_out_filename(filetype: str, liketype: str) -> str: + run_id = os.environ.get("FLEPI_RUN_INDEX") + prefix = f"{os.environ.get("FLEPI_PREFIX")}/{os.environ.get("FLEPI_RUN_INDEX")}" + inference_filepath_suffix = f"{liketype}/intermidate" + FLEPI_SLOT_INDEX = int(os.environ.get("FLEPI_SLOT_INDEX")) + inference_filename_prefix='%09d.' % FLEPI_SLOT_INDEX + index='{:09d}.{:09d}'.format(1, int(os.environ.get("FLEPI_BLOCK_INDEX")-1)) + extension = "parquet" + if filetype == "seed": + extension = "csv" + return file_paths.create_file_name(run_id=run_id, + prefix=prefix, + inference_filename_prefix=inference_filename_prefix, + inference_filepath_suffix=inference_filepath_suffix, + index=index, + extension=extension) + + \ No newline at end of file diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index 694a7296f..1a1353009 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -90,3 +90,17 @@ def test_get_truncated_normal_success(): def test_get_log_normal_success(): utils.get_log_normal(meanlog=0, sdlog=1) + +def test_create_resume_out_filename(): + os.environ["FLEPI_RUN_INDEX"] = "123" + os.environ["FLEPI_PREFIX"] = "prefix" + os.environ["FLEPI_SLOT_INDEX"] = "2" + os.environ["FLEPI_BLOCK_INDEX"] = "2" + + expected_filename = "prefix/123/000000002./intermidate/000000001.000000001.parquet" + assert utils.create_resume_out_filename("spar", "like") == expected_filename + + expected_filename = "prefix/123/000000002./intermidate/000000001.000000001.csv" + assert utils.create_resume_out_filename("seed", "like") == expected_filename + + os.environ.clear() \ No newline at end of file From a63105ea4d148e728061c2d570e35c1855309c1e Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Wed, 24 Apr 2024 10:37:21 -0400 Subject: [PATCH 02/13] fix functions and adding unit tests --- flepimop/gempyor_pkg/src/gempyor/utils.py | 21 +++++++-- .../gempyor_pkg/tests/utils/test_utils.py | 43 +++++++++++++------ 2 files changed, 48 insertions(+), 16 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 9d515cf17..f28487f0a 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -292,11 +292,11 @@ def bash(command): def create_resume_out_filename(filetype: str, liketype: str) -> str: run_id = os.environ.get("FLEPI_RUN_INDEX") - prefix = f"{os.environ.get("FLEPI_PREFIX")}/{os.environ.get("FLEPI_RUN_INDEX")}" + prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('FLEPI_RUN_INDEX')}" inference_filepath_suffix = f"{liketype}/intermidate" FLEPI_SLOT_INDEX = int(os.environ.get("FLEPI_SLOT_INDEX")) inference_filename_prefix='%09d.' % FLEPI_SLOT_INDEX - index='{:09d}.{:09d}'.format(1, int(os.environ.get("FLEPI_BLOCK_INDEX")-1)) + index='{:09d}.{:09d}'.format(1, int(os.environ.get("FLEPI_BLOCK_INDEX"))-1) extension = "parquet" if filetype == "seed": extension = "csv" @@ -305,6 +305,21 @@ def create_resume_out_filename(filetype: str, liketype: str) -> str: inference_filename_prefix=inference_filename_prefix, inference_filepath_suffix=inference_filepath_suffix, index=index, + ftype=filetype, extension=extension) - \ No newline at end of file +def create_resume_input_filename(filetype: str, liketype: str) -> str: + run_id = os.environ.get("RESUME_RUN_INDEX") + prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('RESUME_RUN_INDEX')}" + inference_filepath_suffix = f"{liketype}/final" + index = os.environ.get("FLEPI_SLOT_INDEX") + extension = "parquet" + if filetype == "seed": + extension = "csv" + return file_paths.create_file_name(run_id=run_id, + prefix=prefix, + inference_filepath_suffix=inference_filepath_suffix, + index=index, + ftype=filetype, + extension=extension) + \ No newline at end of file diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index 1a1353009..83597bbd5 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -1,5 +1,4 @@ import pytest -import datetime import os import pandas as pd @@ -10,8 +9,6 @@ from gempyor import utils DATA_DIR = os.path.dirname(__file__) + "/data" -# os.chdir(os.path.dirname(__file__)) - tmp_path = "/tmp" @@ -91,16 +88,36 @@ def test_get_truncated_normal_success(): def test_get_log_normal_success(): utils.get_log_normal(meanlog=0, sdlog=1) -def test_create_resume_out_filename(): - os.environ["FLEPI_RUN_INDEX"] = "123" - os.environ["FLEPI_PREFIX"] = "prefix" - os.environ["FLEPI_SLOT_INDEX"] = "2" - os.environ["FLEPI_BLOCK_INDEX"] = "2" - expected_filename = "prefix/123/000000002./intermidate/000000001.000000001.parquet" - assert utils.create_resume_out_filename("spar", "like") == expected_filename +@pytest.fixture +def env_vars(monkeypatch): + # Setting environment variables for the test + monkeypatch.setenv("RESUME_RUN_INDEX", "321") + monkeypatch.setenv("FLEPI_PREFIX", "output") + monkeypatch.setenv("FLEPI_SLOT_INDEX", "2") + monkeypatch.setenv("FLEPI_BLOCK_INDEX", "2") + monkeypatch.setenv("FLEPI_RUN_INDEX", "123") + + +def test_create_resume_out_filename(env_vars): + result = utils.create_resume_out_filename("spar", "global") + expected_filename = """model_output/output/123/spar/global/intermidate + /000000002.000000001.000000001.123.spar.parquet""" + assert result == expected_filename + + result2 = utils.create_resume_out_filename("seed", "chimeric") + expected_filename2 = """model_output/output/123/seed/chimeric/intermidate + /000000002.000000001.000000001.123.seed.csv'""" + assert result2 == expected_filename2 + + +def test_create_resume_input_filename(env_vars): - expected_filename = "prefix/123/000000002./intermidate/000000001.000000001.csv" - assert utils.create_resume_out_filename("seed", "like") == expected_filename + result = utils.create_resume_input_filename("spar", "global") + expect_filename = 'model_output/output/321/spar/global/final/000000002.321.spar.parquet' - os.environ.clear() \ No newline at end of file + assert result == expect_filename + + result2 = utils.create_resume_input_filename("seed", "chimeric") + expect_filename2 = 'model_output/output/321/seed/chimeric/final/000000002.321.seed.csv' + assert result2 == expect_filename2 From aca80bcabc856cee6937258d180b5b97e37c41a9 Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Wed, 24 Apr 2024 14:38:40 -0400 Subject: [PATCH 03/13] add copy function --- flepimop/gempyor_pkg/src/gempyor/utils.py | 45 ++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index f28487f0a..9af2156c3 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -9,6 +9,8 @@ import pyarrow as pa import scipy.stats import sympy.parsing.sympy_parser +import subprocess +import shutil import logging from gempyor import file_paths @@ -322,4 +324,45 @@ def create_resume_input_filename(filetype: str, liketype: str) -> str: index=index, ftype=filetype, extension=extension) - \ No newline at end of file + + +def copy_file_based_on_last_job_output(): + last_job_output = os.environ.get("LAST_JOB_OUTPUT") + resume_discard_seeding = os.environ.get("RESUME_DISCARD_SEEDING") + parquet_types = ["seed", "spar", "snpi", "hpar", "hnpi", "init"] + if resume_discard_seeding == "true": + parquet_types.remove("seed") + liketypes = ["global", "chimeric"] + file_name_map = dict() + + for filetype in parquet_types: + for liketype in liketypes: + input_file_name = create_resume_input_filename(filetype=filetype, liketype=liketype) + output_file_name = create_resume_out_filename(filetype=filetype, liketype=liketype) + file_name_map[input_file_name] = output_file_name + + if last_job_output.find("s3://") >= 0: + for in_filename in file_name_map: + command = ['aws', 's3', 'cp', '--quiet', last_job_output+"/"+in_filename, file_name_map[in_filename]] + try: + result = subprocess.run(command, check=True, stdout = subprocess.PIPE, + stderr = subprocess.PIPE) + print("Output:", result.stdout.decode()) + except subprocess.CalledProcessError as e: + print("Error: ", e.stderr.decode()) + else: + first_output_filename = next(iter(file_name_map.values())) + output_dir = os.path.dirname(first_output_filename) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + for in_filename in file_name_map: + shutil.copy(os.path.join(last_job_output, in_filename), file_name_map[in_filename]) + + + for in_filename in file_name_map: + output_file_name = file_name_map[in_filename] + parquet_type = [ptype for ptype in parquet_types if ptype in output_file_name] + if os.path.exists(output_file_name): + print(f"Copy successful for file of type {parquet_type} {in_filename}->{output_file_name}") + else: + print(f"Could not copy file of type {parquet_type} {in_filename}->{output_file_name}") \ No newline at end of file From 727169cb79bba3e869676402e9466e0d221b15da Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Mon, 29 Apr 2024 10:22:48 -0400 Subject: [PATCH 04/13] format change --- flepimop/gempyor_pkg/src/gempyor/utils.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 9af2156c3..3fc810183 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -334,19 +334,19 @@ def copy_file_based_on_last_job_output(): parquet_types.remove("seed") liketypes = ["global", "chimeric"] file_name_map = dict() - + for filetype in parquet_types: for liketype in liketypes: input_file_name = create_resume_input_filename(filetype=filetype, liketype=liketype) output_file_name = create_resume_out_filename(filetype=filetype, liketype=liketype) file_name_map[input_file_name] = output_file_name - + if last_job_output.find("s3://") >= 0: for in_filename in file_name_map: command = ['aws', 's3', 'cp', '--quiet', last_job_output+"/"+in_filename, file_name_map[in_filename]] try: - result = subprocess.run(command, check=True, stdout = subprocess.PIPE, - stderr = subprocess.PIPE) + result = subprocess.run(command, check=True, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) print("Output:", result.stdout.decode()) except subprocess.CalledProcessError as e: print("Error: ", e.stderr.decode()) @@ -358,11 +358,10 @@ def copy_file_based_on_last_job_output(): for in_filename in file_name_map: shutil.copy(os.path.join(last_job_output, in_filename), file_name_map[in_filename]) - for in_filename in file_name_map: output_file_name = file_name_map[in_filename] parquet_type = [ptype for ptype in parquet_types if ptype in output_file_name] if os.path.exists(output_file_name): print(f"Copy successful for file of type {parquet_type} {in_filename}->{output_file_name}") else: - print(f"Could not copy file of type {parquet_type} {in_filename}->{output_file_name}") \ No newline at end of file + print(f"Could not copy file of type {parquet_type} {in_filename}->{output_file_name}") From 2d35b8d1c9e4328423207d9f5488a3e97fe780b2 Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Mon, 29 Apr 2024 14:18:14 -0400 Subject: [PATCH 05/13] add new function to set parquest types and add tests for it --- flepimop/gempyor_pkg/src/gempyor/utils.py | 33 ++++++++++++++++--- .../gempyor_pkg/tests/utils/test_utils.py | 27 ++++++++++++--- 2 files changed, 50 insertions(+), 10 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index a8f3094ef..44e42463e 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -13,6 +13,7 @@ import shutil import logging from gempyor import file_paths +from typing import List logger = logging.getLogger(__name__) @@ -385,12 +386,31 @@ def create_resume_input_filename(filetype: str, liketype: str) -> str: extension=extension) -def copy_file_based_on_last_job_output(): - last_job_output = os.environ.get("LAST_JOB_OUTPUT") +def get_parquet_types()-> List[str]: resume_discard_seeding = os.environ.get("RESUME_DISCARD_SEEDING") - parquet_types = ["seed", "spar", "snpi", "hpar", "hnpi", "init"] - if resume_discard_seeding == "true": - parquet_types.remove("seed") + flepi_block_index = os.environ.get("FLEPI_BLOCK_INDEX") + if flepi_block_index == "1": + if resume_discard_seeding == "true": + return ["spar", "snpi", "hpar", "hnpi", "init"] + else: + return ["seed", "spar", "snpi", "hpar", "hnpi", "init"] + else: + return ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] + + +def copy_file_based_on_last_job_output() -> bool: + """ + Copies files based on the last job output. + + This function copies files from the last job output directory to the corresponding output directory + based on the file types and like types. The file names are determined using the `create_resume_input_filename` + and `create_resume_out_filename` functions. + + Returns: + bool: True if all files are successfully copied, False otherwise. + """ + last_job_output = os.environ.get("LAST_JOB_OUTPUT") + parquet_types = get_parquet_types() liketypes = ["global", "chimeric"] file_name_map = dict() @@ -424,3 +444,6 @@ def copy_file_based_on_last_job_output(): print(f"Copy successful for file of type {parquet_type} {in_filename}->{output_file_name}") else: print(f"Could not copy file of type {parquet_type} {in_filename}->{output_file_name}") + return False + + return True diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index a51a593c1..7f828b103 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -5,7 +5,8 @@ # import dask.dataframe as dd import pyarrow as pa import time - +from typing import List +from unittest.mock import patch from gempyor import utils DATA_DIR = os.path.dirname(__file__) + "/data" @@ -101,13 +102,11 @@ def env_vars(monkeypatch): def test_create_resume_out_filename(env_vars): result = utils.create_resume_out_filename("spar", "global") - expected_filename = """model_output/output/123/spar/global/intermidate - /000000002.000000001.000000001.123.spar.parquet""" + expected_filename = "model_output/output/123/spar/global/intermidate/000000002.000000001.000000001.123.spar.parquet" assert result == expected_filename result2 = utils.create_resume_out_filename("seed", "chimeric") - expected_filename2 = """model_output/output/123/seed/chimeric/intermidate - /000000002.000000001.000000001.123.seed.csv'""" + expected_filename2 = "model_output/output/123/seed/chimeric/intermidate/000000002.000000001.000000001.123.seed.csv" assert result2 == expected_filename2 @@ -121,3 +120,21 @@ def test_create_resume_input_filename(env_vars): result2 = utils.create_resume_input_filename("seed", "chimeric") expect_filename2 = 'model_output/output/321/seed/chimeric/final/000000002.321.seed.csv' assert result2 == expect_filename2 + + +@patch.dict(os.environ, {"RESUME_DISCARD_SEEDING": "true", "FLEPI_BLOCK_INDEX": "1"}) +def test_get_parquet_types_resume_discard_seeding_true_flepi_block_index_1(): + expected_types = ["spar", "snpi", "hpar", "hnpi", "init"] + assert utils.get_parquet_types() == expected_types + + +@patch.dict(os.environ, {"RESUME_DISCARD_SEEDING": "false", "FLEPI_BLOCK_INDEX": "1"}) +def test_get_parquet_types_resume_discard_seeding_false_flepi_block_index_1(): + expected_types = ["seed", "spar", "snpi", "hpar", "hnpi", "init"] + assert utils.get_parquet_types() == expected_types + + +@patch.dict(os.environ, {"FLEPI_BLOCK_INDEX": "2"}) +def test_get_parquet_types_flepi_block_index_2(): + expected_types = ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] + assert utils.get_parquet_types() == expected_types \ No newline at end of file From bb5c191e647a43b5a5fc6fd677e0eb80c070eef2 Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Fri, 10 May 2024 10:53:46 -0400 Subject: [PATCH 06/13] add functions to create resume file name map and download from s3 bucket --- flepimop/gempyor_pkg/src/gempyor/utils.py | 147 +++++++++++++----- .../gempyor_pkg/tests/utils/test_utils.py | 10 +- 2 files changed, 113 insertions(+), 44 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 44e42463e..6b937cd18 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -12,8 +12,10 @@ import subprocess import shutil import logging +import boto3 from gempyor import file_paths -from typing import List +from typing import List, Dict +from botocore.exceptions import ClientError logger = logging.getLogger(__name__) @@ -352,6 +354,7 @@ def bash(command): print(f"lsblk: {bash('lsblk')}") print("END AWS DIAGNOSIS ================================") + def create_resume_out_filename(filetype: str, liketype: str) -> str: run_id = os.environ.get("FLEPI_RUN_INDEX") prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('FLEPI_RUN_INDEX')}" @@ -386,7 +389,16 @@ def create_resume_input_filename(filetype: str, liketype: str) -> str: extension=extension) -def get_parquet_types()-> List[str]: +def get_parquet_types_for_resume() -> List[str]: + """ + Retrieves a list of parquet file types that are relevant for resuming a process based on + specific environment variable settings. This function dynamically determines the list + based on the current operational context given by the environment. + + The function checks two environment variables: + - `RESUME_DISCARD_SEEDING`: Determines whether seeding-related file types should be included. + - `FLEPI_BLOCK_INDEX`: Determines a specific operational mode or block of the process. + """ resume_discard_seeding = os.environ.get("RESUME_DISCARD_SEEDING") flepi_block_index = os.environ.get("FLEPI_BLOCK_INDEX") if flepi_block_index == "1": @@ -398,52 +410,103 @@ def get_parquet_types()-> List[str]: return ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] -def copy_file_based_on_last_job_output() -> bool: +def create_resume_file_names_map() -> Dict[str, str]: """ - Copies files based on the last job output. - - This function copies files from the last job output directory to the corresponding output directory - based on the file types and like types. The file names are determined using the `create_resume_input_filename` - and `create_resume_out_filename` functions. + Generates a mapping of input file names to output file names for a resume process based on + parquet file types and environmental conditions. The function adjusts the file name mappings + based on the operational block index and the location of the last job output. + + The mappings depend on: + - Parquet file types appropriate for resuming a process, as determined by the environment. + - Whether the files are for 'global' or 'chimeric' types, as these liketypes influence the + file naming convention. + - The operational block index ('FLEPI_BLOCK_INDEX'), which can alter the input file names for + block index '1'. + - The presence and value of 'LAST_JOB_OUTPUT' environment variable, which if set to an S3 path, + adjusts the keys in the mapping to be prefixed with this path. Returns: - bool: True if all files are successfully copied, False otherwise. + Dict[str, str]: A dictionary where keys are input file paths and values are corresponding + output file paths. The paths may be modified by the 'LAST_JOB_OUTPUT' if it + is set and points to an S3 location. + + Raises: + No explicit exceptions are raised within the function, but it relies heavily on external + functions and environment variables which if improperly configured could lead to unexpected + behavior. """ - last_job_output = os.environ.get("LAST_JOB_OUTPUT") - parquet_types = get_parquet_types() + parquet_types = get_parquet_types_for_resume() + resume_file_name_mapping = dict() liketypes = ["global", "chimeric"] - file_name_map = dict() - for filetype in parquet_types: for liketype in liketypes: - input_file_name = create_resume_input_filename(filetype=filetype, liketype=liketype) output_file_name = create_resume_out_filename(filetype=filetype, liketype=liketype) - file_name_map[input_file_name] = output_file_name - + input_file_name = output_file_name + if os.environ.get("FLEPI_BLOCK_INDEX") == "1": + input_file_name = create_resume_input_filename(filetype=filetype, liketype=liketype) + resume_file_name_mapping[input_file_name] = output_file_name + + last_job_output = os.environ.get("LAST_JOB_OUTPUT") if last_job_output.find("s3://") >= 0: - for in_filename in file_name_map: - command = ['aws', 's3', 'cp', '--quiet', last_job_output+"/"+in_filename, file_name_map[in_filename]] - try: - result = subprocess.run(command, check=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - print("Output:", result.stdout.decode()) - except subprocess.CalledProcessError as e: - print("Error: ", e.stderr.decode()) - else: - first_output_filename = next(iter(file_name_map.values())) - output_dir = os.path.dirname(first_output_filename) - if not os.path.exists(output_dir): - os.makedirs(output_dir) - for in_filename in file_name_map: - shutil.copy(os.path.join(last_job_output, in_filename), file_name_map[in_filename]) - - for in_filename in file_name_map: - output_file_name = file_name_map[in_filename] - parquet_type = [ptype for ptype in parquet_types if ptype in output_file_name] - if os.path.exists(output_file_name): - print(f"Copy successful for file of type {parquet_type} {in_filename}->{output_file_name}") - else: - print(f"Could not copy file of type {parquet_type} {in_filename}->{output_file_name}") - return False - - return True + old_keys = list(resume_file_name_mapping.keys()) + for k in old_keys: + new_key = os.path.join(last_job_output, k) + resume_file_name_mapping[new_key] = resume_file_name_mapping[k] + del resume_file_name_mapping[k] + return resume_file_name_mapping + + +def download_file_from_s3(name_map: Dict[str, str]) -> None: + """ + Downloads files from AWS S3 based on a mapping of S3 URIs to local file paths. The function + checks if the directory for the first output file exists and creates it if necessary. It + then iterates over each S3 URI in the provided mapping, downloads the file to the corresponding + local path, and handles errors if the S3 URI format is incorrect or if the download fails. + + Parameters: + name_map (Dict[str, str]): A dictionary where keys are S3 URIs (strings) and values + are the local file paths (strings) where the files should + be saved. + + Returns: + None: This function does not return a value; its primary effect is the side effect of + downloading files and potentially creating directories. + + Raises: + ValueError: If an S3 URI does not start with 's3://', indicating an invalid format. + ClientError: If an error occurs during the download from S3, such as a permissions issue, + a missing file, or network-related errors. These are caught and logged but not + re-raised, to allow the function to attempt subsequent downloads. + + Examples: + >>> name_map = { + "s3://mybucket/data/file1.txt": "/local/path/to/file1.txt", + "s3://mybucket/data/file2.txt": "/local/path/to/file2.txt" + } + >>> download_file_from_s3(name_map) + # This would download 'file1.txt' and 'file2.txt' from 'mybucket' on S3 to the specified local paths. + + # If an S3 URI is malformed: + >>> name_map = { + "http://wrongurl.com/data/file1.txt": "/local/path/to/file1.txt" + } + >>> download_file_from_s3(name_map) + # This will raise a ValueError indicating the invalid S3 URI format. + """ + s3 = boto3.client('s3') + first_output_filename = next(iter(name_map.values())) + output_dir = os.path.dirname(first_output_filename) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + for s3_uri in name_map: + try: + if s3_uri.startswith('s3://'): + bucket = s3_uri.split('/')[2] + object = s3_uri[len(bucket)+6:] + s3.download_file(bucket, object, name_map[s3_uri]) + else: + raise ValueError(f'Invalid S3 URI format {s3_uri}') + except ClientError as e: + print(f"An error occurred: {e}") + print("Could not download file from s3") \ No newline at end of file diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index 7f828b103..0d408c90e 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -5,7 +5,6 @@ # import dask.dataframe as dd import pyarrow as pa import time -from typing import List from unittest.mock import patch from gempyor import utils @@ -98,6 +97,7 @@ def env_vars(monkeypatch): monkeypatch.setenv("FLEPI_SLOT_INDEX", "2") monkeypatch.setenv("FLEPI_BLOCK_INDEX", "2") monkeypatch.setenv("FLEPI_RUN_INDEX", "123") + monkeypatch.setenv("LAST_JOB_OUTPUT", "s3://bucket") def test_create_resume_out_filename(env_vars): @@ -137,4 +137,10 @@ def test_get_parquet_types_resume_discard_seeding_false_flepi_block_index_1(): @patch.dict(os.environ, {"FLEPI_BLOCK_INDEX": "2"}) def test_get_parquet_types_flepi_block_index_2(): expected_types = ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] - assert utils.get_parquet_types() == expected_types \ No newline at end of file + assert utils.get_parquet_types() == expected_types + + +def test_create_resume_file_names_map(env_vars): + name_map = utils.create_resume_file_names_map() + for k in name_map: + assert k.find("s3://bucket") >= 0 From 34394a341d5b89e17b29fa189447490adb2c618c Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Tue, 14 May 2024 08:59:56 -0400 Subject: [PATCH 07/13] correct wrong functions name in tests --- flepimop/gempyor_pkg/tests/utils/test_utils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index 0d408c90e..260a623ac 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -125,19 +125,19 @@ def test_create_resume_input_filename(env_vars): @patch.dict(os.environ, {"RESUME_DISCARD_SEEDING": "true", "FLEPI_BLOCK_INDEX": "1"}) def test_get_parquet_types_resume_discard_seeding_true_flepi_block_index_1(): expected_types = ["spar", "snpi", "hpar", "hnpi", "init"] - assert utils.get_parquet_types() == expected_types + assert utils.get_parquet_types_for_resume() == expected_types @patch.dict(os.environ, {"RESUME_DISCARD_SEEDING": "false", "FLEPI_BLOCK_INDEX": "1"}) def test_get_parquet_types_resume_discard_seeding_false_flepi_block_index_1(): expected_types = ["seed", "spar", "snpi", "hpar", "hnpi", "init"] - assert utils.get_parquet_types() == expected_types + assert utils.get_parquet_types_for_resume() == expected_types @patch.dict(os.environ, {"FLEPI_BLOCK_INDEX": "2"}) def test_get_parquet_types_flepi_block_index_2(): expected_types = ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] - assert utils.get_parquet_types() == expected_types + assert utils.get_parquet_types_for_resume() == expected_types def test_create_resume_file_names_map(env_vars): From aab8388b290492a3e9326524e0b5b7c9e3ae83da Mon Sep 17 00:00:00 2001 From: kjsato Date: Tue, 14 May 2024 12:41:47 -0400 Subject: [PATCH 08/13] modified to use a common style in the function 'create_resume_out_filename()' in utils.py --- flepimop/gempyor_pkg/src/gempyor/utils.py | 86 ++++++++++++----------- 1 file changed, 46 insertions(+), 40 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 6b937cd18..1f4cc55e6 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -359,20 +359,24 @@ def create_resume_out_filename(filetype: str, liketype: str) -> str: run_id = os.environ.get("FLEPI_RUN_INDEX") prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('FLEPI_RUN_INDEX')}" inference_filepath_suffix = f"{liketype}/intermidate" - FLEPI_SLOT_INDEX = int(os.environ.get("FLEPI_SLOT_INDEX")) - inference_filename_prefix='%09d.' % FLEPI_SLOT_INDEX - index='{:09d}.{:09d}'.format(1, int(os.environ.get("FLEPI_BLOCK_INDEX"))-1) + # FLEPI_SLOT_INDEX = int(os.environ.get("FLEPI_SLOT_INDEX")) + # inference_filename_prefix='%09d.' % FLEPI_SLOT_INDEX + inference_filename_prefix = "{:09d}.".format(int(os.environ.get("FLEPI_SLOT_INDEX"))) + index = "{:09d}.{:09d}".format(1, int(os.environ.get("FLEPI_BLOCK_INDEX")) - 1) extension = "parquet" if filetype == "seed": extension = "csv" - return file_paths.create_file_name(run_id=run_id, - prefix=prefix, - inference_filename_prefix=inference_filename_prefix, - inference_filepath_suffix=inference_filepath_suffix, - index=index, - ftype=filetype, - extension=extension) - + return file_paths.create_file_name( + run_id=run_id, + prefix=prefix, + inference_filename_prefix=inference_filename_prefix, + inference_filepath_suffix=inference_filepath_suffix, + index=index, + ftype=filetype, + extension=extension, + ) + + def create_resume_input_filename(filetype: str, liketype: str) -> str: run_id = os.environ.get("RESUME_RUN_INDEX") prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('RESUME_RUN_INDEX')}" @@ -381,18 +385,20 @@ def create_resume_input_filename(filetype: str, liketype: str) -> str: extension = "parquet" if filetype == "seed": extension = "csv" - return file_paths.create_file_name(run_id=run_id, - prefix=prefix, - inference_filepath_suffix=inference_filepath_suffix, - index=index, - ftype=filetype, - extension=extension) + return file_paths.create_file_name( + run_id=run_id, + prefix=prefix, + inference_filepath_suffix=inference_filepath_suffix, + index=index, + ftype=filetype, + extension=extension, + ) def get_parquet_types_for_resume() -> List[str]: """ - Retrieves a list of parquet file types that are relevant for resuming a process based on - specific environment variable settings. This function dynamically determines the list + Retrieves a list of parquet file types that are relevant for resuming a process based on + specific environment variable settings. This function dynamically determines the list based on the current operational context given by the environment. The function checks two environment variables: @@ -408,7 +414,7 @@ def get_parquet_types_for_resume() -> List[str]: return ["seed", "spar", "snpi", "hpar", "hnpi", "init"] else: return ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] - + def create_resume_file_names_map() -> Dict[str, str]: """ @@ -418,21 +424,21 @@ def create_resume_file_names_map() -> Dict[str, str]: The mappings depend on: - Parquet file types appropriate for resuming a process, as determined by the environment. - - Whether the files are for 'global' or 'chimeric' types, as these liketypes influence the + - Whether the files are for 'global' or 'chimeric' types, as these liketypes influence the file naming convention. - - The operational block index ('FLEPI_BLOCK_INDEX'), which can alter the input file names for + - The operational block index ('FLEPI_BLOCK_INDEX'), which can alter the input file names for block index '1'. - The presence and value of 'LAST_JOB_OUTPUT' environment variable, which if set to an S3 path, adjusts the keys in the mapping to be prefixed with this path. Returns: - Dict[str, str]: A dictionary where keys are input file paths and values are corresponding - output file paths. The paths may be modified by the 'LAST_JOB_OUTPUT' if it + Dict[str, str]: A dictionary where keys are input file paths and values are corresponding + output file paths. The paths may be modified by the 'LAST_JOB_OUTPUT' if it is set and points to an S3 location. Raises: - No explicit exceptions are raised within the function, but it relies heavily on external - functions and environment variables which if improperly configured could lead to unexpected + No explicit exceptions are raised within the function, but it relies heavily on external + functions and environment variables which if improperly configured could lead to unexpected behavior. """ parquet_types = get_parquet_types_for_resume() @@ -445,7 +451,7 @@ def create_resume_file_names_map() -> Dict[str, str]: if os.environ.get("FLEPI_BLOCK_INDEX") == "1": input_file_name = create_resume_input_filename(filetype=filetype, liketype=liketype) resume_file_name_mapping[input_file_name] = output_file_name - + last_job_output = os.environ.get("LAST_JOB_OUTPUT") if last_job_output.find("s3://") >= 0: old_keys = list(resume_file_name_mapping.keys()) @@ -458,24 +464,24 @@ def create_resume_file_names_map() -> Dict[str, str]: def download_file_from_s3(name_map: Dict[str, str]) -> None: """ - Downloads files from AWS S3 based on a mapping of S3 URIs to local file paths. The function - checks if the directory for the first output file exists and creates it if necessary. It - then iterates over each S3 URI in the provided mapping, downloads the file to the corresponding + Downloads files from AWS S3 based on a mapping of S3 URIs to local file paths. The function + checks if the directory for the first output file exists and creates it if necessary. It + then iterates over each S3 URI in the provided mapping, downloads the file to the corresponding local path, and handles errors if the S3 URI format is incorrect or if the download fails. Parameters: - name_map (Dict[str, str]): A dictionary where keys are S3 URIs (strings) and values - are the local file paths (strings) where the files should + name_map (Dict[str, str]): A dictionary where keys are S3 URIs (strings) and values + are the local file paths (strings) where the files should be saved. Returns: - None: This function does not return a value; its primary effect is the side effect of + None: This function does not return a value; its primary effect is the side effect of downloading files and potentially creating directories. Raises: ValueError: If an S3 URI does not start with 's3://', indicating an invalid format. ClientError: If an error occurs during the download from S3, such as a permissions issue, - a missing file, or network-related errors. These are caught and logged but not + a missing file, or network-related errors. These are caught and logged but not re-raised, to allow the function to attempt subsequent downloads. Examples: @@ -493,7 +499,7 @@ def download_file_from_s3(name_map: Dict[str, str]) -> None: >>> download_file_from_s3(name_map) # This will raise a ValueError indicating the invalid S3 URI format. """ - s3 = boto3.client('s3') + s3 = boto3.client("s3") first_output_filename = next(iter(name_map.values())) output_dir = os.path.dirname(first_output_filename) if not os.path.exists(output_dir): @@ -501,12 +507,12 @@ def download_file_from_s3(name_map: Dict[str, str]) -> None: for s3_uri in name_map: try: - if s3_uri.startswith('s3://'): - bucket = s3_uri.split('/')[2] - object = s3_uri[len(bucket)+6:] + if s3_uri.startswith("s3://"): + bucket = s3_uri.split("/")[2] + object = s3_uri[len(bucket) + 6 :] s3.download_file(bucket, object, name_map[s3_uri]) else: - raise ValueError(f'Invalid S3 URI format {s3_uri}') + raise ValueError(f"Invalid S3 URI format {s3_uri}") except ClientError as e: print(f"An error occurred: {e}") - print("Could not download file from s3") \ No newline at end of file + print("Could not download file from s3") From dc44991003f33f14f7c39bf4cfe129820f628b95 Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Wed, 15 May 2024 09:46:29 -0400 Subject: [PATCH 09/13] address requested changes by koji --- flepimop/gempyor_pkg/src/gempyor/utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 1f4cc55e6..6c2aeb18d 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -359,8 +359,6 @@ def create_resume_out_filename(filetype: str, liketype: str) -> str: run_id = os.environ.get("FLEPI_RUN_INDEX") prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('FLEPI_RUN_INDEX')}" inference_filepath_suffix = f"{liketype}/intermidate" - # FLEPI_SLOT_INDEX = int(os.environ.get("FLEPI_SLOT_INDEX")) - # inference_filename_prefix='%09d.' % FLEPI_SLOT_INDEX inference_filename_prefix = "{:09d}.".format(int(os.environ.get("FLEPI_SLOT_INDEX"))) index = "{:09d}.{:09d}".format(1, int(os.environ.get("FLEPI_BLOCK_INDEX")) - 1) extension = "parquet" From 5498298cf3b0d69f18c99b5bf1b0af730d89ce8a Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Mon, 17 Jun 2024 10:49:40 -0400 Subject: [PATCH 10/13] add functions to copy files at local and get rid of the reading of environment variables --- flepimop/gempyor_pkg/src/gempyor/utils.py | 84 +++++++++++++------ .../gempyor_pkg/tests/utils/test_utils.py | 54 ++++++++---- 2 files changed, 97 insertions(+), 41 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 6c2aeb18d..ab0e4302f 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -355,17 +355,21 @@ def bash(command): print("END AWS DIAGNOSIS ================================") -def create_resume_out_filename(filetype: str, liketype: str) -> str: - run_id = os.environ.get("FLEPI_RUN_INDEX") - prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('FLEPI_RUN_INDEX')}" - inference_filepath_suffix = f"{liketype}/intermidate" - inference_filename_prefix = "{:09d}.".format(int(os.environ.get("FLEPI_SLOT_INDEX"))) - index = "{:09d}.{:09d}".format(1, int(os.environ.get("FLEPI_BLOCK_INDEX")) - 1) +def create_resume_out_filename(flepi_run_index: str, + flepi_prefix: str, + flepi_slot_index: str, + flepi_block_index: str, + filetype: str, + liketype: str) -> str: + prefix = f"{flepi_prefix}/{flepi_run_index}" + inference_filepath_suffix = f"{liketype}/intermediate" + inference_filename_prefix = "{:09d}.".format(int(flepi_slot_index)) + index = "{:09d}.{:09d}".format(1, int(flepi_block_index) - 1) extension = "parquet" if filetype == "seed": extension = "csv" return file_paths.create_file_name( - run_id=run_id, + run_id=flepi_run_index, prefix=prefix, inference_filename_prefix=inference_filename_prefix, inference_filepath_suffix=inference_filepath_suffix, @@ -375,16 +379,15 @@ def create_resume_out_filename(filetype: str, liketype: str) -> str: ) -def create_resume_input_filename(filetype: str, liketype: str) -> str: - run_id = os.environ.get("RESUME_RUN_INDEX") - prefix = f"{os.environ.get('FLEPI_PREFIX')}/{os.environ.get('RESUME_RUN_INDEX')}" +def create_resume_input_filename(resume_run_index: str, flepi_prefix: str, flepi_slot_index: str, filetype: str, liketype: str) -> str: + prefix = f"{flepi_prefix}/{resume_run_index}" inference_filepath_suffix = f"{liketype}/final" - index = os.environ.get("FLEPI_SLOT_INDEX") + index = flepi_slot_index extension = "parquet" if filetype == "seed": extension = "csv" return file_paths.create_file_name( - run_id=run_id, + run_id=resume_run_index, prefix=prefix, inference_filepath_suffix=inference_filepath_suffix, index=index, @@ -393,18 +396,16 @@ def create_resume_input_filename(filetype: str, liketype: str) -> str: ) -def get_parquet_types_for_resume() -> List[str]: +def get_filetype_for_resume(resume_discard_seeding: str, flepi_block_index: str) -> List[str]: """ Retrieves a list of parquet file types that are relevant for resuming a process based on specific environment variable settings. This function dynamically determines the list based on the current operational context given by the environment. The function checks two environment variables: - - `RESUME_DISCARD_SEEDING`: Determines whether seeding-related file types should be included. - - `FLEPI_BLOCK_INDEX`: Determines a specific operational mode or block of the process. + - `resume_discard_seeding`: Determines whether seeding-related file types should be included. + - `flepi_block_index`: Determines a specific operational mode or block of the process. """ - resume_discard_seeding = os.environ.get("RESUME_DISCARD_SEEDING") - flepi_block_index = os.environ.get("FLEPI_BLOCK_INDEX") if flepi_block_index == "1": if resume_discard_seeding == "true": return ["spar", "snpi", "hpar", "hnpi", "init"] @@ -414,7 +415,14 @@ def get_parquet_types_for_resume() -> List[str]: return ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] -def create_resume_file_names_map() -> Dict[str, str]: +def create_resume_file_names_map(resume_discard_seeding, + flepi_block_index, + resume_run_index, + flepi_prefix, + flepi_slot_index, + flepi_run_index, + last_job_output + ) -> Dict[str, str]: """ Generates a mapping of input file names to output file names for a resume process based on parquet file types and environmental conditions. The function adjusts the file name mappings @@ -439,18 +447,26 @@ def create_resume_file_names_map() -> Dict[str, str]: functions and environment variables which if improperly configured could lead to unexpected behavior. """ - parquet_types = get_parquet_types_for_resume() + file_types = get_filetype_for_resume(resume_discard_seeding=resume_discard_seeding, + flepi_block_index=flepi_block_index) resume_file_name_mapping = dict() liketypes = ["global", "chimeric"] - for filetype in parquet_types: + for filetype in file_types: for liketype in liketypes: - output_file_name = create_resume_out_filename(filetype=filetype, liketype=liketype) + output_file_name = create_resume_out_filename(flepi_run_index=flepi_run_index, + flepi_prefix=flepi_prefix, + flepi_slot_index=flepi_slot_index, + flepi_block_index=flepi_block_index, + filetype=filetype, + liketype=liketype) input_file_name = output_file_name if os.environ.get("FLEPI_BLOCK_INDEX") == "1": - input_file_name = create_resume_input_filename(filetype=filetype, liketype=liketype) + input_file_name = create_resume_input_filename(resume_run_index=resume_run_index, + flepi_prefix=flepi_prefix, + flepi_slot_index=flepi_slot_index, + filetype=filetype, + liketype=liketype) resume_file_name_mapping[input_file_name] = output_file_name - - last_job_output = os.environ.get("LAST_JOB_OUTPUT") if last_job_output.find("s3://") >= 0: old_keys = list(resume_file_name_mapping.keys()) for k in old_keys: @@ -514,3 +530,23 @@ def download_file_from_s3(name_map: Dict[str, str]) -> None: except ClientError as e: print(f"An error occurred: {e}") print("Could not download file from s3") + +def move_file_at_local(name_map: Dict[str, str]) -> None: + """ + Moves files locally according to a given mapping. + + This function takes a dictionary where the keys are source file paths and + the values are destination file paths. It ensures that the destination + directories exist and then copies the files from the source paths to the + destination paths. + + Parameters: + name_map (Dict[str, str]): A dictionary mapping source file paths to + destination file paths. + + Returns: + None + """ + for src, dst in name_map.items(): + os.path.makedirs(os.path.dirname(dst), exist_ok = True) + shutil.copy(src, dst) \ No newline at end of file diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index 260a623ac..3743df4f3 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -101,46 +101,66 @@ def env_vars(monkeypatch): def test_create_resume_out_filename(env_vars): - result = utils.create_resume_out_filename("spar", "global") - expected_filename = "model_output/output/123/spar/global/intermidate/000000002.000000001.000000001.123.spar.parquet" + result = utils.create_resume_out_filename(flepi_run_index="123", + flepi_prefix="output", + flepi_slot_index="2", + flepi_block_index="2", + filetype = "spar", + liketype = "global") + expected_filename = "model_output/output/123/spar/global/intermediate/000000002.000000001.000000001.123.spar.parquet" assert result == expected_filename - result2 = utils.create_resume_out_filename("seed", "chimeric") - expected_filename2 = "model_output/output/123/seed/chimeric/intermidate/000000002.000000001.000000001.123.seed.csv" + result2 = utils.create_resume_out_filename(flepi_run_index="123", + flepi_prefix="output", + flepi_slot_index="2", + flepi_block_index="2", + filetype = "seed", + liketype = "chimeric") + expected_filename2 = "model_output/output/123/seed/chimeric/intermediate/000000002.000000001.000000001.123.seed.csv" assert result2 == expected_filename2 def test_create_resume_input_filename(env_vars): - result = utils.create_resume_input_filename("spar", "global") + result = utils.create_resume_input_filename(flepi_slot_index="2", + resume_run_index="321", + flepi_prefix="output", + filetype="spar", + liketype="global") expect_filename = 'model_output/output/321/spar/global/final/000000002.321.spar.parquet' assert result == expect_filename - result2 = utils.create_resume_input_filename("seed", "chimeric") + result2 = utils.create_resume_input_filename(flepi_slot_index="2", + resume_run_index="321", + flepi_prefix="output", + filetype="seed", liketype="chimeric") expect_filename2 = 'model_output/output/321/seed/chimeric/final/000000002.321.seed.csv' assert result2 == expect_filename2 -@patch.dict(os.environ, {"RESUME_DISCARD_SEEDING": "true", "FLEPI_BLOCK_INDEX": "1"}) -def test_get_parquet_types_resume_discard_seeding_true_flepi_block_index_1(): +def test_get_filetype_resume_discard_seeding_true_flepi_block_index_1(): expected_types = ["spar", "snpi", "hpar", "hnpi", "init"] - assert utils.get_parquet_types_for_resume() == expected_types + assert utils.get_filetype_for_resume(resume_discard_seeding="true", flepi_block_index="1") == expected_types -@patch.dict(os.environ, {"RESUME_DISCARD_SEEDING": "false", "FLEPI_BLOCK_INDEX": "1"}) -def test_get_parquet_types_resume_discard_seeding_false_flepi_block_index_1(): +def test_get_filetype_resume_discard_seeding_false_flepi_block_index_1(): expected_types = ["seed", "spar", "snpi", "hpar", "hnpi", "init"] - assert utils.get_parquet_types_for_resume() == expected_types + assert utils.get_filetype_for_resume(resume_discard_seeding="false", flepi_block_index="1") == expected_types -@patch.dict(os.environ, {"FLEPI_BLOCK_INDEX": "2"}) -def test_get_parquet_types_flepi_block_index_2(): +def test_get_filetype_flepi_block_index_2(): expected_types = ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] - assert utils.get_parquet_types_for_resume() == expected_types + assert utils.get_filetype_for_resume(resume_discard_seeding="false", flepi_block_index="2") == expected_types -def test_create_resume_file_names_map(env_vars): - name_map = utils.create_resume_file_names_map() +def test_create_resume_file_names_map(): + name_map = utils.create_resume_file_names_map(resume_discard_seeding="false", + flepi_block_index="2", + resume_run_index="321", + flepi_prefix="output", + flepi_slot_index="2", + flepi_run_index="123", + last_job_output="s3://bucket") for k in name_map: assert k.find("s3://bucket") >= 0 From ef0c86447e77fea4ddf95aa8502a4b279d78fea0 Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Tue, 18 Jun 2024 09:26:31 -0400 Subject: [PATCH 11/13] remove unnecessary test code --- flepimop/gempyor_pkg/tests/utils/test_utils.py | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index 3743df4f3..254e8290f 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -1,11 +1,8 @@ import pytest import os import pandas as pd - -# import dask.dataframe as dd import pyarrow as pa import time -from unittest.mock import patch from gempyor import utils DATA_DIR = os.path.dirname(__file__) + "/data" @@ -89,17 +86,6 @@ def test_get_log_normal_success(): utils.get_log_normal(meanlog=0, sdlog=1) -@pytest.fixture -def env_vars(monkeypatch): - # Setting environment variables for the test - monkeypatch.setenv("RESUME_RUN_INDEX", "321") - monkeypatch.setenv("FLEPI_PREFIX", "output") - monkeypatch.setenv("FLEPI_SLOT_INDEX", "2") - monkeypatch.setenv("FLEPI_BLOCK_INDEX", "2") - monkeypatch.setenv("FLEPI_RUN_INDEX", "123") - monkeypatch.setenv("LAST_JOB_OUTPUT", "s3://bucket") - - def test_create_resume_out_filename(env_vars): result = utils.create_resume_out_filename(flepi_run_index="123", flepi_prefix="output", From 413485a1f9ce47c855d06a745ccd7ddde682f241 Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Tue, 18 Jun 2024 09:42:03 -0400 Subject: [PATCH 12/13] format code --- flepimop/gempyor_pkg/src/gempyor/utils.py | 83 +++++++++---------- .../gempyor_pkg/tests/utils/test_utils.py | 77 ++++++++--------- 2 files changed, 79 insertions(+), 81 deletions(-) diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index ab0e4302f..4d3209061 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -227,15 +227,11 @@ def as_random_distribution(self): dist = self["distribution"].get() if dist == "fixed": return functools.partial( - np.random.uniform, - self["value"].as_evaled_expression(), - self["value"].as_evaled_expression(), + np.random.uniform, self["value"].as_evaled_expression(), self["value"].as_evaled_expression(), ) elif dist == "uniform": return functools.partial( - np.random.uniform, - self["low"].as_evaled_expression(), - self["high"].as_evaled_expression(), + np.random.uniform, self["low"].as_evaled_expression(), self["high"].as_evaled_expression(), ) elif dist == "poisson": return functools.partial(np.random.poisson, self["lam"].as_evaled_expression()) @@ -260,18 +256,13 @@ def as_random_distribution(self): ).rvs elif dist == "lognorm": return get_log_normal( - meanlog=self["meanlog"].as_evaled_expression(), - sdlog=self["sdlog"].as_evaled_expression(), + meanlog=self["meanlog"].as_evaled_expression(), sdlog=self["sdlog"].as_evaled_expression(), ).rvs else: raise NotImplementedError(f"unknown distribution [got: {dist}]") else: # we allow a fixed value specified directly: - return functools.partial( - np.random.uniform, - self.as_evaled_expression(), - self.as_evaled_expression(), - ) + return functools.partial(np.random.uniform, self.as_evaled_expression(), self.as_evaled_expression(),) def list_filenames(folder: str = ".", filters: list = []) -> list: @@ -355,12 +346,9 @@ def bash(command): print("END AWS DIAGNOSIS ================================") -def create_resume_out_filename(flepi_run_index: str, - flepi_prefix: str, - flepi_slot_index: str, - flepi_block_index: str, - filetype: str, - liketype: str) -> str: +def create_resume_out_filename( + flepi_run_index: str, flepi_prefix: str, flepi_slot_index: str, flepi_block_index: str, filetype: str, liketype: str +) -> str: prefix = f"{flepi_prefix}/{flepi_run_index}" inference_filepath_suffix = f"{liketype}/intermediate" inference_filename_prefix = "{:09d}.".format(int(flepi_slot_index)) @@ -379,7 +367,9 @@ def create_resume_out_filename(flepi_run_index: str, ) -def create_resume_input_filename(resume_run_index: str, flepi_prefix: str, flepi_slot_index: str, filetype: str, liketype: str) -> str: +def create_resume_input_filename( + resume_run_index: str, flepi_prefix: str, flepi_slot_index: str, filetype: str, liketype: str +) -> str: prefix = f"{flepi_prefix}/{resume_run_index}" inference_filepath_suffix = f"{liketype}/final" index = flepi_slot_index @@ -415,14 +405,15 @@ def get_filetype_for_resume(resume_discard_seeding: str, flepi_block_index: str) return ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] -def create_resume_file_names_map(resume_discard_seeding, - flepi_block_index, - resume_run_index, - flepi_prefix, - flepi_slot_index, - flepi_run_index, - last_job_output - ) -> Dict[str, str]: +def create_resume_file_names_map( + resume_discard_seeding, + flepi_block_index, + resume_run_index, + flepi_prefix, + flepi_slot_index, + flepi_run_index, + last_job_output, +) -> Dict[str, str]: """ Generates a mapping of input file names to output file names for a resume process based on parquet file types and environmental conditions. The function adjusts the file name mappings @@ -447,25 +438,30 @@ def create_resume_file_names_map(resume_discard_seeding, functions and environment variables which if improperly configured could lead to unexpected behavior. """ - file_types = get_filetype_for_resume(resume_discard_seeding=resume_discard_seeding, - flepi_block_index=flepi_block_index) + file_types = get_filetype_for_resume( + resume_discard_seeding=resume_discard_seeding, flepi_block_index=flepi_block_index + ) resume_file_name_mapping = dict() liketypes = ["global", "chimeric"] for filetype in file_types: for liketype in liketypes: - output_file_name = create_resume_out_filename(flepi_run_index=flepi_run_index, - flepi_prefix=flepi_prefix, - flepi_slot_index=flepi_slot_index, - flepi_block_index=flepi_block_index, - filetype=filetype, - liketype=liketype) + output_file_name = create_resume_out_filename( + flepi_run_index=flepi_run_index, + flepi_prefix=flepi_prefix, + flepi_slot_index=flepi_slot_index, + flepi_block_index=flepi_block_index, + filetype=filetype, + liketype=liketype, + ) input_file_name = output_file_name if os.environ.get("FLEPI_BLOCK_INDEX") == "1": - input_file_name = create_resume_input_filename(resume_run_index=resume_run_index, - flepi_prefix=flepi_prefix, - flepi_slot_index=flepi_slot_index, - filetype=filetype, - liketype=liketype) + input_file_name = create_resume_input_filename( + resume_run_index=resume_run_index, + flepi_prefix=flepi_prefix, + flepi_slot_index=flepi_slot_index, + filetype=filetype, + liketype=liketype, + ) resume_file_name_mapping[input_file_name] = output_file_name if last_job_output.find("s3://") >= 0: old_keys = list(resume_file_name_mapping.keys()) @@ -531,6 +527,7 @@ def download_file_from_s3(name_map: Dict[str, str]) -> None: print(f"An error occurred: {e}") print("Could not download file from s3") + def move_file_at_local(name_map: Dict[str, str]) -> None: """ Moves files locally according to a given mapping. @@ -548,5 +545,5 @@ def move_file_at_local(name_map: Dict[str, str]) -> None: None """ for src, dst in name_map.items(): - os.path.makedirs(os.path.dirname(dst), exist_ok = True) - shutil.copy(src, dst) \ No newline at end of file + os.path.makedirs(os.path.dirname(dst), exist_ok=True) + shutil.copy(src, dst) diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index 254e8290f..d1d3de50d 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -10,11 +10,7 @@ @pytest.mark.parametrize( - ("fname", "extension"), - [ - ("mobility", "csv"), - ("usa-geoid-params-output", "parquet"), - ], + ("fname", "extension"), [("mobility", "csv"), ("usa-geoid-params-output", "parquet"),], ) def test_read_df_and_write_success(fname, extension): os.chdir(tmp_path) @@ -87,41 +83,44 @@ def test_get_log_normal_success(): def test_create_resume_out_filename(env_vars): - result = utils.create_resume_out_filename(flepi_run_index="123", - flepi_prefix="output", - flepi_slot_index="2", - flepi_block_index="2", - filetype = "spar", - liketype = "global") - expected_filename = "model_output/output/123/spar/global/intermediate/000000002.000000001.000000001.123.spar.parquet" + result = utils.create_resume_out_filename( + flepi_run_index="123", + flepi_prefix="output", + flepi_slot_index="2", + flepi_block_index="2", + filetype="spar", + liketype="global", + ) + expected_filename = ( + "model_output/output/123/spar/global/intermediate/000000002.000000001.000000001.123.spar.parquet" + ) assert result == expected_filename - - result2 = utils.create_resume_out_filename(flepi_run_index="123", - flepi_prefix="output", - flepi_slot_index="2", - flepi_block_index="2", - filetype = "seed", - liketype = "chimeric") + + result2 = utils.create_resume_out_filename( + flepi_run_index="123", + flepi_prefix="output", + flepi_slot_index="2", + flepi_block_index="2", + filetype="seed", + liketype="chimeric", + ) expected_filename2 = "model_output/output/123/seed/chimeric/intermediate/000000002.000000001.000000001.123.seed.csv" assert result2 == expected_filename2 def test_create_resume_input_filename(env_vars): - result = utils.create_resume_input_filename(flepi_slot_index="2", - resume_run_index="321", - flepi_prefix="output", - filetype="spar", - liketype="global") - expect_filename = 'model_output/output/321/spar/global/final/000000002.321.spar.parquet' + result = utils.create_resume_input_filename( + flepi_slot_index="2", resume_run_index="321", flepi_prefix="output", filetype="spar", liketype="global" + ) + expect_filename = "model_output/output/321/spar/global/final/000000002.321.spar.parquet" assert result == expect_filename - - result2 = utils.create_resume_input_filename(flepi_slot_index="2", - resume_run_index="321", - flepi_prefix="output", - filetype="seed", liketype="chimeric") - expect_filename2 = 'model_output/output/321/seed/chimeric/final/000000002.321.seed.csv' + + result2 = utils.create_resume_input_filename( + flepi_slot_index="2", resume_run_index="321", flepi_prefix="output", filetype="seed", liketype="chimeric" + ) + expect_filename2 = "model_output/output/321/seed/chimeric/final/000000002.321.seed.csv" assert result2 == expect_filename2 @@ -141,12 +140,14 @@ def test_get_filetype_flepi_block_index_2(): def test_create_resume_file_names_map(): - name_map = utils.create_resume_file_names_map(resume_discard_seeding="false", - flepi_block_index="2", - resume_run_index="321", - flepi_prefix="output", - flepi_slot_index="2", - flepi_run_index="123", - last_job_output="s3://bucket") + name_map = utils.create_resume_file_names_map( + resume_discard_seeding="false", + flepi_block_index="2", + resume_run_index="321", + flepi_prefix="output", + flepi_slot_index="2", + flepi_run_index="123", + last_job_output="s3://bucket", + ) for k in name_map: assert k.find("s3://bucket") >= 0 From 59cc58eccbec86189016fbf57dd48d071c3ef7bd Mon Sep 17 00:00:00 2001 From: fang19911030 Date: Tue, 18 Jun 2024 09:53:14 -0400 Subject: [PATCH 13/13] fix --- flepimop/gempyor_pkg/tests/utils/test_utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flepimop/gempyor_pkg/tests/utils/test_utils.py b/flepimop/gempyor_pkg/tests/utils/test_utils.py index d1d3de50d..768451b2a 100644 --- a/flepimop/gempyor_pkg/tests/utils/test_utils.py +++ b/flepimop/gempyor_pkg/tests/utils/test_utils.py @@ -82,7 +82,7 @@ def test_get_log_normal_success(): utils.get_log_normal(meanlog=0, sdlog=1) -def test_create_resume_out_filename(env_vars): +def test_create_resume_out_filename(): result = utils.create_resume_out_filename( flepi_run_index="123", flepi_prefix="output", @@ -108,7 +108,7 @@ def test_create_resume_out_filename(env_vars): assert result2 == expected_filename2 -def test_create_resume_input_filename(env_vars): +def test_create_resume_input_filename(): result = utils.create_resume_input_filename( flepi_slot_index="2", resume_run_index="321", flepi_prefix="output", filetype="spar", liketype="global"