Implemented dbscan for RGDR #57

Merged · 10 commits · Aug 3, 2022
1,912 changes: 1,912 additions & 0 deletions notebooks/working_notebook.ipynb

Large diffs are not rendered by default.

10 changes: 8 additions & 2 deletions s2spy/rgdr/_map_analysis.py
@@ -3,14 +3,17 @@
A toolbox for spatial-temporal data analysis, including regression,
correlation, auto-correlation and other relevant utility functions.
"""
from typing import Tuple
import numpy as np
import xarray as xr
from scipy.stats import pearsonr as _pearsonr


def _pearsonr_nan(x: np.ndarray, y: np.ndarray):
"""NaN friendly implementation of scipy.stats.pearsonr. Calculates the correlation
coefficient between two arrays, as well as the p-value of this correlation.
coefficient between two arrays, as well as the p-value of this correlation. However,
instead of raising an error when encountering NaN values, this function will return
both the correlation coefficient and the p-value as NaN.

Args:
x: 1-D array
@@ -25,7 +28,9 @@ def _pearsonr_nan(x: np.ndarray, y: np.ndarray):
return _pearsonr(x, y)
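
A quick sketch of the behavior described above, with made-up inputs:

import numpy as np

# A NaN in either input makes both outputs NaN instead of raising an error.
r, p = _pearsonr_nan(np.array([0.5, 1.0, np.nan]), np.array([0.2, 0.9, 0.4]))
print(r, p)  # nan nan

# Without NaNs the call falls through to scipy.stats.pearsonr.
r, p = _pearsonr_nan(np.array([1.0, 2.0, 3.0]), np.array([2.0, 4.0, 6.0]))
print(r, p)  # 1.0 and p-value 0.0 for perfectly correlated inputs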


def correlation(
    field: xr.DataArray, target: xr.DataArray, corr_dim: str = "time"
) -> Tuple[xr.DataArray, xr.DataArray]:
"""Calculate correlation maps.

Args:
@@ -34,6 +39,7 @@ def correlation(field: xr.DataArray, target: xr.DataArray, corr_dim: str = "time"):
target data.
target: Data which has to be correlated with the spatial data. Requires a
dimension named `corr_dim`.
corr_dim: Dimension over which the correlation coefficient should be calculated.

Returns:
r_coefficient: DataArray filled with the correlation coefficient for each
114 changes: 106 additions & 8 deletions s2spy/rgdr/_map_regions.py
@@ -3,22 +3,120 @@
A module for clustering regions based on the given correlation
between spatial fields and target timeseries.
"""
from typing import Optional

import numpy as np
import xarray as xr
from sklearn.cluster import DBSCAN
from . import _map_utils


radius_earth_km = 6371


def weighted_groupby_mean(ds: xr.Dataset, groupby: str, weight: str) -> xr.Dataset:
    """Apply a weighted mean after a groupby call. xarray does not currently support
    combining `weighted` and `groupby`. A pull request adding support for this
    functionality (https://github.com/pydata/xarray/pull/5480/files) was never merged.

Args:
ds (xr.Dataset): Dataset containing the coordinates or variables specified in
the `groupby` and `weight` kwargs.
groupby (str): Coordinate which should be used to make the groups.
weight (str): Variable in the Dataset containing the weights that should be used.

Returns:
xr.Dataset: Dataset reduced using the `groupby` coordinate, using weighted average
based on `ds[weight]`.
"""
groups = ds.groupby(groupby)
means = [
g.weighted(g[weight]).mean(dim=["stacked_latitude_longitude"])
for _, g in groups
]
return xr.concat(means, dim=groupby)
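
As a usage sketch with made-up numbers: grouping over a 2-D `cluster_labels` variable
makes xarray stack `latitude` and `longitude` into a dimension named
`stacked_latitude_longitude` by default, which is exactly the dimension the helper
above reduces over.

import numpy as np
import xarray as xr

ds_example = xr.Dataset(
    {
        "sst": (("latitude", "longitude"), [[1.0, 2.0], [3.0, 4.0]]),
        "area": (("latitude", "longitude"), [[1.0, 1.0], [2.0, 2.0]]),
        "cluster_labels": (("latitude", "longitude"), [[0, 1], [0, 1]]),
    },
    coords={"latitude": [10.0, 15.0], "longitude": [100.0, 105.0]},
)
means = weighted_groupby_mean(ds_example, groupby="cluster_labels", weight="area")
print(means["sst"].values)  # label 0: (1*1 + 3*2) / (1 + 2) = 7/3; label 1: 10/3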


def dbscan(
    ds: xr.Dataset,
    alpha: float = 0.05,
    eps_km: float = 600,
    min_area_km2: Optional[float] = None,
) -> xr.Dataset:
"""Determines the clusters based on sklearn's DBSCAN implementation. Alpha determines
the mask based on the minimum p_value. Grouping can be adjusted using the `eps_km`
kwarg.

Args:
ds (xr.Dataset): Dataset containing 'latitude' and 'longitude' dimensions in
degrees. Must also contain 'p_val' and 'corr' to base the groups on.
alpha (float): Value below which the correlation is significant enough to be
considered
eps_km (float): The maximum distance (in km) between two samples for one to be
considered as in the neighborhood of the other. This is not a maximum bound
on the distances of points within a cluster. This is the most important
DBSCAN parameter to choose appropriately.

Returns:
xr.Dataset: Dataset grouped by the DBSCAN clusters. Cluster labels are negative
for areas with a negative correlation coefficient and positive for areas
with a positive correlation coefficient.
"""
ds = ds.stack(coord=["latitude", "longitude"])
coords = np.asarray(list(ds["coord"].values)) # turn array of tuples to 2d-array

# Prepare labels, default value is 0 (not in cluster)
labels = np.zeros(len(coords))

for sign, sign_mask in zip([1, -1], [ds["corr"] >= 0, ds["corr"] < 0]):
mask = np.logical_and(ds["p_val"] < alpha, sign_mask)

if np.sum(mask) > 0: # Check if the mask contains any points to cluster
masked_coords = np.radians(coords[mask])
db = DBSCAN(
eps=eps_km / radius_earth_km,
min_samples=1,
algorithm="auto",
metric="haversine",
).fit(masked_coords)

labels[mask] = sign * (db.labels_ + 1)

ds["cluster_labels"] = ("coord", labels)

    ds = ds.unstack("coord")

resolution = np.abs(ds.longitude.values[1] - ds.longitude.values[0])
ds["area"] = _map_utils.spherical_area(ds.latitude, resolution=resolution)

if min_area_km2:
ds = _map_utils.remove_small_area_clusters(ds, min_area_km2)

return ds
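
The `eps=eps_km / radius_earth_km` conversion above works because sklearn's haversine
metric returns great-circle distances on a unit sphere, so dividing the kilometre
threshold by the Earth's radius expresses it in the same unit. A small sanity check
along those lines, with illustrative coordinates:

import numpy as np
from sklearn.metrics.pairwise import haversine_distances

amsterdam = np.radians([52.37, 4.90])  # (latitude, longitude)
berlin = np.radians([52.52, 13.40])
unit_sphere_dist = haversine_distances([amsterdam, berlin])[0, 1]
print(unit_sphere_dist * radius_earth_km)  # ~577 km, within the default eps_km of 600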


def cluster(
    ds: xr.Dataset,
    alpha: float = 0.05,
    eps_km: float = 600,
    min_area_km2: Optional[float] = None,
) -> xr.Dataset:
"""Perform DBSCAN clustering on a prepared Dataset, and then group the data by their
determined clusters, taking the weighted mean. The weight is based on the area of
each grid cell.

Density-Based Spatial Clustering of Applications with Noise (DBSCAN).
Clusters gridcells together which are of the same sign and in proximity to
each other using DBSCAN.

The input data will be processed in this function to ensure that the distance
is free of the impact from spherical curvature. The actual Euclidean distance
is free of the impact from spherical curvature. The actual geodesic distance
will be obtained and passed to the DBSCAN clustering function.

Args:
ds: Dataset prepared to have p_val and corr.
alpha (float): Value below which the correlation is significant enough to be
considered
eps_km (float): The maximum distance (in km) between two samples for one to be
considered as in the neighborhood of the other. This is not a maximum bound
on the distances of points within a cluster. This is the most important
DBSCAN parameter to choose appropriately.

"""
raise NotImplementedError
ds = dbscan(ds, alpha=alpha, eps_km=eps_km, min_area_km2=min_area_km2)

ds = weighted_groupby_mean(ds, groupby="cluster_labels", weight="area")

return ds
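
A hypothetical end-to-end sketch, using randomly generated `corr` and `p_val` fields
shaped like the ones `dbscan` expects:

import numpy as np
import xarray as xr

rng = np.random.default_rng(seed=0)
lats = np.arange(25.0, 50.0, 5.0)
lons = np.arange(175.0, 240.0, 5.0)
shape = (len(lats), len(lons))
ds_synthetic = xr.Dataset(
    {
        "corr": (("latitude", "longitude"), rng.uniform(-1, 1, shape)),
        "p_val": (("latitude", "longitude"), rng.uniform(0, 0.1, shape)),
    },
    coords={"latitude": lats, "longitude": lons},
)
clustered = cluster(ds_synthetic, alpha=0.05, eps_km=600)
print(clustered["cluster_labels"].values)  # negative labels mark negative-correlation clusters
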
62 changes: 62 additions & 0 deletions s2spy/rgdr/_map_utils.py
@@ -0,0 +1,62 @@
import numpy as np
import xarray as xr


surface_area_earth_km2 = 5.1e8


def spherical_area(latitude: float, resolution: float) -> float:
"""Approximate the area of a square grid cell on a spherical (!) earth.
Returns the area in square kilometers of earth surface.

Args:
latitude (float): Latitude at the center of the grid cell (deg)
resolution (float): Grid resolution (deg)

Returns:
float: Area of the grid cell (km^2)
"""
lat = np.radians(latitude)
resolution = np.radians(resolution)
h = np.sin(lat + resolution / 2) - np.sin(lat - resolution / 2)
spherical_area = h * resolution / np.pi * 4
return spherical_area * surface_area_earth_km2
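
Although annotated with float, the helper is vectorized over latitude, which is how
`dbscan` uses it; a small sketch with illustrative values:

import numpy as np
import xarray as xr

lat_grid = xr.DataArray(np.array([0.0, 30.0, 60.0]), dims="latitude")
areas = spherical_area(lat_grid, resolution=5.0)
print(areas.values)  # cell areas shrink toward the poles as longitude circles narrow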


def cluster_area(ds: xr.Dataset, cluster_label: float) -> float:
"""Determines the total area of a cluster. Requires the input dataset to have the
variables `area` and `cluster_labels`.

Args:
ds (xr.Dataset): Dataset containing the variables `area` and `cluster_labels`.
cluster_label (float): The label for which the area should be calculated.

Returns:
float: Area of the cluster `cluster_label`.
"""
    # Use a where statement to select only the grid cells that belong to the cluster
return (
ds["area"].where(ds["cluster_labels"] == cluster_label).sum(skipna=True).values
)
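
A tiny sketch with made-up areas and labels:

import numpy as np
import xarray as xr

ds_tiny = xr.Dataset(
    {
        "cluster_labels": (("latitude",), np.array([1.0, 1.0, 2.0])),
        "area": (("latitude",), np.array([200.0, 300.0, 50.0])),
    },
    coords={"latitude": [10.0, 15.0, 20.0]},
)
print(cluster_area(ds_tiny, 1.0))  # 500.0, the summed area of the cells labelled 1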


def remove_small_area_clusters(ds: xr.Dataset, min_area_km2: float) -> xr.Dataset:
"""Removes the clusters where the area is under the input threshold.

Args:
ds (xr.Dataset): Dataset containing `cluster_labels` and `area`.
min_area_km2 (float): The minimum allowed area of each cluster

Returns:
xr.Dataset: The input dataset with the labels of the clusters set to 0 when the
area of the cluster is under the `min_area_km2` threshold.
"""
clusters = np.unique(ds["cluster_labels"])
areas = [cluster_area(ds, c) for c in clusters]
valid_clusters = np.array([c for c, a in zip(clusters, areas) if a > min_area_km2])

ds["cluster_labels"] = ds["cluster_labels"].where(
np.isin(ds["cluster_labels"], valid_clusters), 0
)

return ds
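
Continuing the `ds_tiny` sketch from above: the undersized cluster is relabelled to 0
while the larger one survives.

pruned = remove_small_area_clusters(ds_tiny, min_area_km2=100.0)
print(pruned["cluster_labels"].values)  # [1. 1. 0.]: cluster 2 (total area 50.0) is dropped
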
1 change: 1 addition & 0 deletions setup.cfg
@@ -32,6 +32,7 @@ zip_safe = False
include_package_data = True
packages = find:
install_requires =
netcdf4
numpy
pandas
scikit-learn
Binary file added tests/test_rgdr/test_data/sst_daily_1979-2018_5deg_Pacific_175_240E_25_50N.nc
Binary file not shown.
Binary file added tests/test_rgdr/test_data/tf5_nc5_dendo_80d77.nc
Binary file not shown.
9 changes: 3 additions & 6 deletions tests/test_rgdr/test_map_analysis.py
@@ -14,7 +14,7 @@ class TestMapAnalysis:
def dummy_dataarray(self):
time = pd.date_range("20161001", "20211001", freq="60d")

return xr.DataArray(
np.tile(np.arange(len(time)), (2, 2, 1)),
dims=["lat", "lon", "time"],
coords={
@@ -23,7 +23,6 @@ def dummy_dataarray(self):
"lon": np.arange(0, 2),
},
)

@pytest.fixture(autouse=True)
def dummy_timeseries(self, dummy_dataarray):
@@ -48,9 +47,7 @@ def test_correlation(self, dummy_dataarray, dummy_timeseries):
def test_correlation_dim_name(self, dummy_dataarray, dummy_timeseries):
da = dummy_dataarray.rename({"time": "i_interval"})
ts = dummy_timeseries.rename({"time": "i_interval"})
c_val, p_val = _map_analysis.correlation(da, ts, corr_dim="i_interval")

np.testing.assert_equal(c_val.values, 1)
np.testing.assert_equal(p_val.values, 0)
@@ -66,7 +63,7 @@ def test_correlation_wrong_field_dim_name(self, dummy_dataarray, dummy_timeseries):
_map_analysis.correlation(da, dummy_timeseries)

def test_correlation_wrong_target_dims(self, dummy_dataarray):
ts = dummy_dataarray.rename({"lat": "latitude"})
with pytest.raises(AssertionError):
_map_analysis.correlation(dummy_dataarray, ts)

67 changes: 60 additions & 7 deletions tests/test_rgdr/test_map_regions.py
@@ -1,16 +1,69 @@
"""Tests for the s2s._RGDR.map_regions module."""
import numpy as np
import pytest
from s2spy.rgdr import _map_analysis
from s2spy.rgdr import _map_regions
from s2spy.time import AdventCalendar
import xarray as xr


# pylint: disable=protected-access
test_file_path = './tests/test_rgdr/test_data'


class TestMapRegions:
@pytest.fixture(autouse=True)
def dummy_calendar(self):
return AdventCalendar(anchor_date=(8, 31), freq="30d")

@pytest.fixture(autouse=True)
def example_target(self):
return xr.open_dataset(f'{test_file_path}/tf5_nc5_dendo_80d77.nc').sel(cluster=3)

@pytest.fixture(autouse=True)
def example_field(self):
return xr.open_dataset(
f'{test_file_path}/sst_daily_1979-2018_5deg_Pacific_175_240E_25_50N.nc')

@pytest.fixture(autouse=True)
def example_corr(self, dummy_calendar, example_target, example_field):
field = dummy_calendar.resample(example_field)
target = dummy_calendar.resample(example_target)

field['corr'], field['p_val'] = _map_analysis.correlation(
field.sst,
target.ts.sel(i_interval=0),
corr_dim='anchor_year'
)

return field

@pytest.fixture(autouse=True)
def expected_labels(self):
return np.array([
[ 0., 0., 0., 0., -1., -1., 0., -2., -2., -2., 0., 0., 0.],
[ 0., 0., 0., 0., 0., 0., 0., 0., -2., -2., -2., 0., 0.],
[ 0., 0., 0., 0., 0., 0., 0., 0., -2., -2., -2., 0., 0.],
[ 0., 0., 1., 1., 0., 0., 0., 0., 0., 0., -2., 0., 0.],
[ 1., 1., 1., 0., 0., 0., 0., 0., -2., -2., -2., -2., -2.]
])

def test_dbscan(self, example_corr, expected_labels):
clusters = _map_regions.dbscan(example_corr.sel(i_interval=1))

np.testing.assert_array_equal(clusters["cluster_labels"], expected_labels)

def test_dbscan_min_area(self, example_corr, expected_labels):
clusters = _map_regions.dbscan(example_corr.sel(i_interval=1),
min_area_km2=3000**2)
expected_labels[expected_labels == -1] = 0 # Small -1 cluster is missing

np.testing.assert_array_equal(clusters["cluster_labels"], expected_labels)

def test_cluster(self, example_corr):
clustered_data = _map_regions.cluster(example_corr.sel(i_interval=1),
eps_km=600, alpha=0.04)

np.testing.assert_array_equal(clustered_data["cluster_labels"],
[-1, 0, 1])

assert set(clustered_data.sst.dims) == {'anchor_year', 'cluster_labels'}