Code for prediction and cross-validation.
Showing 4 changed files with 245 additions and 0 deletions.
@@ -0,0 +1,29 @@
import numpy as np

from .nested_cv import nested_cv_ridge
from .kfold import split_kfold, split_kfold_simple


def compute_ss0(y, folds):
    """
    Compute the sum of squares based on null models (i.e., always predicting the mean `y` of the training data).

    Parameters
    ----------
    y : ndarray
    folds : list of ndarray
        Each element is an ndarray of integers: the indices of the members of the fold.

    Returns
    -------
    ss0 : float
        The sum of squares based on null models.
    """
    yhat0 = np.zeros_like(y, dtype=float)  # float, so an integer `y` is not truncated
    for test_idx in folds:
        m = np.ones_like(y, dtype=bool)
        m[test_idx] = False  # mask out the test fold; the remainder is training data
        yhat0[test_idx] = y[m].mean()  # the null model predicts the training mean
    ss0 = np.sum((y - yhat0) ** 2)
    return ss0
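`ss0` is typically the denominator of a cross-validated R^2: given out-of-fold predictions `yhat` from any model, R^2 = 1 - sum((y - yhat)**2) / ss0. A minimal sketch, assuming `compute_ss0` is defined or importable in the current scope (the data and fold construction below are made up purely for illustration):

import numpy as np

rng = np.random.default_rng(0)
y = rng.standard_normal(100)
yhat = y + 0.5 * rng.standard_normal(100)  # stand-in for a model's out-of-fold predictions
folds = [np.arange(i, 100, 5) for i in range(5)]  # 5 interleaved folds, illustration only

ss0 = compute_ss0(y, folds)
r2 = 1 - np.sum((y - yhat) ** 2) / ss0  # cross-validated R^2 relative to the null model
print(r2)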
@@ -0,0 +1,58 @@
import numpy as np


def split_kfold(families, k=10, seed=None):
    """
    k-fold cross-validation splitter that takes family structure into account and guarantees that members of the same family end up in the same fold.

    Parameters
    ----------
    families : list of ndarray
        Each element is an ndarray of integers: the indices of the members of a family.
    k : int
        The number of data folds.
    seed : {int, None}
        Seed for the random number generator.

    Returns
    -------
    folds : list of lists
        Each element (fold) is a list of the family index arrays assigned to that fold.
    """
    # Group families by size so that larger families are placed first.
    d = {}
    for fam in families:
        l = len(fam)
        if l not in d:
            d[l] = [fam]
        else:
            d[l].append(fam)
    lengths = sorted(d.keys())[::-1]

    rng = np.random.default_rng(seed)

    folds = [[] for _ in range(k)]
    # Target fold sizes: as even as `np.array_split` would make them.
    expected_sizes = [len(_) for _ in np.array_split(np.concatenate(families), k)]
    current_sizes = np.zeros((k, ), dtype=int)
    for l in lengths:
        while True:
            s = len(d[l])
            if s == 0:
                break
            fam = d[l].pop(rng.choice(s))
            # Rejection sampling: draw folds until one has enough capacity left.
            while True:
                k_ = rng.choice(k)
                if current_sizes[k_] + len(fam) <= expected_sizes[k_]:
                    folds[k_].append(fam)
                    current_sizes[k_] += len(fam)
                    break
    return folds


def split_kfold_simple(families, k=10, seed=None):
    """
    Similar to `split_kfold` but does NOT take family structure into account. That is, for some test subjects, other members of the same family MAY be used for training.
    """
    ss = np.concatenate(families)
    rng = np.random.default_rng(seed)
    rng.shuffle(ss)
    folds = np.array_split(ss, k)
    return folds
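A quick sketch of how `split_kfold` might be used. The family sizes below are hypothetical and chosen so the rejection-sampling placement always succeeds (four size-3 and four size-2 families pack exactly into four folds of 5); the `IDM_pred.kfold` path is taken from the imports elsewhere in this commit. Note that each returned fold is a list of family index arrays, so folds must be concatenated to get subject indices:

import numpy as np

from IDM_pred.kfold import split_kfold

sizes = [3, 3, 3, 3, 2, 2, 2, 2]
bounds = np.concatenate([[0], np.cumsum(sizes)])
families = [np.arange(bounds[i], bounds[i + 1]) for i in range(len(sizes))]

folds = split_kfold(families, k=4, seed=0)
for i, fold in enumerate(folds):
    test_idx = np.concatenate(fold)  # flatten the fold's families into subject indices
    print(f"fold {i}: subjects {sorted(test_idx)}")  # no family straddles two folds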
@@ -0,0 +1,66 @@
import numpy as np
from sklearn.model_selection import StratifiedKFold

from IDM_pred.ridge import ridge, grid_ridge


def nested_cv_ridge(
        X, y, test_index, n_bins=4, n_folds=3,
        alphas=10**np.linspace(-20, 20, 81),
        npcs=[10, 20, 40, 80, 160, 320, None],
        train_index=None,
):
    """
    Predict the scores of the test subjects from the training subjects' data using ridge regression. Hyperparameters are chosen by nested cross-validation, whose inner loop is a stratified k-fold cross-validation.

    Parameters
    ----------
    X : ndarray of shape (n_samples, n_features)
    y : ndarray of shape (n_samples, )
    test_index : ndarray of shape (n_test_samples, )
        Indices of the samples used for testing.
    n_bins : int
        Training data are divided into `n_bins` bins for stratified k-fold cross-validation.
    n_folds : int
        Number of folds for stratified k-fold cross-validation.
    alphas : {list, ndarray of shape (n_alphas, )}
        Choices of the regularization parameter for ridge regression.
    npcs : list
        Choices of the number of PCs used in the prediction model, in increasing order. Each element should be an integer or `None`; `None` means all PCs are used.
    train_index : {None, ndarray of shape (n_training_samples, )}
        Indices of the samples used for training. If `None`, all samples except the test samples are used.

    Returns
    -------
    yhat : ndarray of shape (n_test_samples, )
        Predicted scores for the test samples.
    alpha : float
        The element of `alphas` chosen by nested cross-validation.
    npc : {int, None}
        The element of `npcs` chosen by nested cross-validation.
    cost : float
        The inner-loop cost of the chosen hyperparameters, i.e., the minimum cost on training data across all hyperparameter choices.
    """
    if train_index is None:
        train_index = np.setdiff1d(np.arange(X.shape[0], dtype=int), test_index)
    X_train, X_test = X[train_index], X[test_index]
    y_train = y[train_index]

    # Bin the continuous target so StratifiedKFold can balance its
    # distribution across the inner-loop folds.
    bin_limits = np.histogram(y_train, n_bins)[1]
    bins = np.digitize(y_train, bin_limits[:-1])

    cv = StratifiedKFold(n_splits=n_folds)
    costs = []
    for train, test in cv.split(X_train, bins):
        # Evaluate the full (alphas x npcs) grid on this inner fold.
        yhat = grid_ridge(X_train[train], X_train[test], y_train[train], alphas, npcs)
        cost = ((y_train[test][:, np.newaxis, np.newaxis] - yhat)**2).sum(axis=0)
        costs.append(cost)
    costs = np.sum(costs, axis=0)
    a, b = np.unravel_index(costs.argmin(), costs.shape)

    alpha = alphas[a]
    npc = npcs[b]

    # Refit on all training data with the chosen hyperparameters.
    yhat = ridge(X_train, X_test, y_train, alpha, npc)

    return yhat, alpha, npc, costs[a, b]
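A hypothetical end-to-end sketch tying the pieces together: synthetic data, outer folds from `split_kfold_simple`, one `nested_cv_ridge` call per outer fold, and `compute_ss0` for a cross-validated R^2. The data are made up; the `IDM_pred` module paths follow the imports in this commit, except that the module holding `compute_ss0` is not shown in this diff and is assumed importable:

import numpy as np

from IDM_pred.kfold import split_kfold_simple
from IDM_pred.nested_cv import nested_cv_ridge

rng = np.random.default_rng(0)
X = rng.standard_normal((100, 50))
y = X[:, 0] + 0.5 * rng.standard_normal(100)  # synthetic target

folds = split_kfold_simple([np.arange(100)], k=10, seed=0)  # one "family" of 100 subjects
yhat = np.zeros_like(y)
for test_index in folds:
    yhat[test_index], alpha, npc, cost = nested_cv_ridge(X, y, test_index)

ss0 = compute_ss0(y, folds)  # assumed importable; module not shown in this diff
r2 = 1 - np.sum((y - yhat) ** 2) / ss0
print(r2)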
@@ -0,0 +1,92 @@
import numpy as np
from scipy.linalg import svd, LinAlgError


def ridge(X_train, X_test, y_train, alpha, npc, fit_intercept=True):
    """
    Ridge regression on the top `npc` principal components, solved via the SVD of the training data.

    Parameters
    ----------
    X_train : ndarray of shape (n_training_samples, n_features)
    X_test : ndarray of shape (n_test_samples, n_features)
    y_train : ndarray of shape (n_training_samples, )
    alpha : float
        The regularization parameter for ridge regression.
    npc : {int, None}
        The number of PCs used in the prediction model. `None` means all PCs are used.
    fit_intercept : bool
        Whether to center `X` and `y` before fitting (and add the offset back to the predictions).

    Returns
    -------
    yhat : ndarray of shape (n_test_samples, )
        Predicted values of the target variable.
    """
    # TODO multiple measures
    if fit_intercept:
        X_offset = np.mean(X_train, axis=0, keepdims=True)
        y_offset = np.mean(y_train, axis=0, keepdims=True)
        X_train = X_train - X_offset
        y_train = y_train - y_offset
    try:
        U, s, Vt = svd(X_train, full_matrices=False)
    except LinAlgError:
        # The default 'gesdd' driver occasionally fails to converge;
        # fall back to the slower but more robust 'gesvd'.
        U, s, Vt = svd(X_train, full_matrices=False, lapack_driver='gesvd')
    if fit_intercept:
        X_test_V = (X_test - X_offset) @ Vt.T[:, :npc]
    else:
        X_test_V = X_test @ Vt.T[:, :npc]
    UT_y = U.T[:npc, :] @ y_train
    # Ridge filter factor: each PC's contribution is shrunk by s / (s**2 + alpha).
    d = s[:npc] / (s[:npc]**2 + alpha)
    d_UT_y = d * UT_y
    yhat = X_test_V @ d_UT_y
    if fit_intercept:
        yhat += y_offset
    return yhat
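The closed form computed above is yhat = X_test_V @ (s / (s**2 + alpha) * (U.T @ y)) on centered data, which for `npc=None` is exactly the standard ridge solution. So it should agree with scikit-learn's `Ridge`, which minimizes ||y - Xw||^2 + alpha * ||w||^2. A sanity-check sketch on random data (not part of this commit):

import numpy as np
from sklearn.linear_model import Ridge

from IDM_pred.ridge import ridge

rng = np.random.default_rng(0)
X_train = rng.standard_normal((60, 20))
X_test = rng.standard_normal((10, 20))
y_train = rng.standard_normal(60)

alpha = 1.0
yhat_svd = ridge(X_train, X_test, y_train, alpha=alpha, npc=None)

model = Ridge(alpha=alpha, fit_intercept=True).fit(X_train, y_train)
yhat_skl = model.predict(X_test)
print(np.allclose(yhat_svd, yhat_skl))  # expected: True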
def grid_ridge(X_train, X_test, y_train, alphas, npcs, fit_intercept=True):
    """
    Evaluate ridge predictions for a whole grid of hyperparameters (`alphas` x `npcs`) with a single SVD.

    Parameters
    ----------
    X_train : ndarray of shape (n_training_samples, n_features)
    X_test : ndarray of shape (n_test_samples, n_features)
    y_train : ndarray of shape (n_training_samples, ) or (n_training_samples, n_measures)
    alphas : {list, ndarray of shape (n_alphas, )}
        Choices of the regularization parameter for ridge regression.
    npcs : {list, ndarray of shape (n_npcs, )}
        Choices of the number of PCs used in the prediction model, in increasing order. Each element should be an integer or `None`; `None` means all PCs are used.
    fit_intercept : bool
        Whether to center `X` and `y` before fitting (and add the offset back to the predictions).

    Returns
    -------
    yhat : ndarray of shape (n_test_samples, n_alphas, n_npcs)
        For 2-D `y_train`, the shape is (n_test_samples, n_measures, n_alphas, n_npcs).
    """
    if fit_intercept:
        X_offset = np.mean(X_train, axis=0, keepdims=True)
        y_offset = np.mean(y_train, axis=0, keepdims=True)
        X_train = X_train - X_offset
        y_train = y_train - y_offset
    try:
        U, s, Vt = svd(X_train, full_matrices=False)
    except LinAlgError:
        # The default 'gesdd' driver occasionally fails to converge;
        # fall back to the slower but more robust 'gesvd'.
        U, s, Vt = svd(X_train, full_matrices=False, lapack_driver='gesvd')

    if fit_intercept:
        X_test_V = (X_test - X_offset) @ Vt.T  # (n_test_samples, k)
    else:
        X_test_V = X_test @ Vt.T
    UT_y = U.T @ y_train  # (k, ) or (k, n_measures)

    d = s[:, np.newaxis] / ((s**2)[:, np.newaxis] + np.array(alphas)[np.newaxis, :])  # (k, n_alphas)
    if len(y_train.shape) == 1:
        d_UT_y = d * UT_y[..., np.newaxis]  # (k, n_alphas)
        yhat = np.zeros((X_test.shape[0], len(alphas), len(npcs)))
    else:
        d_UT_y = d[:, np.newaxis, :] * UT_y[..., np.newaxis]  # (k, n_measures, n_alphas)
        yhat = np.zeros((X_test.shape[0], y_train.shape[1], len(alphas), len(npcs)))
        if fit_intercept:  # guard: `y_offset` is only defined when fitting an intercept
            y_offset = y_offset[:, :, np.newaxis, np.newaxis]

    # Compute each consecutive block of PCs once, then take a cumulative sum along
    # the last axis so that entry i uses the first npcs[i] PCs.
    npcs_ = [0] + list(npcs)
    for i in range(len(npcs)):
        yhat[..., i] = np.tensordot(X_test_V[:, npcs_[i]:npcs[i]], d_UT_y[npcs_[i]:npcs[i]], axes=(1, 0))
    yhat = np.cumsum(yhat, axis=-1)
    if fit_intercept:
        yhat += y_offset
    return yhat
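One way to convince yourself the block-accumulation-plus-cumsum trick is correct: every (alpha, npc) cell of the grid should reproduce an independent `ridge` call with those hyperparameters. A small hypothetical check on random data:

import numpy as np

from IDM_pred.ridge import ridge, grid_ridge

rng = np.random.default_rng(0)
X_train = rng.standard_normal((60, 30))
X_test = rng.standard_normal((5, 30))
y_train = rng.standard_normal(60)

alphas = [0.1, 1.0, 10.0]
npcs = [5, 10, None]
yhat_grid = grid_ridge(X_train, X_test, y_train, alphas, npcs)

ok = all(
    np.allclose(yhat_grid[:, a, b], ridge(X_train, X_test, y_train, alphas[a], npcs[b]))
    for a in range(len(alphas))
    for b in range(len(npcs))
)
print(ok)  # expected: True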