diff --git a/flepimop/gempyor_pkg/src/gempyor/utils.py b/flepimop/gempyor_pkg/src/gempyor/utils.py index 6131b5c52..8fc4012b2 100644 --- a/flepimop/gempyor_pkg/src/gempyor/utils.py +++ b/flepimop/gempyor_pkg/src/gempyor/utils.py @@ -7,6 +7,7 @@ import shutil import subprocess import time +import typing from typing import List, Dict, Literal import confuse @@ -99,7 +100,21 @@ def read_df( ) -def command_safe_run(command, command_name="mycommand", fail_on_fail=True): +def command_safe_run(command: str, command_name: str="mycommand", fail_on_fail: bool=True) -> tuple[int, str, str]: + """ + Runs a shell command and prints diagnostics if command fails. + + Args: + command: The CLI command to be given. + command_name: The reference name for your command. Default value is "mycommand". + fail_on_fail: If True, an exception will be thrown if the command fails (default is True) + + Returns: + As a tuple; the return code, the standard output, and standard error from running the command. + + Raises: + Exception: If fail_on_fail=True and the command fails, an exception will be thrown. + """ import subprocess import shlex # using shlex to split the command because it's not obvious https://docs.python.org/3/library/subprocess.html#subprocess.Popen @@ -124,7 +139,15 @@ def command_safe_run(command, command_name="mycommand", fail_on_fail=True): def add_method(cls): - "Decorator to add a method to a class" + """ + A function which adds a function to a class. + + Args: + cls: The class you want to add a method to. + + Returns: + decorator: The decorator. 
+ """ def decorator(func): @functools.wraps(func) @@ -137,7 +160,32 @@ def wrapper(*args, **kwargs): return decorator -def search_and_import_plugins_class(plugin_file_path: str, path_prefix: str, class_name: str, **kwargs): +def search_and_import_plugins_class(plugin_file_path: str, path_prefix: str, class_name: str, **kwargs: dict[str, typing.Any]) -> typing.Any: + """ + Function serving to create a class that finds and imports the necessary modules. + + Args: + plugin_file_path: Pathway to the module. + path_prefix: Pathway prefix to the module. + class_name: Name of the class. + **kwargs: Further arguments passed to initialization of the class. + + Returns: + The instance of the class that was instantiated with provided **kwargs. + + Examples: + Suppose there is a module called `my_plugin.py` with a class `MyClass` located at `/path/to/plugin/`. + + Dynamically import and instantiate the class: + + >>> instance = search_and_import_plugins_class('/path/to/plugin', path_prefix, 'MyClass', **params) + + View the instance: + + >>> print(instance) + <__main__.MyClass object at 0x7f8b2c6b4d60> + + """ # Look for all possible plugins and import them # https://stackoverflow.com/questions/67631/how-can-i-import-a-module-dynamically-given-the-full-path # unfortunatelly very complicated, this is cpython only ?? @@ -159,29 +207,40 @@ def search_and_import_plugins_class(plugin_file_path: str, path_prefix: str, cla from functools import wraps -def profile(output_file=None, sort_by="cumulative", lines_to_print=None, strip_dirs=False): - """A time profiler decorator. +def profile(output_file: str = None, sort_by: str = "cumulative", lines_to_print: int = None, strip_dirs: bool = False): + """ + A time profiler decorator. Inspired by and modified the profile decorator of Giampaolo Rodola: http://code.activestate.com/recipes/577817-profile-decorator/ + Args: - output_file: str or None. Default is None + output_file: Path of the output file. 
If only name of the file is given, it's saved in the current directory. If it's None, the name of the decorated function is used. - sort_by: str or SortKey enum or tuple/list of str/SortKey enum + sort_by: Sorting criteria for the Stats object. For a list of valid string and SortKey refer to: https://docs.python.org/3/library/profile.html#pstats.Stats.sort_stats - lines_to_print: int or None + lines_to_print: Number of lines to print. Default (None) is for all the lines. This is useful in reducing the size of the printout, especially that sorting by 'cumulative', the time consuming operations are printed toward the top of the file. - strip_dirs: bool - Whether to remove the leading path info from file names. - This is also useful in reducing the size of the printout + strip_dirs: + Whether to remove the leading path info from file names. + Returns: - Profile of the decorated function + Profile of the decorated function. + + Examples: + >>> @profile(output_file="my_function.prof") + >>> def my_function(): + # Function body content + pass + >>> my_function() + After running ``my_function``, a file named ``my_function.prof`` will be created in the current WD. + This file contains the profiling data. """ def inner(func): @@ -200,7 +259,16 @@ def wrapper(*args, **kwargs): return inner -def as_list(thing): +def as_list(thing: any) -> list[any]: + """ + Returns argument passed as a list. + + Args: + thing: The object that you would like to be converted to a list. + + Returns: + The object converted to a list. + """ if type(thing) == list: return thing return [thing] @@ -208,6 +276,13 @@ def as_list(thing): ### A little timer class class Timer(object): + """ + A timer class that starts, ends, and records time in between. + + Attributes: + name: Name of event. + tstart: Time start. 
+ """ def __init__(self, name): self.name = name @@ -220,7 +295,20 @@ def __exit__(self, type, value, traceback): class ISO8601Date(confuse.Template): - def convert(self, value, view): + """ + Reads in config dates into datetime.date objects. + """ + def convert(self, value: any, view: confuse.ConfigView): + """ + Converts the given value to a datetime.date object. + + Args: + value: The value to be converted. Can be datetime.date object or ISO8601 string. + view: A view object from confuse, to be used for error reporting. + + Raises: + confuse.TemplateError: If `value` is neither a datetime.date nor an ISO8601Date string. + """ if isinstance(value, datetime.date): return value elif isinstance(value, str): @@ -230,15 +318,28 @@ def convert(self, value, view): @add_method(confuse.ConfigView) -def as_date(self): - "Evaluates an datetime.date or ISO8601 date string, raises ValueError on parsing errors." +def as_date(self) -> datetime.date: + """ + Evaluates a datetime.date or ISO8601 date string. + + Returns: + A datetime.date data type of the date associated with the object. + """ return self.get(ISO8601Date()) @add_method(confuse.ConfigView) def as_evaled_expression(self): - "Evaluates an expression string, returning a float. Raises ValueError on parsing errors." + """ + Evaluates an expression string, returning a float. + + Returns: + A float data type of the value associated with the object. + + Raises: + ValueError: On parsing errors. + """ value = self.get() if isinstance(value, numbers.Number): @@ -258,7 +359,8 @@ def get_truncated_normal( a: float | int = 0, b: float | int = 10, ) -> scipy.stats._distn_infrastructure.rv_frozen: - """Returns a truncated normal distribution. + """ + Returns a truncated normal distribution. This function constructs a truncated normal distribution with the specified mean, standard deviation, and bounds. 
The truncated normal distribution is @@ -271,8 +373,13 @@ def get_truncated_normal( b: The upper bound of the truncated normal distribution. Defaults to 10. Returns: - rv_frozen: A frozen instance of the truncated normal distribution with the - specified parameters. + rv_frozen: A frozen instance of the truncated normal distribution with the specified parameters. + + Examples: + Create a truncated normal distribution with specified parameters (truncated between 1 and 10): + >>> truncated_normal_dist = get_truncated_normal(mean=5, sd=2, a=1, b=10) + >>> print(truncated_normal_dist) + rv_frozen() """ lower = (a - mean) / sd upper = (b - mean) / sd @@ -283,7 +390,8 @@ def get_log_normal( meanlog: float | int, sdlog: float | int, ) -> scipy.stats._distn_infrastructure.rv_frozen: - """Returns a log normal distribution. + """ + Returns a log normal distribution. This function constructs a log normal distribution with the specified log mean and log standard deviation. @@ -295,13 +403,52 @@ def get_log_normal( Returns: rv_frozen: A frozen instance of the log normal distribution with the specified parameters. + + Examples: + Create a log-normal distribution with specified parameters: + >>> log_normal_dist = get_log_normal(meanlog=1, sdlog=0.5) + >>> print(log_normal_dist) + """ return scipy.stats.lognorm(s=sdlog, scale=np.exp(meanlog), loc=0) @add_method(confuse.ConfigView) def as_random_distribution(self): - "Constructs a random distribution object from a distribution config key" + """ + Constructs a random distribution object from a distribution config key. + + Args: + self: Class instance (in this case, a config key) to construct the random distribution from. + + Returns: + A partial object containing the random distribution. + + Raises: + ValueError: When values are out of range. + NotImplementedError: If an unknown distribution is found. + + Examples: + Say that ``config`` is a ``confuse.ConfigView`` instance. 
+ + + To create a uniform distribution between 1 and 10: + >>> dist_function = config.as_random_distribution() + >>> sample = dist_function() + 5.436789235794546 + + To use a truncated normal distribution: + >>> config_truncnorm = confuse.ConfigView({ + "distribution": "truncnorm", + "mean": 0, + "sd": 1, + "a": -1, + "b": 1 + }) + >>> truncnorm_dist_function = config_truncnorm.as_random_distribution() + >>> truncnorm_sample = truncnorm_dist_function() + 0.312745 + + """ if isinstance(self.get(), dict): dist = self["distribution"].get() @@ -349,37 +496,36 @@ def list_filenames( folder: str | bytes | os.PathLike = ".", filters: str | list[str] = [], ) -> list[str]: - """Return the list of all filenames and paths in the provided folder. + """ + Return the list of all filenames and paths in the provided folder. This function lists all files in the specified folder and its subdirectories. If filters are provided, only the files containing each of the substrings in the filters will be returned. - Example: + Args: + folder: + The directory to search for files. Defaults to the current directory. + filters: + A string or a list of strings to filter filenames. Only files + containing all the provided substrings will be returned. Defaults to an + empty list. + + Returns: + A list of strings representing the paths to the files that match the filters. + + Examples: To get all files containing "hosp": - ``` - gempyor.utils.list_filenames( + >>> gempyor.utils.list_filenames( folder="model_output/", filters=["hosp"], ) - ``` To get only "hosp" files with a ".parquet" extension: - ``` - gempyor.utils.list_filenames( + >>> gempyor.utils.list_filenames( folder="model_output/", filters=["hosp", ".parquet"], ) - ``` - - Args: - folder: The directory to search for files. Defaults to the current directory. - filters: A string or a list of strings to filter filenames. Only files - containing all the provided substrings will be returned. Defaults to an - empty list. 
- - Returns: - A list of strings representing the paths to the files that match the filters. """ filters = [filters] if not isinstance(filters, list) else filters filters = filters if len(filters) else [""] @@ -410,7 +556,7 @@ def rolling_mean_pad( Examples: Below is a brief set of examples showcasing how to smooth a metric, like hospitalizations, using this function. - + ``` >>> import numpy as np >>> from gempyor.utils import rolling_mean_pad >>> hospitalizations = np.arange(1., 29.).reshape((7, 4)) @@ -430,6 +576,7 @@ def rolling_mean_pad( [17. , 18. , 19. , 20. ], [20.2, 21.2, 22.2, 23.2], [22.6, 23.6, 24.6, 25.6]]) + ``` """ weights = (1. / window) * np.ones(window) output = scipy.ndimage.convolve1d(data, weights, axis=0, mode="nearest") @@ -447,11 +594,23 @@ def rolling_mean_pad( def print_disk_diagnosis(): + """ + Reads and prints AWS disk diagnostic information. + """ import os from os import path from shutil import disk_usage - def bash(command): + def bash(command: str) -> str: + """ + Executes a shell command and returns its output. + + Args: + command: The shell command to be executed. + + Returns: + The output of the shell command. + """ output = os.popen(command).read() return output @@ -474,6 +633,33 @@ def bash(command): def create_resume_out_filename( flepi_run_index: str, flepi_prefix: str, flepi_slot_index: str, flepi_block_index: str, filetype: str, liketype: str ) -> str: + """ + Compiles run output information. + + Args: + flepi_run_index: Index of the run. + flepi_prefix: File prefix. + flepi_slot_index: Index of the slot. + flepi_block_index: Index of the block. + filetype: File type. + liketype: Chimeric or global. + + Returns: + The path to a corresponding output file. 
+ + Examples: + Generate an output file with specified parameters: + >>> filename = create_resume_out_filename( + flepi_run_index="test_run", + flepi_prefix="model_output/run_id/", + flepi_slot_index="1", + flepi_block_index="2", + filetype="seed", + liketype="chimeric" + ) + >>> print(filename) + "experiment/001/normal/intermediate/000000123.000000000.1.parquet" + """ prefix = f"{flepi_prefix}/{flepi_run_index}" inference_filepath_suffix = f"{liketype}/intermediate" inference_filename_prefix = "{:09d}.".format(int(flepi_slot_index)) @@ -495,6 +681,31 @@ def create_resume_out_filename( def create_resume_input_filename( resume_run_index: str, flepi_prefix: str, flepi_slot_index: str, filetype: str, liketype: str ) -> str: + """ + Compiles run input information. + + Args: + resume_run_index: Index of the run. + flepi_prefix: File prefix. + flepi_slot_index: Index of the slot. + filetype: File type. + liketype: Chimeric or global. + + Returns: + The path to the a corresponding input file. + + Examples: + Generate an input file with specified parameters: + >>> filename = create_resume_input_filename( + resume_run_index="2", + flepi_prefix="model_output/run_id/", + flepi_slot_index="1", + filetype="seed", + liketype="chimeric" + ) + >>> print(filename) + "experiment/002/normal/final/789.csv" + """ prefix = f"{flepi_prefix}/{resume_run_index}" inference_filepath_suffix = f"{liketype}/final" index = flepi_slot_index @@ -511,15 +722,30 @@ def create_resume_input_filename( ) -def get_filetype_for_resume(resume_discard_seeding: str, flepi_block_index: str) -> List[str]: +def get_filetype_for_resume(resume_discard_seeding: str, flepi_block_index: str) -> list[str]: """ Retrieves a list of parquet file types that are relevant for resuming a process based on - specific environment variable settings. This function dynamically determines the list + specific environment variable settings. 
+ This function dynamically determines the list based on the current operational context given by the environment. - The function checks two environment variables: - - `resume_discard_seeding`: Determines whether seeding-related file types should be included. - - `flepi_block_index`: Determines a specific operational mode or block of the process. + Args: + resume_discard_seeding: Determines whether seeding-related file types should be included (str). + flepi_block_index: Determines a specific operational mode or block of the process (str). + + Returns: + List of file types. + + Examples: + Determine file types for block index 1 with seeding data NOT discarded: + >>> filetypes = get_filetype_for_resume(resume_discard_seeding="false", flepi_block_index="1") + >>> print(filetypes) + ["seed", "spar", "snpi", "hpar", "hnpi", "init"] + + Determine file types for block index 2 with seeding data discarded: + >>> filetypes = get_filetype_for_resume(resume_discard_seeding="true", flepi_block_index="2") + >>> print(filetypes) + ["seed", "spar", "snpi", "hpar", "hnpi", "host", "llik", "init"] """ if flepi_block_index == "1": if resume_discard_seeding == "true": @@ -538,12 +764,25 @@ def create_resume_file_names_map( flepi_slot_index, flepi_run_index, last_job_output, -) -> Dict[str, str]: +) -> dict[str, str]: """ Generates a mapping of input file names to output file names for a resume process based on parquet file types and environmental conditions. The function adjusts the file name mappings based on the operational block index and the location of the last job output. + Args: + resume_discard_seeding: Determines whether seeding-related file types should be included. + flepi_block_index: Determines a specific operational mode or block of the process. + resume_run_index: Resume run index. + flepi_prefix: File prefix. + flepi_slot_index: Index of the slot. + flepi_run_index: flepiMoP run index. + last_job_output: Adjusts the keys in the mapping to be prefixed with this path. 
+ + Returns: + A dictionary where keys are input file paths and values are corresponding + output file paths. + The mappings depend on: - Parquet file types appropriate for resuming a process, as determined by the environment. - Whether the files are for 'global' or 'chimeric' types, as these liketypes influence the @@ -553,15 +792,32 @@ def create_resume_file_names_map( - The presence and value of 'LAST_JOB_OUTPUT' environment variable, which if set to an S3 path, adjusts the keys in the mapping to be prefixed with this path. - Returns: - Dict[str, str]: A dictionary where keys are input file paths and values are corresponding - output file paths. The paths may be modified by the 'LAST_JOB_OUTPUT' if it - is set and points to an S3 location. - Raises: No explicit exceptions are raised within the function, but it relies heavily on external functions and environment variables which if improperly configured could lead to unexpected behavior. + + Examples: + Generate a mapping of file names for a given resume process: + >>> file_names_map = create_resume_file_names_map( + resume_discard_seeding="false", + flepi_block_index="1", + resume_run_index="1", + flepi_prefix="model_output/run_id/", + flepi_slot_index="1", + flepi_run_index="test_run", + last_job_output="s3://bucket/path/") + >>> print(file_names_map) + { + 's3://bucket/path/model_output/run_id/1_type1_global_1.in': 'model_output/run_id/test_run_type1_global_1_1.out', + 's3://bucket/path/model_output/run_id/1_type1_chimeric_1.in': 'model_output/run_id/test_run_type1_chimeric_1_1.out', + 's3://bucket/path/model_output/run_id/1_type2_global_1.in': 'model_output/run_id/test_run_type2_global_1_1.out', + 's3://bucket/path/model_output/run_id/1_type2_chimeric_1.in': 'model_output/run_id/test_run_type2_chimeric_1_1.out' + } + # Note: this output is toy output implemented with toy file names. + + Notes: + - The paths may be modified by the 'LAST_JOB_OUTPUT' if it is set and points to an S3 location. 
""" file_types = get_filetype_for_resume( resume_discard_seeding=resume_discard_seeding, flepi_block_index=flepi_block_index @@ -597,21 +853,22 @@ def create_resume_file_names_map( return resume_file_name_mapping -def download_file_from_s3(name_map: Dict[str, str]) -> None: +def download_file_from_s3(name_map: dict[str, str]) -> None: """ Downloads files from AWS S3 based on a mapping of S3 URIs to local file paths. The function checks if the directory for the first output file exists and creates it if necessary. It then iterates over each S3 URI in the provided mapping, downloads the file to the corresponding local path, and handles errors if the S3 URI format is incorrect or if the download fails. - Parameters: - name_map (Dict[str, str]): A dictionary where keys are S3 URIs (strings) and values - are the local file paths (strings) where the files should - be saved. + Args: + name_map: + A dictionary where keys are S3 URIs (strings) and values + are the local file paths (strings) where the files should + be saved. Returns: - None: This function does not return a value; its primary effect is the side effect of - downloading files and potentially creating directories. + This function does not return a value; its primary effect is the side effect of + downloading files and potentially creating directories. Raises: ValueError: If an S3 URI does not start with 's3://', indicating an invalid format. @@ -660,21 +917,16 @@ def download_file_from_s3(name_map: Dict[str, str]) -> None: print("Could not download file from s3") -def move_file_at_local(name_map: Dict[str, str]) -> None: +def move_file_at_local(name_map: dict[str, str]) -> None: """ Moves files locally according to a given mapping. - This function takes a dictionary where the keys are source file paths and the values are destination file paths. It ensures that the destination directories exist and then copies the files from the source paths to the destination paths. 
- Parameters: - name_map (Dict[str, str]): A dictionary mapping source file paths to - destination file paths. - - Returns: - None + Args: + name_map: A dictionary mapping source file paths to destination file paths. """ for src, dst in name_map.items(): os.path.makedirs(os.path.dirname(dst), exist_ok=True)