diff --git a/docs/api/assign.md b/docs/api/assign.md
new file mode 100644
index 0000000..8e33554
--- /dev/null
+++ b/docs/api/assign.md
@@ -0,0 +1,3 @@
+# `saber.assign`
+
+::: saber.assign
\ No newline at end of file
diff --git a/docs/api/cluster.md b/docs/api/cluster.md
new file mode 100644
index 0000000..47da753
--- /dev/null
+++ b/docs/api/cluster.md
@@ -0,0 +1,3 @@
+# `saber.cluster`
+
+::: saber.cluster
\ No newline at end of file
diff --git a/docs/api/gis.md b/docs/api/gis.md
new file mode 100644
index 0000000..e0a94ae
--- /dev/null
+++ b/docs/api/gis.md
@@ -0,0 +1,3 @@
+# `saber.gis`
+
+::: saber.gis
\ No newline at end of file
diff --git a/docs/api/index.md b/docs/api/index.md
index b9275db..22d56e8 100644
--- a/docs/api/index.md
+++ b/docs/api/index.md
@@ -1 +1,7 @@
-# API Documentation
+# `saber-hbc` API
+
+* [`saber.assign`](assign.md)
+* [`saber.cluster`](cluster.md)
+* [`saber.gis`](gis.md)
+* [`saber.prep`](prep.md)
+* [`saber.validate`](validate.md)
diff --git a/docs/api/prep.md b/docs/api/prep.md
new file mode 100644
index 0000000..081140b
--- /dev/null
+++ b/docs/api/prep.md
@@ -0,0 +1,3 @@
+# `saber.prep`
+
+::: saber.prep
\ No newline at end of file
diff --git a/docs/api/validate.md b/docs/api/validate.md
new file mode 100644
index 0000000..fe8ffc2
--- /dev/null
+++ b/docs/api/validate.md
@@ -0,0 +1,3 @@
+# `saber.validate`
+
+::: saber.validate
\ No newline at end of file
diff --git a/docs/data/discharge-data.md b/docs/data/discharge-data.md
new file mode 100644
index 0000000..e985ecf
--- /dev/null
+++ b/docs/data/discharge-data.md
@@ -0,0 +1,41 @@
+# Required Hydrological Datasets
+
+1. Hindcast/Retrospective discharge for every stream segment (reporting point) in the model. This is a time series of
+   discharge, i.e. a hydrograph, for each stream segment. The data should be saved in parquet format and named
+   `hindcast_series_table.parquet`. The DataFrame should have:
+    1. An index named `datetime` of type `datetime`, containing the datetime stamp for the simulated values (rows)
+    2. One column per stream whose name is the stream's model ID (a string), containing the discharge for each
+       time step.
+2. Observed discharge data for each gauge. One file per gauge, named `{gauge_id}.csv`. The DataFrame should have:
+    1. `datetime`: The datetime stamp for the measurements
+    2. A column whose name is the unique `gauge_id`, containing the discharge for each time step.
+
+The `hindcast_series_table.parquet` should look like this:
+
+| datetime   | model_id_1 | model_id_2 | model_id_3 | ... |
+|------------|------------|------------|------------|-----|
+| 1985-01-01 | 50         | 50         | 50         | ... |
+| 1985-01-02 | 60         | 60         | 60         | ... |
+| 1985-01-03 | 70         | 70         | 70         | ... |
+| ...        | ...        | ...        | ...        | ... |
+
+Each gauge's csv file should look like this:
+
+| datetime   | {gauge_id} |
+|------------|------------|
+| 1985-01-01 | 50         |
+| 1985-01-02 | 60         |
+| 1985-01-03 | 70         |
+| ...        | ...        |
+
+## Things to check
+
+Be sure that both datasets:
+
+- Are in the same units (e.g. m3/s)
+- Are in the same time zone (e.g. UTC)
+- Are in the same time step (e.g. daily average)
+- Do not contain any non-numeric values (e.g. ICE, none, etc.)
+- Do not contain rows with missing values (e.g. NaN or blank cells)
+- Have been cleaned of any incorrect values (e.g. no negative values)
+- Do not contain any duplicate rows
\ No newline at end of file
diff --git a/docs/data/gis-data.md b/docs/data/gis-data.md
new file mode 100644
index 0000000..e125a22
--- /dev/null
+++ b/docs/data/gis-data.md
@@ -0,0 +1,46 @@
+# Required GIS Datasets
+
+1. Drainage lines (usually delineated center lines) with at least the following attributes (columns)
+   for each feature:
+    - `model_id`: A unique identifier/ID, any alphanumeric utf-8 string will suffice
+    - `downstream_model_id`: The ID of the next downstream reach
+    - `strahler_order`: The Strahler stream order of each reach
+    - `model_drain_area`: Cumulative upstream drainage area
+    - `x`: The x coordinate of the centroid of each feature (precalculated for faster results later)
+    - `y`: The y coordinate of the centroid of each feature (precalculated for faster results later)
+
+2. Points representing the location of each available river gauging station, with at least the
+   following attributes (columns) for each feature:
+    - `gauge_id`: A unique identifier/ID, any alphanumeric utf-8 string will suffice.
+    - `model_id`: The ID of the stream segment which corresponds to that gauge.
+
+The `drain_table.parquet` should look like this:
+
+| downstream_model_id | model_id        | model_drain_area | strahler_order | x   | y   |
+|---------------------|-----------------|------------------|----------------|-----|-----|
+| unique_stream_#     | unique_stream_# | area in km^2     | stream_order   | ##  | ##  |
+| unique_stream_#     | unique_stream_# | area in km^2     | stream_order   | ##  | ##  |
+| unique_stream_#     | unique_stream_# | area in km^2     | stream_order   | ##  | ##  |
+| ...                 | ...             | ...              | ...            | ... | ... |
+
+The `gauge_table.parquet` should look like this:
+
+| model_id          | gauge_id         | gauge_area   |
+|-------------------|------------------|--------------|
+| unique_stream_num | unique_gauge_num | area in km^2 |
+| unique_stream_num | unique_gauge_num | area in km^2 |
+| unique_stream_num | unique_gauge_num | area in km^2 |
+| ...               | ...              | ...          |
+
+
+## Things to check
+
+Be sure that both datasets:
+
+- Are in the same projected coordinate system
+- Only contain gauges and reaches within the area of interest. Clip/delete anything else.
+
+Other things to consider:
+
+- You may find it helpful to also have the catchments, adjoint catchments, and a watershed boundary polygon for
+  visualization purposes.
\ No newline at end of file
diff --git a/docs/data/index.md b/docs/data/index.md
index b05b937..872a09b 100644
--- a/docs/data/index.md
+++ b/docs/data/index.md
@@ -1,55 +1,13 @@
 # Required Datasets
 
-## GIS Datasets
+SABER requires [GIS Datasets](./gis-data.md) and [Hydrological Datasets](./discharge-data.md).
 
-1. Drainage lines (usually delineated center lines) with at least the following attributes (columns)
-   for each feature:
-    - `model_id`: A unique identifier/ID, any alphanumeric utf-8 string will suffice
-    - `downstream_model_id`: The ID of the next downstream reach
-    - `strahler_order`: The strahler stream order of each reach
-    - `model_drain_area`: Cumulative upstream drainage area
-    - `x`: The x coordinate of the centroid of each feature (precalculated for faster results later)
-    - `y`: The y coordinate of the centroid of each feature (precalculated for faster results later)
-2. Points representing the location of each of the river gauging station available with at least the
-   following attributes (columns) for each feature:
-    - `gauge_id`: A unique identifier/ID, any alphanumeric utf-8 string will suffice.
- - `model_id`: The ID of the stream segment which corresponds to that gauge. +These datasets ***need to be prepared independently before using `saber-hbc` functions***. You should organize the datasets in a working +directory that contains 3 subdirectories, as shown below. SABER will expect your inputs to be in the `tables` directory +with the correct names and will generate many files to populate the `gis` and `clusters` directories. -Be sure that both datasets: +Example project directory structure: -- Are in the same projected coordinate system -- Only contain gauges and reaches within the area of interest. Clip/delete anything else. - -Other things to consider: - -- You may find it helpful to also have the catchments, adjoint catchments, and a watershed boundary polygon for - visualization purposes. - -## Hydrological Datasets - -1. Hindcast/Retrospective/Historical Simulation for every stream segment (reporting point) in the model. This is a time - series of discharge (Q) for each stream segment. The data should be in a tabular format that can be read by `pandas`. - The data should have two columns: - 1. `datetime`: The datetime stamp for the measurements - 2. A column whose name is the unique `model_id` containing the discharge for each time step. -2. Observed discharge data for each gauge - 1. `datetime`: The datetime stamp for the measurements - 2. A column whose name is the unique `gauge_id` containing the discharge for each time step. - -Be sure that both datasets: - -- Are in the same units (e.g. m3/s) -- Are in the same time zone (e.g. UTC) -- Are in the same time step (e.g. daily average) -- Do not contain any non-numeric values (e.g. ICE, none, etc.) -- Do not contain rows with missing values (e.g. NaN or blank cells) -- Have been cleaned of any incorrect values (e.g. no negative values) -- Do not contain any duplicate rows - -## Working Directory - -SABER is designed to read and write many files in a working directory. - tables/ # This directory contains all the input datasets drain_table.parquet @@ -64,9 +22,3 @@ SABER is designed to read and write many files in a working directory. gis/ # this directory contains outputs from the SABER commands ... - -`drain_table.parquet` is a table of the attribute table from the drainage lines GIS dataset. It can be generated with -`saber.prep.gis_tables()`. - -`gauge_table.parquet` is a table of the attribute table from the drainage lines GIS dataset. It can be generated with -`saber.prep.gis_tables()`. diff --git a/docs/requirements.txt b/docs/requirements.txt index f14360a..f630e1f 100644 --- a/docs/requirements.txt +++ b/docs/requirements.txt @@ -1,2 +1,3 @@ mkdocs==1.3 -mkdocs-material==8.4 \ No newline at end of file +mkdocs-material==8.4 +mkdocstrings-python==0.7.1 \ No newline at end of file diff --git a/docs/user-guide/data_preparation.md b/docs/user-guide/data_preparation.md index 5e5a884..5f9a8f1 100644 --- a/docs/user-guide/data_preparation.md +++ b/docs/user-guide/data_preparation.md @@ -1,80 +1,30 @@ -# Prepare Spatial Data (scripts not provided) -This step instructs you to collect 3 gis files and use them to generate 2 tables. All 5 files (3 gis files and 2 -tables) should go in the `gis_inputs` directory +# Processing Input Data -1. Clip model drainage lines and catchments shapefile to extents of the region of interest. - For speed/efficiency, merge their attribute tables and save as a csv. 
- - read drainage line shapefile and with GeoPandas - - delete all columns ***except***: NextDownID, COMID, Tot_Drain_, order_ - - rename the columns: - - NextDownID -> downstream_model_id - - COMID -> model_id - - Tot_Drain -> drainage_area - - order_ -> stream_order - - compute the x and y coordinates of the centroid of each feature (needs the geometry column) - - delete geometry column - - save as `drain_table.csv` in the `gis_inputs` directory +Before following these steps, you should have prepared the required datasets and organized them in a working directory. +Refer to the [Required Datasets](../data/index.md) page for more information. -Tip to compute the x and y coordinates using geopandas +***Prereqs:*** +1. Create a working directory and subdirectories +2. Prepare the `drain_table` and `gauge_table` files. +3. Prepare the `hindcast_series_table` file. -Your table should look like this: +## Prepare Flow Duration Curve Data -| downstream_model_id | model_id | model_drain_area | stream_order | x | y | -|---------------------|-----------------|------------------|--------------|-----|-----| -| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | -| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | -| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | -| ... | ... | ... | ... | ... | ... | - -1. Prepare a csv of the attribute table of the gauge locations shapefile. - - You need the columns: - - model_id - - gauge_id - - drainage_area (if known) - -Your table should look like this (column order is irrelevant): - -| model_id | gauge_drain_area | gauge_id | -|-------------------|------------------|------------------| -| unique_stream_num | area in km^2 | unique_gauge_num | -| unique_stream_num | area in km^2 | unique_gauge_num | -| unique_stream_num | area in km^2 | unique_gauge_num | -| ... | ... | ... | - -# Prepare Discharge Data - -This step instructs you to gather simulated data and observed data. The raw simulated data (netCDF) and raw observed -data (csvs) should be included in the `data_inputs` folder. You may keep them in another location and provide the path -as an argument in the functions that need it. These datasets are used to generate several additional csv files which -are stored in the `data_processed` directory and are used in later steps. The netCDF file may have any name and the -directory of observed data csvs should be called `obs_csvs`. - -Use the dat - -1. Create a single large csv of the historical simulation data with a datetime column and 1 column per stream segment labeled by the stream's ID number. - -| datetime | model_id_1 | model_id_2 | model_id_3 | -|------------|------------|------------|------------| -| 1979-01-01 | 50 | 50 | 50 | -| 1979-01-02 | 60 | 60 | 60 | -| 1979-01-03 | 70 | 70 | 70 | -| ... | ... | ... | ... | - -2. Process the large simulated discharge csv to create a 2nd csv with the flow duration curve on each segment (script provided). +Process the `hindcast_series_table` to create a 2nd table with the flow duration curve on each segment. | p_exceed | model_id_1 | model_id_2 | model_id_3 | |----------|------------|------------|------------| | 100 | 0 | 0 | 0 | -| 99 | 10 | 10 | 10 | -| 98 | 20 | 20 | 20 | +| 97.5 | 10 | 10 | 10 | +| 95 | 20 | 20 | 20 | | ... | ... | ... | ... | -3. Process the large historical discharge csv to create a 3rd csv with the monthly averages on each segment (script provided). 
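+
+Below is a minimal sketch of how this table and the transformed table described next might be generated with
+`saber.prep.calculate_fdc`. The working directory path is hypothetical, and it assumes the hindcast series was saved
+as `tables/hindcast_series_table.parquet` as described on the [Required Datasets](../data/index.md) page.
+
+```python
+import pandas as pd
+
+import saber
+
+# hypothetical path to your project working directory
+workdir = '/path/to/working-directory'
+
+# read the hindcast series table: datetime index, 1 column per stream named by model_id
+hindcast_df = pd.read_parquet(f'{workdir}/tables/hindcast_series_table.parquet')
+
+# writes the FDC table and the scaled/transformed FDC table to the workdir tables directory
+saber.prep.calculate_fdc(workdir, hindcast_df)
+```
+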
+Then process the FDC data to create a 3rd table with scaled/transformed FDC data for each segment. -| month | model_id_1 | model_id_2 | model_id_3 | -|-------|------------|------------|------------| -| 1 | 60 | 60 | 60 | -| 2 | 30 | 30 | 30 | -| 3 | 70 | 70 | 70 | -| ... | ... | ... | ... | +| model_id | Q100 | Q97.5 | Q95 | +|----------|------|-------|-----| +| 1 | 60 | 50 | 40 | +| 2 | 60 | 50 | 40 | +| 3 | 60 | 50 | 40 | +| ... | ... | ... | ... | diff --git a/docs/user-guide/index.md b/docs/user-guide/index.md index cfac838..3043830 100644 --- a/docs/user-guide/index.md +++ b/docs/user-guide/index.md @@ -1,6 +1,8 @@ # User Guide -We anticipate the primary usage of `saber-hbc` will be in scripts or workflows that process data in isolated environments, +While following this guide, you may also want to refer to the [API Documentation](../api). + +We anticipate the primary usage of `saber` will be in scripts or workflows that process data in isolated environments, such as web servers or interactively in notebooks, rather than using the api in an app. The package's API is designed with many modular, compartmentalized functions intending to create flexibility for running specific portions of the SABER process or repeating certain parts if workflows fail or parameters need to be adjusted. @@ -20,3 +22,6 @@ logging.basicConfig( format='%(asctime)s: %(name)s - %(message)s' ) ``` + +## Example Script + diff --git a/docs/user-guide/validation.md b/docs/user-guide/validation.md index c12d9fa..b4e76f7 100644 --- a/docs/user-guide/validation.md +++ b/docs/user-guide/validation.md @@ -27,4 +27,4 @@ obs_data_dir = '/path/to/obs/data/directory' # optional - if data not in workdi saber.validate.sample_gauges(workdir) saber.validate.run_series(workdir, drain_shape, obs_data_dir) -``` \ No newline at end of file +``` diff --git a/mkdocs.yml b/mkdocs.yml index da851fd..a4e5135 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -7,7 +7,10 @@ repo_url: https://github.com/rileyhales/saber-hbc/ theme: material nav: - Home: index.md - - Required Datasets: data/index.md + - Required Datasets: + - Summary: data/index.md + - GIS Datasets: data/gis-data.md + - Discharge Datasets: data/discharge-data.md - User Guide: - Using SABER: user-guide/index.md - Data Preparation: user-guide/data_preparation.md @@ -17,5 +20,15 @@ nav: - Bias Correction: user-guide/bias_correction.md - Validation: user-guide/validation.md - Demonstration: demo/index.md - - API Docs: api/index.md + - API Docs: + - API Reference: api/index.md + - saber.prep: api/prep.md + - saber.cluster: api/cluster.md + - saber.assign: api/assign.md + - saber.gis: api/gis.md + - saber.validate: api/validate.md - Cite SABER: cite/index.md + +plugins: + - search + - mkdocstrings diff --git a/saber/__init__.py b/saber/__init__.py index d75a698..7922ef2 100644 --- a/saber/__init__.py +++ b/saber/__init__.py @@ -14,5 +14,5 @@ ] __author__ = 'Riley C. 
Hales' -__version__ = '0.5.0' +__version__ = '0.6.0' __license__ = 'BSD 3 Clause Clear' diff --git a/saber/_propagation.py b/saber/_propagation.py deleted file mode 100644 index 41ee9f7..0000000 --- a/saber/_propagation.py +++ /dev/null @@ -1,117 +0,0 @@ -import logging - -import pandas as pd - -from .io import asgn_gid_col -from .io import asgn_mid_col -from .io import down_mid_col -from .io import mid_col -from .io import order_col -from .io import reason_col - -logger = logging.getLogger(__name__) - - -def walk_downstream(df: pd.DataFrame, start_id: int, same_order: bool = True, outlet_next_id: str or int = -1) -> tuple: - """ - Traverse a stream network table containing a column of unique ids, a column of the id for the stream/basin - downstream of that point, and, optionally, a column containing the stream order. - - Args: - df (pd.DataFrame): - start_id (int): - same_order (bool): - outlet_next_id (str or int): - - Returns: - Tuple of stream ids in the order they come from the starting point. - """ - downstream_ids = [] - - df_ = df.copy() - if same_order: - start_id_order = df_[df_[mid_col] == start_id][order_col].values[0] - df_ = df_[df_[order_col] == start_id_order] - - stream_row = df_[df_[mid_col] == start_id] - while len(stream_row[down_mid_col].values) > 0 and stream_row[down_mid_col].values[0] != outlet_next_id: - downstream_ids.append(stream_row[down_mid_col].values[0]) - stream_row = df_[df_[mid_col] == stream_row[down_mid_col].values[0]] - if len(stream_row) == 0: - break - return tuple(downstream_ids) - - -def walk_upstream(df: pd.DataFrame, start_id: int, same_order: bool = True) -> tuple: - """ - Traverse a stream network table containing a column of unique ids, a column of the id for the stream/basin - downstream of that point, and, optionally, a column containing the stream order. - - Args: - df (pd.DataFrame): the assign_table df - start_id (int): that id to start on - same_order (bool): select upstream segments only on the same order stream - - Returns: - Tuple of stream ids in the order they come from the starting point. If you chose same_order = False, the - streams will appear in order on each upstream branch but the various branches will appear mixed in the tuple in - the order they were encountered by the iterations. 
- """ - df_ = df.copy() - if same_order: - df_ = df_[df_[order_col] == df_[df_[mid_col] == start_id][order_col].values[0]] - - # start a list of the upstream ids - upstream_ids = [start_id, ] - upstream_rows = df_[df_[down_mid_col] == start_id] - - while not upstream_rows.empty or len(upstream_rows) > 0: - if len(upstream_rows) == 1: - upstream_ids.append(upstream_rows[mid_col].values[0]) - upstream_rows = df_[df_[down_mid_col] == upstream_rows[mid_col].values[0]] - elif len(upstream_rows) > 1: - for id in upstream_rows[mid_col].values.tolist(): - upstream_ids += list(walk_upstream(df_, id, False)) - upstream_rows = df_[df_[down_mid_col] == id] - return tuple(set(upstream_ids)) - - -def propagate_in_table(df: pd.DataFrame, start_mid: int, start_gid: int, connected: tuple, max_prop: int, - direction: str): - """ - - Args: - df: the assign_table df - start_mid: the model_id of the stream to start at - connected: a list of stream segments, up- or downstream from the gauged_stream, in the order they come - max_prop: max number of stream segments to propagate up/downstream - direction: either "upstream" or "downstream" - - Returns: - df - """ - _df = df.copy() - - for index, segment_id in enumerate(connected): - distance = index + 1 - if distance > max_prop: - continue - downstream_row = _df[_df[mid_col] == segment_id] - - # if the downstream segment doesn't have an assigned gauge, we're going to assign the current one - if downstream_row[asgn_mid_col].empty or pd.isna(downstream_row[asgn_mid_col]).any(): - _df.loc[_df[mid_col] == segment_id, [asgn_mid_col, asgn_gid_col, reason_col]] = \ - [start_mid, start_gid, f'propagation-{direction}-{distance}'] - logger.info(f'assigned gauged stream {start_mid} to ungauged {direction} {segment_id}') - continue - - # if the stream segment does have an assigned value, check the value to determine what to do - else: - downstream_reason = downstream_row[reason_col].values[0] - if downstream_reason == 'gauged': - logger.info('already gauged') - if 'propagation' in downstream_reason and int(str(downstream_reason).split('-')[-1]) >= distance: - _df.loc[_df[mid_col] == segment_id, [asgn_mid_col, asgn_gid_col, reason_col]] = \ - [start_mid, start_gid, f'propagation-{direction}-{distance}'] - logger.info(f'assigned gauged stream {start_mid} to previously assigned {direction} {segment_id}') - return _df diff --git a/saber/assign.py b/saber/assign.py index 3df555b..3048dc6 100644 --- a/saber/assign.py +++ b/saber/assign.py @@ -1,123 +1,193 @@ import logging -import os -import joblib import numpy as np import pandas as pd -from ._propagation import propagate_in_table -from ._propagation import walk_downstream -from ._propagation import walk_upstream from .io import asgn_gid_col from .io import asgn_mid_col +from .io import down_mid_col from .io import gid_col from .io import mid_col from .io import order_col from .io import read_table from .io import reason_col from .io import write_table +from .io import clbl_col +from .io import x_col +from .io import y_col -__all__ = ['gen', 'merge_clusters', 'assign_gauged', 'assign_propagation', 'assign_by_distance', ] +__all__ = ['generate', 'assign_gauged', 'map_propagate', 'map_resolve_propagations', 'map_assign_ungauged', ] logger = logging.getLogger(__name__) -def gen(workdir: str, cache: bool = True) -> pd.DataFrame: +def generate(workdir: str, labels_df: pd.DataFrame = None, drain_table: pd.DataFrame = None, + gauge_table: pd.DataFrame = None, cache: bool = True) -> pd.DataFrame: """ Joins the drain_table.csv and 
gauge_table.csv to create the assign_table.csv Args: workdir: path to the working directory cache: whether to cache the assign table immediately + labels_df: a dataframe with a column for the assigned cluster label and a column for the model_id + drain_table: the drain table dataframe + gauge_table: the gauge table dataframe Returns: - None + pd.DataFrame """ - # read and merge the tables - assign_table = pd.merge( - read_table(workdir, 'drain_table'), - read_table(workdir, 'gauge_table'), + # read the tables if they are not provided + if labels_df is None: + labels_df = read_table(workdir, 'cluster_labels') + if drain_table is None: + drain_table = read_table(workdir, 'drain_table') + if gauge_table is None: + gauge_table = read_table(workdir, 'gauge_table') + + # enforce correct column data types + labels_df[mid_col] = labels_df[mid_col].astype(str) + drain_table[mid_col] = drain_table[mid_col].astype(str) + drain_table[down_mid_col] = drain_table[down_mid_col].astype(str) + gauge_table[mid_col] = gauge_table[mid_col].astype(str) + + # join the drain_table and gauge_table then join the labels_df + assign_df = pd.merge( + drain_table, + gauge_table, on=mid_col, how='outer' - ) + ).merge(labels_df, on=mid_col, how='outer') + assign_df = assign_df.sort_values(by=mid_col).reset_index(drop=True) - # create the new columns - assign_table[asgn_mid_col] = np.nan - assign_table[asgn_gid_col] = np.nan - assign_table[reason_col] = np.nan + # create new columns asgn_mid_col, asgn_gid_col, reason_col + assign_df[[asgn_mid_col, asgn_gid_col, reason_col]] = ['unassigned', 'unassigned', 'unassigned'] if cache: - write_table(assign_table, workdir, 'assign_table') + write_table(assign_df, workdir, 'assign_table') + write_table(assign_df[[mid_col, ]], workdir, 'mid_list') + write_table(assign_df[[gid_col, ]], workdir, 'gid_list') + write_table(assign_df[[mid_col, gid_col]], workdir, 'mid_gid_map') - return assign_table + return assign_df -def merge_clusters(workdir: str, assign_table: pd.DataFrame, n_clusters: int = None) -> pd.DataFrame: +def assign_gauged(df: pd.DataFrame) -> pd.DataFrame: """ - Creates a csv listing the streams assigned to each cluster in workdir/kmeans_models and also adds that information - to assign_table.csv + Assigns basins a gauge for correction which contain a gauge Args: - workdir: path to the project directory - assign_table: the assignment table DataFrame - n_clusters: number of clusters to use when applying the labels to the assign_table + df: the assignments table dataframe Returns: - None + Copy of df1 with assignments made """ - # create a dataframe with the optimal model's labels and the model_id's - df = pd.DataFrame({ - 'cluster': joblib.load(os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')).labels_, - mid_col: read_table(workdir, 'model_ids').values.flatten() - }, dtype=str) + selector = df[gid_col].notna() + df.loc[selector, asgn_mid_col] = df[mid_col] + df.loc[selector, asgn_gid_col] = df[gid_col] + df.loc[selector, reason_col] = 'gauged' + return df - # merge the dataframes - return assign_table.merge(df, how='outer', on=mid_col) - -def assign_gauged(df: pd.DataFrame) -> pd.DataFrame: +def map_propagate(df: pd.DataFrame, start_mid: int, direction: str) -> pd.DataFrame or None: """ - Assigns basins a gauge for correction which contain a gauge + Meant to be mapped over a dataframe to propagate assignments downstream or upstream Args: df: the assignments table dataframe + start_mid: the model_id to start the propagation from + direction: either 'down' 
or 'up' to indicate the direction of propagation Returns: - Copy of df with assignments made + pd.DataFrame """ - _df = df.copy() - selector = ~_df[gid_col].isna() - _df.loc[selector, asgn_mid_col] = _df[mid_col] - _df.loc[selector, asgn_gid_col] = _df[gid_col] - _df.loc[selector, reason_col] = 'gauged' - return _df + # logger.info(f'Prop {direction} from {start_mid}') + assigned_rows = [] + start_order = df[df[mid_col] == start_mid][order_col].values[0] + + # select the starting row + stream_row = df[df[mid_col] == start_mid] + start_gid = stream_row[asgn_gid_col].values[0] + select_same_order_streams = df[order_col] == start_order + + n_steps = 1 + + # repeat as long as the current row is not empty + try: + while True: + # select the next up or downstream + if direction == 'down': + id_selector = df[mid_col] == stream_row[down_mid_col].values[0] + else: # direction == 'up': + id_selector = df[down_mid_col] == stream_row[mid_col].values[0] + + # select the next row using the ID and Order selectors + stream_row = df[np.logical_and(id_selector, select_same_order_streams)] + + # Break the loop if + # 1. next row is empty -> no upstream/downstream row -> empty stream_row + # 2. next row stream order not a match -> not picked by select_same_order_streams -> empty stream_row + # 3. next row already has an assignment made -> reason column is not empty string + if stream_row.empty or stream_row[reason_col].values[0] != 'unassigned': + break + + # copy the row, modify the assignment columns, and append to the list + new_row = stream_row.copy() + new_row[[asgn_mid_col, asgn_gid_col, reason_col]] = [start_mid, start_gid, f'prop-{direction}-{n_steps}'] + assigned_rows.append(new_row) + + # increment the steps counter + n_steps = n_steps + 1 + + # Break the loop if + # 1. The next row is an outlet -> no downstream row -> cause error when selecting next row + # 2. 
we have reach the max number of steps (n_steps -1) + if stream_row[down_mid_col].values[0] != -1 or n_steps >= 8: + break + except Exception as e: + logger.error(f'Error in map_propagate: {e}') + + if len(assigned_rows): + return pd.concat(assigned_rows) + return pd.DataFrame(columns=df.columns) + + +def map_resolve_propagations(df_props: pd.DataFrame, mid: str) -> pd.DataFrame: + """ + Resolves the propagation assignments by choosing the assignment with the fewest steps + Args: + df_props: the combined upstream and downstream propagation assignments dataframe + mid: the model_id to resolve the propagation assignments for -def assign_propagation(df: pd.DataFrame, max_prop: int = 5) -> pd.DataFrame: + Returns: + pd.DataFrame """ + df_mid = df_props[df_props[mid_col] == mid].copy() + # parse the reason statement into number of steps and prop up or downstream + df_mid[['direction', 'n_steps']] = pd.DataFrame( + df_props[reason_col].apply(lambda x: x.split('-')[1:]).to_list()) + df_mid['n_steps'] = df_mid['n_steps'].astype(float) + # sort by n_steps then by reason + df_mid = df_mid.sort_values(['n_steps', 'direction'], ascending=[True, True]) + # return the first row which is the fewest steps and preferring downstream to upstream) + return df_mid.head(1).drop(columns=['direction', 'n_steps']) + + +def assign_propagation(df: pd.DataFrame, df_props: pd.DataFrame) -> pd.DataFrame: + """ + Merge the assignment table and the propagation assignments Args: df: the assignments table dataframe - max_prop: the max number of stream segments to propagate downstream + df_props: the combined upstream and downstream propagation assignments dataframe Returns: - Copy of df with assignments made + pd.DataFrame """ - _df = df.copy() - for gauged_stream in _df.loc[~_df[gid_col].isna(), mid_col]: - subset = _df.loc[_df[mid_col] == gauged_stream, gid_col] - if subset.empty: - continue - start_gid = subset.values[0] - connected_segments = walk_upstream(df, gauged_stream, same_order=True) - _df = propagate_in_table(_df, gauged_stream, start_gid, connected_segments, max_prop, 'upstream') - connected_segments = walk_downstream(df, gauged_stream, same_order=True) - _df = propagate_in_table(_df, gauged_stream, start_gid, connected_segments, max_prop, 'downstream') - - return _df + return pd.concat([df[~df[mid_col].isin(df_props[mid_col])], df_props]) -def assign_by_distance(df: pd.DataFrame) -> pd.DataFrame: +def map_assign_ungauged(assign_df: pd.DataFrame, gauges_df: np.array, mid: str) -> pd.DataFrame: """ Assigns all possible ungauged basins a gauge that is (1) is closer than any other gauge @@ -125,38 +195,27 @@ def assign_by_distance(df: pd.DataFrame) -> pd.DataFrame: (3) in the same simulated fdc cluster as ungauged basin Args: - df: the assignments table dataframe + assign_df: the assignments table dataframe + gauges_df: a subset of the assignments dataframe containing the gauges + mid: the model_id to assign a gauge for Returns: - Copy of df with assignments made + a new row for the given mid with the assignments made """ - _df = df.copy() - # first filter by cluster number - for c_num in sorted(set(_df['sim-fdc-cluster'].values)): - c_sub = _df[_df['sim-fdc-cluster'] == c_num] - # next filter by stream order - for so_num in sorted(set(c_sub[order_col])): - c_so_sub = c_sub[c_sub[order_col] == so_num] - - # determine which ids **need** to be assigned - ids_to_assign = c_so_sub[c_so_sub[asgn_mid_col].isna()][mid_col].values - avail_assigns = c_so_sub[c_so_sub[asgn_mid_col].notna()] - if ids_to_assign.size == 0 or 
avail_assigns.empty: - logger.error(f'unable to assign cluster {c_num} to stream order {so_num}') - continue - # now you find the closest gauge to each unassigned - for id in ids_to_assign: - subset = c_so_sub.loc[c_so_sub[mid_col] == id, ['x', 'y']] - - dx = avail_assigns.x.values - subset.x.values - dy = avail_assigns.y.values - subset.y.values - avail_assigns['dist'] = np.sqrt(dx * dx + dy * dy) - row_idx_to_assign = avail_assigns['dist'].idxmin() - - mid_to_assign = avail_assigns.loc[row_idx_to_assign].assigned_model_id - gid_to_assign = avail_assigns.loc[row_idx_to_assign].assigned_gauge_id - - _df.loc[_df[mid_col] == id, [asgn_mid_col, asgn_gid_col, reason_col]] = \ - [mid_to_assign, gid_to_assign, f'cluster-{c_num}-dist'] - - return _df + try: + # find the closest gauge using euclidean distance without accounting for projection/map distortion + mid_x, mid_y = assign_df.loc[assign_df[mid_col] == mid, [x_col, y_col]].values.flatten() + row_idx_to_assign = pd.Series( + np.sqrt(np.power(gauges_df[x_col] - mid_x, 2) + np.power(gauges_df[y_col] - mid_y, 2)) + ).idxmin() + + asgn_mid, asgn_gid = gauges_df.loc[row_idx_to_assign, [asgn_mid_col, asgn_gid_col]] + asgn_reason = f'cluster-{gauges_df[clbl_col].values[0]}' + + new_row = assign_df[assign_df[mid_col] == mid].copy() + new_row[[asgn_mid_col, asgn_gid_col, reason_col]] = [asgn_mid, asgn_gid, asgn_reason] + except Exception as e: + logger.error(f'Error in map_assign_ungauged: {e}') + new_row = pd.DataFrame(columns=assign_df.columns) + + return new_row diff --git a/saber/cluster.py b/saber/cluster.py index 2cda3e6..09fae92 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -15,15 +15,19 @@ from sklearn.metrics import silhouette_samples from .io import _find_model_files +from .io import clbl_col +from .io import mid_col +from .io import get_table_path from .io import read_table from .io import write_table -__all__ = ['generate', 'summarize_fit', 'plot_clusters', 'calc_silhouette', 'plot_silhouettes'] +__all__ = ['cluster', 'predict_labels', 'summarize_fit', 'plot_clusters', 'calc_silhouette', 'plot_silhouettes', + 'plot_fit_metrics', 'plot_centers', ] logger = logging.getLogger(__name__) -def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 13) -> None: +def cluster(workdir: str, x: np.ndarray = None, max_clusters: int = 13) -> None: """ Trains scikit-learn MiniBatchKMeans models and saves as pickle @@ -47,6 +51,27 @@ def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 13) -> None return +def predict_labels(workdir: str, n_clusters: int, x: pd.DataFrame) -> pd.DataFrame: + """ + Predict the cluster labels for a set number of FDCs + + Args: + workdir: path to the project directory + n_clusters: number of cluster model to use for prediction + x: A dataframe with 1 row per FDC (stream) and 1 column per FDC value. Index is the stream's ID. 
+ + Returns: + None + """ + model = joblib.load(os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) + labels_df = pd.DataFrame( + np.transpose([model.predict(x.values), x.index]), + columns=[clbl_col, mid_col] + ) + write_table(labels_df, workdir, 'cluster_labels') + return labels_df + + def summarize_fit(workdir: str) -> None: """ Generate a summary of the clustering results save the centers and labels to parquet @@ -83,14 +108,10 @@ def summarize_fit(workdir: str) -> None: sum_df['knee'] = KneeLocator(summary['number'], summary['inertia'], curve='convex', direction='decreasing').knee write_table(sum_df, workdir, 'cluster_metrics') - # save the labels as a parquet - labels = np.transpose(np.array(labels)) - write_table(pd.DataFrame(labels, columns=np.array(range(2, labels.shape[1] + 2)).astype(str)), - workdir, 'cluster_labels') return -def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = 'all', samples: int = 100_000) -> None: +def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = 'all', samples: int = 75_000) -> None: """ Calculate the silhouette score for the given number of clusters @@ -363,7 +384,9 @@ def plot_fit_metrics(workdir: str, plt_width: int = 5, plt_height: int = 3) -> N clusters_dir = os.path.join(workdir, 'clusters') df = read_table(workdir, 'cluster_metrics') - df = df.merge(read_table(workdir, 'cluster_sscores'), on='number', how='outer') + if os.path.exists(get_table_path(workdir, 'cluster_sscores')): + df = df.merge(read_table(workdir, 'cluster_sscores'), on='number', how='outer') + df['number'] = df['number'].astype(int) # initialize the figure and labels fig, ax1 = plt.subplots( @@ -371,24 +394,28 @@ def plot_fit_metrics(workdir: str, plt_width: int = 5, plt_height: int = 3) -> N dpi=750, tight_layout=True, ) - ax2 = ax1.twinx() # Plot titles and labels ax1.set_title("Clustering Fit Metrics") ax1.set_xlabel("Number of Clusters") ax1.set_ylabel("Inertia") - ax2.set_ylabel("Silhouette Score") ticks = np.arange(1, df['number'].max() + 2) ax1.set_xlim(ticks[0], ticks[-1]) ax1.set_xticks(ticks) - ax2.set_ylim(0, 1) # plot the inertia - knee = df['knee'].values[0] + knee = int(df['knee'].values[0]) ax1.plot(df['number'], df['inertia'], marker='o', label='Inertia') ax1.plot(knee, df[df['number'] == knee]['inertia'], marker='o', c='red', label='Knee') - ax2.plot(df['number'], df['silhouette'], marker='o', c='green', label='Silhouette Score') + + # add the silhouette scores if they were calculated + if 'silhouette' in df.columns: + df.loc[df['silhouette'].isna(), 'silhouette'] = '' + ax2 = ax1.twinx() + ax2.set_ylabel("Silhouette Score") + ax2.set_ylim(0, 1) + ax2.plot(df['number'], df['silhouette'], marker='o', c='green', label='Silhouette Score') fig.savefig(os.path.join(clusters_dir, f'figure-fit-metrics.png')) plt.close(fig) diff --git a/saber/gis.py b/saber/gis.py index f8f6de9..c521fb4 100644 --- a/saber/gis.py +++ b/saber/gis.py @@ -1,5 +1,5 @@ import os -import warnings +import logging import contextily as cx import geopandas as gpd @@ -12,10 +12,14 @@ from .io import metric_nc_name_list from .io import mid_col from .io import reason_col +from .io import dir_gis +from .io import clbl_col __all__ = ['generate_all', 'clip_by_assignment', 'clip_by_cluster', 'clip_by_unassigned', 'clip_by_ids', 'validation_maps'] +logger = logging.getLogger(__name__) + def generate_all(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ @@ -37,13 +41,13 @@ def generate_all(workdir: str, 
assign_table: pd.DataFrame, drain_gis: str, prefi return -def clip_by_assignment(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: +def clip_by_assignment(workdir: str, assign_df: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ Creates Geopackage files in workdir/gis_outputs for each unique value in the assignment column Args: workdir: the path to the working directory for the project - assign_table: the assign_table dataframe + assign_df: the assign_table dataframe drain_gis: path to a drainage line shapefile which can be clipped prefix: a prefix for names of the outputs to distinguish between data generated at separate instances @@ -51,18 +55,20 @@ def clip_by_assignment(workdir: str, assign_table: pd.DataFrame, drain_gis: str, None """ # read the drainage line shapefile - dl = gpd.read_file(drain_gis) - save_dir = os.path.join(workdir, 'gis') + if isinstance(drain_gis, str): + drain_gis = gpd.read_file(drain_gis) # get the unique list of assignment reasons - for reason in set(assign_table[reason_col].dropna().values): - ids = assign_table[assign_table[reason_col] == reason][mid_col].values - subset = dl[dl[mid_col].isin(ids)] - name = f'{prefix}{"_" if prefix else ""}assignments_{reason}.gpkg' + for reason in assign_df[reason_col].unique(): + logger.info(f'Creating GIS output for group: {reason}') + selector = drain_gis[mid_col].astype(str).isin(assign_df[assign_df[reason_col] == reason][mid_col]) + subset = drain_gis[selector] + name = f'{f"{prefix}_" if prefix else ""}assignments_{reason}.gpkg' if subset.empty: + logger.debug(f'Empty filter: No streams are assigned for {reason}') continue else: - subset.to_file(os.path.join(save_dir, name)) + subset.to_file(os.path.join(workdir, dir_gis, name)) return @@ -79,12 +85,15 @@ def clip_by_cluster(workdir: str, assign_table: pd.DataFrame, drain_gis: str, pr Returns: None """ - dl_gdf = gpd.read_file(drain_gis) - for num in sorted(set(assign_table[mid_col].dropna().values)): - gdf = dl_gdf[dl_gdf[mid_col].isin(assign_table[assign_table[mid_col] == num][mid_col])] + if isinstance(drain_gis, str): + drain_gis = gpd.read_file(drain_gis) + for num in assign_table[clbl_col].unique(): + logger.info(f'Creating GIS output for cluster: {num}') + gdf = drain_gis[drain_gis[mid_col].astype(str).isin(assign_table[assign_table[clbl_col] == num][mid_col])] if gdf.empty: + logger.debug(f'Empty filter: No streams are assigned to cluster {num}') continue - gdf.to_file(os.path.join(workdir, 'gis', f'{prefix}{"_" if prefix else ""}-{int(num)}.gpkg')) + gdf.to_file(os.path.join(workdir, dir_gis, f'{prefix}{"_" if prefix else ""}cluster-{int(num)}.gpkg')) return @@ -101,13 +110,15 @@ def clip_by_unassigned(workdir: str, assign_table: pd.DataFrame, drain_gis: str, Returns: None """ - dl_gdf = gpd.read_file(drain_gis) - ids = assign_table[assign_table[reason_col].isna()][mid_col].values - subset = dl_gdf[dl_gdf[mid_col].isin(ids)] + logger.info('Creating GIS output for unassigned basins') + if isinstance(drain_gis, str): + drain_gis = gpd.read_file(drain_gis) + ids = assign_table[assign_table[reason_col] == 'unassigned'][mid_col].values + subset = drain_gis[drain_gis[mid_col].astype(str).isin(ids)] if subset.empty: - warnings.warn('Empty filter: No streams are unassigned') + logger.debug('Empty filter: No streams are unassigned') return - savepath = os.path.join(workdir, 'gis', f'{prefix}{"_" if prefix else ""}assignments_unassigned.gpkg') + savepath = os.path.join(workdir, dir_gis, f'{prefix}{"_" if prefix else 
""}assignments_unassigned.gpkg') subset.to_file(savepath) return @@ -126,10 +137,10 @@ def clip_by_ids(workdir: str, ids: list, drain_gis: str, prefix: str = '', id_co Returns: None """ - dl = gpd.read_file(drain_gis) - save_dir = os.path.join(workdir, 'gis') + if isinstance(drain_gis, str): + drain_gis = gpd.read_file(drain_gis) name = f'{prefix}{"_" if prefix else ""}id_subset.gpkg' - dl[dl[id_column].isin(ids)].to_file(os.path.join(save_dir, name)) + drain_gis[drain_gis[id_column].isin(ids)].to_file(os.path.join(workdir, dir_gis, name)) return @@ -182,15 +193,16 @@ def validation_maps(workdir: str, gauge_gis: str, val_table: pd.DataFrame = None def histomaps(gdf: gpd.GeoDataFrame, metric: str, prct: str, workdir: str) -> None: """ + Creates a histogram of the KGE2012 values for the validation set Args: - gdf: - metric: - prct: - workdir: + gdf: a GeoDataFrame containing validation metrics + metric:name of th emetric to plot + prct: Percentile of the validation set used to generate the histogram + workdir: the project working directory Returns: - + None """ core_columns = [mid_col, gid_col, 'geometry'] # world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres')) diff --git a/saber/io.py b/saber/io.py index 3ee6651..c061ad7 100644 --- a/saber/io.py +++ b/saber/io.py @@ -9,12 +9,15 @@ # assign table and gis_input file required column names mid_col = 'model_id' gid_col = 'gauge_id' +clbl_col = 'cluster_label' asgn_mid_col = 'assigned_model_id' asgn_gid_col = 'assigned_gauge_id' down_mid_col = 'downstream_model_id' reason_col = 'reason' area_col = 'drain_area' order_col = 'strahler_order' +x_col = 'x_mod' +y_col = 'y_mod' # name of some files produced by the algorithm cluster_count_file = 'best-fit-cluster-count.json' @@ -30,10 +33,18 @@ table_hindcast = 'hindcast.parquet' table_hindcast_fdc = 'hindcast_fdc.parquet' table_hindcast_fdc_trans = 'hindcast_fdc_transformed.parquet' + +table_assign = 'assign_table.parquet' table_mids = 'mid_table.parquet' +table_gids = 'gid_table.parquet' +table_mid_gid_map = 'mid_gid_map_table.parquet' + table_drain = 'drain_table.parquet' table_gauge = 'gauge_table.parquet' -table_assign = 'assign_table.csv' + +table_prop_resolved = 'prop_table_resolved.parquet' +table_prop_downstream = 'prop_table_downstream.parquet' +table_prop_upstream = 'prop_table_upstream.parquet' # tables generated by the clustering functions table_cluster_metrics = 'cluster_metrics.csv' @@ -73,8 +84,8 @@ def get_table_path(workdir: str, table_name: str) -> str: Get the path to a table in the project directory by name Args: - workdir: - table_name: + workdir: path to the project directory + table_name: name of the table to find a path for Returns: Path (str) to the table @@ -88,14 +99,26 @@ def get_table_path(workdir: str, table_name: str) -> str: return os.path.join(workdir, dir_tables, table_hindcast_fdc) elif table_name == 'hindcast_fdc_trans': return os.path.join(workdir, dir_tables, table_hindcast_fdc_trans) - elif table_name == 'model_ids': - return os.path.join(workdir, dir_tables, table_mids) + + elif table_name == 'assign_table': + return os.path.join(workdir, dir_tables, table_assign) elif table_name == 'drain_table': return os.path.join(workdir, dir_tables, table_drain) elif table_name == 'gauge_table': return os.path.join(workdir, dir_tables, table_gauge) - elif table_name == 'assign_table': - return os.path.join(workdir, dir_tables, table_assign) + elif table_name == 'mid_list': + return os.path.join(workdir, dir_tables, table_mids) + elif table_name == 'gid_list': + 
return os.path.join(workdir, dir_tables, table_gids) + elif table_name == 'mid_gid_map': + return os.path.join(workdir, dir_tables, table_mid_gid_map) + + elif table_name == 'prop_resolved': + return os.path.join(workdir, dir_tables, table_prop_resolved) + elif table_name == 'prop_downstream': + return os.path.join(workdir, dir_tables, table_prop_downstream) + elif table_name == 'prop_upstream': + return os.path.join(workdir, dir_tables, table_prop_upstream) elif table_name == 'cluster_metrics': return os.path.join(workdir, dir_clusters, table_cluster_metrics) @@ -137,7 +160,7 @@ def read_table(workdir: str, table_name: str) -> pd.DataFrame: elif table_format == '.feather': return pd.read_feather(table_path) elif table_format == '.csv': - return pd.read_csv(table_path) + return pd.read_csv(table_path, dtype=str) else: raise ValueError(f'Unknown table format: {table_format}') diff --git a/saber/prep.py b/saber/prep.py index be87052..3fa4335 100755 --- a/saber/prep.py +++ b/saber/prep.py @@ -1,15 +1,12 @@ import geopandas as gpd -import netCDF4 as nc import numpy as np import pandas as pd from sklearn.preprocessing import StandardScaler as Scalar -from .io import mid_col -from .io import order_col -from .io import read_table from .io import write_table +from .io import mid_col -__all__ = ['gis_tables', 'hindcast'] +__all__ = ['gis_tables', 'calculate_fdc'] def gis_tables(workdir: str, gauge_gis: str = None, drain_gis: str = None) -> None: @@ -43,54 +40,35 @@ def gis_tables(workdir: str, gauge_gis: str = None, drain_gis: str = None) -> No return -def hindcast(workdir: str, hind_nc_path: str, drop_order_1: bool = False) -> None: +def calculate_fdc(workdir: str, df: pd.DataFrame, n_steps: int = 41) -> None: """ - Creates hindcast_series_table.parquet and hindcast_fdc_table.parquet in the workdir/tables directory - for the GEOGloWS hindcast data + Creates the hindcast_fdc.parquet and hindcast_fdc_transformed.parquet tables in the workdir/tables directory Args: workdir: path to the working directory for the project - hind_nc_path: path to the hindcast or historical simulation netcdf - drop_order_1: whether to drop the order 1 streams from the hindcast + df: the hindcast hydrograph data DataFrame with 1 column per stream, 1 row per timestep, string column names + containing the stream's ID, and a datetime index. E.g. the shape should be (n_timesteps, n_streams). If not + provided, the function will attempt to load the data from workdir/tables/hindcast_series_table.parquet + n_steps: the number of exceedance probabilities to estimate from 0 to 100%, inclusive. Default is 41, which + produces, 0, 2.5, 5, ..., 97.5, 100. 
Returns: None """ - # read the assignments table - model_ids = read_table(workdir, 'drain_table') - if drop_order_1: - model_ids = model_ids[model_ids[order_col] > 1] - model_ids = list(set(sorted(model_ids[mid_col].tolist()))) - - # read the hindcast netcdf, convert to dataframe, store as parquet - hnc = nc.Dataset(hind_nc_path) - ids = pd.Series(hnc['rivid'][:]) - ids_selector = ids.isin(model_ids) - ids = ids[ids_selector].astype(str).values.flatten() - - # save the model ids to table for reference - write_table(pd.DataFrame(ids, columns=[mid_col, ]), workdir, 'model_ids') + exceed_prob = np.linspace(100, 0, n_steps) - # save the hindcast series to parquet - df = pd.DataFrame( - hnc['Qout'][:, ids_selector], - columns=ids, - index=pd.to_datetime(hnc.variables['time'][:], unit='s') - ) - df = df[df.index.year >= 1980] - df.index.name = 'datetime' - write_table(df, workdir, 'hindcast') + # write the ID list to file + write_table(pd.DataFrame(df.columns, columns=[mid_col, ]), workdir, 'model_ids') # calculate the FDC and save to parquet - exceed_prob = np.linspace(100, 0, 41) - df = df.apply(lambda x: np.transpose(np.nanpercentile(x, exceed_prob))) - df.index = exceed_prob - df.index.name = 'exceed_prob' - write_table(df, workdir, 'hindcast_fdc') + fdc_df = df.apply(lambda x: np.transpose(np.nanpercentile(x, exceed_prob))) + fdc_df.index = exceed_prob + fdc_df.index.name = 'exceed_prob' + write_table(fdc_df, workdir, 'hindcast_fdc') # transform and prepare for clustering - df = pd.DataFrame(np.transpose(Scalar().fit_transform(np.squeeze(df.values)))) - df.index = ids - df.columns = df.columns.astype(str) - write_table(df, workdir, 'hindcast_fdc_trans') + fdc_df = pd.DataFrame(np.transpose(Scalar().fit_transform(np.squeeze(fdc_df.values)))) + fdc_df.index = df.columns + fdc_df.columns = fdc_df.columns.astype(str) + write_table(fdc_df, workdir, 'hindcast_fdc_trans') return diff --git a/setup.py b/setup.py index 1c424d9..d244237 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,9 @@ AUTHOR = 'Riley C. Hales' DESCRIPTION = 'The SABER tool for bias correcting large hydrologic models' -VERSION = '0.5.0' + +VERSION = '0.6.0' + PYTHON_REQUIRES = '>=3.10' INSTALL_REQUIRES = [ 'contextily', @@ -26,6 +28,18 @@ 'scipy' ] +TROVE_CLASSIFIERS = [ + 'Development Status :: 4 - Beta', + 'Programming Language :: Python :: 3', + 'Topic :: Scientific/Engineering', + 'Topic :: Scientific/Engineering :: Hydrology', + 'Topic :: Scientific/Engineering :: Visualization', + 'Topic :: Scientific/Engineering :: GIS', + 'Intended Audience :: Science/Research', + 'License :: OSI Approved :: BSD License', + 'Natural Language :: English', +] + setup( name='saber-hbc', packages=['saber', ], @@ -35,17 +49,7 @@ long_description_content_type="text/markdown", author=AUTHOR, license='BSD 3-Clause', - classifiers=[ - 'Development Status :: 4 - Beta', - 'Programming Language :: Python :: 3', - 'Topic :: Scientific/Engineering', - 'Topic :: Scientific/Engineering :: Hydrology', - 'Topic :: Scientific/Engineering :: Visualization', - 'Topic :: Scientific/Engineering :: GIS', - 'Intended Audience :: Science/Research', - 'License :: OSI Approved :: BSD License', - 'Natural Language :: English', - ], + classifiers=TROVE_CLASSIFIERS, python_requires=PYTHON_REQUIRES, install_requires=INSTALL_REQUIRES )