Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix forecasting lead times and improve forecasting functionality #132

Merged
merged 8 commits into from
Oct 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,5 @@ keywords:
- neural processes
- active learning
license: MIT
version: 0.3.8
date-released: '2024-07-28'
version: 0.4.0
date-released: '2024-10-20'
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ data with neural processes</p>

-----------

[![release](https://img.shields.io/badge/release-v0.3.8-green?logo=github)](https://github.com/alan-turing-institute/deepsensor/releases)
[![release](https://img.shields.io/badge/release-v0.4.0-green?logo=github)](https://github.com/alan-turing-institute/deepsensor/releases)
[![Latest Docs](https://img.shields.io/badge/docs-latest-blue.svg)](https://alan-turing-institute.github.io/deepsensor/)
![Tests](https://github.com/alan-turing-institute/deepsensor/actions/workflows/tests.yml/badge.svg)
[![Coverage Status](https://coveralls.io/repos/github/alan-turing-institute/deepsensor/badge.svg?branch=main)](https://coveralls.io/github/alan-turing-institute/deepsensor?branch=main)
Expand Down
1 change: 1 addition & 0 deletions deepsensor/eval/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .metrics import *
24 changes: 24 additions & 0 deletions deepsensor/eval/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import xarray as xr
from deepsensor.model.pred import Prediction


def compute_errors(pred: "Prediction", target: xr.Dataset) -> xr.Dataset:
    """Compute pointwise errors between predictions and targets.

    Args:
        pred: Prediction object mapping variable IDs to prediction datasets,
            each containing a "mean" variable with a "time" coordinate.
        target: Target data containing one variable per variable ID in ``pred``.

    Returns:
        xr.Dataset: Dataset of pointwise differences between predictions and
            targets at the same valid time in the predictions. Note, the
            difference is positive when the prediction is greater than the
            target.

    Raises:
        KeyError: If a predicted variable ID is not present in ``target``.
    """
    errors = {}
    for var_ID, pred_var in pred.items():
        # Select targets at the prediction's valid times so the subtraction
        # aligns on matching timestamps.
        target_var = target[var_ID]
        error = pred_var["mean"] - target_var.sel(time=pred_var.time)
        # Name the DataArray after the variable ID so the resulting Dataset
        # has correctly-labelled variables (no f-string needed).
        error.name = var_ID
        errors[var_ID] = error
    return xr.Dataset(errors)
77 changes: 73 additions & 4 deletions deepsensor/model/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,19 +348,49 @@ def predict(
if ar_sample and n_samples < 1:
raise ValueError("Must pass `n_samples` > 0 to use `ar_sample`.")

target_delta_t = self.task_loader.target_delta_t
dts = [pd.Timedelta(dt) for dt in target_delta_t]
dts_all_zero = all([dt == pd.Timedelta(seconds=0) for dt in dts])
if target_delta_t is not None and dts_all_zero:
forecasting_mode = False
lead_times = None
elif target_delta_t is not None and not dts_all_zero:
target_var_IDs_set = set(self.task_loader.target_var_IDs)
msg = f"""
Got more than one set of target variables in target sets,
but predictions can only be made with one set of target variables
to simplify implementation.
Got {target_var_IDs_set}.
"""
assert len(target_var_IDs_set) == 1, msg
# Repeat lead_time for each variable in each target set
lead_times = []
for target_set_idx, dt in enumerate(target_delta_t):
target_set_dim = self.task_loader.target_dims[target_set_idx]
lead_times += [
pd.Timedelta(dt, unit=self.task_loader.time_freq)
for _ in range(target_set_dim)
]
forecasting_mode = True
else:
forecasting_mode = False
lead_times = None

if type(tasks) is Task:
tasks = [tasks]

if n_samples >= 1:
B.set_random_seed(seed)
np.random.seed(seed)

dates = [task["time"] for task in tasks]
init_dates = [task["time"] for task in tasks]

# Flatten tuple of tuples to single list
target_var_IDs = [
var_ID for set in self.task_loader.target_var_IDs for var_ID in set
]
if lead_times is not None:
assert len(lead_times) == len(target_var_IDs)

# TODO consider removing this logic, can we just depend on the dim names in X_t?
if not unnormalise:
Expand Down Expand Up @@ -450,11 +480,13 @@ def predict(
pred = Prediction(
target_var_IDs,
pred_params_to_store,
dates,
init_dates,
X_t,
X_t_mask,
coord_names,
n_samples=n_samples,
forecasting_mode=forecasting_mode,
lead_times=lead_times,
)

def unnormalise_pred_array(arr, **kwargs):
Expand Down Expand Up @@ -605,14 +637,22 @@ def unnormalise_pred_array(arr, **kwargs):
# Assign predictions to Prediction object
for param, arr in prediction_arrs.items():
if param != "mixture_probs":
pred.assign(param, task["time"], arr)
pred.assign(param, task["time"], arr, lead_times=lead_times)
elif param == "mixture_probs":
assert arr.shape[0] == self.N_mixture_components, (
f"Number of mixture components ({arr.shape[0]}) does not match "
f"model attribute N_mixture_components ({self.N_mixture_components})."
)
for component_i, probs in enumerate(arr):
pred.assign(f"{param}_{component_i}", task["time"], probs)
pred.assign(
f"{param}_{component_i}",
task["time"],
probs,
lead_times=lead_times,
)

if forecasting_mode:
pred = add_valid_time_coord_to_pred(pred)

if verbose:
dur = time.time() - tic
Expand All @@ -621,6 +661,35 @@ def unnormalise_pred_array(arr, **kwargs):
return pred


def add_valid_time_coord_to_pred(pred: "Prediction") -> "Prediction":
    """
    Add a valid time coordinate "time" to a Prediction object based on the
    initialisation times "init_time" and lead times "lead_time".

    The valid time is computed as ``init_time + lead_time`` for each entry.
    Off-grid (``pd.DataFrame``) predictions must have "lead_time" and
    "init_time" index levels; on-grid (``xr.Dataset``) predictions must have
    "lead_time" and "init_time" coordinates.

    Args:
        pred (:class:`~.model.pred.Prediction`):
            Prediction object to add valid time coordinate to.

    Returns:
        :class:`~.model.pred.Prediction`:
            Prediction object with valid time coordinate added. Note: entries
            are modified/reassigned in place on the input object.

    Raises:
        ValueError: If a prediction entry is neither a ``pd.DataFrame`` nor
            an ``xr.Dataset``.
    """
    for var_ID in pred.keys():
        if isinstance(pred[var_ID], pd.DataFrame):
            # Flatten the MultiIndex so the two levels can be summed
            # elementwise, then store the result as a regular column.
            x = pred[var_ID].reset_index()
            pred[var_ID]["time"] = (x["lead_time"] + x["init_time"]).values
        elif isinstance(pred[var_ID], xr.Dataset):
            # Broadcasting lead_time against init_time yields a 2D
            # (lead_time, init_time) valid-time coordinate.
            x = pred[var_ID]
            pred[var_ID] = pred[var_ID].assign_coords(
                time=x["lead_time"] + x["init_time"]
            )
        else:
            raise ValueError(f"Unsupported prediction type {type(pred[var_ID])}.")
    return pred


def main(): # pragma: no cover
import deepsensor.tensorflow
from deepsensor.data.loader import TaskLoader
Expand Down
103 changes: 86 additions & 17 deletions deepsensor/model/pred.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import pandas as pd
import xarray as xr

Timestamp = Union[str, pd.Timestamp, np.datetime64]


class Prediction(dict):
"""
Expand Down Expand Up @@ -32,13 +34,20 @@ class Prediction(dict):
n_samples (int)
Number of joint samples to draw from the model. If 0, will not
draw samples. Default 0.
forecasting_mode (bool)
If True, stores forecast predictions with an init_time and lead_time dimension,
and a valid_time coordinate. If False, stores prediction at t=0 only
(i.e. spatial interpolation), with only a single time dimension. Default False.
lead_times (List[pd.Timedelta], optional)
List of lead times to store in predictions. Must be provided if
forecasting_mode is True. Default None.
"""

def __init__(
self,
target_var_IDs: List[str],
pred_params: List[str],
dates: List[Union[str, pd.Timestamp]],
dates: List[Timestamp],
X_t: Union[
xr.Dataset,
xr.DataArray,
Expand All @@ -50,6 +59,8 @@ def __init__(
X_t_mask: Optional[Union[xr.Dataset, xr.DataArray]] = None,
coord_names: dict = None,
n_samples: int = 0,
forecasting_mode: bool = False,
lead_times: Optional[List[pd.Timedelta]] = None,
):
self.target_var_IDs = target_var_IDs
self.X_t_mask = X_t_mask
Expand All @@ -58,6 +69,13 @@ def __init__(
self.x1_name = coord_names["x1"]
self.x2_name = coord_names["x2"]

self.forecasting_mode = forecasting_mode
if forecasting_mode:
assert (
lead_times is not None
), "If forecasting_mode is True, lead_times must be provided."
self.lead_times = lead_times

self.mode = infer_prediction_modality_from_X_t(X_t)

self.pred_params = pred_params
Expand All @@ -67,15 +85,25 @@ def __init__(
*[f"sample_{i}" for i in range(n_samples)],
]

# Create empty xarray/pandas objects to store predictions
if self.mode == "on-grid":
for var_ID in self.target_var_IDs:
# Create empty xarray/pandas objects to store predictions
if self.forecasting_mode:
prepend_dims = ["lead_time"]
prepend_coords = {"lead_time": lead_times}
else:
prepend_dims = None
prepend_coords = None
self[var_ID] = create_empty_spatiotemporal_xarray(
X_t,
dates,
data_vars=self.pred_params,
coord_names=coord_names,
prepend_dims=prepend_dims,
prepend_coords=prepend_coords,
)
if self.forecasting_mode:
self[var_ID] = self[var_ID].rename(time="init_time")
if self.X_t_mask is None:
# Create 2D boolean array of True values to simplify indexing
self.X_t_mask = (
Expand All @@ -86,8 +114,18 @@ def __init__(
)
elif self.mode == "off-grid":
# Repeat target locs for each date to create multiindex
idxs = [(date, *idxs) for date in dates for idxs in X_t.index]
index = pd.MultiIndex.from_tuples(idxs, names=["time", *X_t.index.names])
if self.forecasting_mode:
index_names = ["lead_time", "init_time", *X_t.index.names]
idxs = [
(lt, date, *idxs)
for lt in lead_times
for date in dates
for idxs in X_t.index
]
else:
index_names = ["time", *X_t.index.names]
idxs = [(date, *idxs) for date in dates for idxs in X_t.index]
index = pd.MultiIndex.from_tuples(idxs, names=index_names)
for var_ID in self.target_var_IDs:
self[var_ID] = pd.DataFrame(index=index, columns=self.pred_params)

Expand All @@ -106,6 +144,7 @@ def assign(
prediction_parameter: str,
date: Union[str, pd.Timestamp],
data: np.ndarray,
lead_times: Optional[List[pd.Timedelta]] = None,
):
"""

Expand All @@ -117,11 +156,29 @@ def assign(
data (np.ndarray)
If off-grid: Shape (N_var, N_targets) or (N_samples, N_var, N_targets).
If on-grid: Shape (N_var, N_x1, N_x2) or (N_samples, N_var, N_x1, N_x2).
lead_times (List[pd.Timedelta], optional)
Lead times of the forecast, one per variable in ``data``. Required if forecasting_mode is True. Default None.
"""
if self.forecasting_mode:
assert (
lead_times is not None
), "If forecasting_mode is True, lead_times must be provided."

msg = f"""
If forecasting_mode is True, lead_times must be of equal length to the number of
variables in the data (the first dimension). Got {lead_times=} of length
{len(lead_times)} lead times and data shape {data.shape}.
"""
assert len(lead_times) == data.shape[0], msg

if self.mode == "on-grid":
if prediction_parameter != "samples":
for var_ID, pred in zip(self.target_var_IDs, data):
self[var_ID][prediction_parameter].loc[date].data[
for i, (var_ID, pred) in enumerate(zip(self.target_var_IDs, data)):
if self.forecasting_mode:
index = (lead_times[i], date)
else:
index = date
self[var_ID][prediction_parameter].loc[index].data[
self.X_t_mask.data
] = pred.ravel()
elif prediction_parameter == "samples":
Expand All @@ -130,28 +187,44 @@ def assign(
f"have shape (N_samples, N_var, N_x1, N_x2). Got {data.shape}."
)
for sample_i, sample in enumerate(data):
for var_ID, pred in zip(self.target_var_IDs, sample):
self[var_ID][f"sample_{sample_i}"].loc[date].data[
for i, (var_ID, pred) in enumerate(
zip(self.target_var_IDs, sample)
):
if self.forecasting_mode:
index = (lead_times[i], date)
else:
index = date
self[var_ID][f"sample_{sample_i}"].loc[index].data[
self.X_t_mask.data
] = pred.ravel()

elif self.mode == "off-grid":
if prediction_parameter != "samples":
for var_ID, pred in zip(self.target_var_IDs, data):
self[var_ID][prediction_parameter].loc[date] = pred
for i, (var_ID, pred) in enumerate(zip(self.target_var_IDs, data)):
if self.forecasting_mode:
index = (lead_times[i], date)
else:
index = date
self[var_ID][prediction_parameter].loc[index] = pred
elif prediction_parameter == "samples":
assert len(data.shape) == 3, (
f"If prediction_parameter is 'samples', and mode is 'off-grid', data must"
f"have shape (N_samples, N_var, N_targets). Got {data.shape}."
)
for sample_i, sample in enumerate(data):
for var_ID, pred in zip(self.target_var_IDs, sample):
self[var_ID][f"sample_{sample_i}"].loc[date] = pred
for i, (var_ID, pred) in enumerate(
zip(self.target_var_IDs, sample)
):
if self.forecasting_mode:
index = (lead_times[i], date)
else:
index = date
self[var_ID][f"sample_{sample_i}"].loc[index] = pred


def create_empty_spatiotemporal_xarray(
X: Union[xr.Dataset, xr.DataArray],
dates: List,
dates: List[Timestamp],
coord_names: dict = None,
data_vars: List[str] = None,
prepend_dims: Optional[List[str]] = None,
Expand Down Expand Up @@ -231,10 +304,6 @@ def create_empty_spatiotemporal_xarray(
# Convert time coord to pandas timestamps
pred_ds = pred_ds.assign_coords(time=pd.to_datetime(pred_ds.time.values))

# TODO: Convert init time to forecast time?
# pred_ds = pred_ds.assign_coords(
# time=pred_ds['time'] + pd.Timedelta(days=task_loader.target_delta_t[0]))

return pred_ds


Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = deepsensor
version = 0.3.8
version = 0.4.0
author = Tom R. Andersson
author_email = [email protected]
description = A Python package for modelling xarray and pandas data with neural processes.
Expand Down
Loading
Loading