diff --git a/src/pydvl/utils/numeric.py b/src/pydvl/utils/numeric.py
index 57ab98c77..8e115f6d1 100644
--- a/src/pydvl/utils/numeric.py
+++ b/src/pydvl/utils/numeric.py
@@ -156,14 +156,15 @@ def random_powerset_group_conditional(
     if n_samples is None:
         n_samples = np.iinfo(np.int32).max

+    unique_labels = np.unique(labels)
     while total <= n_samples:
         subsets: List[NDArray[T]] = []
-        for label in labels:
+        for label in unique_labels:
             label_indices = np.asarray(np.where(labels == label)[0])
             subset_length = int(
                 rng.integers(
-                    min(min_elements, len(label_indices) - 1), len(label_indices)
+                    min(min_elements, len(label_indices)), len(label_indices) + 1
                 )
             )
             subsets.append(random_subset_of_size(s[label_indices], subset_length))
diff --git a/src/pydvl/utils/score.py b/src/pydvl/utils/score.py
index 933706d98..9a8cfae6a 100644
--- a/src/pydvl/utils/score.py
+++ b/src/pydvl/utils/score.py
@@ -21,7 +21,13 @@
 from pydvl.utils.types import SupervisedModel

-__all__ = ["Scorer", "compose_score", "squashed_r2", "squashed_variance"]
+__all__ = [
+    "Scorer",
+    "ScorerCallable",
+    "compose_score",
+    "squashed_r2",
+    "squashed_variance",
+]


 class ScorerCallable(Protocol):
diff --git a/src/pydvl/utils/utility.py b/src/pydvl/utils/utility.py
index 33b361429..7e96dc1ca 100644
--- a/src/pydvl/utils/utility.py
+++ b/src/pydvl/utils/utility.py
@@ -27,7 +27,7 @@
 from pydvl.utils import Dataset
 from pydvl.utils.caching import CacheStats, memcached, serialize
 from pydvl.utils.config import MemcachedConfig
-from pydvl.utils.score import Scorer
+from pydvl.utils.score import Scorer, ScorerCallable
 from pydvl.utils.types import SupervisedModel

 __all__ = ["Utility", "DataUtilityLearning", "MinerGameUtility", "GlovesGameUtility"]
@@ -110,7 +110,7 @@ def __init__(
         self,
         model: SupervisedModel,
         data: Dataset,
-        scorer: Optional[Union[str, Scorer]] = None,
+        scorer: Optional[Union[str, ScorerCallable]] = None,
         *,
         default_score: float = 0.0,
         score_range: Tuple[float, float] = (-np.inf, np.inf),
diff --git a/src/pydvl/value/result.py b/src/pydvl/value/result.py
index 0c7ae7fe4..b2ff03dba 100644
--- a/src/pydvl/value/result.py
+++ b/src/pydvl/value/result.py
@@ -499,9 +499,9 @@ def __add__(self, other: "ValuationResult") -> "ValuationResult":
         """
         # empty results
-        if len(self.values) == 0:
+        if len(self.values) == 0 or np.all(self.counts == 0):
             return other
-        if len(other.values) == 0:
+        if len(other.values) == 0 or np.all(other.counts == 0):
             return self

         self._check_compatible(other)
diff --git a/src/pydvl/value/shapley/classwise.py b/src/pydvl/value/shapley/classwise.py
index 12ec7c015..fd80e5fb6 100644
--- a/src/pydvl/value/shapley/classwise.py
+++ b/src/pydvl/value/shapley/classwise.py
@@ -14,9 +14,12 @@
 import numpy as np
 from numpy._typing import NDArray

-from pydvl.utils import MapReduceJob, ParallelConfig, SupervisedModel, Utility
+from pydvl.utils import MapReduceJob, ParallelConfig, Scorer, SupervisedModel, Utility

-__all__ = ["class_wise_shapley", "CSScorer"]
+__all__ = [
+    "class_wise_shapley",
+    "CSScorer",
+]

 from sklearn.metrics import accuracy_score
 from tqdm import tqdm
@@ -24,35 +27,7 @@
 from pydvl.utils.numeric import random_powerset_group_conditional
 from pydvl.value import StoppingCriterion, ValuationResult

-
-def _estimate_in_out_cls_accuracy(
-    model: SupervisedModel, x: np.ndarray, labels: np.ndarray, label: np.int_
-) -> Tuple[float, float]:
-    """
-    Estimate the in and out of class accuracy as defined in [1], Equation 3.
-
-    :param model: A model to be used for predicting the labels.
-    :param x: The inputs to be used for measuring the accuracies. Has to match the labels.
-    :param labels: The labels ot be used for measuring the accuracies. It is divided further by the passed label.
-    :param label: The label of the class, which is currently viewed.
-    :return: A tuple, containing the in class accuracy as well as the out of class accuracy.
-    """
-    n = len(x)
-    y_pred = model.predict(x)
-    label_set_match = labels == label
-    label_set = np.where(label_set_match)[0]
-    complement_label_set = np.where(~label_set_match)[0]
-
-    acc_in_cls = (
-        accuracy_score(labels[label_set], y_pred[label_set], normalize=False) / n
-    )
-    acc_out_of_cls = (
-        accuracy_score(
-            labels[complement_label_set], y_pred[complement_label_set], normalize=False
-        )
-        / n
-    )
-    return acc_in_cls, acc_out_of_cls
+IntArray = NDArray[np.int_]


 class CSScorer:
@@ -87,13 +62,13 @@ def __call__(


 def _class_wise_shapley_worker(
-    indices: Sequence[int],
+    indices: np.ndarray,
     u: Utility,
     *,
     progress: bool = True,
-    num_resample_complement_sets: int = 1,
     done: StoppingCriterion,
     eps: float = 1e-4,
+    num_resample_complement_sets: int = 1,
 ) -> ValuationResult:
     r"""Computes the class-wise Shapley value using the formulation with
     permutations:
@@ -119,52 +94,26 @@
         data_names=u.data.data_names[indices],
     )

-    x_train, y_train = u.data.get_training_data(indices)
+    _, y_train = u.data.get_training_data(indices)
     unique_labels = np.unique(y_train)
     pbar = tqdm(disable=not progress, position=0, total=100, unit="%")

     while not done(result):
+        pbar.n = 100 * done.completion()
         pbar.refresh()
         for idx_label, label in enumerate(unique_labels):
             u.scorer.label = label
-            active_elements = y_train == label
-            label_set = np.where(active_elements)[0]
-            complement_label_set = np.where(~active_elements)[0]
-            label_set = indices[label_set]
-            complement_label_set = indices[complement_label_set]
-
-            _, complement_y_train = u.data.get_training_data(complement_label_set)
-            permutation_label_set = np.random.permutation(label_set)
-
-            for kl, subset_complement in enumerate(
-                random_powerset_group_conditional(
-                    complement_label_set,
-                    complement_y_train,
-                    n_samples=num_resample_complement_sets,
-                )
-            ):
-
-                train_set = np.concatenate((label_set, subset_complement))
-                final_score = u(train_set)
-                prev_score = 0.0
-
-                for i, _ in enumerate(label_set):
-
-                    if np.abs(prev_score - final_score) < eps:
-                        score = prev_score
-
-                    else:
-                        train_set = np.concatenate(
-                            (permutation_label_set[: i + 1], subset_complement)
-                        )
-                        score = u(train_set)
-
-                    marginal = score - prev_score
-                    result.update(permutation_label_set[i], marginal)
-                    prev_score = score
+            result = __class_complement_conditional_sampler(
+                u,
+                label,
+                result,
+                active_indices=indices,
+                eps=eps,
+                num_samples=num_resample_complement_sets,
+            )

     return result
@@ -179,6 +128,18 @@ def class_wise_shapley(
     n_jobs: int = 4,
     config: ParallelConfig = ParallelConfig(),
 ) -> ValuationResult:
+    """
+    Computes the class-wise Shapley value using the permutation formulation, distributing the work with map-reduce.
+
+    :param u: Utility object with model, data, and scoring function. The scoring function must be of type CSScorer.
+    :param progress: Whether to display progress bars for each job.
+    :param done: Stopping criterion that determines when to stop sampling permutations.
+    :param normalize_score: Whether to normalize the score by the number of classes.
+    :param eps: Truncation threshold for the truncated Monte Carlo estimator.
+    :param n_jobs: Number of jobs to run in parallel.
+    :param config: Parallel configuration.
+    :return: ValuationResult object with the data values.
+    """

     map_reduce_job: MapReduceJob[NDArray, ValuationResult] = MapReduceJob(
         u.data.indices,
@@ -209,3 +170,139 @@
         result.values[label_set] *= in_cls_acc / sigma

     return result
+
+
+def __class_complement_conditional_sampler(
+    u: Utility,
+    label: int,
+    result: ValuationResult,
+    *,
+    active_indices: IntArray = None,
+    eps: float = 1e-4,
+    num_samples: int = 10,
+) -> ValuationResult:
+    """
+    Samples random subsets of the complement set of the given label and runs the truncated Monte Carlo estimator on each of them.
+
+    :param u: Utility object with model, data, and scoring function. The scoring function must be of type CSScorer.
+    :param label: The label for which the complement set is sampled.
+    :param result: The result object to be updated.
+    :param active_indices: The indices to restrict the computation to. Defaults to all indices of the dataset.
+    :param eps: Truncation threshold for the truncated Monte Carlo estimator.
+    :param num_samples: The number of subset samples to be drawn from the complement set.
+    :return: The updated result object.
+    """
+    if active_indices is None:
+        active_indices = u.data.indices
+
+    _, y_train = u.data.get_training_data(active_indices)
+    label_set, complement_label_set = __split_into_label_sets(
+        y_train, label, active_indices
+    )
+    _, complement_y_train = u.data.get_training_data(complement_label_set)
+
+    for kl, subset_complement in enumerate(
+        random_powerset_group_conditional(
+            complement_label_set,
+            complement_y_train,
+            n_samples=num_samples,
+        )
+    ):
+        result = __truncated_permutation_mcmc_sampler(
+            u, label_set, subset_complement, result, eps
+        )
+    return result
+
+
+def _estimate_in_out_cls_accuracy(
+    model: SupervisedModel, x: np.ndarray, labels: np.ndarray, label: np.int_
+) -> Tuple[float, float]:
+    """
+    Estimate the in-class and out-of-class accuracy as defined in [1], Equation 3.
+
+    :param model: A model to be used for predicting the labels.
+    :param x: The inputs to be used for measuring the accuracies. Must match the labels in length.
+    :param labels: The labels to be used for measuring the accuracies. They are further split by the passed label.
+    :param label: The label of the class currently being considered.
+    :return: A tuple containing the in-class and the out-of-class accuracy.
+    """
+    n = len(x)
+    y_pred = model.predict(x)
+    label_set_match = labels == label
+    label_set = np.where(label_set_match)[0]
+    complement_label_set = np.where(~label_set_match)[0]
+
+    acc_in_cls = (
+        accuracy_score(labels[label_set], y_pred[label_set], normalize=False) / n
+    )
+    acc_out_of_cls = (
+        accuracy_score(
+            labels[complement_label_set], y_pred[complement_label_set], normalize=False
+        )
+        / n
+    )
+    return acc_in_cls, acc_out_of_cls
+
+
+def __split_into_label_sets(
+    labels: IntArray, label: int, indices: IntArray
+) -> Tuple[IntArray, IntArray]:
+    """
+    Splits the indices into two sets: one containing the indices with the given label, the other the remaining indices.
+    :param labels: The labels of the indices.
+    :param label: The label to be used for splitting.
+    :param indices: The indices to be split.
+    :return: The two sets of indices.
+ """ + active_elements = labels == label + label_set = np.where(active_elements)[0] + complement_label_set = np.where(~active_elements)[0] + label_set = indices[label_set] + complement_label_set = indices[complement_label_set] + return label_set, complement_label_set + + +def __truncated_permutation_mcmc_sampler( + u: Utility, + class_label_set: np.ndarray, + complement_set: np.ndarray, + result: ValuationResult, + atol: float, +) -> ValuationResult: + """ + A truncated version of a permutation-based MCMC estimator for class-wise shapley values. It generates + a permutation p[i] of the class label set and adds one element at a time to estimate the utility on + the newly created training set. + + :param u: Utility object with model, data, and scoring function. The scoring function has to be of type CSScorer. + :param class_label_set: The set of indices of the class label set. + :param complement_set: The set of indices of the complement set. + :param atol: The threshold when updating using the truncated monte carlo estimator. + :return: ValuationResult object with the data values. + """ + if len(np.intersect1d(class_label_set, complement_set)) > 0: + raise ValueError( + "The class label set and the complement set have to be disjoint." + ) + + class_label_set = np.random.permutation(class_label_set) + train_set = np.concatenate((class_label_set, complement_set)) + final_score = u(train_set) + prev_score = None + + for i in range(len(class_label_set)): + + if prev_score is not None and np.abs(prev_score - final_score) < atol: + score = prev_score + + else: + train_set = np.concatenate((class_label_set[: i + 1], complement_set)) + score = u(train_set) + + if prev_score is not None: + marginal = score - prev_score + result.update(class_label_set[i], marginal) + + prev_score = score + + return result diff --git a/tests/value/shapley/test_classwise.py b/tests/value/shapley/test_classwise.py index 23a73dccc..0a9e9abe7 100644 --- a/tests/value/shapley/test_classwise.py +++ b/tests/value/shapley/test_classwise.py @@ -1,7 +1,5 @@ """ -TestCases: - -1 Not fitting utility function +Test cases for the class wise shapley value. """ from typing import Tuple @@ -10,8 +8,14 @@ import pytest from numpy._typing import NDArray -from pydvl.utils import SupervisedModel -from pydvl.value.shapley.classwise import _estimate_in_out_cls_accuracy +from pydvl.utils import Dataset, SupervisedModel, Utility +from pydvl.value import MaxChecks, ValuationResult +from pydvl.value.shapley.classwise import ( + CSScorer, + _class_wise_shapley_worker, + _estimate_in_out_cls_accuracy, +) +from tests.value import check_values @pytest.fixture(scope="function") @@ -50,3 +54,83 @@ def test_estimate_in_out_cls_accuracy( in_cls_acc_1, out_of_cls_acc_1 = _estimate_in_out_cls_accuracy(mock_model, x, y, 1) assert in_cls_acc_1 == out_of_cls_acc_0 assert in_cls_acc_0 == out_of_cls_acc_1 + + +@pytest.fixture(scope="function") +def dataset_cs_shapley() -> Dataset: + """ + A simple dataset for testing the class wise shapley value. 
+ """ + x_train = np.arange(1, 5).reshape([-1, 1]) + y_train = np.array([0, 0, 1, 1]) + x_test = x_train + y_test = np.array([0, 0, 0, 1]) + return Dataset(x_train, y_train, x_test, y_test) + + +@pytest.fixture(scope="function") +def analytical_solution_cs_shapley( + dataset_cs_shapley: Dataset, + linear_regression_classifier: SupervisedModel, +) -> Tuple[Utility, ValuationResult]: + exact = ValuationResult( + values=np.array( + [ + 0.0, + 0.24075476562, # See + -0.0228, # Manual derivation missing + 0.0, # Manual derivation missing + ] + ) + ) + scorer = CSScorer() + utility = Utility( + linear_regression_classifier, dataset_cs_shapley, scorer, catch_errors=False + ) + return utility, exact + + +@pytest.fixture(scope="function") +def linear_regression_classifier() -> SupervisedModel: + """ + A classifier based on linear regression, so that a closed form solution exists + """ + + class _LinearRegressionBasedClassifier(SupervisedModel): + def __init__(self): + self._beta = None + + def fit(self, x: NDArray, y: NDArray) -> float: + v = x[:, 0] + self._beta = np.dot(v, y) / np.dot(v, v) + return -1 + + def predict(self, x: NDArray) -> NDArray: + if self._beta is None: + raise AttributeError("Model not fitted") + + x = x[:, 0] + probs = self._beta * x + return np.clip(np.round(probs), 0, 1).astype(int) + + def score(self, x: NDArray, y: NDArray) -> float: + pred_y = self.predict(x) + return np.sum(pred_y == y) / 4 + + return _LinearRegressionBasedClassifier() + + +def test_cs_shapley_exact_solution( + analytical_solution_cs_shapley: Tuple[Utility, ValuationResult], + n_samples: int = 100, + rtol: float = 1e-2, +): + utility, exact_values = analytical_solution_cs_shapley + values = _class_wise_shapley_worker( + utility.data.indices, + utility, + done=MaxChecks(n_samples), + progress=True, + num_resample_complement_sets=10, + ) + check_values(values, exact_values, rtol=rtol, atol=rtol)