diff --git a/README.md b/README.md index 42d10cc..e4016a9 100755 --- a/README.md +++ b/README.md @@ -1,73 +1,54 @@ # Stream Analysis for Bias Estimation and Reduction ## Theory -Basins and streams will be used interchangeably to refer to the specific stream subunit. - -Large scale hydrologic models are generally not strongly physically based because of the extreme amounts of input data, -computing power, and computing time that would be required to run it. The model uses the results of other physically based -models. This is advantageous to us because we rely on the land surface model's behavior of mapping basins with similar -hydrologically important geophysical physical characteristics to similar discharge forecasts. - -This method relies on spatial analysis, machine learning, and statistical refinements. This is a post processing step which -makes it applicable to many models when the source data, code, or other inputs make the model inaccessible or impractical -to tweak. We do this in an attempt to avoid requiring the source model data or the ability to run the model. -Both of those conditions are not always available or practical when dealing with large scale models or datasets. +Large scale hydrological models are often biased despite the best calibration efforts. Bias correction is a pre- or +post-processing step applied to the model's inputs or outputs to empirically alter datasets and improve model performance. +SABER is a bias correction post processor for discharge predictions. SABER builds on frequency matching ideas used in several +other methods that match biased predictions with observed data with corresponding exceedance probabilities; that is, +using the flow duration curve. SABER expands this approach to ungauged stream reaches using spatial analysis, clustering +(machine learning), and statistical refinements and a new concept- the scalar flow duration curve. ## Python environment See requirements.txt -- python >= 3.7 -- numpy -- pandas -- geopandas (https://github.com/geopandas/geopandas) -- scikit-learn -- tslearn (https://github.com/tslearn-team/tslearn) ## Required Data/Inputs -You need the following data to follow this procedure. The instructions list Shapefile but other vector spatial data -file formats are acceptable -1. Shapefiles for the Drainage Lines (lines) and Catchments (polygons) in the watershed as used by the hydrological model. - The attribute table should contain (at least) the following entries for each feature: - - (Both) An identifier number - - (Both) The ID of the next segment/catchment downstream - - (Drainage lines, preferably both) The stream order of each segment - - (Catchments, preferably both) Cumulative upstream drainage area - - (Drainage lines) the x coordinate of the centroid of each feature - - (Drainage lines) the y coordinate of the centroid of each feature -2. Shapefile of points representing the location of each of the river gauging stations available. - The attribute table should contain (at least) the following entries for each point - - A unique ID number or name assigned to the gauge, preferably numeric. Randomly generate unique numeric ID's if they don't exist. +You need the following data to follow this procedure. Geopackage format GIS data may be substituted with Shapefile or +equivalent formats that can be read by `geopandas`. +1. Geopackage drainage Lines (usually delineated center lines) and catchments/subbasins (polygons) in the watershed. The + attribute tables for both should contain (at least) the following entries for each feature: + - An identifier column (alphanumeric) labeled `model_id` + - The ID of the next downstream reach/subbasin labeled `downstream_model_id` + - The stream order of each reach/subbasin labeled `order` + - Cumulative upstream drainage area labeled + - The x coordinate of the centroid of each feature (precalculated for faster results later) + - The y coordinate of the centroid of each feature (precalculated for faster results later) +2. Geopackage points representing the location of each of the river gauging stations available. The attribute table + should contain (at least) the following entries for each point + - A unique ID number or name assigned to the gauge, preferably short alphanumeric. You can randomly generate them if convenient - The ID of the stream segment in the model which corresponds to that gauge. -3. Shapefile of the boundary (polygon) of the target region for bias calibration. -4. The Shapefiles for the Drainage Lines, Catchments, Gauges/Stations, and boundary are all in the same projection. -5. Historical simulated discharge for each stream segment and for as long (temporally) as is available. -6. Observed discharge data for as many stream reaches as possible within the target region. -7. The units of the simulation and observation data must be in the same units. -8. A working directory on the computer where the scripts are going to be run. +3. Hindcast/historical simulation discharge for each stream reach +4. Observed discharge data for each gauge + +Things to check when preparing your data +1. The units of the simulated and observed data are in the same units +2. The GIS datasets are all in the same projection. +3. The GIS datasets should only contain gauges and reaches/subbasins within the area of interest. Clip/delete anything + else. You might find it helpful to keep a watershed boundary geopackage. ## Process ### 1 Create a Working Directory -```python -import saber as saber - -path_to_working_directory = '/my/file/path' -saber.prep.scaffold_workdir(path_to_working_directory) -``` - Your working directory should exactly like this. ``` working_directory - kmeans_models/ - kmeans_images/ + tables/ data_inputs/ - data_processed/ - gis_inputs/ + kmeans_outputs/ gis_outputs/ - validation_sets/ ``` ### 2 Prepare Spatial Data (scripts not provided) -This step instructs you to collect 3 gis files and use them to generate 2 csv tables. All 5 files (3 gis files and 2 +This step instructs you to collect 3 gis files and use them to generate 2 tables. All 5 files (3 gis files and 2 tables) should go in the `gis_inputs` directory 1. Clip model drainage lines and catchments shapefile to extents of the region of interest. @@ -109,14 +90,14 @@ gdf.to_file('/file/path/to/save', driver='GeoJSON') Your table should look like this: -| downstream_model_id | model_id | drainage_area_mod | stream_order | x | y | -|---------------------|-----------------|-------------------|--------------|-----|-----| -| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | -| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | -| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | -| ... | ... | ... | ... | ... | ... | +| downstream_model_id | model_id | model_drain_area | stream_order | x | y | +|---------------------|-----------------|------------------|--------------|-----|-----| +| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | +| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | +| unique_stream_# | unique_stream_# | area in km^2 | stream_order | ## | ## | +| ... | ... | ... | ... | ... | ... | -2. Prepare a csv of the attribute table of the gauge locations shapefile. +1. Prepare a csv of the attribute table of the gauge locations shapefile. - You need the columns: - model_id - gauge_id @@ -124,12 +105,12 @@ Your table should look like this: Your table should look like this (column order is irrelevant): -| model_id | drainage_area_obs | gauge_id | -|-------------------|-------------------|------------------| -| unique_stream_num | area in km^2 | unique_gauge_num | -| unique_stream_num | area in km^2 | unique_gauge_num | -| unique_stream_num | area in km^2 | unique_gauge_num | -| ... | ... | ... | +| model_id | gauge_drain_area | gauge_id | +|-------------------|------------------|------------------| +| unique_stream_num | area in km^2 | unique_gauge_num | +| unique_stream_num | area in km^2 | unique_gauge_num | +| unique_stream_num | area in km^2 | unique_gauge_num | +| ... | ... | ... | Your project's working directory now looks like ``` @@ -169,7 +150,7 @@ looks like this: ```python import saber as saber workdir = '/path/to/project/directory/' -saber.prep.gen_assignments_table(workdir) +saber.table.gen(workdir) ``` Your project's working directory now looks like @@ -238,17 +219,13 @@ import saber as saber workdir = '/path/to/working/directory' -saber.prep.historical_simulation( - workdir, - '/path/to/historical/simulation/netcdf.nc' # optional - if nc not stored in data_inputs folder -) -saber.prep.hist_sim_table( +saber.prep.hindcast( workdir, - '/path/to/historical/simulation/netcdf.nc' # optional - if nc not stored in data_inputs folder + '/path/to/historical/simulation/netcdf.nc' # optional - if nc not stored in data_inputs folder ) saber.prep.observed_data( workdir, - '/path/to/obs/csv/directory' # optional - if csvs not stored in workdir/data_inputs/obs_csvs + '/path/to/obs/csv/directory' # optional - if csvs not stored in workdir/data_inputs/obs_csvs ) ``` diff --git a/conda-environment.yml b/conda-environment.yml new file mode 100644 index 0000000..f7879c0 --- /dev/null +++ b/conda-environment.yml @@ -0,0 +1,20 @@ +name: saber +channels: + - conda-forge + - defaults +dependencies: + - python=3 + - contextily + - geopandas + - hydrostats + - joblib + - kneed + - matplotlib + - natsort + - netCDF4 + - numba + - numpy + - pandas + - pyarrow + - requests + - scipy diff --git a/environment.yml b/environment.yml deleted file mode 100644 index f58ed11..0000000 --- a/environment.yml +++ /dev/null @@ -1,16 +0,0 @@ -name: bias -channels: - - conda-forge -dependencies: - - python>=3 - - pandas - - geopandas - - numpy - - xarray - - tslearn - - matplotlib - - xarray - - geoglows - - requests - - plotly - - scipy \ No newline at end of file diff --git a/examples/colombia-magdalena/magdalena_example.py b/examples/colombia-magdalena/magdalena_example.py deleted file mode 100644 index 2b0bf39..0000000 --- a/examples/colombia-magdalena/magdalena_example.py +++ /dev/null @@ -1,73 +0,0 @@ -import os - -import numpy as np - -import saber - - -np.seterr(all="ignore") - -workdir = '/Users/rchales/data/regional-bias-correction/colombia-magdalena' -drain_shape = os.path.join(workdir, 'gis_inputs', 'magdalena_dl_attrname_xy.json') -gauge_shape = os.path.join(workdir, 'gis_inputs', 'ideam_stations.json') -obs_data_dir = os.path.join(workdir, 'data_inputs', 'obs_csvs') - -# Only need to do this step 1x ever -# saber.prep.scaffold_working_directory(workdir) - -# Create the gauge_table and drain_table.csv -# Scripts not provided, check readme for instructions - -# Generate the assignments table -# assign_table = saber.table.gen(workdir) -# saber.table.cache(workdir, assign_table) -# Or read the existing table -# assign_table = saber.table.read(workdir) - -# Prepare the observation and simulation data -# Only need to do this step 1x ever -# saber.prep.historical_simulation(os.path.join(workdir, 'data_simulated', 'south_america_era5_qout.nc'), workdir) -# saber.prep.observation_data(workdir) - -# Generate the clusters using the historical simulation data -# saber.cluster.generate(workdir) -# assign_table = saber.cluster.summarize(workdir, assign_table) -# saber.table.cache(workdir, assign_table) - -# Assign basins which are gauged and propagate those gauges -# assign_table = saber.assign.gauged(assign_table) -# assign_table = saber.assign.propagation(assign_table) -# assign_table = saber.assign.clusters_by_dist(assign_table) -# todo assign_table = saber.assign.clusters_by_monavg(assign_table) - -# Cache the assignments table with the updates -# saber.table.cache(workdir, assign_table) - -# Generate GIS files so you can go explore your progress graphically -# saber.gis.clip_by_assignment(workdir, assign_table, drain_shape) -# saber.gis.clip_by_cluster(workdir, assign_table, drain_shape) -# saber.gis.clip_by_unassigned(workdir, assign_table, drain_shape) - -# Compute the corrected simulation data -# assign_table = saber.table.read(workdir) -# saber.calibrate_region(workdir, assign_table) -# vtab = saber.validate.gen_val_table(workdir) -saber.gis.validation_maps(workdir, gauge_shape) -saber.analysis.plot(workdir, obs_data_dir, 9007721) - - -# import pandas as pd -# path_to_your_pickle_file = '/Users/rchales/data/regional-bias-correction/colombia-magdalena/validation_runs/90/data_processed/subset_time_series.pickle' -# a = pd.read_pickle(path_to_your_pickle_file) -# a.index = a.index.tz_localize('UTC') -# a.to_pickle(path_to_your_pickle_file) - - -# import netCDF4 as nc -# import numpy as np -# path = '/Users/rchales/data/regional-bias-correction/colombia-magdalena/validation_runs/90/calibrated_simulated_flow.nc' -# a = nc.Dataset(path, 'r+') -# a.createVariable('corrected', 'i4', ('corrected',), zlib=True, shuffle=True, fill_value=-1) -# a['corrected'][:] = np.array((0, 1)) -# a.sync() -# a.close() \ No newline at end of file diff --git a/examples/example_inputs.py b/examples/example_inputs.py deleted file mode 100644 index 4163d24..0000000 --- a/examples/example_inputs.py +++ /dev/null @@ -1,16 +0,0 @@ -import os - - -# COLOMBIA -workdir = '/Users/rchales/data/saber/colombia-magdalena' -drain_shape = os.path.join(workdir, 'gis_inputs', 'magdalena_dl_attrname_xy.json') -gauge_shape = os.path.join(workdir, 'gis_inputs', 'ideam_stations.json') -obs_data_dir = os.path.join(workdir, 'data_inputs', 'obs_csvs') -hist_sim_nc = os.path.join(workdir, 'data_inputs', 'south_america_era5_qout.nc') - -# TEXAS -workdir = '/Users/rchales/data/saber/texas' -drain_shape = os.path.join(workdir, 'shapefiles', 'texas-dl.json') -gauge_shape = os.path.join(workdir, 'shapefiles', 'texas-gauges.shp') -obs_data_dir = os.path.join(workdir, 'data_inputs', 'obs_csvs') -hist_sim_nc = os.path.join(workdir, 'data_inputs', 'Qout_era5_t640_24hr_19790101to20210630.nc.nc') diff --git a/examples/example_script.py b/examples/example_script.py deleted file mode 100644 index bedb6a4..0000000 --- a/examples/example_script.py +++ /dev/null @@ -1,70 +0,0 @@ -import os - -import numpy as np - -import saber - - -np.seterr(all="ignore") - -# workdir = '' -# drain_shape = os.path.join(workdir, 'gis_inputs', '') -# gauge_shape = '' -# obs_data_dir = '' -# hist_sim_nc = '' - -# COLOMBIA -# workdir = '/Users/rchales/data/saber/colombia-magdalena' -# drain_shape = os.path.join(workdir, 'gis_inputs', 'magdalena_dl_attrname_xy.json') -# gauge_shape = os.path.join(workdir, 'gis_inputs', 'ideam_stations.json') -# obs_data_dir = os.path.join(workdir, 'data_inputs', 'obs_csvs') -# hist_sim_nc = os.path.join(workdir, 'data_inputs', 'south_america_era5_qout.nc') - - -# Prepare the working directory - only need to do this step 1x ever -# saber.prep.scaffold_working_directory(workdir) -# Scripts not provided. Consult README.md for instructions -# Create the gauge_table.csv and drain_table.csv -# Put the historical simulation netCDF in the right folder -# Put the observed data csv files in the data_inputs/obs_csvs folder - -# Prepare the observation and simulation data - Only need to do this step 1x ever -print('Preparing data') -saber.prep.historical_simulation(workdir) - -# Generate the assignments table -print('Generate Assignment Table') -assign_table = saber.table.gen(workdir) -saber.table.cache(workdir, assign_table) - -# Generate the clusters using the historical simulation data -print('Generate Clusters') -saber.cluster.generate(workdir) -assign_table = saber.cluster.summarize(workdir, assign_table) -saber.table.cache(workdir, assign_table) - -# Assign basins which are gauged and propagate those gauges -print('Making Assignments') -assign_table = saber.assign.gauged(assign_table) -assign_table = saber.assign.propagation(assign_table) -assign_table = saber.assign.clusters_by_dist(assign_table) - -# Cache the assignments table with the updates -saber.table.cache(workdir, assign_table) - -# Generate GIS files so you can go explore your progress graphically -print('Generate GIS files') -saber.gis.clip_by_assignment(workdir, assign_table, drain_shape) -saber.gis.clip_by_cluster(workdir, assign_table, drain_shape) -saber.gis.clip_by_unassigned(workdir, assign_table, drain_shape) - -# Compute the corrected simulation data -print('Starting Calibration') -saber.calibrate_region(workdir, assign_table) - -# run the validation study -print('Performing Validation') -saber.validate.sample_gauges(workdir, overwrite=True) -saber.validate.run_series(workdir, drain_shape, obs_data_dir) -vtab = saber.validate.gen_val_table(workdir) -saber.gis.validation_maps(workdir, gauge_shape, vtab) diff --git a/examples/saber_example.py b/examples/saber_example.py new file mode 100644 index 0000000..843f349 --- /dev/null +++ b/examples/saber_example.py @@ -0,0 +1,43 @@ +import os + +import numpy as np +import saber + + +np.seterr(all="ignore") + +workdir = '' +hist_sim_nc = os.path.join(workdir, '') +obs_data_dir = os.path.join(workdir, '') +drain_gis = os.path.join(workdir, '') +gauge_gis = os.path.join(workdir, '') + +print('Prepare inputs') +saber.prep.gis_tables(workdir, drain_gis=drain_gis, gauge_gis=gauge_gis) +saber.prep.hindcast(workdir) + +# Generate the assignments table +print('Generate Assignment Table') +assign_table = saber.table.gen(workdir) +saber.table.cache(workdir, assign_table) + +# Generate clusters using the historical simulation data +print('Generate Clusters') +saber.cluster.generate(workdir) +saber.cluster.plot(workdir) + +# Assign basins which are gauged and propagate those gauges +print('Making Assignments') +assign_table = saber.table.merge_clusters(workdir, assign_table, n_clusters=5) +assign_table = saber.table.assign_gauged(assign_table) +assign_table = saber.table.assign_propagation(assign_table) +assign_table = saber.table.assign_by_distance(assign_table) + +# Cache the assignments table with the updates +saber.table.cache(workdir, assign_table) + +# Generate GIS files to explore your progress graphically +print('Generate GIS files') +saber.gis.clip_by_assignment(workdir, assign_table, drain_gis) +saber.gis.clip_by_cluster(workdir, assign_table, drain_gis) +saber.gis.clip_by_unassigned(workdir, assign_table, drain_gis) diff --git a/requirements.txt b/requirements.txt index 4c71968..16193a7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,17 +1,14 @@ -pandas +contextily geopandas -numpy -xarray -netCDF4 -tslearn +hydrostats +joblib +kneed matplotlib -geoglows -grids +natsort +netCDF4 +numba +numpy +pandas +pyarrow requests -plotly -h5py scipy -hydrostats -kneed -hydrostats -matplotlib \ No newline at end of file diff --git a/saber/__init__.py b/saber/__init__.py index 6b24826..d9eeb2c 100644 --- a/saber/__init__.py +++ b/saber/__init__.py @@ -1,19 +1,18 @@ -from saber._workflow import prep_region, analyze_region -from saber._calibrate import calibrate_stream, calibrate_region - -import saber.table -import saber.prep -import saber.cluster import saber.assign +import saber.cluster import saber.gis -import saber.utils +import saber.io +import saber.prep import saber.validate -import saber.analysis +from saber._calibrate import calibrate, calibrate_region +__all__ = [ + # individual functions + 'calibrate', 'calibrate_region', + # modules + 'io', 'prep', 'cluster', 'assign', 'gis', 'validate', +] -__all__ = ['prep_region', 'analyze_region', - 'calibrate_stream', 'calibrate_region', - 'table', 'prep', 'assign', 'gis', 'cluster', 'utils', 'validate', 'analysis'] __author__ = 'Riley Hales' -__version__ = '0.3.0' +__version__ = '0.4.0' __license__ = 'BSD 3 Clause Clear' diff --git a/saber/_calibrate.py b/saber/_calibrate.py index 2172509..9a1aa08 100644 --- a/saber/_calibrate.py +++ b/saber/_calibrate.py @@ -9,140 +9,279 @@ import pandas as pd from scipy import interpolate, stats -from .utils import solve_gumbel1, compute_fdc, compute_scalar_fdc -from ._vocab import mid_col -from ._vocab import asgn_mid_col -from ._vocab import asgn_gid_col -from ._vocab import metric_list -from ._vocab import metric_nc_name_list -from ._vocab import sim_ts_pickle -from ._vocab import cal_nc_name - - -def calibrate_stream(sim_flow_a: pd.DataFrame, obs_flow_a: pd.DataFrame, sim_flow_b: pd.DataFrame, - fix_seasonally: bool = True, - drop_outliers: bool = False, outlier_threshold: int or float = 2.5, - filter_scalar_fdc: bool = False, filter_range: tuple = (0, 80), - extrapolate_method: str = 'nearest', fill_value: int or float = None, - fit_gumbel: bool = False, gumbel_range: tuple = (25, 75), ) -> pd.DataFrame: +from .io import asgn_gid_col +from .io import asgn_mid_col +from .io import cal_nc_name +from .io import metric_list +from .io import metric_nc_name_list +from .io import mid_col +from .io import table_hindcast + + +def calibrate(sim_flow_a: pd.DataFrame, obs_flow_a: pd.DataFrame, sim_flow_b: pd.DataFrame = None, + fix_seasonally: bool = True, empty_months: str = 'skip', + drop_outliers: bool = False, outlier_threshold: int or float = 2.5, + filter_scalar_fdc: bool = False, filter_range: tuple = (0, 80), + extrapolate: str = 'nearest', fill_value: int or float = None, + fit_gumbel: bool = False, fit_range: tuple = (10, 90), ) -> pd.DataFrame: """ - Given the simulated and observed stream flow at location a, attempts to the remove the bias from simulated - stream flow at point b. This + Removes the bias from simulated discharge using the SABER method. + + Given simulated and observed discharge at location A, removes bias from simulated data at point A. + Given simulated and observed discharge at location A, removes bias from simulated data at point B, if given B. Args: - sim_flow_a (pd.DataFrame): simulated hydrograph at point A - obs_flow_a (pd.DataFrame): observed hydrograph at point A - sim_flow_b (pd.DataFrame): simulated hydrograph at point B + sim_flow_a (pd.DataFrame): simulated hydrograph at point A. should contain a datetime index with daily values + and a single column of discharge values. + obs_flow_a (pd.DataFrame): observed hydrograph at point A. should contain a datetime index with daily values + and a single column of discharge values. + sim_flow_b (pd.DataFrame): (optional) simulated hydrograph at point B to correct using scalar flow duration + curve mapping and the bias relationship at point A. should contain a datetime index with daily values + and a single column of discharge values. + fix_seasonally (bool): fix on a monthly (True) or annual (False) basis + empty_months (str): how to handle months in the simulated data where no observed data are available. Options: + "skip": ignore simulated data for months without + drop_outliers (bool): flag to exclude outliers outlier_threshold (int or float): number of std deviations from mean to exclude from flow duration curve - filter_scalar_fdc (bool): - filter_range (tuple): - extrapolate_method (bool): - fill_value (int or float): - fit_gumbel (bool): - gumbel_range (tuple): + + filter_scalar_fdc (bool): flag to filter the scalar flow duration curve + filter_range (tuple): lower and upper bounds of the filter range + + extrapolate (str): method to use for extrapolation. Options: nearest, const, linear, average, max, min + fill_value (int or float): value to use for extrapolation when extrapolate_method='const' + + fit_gumbel (bool): flag to replace extremely low/high corrected flows with values from Gumbel type 1 + fit_range (tuple): lower and upper bounds of exceedance probabilities to replace with Gumbel values Returns: - pd.DataFrame of the + pd.DataFrame with a DateTime index and columns with corrected flow, uncorrected flow, the scalar adjustment + factor applied to correct the discharge, and the percentile of the uncorrected flow (in the seasonal grouping, + if applicable). """ + if sim_flow_b is None: + sim_flow_b = sim_flow_a.copy() if fix_seasonally: # list of the unique months in the historical simulation. should always be 1->12 but just in case... monthly_results = [] for month in sorted(set(sim_flow_a.index.strftime('%m'))): - # filter data to only be current iteration's month - mon_sim_data = sim_flow_a[sim_flow_a.index.month == int(month)].dropna() + # filter data to current iteration's month mon_obs_data = obs_flow_a[obs_flow_a.index.month == int(month)].dropna() + + if mon_obs_data.empty: + if empty_months == 'skip': + continue + else: + raise ValueError(f'Invalid value for argument "empty_months". Given: {empty_months}.') + + mon_sim_data = sim_flow_a[sim_flow_a.index.month == int(month)].dropna() mon_cor_data = sim_flow_b[sim_flow_b.index.month == int(month)].dropna() - monthly_results.append(calibrate_stream( + monthly_results.append(calibrate( mon_sim_data, mon_obs_data, mon_cor_data, - fix_seasonally=False, + fix_seasonally=False, empty_months=empty_months, drop_outliers=drop_outliers, outlier_threshold=outlier_threshold, filter_scalar_fdc=filter_scalar_fdc, filter_range=filter_range, - extrapolate_method=extrapolate_method, fill_value=fill_value, - fit_gumbel=fit_gumbel, gumbel_range=gumbel_range, ) + extrapolate=extrapolate, fill_value=fill_value, + fit_gumbel=fit_gumbel, fit_range=fit_range, ) ) # combine the results from each monthly into a single dataframe (sorted chronologically) and return it return pd.concat(monthly_results).sort_index() - # compute the fdc for paired sim/obs data and compute scalar fdc, either with or without outliers + # compute the flow duration curves if drop_outliers: - # drop outlier data - # https://stackoverflow.com/questions/23199796/detect-and-exclude-outliers-in-pandas-data-frame - sim_fdc = compute_fdc( - sim_flow_a[(np.abs(stats.zscore(sim_flow_a)) < outlier_threshold).all(axis=1)]) - obs_fdc = compute_fdc( - obs_flow_a[(np.abs(stats.zscore(obs_flow_a)) < outlier_threshold).all(axis=1)]) + sim_fdc_a = calc_fdc(_drop_outliers_by_zscore(sim_flow_a, threshold=outlier_threshold), col_name='Q_sim') + sim_fdc_b = calc_fdc(_drop_outliers_by_zscore(sim_flow_b, threshold=outlier_threshold), col_name='Q_sim') + obs_fdc = calc_fdc(_drop_outliers_by_zscore(obs_flow_a, threshold=outlier_threshold), col_name='Q_obs') else: - sim_fdc = compute_fdc(sim_flow_a) - obs_fdc = compute_fdc(obs_flow_a) - - scalar_fdc = compute_scalar_fdc(obs_fdc['flow'].values.flatten(), sim_fdc['flow'].values.flatten()) + sim_fdc_a = calc_fdc(sim_flow_a, col_name='Q_sim') + sim_fdc_b = calc_fdc(sim_flow_b, col_name='Q_sim') + obs_fdc = calc_fdc(obs_flow_a, col_name='Q_obs') + # calculate the scalar flow duration curve (at point A with simulated and observed data) + scalar_fdc = calc_sfdc(sim_fdc_a['flow'].values.flatten(), obs_fdc['flow'].values.flatten()) if filter_scalar_fdc: - scalar_fdc = scalar_fdc[scalar_fdc['Exceedance Probability'] >= filter_range[0]] - scalar_fdc = scalar_fdc[scalar_fdc['Exceedance Probability'] <= filter_range[1]] - - # Convert the percentiles - if extrapolate_method == 'nearest': - to_scalar = interpolate.interp1d(scalar_fdc.values[:, 0], scalar_fdc.values[:, 1], - fill_value='extrapolate', kind='nearest') - elif extrapolate_method == 'value': - to_scalar = interpolate.interp1d(scalar_fdc.values[:, 0], scalar_fdc.values[:, 1], - fill_value=fill_value, bounds_error=False) - elif extrapolate_method == 'linear': - to_scalar = interpolate.interp1d(scalar_fdc.values[:, 0], scalar_fdc.values[:, 1], - fill_value='extrapolate') - elif extrapolate_method == 'average': - to_scalar = interpolate.interp1d(scalar_fdc.values[:, 0], scalar_fdc.values[:, 1], - fill_value=np.mean(scalar_fdc.values[:, 1]), bounds_error=False) - elif extrapolate_method == 'max' or extrapolate_method == 'maximum': - to_scalar = interpolate.interp1d(scalar_fdc.values[:, 0], scalar_fdc.values[:, 1], - fill_value=np.max(scalar_fdc.values[:, 1]), bounds_error=False) - elif extrapolate_method == 'min' or extrapolate_method == 'minimum': - to_scalar = interpolate.interp1d(scalar_fdc.values[:, 0], scalar_fdc.values[:, 1], - fill_value=np.min(scalar_fdc.values[:, 1]), bounds_error=False) + scalar_fdc = scalar_fdc[scalar_fdc['p_exceed'].between(filter_range[0], filter_range[1])] + + # make interpolators: Q_b -> p_exceed, p_exceed -> scalars_a + # flow at B converted to exceedance probabilities, then matched with the scalar computed at point A + flow_to_percent = _make_interpolator(sim_fdc_b.values, + sim_fdc_b.index, + extrap=extrapolate, + fill_value=fill_value) + + percent_to_scalar = _make_interpolator(scalar_fdc.index, + scalar_fdc.values, + extrap=extrapolate, + fill_value=fill_value) + + # apply interpolators to correct flows at B with data from A + qb_original = sim_flow_b.values.flatten() + p_exceed = flow_to_percent(qb_original) + scalars = percent_to_scalar(p_exceed) + qb_adjusted = qb_original / scalars + + if fit_gumbel: + qb_adjusted = _fit_extreme_values_to_gumbel(qb_adjusted, p_exceed, fit_range) + + return pd.DataFrame(data=np.transpose([qb_adjusted, qb_original, scalars, p_exceed]), + index=sim_flow_b.index.to_list(), + columns=('Q_adjusted', 'Q_original', 'scalars', 'p_exceed')) + + +def calc_fdc(flows: np.array, steps: int = 201, col_name: str = 'Q') -> pd.DataFrame: + """ + Compute flow duration curve (exceedance probabilities) from a list of flows + + Args: + flows: array of flows + steps: number of steps (exceedance probabilities) to use in the FDC + col_name: name of the column in the returned dataframe + + Returns: + pd.DataFrame with index 'p_exceed' and columns 'Q' (or col_name) + """ + # calculate the FDC and save to parquet + exceed_prob = np.linspace(100, 0, steps) + fdc_flows = np.nanpercentile(flows, exceed_prob) + df = pd.DataFrame(fdc_flows, columns=[col_name, ], index=exceed_prob) + df.index.name = 'p_exceed' + return df + + +def calc_sfdc(sim_fdc: pd.DataFrame, obs_fdc: pd.DataFrame) -> pd.DataFrame: + """ + Compute the scalar flow duration curve (exceedance probabilities) from two flow duration curves + + Args: + sim_fdc: simulated flow duration curve + obs_fdc: observed flow duration curve + + Returns: + pd.DataFrame with index 'p_exceed' and columns 'Q' + """ + scalars_df = pd.DataFrame(np.divide(sim_fdc.values, obs_fdc.values), columns=['scalars'], index=sim_fdc.index) + scalars_df.replace(np.inf, np.nan, inplace=True) + scalars_df.dropna(inplace=True) + return scalars_df + + +def _drop_outliers_by_zscore(df: pd.DataFrame, threshold: float = 3) -> pd.DataFrame: + """ + Drop outliers from a dataframe by their z-score and a threshold + Based on https://stackoverflow.com/questions/23199796/detect-and-exclude-outliers-in-pandas-data-frame + Args: + df: dataframe to drop outliers from + threshold: z-score threshold + + Returns: + pd.DataFrame with outliers removed + """ + return df[(np.abs(stats.zscore(df)) < threshold).all(axis=1)] + + +def _filter_sfdc(sfdc: pd.DataFrame, filter_range: list) -> pd.DataFrame: + """ + Filter the scalar flow duration curve by the specified range + + Args: + sfdc: scalar flow duration curve DataFrame + filter_range: list of [lower_bound: int or float, upper_bound: int or float] + + Returns: + pd.DataFrame: filtered scalar flow duration curve + """ + return sfdc[np.logical_and(sfdc.index > filter_range[0], sfdc.index < filter_range[1])] + + +def _make_interpolator(x: np.array, y: np.array, extrap: str = 'nearest', + fill_value: int or float = None) -> interpolate.interp1d: + """ + Make an interpolator from two arrays + + Args: + x: x values + y: y values + extrap: method for extrapolation: nearest, const, linear, average, max, min + fill_value: value to use when extrap='const' + + Returns: + interpolate.interp1d + """ + # make interpolator which converts the percentiles to scalars + if extrap == 'nearest': + return interpolate.interp1d(x, y, fill_value='extrapolate', kind='nearest') + elif extrap == 'const': + if fill_value is None: + raise ValueError('Must provide the const kwarg when extrap_method="const"') + return interpolate.interp1d(x, y, fill_value=fill_value, bounds_error=False) + elif extrap == 'linear': + return interpolate.interp1d(x, y, fill_value='extrapolate') + elif extrap == 'average': + return interpolate.interp1d(x, y, fill_value=np.mean(y), bounds_error=False) + elif extrap == 'max' or extrap == 'maximum': + return interpolate.interp1d(x, y, fill_value=np.max(y), bounds_error=False) + elif extrap == 'min' or extrap == 'minimum': + return interpolate.interp1d(x, y, fill_value=np.min(y), bounds_error=False) else: raise ValueError('Invalid extrapolation method provided') - # determine the percentile of each uncorrected flow using the monthly fdc - values = sim_flow_b.values.flatten() - percentiles = [stats.percentileofscore(values, a) for a in values] - scalars = to_scalar(percentiles) - values = values * scalars - if fit_gumbel: - tmp = pd.DataFrame(np.transpose([values, percentiles]), columns=('q', 'p')) +def _solve_gumbel1(std, xbar, rp): + """ + Solves the Gumbel Type I pdf = exp(-exp(-b)) + + Args: + std: standard deviation of the dataset + xbar: mean of the dataset + rp: return period to calculate in years - # compute the average and standard deviation except for scaled data outside the percentile range specified - mid = tmp[tmp['p'] > gumbel_range[0]] - mid = mid[mid['p'] < gumbel_range[1]] - xbar = statistics.mean(mid['q'].tolist()) - std = statistics.stdev(mid['q'].tolist(), xbar) + Returns: + float: discharge value + """ + return -np.log(-np.log(1 - (1 / rp))) * std * .7797 + xbar - (.45 * std) - q = [] - for p in tmp[tmp['p'] <= gumbel_range[0]]['p'].tolist(): - q.append(solve_gumbel1(std, xbar, 1 / (1 - (p / 100)))) - tmp.loc[tmp['p'] <= gumbel_range[0], 'q'] = q - q = [] - for p in tmp[tmp['p'] >= gumbel_range[1]]['p'].tolist(): - if p >= 100: - p = 99.999 - q.append(solve_gumbel1(std, xbar, 1 / (1 - (p / 100)))) - tmp.loc[tmp['p'] >= gumbel_range[1], 'q'] = q +def _fit_extreme_values_to_gumbel(q_adjust: np.array, p_exceed: np.array, fit_range: tuple = None) -> np.array: + """ + Replace the extreme values from the corrected data with values based on the gumbel distribution - values = tmp['q'].values + Args: + q_adjust: adjusted flows to be refined + p_exceed: exceedance probabilities of the adjusted flows + fit_range: range of exceedance probabilities to fit to the Gumbel distribution - return pd.DataFrame(data=np.transpose([values, scalars, percentiles]), - index=sim_flow_b.index.to_list(), - columns=('flow', 'scalars', 'percentile')) + Returns: + np.array of the flows with the extreme values replaced + """ + all_values = pd.DataFrame(np.transpose([q_adjust, p_exceed]), columns=('q', 'p')) + # compute the average and standard deviation for the values within the user specified fit_range + mid_vals = all_values.copy() + mid_vals = mid_vals[np.logical_and(mid_vals['p'] > fit_range[0], mid_vals['p'] < fit_range[1])] + xbar = statistics.mean(mid_vals['q'].values) + std = statistics.stdev(mid_vals['q'].values, xbar) + + # todo check that this is correct + q = [] + for p in mid_vals[mid_vals['p'] <= fit_range[0]]['p'].tolist(): + q.append(_solve_gumbel1(std, xbar, 1 / (1 - (p / 100)))) + mid_vals.loc[mid_vals['p'] <= fit_range[0], 'q'] = q + + q = [] + for p in mid_vals[mid_vals['p'] >= fit_range[1]]['p'].tolist(): + if p >= 100: + p = 99.999 + q.append(_solve_gumbel1(std, xbar, 1 / (1 - (p / 100)))) + mid_vals.loc[mid_vals['p'] >= fit_range[1], 'q'] = q + + qb_adjusted = mid_vals['q'].values + return qb_adjusted def calibrate_region(workdir: str, assign_table: pd.DataFrame, gauge_table: pd.DataFrame = None, obs_data_dir: str = None) -> None: """ - Creates a netCDF with the + Creates a netCDF of all corrected flows in a region. Args: workdir: path to project working directory @@ -153,13 +292,14 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame, Returns: None """ + # todo create a parquet instead of a netcdf if gauge_table is None: gauge_table = pd.read_csv(os.path.join(workdir, 'gis_inputs', 'gauge_table.csv')) if obs_data_dir is None: obs_data_dir = os.path.join(workdir, 'data_inputs', 'obs_csvs') bcs_nc_path = os.path.join(workdir, cal_nc_name) - ts = pd.read_pickle(os.path.join(workdir, 'data_processed', sim_ts_pickle)) + ts = pd.read_pickle(os.path.join(workdir, 'data_processed', table_hindcast)) # create the new netcdf bcs_nc = nc.Dataset(bcs_nc_path, 'w') @@ -238,7 +378,7 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame, continue try: - calibrated_df = calibrate_stream(ts[[asgn_mid]], obs_df, ts[[model_id]], ) + calibrated_df = calibrate(ts[[asgn_mid]], obs_df, ts[[model_id]], ) c_array[:, idx] = calibrated_df['flow'].values p_array[:, idx] = calibrated_df['percentile'].values s_array[:, idx] = calibrated_df['scalars'].values diff --git a/saber/_propagation.py b/saber/_propagation.py index 5c558f5..b738a95 100644 --- a/saber/_propagation.py +++ b/saber/_propagation.py @@ -1,11 +1,11 @@ import pandas as pd -from ._vocab import mid_col -from ._vocab import down_mid_col -from ._vocab import order_col -from ._vocab import asgn_mid_col -from ._vocab import asgn_gid_col -from ._vocab import reason_col +from .io import asgn_gid_col +from .io import asgn_mid_col +from .io import down_mid_col +from .io import mid_col +from .io import order_col +from .io import reason_col def walk_downstream(df: pd.DataFrame, start_id: int, same_order: bool = True, outlet_next_id: str or int = -1) -> tuple: @@ -71,7 +71,8 @@ def walk_upstream(df: pd.DataFrame, start_id: int, same_order: bool = True) -> t return tuple(set(upstream_ids)) -def propagate_in_table(df: pd.DataFrame, start_mid: int, start_gid: int, connected: tuple, max_prop: int, direction: str): +def propagate_in_table(df: pd.DataFrame, start_mid: int, start_gid: int, connected: tuple, max_prop: int, + direction: str): """ Args: diff --git a/saber/_vocab.py b/saber/_vocab.py deleted file mode 100644 index 29a077d..0000000 --- a/saber/_vocab.py +++ /dev/null @@ -1,18 +0,0 @@ -# assign table and gis_input file required column names -mid_col = 'model_id' -gid_col = 'gauge_id' -asgn_mid_col = 'assigned_model_id' -asgn_gid_col = 'assigned_gauge_id' -down_mid_col = 'downstream_model_id' -reason_col = 'reason' -area_col = 'drain_area' -order_col = 'stream_order' - -# name of some files produced by the algorithm -cluster_count_file = 'best-fit-cluster-count.json' -cal_nc_name = 'calibrated_simulated_flow.nc' -sim_ts_pickle = 'sim_time_series.pickle' - -# metrics computed on validation sets -metric_list = ['ME', 'MAE', 'RMSE', 'NRMSE (Mean)', 'MAPE', 'NSE', 'KGE (2012)'] -metric_nc_name_list = ['ME', 'MAE', 'RMSE', 'NRMSE', 'MAPE', 'NSE', 'KGE2012'] diff --git a/saber/_workflow.py b/saber/_workflow.py deleted file mode 100644 index d313aba..0000000 --- a/saber/_workflow.py +++ /dev/null @@ -1,49 +0,0 @@ -import pandas as pd - -from .prep import historical_simulation - -from .table import gen, cache -from .cluster import generate, summarize -from .assign import gauged, propagation, clusters_by_dist -from .gis import clip_by_assignment, clip_by_cluster, clip_by_unassigned -from ._calibrate import calibrate_region - - -def prep_region(workdir: str) -> None: - historical_simulation(workdir) - # observed_data(workdir) - return - - -def analyze_region(workdir: str, drain_shape: str, gauge_table: pd.DataFrame = None, obs_data_dir: str = None) -> None: - # Generate the assignments table - print("gen assign_table") - assign_table = gen(workdir) - cache(workdir, assign_table) - - # Generate the clusters using the historical simulation data - print("working on clustering") - generate(workdir) - assign_table = summarize(workdir, assign_table) - cache(workdir, assign_table) - - # Assign basins which are gauged and propagate those gauges - print("make assignments") - assign_table = gauged(assign_table) - assign_table = propagation(assign_table) - assign_table = clusters_by_dist(assign_table) - - # Cache the assignments table with the updates - print("cache table results") - cache(workdir, assign_table) - - # Generate GIS files so you can go explore your progress graphically - print("generate gis files") - clip_by_assignment(workdir, assign_table, drain_shape) - clip_by_cluster(workdir, assign_table, drain_shape) - clip_by_unassigned(workdir, assign_table, drain_shape) - - # Compute the corrected simulation data - print("calibrating region netcdf") - calibrate_region(workdir, assign_table, gauge_table, obs_data_dir) - return diff --git a/saber/analysis.py b/saber/analysis.py deleted file mode 100644 index b6ada4a..0000000 --- a/saber/analysis.py +++ /dev/null @@ -1,73 +0,0 @@ -import os - -import grids -import pandas as pd -import plotly.graph_objects as go - - -def plot(workdir, obs_data_dir, model_id) -> go.Figure: - files = [os.path.join(workdir, 'calibrated_simulated_flow.nc'), ] - variables = ('flow_sim', 'flow_bc') - dim_order = ('time', 'model_id') - a = grids.TimeSeries(files, variables, dim_order) - ts = a.point(None, model_id) - obs = pd.read_csv(os.path.join(obs_data_dir, )) - a = go.Figure( - [ - go.Scatter( - name='Original Simulation', - x=ts.index, - y=ts['flow_sim'] - ), - go.Scatter( - name='Adjusted Simulation', - x=ts.index, - y=ts['flow_bc'] - ), - go.Scatter( - name='Adjusted Simulation', - x=ts.index, - y=ts['flow_bc'] - ) - ] - ) - a.show() - return - - -def plot_results(sim, obs, bc, bcp, title): - sim_dates = sim.index.tolist() - scatters = [ - go.Scatter( - name='Propagated Corrected (Experimental)', - x=bcp.index.tolist(), - y=bcp['Propagated Corrected Streamflow'].values.flatten(), - ), - go.Scatter( - name='Bias Corrected (Jorges Method)', - x=sim_dates, - y=bc.values.flatten(), - ), - go.Scatter( - name='Simulated (ERA 5)', - x=sim_dates, - y=sim.values.flatten(), - ), - go.Scatter( - name='Observed', - x=obs.index.tolist(), - y=obs.values.flatten(), - ), - go.Scatter( - name='Percentile', - x=sim_dates, - y=bcp['Percentile'].values.flatten(), - ), - go.Scatter( - name='Scalar', - x=sim_dates, - y=bcp['Scalars'].values.flatten(), - ), - ] - a = go.Figure(scatters, layout={'title': title}) - return a diff --git a/saber/assign.py b/saber/assign.py index 54428b7..5017d26 100644 --- a/saber/assign.py +++ b/saber/assign.py @@ -1,19 +1,78 @@ +import os + +import joblib import numpy as np import pandas as pd +from ._propagation import propagate_in_table from ._propagation import walk_downstream from ._propagation import walk_upstream -from ._propagation import propagate_in_table +from .io import asgn_gid_col +from .io import asgn_mid_col +from .io import gid_col +from .io import mid_col +from .io import order_col +from .io import read_table +from .io import reason_col +from .io import write_table + +__all__ = ['gen', 'merge_clusters', 'assign_gauged', 'assign_propagation', 'assign_by_distance', ] + -from ._vocab import mid_col -from ._vocab import gid_col -from ._vocab import asgn_mid_col -from ._vocab import asgn_gid_col -from ._vocab import reason_col -from ._vocab import order_col +def gen(workdir: str, cache: bool = True) -> pd.DataFrame: + """ + Joins the drain_table.csv and gauge_table.csv to create the assign_table.csv + + Args: + workdir: path to the working directory + cache: whether to cache the assign table immediately + Returns: + None + """ + # read and merge the tables + assign_table = pd.merge( + read_table(workdir, 'drain_table'), + read_table(workdir, 'gauge_table'), + on=mid_col, + how='outer' + ) -def gauged(df: pd.DataFrame) -> pd.DataFrame: + # create the new columns + assign_table[asgn_mid_col] = np.nan + assign_table[asgn_gid_col] = np.nan + assign_table[reason_col] = np.nan + + if cache: + write_table(assign_table, workdir, 'assign_table') + + return assign_table + + +def merge_clusters(workdir: str, assign_table: pd.DataFrame, n_clusters: int = None) -> pd.DataFrame: + """ + Creates a csv listing the streams assigned to each cluster in workdir/kmeans_models and also adds that information + to assign_table.csv + + Args: + workdir: path to the project directory + assign_table: the assignment table DataFrame + n_clusters: number of clusters to use when applying the labels to the assign_table + + Returns: + None + """ + # create a dataframe with the optimal model's labels and the model_id's + df = pd.DataFrame({ + 'cluster': joblib.load(os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')).labels_, + mid_col: read_table(workdir, 'model_ids').values.flatten() + }, dtype=str) + + # merge the dataframes + return assign_table.merge(df, how='outer', on=mid_col) + + +def assign_gauged(df: pd.DataFrame) -> pd.DataFrame: """ Assigns basins a gauge for correction which contain a gauge @@ -24,13 +83,14 @@ def gauged(df: pd.DataFrame) -> pd.DataFrame: Copy of df with assignments made """ _df = df.copy() - _df.loc[~_df[gid_col].isna(), asgn_mid_col] = _df[mid_col] - _df.loc[~_df[gid_col].isna(), asgn_gid_col] = _df[gid_col] - _df.loc[~_df[gid_col].isna(), reason_col] = 'gauged' + selector = ~_df[gid_col].isna() + _df.loc[selector, asgn_mid_col] = _df[mid_col] + _df.loc[selector, asgn_gid_col] = _df[gid_col] + _df.loc[selector, reason_col] = 'gauged' return _df -def propagation(df: pd.DataFrame, max_prop: int = 5) -> pd.DataFrame: +def assign_propagation(df: pd.DataFrame, max_prop: int = 5) -> pd.DataFrame: """ Args: @@ -54,27 +114,7 @@ def propagation(df: pd.DataFrame, max_prop: int = 5) -> pd.DataFrame: return _df -def clusters_by_monavg(df: pd.DataFrame) -> pd.DataFrame: - """ - Assigns all possible ungauged basins a gauge that is - (1) of the same stream order - (2) in the same simulated fdc cluster - (3) in the same simulated monavg cluster (monthly averages) - (4) closest in total drainage area - - This requires matching 2 clusters. Basins in both of the same groups have high likelihood of behaving similarly. - - Args: - df: the assignments table dataframe - - Returns: - Copy of df with assignments made - """ - _df = df.copy() - return _df - - -def clusters_by_dist(df: pd.DataFrame) -> pd.DataFrame: +def assign_by_distance(df: pd.DataFrame) -> pd.DataFrame: """ Assigns all possible ungauged basins a gauge that is (1) is closer than any other gauge @@ -87,7 +127,6 @@ def clusters_by_dist(df: pd.DataFrame) -> pd.DataFrame: Returns: Copy of df with assignments made """ - # todo assign based on closest upstream drainage area?? _df = df.copy() # first filter by cluster number for c_num in sorted(set(_df['sim-fdc-cluster'].values)): diff --git a/saber/cluster.py b/saber/cluster.py index 122e78b..81f8109 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -1,116 +1,241 @@ import glob +import json import math import os -import json +from collections.abc import Iterable +import joblib import kneed +import matplotlib.cm as cm import matplotlib.pyplot as plt import numpy as np import pandas as pd -from tslearn.clustering import TimeSeriesKMeans -from tslearn.preprocessing import TimeSeriesScalerMeanVariance +from natsort import natsorted +from sklearn.cluster import KMeans +from sklearn.metrics import silhouette_samples + +from .io import cluster_count_file +from .io import read_table -from ._vocab import cluster_count_file -from ._vocab import mid_col -from ._vocab import gid_col +__all__ = ['generate', 'summarize', 'plot_clusters', 'plot_silhouette'] -def generate(workdir: str) -> None: +def generate(workdir: str, max_clusters: int = 12) -> None: """ - Creates trained kmeans model pickle files and plots of the results saved as png images + Trains kmeans clustering models, saves the model as pickle, generates images and supplementary files Args: workdir: path to the project directory + max_clusters: maximum number of clusters to train Returns: None """ - best_fit = {} - - for table in glob.glob(os.path.join(workdir, 'data_processed', '*.csv')): - # read the data and transform - time_series = pd.read_csv(table, index_col=0).dropna(axis=1) - time_series = np.transpose(time_series.values) - time_series = TimeSeriesScalerMeanVariance().fit_transform(time_series) - - # get the name of the data and start tracking stats - dataset = os.path.splitext(os.path.basename(table))[0] - inertia = {'number': [], 'inertia': []} - - for num_cluster in range(2, 11): - # build the kmeans model - km = TimeSeriesKMeans(n_clusters=num_cluster, random_state=0) - km.fit_predict(time_series) - inertia['number'].append(num_cluster) - inertia['inertia'].append(km.inertia_) - - # save the trained model - km.to_pickle(os.path.join(workdir, 'kmeans_models', f'{dataset}-{num_cluster}-clusters-model.pickle')) - - # generate a plot of the clusters - size = time_series.shape[1] - fig = plt.figure(figsize=(30, 15), dpi=450) - assigned_clusters = km.labels_ - for i in range(num_cluster): - plt.subplot(2, math.ceil(num_cluster / 2), i + 1) - for j in time_series[assigned_clusters == i]: - plt.plot(j.ravel(), "k-", alpha=.2) - plt.plot(km.cluster_centers_[i].ravel(), "r-") - plt.xlim(0, size) - plt.ylim(-3.5, 3.5) - plt.text(0.55, 0.85, f'Cluster {i}', transform=plt.gca().transAxes) - if i == math.floor(num_cluster / 4): - plt.title("Euclidean $k$-means") - plt.tight_layout() - fig.savefig(os.path.join(workdir, 'kmeans_images', f'{dataset}-{num_cluster}-clusters.png')) - plt.close(fig) - - # save the inertia results as a csv - pd.DataFrame.from_dict(inertia).to_csv(os.path.join(workdir, 'kmeans_models', f'{dataset}-inertia.csv')) - # find the knee/elbow - knee = kneed.KneeLocator(inertia['number'], inertia['inertia'], curve='convex', direction='decreasing').knee - best_fit[dataset] = int(knee) + # read the prepared data (array x) + x = read_table(workdir, 'hindcast_fdc_trans') + x = x.values + + # build the kmeans model for a range of cluster numbers + for n_clusters in range(2, max_clusters + 1): + kmeans = KMeans(n_clusters=n_clusters) + kmeans.fit_predict(x) + joblib.dump(kmeans, os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) + return + + +def summarize(workdir: str) -> None: + """ + Generate a summary of the clustering results, calculate the silhouette score, save the centers and labels to parquet + + Args: + workdir: path to the project directory + + Returns: + + """ + summary = {'number': [], 'inertia': [], 'n_iter': [], 'silhouette': []} + silhouette_scores = [] + labels = [] + + # read the prepared data (array x) + x = read_table(workdir, 'hindcast_fdc_trans') + x = x.values + + for model_file in natsorted(glob.glob(os.path.join(workdir, 'clusters', 'kmeans-*.pickle'))): + kmeans = joblib.load(model_file) + n_clusters = int(kmeans.n_clusters) + + # save the cluster centroids to table - columns are the cluster number, rows are the centroid FDC values + pd.DataFrame( + np.transpose(kmeans.cluster_centers_), + columns=np.array(range(n_clusters)).astype(str) + ).to_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{n_clusters}.parquet')) + + # save the silhouette score for each cluster + silhouette_scores.append(silhouette_samples(x, kmeans.labels_).flatten()) + labels.append(kmeans.labels_.flatten()) + + # save the summary stats from this model + summary['number'].append(n_clusters) + summary['inertia'].append(kmeans.inertia_) + summary['n_iter'].append(kmeans.n_iter_) + summary['silhouette'].append(np.mean(silhouette_scores[-1])) + + # save the summary results as a csv + pd.DataFrame.from_dict(summary) \ + .to_csv(os.path.join(workdir, 'clusters', f'clustering-summary-stats.csv')) + # save the silhouette scores as a parquet + silhouette_scores = np.transpose(np.array(silhouette_scores)) + pd.DataFrame(silhouette_scores, columns=np.array(range(2, silhouette_scores.shape[1] + 2)).astype(str)) \ + .to_parquet(os.path.join(workdir, 'clusters', 'kmeans-silhouette_scores.parquet')) + # save the labels as a parquet + labels = np.transpose(np.array(labels)) + pd.DataFrame(labels, columns=np.array(range(2, labels.shape[1] + 2)).astype(str)) \ + .to_parquet(os.path.join(workdir, 'clusters', 'kmeans-labels.parquet')) + + # find the knee/elbow + knee = kneed.KneeLocator(summary['number'], summary['inertia'], curve='convex', direction='decreasing').knee # save the best fitting cluster counts to a csv - with open(os.path.join(workdir, 'kmeans_models', cluster_count_file), 'w') as f: - f.write(json.dumps(best_fit)) + with open(os.path.join(workdir, 'clusters', cluster_count_file), 'w') as f: + f.write(json.dumps({'historical': int(knee)})) return -def summarize(workdir: str, assign_table: pd.DataFrame) -> pd.DataFrame: +def plot_clusters(workdir: str, clusters: int or Iterable = 'all', + max_cols: int = 3, plt_width: int = 3, plt_height: int = 3) -> None: """ - Creates a csv listing the streams assigned to each cluster in workdir/kmeans_models and also adds that information - to assign_table.csv + Generate figures of the clustered FDC's Args: workdir: path to the project directory - assign_table: the assign_table dataframe + clusters: number of clusters to create figures for + max_cols: maximum number of columns (subplots) in the figure + plt_width: width of each subplot in inches + plt_height: height of each subplot in inches Returns: None """ - # read the cluster results csv - with open(os.path.join(workdir, 'kmeans_models', cluster_count_file), 'r') as f: - clusters = json.loads(f.read()) - - assign_table[mid_col] = assign_table[mid_col].astype(int) - - for dataset, cluster_count in clusters.items(): - # read the list of simulated id's, pair them with their cluster label, save to df - merge_col = mid_col if "sim" in dataset else gid_col - csv_path = os.path.join(workdir, f'data_processed', f'{dataset}.csv') - ids = pd.read_csv(csv_path, index_col=0).columns + x = read_table(workdir, 'hindcast_fdc_trans').values + size = x.shape[1] + x_values = np.linspace(0, size, 5) + x_ticks = np.linspace(0, 100, 5).astype(int) + + kmeans_dir = os.path.join(workdir, 'clusters') + if clusters == 'all': + model_files = natsorted(glob.glob(os.path.join(kmeans_dir, 'kmeans-*.pickle'))) + elif isinstance(clusters, int): + model_files = glob.glob(os.path.join(kmeans_dir, f'kmeans-{clusters}.pickle')) + elif isinstance(clusters, Iterable): + model_files = natsorted([os.path.join(kmeans_dir, f'kmeans-{i}.pickle') for i in clusters]) + else: + raise TypeError('n_clusters should be of type int or an iterable') + + for model_file in model_files: + kmeans = joblib.load(model_file) + n_clusters = int(kmeans.n_clusters) + n_cols = min(n_clusters, max_cols) + n_rows = math.ceil(n_clusters / n_cols) + + fig, axs = plt.subplots( + n_rows, + n_cols, + figsize=(plt_width * n_cols + 1, plt_height * n_rows + 1), + dpi=500, + squeeze=False, + tight_layout=True, + sharey=True + ) + fig.suptitle("KMeans FDC Clustering") + fig.supxlabel('Exceedance Probability (%)') + fig.supylabel('Discharge Z-Score') + + for i, ax in enumerate(fig.axes[:n_clusters]): + ax.set_title(f'Cluster {i + 1} (n = {np.sum(kmeans.labels_ == i)})') + ax.set_xlim(0, size) + ax.set_xticks(x_values, x_ticks) + ax.set_ylim(-2, 4) + for j in x[kmeans.labels_ == i]: + ax.plot(j.ravel(), "k-") + ax.plot(kmeans.cluster_centers_[i].flatten(), "r-") + # turn off plotting axes which are blank (when ax number > n_clusters) + for ax in fig.axes[n_clusters:]: + ax.axis('off') + + fig.savefig(os.path.join(workdir, 'clusters', f'kmeans-clusters-{n_clusters}.png')) + plt.close(fig) + return - # open the optimal model pickle file - optimal_model = os.path.join(workdir, 'kmeans_models', f'{dataset}-{cluster_count}-clusters-model.pickle') - sim_labels = TimeSeriesKMeans.from_pickle(optimal_model).labels_.tolist() - # create a dataframe of the ids and their labels (assigned groups) - df = pd.DataFrame(np.transpose([sim_labels, ids]), columns=[f'{dataset}-cluster', merge_col]) - df.to_csv(os.path.join(workdir, 'kmeans_models', f'optimal-assigns-{dataset}.csv'), index=False) +def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> None: + """ + Plot the silhouette scores for each cluster. + Based on https://scikit-learn.org/stable/auto_examples/cluster/plot_kmeans_silhouette_analysis.html - # merge the dataframes - df[merge_col] = df[merge_col].astype(int) - assign_table = assign_table.merge(df, how='outer', on=merge_col) + Args: + workdir: path to the project directory + plt_width: width of each subplot in inches + plt_height: height of each subplot in inches - return assign_table + Returns: + None + """ + labels_df = pd.read_parquet(os.path.join(workdir, 'clusters', 'kmeans-labels.parquet')) + silhouette_df = pd.read_parquet(os.path.join(workdir, 'clusters', 'kmeans-silhouette_scores.parquet')) + + for tot_clusters in silhouette_df.columns: + centers_df = pd.read_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{tot_clusters}.parquet')) + fig, (ax1, ax2) = plt.subplots( + nrows=1, + ncols=2, + figsize=(plt_width * 2 + 1, plt_height + 1), + dpi=500, + tight_layout=True, + ) + + # Plot 1 titles and labels + ax1.set_title("Silhouette Plot") + ax1.set_xlabel("Silhouette Score") + ax1.set_ylabel("Cluster Label") + ax1.set_yticks([]) # Clear the yaxis labels / ticks + ax1.set_xticks([-0.2, 0, 0.2, 0.4, 0.6, 0.8, 1]) + + # Plot 2 titles and labels + ax2.set_title("Cluster Centers") + ax2.set_xlabel("Exceedance Probability (%)") + ax2.set_ylabel("Discharge Z-Score") + + # The vertical line for average silhouette score of all the values + ax1.axvline(x=silhouette_df[tot_clusters].mean(), color="red", linestyle="--") + + y_lower = 10 + for sub_cluster in range(int(tot_clusters)): + # select the rows applicable to the current sub cluster + cluster_silhouettes = silhouette_df[labels_df[tot_clusters] == sub_cluster][tot_clusters].values.flatten() + cluster_silhouettes.sort() + + n = cluster_silhouettes.shape[0] + y_upper = y_lower + n + + color = cm.nipy_spectral(sub_cluster / int(tot_clusters)) + ax1.fill_betweenx( + np.arange(y_lower, y_upper), + 0, + cluster_silhouettes, + facecolor=color, + edgecolor=color, + alpha=0.7, + ) + # Label the silhouette plots with their cluster numbers at the middle + ax1.text(-0.05, y_lower + 0.5 * n, str(sub_cluster + 1)) + + # plot the cluster center + ax2.plot(centers_df[f'{sub_cluster}'].values, alpha=0.7, c=color, label=f'Cluster {sub_cluster + 1}') + + # add some buffer before the next cluster + y_lower = y_upper + 10 + + fig.savefig(os.path.join(workdir, 'clusters', f'kmeans-silhouettes-{tot_clusters}.png')) + plt.close(fig) + return diff --git a/saber/gis.py b/saber/gis.py index 5c494e8..f8f6de9 100644 --- a/saber/gis.py +++ b/saber/gis.py @@ -1,25 +1,23 @@ import os import warnings +import contextily as cx import geopandas as gpd +import matplotlib as mpl +import matplotlib.pyplot as plt import numpy as np import pandas as pd -from ._vocab import mid_col -from ._vocab import gid_col -from ._vocab import reason_col -from ._vocab import metric_nc_name_list - -import matplotlib as mpl -import matplotlib.pyplot as plt -import contextily as cx +from .io import gid_col +from .io import metric_nc_name_list +from .io import mid_col +from .io import reason_col __all__ = ['generate_all', 'clip_by_assignment', 'clip_by_cluster', 'clip_by_unassigned', 'clip_by_ids', 'validation_maps'] -def generate_all(workdir: str, assign_table: pd.DataFrame, drain_shape: str, prefix: str = '', - id_column: str = mid_col) -> None: +def generate_all(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ Runs all the clip functions which create subsets of the drainage lines GIS dataset based on how they were assigned for bias correction. @@ -27,129 +25,117 @@ def generate_all(workdir: str, assign_table: pd.DataFrame, drain_shape: str, pre Args: workdir: the path to the working directory for the project assign_table: the assign_table dataframe - drain_shape: path to a drainage line shapefile which can be clipped - prefix: a prefix for names of the outputs to distinguish between data generated at separate instances - id_column: name of the id column in the attributes of the shape table + drain_gis: path to a drainage line shapefile which can be clipped + prefix: a prefix for names of the outputs to distinguish between data generated in separate instances Returns: None """ - clip_by_assignment(workdir, assign_table, drain_shape, prefix, id_column) - clip_by_cluster(workdir, assign_table, drain_shape, prefix, id_column) - clip_by_unassigned(workdir, assign_table, drain_shape, prefix, id_column) + clip_by_assignment(workdir, assign_table, drain_gis, prefix) + clip_by_cluster(workdir, assign_table, drain_gis, prefix) + clip_by_unassigned(workdir, assign_table, drain_gis, prefix) return -def clip_by_assignment(workdir: str, assign_table: pd.DataFrame, drain_shape: str, prefix: str = '', - id_column: str = mid_col) -> None: +def clip_by_assignment(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ - Creates geojsons (in workdir/gis_outputs) for each unique value in the assignment column + Creates Geopackage files in workdir/gis_outputs for each unique value in the assignment column Args: workdir: the path to the working directory for the project assign_table: the assign_table dataframe - drain_shape: path to a drainage line shapefile which can be clipped + drain_gis: path to a drainage line shapefile which can be clipped prefix: a prefix for names of the outputs to distinguish between data generated at separate instances - id_column: name of the id column in the attributes of the shape table Returns: None """ # read the drainage line shapefile - dl = gpd.read_file(drain_shape) - save_dir = os.path.join(workdir, 'gis_outputs') + dl = gpd.read_file(drain_gis) + save_dir = os.path.join(workdir, 'gis') # get the unique list of assignment reasons for reason in set(assign_table[reason_col].dropna().values): ids = assign_table[assign_table[reason_col] == reason][mid_col].values - subset = dl[dl[id_column].isin(ids)] - name = f'{prefix}{"_" if prefix else ""}assignments_{reason}.json' + subset = dl[dl[mid_col].isin(ids)] + name = f'{prefix}{"_" if prefix else ""}assignments_{reason}.gpkg' if subset.empty: continue else: - subset.to_file(os.path.join(save_dir, name), driver='GeoJSON') + subset.to_file(os.path.join(save_dir, name)) return -def clip_by_cluster(workdir: str, assign_table: pd.DataFrame, drain_shape: str, prefix: str = '', - id_column: str = mid_col) -> None: +def clip_by_cluster(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ - Creates GIS files (in workdir/gis_outputs) of the drainage lines based on which fdc cluster they were assigned to + Creates Geopackage files in workdir/gis_outputs of the drainage lines based on the fdc cluster they were assigned to Args: workdir: the path to the working directory for the project assign_table: the assign_table dataframe - drain_shape: path to a drainage line shapefile which can be clipped + drain_gis: path to a drainage line shapefile which can be clipped prefix: optional, a prefix to prepend to each created file's name - id_column: name of the id column in the attributes of the shape table Returns: None """ - dl_gdf = gpd.read_file(drain_shape) - cluster_types = [a for a in assign_table if 'cluster' in a] - for ctype in cluster_types: - for gnum in sorted(set(assign_table[ctype].dropna().values)): - savepath = os.path.join(workdir, 'gis_outputs', f'{prefix}{"_" if prefix else ""}{ctype}-{int(gnum)}.json') - ids = assign_table[assign_table[ctype] == gnum][mid_col].values - if dl_gdf[dl_gdf[id_column].isin(ids)].empty: - continue - else: - dl_gdf[dl_gdf[id_column].isin(ids)].to_file(savepath, driver='GeoJSON') + dl_gdf = gpd.read_file(drain_gis) + for num in sorted(set(assign_table[mid_col].dropna().values)): + gdf = dl_gdf[dl_gdf[mid_col].isin(assign_table[assign_table[mid_col] == num][mid_col])] + if gdf.empty: + continue + gdf.to_file(os.path.join(workdir, 'gis', f'{prefix}{"_" if prefix else ""}-{int(num)}.gpkg')) return -def clip_by_unassigned(workdir: str, assign_table: pd.DataFrame, drain_shape: str, prefix: str = '', - id_column: str = mid_col) -> None: +def clip_by_unassigned(workdir: str, assign_table: pd.DataFrame, drain_gis: str, prefix: str = '') -> None: """ - Creates geojsons (in workdir/gis_outputs) of the drainage lines which haven't been assigned a gauge yet + Creates Geopackage files in workdir/gis_outputs of the drainage lines which haven't been assigned a gauge yet Args: workdir: the path to the working directory for the project assign_table: the assign_table dataframe - drain_shape: path to a drainage line shapefile which can be clipped + drain_gis: path to a drainage line shapefile which can be clipped prefix: optional, a prefix to prepend to each created file's name - id_column: name of the id column in the attributes of the shape table Returns: None """ - dl_gdf = gpd.read_file(drain_shape) + dl_gdf = gpd.read_file(drain_gis) ids = assign_table[assign_table[reason_col].isna()][mid_col].values - subset = dl_gdf[dl_gdf[id_column].isin(ids)] + subset = dl_gdf[dl_gdf[mid_col].isin(ids)] if subset.empty: warnings.warn('Empty filter: No streams are unassigned') return - savepath = os.path.join(workdir, 'gis_outputs', f'{prefix}{"_" if prefix else ""}assignments_unassigned.json') - subset.to_file(savepath, driver='GeoJSON') + savepath = os.path.join(workdir, 'gis', f'{prefix}{"_" if prefix else ""}assignments_unassigned.gpkg') + subset.to_file(savepath) return -def clip_by_ids(workdir: str, ids: list, drain_shape: str, prefix: str = '', - id_column: str = mid_col) -> None: +def clip_by_ids(workdir: str, ids: list, drain_gis: str, prefix: str = '', id_column: str = mid_col) -> None: """ - Creates geojsons (in workdir/gis_outputs) of the subset of 'drain_shape' with an ID in the specified list + Creates Geopackage files in workdir/gis_outputs of the subset of 'drain_shape' with an ID in the specified list Args: workdir: path to the project directory ids: any iterable containing a series of model_ids - drain_shape: path to the drainage shapefile to be clipped + drain_gis: path to the drainage shapefile to be clipped prefix: optional, a prefix to prepend to each created file's name id_column: name of the id column in the attributes of the shape table Returns: None """ - dl = gpd.read_file(drain_shape) - save_dir = os.path.join(workdir, 'gis_outputs') - name = f'{prefix}{"_" if prefix else ""}id_subset.json' - dl[dl[id_column].isin(ids)].to_file(os.path.join(save_dir, name), driver='GeoJSON') + dl = gpd.read_file(drain_gis) + save_dir = os.path.join(workdir, 'gis') + name = f'{prefix}{"_" if prefix else ""}id_subset.gpkg' + dl[dl[id_column].isin(ids)].to_file(os.path.join(save_dir, name)) return -def validation_maps(workdir: str, gauge_shape: str, val_table: pd.DataFrame = None, prefix: str = '') -> None: +def validation_maps(workdir: str, gauge_gis: str, val_table: pd.DataFrame = None, prefix: str = '') -> None: """ - Creates geojsons (in workdir/gis_outputs) of subsets of the gauge_shape. + Creates Geopackage files in workdir/gis_outputs of subsets of the gauge_shape. 1 is the fill gauge shape with added attribute columns for all the computed stats. There are 2 for each of the 5 validation groups; 1 which shows the gauges included in the validation set and 1 which shows gauges that were excluded from the validation set. @@ -157,7 +143,7 @@ def validation_maps(workdir: str, gauge_shape: str, val_table: pd.DataFrame = No Args: workdir: path to the project directory val_table: the validation table produced by saber.validate - gauge_shape: path to the gauge locations shapefile + gauge_gis: path to the gauge locations shapefile prefix: optional, a prefix to prepend to each created file's name Returns: @@ -165,12 +151,12 @@ def validation_maps(workdir: str, gauge_shape: str, val_table: pd.DataFrame = No """ if val_table is None: val_table = pd.read_csv(os.path.join(workdir, 'validation_runs', 'val_table.csv')) - save_dir = os.path.join(workdir, 'gis_outputs') + save_dir = os.path.join(workdir, 'gis') # merge gauge table with the validation table - gdf = gpd.read_file(gauge_shape) + gdf = gpd.read_file(gauge_gis) gdf = gdf.merge(val_table, on=gid_col, how='inner') - gdf.to_file(os.path.join(save_dir, 'gauges_with_validation_stats.json'), driver='GeoJSON') + gdf.to_file(os.path.join(save_dir, 'gauges_with_validation_stats.gpkg')) core_columns = [mid_col, gid_col, 'geometry'] @@ -182,12 +168,12 @@ def validation_maps(workdir: str, gauge_shape: str, val_table: pd.DataFrame = No gdf_sub = gdf[cols_to_select] gdf_sub = gdf_sub.rename(columns={f'{metric}_{val_set}': metric}) - name = f'{prefix}{"_" if prefix else ""}valset_{val_set}_{metric}_included.json' - gdf_sub[gdf_sub[val_set] == 1].to_file(os.path.join(save_dir, name), driver='GeoJSON') + name = f'{prefix}{"_" if prefix else ""}valset_{val_set}_{metric}_included.gpkg' + gdf_sub[gdf_sub[val_set] == 1].to_file(os.path.join(save_dir, name)) - name = f'{prefix}{"_" if prefix else ""}valset_{val_set}_{metric}_excluded.json' + name = f'{prefix}{"_" if prefix else ""}valset_{val_set}_{metric}_excluded.gpkg' exc = gdf_sub[gdf_sub[val_set] == 0] - exc.to_file(os.path.join(save_dir, name), driver='GeoJSON') + exc.to_file(os.path.join(save_dir, name)) if metric == 'KGE2012': histomaps(exc, metric, val_set, workdir) @@ -195,6 +181,17 @@ def validation_maps(workdir: str, gauge_shape: str, val_table: pd.DataFrame = No def histomaps(gdf: gpd.GeoDataFrame, metric: str, prct: str, workdir: str) -> None: + """ + + Args: + gdf: + metric: + prct: + workdir: + + Returns: + + """ core_columns = [mid_col, gid_col, 'geometry'] # world = gpd.read_file(gpd.datasets.get_path('naturalearth_lowres')) # world.plot(ax=axm, color='white', edgecolor='black') @@ -231,9 +228,8 @@ def histomaps(gdf: gpd.GeoDataFrame, metric: str, prct: str, workdir: str) -> No axm.set_xlabel('Longitude') axm.set_xticks([]) axm.set_yticks([]) - gdf[core_columns + [metric, ]].to_crs(epsg=3857).plot( - metric, ax=axm, cmap=cmap, norm=norm, legend=True, markersize=10) + gdf[core_columns + [metric, ]].to_crs(epsg=3857).plot(metric) cx.add_basemap(ax=axm, zoom=9, source=cx.providers.Esri.WorldTopoMap, attribution='') - fig.savefig(os.path.join(workdir, 'gis_outputs', f'{metric}_{prct}.png')) + fig.savefig(os.path.join(workdir, 'gis', f'{metric}_{prct}.png')) return diff --git a/saber/io.py b/saber/io.py new file mode 100644 index 0000000..8436a2d --- /dev/null +++ b/saber/io.py @@ -0,0 +1,104 @@ +import os + +import pandas as pd + +# assign table and gis_input file required column names +mid_col = 'model_id' +gid_col = 'gauge_id' +asgn_mid_col = 'assigned_model_id' +asgn_gid_col = 'assigned_gauge_id' +down_mid_col = 'downstream_model_id' +reason_col = 'reason' +area_col = 'drain_area' +order_col = 'strahler_order' + +# name of some files produced by the algorithm +cluster_count_file = 'best-fit-cluster-count.json' +cal_nc_name = 'calibrated_simulated_flow.nc' + +# scaffolded folders +dir_tables = 'tables' +dir_gis = 'gis' +dir_clusters = 'clusters' +dir_valid = 'validation' + +# name of the required input tables and the outputs +table_hindcast = 'hindcast.parquet' +table_hindcast_fdc = 'hindcast_fdc.parquet' +table_hindcast_fdc_trans = 'hindcast_fdc_transformed.parquet' +table_mids = 'mid_table.parquet' +table_drain = 'drain_table.parquet' +table_gauge = 'gauge_table.parquet' +table_assign = 'assign_table.csv' + +# metrics computed on validation sets +metric_list = ['ME', 'MAE', 'RMSE', 'MAPE', 'NSE', 'KGE (2012)'] +metric_nc_name_list = ['ME', 'MAE', 'RMSE', 'MAPE', 'NSE', 'KGE2012'] + + +def scaffold_workdir(path: str, include_validation: bool = True) -> None: + """ + Creates the correct directories for a Saber project within the specified directory + + Args: + path: the path to a directory where you want to create workdir subdirectories + include_validation: boolean, indicates whether to create the validation folder + + Returns: + None + """ + dir_list = [dir_tables, dir_gis, dir_clusters] + if not os.path.exists(path): + os.mkdir(path) + if include_validation: + dir_list.append(dir_valid) + for d in dir_list: + p = os.path.join(path, d) + if not os.path.exists(p): + os.mkdir(p) + return + + +def get_table_path(workdir: str, table_name: str) -> str: + if table_name == 'hindcast': + return os.path.join(workdir, dir_tables, table_hindcast) + elif table_name == 'hindcast_fdc': + return os.path.join(workdir, dir_tables, table_hindcast_fdc) + elif table_name == 'hindcast_fdc_trans': + return os.path.join(workdir, dir_tables, table_hindcast_fdc) + elif table_name == 'model_ids': + return os.path.join(workdir, dir_tables, table_mids) + elif table_name == 'drain_table': + return os.path.join(workdir, dir_tables, table_drain) + elif table_name == 'gauge_table': + return os.path.join(workdir, dir_tables, table_gauge) + elif table_name == 'assign_table': + return os.path.join(workdir, dir_tables, table_assign) + else: + raise ValueError(f'Unknown table name: {table_name}') + + +def read_table(workdir: str, table_name: str) -> pd.DataFrame: + table_path = get_table_path(workdir, table_name) + table_format = os.path.splitext(table_path)[-1] + if table_format == '.parquet': + return pd.read_parquet(table_path) + elif table_format == '.feather': + return pd.read_feather(table_path) + elif table_format == '.csv': + return pd.read_csv(table_path) + else: + raise ValueError(f'Unknown table format: {table_format}') + + +def write_table(table: pd.DataFrame, workdir: str, table_name: str) -> None: + table_path = get_table_path(workdir, table_name) + table_format = os.path.splitext(table_path)[-1] + if table_format == '.parquet': + return table.to_parquet(table_path) + elif table_format == '.feather': + return table.to_feather(table_path) + elif table_format == '.csv': + return table.to_csv(table_path) + else: + raise ValueError(f'Unknown table format: {table_format}') diff --git a/saber/prep.py b/saber/prep.py index f08f795..753ba17 100755 --- a/saber/prep.py +++ b/saber/prep.py @@ -1,88 +1,96 @@ -import glob -import os - -import grids +import geopandas as gpd +import netCDF4 as nc +import numpy as np import pandas as pd -import xarray as xr - -from .utils import compute_fdc +from sklearn.preprocessing import StandardScaler as Scalar -from ._vocab import mid_col +from .io import mid_col +from .io import order_col +from .io import read_table +from .io import write_table +__all__ = ['gis_tables', 'hindcast'] -def guess_hist_sim_path(workdir: str) -> str: - return glob.glob(os.path.join(workdir, 'data_inputs', '*.nc*'))[0] - -def historical_simulation(workdir: str, hist_nc_path: str = None) -> None: +def gis_tables(workdir: str, gauge_gis: str = None, drain_gis: str = None) -> None: """ - Creates sim-fdc.csv and sim_time_series.pickle in workdir/data_processed for geoglows historical simulation data + Generate copies of the drainage line attribute tables in parquet format using the Saber package vocabulary Args: workdir: path to the working directory for the project - hist_nc_path: path to the historical simulation netcdf if not located at workdir/data_simulated/*.nc + gauge_gis: path to the GIS dataset (e.g. geopackage) for the gauge locations (points) + drain_gis: path to the GIS dataset (e.g. geopackage) for the drainage line locations (polylines) Returns: None """ - if hist_nc_path is None: - hist_nc_path = guess_hist_sim_path(workdir) - - # read the assignments table - drain_table = pd.read_csv(os.path.join(workdir, 'gis_inputs', 'drain_table.csv')) - model_ids = list(set(sorted(drain_table[mid_col].tolist()))) - - # open the historical data netcdf file - hist_nc = xr.open_dataset(hist_nc_path) - - # start dataframes for the flow duration curve (fdc) and the monthly averages (ma) using the first comid in the list - first_id = model_ids.pop(0) - first_data = hist_nc.sel(rivid=first_id).Qout.to_dataframe()['Qout'] - fdc_df = compute_fdc(first_data.tolist(), col_name=first_id) - # ma_df = first_data.groupby(first_data.index.strftime('%m')).mean().to_frame(name=first_id) - - # for each remaining stream ID in the list, merge/append the fdc and ma with the previously created dataframes - for model_id in model_ids: - data = hist_nc.sel(rivid=model_id).Qout.to_dataframe()['Qout'] - fdc_df = fdc_df.merge(compute_fdc(data.tolist(), col_name=model_id), - how='outer', left_index=True, right_index=True) - - fdc_df.to_csv(os.path.join(workdir, 'data_processed', 'sim-fdc.csv')) - - # create the time series table of simulated flow values - coords = drain_table[[mid_col]] - coords.loc[:, 'time'] = None - coords = coords[['time', 'model_id']].values.tolist() - ts = grids.TimeSeries([hist_nc_path, ], 'Qout', ('time', 'rivid')) - ts = ts.multipoint(*coords) - ts.set_index('datetime', inplace=True) - ts.index = pd.to_datetime(ts.index, unit='s') - ts.columns = drain_table[mid_col].values.flatten() - ts.index = ts.index.tz_localize('UTC') - ts.to_pickle(os.path.join(workdir, 'data_processed', 'sim_time_series.pickle') -) + if gauge_gis is not None: + if gauge_gis.endswith('.parquet'): + gdf = gpd.read_parquet(gauge_gis) + else: + gdf = gpd.read_file(gauge_gis) + write_table(pd.DataFrame(gdf.drop('geometry', axis=1)), workdir, 'gauge_table') + + if drain_gis is not None: + if drain_gis.endswith('.parquet'): + gdf = gpd.read_parquet(drain_gis) + else: + gdf = gpd.read_file(drain_gis) + gdf['centroid_x'] = gdf.geometry.centroid.x + gdf['centroid_y'] = gdf.geometry.centroid.y + gdf = gdf.drop('geometry', axis=1) + write_table(pd.DataFrame(gdf), workdir, 'drain_table') return -def scaffold_workdir(path: str, include_validation: bool = True) -> None: +def hindcast(workdir: str, hind_nc_path: str, drop_order_1: bool = False) -> None: """ - Creates the correct directories for an RBC project within the a specified directory + Creates hindcast_series_table.parquet and hindcast_fdc_table.parquet in the workdir/tables directory + for the GEOGloWS hindcast data Args: - path: the path to a directory where you want to create directories - include_validation: boolean, indicates whether or not to create the validation folder + workdir: path to the working directory for the project + hind_nc_path: path to the hindcast or historical simulation netcdf + drop_order_1: whether to drop the order 1 streams from the hindcast Returns: None """ - if not os.path.isdir(path): - os.mkdir(path) - os.mkdir(os.path.join(path, 'data_inputs')) - os.mkdir(os.path.join(path, 'data_processed')) - os.mkdir(os.path.join(path, 'gis_inputs')) - os.mkdir(os.path.join(path, 'gis_outputs')) - os.mkdir(os.path.join(path, 'kmeans_models')) - os.mkdir(os.path.join(path, 'kmeans_images')) - if include_validation: - os.mkdir(os.path.join(path, 'validation_runs')) + # read the assignments table + model_ids = read_table(workdir, 'drain_table') + if drop_order_1: + model_ids = model_ids[model_ids[order_col] > 1] + model_ids = list(set(sorted(model_ids[mid_col].tolist()))) + + # read the hindcast netcdf, convert to dataframe, store as parquet + hnc = nc.Dataset(hind_nc_path) + ids = pd.Series(hnc['rivid'][:]) + ids_selector = ids.isin(model_ids) + ids = ids[ids_selector].astype(str).values.flatten() + + # save the model ids to table for reference + write_table(pd.DataFrame(ids, columns=[mid_col, ]), workdir, 'model_ids') + + # save the hindcast series to parquet + df = pd.DataFrame( + hnc['Qout'][:, ids_selector], + columns=ids, + index=pd.to_datetime(hnc.variables['time'][:], unit='s') + ) + df = df[df.index.year >= 1980] + df.index.name = 'datetime' + write_table(df, workdir, 'hindcast') + + # calculate the FDC and save to parquet + exceed_prob = np.linspace(0, 100, 101)[::-1] + df = df.apply(lambda x: np.transpose(np.nanpercentile(x, exceed_prob))) + df.index = exceed_prob + df.index.name = 'exceed_prob' + write_table(df, workdir, 'hindcast_fdc') + + # transform and prepare for clustering + df = pd.DataFrame(np.transpose(Scalar().fit_transform(np.squeeze(df.values)))) + df.index = ids + df.columns = df.columns.astype(str) + write_table(df, workdir, 'hindcast_fdc_trans') return diff --git a/saber/table.py b/saber/table.py deleted file mode 100644 index 076da48..0000000 --- a/saber/table.py +++ /dev/null @@ -1,77 +0,0 @@ -import os - -import numpy as np -import pandas as pd - -from ._vocab import mid_col -from ._vocab import asgn_mid_col -from ._vocab import asgn_gid_col -from ._vocab import reason_col - -__all__ = ['get_path', 'read', 'cache', 'gen'] - - -def get_path(workdir: str) -> str: - """ - Creates the path to the assign table based on the working directory provided - - Args: - workdir: path to the working directory - - Returns: - absolute path of the assign_table - """ - return os.path.join(workdir, 'assign_table.csv') - - -def gen(workdir) -> pd.DataFrame: - """ - Joins the drain_table.csv and gauge_table.csv to create the assign_table.csv - - Args: - workdir: path to the working directory - - Returns: - None - """ - # read and merge the tables - drain_df = pd.read_csv(os.path.join(workdir, 'gis_inputs', 'drain_table.csv')) - gauge_df = pd.read_csv(os.path.join(workdir, 'gis_inputs', 'gauge_table.csv')) - assign_table = pd.merge(drain_df, gauge_df, on=mid_col, how='outer') - - # create the new columns - assign_table[asgn_mid_col] = np.nan - assign_table[asgn_gid_col] = np.nan - assign_table[reason_col] = np.nan - - return assign_table - - -def read(workdir: str) -> pd.DataFrame: - """ - Reads the assign_table located in the provided directory - - Args: - workdir: path to the working directory - - Returns: - assign_table pandas.DataFrame - """ - return pd.read_csv(get_path(workdir)) - - -def cache(workdir: str, assign_table: pd.DataFrame) -> None: - """ - Saves the pandas dataframe to a csv in the proper place in the project directory - A shortcut for pd.DataFrame.to_csv so you don't have to code it all the time - - Args: - workdir: the project directory path - assign_table: the assign_table dataframe - - Returns: - None - """ - assign_table.to_csv(get_path(workdir), index=False) - return - diff --git a/saber/tests/calibrate.py b/saber/tests/calibrate.py deleted file mode 100644 index c58fa40..0000000 --- a/saber/tests/calibrate.py +++ /dev/null @@ -1,63 +0,0 @@ -from io import StringIO - -import geoglows -import pandas as pd -import requests - - -def collect_data(start_id, start_ideam_id, downstream_id, downstream_ideam_id): - # Upstream simulated flow - start = geoglows.streamflow.historic_simulation(start_id) - # Upstream observed flow - start_ideam = get_ideam_flow(start_ideam_id) - start_ideam.dropna(inplace=True) - - # Downstream simulated flow - downstream = geoglows.streamflow.historic_simulation(downstream_id) - # Downstream observed flow - downstream_ideam = get_ideam_flow(downstream_ideam_id) - downstream_ideam.dropna(inplace=True) - # Downstream bias corrected flow (for comparison to the data_4_assign_propagation method - downstream_bc = geoglows.bias.correct_historical(downstream, downstream_ideam) - - # Export all as csv - start.to_csv('start_flow.csv') - start_ideam.to_csv('start_ideam_flow.csv') - downstream.to_csv('downstream_flow.csv') - downstream_ideam.to_csv('downstream_ideam_flow.csv') - downstream_bc.to_csv('downstream_bc_flow.csv') - return - - -def get_ideam_flow(id): - # get the gauged data - url = f'https://www.hydroshare.org/resource/d222676fbd984a81911761ca1ba936bf/' \ - f'data/contents/Discharge_Data/{id}.csv' - df = pd.read_csv(StringIO(requests.get(url).text), index_col=0) - df.index = pd.to_datetime(df.index).tz_localize('UTC') - return df - -# collect_data(9012999, 22057070, 9012650, 22057010) -# collect_data(9017261, 32037030, 9015333, 32097010) # really long range down stream -# collect_data(9009660, 21237020, 9007292, 23097040) # large river -# collect_data(9007292, 23097040, 9009660, 21237020) # large river backwards (going upstream) - -# Read all as csv -start_flow = pd.read_csv('data_4_assign_propagation/start_flow.csv', index_col=0) -start_ideam_flow = pd.read_csv('data_4_assign_propagation/start_ideam_flow.csv', index_col=0) -downstream_flow = pd.read_csv('data_4_assign_propagation/downstream_flow.csv', index_col=0) -downstream_ideam_flow = pd.read_csv('data_4_assign_propagation/downstream_ideam_flow.csv', index_col=0) -downstream_bc_flow = pd.read_csv('data_4_assign_propagation/downstream_bc_flow.csv', index_col=0) -start_flow.index = pd.to_datetime(start_flow.index) -start_ideam_flow.index = pd.to_datetime(start_ideam_flow.index) -downstream_flow.index = pd.to_datetime(downstream_flow.index) -downstream_ideam_flow.index = pd.to_datetime(downstream_ideam_flow.index) -downstream_bc_flow.index = pd.to_datetime(downstream_bc_flow.index) - -downstream_prop_correct = rbc_calibrate(start_flow, start_ideam_flow, downstream_flow, - fit_gumbel=True, gumbel_range=(25, 75)) -plot_results(downstream_flow, downstream_ideam_flow, downstream_bc_flow, downstream_prop_correct, - f'Correct Monthly - Force Gumbel Distribution') -del downstream_prop_correct['Scalars'], downstream_prop_correct['Percentile'] -statistics_tables(downstream_prop_correct, downstream_flow, downstream_ideam_flow).to_csv( - 'data_4_assign_propagation/stats_test.csv') diff --git a/saber/utils.py b/saber/utils.py deleted file mode 100644 index 7dd3b80..0000000 --- a/saber/utils.py +++ /dev/null @@ -1,54 +0,0 @@ -import math - -import hydrostats as hs -import hydrostats.data as hd -import numpy as np -import pandas as pd - - -def solve_gumbel1(std, xbar, rp): - """ - Solves the Gumbel Type I pdf = exp(-exp(-b)) - where b is the covariate - """ - # xbar = statistics.mean(year_max_flow_list) - # std = statistics.stdev(year_max_flow_list, xbar=xbar) - return -math.log(-math.log(1 - (1 / rp))) * std * .7797 + xbar - (.45 * std) - - -def statistics_tables(corrected: pd.DataFrame, simulated: pd.DataFrame, observed: pd.DataFrame) -> pd.DataFrame: - # merge the datasets together - merged_sim_obs = hd.merge_data(sim_df=simulated, obs_df=observed) - merged_cor_obs = hd.merge_data(sim_df=corrected, obs_df=observed) - - metrics = ['ME', 'RMSE', 'NRMSE (Mean)', 'MAPE', 'NSE', 'KGE (2009)', 'KGE (2012)'] - # Merge Data - table1 = hs.make_table(merged_dataframe=merged_sim_obs, metrics=metrics) - table2 = hs.make_table(merged_dataframe=merged_cor_obs, metrics=metrics) - - table2 = table2.rename(index={'Full Time Series': 'Corrected Full Time Series'}) - table1 = table1.rename(index={'Full Time Series': 'Original Full Time Series'}) - table1 = table1.transpose() - table2 = table2.transpose() - - return pd.merge(table1, table2, right_index=True, left_index=True) - - -def compute_fdc(flows: np.array, steps: int = 500, exceed: bool = True, col_name: str = 'flow'): - percentiles = [round((1 / steps) * i * 100, 5) for i in range(steps + 1)] - flows = np.nanpercentile(flows, percentiles) - if exceed: - percentiles.reverse() - return pd.DataFrame(flows, columns=[col_name, ], index=percentiles) - - -def compute_scalar_fdc(first_fdc, second_fdc): - first_fdc = compute_fdc(first_fdc) - second_fdc = compute_fdc(second_fdc) - ratios = np.divide(first_fdc['flow'].values.flatten(), second_fdc['flow'].values.flatten()) - columns = (first_fdc.columns[0], 'Scalars') - scalars_df = pd.DataFrame(np.transpose([first_fdc.values[:, 0], ratios]), columns=columns) - scalars_df.replace(np.inf, np.nan, inplace=True) - scalars_df.dropna(inplace=True) - - return scalars_df diff --git a/saber/validate.py b/saber/validate.py index f4969a2..f40a140 100644 --- a/saber/validate.py +++ b/saber/validate.py @@ -2,16 +2,16 @@ import os import shutil -import pandas as pd import netCDF4 as nc import numpy as np +import pandas as pd -from .prep import scaffold_workdir -from ._workflow import analyze_region -from ._vocab import mid_col -from ._vocab import gid_col -from ._vocab import metric_nc_name_list -from ._vocab import cal_nc_name +from .io import cal_nc_name +from .io import gid_col +from .io import metric_nc_name_list +from .io import mid_col +from .io import read_table +from .io import scaffold_workdir def sample_gauges(workdir: str, overwrite: bool = False) -> None: @@ -27,14 +27,15 @@ def sample_gauges(workdir: str, overwrite: bool = False) -> None: Returns: None """ - vr_path = os.path.join(workdir, 'validation_runs') + vr_path = os.path.join(workdir, 'validation') if overwrite: if os.path.exists(vr_path): shutil.rmtree(vr_path) os.mkdir(vr_path) - gt = pd.read_csv(os.path.join(workdir, 'gis_inputs', 'gauge_table.csv')) + gt = read_table(workdir, 'gauge_table') initial_row_count = gt.shape[0] + rows_to_drop = round(gt.shape[0] / 10) for i in range(5): @@ -44,7 +45,6 @@ def sample_gauges(workdir: str, overwrite: bool = False) -> None: subpath = os.path.join(vr_path, f'{pct_remain}') # create the new project working directory - os.mkdir(subpath) scaffold_workdir(subpath, include_validation=False) # overwrite the processed data directory so we don't need to redo this each time @@ -56,9 +56,9 @@ def sample_gauges(workdir: str, overwrite: bool = False) -> None: # sample the gauge table gt = gt.sample(n=n) - gt.to_csv(os.path.join(subpath, 'gis_inputs', 'gauge_table.csv'), index=False) - shutil.copyfile(os.path.join(workdir, 'gis_inputs', 'drain_table.csv'), - os.path.join(subpath, 'gis_inputs', 'drain_table.csv')) + gt.to_csv(os.path.join(subpath, 'gis', 'gauge_table.csv'), index=False) + shutil.copyfile(os.path.join(workdir, 'gis', 'drain_table.csv'), + os.path.join(subpath, 'gis', 'drain_table.csv')) # filter the copied processed data to only contain the gauges included in this filtered step processed_sim_data = glob.glob(os.path.join(subpath, 'data_processed', 'obs-*.csv')) @@ -70,26 +70,6 @@ def sample_gauges(workdir: str, overwrite: bool = False) -> None: return -def run_series(workdir: str, drain_shape: str, obs_data_dir: str = None) -> None: - """ - Runs saber.analyze_region on each project in the validation_runs directory - - Args: - workdir: the project working directory - drain_shape: path to the drainage line gis file - obs_data_dir: path to the directory containing the observed data - - Returns: - None - """ - gauge_table = pd.read_csv(os.path.join(workdir, 'gis_inputs', 'gauge_table.csv')) - val_workdirs = [i for i in glob.glob(os.path.join(workdir, 'validation_runs', '*')) if os.path.isdir(i)] - for val_workdir in val_workdirs: - print(f'\n\n\t\t\tworking on {val_workdir}\n\n') - analyze_region(val_workdir, drain_shape, gauge_table, obs_data_dir) - return - - def gen_val_table(workdir: str) -> pd.DataFrame: """ Prepares the validation summary table that contains the list of gauged rivers plus their statistics computed in @@ -101,7 +81,7 @@ def gen_val_table(workdir: str) -> pd.DataFrame: Returns: pandas.DataFrame """ - df = pd.read_csv(os.path.join(workdir, 'gis_inputs', 'gauge_table.csv')) + df = pd.read_csv(os.path.join(workdir, 'gis', 'gauge_table.csv')) df['100'] = 1 stats_df = {} @@ -114,11 +94,11 @@ def gen_val_table(workdir: str) -> pd.DataFrame: a.close() for d in sorted( - [a for a in glob.glob(os.path.join(workdir, 'validation_runs', '*')) if os.path.isdir(a)], + [a for a in glob.glob(os.path.join(workdir, 'validation', '*')) if os.path.isdir(a)], reverse=True ): val_percent = os.path.basename(d) - valset_gids = pd.read_csv(os.path.join(d, 'gis_inputs', 'gauge_table.csv'))[gid_col].values.tolist() + valset_gids = pd.read_csv(os.path.join(d, 'gis', 'gauge_table.csv'))[gid_col].values.tolist() # mark a column indicating the gauges included in the validation set df[val_percent] = 0 @@ -132,6 +112,6 @@ def gen_val_table(workdir: str) -> pd.DataFrame: # merge gauge_table with the stats, save and return df = df.merge(pd.DataFrame(stats_df), on=mid_col, how='inner') - df.to_csv(os.path.join(workdir, 'validation_runs', 'val_table.csv'), index=False) + df.to_csv(os.path.join(workdir, 'validation', 'val_table.csv'), index=False) return df diff --git a/setup.py b/setup.py index 4949ea8..0453132 100644 --- a/setup.py +++ b/setup.py @@ -6,12 +6,12 @@ with open('requirements.txt', 'r') as req: install_requires = req.read().splitlines() -description = 'tools for hydrological bias correction on large models' -version = '0.3.0' +description = 'tools for bias correction on large hydrological models' +version = '0.4.0' setup( - name='saber', - packages=['saber'], + name='hydrosaber', + packages=['saber', ], version=version, description=description, long_description=long_description, @@ -32,4 +32,4 @@ ], python_requires=">=3", install_requires=install_requires -) \ No newline at end of file +)