diff --git a/examples/saber_config_example.yml b/examples/saber_config_example.yml new file mode 100644 index 0000000..6900c6c --- /dev/null +++ b/examples/saber_config_example.yml @@ -0,0 +1,18 @@ +# file paths to input data +workdir: '' + +x_fdc_train: '' +x_fdc_all: '' + +drain_table: '' +gauge_table: '' +regulate_table: '' + +drain_gis: '' +gauge_gis: '' + +gauge_data: '' +hindcast_zarr: '' + +# options for processing data +n_processes: 1 diff --git a/examples/saber_script_long.py b/examples/saber_script_long.py deleted file mode 100755 index 957dcd3..0000000 --- a/examples/saber_script_long.py +++ /dev/null @@ -1,120 +0,0 @@ -import logging -from multiprocessing import Pool - -import geopandas as gpd -import numpy as np -import pandas as pd - -import saber - -np.seterr(all="ignore") - -logging.basicConfig( - level=logging.INFO, - filename='', - filemode='a', - datefmt='%Y-%m-%d %X', - format='%(asctime)s: %(name)s - %(levelname)s - %(message)s' -) - -if __name__ == "__main__": - logger = logging.getLogger(__name__) - - # USER INPUTS - POPULATE THESE PATHS - workdir = '' - x_fdc_train = '' - x_fdc_all = '' - drain_gis = '' - gauge_gis = '' - gauge_data = '' - hindcast_zarr = '' - n_processes = 1 - # END USER INPUTS - - logger.info('Generate Clusters') - x_fdc_train = pd.read_parquet(x_fdc_train).values - x_fdc_all = pd.read_parquet(x_fdc_all) - saber.cluster.generate(workdir, x=x_fdc_train) - saber.cluster.summarize_fit(workdir) - saber.cluster.calc_silhouette(workdir, x=x_fdc_train, n_clusters=range(2, 10)) - - logger.info('Create Plots') - saber.cluster.plot_clusters(workdir, x=x_fdc_train) - saber.cluster.plot_centers(workdir) - saber.cluster.plot_fit_metrics(workdir) - saber.cluster.plot_silhouettes(workdir) - - # After this step, you should evaluate the many tables and figures that were generated by the clustering steps. - # Determine the number of clusters which best represents the modeled discharge data. 
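For orientation, the config keys listed in saber_config_example.yml above are consumed by the new saber.io.read_config (added later in this diff) and then exposed to the rest of the package. A minimal sketch of the intended startup sequence, assuming a hypothetical path to a filled-in copy of that file:

import saber

# hypothetical path to a copy of saber_config_example.yml with the paths filled in
config_file = '/path/to/saber_config.yml'

# load the yaml keys into saber.io's module-level state, then scaffold the workdir folders
saber.io.read_config(config_file)
saber.io.init_workdir(overwrite=False)

# config values are then available to the rest of the package, e.g.
n_processes = saber.io.get_state('n_processes')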
- - n_clusters = 5 - saber.cluster.predict_labels(workdir, n_clusters, x=x_fdc_all) - - # Generate assignments table - logger.info('Generate Assignment Table') - assign_df = saber.assign.generate(workdir) - - # Assign gauged basins - logger.info('Assign Gauged Basins') - assign_df = saber.assign.assign_gauged(assign_df) - gauged_mids = assign_df[assign_df[saber.io.gid_col].notna()][saber.io.mid_col].values - - with Pool(20) as p: - logger.info('Assign by Hydraulic Connectivity') - df_prop_down = pd.concat(p.starmap(saber.assign.map_propagate, [(assign_df, x, 'down') for x in gauged_mids])) - df_prop_up = pd.concat(p.starmap(saber.assign.map_propagate, [(assign_df, x, 'up') for x in gauged_mids])) - df_prop = pd.concat([df_prop_down, df_prop_up]).reset_index(drop=True) - df_prop = pd.concat( - p.starmap(saber.assign.map_resolve_props, [(df_prop, x) for x in df_prop[saber.io.mid_col].unique()]) - ) - - logger.info('Resolve Propagation Assignments') - assign_df = saber.assign.assign_propagation(assign_df, df_prop) - - logger.info('Assign Remaining Basins by Cluster, Spatial, and Physical Decisions') - for cluster_number in range(n_clusters): - logger.info(f'Assigning basins in cluster {cluster_number}') - # limit by cluster number - c_df = assign_df[assign_df[saber.io.clbl_col] == cluster_number] - # keep a list of the unassigned basins in the cluster - mids = c_df[c_df[saber.io.reason_col] == 'unassigned'][saber.io.mid_col].values - # filter cluster dataframe to find only gauged basins - c_df = c_df[c_df[saber.io.gid_col].notna()] - assign_df = pd.concat([ - pd.concat(p.starmap(saber.assign.map_assign_ungauged, [(assign_df, c_df, x) for x in mids])), - assign_df[~assign_df[saber.io.mid_col].isin(mids)] - ]).reset_index(drop=True) - - # Cache the completed propagation tables for inspection later - logger.info('Caching Completed Tables') - saber.io.write_table(df_prop, workdir, 'prop_resolved') - saber.io.write_table(df_prop_down, workdir, 'prop_downstream') - saber.io.write_table(df_prop_up, workdir, 'prop_upstream') - saber.io.write_table(assign_df, workdir, 'assign_table') - - logger.info('SABER Assignment Analysis Completed') - - # Recommended Optional - Generate GIS files to visually inspect the assignments - logger.info('Generating GIS files') - drain_gis = gpd.read_file(drain_gis) - saber.gis.map_by_reason(workdir, assign_df, drain_gis) - saber.gis.map_by_cluster(workdir, assign_df, drain_gis) - saber.gis.map_unassigned(workdir, assign_df, drain_gis) - - # Recommended Optional - Compute performance metrics - logger.info('Compute Performance Metrics') - bs_assign_df = saber.bstrap.mp_table(workdir, assign_df, n_processes) - bs_metrics_df = saber.bstrap.mp_metrics(workdir, bs_assign_df, gauge_data, hindcast_zarr, n_processes=n_processes) - saber.bstrap.merge_metrics_and_gis(workdir, gauge_gis, bs_metrics_df) - - # Optional - Compute the corrected simulation data - logger.info('Computing Bias Corrected Simulations') - with Pool(20) as p: - p.starmap( - saber.calibrate.map_saber, - [[mid, asgn_mid, asgn_gid, hindcast_zarr, gauge_data] for mid, asgn_mid, asgn_gid in - np.moveaxis(assign_df[[saber.io.mid_col, saber.io.asgn_mid_col, saber.io.gid_col]].values, 0, 0)] - ) - logger.info('SABER Calibration Completed') - - logger.info('SABER Completed') diff --git a/examples/saber_script_short.py b/examples/saber_script_short.py index 32df67a..d370119 100755 --- a/examples/saber_script_short.py +++ b/examples/saber_script_short.py @@ -1,13 +1,12 @@ import logging -import numpy as np -import pandas 
as pd - import saber -np.seterr(all="ignore") +# USER INPUTS - POPULATE THESE PATHS +config_file = '/Users/rchales/Projects/saber-hbc/examples/config.yml' # Path to the configuration file +log_path = '' # leave blank to write to console +# END USER INPUTS -log_path = '/Users/rchales/Projects/geoglows_saber/log.log' logging.basicConfig( level=logging.INFO, filename=log_path, @@ -19,40 +18,33 @@ if __name__ == "__main__": logger = logging.getLogger(__name__) - # USER INPUTS - POPULATE THESE PATHS - workdir = '' - x_fdc_train = '' - x_fdc_all = '' - drain_gis = '' - gauge_gis = '' - gauge_data = '' - hindcast_zarr = '' - n_processes = 1 - # END USER INPUTS + # Read the Config File + saber.io.read_config(config_file) + saber.io.init_workdir(overwrite=False) # Generate Clusters and Plots logger.info('Create Clusters and Plots') - saber.cluster.cluster(workdir, x_fdc_train) + saber.cluster.cluster() # Before continuing, review the clustering results and select the best n_clusters for the next function - saber.cluster.predict_labels(workdir, n_clusters=5, x=pd.read_parquet(x_fdc_all)) - - # Generate Assignments Table and Make Assignments - logger.info('Make Assignments of Ungauged Basins to Gauges') - assign_df = saber.assign.generate(workdir) - assign_df = saber.assign.mp_assign_all(workdir, assign_df) - - # Recommended Optional - Generate GIS files to visually inspect the assignments + saber.cluster.predict_labels(n_clusters=5) + + # Generate Assignments Table and Propagate from Gauges, Dams/Reservoirs + logger.info('Generating Assignments Table') + assign_df = saber.table.init() + assign_df = saber.table.mp_prop_gauges(assign_df) + assign_df = saber.table.mp_prop_regulated(assign_df) + saber.io.write_table(assign_df, 'assign_table') # cache results + + # Optional - Compute performance metrics at gauged locations + logger.info('Perform Bootstrap Validation') + bs_assign_df = saber.bstrap.mp_table(assign_df) + bs_metrics_df = saber.bstrap.mp_metrics(bs_assign_df) + saber.bstrap.gauge_metric_map(bs_metrics_df) + + # Optional - Make all assignments + logger.info('Make Assignments') + assign_df = saber.assign.mp_assign(assign_df) logger.info('Generating GIS files') - saber.gis.create_maps(workdir, assign_df, drain_gis) - - # Recommended Optional - Compute performance metrics - logger.info('Compute Performance Metrics') - bs_assign_df = saber.bstrap.mp_table(workdir, assign_df, n_processes) - bs_metrics_df = saber.bstrap.mp_metrics(workdir, bs_assign_df, gauge_data, hindcast_zarr, n_processes=n_processes) - saber.bstrap.merge_metrics_and_gis(workdir, gauge_gis, bs_metrics_df) - - # # Optional - Compute the Corrected Simulation Data - # logger.info('Compute Corrected Simulation Data') - # saber.calibrate.mp_saber(assign_df, hindcast_zarr, gauge_data) + saber.gis.create_maps(assign_df) logger.info('SABER Completed') diff --git a/saber/__init__.py b/saber/__init__.py index 83aca81..39f1bd5 100644 --- a/saber/__init__.py +++ b/saber/__init__.py @@ -1,14 +1,15 @@ import saber.assign +import saber.bstrap import saber.calibrate import saber.cluster import saber.gis import saber.io -import saber.bstrap +import saber.table __all__ = [ - 'io', 'cluster', 'assign', 'gis', 'calibrate', 'bstrap', + 'io', 'table', 'cluster', 'assign', 'gis', 'calibrate', 'bstrap', ] __author__ = 'Riley C. 
Hales' -__version__ = '0.7.0' +__version__ = '0.8.0' __license__ = 'BSD 3 Clause Clear' diff --git a/saber/assign.py b/saber/assign.py index 3f4fb9c..bdae752 100644 --- a/saber/assign.py +++ b/saber/assign.py @@ -4,125 +4,39 @@ import numpy as np import pandas as pd -from .io import asgn_gid_col -from .io import asgn_mid_col -from .io import down_mid_col -from .io import gid_col -from .io import mid_col -from .io import order_col +from .io import COL_ASN_GID +from .io import COL_ASN_MID +from .io import COL_ASN_REASON +from .io import COL_CID +from .io import COL_GPROP +from .io import COL_GID +from .io import COL_MID +from .io import COL_X +from .io import COL_Y +from .io import get_state from .io import read_table -from .io import reason_col -from .io import write_table -from .io import clbl_col -from .io import x_col -from .io import y_col -__all__ = ['mp_assign_all', 'generate', 'assign_gauged', 'map_propagate', 'map_resolve_props', 'map_assign_ungauged', ] +__all__ = ['mp_assign', 'assign_gauged', 'assign_propagation', 'mp_assign_clusters', ] logger = logging.getLogger(__name__) -def mp_assign_all(workdir: str, assign_df: pd.DataFrame, n_processes: int or None = None) -> pd.DataFrame: +def mp_assign(df: pd.DataFrame = None) -> pd.DataFrame: """ + Assigns basins a gauge for correction which contain a gauge Args: - workdir: path to the working directory - assign_df: the assignments table dataframe with the clustering labels already applied - n_processes: number of processes to use for multiprocessing, passed to Pool - - Returns: - pd.DataFrame - """ - gauged_mids = assign_df[assign_df[gid_col].notna()][mid_col].values - - # assign gauged basins - logger.info('Assigning Gauged Basins') - assign_df = assign_gauged(assign_df) - - logger.info('Assigning by Hydraulic Connectivity') - with Pool(n_processes) as p: - logger.info('Finding Downstream Assignments') - df_prop_down = pd.concat(p.starmap(map_propagate, [(assign_df, x, 'down') for x in gauged_mids])) - logger.info('Caching Downstream Assignments') - write_table(df_prop_down, workdir, 'prop_downstream') - - logger.info('Finding Upstream Assignments') - df_prop_up = pd.concat(p.starmap(map_propagate, [(assign_df, x, 'up') for x in gauged_mids])) - logger.info('Caching Upstream Assignments') - write_table(df_prop_up, workdir, 'prop_upstream') - - logger.info('Resolving Propagation Assignments') - df_prop = pd.concat([df_prop_down, df_prop_up]).reset_index(drop=True) - df_prop = pd.concat(p.starmap(map_resolve_props, [(df_prop, x) for x in df_prop[mid_col].unique()])) - logger.info('Caching Propagation Assignments') - write_table(df_prop, workdir, 'prop_resolved') - - logger.info('Assign Remaining Basins by Cluster, Spatial, and Physical Decisions') - for cluster_number in range(assign_df[clbl_col].max() + 1): - logger.info(f'Assigning basins in cluster {cluster_number}') - # limit by cluster number - c_df = assign_df[assign_df[clbl_col] == cluster_number] - # keep a list of the unassigned basins in the cluster - mids = c_df[c_df[reason_col] == 'unassigned'][mid_col].values - # filter cluster dataframe to find only gauged basins - c_df = c_df[c_df[gid_col].notna()] - assign_df = pd.concat([ - pd.concat(p.starmap(map_assign_ungauged, [(assign_df, c_df, x) for x in mids])), - assign_df[~assign_df[mid_col].isin(mids)] - ]).reset_index(drop=True) - - logger.info('SABER Assignment Analysis Completed') - return assign_df - - -def generate(workdir: str, labels_df: pd.DataFrame = None, drain_table: pd.DataFrame = None, - gauge_table: pd.DataFrame 
= None, cache: bool = True) -> pd.DataFrame: - """ - Joins the drain_table.csv and gauge_table.csv to create the assign_table.csv - - Args: - workdir: path to the working directory - cache: whether to cache the assign table immediately - labels_df: a dataframe with a column for the assigned cluster label and a column for the model_id - drain_table: the drain table dataframe - gauge_table: the gauge table dataframe + df: the assignments table dataframe Returns: - pd.DataFrame + Copy of df1 with assignments made """ - # read the tables if they are not provided - if labels_df is None: - labels_df = read_table(workdir, 'cluster_labels') - if drain_table is None: - drain_table = read_table(workdir, 'drain_table') - if gauge_table is None: - gauge_table = read_table(workdir, 'gauge_table') - - # enforce correct column data types - labels_df[mid_col] = labels_df[mid_col].astype(str) - drain_table[mid_col] = drain_table[mid_col].astype(str) - drain_table[down_mid_col] = drain_table[down_mid_col].astype(str) - gauge_table[mid_col] = gauge_table[mid_col].astype(str) - - # join the drain_table and gauge_table then join the labels_df - assign_df = pd.merge( - drain_table, - gauge_table, - on=mid_col, - how='outer' - ).merge(labels_df, on=mid_col, how='outer') - assign_df = assign_df.sort_values(by=mid_col).reset_index(drop=True) - - # create new columns asgn_mid_col, asgn_gid_col, reason_col - assign_df[[asgn_mid_col, asgn_gid_col, reason_col]] = ['unassigned', 'unassigned', 'unassigned'] - - if cache: - write_table(assign_df, workdir, 'assign_table') - write_table(assign_df[[mid_col, ]], workdir, 'mid_list') - write_table(assign_df[[gid_col, ]], workdir, 'gid_list') - write_table(assign_df[[mid_col, gid_col]], workdir, 'mid_gid_map') - - return assign_df + if df is None: + df = read_table('assign_table') + df = assign_gauged(df) + df = assign_propagation(df) + df = mp_assign_clusters(df, get_state('n_processes')) + return df def assign_gauged(df: pd.DataFrame) -> pd.DataFrame: @@ -135,112 +49,60 @@ def assign_gauged(df: pd.DataFrame) -> pd.DataFrame: Returns: Copy of df1 with assignments made """ - selector = df[gid_col].notna() - df.loc[selector, asgn_mid_col] = df[mid_col] - df.loc[selector, asgn_gid_col] = df[gid_col] - df.loc[selector, reason_col] = 'gauged' + selector = df[COL_GID].notna() + df.loc[selector, COL_ASN_MID] = df[COL_MID] + df.loc[selector, COL_ASN_GID] = df[COL_GID] + df.loc[selector, COL_ASN_REASON] = 'gauged' return df -def map_propagate(df: pd.DataFrame, start_mid: int, direction: str) -> pd.DataFrame or None: +def assign_propagation(df: pd.DataFrame) -> pd.DataFrame: """ - Meant to be mapped over a dataframe to propagate assignments downstream or upstream + Merge the assignment table and the propagation assignments Args: df: the assignments table dataframe - start_mid: the model_id to start the propagation from - direction: either 'down' or 'up' to indicate the direction of propagation Returns: pd.DataFrame """ - assigned_rows = [] - start_order = df[df[mid_col] == start_mid][order_col].values[0] - - # select the starting row - stream_row = df[df[mid_col] == start_mid] - start_gid = stream_row[asgn_gid_col].values[0] - select_same_order_streams = df[order_col] == start_order - - n_steps = 1 - - # repeat as long as the current row is not empty - try: - while True: - # select the next up or downstream - if direction == 'down': - id_selector = df[mid_col] == stream_row[down_mid_col].values[0] - else: # direction == 'up': - id_selector = df[down_mid_col] == 
stream_row[mid_col].values[0] - - # select the next row using the ID and Order selectors - stream_row = df[np.logical_and(id_selector, select_same_order_streams)] - - # Break the loop if - # 1. next row is empty -> no upstream/downstream row -> empty stream_row - # 2. next row stream order not a match -> not picked by select_same_order_streams -> empty stream_row - # 3. next row already has an assignment made -> reason column is not empty string - if stream_row.empty or stream_row[reason_col].values[0] != 'unassigned': - break - - # copy the row, modify the assignment columns, and append to the list - new_row = stream_row.copy() - new_row[[asgn_mid_col, asgn_gid_col, reason_col]] = [start_mid, start_gid, f'prop-{direction}-{n_steps}'] - assigned_rows.append(new_row) + # todo + not_gauged = df[df[COL_GID].isna()] - # increment the steps counter - n_steps = n_steps + 1 + # need to write values to the asgn mid and gid columns and also the reasons + return - # Break the loop if - # 1. The next row is an outlet -> no downstream row -> cause error when selecting next row - # 2. we have reach the max number of steps (n_steps -1) - if stream_row[down_mid_col].values[0] != -1 or n_steps >= 8: - break - except Exception as e: - logger.error(f'Error in map_propagate: {e}') - - if len(assigned_rows): - return pd.concat(assigned_rows) - return pd.DataFrame(columns=df.columns) - -def map_resolve_props(df_props: pd.DataFrame, mid: str) -> pd.DataFrame: +def mp_assign_clusters(df: pd.DataFrame, n_processes: int or None = None) -> pd.DataFrame: """ - Resolves the propagation assignments by choosing the assignment with the fewest steps Args: - df_props: the combined upstream and downstream propagation assignments dataframe - mid: the model_id to resolve the propagation assignments for + df: the assignments table dataframe with the clustering labels already applied + n_processes: number of processes to use for multiprocessing, passed to Pool Returns: pd.DataFrame """ - df_mid = df_props[df_props[mid_col] == mid].copy() - # parse the reason statement into number of steps and prop up or downstream - df_mid[['direction', 'n_steps']] = pd.DataFrame( - df_props[reason_col].apply(lambda x: x.split('-')[1:]).to_list()) - df_mid['n_steps'] = df_mid['n_steps'].astype(float) - # sort by n_steps then by reason - df_mid = df_mid.sort_values(['n_steps', 'direction'], ascending=[True, True]) - # return the first row which is the fewest steps and preferring downstream to upstream - return df_mid.head(1).drop(columns=['direction', 'n_steps']) - - -def assign_propagation(df: pd.DataFrame, df_props: pd.DataFrame) -> pd.DataFrame: - """ - Merge the assignment table and the propagation assignments - - Args: - df: the assignments table dataframe - df_props: the combined upstream and downstream propagation assignments dataframe + # todo filter the dataframe + with Pool(n_processes) as p: + logger.info('Assign Basins within Clusters') + for cluster_number in range(df[COL_CID].max() + 1): + logger.info(f'Assigning basins in cluster {cluster_number}') + # limit by cluster number + c_df = df[df[COL_CID] == cluster_number] + # keep a list of the unassigned basins in the cluster + mids = c_df[c_df[COL_ASN_REASON] == 'unassigned'][COL_MID].values + # filter cluster dataframe to find only gauged basins + c_df = c_df[c_df[COL_GID].notna()] + df = pd.concat([ + pd.concat(p.starmap(_map_assign_ungauged, [(df, c_df, x) for x in mids])), + df[~df[COL_MID].isin(mids)] + ]).reset_index(drop=True) - Returns: - pd.DataFrame - """ - return 
pd.concat([df[~df[mid_col].isin(df_props[mid_col])], df_props]) + return df -def map_assign_ungauged(assign_df: pd.DataFrame, gauges_df: np.array, mid: str) -> pd.DataFrame: +def _map_assign_ungauged(assign_df: pd.DataFrame, gauges_df: np.array, mid: str) -> pd.DataFrame: """ Assigns all possible ungauged basins a gauge that is (1) is closer than any other gauge @@ -256,18 +118,23 @@ def map_assign_ungauged(assign_df: pd.DataFrame, gauges_df: np.array, mid: str) a new row for the given mid with the assignments made """ try: - # todo account for physical parameters if they are included - # find the closest gauge using euclidean distance without accounting for projection/map distortion - mid_x, mid_y = assign_df.loc[assign_df[mid_col] == mid, [x_col, y_col]].head(1).values.flatten() - row_idx_to_assign = pd.Series( - np.sqrt(np.power(gauges_df[x_col] - mid_x, 2) + np.power(gauges_df[y_col] - mid_y, 2)) - ).idxmin() - - asgn_mid, asgn_gid = gauges_df.loc[row_idx_to_assign, [mid_col, gid_col]] - asgn_reason = f'cluster-{gauges_df[clbl_col].values[0]}' - - new_row = assign_df[assign_df[mid_col] == mid].copy() - new_row[[asgn_mid_col, asgn_gid_col, reason_col]] = [asgn_mid, asgn_gid, asgn_reason] + new_row = assign_df[assign_df[COL_MID] == mid].copy() + if new_row[COL_GPROP].values[0] != '': + prop_str = new_row[COL_GPROP].values[0] + asgn_mid = prop_str.split('-')[-1] + asgn_gid = assign_df[assign_df[COL_MID] == asgn_mid][COL_ASN_GID].values[0] + asgn_reason = prop_str + + else: + # find the closest gauge using euclidean distance without accounting for projection/map distortion + mid_x, mid_y = assign_df.loc[assign_df[COL_MID] == mid, [COL_X, COL_Y]].head(1).values.flatten() + row_idx_to_assign = pd.Series( + np.sqrt(np.power(gauges_df[COL_X] - mid_x, 2) + np.power(gauges_df[COL_Y] - mid_y, 2)) + ).idxmin() + asgn_mid, asgn_gid = gauges_df.loc[row_idx_to_assign, [COL_MID, COL_GID]] + asgn_reason = f'cluster-{gauges_df[COL_CID].values[0]}' + + new_row[[COL_ASN_MID, COL_ASN_GID, COL_ASN_REASON]] = [asgn_mid, asgn_gid, asgn_reason] except Exception as e: logger.error(f'Error in map_assign_ungauged: {e}') new_row = pd.DataFrame(columns=assign_df.columns) diff --git a/saber/bstrap.py b/saber/bstrap.py index b0a78d7..7c4730b 100644 --- a/saber/bstrap.py +++ b/saber/bstrap.py @@ -10,47 +10,49 @@ import seaborn as sns from matplotlib import pyplot as plt -from .assign import map_assign_ungauged +from .assign import _map_assign_ungauged from .calibrate import map_saber -from .io import asgn_gid_col -from .io import asgn_mid_col -from .io import dir_valid -from .io import gid_col -from .io import mid_col -from .io import q_mod -from .io import q_obs -from .io import q_sim +from .io import COL_ASN_GID +from .io import COL_ASN_MID +from .io import COL_GID +from .io import COL_MID +from .io import COL_QMOD +from .io import COL_QOBS +from .io import COL_QSIM +from .io import get_dir +from .io import get_state +from .io import read_gis +from .io import read_table from .io import write_table -__all__ = ['mp_table', 'metrics', 'mp_metrics', 'plots', 'merge_metrics_and_gis'] +__all__ = ['mp_table', 'metrics', 'mp_metrics', 'plots', 'gauge_metric_map'] logger = logging.getLogger(__name__) warnings.filterwarnings('ignore') -def mp_table(workdir: str, assign_df: pd.DataFrame, n_processes: int = 1) -> pd.DataFrame: +def mp_table(assign_df: pd.DataFrame) -> pd.DataFrame: """ Generates the assignment table for bootstrap validation by assigning each gauged stream to a different gauged stream following the same rules as 
all other gauges. Args: - workdir: the project working directory path assign_df: pandas.DataFrame of the assignment table - n_processes: number of processes to use for multiprocessing, passed to Pool Returns: None """ # subset the assign dataframe to only rows which contain gauges & reset the index - assign_df = assign_df[assign_df[gid_col].notna()].reset_index(drop=True) + assign_df = assign_df[assign_df[COL_GID].notna()] + assign_df = assign_df.reset_index(drop=True) - with Pool(n_processes) as p: + with Pool(get_state('n_processes')) as p: bootstrap_assign_df = pd.concat( p.starmap(_map_mp_table, [[assign_df, row_idx] for row_idx in assign_df.index]) ) - write_table(bootstrap_assign_df, workdir, 'assign_table_bootstrap') + write_table(bootstrap_assign_df, 'assign_table_bootstrap') return bootstrap_assign_df @@ -66,10 +68,10 @@ def _map_mp_table(assign_df: pd.DataFrame, row_idx: int) -> pd.DataFrame: Returns: pandas.DataFrame of the row with the new assignment """ - return map_assign_ungauged(assign_df, assign_df.drop(row_idx), assign_df.loc[row_idx][mid_col]) + return _map_assign_ungauged(assign_df, assign_df.drop(row_idx), assign_df.loc[row_idx][COL_MID]) -def metrics(row_idx: int, assign_df: pd.DataFrame, gauge_data: str, hds: str) -> pd.DataFrame | None: +def metrics(row_idx: int, assign_df: pd.DataFrame, gauge_data: str, hindcast_zarr: str) -> pd.DataFrame | None: """ Performs bootstrap validation. @@ -77,7 +79,7 @@ def metrics(row_idx: int, assign_df: pd.DataFrame, gauge_data: str, hds: str) -> row_idx: the row of the assignment table to remove and perform bootstrap validation with assign_df: pandas.DataFrame of the assignment table gauge_data: string path to the directory of observed data - hds: string path to the hindcast streamflow dataset + hindcast_zarr: string path to the hindcast streamflow dataset Returns: None @@ -85,23 +87,23 @@ def metrics(row_idx: int, assign_df: pd.DataFrame, gauge_data: str, hds: str) -> try: row = assign_df.loc[row_idx] corrected_df = map_saber( - row[mid_col], - row[asgn_mid_col], - row[asgn_gid_col], - hds, + row[COL_MID], + row[COL_ASN_MID], + row[COL_ASN_GID], + hindcast_zarr, gauge_data, ) if corrected_df is None: - logger.warning(f'No corrected data for {row[mid_col]}') + logger.warning(f'No corrected data for {row[COL_MID]}') return None - if not (q_mod in corrected_df.columns and q_sim in corrected_df.columns): + if not (COL_QMOD in corrected_df.columns and COL_QSIM in corrected_df.columns): logger.warning(f'Missing adjusted and simulated columns') return None # create a dataframe of original and corrected streamflow that can be used for calculating metrics - metrics_df = pd.read_csv(os.path.join(gauge_data, f'{row[gid_col]}.csv'), index_col=0) - metrics_df.columns = [q_obs, ] + metrics_df = pd.read_csv(os.path.join(gauge_data, f'{row[COL_GID]}.csv'), index_col=0) + metrics_df.columns = [COL_QOBS, ] metrics_df.index = pd.to_datetime(metrics_df.index) metrics_df = pd.merge(corrected_df, metrics_df, how='inner', left_index=True, right_index=True) @@ -110,12 +112,12 @@ def metrics(row_idx: int, assign_df: pd.DataFrame, gauge_data: str, hds: str) -> # if the dataframe is empty (dates did not align or all rows were inf or NaN), return None if metrics_df.empty: - logger.warning(f'Empty dataframe for {row[mid_col]}') + logger.warning(f'Empty dataframe for {row[COL_MID]}') return None - obs_values = metrics_df[q_obs].values.flatten() - sim_values = metrics_df[q_sim].values.flatten() - mod_values = np.squeeze(metrics_df[q_mod].values.flatten()) + 
obs_values = metrics_df[COL_QOBS].values.flatten() + sim_values = metrics_df[COL_QSIM].values.flatten() + mod_values = np.squeeze(metrics_df[COL_QMOD].values.flatten()) if mod_values.dtype == np.dtype('O'): mod_values = np.array(mod_values.tolist()).astype(np.float64).flatten() @@ -136,58 +138,59 @@ def metrics(row_idx: int, assign_df: pd.DataFrame, gauge_data: str, hds: str) -> 'nse_corr': hs.nse(mod_values, obs_values), 'kge_corr': hs.kge_2012(mod_values, sim_values), - 'reach_id': row[mid_col], - 'gauge_id': row[gid_col], - 'asgn_reach_id': row[asgn_mid_col], + 'reach_id': row[COL_MID], + 'gauge_id': row[COL_GID], + 'asgn_reach_id': row[COL_ASN_MID], }, index=[0, ]) except Exception as e: logger.error(e) - logger.error(f'Failed bootstrap validation for {row[mid_col]}') + logger.error(f'Failed bootstrap validation for {row[COL_MID]}') return None -def mp_metrics(workdir: str, assign_df: pd.DataFrame, gauge_data: str, hds: str, n_processes: int = 1) -> pd.DataFrame: +def mp_metrics(assign_df: pd.DataFrame = None) -> pd.DataFrame: """ Performs bootstrap validation using multiprocessing. Args: - workdir: the project working directory assign_df: pandas.DataFrame of the assignment table - gauge_data: string path to the directory of observed data - hds: string path to the hindcast streamflow dataset - n_processes: number of processes to use for multiprocessing, passed to Pool Returns: None """ + if assign_df is None: + assign_df = read_table('assign_table_bootstrap') + + gauge_data_dir = get_state('gauge_data') + hindcast_zarr = get_state('hindcast_zarr') + # subset the assign dataframe to only rows which contain gauges & reset the index - assign_df = assign_df[assign_df[gid_col].notna()].reset_index(drop=True) + assign_df = assign_df[assign_df[COL_GID].notna()].reset_index(drop=True) - with Pool(n_processes) as p: + with Pool(get_state('n_processes')) as p: metrics_df = pd.concat( p.starmap( metrics, - [[idx, assign_df, gauge_data, hds] for idx in assign_df.index] + [[idx, assign_df, gauge_data_dir, hindcast_zarr] for idx in assign_df.index] ) ) - write_table(metrics_df, workdir, 'bootstrap_metrics') + write_table(metrics_df, 'bootstrap_metrics') return metrics_df -def plots(workdir: str, bdf: pd.DataFrame = None) -> None: +def plots(bdf: pd.DataFrame = None) -> None: """ Args: - workdir: the project working directory bdf: pandas.DataFrame of the bootstrap metrics Returns: None """ if bdf is None: - bdf = pd.read_csv(os.path.join(workdir, dir_valid, 'bootstrap_metrics.csv')) + bdf = pd.read_csv(os.path.join(get_dir('validation'), 'bootstrap_metrics.csv')) for stat in ['me', 'mae', 'rmse', 'nse', 'kge']: fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(8, 4), dpi=1500, tight_layout=True, sharey=True) @@ -233,47 +236,48 @@ def plots(workdir: str, bdf: pd.DataFrame = None) -> None: ax1.axvline(stat_df[f'{stat}_sim'].median(), c='green') ax2.axvline(stat_df[f'{stat}_corr'].median(), c='green') - fig.savefig(os.path.join(workdir, dir_valid, f'bootstrap_{stat}.png')) + fig.savefig(os.path.join(get_dir('validation'), f'bootstrap_{stat}.png')) return -def merge_metrics_and_gis(workdir: str, gdf: gpd.GeoDataFrame or str, bdf: pd.DataFrame = pd.DataFrame or None) -> None: +def gauge_metric_map(bdf: pd.DataFrame = pd.DataFrame or None, gauge_gdf: gpd.GeoDataFrame = None) -> None: """ - Creates a matplolib map of KGE metrics + Creates a geopackge of the gauge locations with added attributes for metrics calculated during the bootstrap + validation. 
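The bootstrap helpers in this module are intended to be chained together; a minimal usage sketch using only the functions added in this diff, assuming read_config has already been called and the assignment table exists on disk:

import saber

assign_df = saber.io.read_table('assign_table')

bs_assign_df = saber.bstrap.mp_table(assign_df)        # re-assign each gauged basin as if it were ungauged
bs_metrics_df = saber.bstrap.mp_metrics(bs_assign_df)  # compute error metrics at every gauge
saber.bstrap.gauge_metric_map(bs_metrics_df)           # write a geopackage of gauges with metric attributes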
Args: - workdir: the project working directory - gdf: geopandas.GeoDataFrame of the model reaches bdf: pandas.DataFrame of the bootstrap metrics + gauge_gdf: geopandas.GeoDataFrame of the gauge locations Returns: None """ - if isinstance(gdf, str): - gdf = gpd.read_file(gdf) + if gauge_gdf is None: + gauge_gdf = read_gis('gauge_gis') if bdf is None: - bdf = pd.read_csv(os.path.join(workdir, dir_valid, 'bootstrap_metrics.csv')) + bdf = read_table('bootstrap_metrics') - gdf = gdf.merge(bdf, on=gid_col, how='left') + gauge_gdf = gauge_gdf.merge(bdf, on=COL_GID, how='left') for metric in ['me', 'mae', 'rmse', 'kge', 'nse']: # convert from string to float then prepare a column for the results. cols = [f'{metric}_sim', f'{metric}_corr'] - gdf[cols] = gdf[cols].astype(float) - gdf[metric] = np.nan + gauge_gdf[cols] = gauge_gdf[cols].astype(float) + gauge_gdf[metric] = np.nan for metric in ['kge', 'nse']: # want to see increase or difference less than or equal to 0.2 - gdf.loc[gdf[f'{metric}_corr'] > gdf[f'{metric}_sim'], metric] = 2 - gdf.loc[np.abs(gdf[f'{metric}_corr'] - gdf[f'{metric}_sim']) <= 0.2, metric] = 1 - gdf.loc[gdf[f'{metric}_corr'] < gdf[f'{metric}_sim'], metric] = 0 + gauge_gdf.loc[gauge_gdf[f'{metric}_corr'] > gauge_gdf[f'{metric}_sim'], metric] = 2 + gauge_gdf.loc[np.abs(gauge_gdf[f'{metric}_corr'] - gauge_gdf[f'{metric}_sim']) <= 0.2, metric] = 1 + gauge_gdf.loc[gauge_gdf[f'{metric}_corr'] < gauge_gdf[f'{metric}_sim'], metric] = 0 for metric in ['me', 'mae', 'rmse']: # want to see decrease in absolute value or difference less than 10% - gdf.loc[gdf[f'{metric}_corr'].abs() < gdf[f'{metric}_sim'].abs(), metric] = 2 - gdf.loc[np.abs(gdf[f'{metric}_corr'] - gdf[f'{metric}_sim']) < gdf[f'{metric}_sim'].abs() * .1, metric] = 1 - gdf.loc[gdf[f'{metric}_corr'].abs() > gdf[f'{metric}_sim'].abs(), metric] = 0 + gauge_gdf.loc[gauge_gdf[f'{metric}_corr'].abs() < gauge_gdf[f'{metric}_sim'].abs(), metric] = 2 + gauge_gdf.loc[np.abs(gauge_gdf[f'{metric}_corr'] - gauge_gdf[f'{metric}_sim']) < gauge_gdf[ + f'{metric}_sim'].abs() * .1, metric] = 1 + gauge_gdf.loc[gauge_gdf[f'{metric}_corr'].abs() > gauge_gdf[f'{metric}_sim'].abs(), metric] = 0 - gdf.to_file(os.path.join(workdir, dir_valid, 'bootstrap_metrics_all.gpkg'), driver='GPKG') + gauge_gdf.to_file(os.path.join(get_dir('validation'), 'bootstrap_metrics.gpkg'), driver='GPKG') return diff --git a/saber/calibrate.py b/saber/calibrate.py index facde7d..1107003 100644 --- a/saber/calibrate.py +++ b/saber/calibrate.py @@ -9,12 +9,12 @@ from natsort import natsorted from scipy import interpolate, stats -from .io import asgn_mid_col -from .io import gid_col -from .io import mid_col -from .io import q_mod -from .io import q_obs -from .io import q_sim +from .io import COL_ASN_MID +from .io import COL_GID +from .io import COL_MID +from .io import COL_QMOD +from .io import COL_QOBS +from .io import COL_QSIM logger = logging.getLogger(__name__) @@ -47,7 +47,7 @@ def mp_saber(assign_df: pd.DataFrame, hds: str, gauge_data: str, save_dir: str = p.starmap( map_saber, [[mid, asgn_mid, asgn_gid, hds, gauge_data, save_dir] for mid, asgn_mid, asgn_gid in - np.moveaxis(assign_df[[mid_col, asgn_mid_col, gid_col]].values, 0, 0)] + np.moveaxis(assign_df[[COL_MID, COL_ASN_MID, COL_GID]].values, 0, 0)] ) logger.info('Finished SABER Bias Correction') @@ -83,10 +83,10 @@ def map_saber(mid: str, asgn_mid: str, asgn_gid: str, hds: str, gauge_data: str) hds = xarray.open_mfdataset(hds, concat_dim='rivid', combine='nested', parallel=True, engine='zarr') rivids = 
hds.rivid.values sim_a = hds['Qout'][:, rivids == int(mid)].values - sim_a = pd.DataFrame(sim_a, index=hds['time'].values, columns=[q_sim]) + sim_a = pd.DataFrame(sim_a, index=hds['time'].values, columns=[COL_QSIM]) if asgn_mid != mid: sim_b = hds['Qout'][:, rivids == int(asgn_mid)].values - sim_b = pd.DataFrame(sim_b, index=sim_a.index, columns=[q_sim]) + sim_b = pd.DataFrame(sim_b, index=sim_a.index, columns=[COL_QSIM]) sim_b = sim_b[sim_b.index.year >= 1980] sim_a = sim_a[sim_a.index.year >= 1980] hds.close() @@ -140,8 +140,8 @@ def fdc_mapping(sim_df: pd.DataFrame, obs_df: pd.DataFrame) -> pd.DataFrame: values += to_flow(to_prob(month_sim.values)).tolist() return pd.DataFrame({ - q_mod: values, - q_sim: sim_df.values.flatten(), + COL_QMOD: values, + COL_QSIM: sim_df.values.flatten(), }, index=dates).sort_index() @@ -227,16 +227,16 @@ def sfdc_mapping(sim_flow_a: pd.DataFrame, obs_flow_a: pd.DataFrame, sim_flow_b: # compute the flow duration curves if drop_outliers: - sim_fdc_a = calc_fdc(_drop_outliers_by_zscore(sim_flow_a, threshold=outlier_threshold), col_name=q_sim) - sim_fdc_b = calc_fdc(_drop_outliers_by_zscore(sim_flow_b, threshold=outlier_threshold), col_name=q_sim) - obs_fdc = calc_fdc(_drop_outliers_by_zscore(obs_flow_a, threshold=outlier_threshold), col_name=q_obs) + sim_fdc_a = calc_fdc(_drop_outliers_by_zscore(sim_flow_a, threshold=outlier_threshold), col_name=COL_QSIM) + sim_fdc_b = calc_fdc(_drop_outliers_by_zscore(sim_flow_b, threshold=outlier_threshold), col_name=COL_QSIM) + obs_fdc = calc_fdc(_drop_outliers_by_zscore(obs_flow_a, threshold=outlier_threshold), col_name=COL_QOBS) else: - sim_fdc_a = calc_fdc(sim_flow_a, col_name=q_sim) - sim_fdc_b = calc_fdc(sim_flow_b, col_name=q_sim) - obs_fdc = calc_fdc(obs_flow_a, col_name=q_obs) + sim_fdc_a = calc_fdc(sim_flow_a, col_name=COL_QSIM) + sim_fdc_b = calc_fdc(sim_flow_b, col_name=COL_QSIM) + obs_fdc = calc_fdc(obs_flow_a, col_name=COL_QOBS) # calculate the scalar flow duration curve (at point A with simulated and observed data) - scalar_fdc = calc_sfdc(sim_fdc_a[q_sim], obs_fdc[q_obs]) + scalar_fdc = calc_sfdc(sim_fdc_a[COL_QSIM], obs_fdc[COL_QOBS]) if filter_scalar_fdc: scalar_fdc = scalar_fdc[scalar_fdc['p_exceed'].between(filter_range[0], filter_range[1])] @@ -269,7 +269,7 @@ def sfdc_mapping(sim_flow_a: pd.DataFrame, obs_flow_a: pd.DataFrame, sim_flow_b: response = pd.DataFrame(data=np.transpose([qb_adjusted, qb_original]), index=sim_flow_b.index.to_list(), - columns=(q_mod, q_sim)) + columns=(COL_QMOD, COL_QSIM)) if metadata: response['scalars'] = scalars response['p_exceed'] = p_exceed diff --git a/saber/cluster.py b/saber/cluster.py index 9ddbd74..41b3438 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -14,10 +14,10 @@ from sklearn.cluster import MiniBatchKMeans from sklearn.metrics import silhouette_samples -from .io import _find_model_files -from .io import clbl_col -from .io import mid_col -from .io import get_table_path +from .io import COL_CID +from .io import COL_MID +from .io import list_cluster_files +from .io import get_dir from .io import read_table from .io import write_table @@ -32,37 +32,39 @@ logger = logging.getLogger(__name__) -def cluster(workdir: str, train_df: str) -> None: +def cluster(plot: bool = False) -> None: """ Train k-means cluster models, calculate fit metrics, and generate plots Args: - workdir: path to the project directory - train_df: path to the prepared FDC data for training cluster models + plot: boolean flag to indicate whether plots should be generated after clustering 
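A sketch of how this clustering entry point is expected to be driven once the config has been read: train the candidate k-means models, review the generated figures and metrics, then label every basin with the chosen cluster count. The config path below is hypothetical:

import saber

saber.io.read_config('/path/to/saber_config.yml')  # hypothetical config path

saber.cluster.cluster(plot=True)  # train k-means models, write fit metrics and figures

# after reviewing the cluster figures and metrics, choose a cluster count
saber.cluster.predict_labels(n_clusters=5)  # writes the cluster labels table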
Returns: None """ logger.info('Generate Clusters') - x_fdc_train = pd.read_parquet(train_df, engine='fastparquet').values - generate(workdir, x=x_fdc_train) - summarize_fit(workdir) + + x_fdc_train = read_table("x_fdc_train").values + generate(x=x_fdc_train) + summarize_fit() # calc_silhouette(workdir, x=x_fdc_train, n_clusters=range(2, 10)) + if not plot: + return + logger.info('Create Plots') - plot_clusters(workdir, x=x_fdc_train) - plot_centers(workdir) - plot_fit_metrics(workdir) + plot_clusters(x=x_fdc_train) + plot_centers() + plot_fit_metrics() # plot_silhouettes(workdir) return -def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 13) -> None: +def generate(x: np.ndarray = None, max_clusters: int = 13) -> None: """ Trains scikit-learn MiniBatchKMeans models and saves as pickle Args: - workdir: path to the project directory x: a numpy array of the prepared FDC data max_clusters: maximum number of clusters to train @@ -70,52 +72,51 @@ def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 13) -> None None """ if x is None: - x = read_table(workdir, 'hindcast_fdc_trans').values + x = read_table('x_fdc_train').values # build the kmeans model for a range of cluster numbers for n_clusters in range(2, max_clusters + 1): logger.info(f'Clustering n={n_clusters}') kmeans = MiniBatchKMeans(n_clusters=n_clusters, init='k-means++', n_init=100) kmeans.fit_predict(x) - joblib.dump(kmeans, os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) + joblib.dump(kmeans, os.path.join(get_dir('clusters'), f'kmeans-{n_clusters}.pickle')) return -def predict_labels(workdir: str, n_clusters: int, x: pd.DataFrame) -> pd.DataFrame: +def predict_labels(n_clusters: int, x: pd.DataFrame = None) -> pd.DataFrame: """ Predict the cluster labels for a set number of FDCs Args: - workdir: path to the project directory n_clusters: number of cluster model to use for prediction x: A dataframe with 1 row per FDC (stream) and 1 column per FDC value. Index is the stream's ID. 
Returns: None """ - model = joblib.load(os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) + if x is None: + x = read_table('x_fdc_all') + + model = joblib.load(os.path.join(get_dir('clusters'), f'kmeans-{n_clusters}.pickle')) labels_df = pd.DataFrame( np.transpose([model.predict(x.values), x.index]), - columns=[clbl_col, mid_col] + columns=[COL_CID, COL_MID] ) - write_table(labels_df, workdir, 'cluster_labels') + write_table(labels_df, 'cluster_table') return labels_df -def summarize_fit(workdir: str) -> None: +def summarize_fit() -> None: """ Generate a summary of the clustering results save the centers and labels to parquet - Args: - workdir: path to the project directory - Returns: None """ summary = {'number': [], 'inertia': [], 'n_iter': []} labels = [] - for model_file in _find_model_files(workdir, n_clusters='all'): + for model_file in list_cluster_files(n_clusters='all'): logger.info(f'Post Processing {os.path.basename(model_file)}') kmeans = joblib.load(model_file) n_clusters = int(kmeans.n_clusters) @@ -124,9 +125,7 @@ def summarize_fit(workdir: str) -> None: # save cluster centroids to table - columns are the cluster number, rows are the centroid FDC values write_table( pd.DataFrame(np.transpose(kmeans.cluster_centers_), columns=np.array(range(n_clusters)).astype(str)), - workdir, - f'cluster_centers_{n_clusters}' - ) + f'cluster_centers_{n_clusters}') # save the summary stats from this model summary['number'].append(n_clusters) @@ -136,17 +135,16 @@ def summarize_fit(workdir: str) -> None: # save the summary results as a csv sum_df = pd.DataFrame(summary) sum_df['knee'] = KneeLocator(summary['number'], summary['inertia'], curve='convex', direction='decreasing').knee - write_table(sum_df, workdir, 'cluster_metrics') + write_table(sum_df, 'cluster_metrics') return -def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = 'all', samples: int = 75_000) -> None: +def calc_silhouette(x: np.ndarray, n_clusters: int or Iterable = 'all', samples: int = 75_000) -> None: """ Calculate the silhouette score for the given number of clusters Args: - workdir: path to the project directory x: a numpy array of the prepared FDC data n_clusters: the number of clusters to calculate the silhouette score for samples: the number of samples to use for the silhouette score calculation @@ -155,14 +153,14 @@ def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = ' None """ if x is None: - x = read_table(workdir, 'hindcast_fdc_trans').values + x = read_table('hindcast_fdc_trans').values fdc_df = pd.DataFrame(x) summary = {'number': [], 'silhouette': []} random_shuffler = np.random.default_rng() - for model_file in _find_model_files(workdir, n_clusters): + for model_file in list_cluster_files(n_clusters): logger.info(f'Calculating Silhouettes for {os.path.basename(model_file)}') kmeans = joblib.load(model_file) @@ -181,24 +179,23 @@ def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = ' ss_df['silhouette'] = silhouette_samples(ss_df.drop(columns='label').values, ss_df['label'].values, n_jobs=-1) ss_df['silhouette'] = ss_df['silhouette'].round(3) ss_df.columns = ss_df.columns.astype(str) - write_table(ss_df, workdir, f'cluster_sscores_{kmeans.n_clusters}') + write_table(ss_df, f'cluster_sscores_{kmeans.n_clusters}') # save the summary stats from this model summary['number'].append(kmeans.n_clusters) summary['silhouette'].append(ss_df['silhouette'].mean()) # save the summary stats - write_table(pd.DataFrame(summary), 
workdir, 'cluster_sscores') + write_table(pd.DataFrame(summary), 'cluster_sscores') return -def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterable = 'all', +def plot_clusters(x: np.ndarray = None, n_clusters: int or Iterable = 'all', max_cols: int = 3, plt_width: int = 2, plt_height: int = 2, n_lines: int = 2_500) -> None: """ Generate figures of the clustered FDC's Args: - workdir: path to the project directory x: a numpy array of the prepared FDC data n_clusters: number of clusters to create figures for max_cols: maximum number of columns (subplots) in the figure @@ -210,7 +207,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterabl None """ if x is None: - x = read_table(workdir, 'hindcast_fdc_trans').values + x = read_table('hindcast_fdc_trans').values size = x.shape[1] x_values = np.linspace(0, size, 5) @@ -218,7 +215,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterabl random_shuffler = np.random.default_rng() - for model_file in _find_model_files(workdir, n_clusters): + for model_file in list_cluster_files(n_clusters): logger.info(f'Plotting Clusters {os.path.basename(model_file)}') # load the model and calculate @@ -256,7 +253,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterabl for ax in fig.axes[n_clusters:]: ax.axis('off') - fig.savefig(os.path.join(workdir, 'clusters', f'figure-clusters-{n_clusters}.png')) + fig.savefig(os.path.join(get_dir('clusters'), f'figure-clusters-{n_clusters}.png')) plt.close(fig) return @@ -282,7 +279,7 @@ def plot_silhouettes(workdir: str, plt_width: int = 3, plt_height: int = 3) -> N logger.info(f'Generating Silhouette Diagram: {os.path.basename(sscore_table)}') n_clusters = int(sscore_table.split('_')[-1].split('.')[0]) sscore_df = pd.read_parquet(sscore_table, engine='fastparquet') - centers_df = read_table(workdir, f'cluster_centers_{n_clusters}') + centers_df = read_table(f'cluster_centers_{n_clusters}') mean_ss = sscore_df['silhouette'].mean() # initialize the figure @@ -341,12 +338,11 @@ def plot_silhouettes(workdir: str, plt_width: int = 3, plt_height: int = 3) -> N return -def plot_centers(workdir: str, plt_width: int = 2, plt_height: int = 2, max_cols: int = 3) -> None: +def plot_centers(plt_width: int = 2, plt_height: int = 2, max_cols: int = 3) -> None: """ Plot the cluster centers for each cluster. 
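The plotting helpers in this module now resolve their inputs and output directory from the configured workdir, so after read_config they can be called without arguments; a brief sketch:

import saber

saber.cluster.plot_clusters()     # one figure of clustered FDCs per trained k-means model
saber.cluster.plot_centers()      # figures of the cluster centroid FDCs
saber.cluster.plot_fit_metrics()  # inertia (and silhouette, if computed) vs number of clusters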
Args: - workdir: path to the project directory plt_width: width of each subplot in inches plt_height: height of each subplot in inches max_cols: maximum number of columns of subplots in the figure @@ -356,7 +352,7 @@ def plot_centers(workdir: str, plt_width: int = 2, plt_height: int = 2, max_cols """ logger.info('Plotting Cluster Centers') - clusters_dir = os.path.join(workdir, 'clusters') + clusters_dir = get_dir('clusters') for n_clusters in [4, 7, 10, 13]: # count number of files to plot @@ -398,12 +394,11 @@ def plot_centers(workdir: str, plt_width: int = 2, plt_height: int = 2, max_cols return -def plot_fit_metrics(workdir: str, plt_width: int = 4, plt_height: int = 4) -> None: +def plot_fit_metrics(plt_width: int = 4, plt_height: int = 4) -> None: """ Plot the cluster metrics, inertia and silhouette score, vs number of clusters Args: - workdir: path to the project directory plt_width: width of each subplot in inches plt_height: height of each subplot in inches @@ -412,11 +407,15 @@ def plot_fit_metrics(workdir: str, plt_width: int = 4, plt_height: int = 4) -> N """ logger.info('Plotting Cluster Fit Metrics') - clusters_dir = os.path.join(workdir, 'clusters') + clusters_dir = get_dir('clusters') + + df = read_table('cluster_metrics') + + try: + df = df.merge(read_table('cluster_sscores'), on='number', how='outer') + except FileNotFoundError: + pass - df = read_table(workdir, 'cluster_metrics') - if os.path.exists(get_table_path(workdir, 'cluster_sscores')): - df = df.merge(read_table(workdir, 'cluster_sscores'), on='number', how='outer') df['number'] = df['number'].astype(int) df['inertia'] = df['inertia'].astype(float) diff --git a/saber/gis.py b/saber/gis.py index bf7f0c4..5409e03 100644 --- a/saber/gis.py +++ b/saber/gis.py @@ -1,5 +1,5 @@ -import os import logging +import os import contextily as cx import geopandas as gpd @@ -8,33 +8,37 @@ import numpy as np import pandas as pd -from .io import gid_col -from .io import metric_nc_name_list -from .io import mid_col -from .io import reason_col -from .io import dir_gis -from .io import clbl_col +from .io import COL_ASN_REASON +from .io import COL_CID +from .io import COL_GID +from .io import COL_MID +from .io import get_dir +from .io import read_gis +from .io import read_table -__all__ = ['create_maps', 'map_by_reason', 'map_by_cluster', 'map_unassigned', 'map_ids', - 'validation_maps'] +__all__ = ['create_maps', 'map_by_reason', 'map_by_cluster', 'map_unassigned', 'map_ids', ] logger = logging.getLogger(__name__) -def create_maps(workdir: str, assign_df: pd.DataFrame, drain_gis: str or gpd.GeoDataFrame, prefix: str = '') -> None: +def create_maps(assign_df: pd.DataFrame = None, drain_gis: gpd.GeoDataFrame = None, prefix: str = '') -> None: """ Runs all the clip functions which create subsets of the drainage lines GIS dataset based on how they were assigned for bias correction. 
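create_maps now falls back to the configured tables and GIS inputs when its arguments are omitted, so it can be driven either way; a minimal sketch of both call styles, assuming the assignment table and drain_gis are set in the config:

import saber

# default behaviour: read 'assign_table' and the configured drain_gis internally
saber.gis.create_maps()

# or pass the inputs explicitly
assign_df = saber.io.read_table('assign_table')
drain_gdf = saber.io.read_gis('drain_gis')
saber.gis.create_maps(assign_df, drain_gdf)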
Args: - workdir: the path to the working directory for the project assign_df: the assignment table dataframe - drain_gis: path to a drainage line shapefile which can be clipped + drain_gis: a geodataframe of the drainage lines gis dataset prefix: a prefix for names of the outputs to distinguish between data generated in separate instances Returns: None """ + if assign_df is None: + assign_df = read_table('assign_table') + if drain_gis is None: + drain_gis = read_gis('drain_gis') + if type(drain_gis) == str: gdf = gpd.read_file(drain_gis) elif type(drain_gis) == gpd.GeoDataFrame: @@ -42,18 +46,17 @@ def create_maps(workdir: str, assign_df: pd.DataFrame, drain_gis: str or gpd.Geo else: raise TypeError(f'Invalid type for drain_gis: {type(drain_gis)}') - map_by_reason(workdir, assign_df, gdf, prefix) - map_by_cluster(workdir, assign_df, gdf, prefix) - map_unassigned(workdir, assign_df, gdf, prefix) + map_by_reason(assign_df, gdf, prefix) + map_by_cluster(assign_df, gdf, prefix) + map_unassigned(assign_df, gdf, prefix) return -def map_by_reason(workdir: str, assign_df: pd.DataFrame, drain_gis: str or gpd.GeoDataFrame, prefix: str = '') -> None: +def map_by_reason(assign_df: pd.DataFrame, drain_gis: str or gpd.GeoDataFrame, prefix: str = '') -> None: """ Creates Geopackage files in workdir/gis_outputs for each unique value in the assignment column Args: - workdir: the path to the working directory for the project assign_df: the assignment table dataframe drain_gis: path to a drainage line shapefile which can be clipped prefix: a prefix for names of the outputs to distinguish between data generated at separate instances @@ -66,25 +69,24 @@ def map_by_reason(workdir: str, assign_df: pd.DataFrame, drain_gis: str or gpd.G drain_gis = gpd.read_file(drain_gis) # get the unique list of assignment reasons - for reason in assign_df[reason_col].unique(): + for reason in assign_df[COL_ASN_REASON].unique(): logger.info(f'Creating GIS output for group: {reason}') - selector = drain_gis[mid_col].astype(str).isin(assign_df[assign_df[reason_col] == reason][mid_col]) + selector = drain_gis[COL_MID].astype(str).isin(assign_df[assign_df[COL_ASN_REASON] == reason][COL_MID]) subset = drain_gis[selector] name = f'{f"{prefix}_" if prefix else ""}assignments_{reason}.gpkg' if subset.empty: logger.debug(f'Empty filter: No streams are assigned for {reason}') continue else: - subset.to_file(os.path.join(workdir, dir_gis, name)) + subset.to_file(os.path.join(get_dir('gis'), name)) return -def map_by_cluster(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: +def map_by_cluster(assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ Creates Geopackage files in workdir/gis_outputs of the drainage lines based on the fdc cluster they were assigned to Args: - workdir: the path to the working directory for the project assign_table: the assignment table dataframe drain_gis: path to a drainage line shapefile which can be clipped prefix: optional, a prefix to prepend to each created file's name @@ -94,22 +96,21 @@ def map_by_cluster(workdir: str, assign_table: pd.DataFrame, drain_gis: str, pre """ if isinstance(drain_gis, str): drain_gis = gpd.read_file(drain_gis) - for num in assign_table[clbl_col].unique(): + for num in assign_table[COL_CID].unique(): logger.info(f'Creating GIS output for cluster: {num}') - gdf = drain_gis[drain_gis[mid_col].astype(str).isin(assign_table[assign_table[clbl_col] == num][mid_col])] + gdf = 
drain_gis[drain_gis[COL_MID].astype(str).isin(assign_table[assign_table[COL_CID] == num][COL_MID])] if gdf.empty: logger.debug(f'Empty filter: No streams are assigned to cluster {num}') continue - gdf.to_file(os.path.join(workdir, dir_gis, f'{prefix}{"_" if prefix else ""}cluster-{int(num)}.gpkg')) + gdf.to_file(os.path.join(get_dir('gis'), f'{prefix}{"_" if prefix else ""}cluster-{int(num)}.gpkg')) return -def map_unassigned(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: +def map_unassigned(assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ Creates Geopackage files in workdir/gis_outputs of the drainage lines which haven't been assigned a gauge yet Args: - workdir: the path to the working directory for the project assign_table: the assignment table dataframe drain_gis: path to a drainage line shapefile which can be clipped prefix: optional, a prefix to prepend to each created file's name @@ -120,22 +121,21 @@ def map_unassigned(workdir: str, assign_table: pd.DataFrame, drain_gis: str, pre logger.info('Creating GIS output for unassigned basins') if isinstance(drain_gis, str): drain_gis = gpd.read_file(drain_gis) - ids = assign_table[assign_table[reason_col] == 'unassigned'][mid_col].values - subset = drain_gis[drain_gis[mid_col].astype(str).isin(ids)] + ids = assign_table[assign_table[COL_ASN_REASON] == 'unassigned'][COL_MID].values + subset = drain_gis[drain_gis[COL_MID].astype(str).isin(ids)] if subset.empty: logger.debug('Empty filter: No streams are unassigned') return - savepath = os.path.join(workdir, dir_gis, f'{prefix}{"_" if prefix else ""}assignments_unassigned.gpkg') + savepath = os.path.join(get_dir('gis'), f'{prefix}{"_" if prefix else ""}assignments_unassigned.gpkg') subset.to_file(savepath) return -def map_ids(workdir: str, ids: list, drain_gis: str, prefix: str = '', id_column: str = mid_col) -> None: +def map_ids(ids: list, drain_gis: str, prefix: str = '', id_column: str = COL_MID) -> None: """ Creates Geopackage files in workdir/gis_outputs of the subset of 'drain_shape' with an ID in the specified list Args: - workdir: path to the project directory ids: any iterable containing a series of model_ids drain_gis: path to the drainage shapefile to be clipped prefix: optional, a prefix to prepend to each created file's name @@ -147,58 +147,11 @@ def map_ids(workdir: str, ids: list, drain_gis: str, prefix: str = '', id_column if isinstance(drain_gis, str): drain_gis = gpd.read_file(drain_gis) name = f'{prefix}{"_" if prefix else ""}id_subset.gpkg' - drain_gis[drain_gis[id_column].isin(ids)].to_file(os.path.join(workdir, dir_gis, name)) - return - - -def validation_maps(workdir: str, gauge_gis: str, val_table: pd.DataFrame = None, prefix: str = '') -> None: - """ - Creates Geopackage files in workdir/gis_outputs of subsets of the gauge_shape. - 1 is the fill gauge shape with added attribute columns for all the computed stats. There are 2 for each of the 5 - validation groups; 1 which shows the gauges included in the validation set and 1 which shows gauges that were - excluded from the validation set. 
- - Args: - workdir: path to the project directory - val_table: the validation table produced by saber.validate - gauge_gis: path to the gauge locations shapefile - prefix: optional, a prefix to prepend to each created file's name - - Returns: - None - """ - if val_table is None: - val_table = pd.read_csv(os.path.join(workdir, 'validation_runs', 'val_table.csv')) - save_dir = os.path.join(workdir, 'gis') - - # merge gauge table with the validation table - gdf = gpd.read_file(gauge_gis) - gdf = gdf.merge(val_table, on=gid_col, how='inner') - gdf.to_file(os.path.join(save_dir, 'gauges_with_validation_stats.gpkg')) - - core_columns = [mid_col, gid_col, 'geometry'] - - # generate gis files by validation run, by stat, and by included/excluded - for val_set in ('50', '60', '70', '80', '90'): - for metric in metric_nc_name_list: - # select only columns for the validation run we're iterating on - too complex for filter/regex - cols_to_select = core_columns + [val_set, f'{metric}_{val_set}'] - gdf_sub = gdf[cols_to_select] - gdf_sub = gdf_sub.rename(columns={f'{metric}_{val_set}': metric}) - - name = f'{prefix}{"_" if prefix else ""}valset_{val_set}_{metric}_included.gpkg' - gdf_sub[gdf_sub[val_set] == 1].to_file(os.path.join(save_dir, name)) - - name = f'{prefix}{"_" if prefix else ""}valset_{val_set}_{metric}_excluded.gpkg' - exc = gdf_sub[gdf_sub[val_set] == 0] - exc.to_file(os.path.join(save_dir, name)) - if metric == 'KGE2012': - histomaps(exc, metric, val_set, workdir) - + drain_gis[drain_gis[id_column].isin(ids)].to_file(os.path.join(get_dir('gis'), name)) return -def histomaps(gdf: gpd.GeoDataFrame, metric: str, prct: str, workdir: str) -> None: +def histomaps(gdf: gpd.GeoDataFrame, metric: str, prct: str) -> None: """ Creates a histogram of the KGE2012 values for the validation set @@ -206,12 +159,11 @@ def histomaps(gdf: gpd.GeoDataFrame, metric: str, prct: str, workdir: str) -> No gdf: a GeoDataFrame containing validation metrics metric:name of th emetric to plot prct: Percentile of the validation set used to generate the histogram - workdir: the project working directory Returns: None """ - core_columns = [mid_col, gid_col, 'geometry'] + core_columns = [COL_MID, COL_GID, 'geometry'] # world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres')) # world.plot(ax=axm, color='white', edgecolor='black') @@ -250,5 +202,5 @@ def histomaps(gdf: gpd.GeoDataFrame, metric: str, prct: str, workdir: str) -> No gdf[core_columns + [metric, ]].to_crs(epsg=3857).plot(metric) cx.add_basemap(ax=axm, zoom=9, source=cx.providers.Esri.WorldTopoMap, attribution='') - fig.savefig(os.path.join(workdir, 'gis', f'{metric}_{prct}.png')) + fig.savefig(os.path.join(get_dir('gis'), f'{metric}_{prct}.png')) return diff --git a/saber/io.py b/saber/io.py index 98e8de5..eb76683 100644 --- a/saber/io.py +++ b/saber/io.py @@ -1,159 +1,240 @@ import glob +import logging import os +import shutil from collections.abc import Iterable from typing import List +import geopandas as gpd import pandas as pd +import yaml from natsort import natsorted +logger = logging.getLogger(__name__) + +__all__ = [ + 'read_config', 'init_workdir', 'get_state', 'get_dir', 'read_table', 'write_table', 'read_gis', + 'list_cluster_files', + + 'COL_MID', 'COL_GID', 'COL_RID', 'COL_CID', + 'COL_STRM_ORD', 'COL_X', 'COL_Y', 'COL_MID_DOWN', + 'COL_RPROP', 'COL_GPROP', 'COL_ASN_MID', 'COL_ASN_GID', 'COL_ASN_REASON', + + 'COL_QOBS', 'COL_QMOD', 'COL_QSIM', + 'DIR_TABLES', 'DIR_GIS', 'DIR_CLUSTERS', 'DIR_VALID', 'DIR_LIST', + 'TABLE_ASSIGN', + 
'TABLE_CLUSTER_METRICS', 'TABLE_CLUSTER_SSCORES', 'TABLE_CLUSTER_LABELS', 'CLUSTER_COUNT_JSON', + 'TABLE_ASSIGN_BTSTRP', 'TABLE_BTSTRP_METRICS', + + 'GENERATED_TABLE_NAMES_MAP', 'VALID_YAML_KEYS', 'VALID_GIS_NAMES', +] + +# file paths used in this project which should come from the config file +workdir = '' +x_fdc_train = '' +x_fdc_all = '' +drain_table = '' +gauge_table = '' +regulate_table = '' +drain_gis = '' +gauge_gis = '' +gauge_data = '' +hindcast_zarr = '' + +# processing options +n_processes = 1 + +# lists for validating +VALID_YAML_KEYS = {'workdir', + 'x_fdc_train', + 'x_fdc_all', + 'drain_table', + 'gauge_table', + 'regulate_table', + 'drain_gis', + 'gauge_gis', + 'gauge_data', + 'hindcast_zarr', + 'n_processes', } + +VALID_GIS_NAMES = ['drain_gis', 'gauge_gis'] + # assign table and gis_input file required column names -mid_col = 'model_id' -gid_col = 'gauge_id' -clbl_col = 'cluster_label' -asgn_mid_col = 'assigned_model_id' -asgn_gid_col = 'assigned_gauge_id' -down_mid_col = 'downstream_model_id' -reason_col = 'reason' -area_col = 'drain_area' -order_col = 'strahler_order' -x_col = 'x_mod' -y_col = 'y_mod' - -# dataframe columns names -q_obs = 'Qobs' -q_mod = 'Qmod' -q_sim = 'Qsim' - -# name of some files produced by the algorithm -cluster_count_file = 'best-fit-cluster-count.json' -cal_nc_name = 'calibrated_simulated_flow.nc' - -# scaffolded folders -dir_tables = 'tables' -dir_gis = 'gis' -dir_clusters = 'clusters' -dir_valid = 'validation' - -# name of the required input tables and the outputs -table_hindcast = 'hindcast.parquet' -table_hindcast_fdc = 'hindcast_fdc.parquet' -table_hindcast_fdc_trans = 'hindcast_fdc_transformed.parquet' - -table_assign = 'assign_table.parquet' -table_mids = 'mid_table.parquet' -table_gids = 'gid_table.parquet' -table_mid_gid_map = 'mid_gid_map_table.parquet' - -table_assign_bootstrap = 'assign_table_bootstrap.csv' -table_bootstrap_metrics = 'bootstrap_metrics.csv' - -table_drain = 'drain_table.parquet' -table_gauge = 'gauge_table.parquet' - -table_prop_resolved = 'prop_table_resolved.parquet' -table_prop_downstream = 'prop_table_downstream.parquet' -table_prop_upstream = 'prop_table_upstream.parquet' +COL_MID = 'model_id' # model id column name: in drain_table, gauge_table, regulate_table, cluster_table +COL_GID = 'gauge_id' # gauge id column name: in gauge_table +COL_RID = 'reg_id' # regulate id column name: in regulate_table +COL_CID = 'clstr_id' # cluster column name: in cluster_table + +COL_STRM_ORD = 'strahler_order' # strahler order column name: in drain_table +COL_X = 'x_mod' # x coordinate column name: in drain_table +COL_Y = 'y_mod' # y coordinate column name: in drain_table +COL_MID_DOWN = 'downstream_model_id' # downstream model id column name: in drain_table + +COL_RPROP = 'rprop' # regulated stream propagation: created by assign_table +COL_GPROP = 'gprop' # gauged stream propagation: created by assign_table +COL_ASN_MID = 'asgn_mid' # assigned model id column name: in assign_table +COL_ASN_GID = 'asgn_gid' # assigned gauge id column name: in assign_table +COL_ASN_REASON = 'reason' # reason column name: in assign_table + +all_cols = [COL_MID, + COL_GID, + COL_RID, + COL_STRM_ORD, + COL_X, + COL_Y, + COL_RPROP, + COL_GPROP, + COL_CID, + COL_MID_DOWN, + COL_ASN_MID, + COL_ASN_GID, + COL_ASN_REASON, ] + +atable_cols = [COL_ASN_MID, COL_ASN_GID, COL_ASN_REASON, COL_RPROP, COL_GPROP] +atable_cols_defaults = ['unassigned', 'unassigned', 'unassigned', '', ''] + +# discharge dataframe columns names +COL_QOBS = 'Qobs' +COL_QMOD = 'Qmod' 
+COL_QSIM = 'Qsim' + +# required workdir folders +DIR_TABLES = 'tables' +DIR_GIS = 'gis' +DIR_CLUSTERS = 'clusters' +DIR_VALID = 'validation' +DIR_LIST = [DIR_TABLES, DIR_GIS, DIR_CLUSTERS, DIR_VALID] + +# name of the required input tables +TABLE_ASSIGN = 'assign_table.parquet' # tables generated by the clustering functions -table_cluster_metrics = 'cluster_metrics.csv' -table_cluster_sscores = 'cluster_sscores.csv' -table_cluster_labels = 'cluster_labels.parquet' +TABLE_CLUSTER_METRICS = 'cluster_metrics.csv' +TABLE_CLUSTER_SSCORES = 'cluster_sscores.csv' +TABLE_CLUSTER_LABELS = 'cluster_labels.parquet' +CLUSTER_COUNT_JSON = 'best-fit-cluster-count.json' + +# tables produced by the bootstrap validation process +TABLE_ASSIGN_BTSTRP = 'assign_table_bootstrap.csv' +TABLE_BTSTRP_METRICS = 'bootstrap_metrics.csv' + +GENERATED_TABLE_NAMES_MAP = { + "assign_table": TABLE_ASSIGN, + "assign_table_bootstrap": TABLE_ASSIGN_BTSTRP, + "bootstrap_metrics": TABLE_BTSTRP_METRICS, + "cluster_metrics": TABLE_CLUSTER_METRICS, + "cluster_sscores": TABLE_CLUSTER_SSCORES, + "cluster_table": TABLE_CLUSTER_LABELS, +} + + +def read_config(config: str) -> None: + """ + Read the config file to set paths and values + + Args: + config: path to the config file + + Returns: + None + """ + # open a yml and read to dictionary + with open(config, 'r') as f: + config_dict = yaml.safe_load(f) + + if config_dict is None: + raise ValueError('Config file is empty') + + # set global variables + for key, value in config_dict.items(): + if key not in VALID_YAML_KEYS: + logger.error(f'Ignored invalid key in config file: "{key}". Consult docs for valid keys.') + continue + logger.info(f'Config: {key} = {value}') + globals()[key] = value + + # validate inputs + if not os.path.isdir(workdir): + logger.warning(f'Workspace directory does not exist: {workdir}') + if not os.path.exists(drain_gis): + logger.warning(f'Drainage network GIS file does not exist: {drain_gis}') + if not os.path.exists(gauge_gis): + logger.warning(f'Gauge network GIS file does not exist: {gauge_gis}') + if not os.path.isdir(gauge_data): + logger.warning(f'Gauge data directory does not exist: {gauge_data}') + if not glob.glob(hindcast_zarr): + logger.warning(f'Hindcast zarr directory does not exist or is empty: {hindcast_zarr}') -# metrics computed on validation sets -metric_list = ['ME', 'MAE', 'RMSE', 'NSE', 'KGE (2012)'] -metric_nc_name_list = ['ME', 'MAE', 'RMSE', 'NSE', 'KGE2012'] + return -def scaffold_workdir(path: str, include_validation: bool = True) -> None: +def init_workdir(path: str = None, overwrite: bool = False) -> None: """ Creates the correct directories for a Saber project within the specified directory Args: path: the path to a directory where you want to create workdir subdirectories - include_validation: boolean, indicates whether to create the validation folder + overwrite: boolean flag, delete existing directories and files and recreate the directory structure? Returns: None + + Raises: + NotADirectoryError: if the path is not a directory """ - dir_list = [dir_tables, dir_gis, dir_clusters] + if path is None: + path = workdir + if not os.path.exists(path): - os.mkdir(path) - if include_validation: - dir_list.append(dir_valid) - for d in dir_list: + logger.warning(f'Provided path to workdir does not exist. 
Attempting to create: {path}') + os.makedirs(path) + elif overwrite: + logger.warning(f'overwrite=True, Deleting existing workdir: {workdir}') + shutil.rmtree(path) + + for d in DIR_LIST: p = os.path.join(path, d) if not os.path.exists(p): os.mkdir(p) return -def get_table_path(workdir: str, table_name: str) -> str: +def get_state(prop) -> int or str: """ - Get the path to a table in the project directory by name + Get a state variable provided by the config or a controlled global variable Args: - workdir: path to the project directory - table_name: name of the table to find a path for + prop: name of the global variable Returns: - Path (str) to the table + value of the global variable + """ + assert prop in globals(), ValueError(f'"{prop}" is not a recognized project state key') + return globals()[prop] - Raises: - ValueError: if the table name is not recognized + +def get_dir(dir_name: str) -> str: """ - if table_name == 'hindcast': - return os.path.join(workdir, dir_tables, table_hindcast) - elif table_name == 'hindcast_fdc': - return os.path.join(workdir, dir_tables, table_hindcast_fdc) - elif table_name == 'hindcast_fdc_trans': - return os.path.join(workdir, dir_tables, table_hindcast_fdc_trans) - - elif table_name == 'assign_table': - return os.path.join(workdir, dir_tables, table_assign) - elif table_name == 'drain_table': - return os.path.join(workdir, dir_tables, table_drain) - elif table_name == 'gauge_table': - return os.path.join(workdir, dir_tables, table_gauge) - elif table_name == 'mid_list': - return os.path.join(workdir, dir_tables, table_mids) - elif table_name == 'gid_list': - return os.path.join(workdir, dir_tables, table_gids) - elif table_name == 'mid_gid_map': - return os.path.join(workdir, dir_tables, table_mid_gid_map) - - elif table_name == 'assign_table_bootstrap': - return os.path.join(workdir, dir_tables, table_assign_bootstrap) - elif table_name == 'bootstrap_metrics': - return os.path.join(workdir, dir_tables, table_bootstrap_metrics) - - elif table_name == 'prop_resolved': - return os.path.join(workdir, dir_tables, table_prop_resolved) - elif table_name == 'prop_downstream': - return os.path.join(workdir, dir_tables, table_prop_downstream) - elif table_name == 'prop_upstream': - return os.path.join(workdir, dir_tables, table_prop_upstream) - - elif table_name == 'cluster_metrics': - return os.path.join(workdir, dir_clusters, table_cluster_metrics) - elif table_name == 'cluster_sscores': - return os.path.join(workdir, dir_clusters, table_cluster_sscores) - elif table_name == 'cluster_labels': - return os.path.join(workdir, dir_clusters, table_cluster_labels) - elif table_name.startswith('cluster_centers_'): # cluster_centers_{n_clusters}.parquet - 1 per cluster - return os.path.join(workdir, dir_clusters, f'{table_name}.parquet') - elif table_name.startswith('cluster_sscores_'): # cluster_sscores_{n_clusters}.parquet - 1 per cluster - return os.path.join(workdir, dir_clusters, f'{table_name}.parquet') + Get the path to a directory within the workspace - else: - raise ValueError(f'Unknown table name: {table_name}') + Args: + dir_name: name of the directory + + Returns: + path to the directory + """ + assert dir_name in [DIR_TABLES, DIR_GIS, DIR_CLUSTERS, DIR_VALID], f'"{dir_name}" is not a valid directory name' + table_path = os.path.join(workdir, dir_name) + if not os.path.exists(table_path): + logger.warning(f'"{dir_name}" directory does not exist. 
Error imminent: {table_path}') + return table_path -def read_table(workdir: str, table_name: str) -> pd.DataFrame: +def read_table(table_name: str) -> pd.DataFrame: """ Read a table from the project directory by name. Args: - workdir: path to the project directory table_name: name of the table to read Returns: @@ -163,7 +244,7 @@ def read_table(workdir: str, table_name: str) -> pd.DataFrame: FileNotFoundError: if the table does not exist in the correct directory with the correct name ValueError: if the table format is not recognized """ - table_path = get_table_path(workdir, table_name) + table_path = _get_table_path(table_name) if not os.path.exists(table_path): raise FileNotFoundError(f'Table does not exist: {table_path}') @@ -178,14 +259,13 @@ def read_table(workdir: str, table_name: str) -> pd.DataFrame: raise ValueError(f'Unknown table format: {table_format}') -def write_table(table: pd.DataFrame, workdir: str, table_name: str) -> None: +def write_table(df: pd.DataFrame, name: str) -> None: """ Write a table to the correct location in the project directory Args: - table: the pandas DataFrame to write - workdir: the path to the project directory - table_name: the name of the table to write + df: the pandas DataFrame to write + name: the name of the table to write Returns: None @@ -193,24 +273,46 @@ def write_table(table: pd.DataFrame, workdir: str, table_name: str) -> None: Raises: ValueError: if the table format is not recognized """ - table_path = get_table_path(workdir, table_name) + table_path = _get_table_path(name) table_format = os.path.splitext(table_path)[-1] if table_format == '.parquet': - return table.to_parquet(table_path) + return df.to_parquet(table_path) elif table_format == '.feather': - return table.to_feather(table_path) + return df.to_feather(table_path) elif table_format == '.csv': - return table.to_csv(table_path, index=False) + return df.to_csv(table_path, index=False) else: raise ValueError(f'Unknown table format: {table_format}') -def _find_model_files(workdir: str, n_clusters: int or Iterable = 'all') -> List[str]: +def read_gis(gis_name: str) -> gpd.GeoDataFrame: + """ + Read a GIS file from the project directory by name. + + Args: + gis_name: name of the GIS file to read + + Returns: + gpd.GeoDataFrame + + Raises: + FileNotFoundError: if the GIS file does not exist in the correct directory with the correct name + ValueError: if the GIS format is not recognized + """ + assert gis_name in VALID_GIS_NAMES, ValueError(f'"{gis_name}" is not a recognized project state key') + gis_path = globals()[gis_name] + if not os.path.exists(gis_path): + raise FileNotFoundError(f'GIS file "{gis_name}" does not exist at path: {gis_path}') + + return gpd.read_file(gis_path) + + +def list_cluster_files(n_clusters: int or Iterable = 'all') -> List[str]: """ Find all the kmeans model files in the project directory. Args: - workdir: path to the project directory + n_clusters: the number of clusters to find models for. 
If 'all', all models will be returned Returns: List of paths to the kmeans model files @@ -218,7 +320,7 @@ def _find_model_files(workdir: str, n_clusters: int or Iterable = 'all') -> List Raises: TypeError: if n_clusters is not an int, iterable of int, or 'all' """ - kmeans_dir = os.path.join(workdir, dir_clusters) + kmeans_dir = os.path.join(workdir, DIR_CLUSTERS) if n_clusters == 'all': return natsorted(glob.glob(os.path.join(kmeans_dir, 'kmeans-*.pickle'))) elif isinstance(n_clusters, int): @@ -227,3 +329,28 @@ def _find_model_files(workdir: str, n_clusters: int or Iterable = 'all') -> List return natsorted([os.path.join(kmeans_dir, f'kmeans-{i}.pickle') for i in n_clusters]) else: raise TypeError('n_clusters should be of type int or an iterable') + + +def _get_table_path(table_name: str) -> str: + """ + Get the path to a table in the project directory by name + + Args: + table_name: name of the table to find a path for + + Returns: + Path (str) to the table + + Raises: + ValueError: if the table name is not recognized + """ + if table_name in VALID_YAML_KEYS: + return os.path.join(workdir, globals()[table_name]) + elif table_name in GENERATED_TABLE_NAMES_MAP: + return os.path.join(workdir, DIR_TABLES, GENERATED_TABLE_NAMES_MAP[table_name]) + elif table_name.startswith('cluster_'): + # cluster_centers_{n_clusters}.parquet - 1 per cluster + # cluster_sscores_{n_clusters}.parquet - 1 per cluster + return os.path.join(workdir, DIR_CLUSTERS, f'{table_name}.parquet') + else: + raise ValueError(f'Unknown table name: {table_name}') diff --git a/saber/table.py b/saber/table.py new file mode 100644 index 0000000..1202d3a --- /dev/null +++ b/saber/table.py @@ -0,0 +1,240 @@ +import logging +from multiprocessing import Pool + +import numpy as np +import pandas as pd + +from .io import COL_ASN_GID +from .io import COL_ASN_MID +from .io import COL_GPROP +from .io import COL_GID +from .io import COL_MID +from .io import COL_MID_DOWN +from .io import COL_RID +from .io import COL_RPROP +from .io import COL_STRM_ORD +from .io import all_cols +from .io import atable_cols +from .io import atable_cols_defaults +from .io import read_table +from .io import write_table + +__all__ = ['init', 'mp_prop_gauges', 'mp_prop_regulated'] + +logger = logging.getLogger(__name__) + + +def init(drain_table: pd.DataFrame = None, + gauge_table: pd.DataFrame = None, + reg_table: pd.DataFrame = None, + cluster_table: pd.DataFrame = None, + cache: bool = True) -> pd.DataFrame: + """ + Joins the drain_table.csv and gauge_table.csv to create the assign_table.csv + + Args: + drain_table: the drain table dataframe + gauge_table: the gauge table dataframe + reg_table: the regulatory structure table dataframe + cluster_table: a dataframe with a column for the assigned cluster label and a column for the model_id + cache: whether to cache the assign table immediately + + Returns: + pd.DataFrame + """ + # read the tables if they are not provided + if drain_table is None: + try: + drain_table = read_table('drain_table') + except FileNotFoundError: + raise FileNotFoundError('The drain_table must be provided or created first') + if gauge_table is None: + try: + gauge_table = read_table('gauge_table') + except FileNotFoundError: + raise FileNotFoundError('The gauge_table must be provided or created first') + if reg_table is None: + try: + reg_table = read_table('regulate_table') + except FileNotFoundError: + raise FileNotFoundError('The regulate_table must be provided or created first') + if cluster_table is None: + try: + cluster_table = 
read_table('cluster_table') + except FileNotFoundError: + raise FileNotFoundError('The cluster_table must be provided or created first') + + # enforce correct column data types + drain_table[COL_MID] = drain_table[COL_MID].astype(str) + drain_table[COL_MID_DOWN] = drain_table[COL_MID_DOWN].astype(str) + gauge_table[COL_MID] = gauge_table[COL_MID].astype(str) + gauge_table[COL_GID] = gauge_table[COL_GID].astype(str) + reg_table[COL_MID] = reg_table[COL_MID].astype(str) + reg_table[COL_RID] = reg_table[COL_RID].astype(str) + cluster_table[COL_MID] = cluster_table[COL_MID].astype(str) + + # merge the drain_table, gauge_table, reg_table, and labels_df on the model_id column + assign_df = ( + drain_table + .merge(gauge_table, on=COL_MID, how='outer') + .merge(reg_table, on=COL_MID, how='outer') + .merge(cluster_table, on=COL_MID, how='outer') + .sort_values(by=COL_MID).reset_index(drop=True) + ) + + # create new columns asn_mid_col, asn_gid_col, reason_col + assign_df[atable_cols] = atable_cols_defaults + + if not all([col in assign_df.columns for col in all_cols]): + logger.error('Missing columns in assign table. Check your input tables.') + logger.debug(f'Have columns: {assign_df.columns}') + logger.debug(f'Need columns: {all_cols}') + raise AssertionError('Missing columns in assign table. Check your input tables.') + + if cache: + write_table(assign_df, 'assign_table') + + return assign_df + + +def mp_prop_gauges(df: pd.DataFrame, n_processes: int or None = None) -> pd.DataFrame: + """ + Traverses dendritic stream networks to identify upstream and downstream river reaches + + Args: + df: the assign table dataframe + n_processes: the number of processes to use for multiprocessing + + Returns: + pd.DataFrame + """ + logger.info('Propagating from Gauges') + gauged_mids = df[df[COL_GID].notna()][COL_MID].values + + with Pool(n_processes) as p: + logger.info('Finding Downstream') + df_prop_down = pd.concat(p.starmap(_map_propagate, [(df, x, 'down', COL_GPROP) for x in gauged_mids])) + logger.info('Finding Upstream') + df_prop_up = pd.concat(p.starmap(_map_propagate, [(df, x, 'up', COL_GPROP) for x in gauged_mids])) + logger.info('Resolving Nearest Propagation Neighbor') + df_prop = pd.concat([df_prop_down, df_prop_up]).reset_index(drop=True) + df_prop = pd.concat(p.starmap(_map_resolve_props, [(df_prop, x, COL_GPROP) for x in df_prop[COL_MID].unique()])) + + return pd.concat([df[~df[COL_MID].isin(df_prop[COL_MID])], df_prop]) + + +def mp_prop_regulated(df: pd.DataFrame, n_processes: int or None = None) -> pd.DataFrame: + """ + Traverses dendritic stream networks downstream from regulatory structures + + Args: + df: the assign table dataframe + n_processes: the number of processes to use for multiprocessing + + Returns: + pd.DataFrame + """ + logger.info('Propagating from Regulatory Structures') + with Pool(n_processes) as p: + logger.info('Propagating Downstream') + df_prop = pd.concat(p.starmap( + _map_propagate, + [(df, x, 'down', COL_RPROP, False) for x in df[df[COL_RID].notna()][COL_MID].values] + )) + logger.info('Resolving Propagation') + df_prop = pd.concat(p.starmap(_map_resolve_props, [(df_prop, x, COL_RPROP) for x in df_prop[COL_MID].unique()])) + + return pd.concat([df[~df[COL_MID].isin(df_prop[COL_MID])], df_prop]) + + +def _map_propagate(df: pd.DataFrame, start_mid: int, direction: str, prop_col: str, + same_order: bool = True, max_steps: int = 15) -> pd.DataFrame or None: + """ + Meant to be mapped over a dataframe to propagate assignments downstream or upstream + + Args: + df: the 
assignments table dataframe + start_mid: the model_id to start the propagation from + direction: either 'down' or 'up' to indicate the direction of propagation + prop_col: the column where the propagation information should be recorded + max_steps: the maximum number of steps to propagate + + Returns: + pd.DataFrame + """ + assigned_rows = [] + + # select the row to start the propagation from + start_order = df[df[COL_MID] == start_mid][COL_STRM_ORD].values[0] + stream_row = df[df[COL_MID] == start_mid] + start_gid = stream_row[COL_ASN_GID].values[0] + + # create a boolean selector array for filtering all future queries + if same_order: + select_same_order_streams = True + else: + select_same_order_streams = df[COL_STRM_ORD] == start_order + + # counter for the number of steps taken by the loop + n_steps = 1 + + # repeat as long as the current row is not empty + try: + while True: + # select the next up or downstream + if direction == 'down': + id_selector = df[COL_MID] == stream_row[COL_MID_DOWN].values[0] + else: # direction == 'up': + id_selector = df[COL_MID_DOWN] == stream_row[COL_MID].values[0] + + # select the next row using the ID and Order selectors + stream_row = df[np.logical_and(id_selector, select_same_order_streams)] + + # Break the loop if + # 1. next row is empty -> no upstream reach + # 2. next row stream order not a match -> not picked by select_same_order_streams -> empty stream_row + if stream_row.empty: + break + + # copy the row, modify the assignment columns, and append to the list + new_row = stream_row.copy() + new_row[[COL_ASN_MID, COL_ASN_GID, prop_col]] = [start_mid, start_gid, + f'{direction}-{n_steps}-{start_mid}'] + assigned_rows.append(new_row) + + # increment the steps counter + n_steps = n_steps + 1 + + # Break the loop if + # 1. The next row is an outlet -> no downstream row -> cause error when selecting next row + # 2. we have reach the max number of steps (n_steps -1) + if int(stream_row[COL_MID_DOWN].values[0]) == -1 or n_steps > max_steps: + break + except Exception as e: + logger.error(f'Error in map_propagate: {e}') + + if len(assigned_rows): + return pd.concat(assigned_rows) + return pd.DataFrame(columns=df.columns) + + +def _map_resolve_props(df_props: pd.DataFrame, mid: str, prop_col: str) -> pd.DataFrame: + """ + Resolves the propagation assignments by choosing the assignment with the fewest steps + + Args: + df_props: the combined upstream and downstream propagation assignments dataframe + mid: the model_id to resolve the propagation assignments for + prop_col: the column where the propagation information is recorded + + Returns: + pd.DataFrame + """ + df_mid = df_props[df_props[COL_MID] == mid].copy() + # parse the reason statement into number of steps and prop up or downstream + df_mid[['direction', 'n_steps']] = df_mid[prop_col].apply(lambda x: x.split('-')[:2]).to_list() + df_mid['n_steps'] = df_mid['n_steps'].astype(int) + # sort by n_steps then by reason + df_mid = df_mid.sort_values(['n_steps', 'direction'], ascending=[True, True]) + # return the first row which is the fewest steps and preferring downstream to upstream + return df_mid.head(1).drop(columns=['direction', 'n_steps']) diff --git a/setup.py b/setup.py index 52ff7c3..0e9cc70 100644 --- a/setup.py +++ b/setup.py @@ -6,7 +6,7 @@ AUTHOR = 'Riley C. 
Hales'
 
 DESCRIPTION = 'The SABER tool for bias correcting large hydrologic models'
-VERSION = '0.7.0'
+VERSION = '0.8.0'
 PYTHON_REQUIRES = '>=3.10'
 
 INSTALL_REQUIRES = [
@@ -22,6 +22,7 @@
     'numpy',
     'pandas',
     'pyarrow',
+    'pyyaml',
     'requests',
     'scikit-learn',
     'scipy',
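
For orientation, below is a minimal usage sketch (not part of the patch) of how the reworked config-driven saber.io module and the new saber.table module introduced in this diff fit together. The config path, the module aliases, and the ordering of steps are assumptions inferred from the function signatures shown above, not a prescribed workflow.

import logging

from saber import io as saber_io
from saber import table as saber_table

logging.basicConfig(level=logging.INFO)

if __name__ == '__main__':
    # load file paths and processing options from a yaml config
    # (keys as in examples/saber_config_example.yml / VALID_YAML_KEYS)
    saber_io.read_config('/path/to/saber_config.yml')  # placeholder path

    # scaffold the workdir subdirectories: tables/, gis/, clusters/, validation/
    saber_io.init_workdir(overwrite=False)

    # join the drain, gauge, regulate, and cluster tables into the assign table;
    # tables are read from the configured paths when not passed explicitly
    # (assumes those tables and the cluster labels already exist)
    assign_df = saber_table.init(cache=True)

    # propagate assignments along the stream network from gauges and from
    # regulatory structures, using the configured number of processes
    n_processes = saber_io.get_state('n_processes')
    assign_df = saber_table.mp_prop_gauges(assign_df, n_processes=n_processes)
    assign_df = saber_table.mp_prop_regulated(assign_df, n_processes=n_processes)

    # cache the updated assign table in workdir/tables
    saber_io.write_table(assign_df, 'assign_table')

Because read_config stores the configured paths as module-level state in saber.io, the downstream helpers (get_dir, read_table, write_table, and the gis functions) no longer take a workdir argument, which is why that parameter is removed throughout this diff.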