Commit
Implement algorithm from paper `CS-Shapley: Class-wise Shapley Values for Data Valuation in Classification` (https://arxiv.org/abs/2211.06800)
Markus Semmler committed Aug 12, 2023
1 parent e42c304 commit 4bd92ec
Showing 19 changed files with 1,780 additions and 45 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -27,6 +27,8 @@
[PR #382](https://github.com/appliedAI-Initiative/pyDVL/pull/382)
- Decouple ray.init from ParallelConfig
[PR #373](https://github.com/appliedAI-Initiative/pyDVL/pull/383)
- **New Method**: Add classwise Shapley algorithm.
[PR #338](https://github.com/appliedAI-Initiative/pyDVL/pull/338)

## 0.6.1 - 🏗 Bug fixes and small improvement

46 changes: 46 additions & 0 deletions docs/30-data-valuation.rst
@@ -359,6 +359,52 @@ useful in applications.
u=utility, mode="truncated_montecarlo", done=MaxUpdates(1000)
)
Classwise Shapley
^^^^^^^^^^^^^^^^^^

A different scheme, applicable only to classification problems, first appeared in
:footcite:t:`schoch_csshapley_2022`. The key insight is that samples can be beneficial
for overall performance while being detrimental for their own class, which can
indicate a problem with the data. CS-Shapley changes the utility to account for
this effect by decomposing it into a product of two functions: one gives
priority to in-class accuracy, while the other adds a slight discount which
increases as the out-of-class accuracy increases.

The value is computed as:

$$
v_u(x_i) \approx \frac{1}{K \cdot L}
\sum_{S^{(k)}_{-y_i} \subseteq T_{-y_i} \setminus \{i\}}
\sum_{\sigma^{(l)} \in \Pi(T_{y_i} \setminus \{i\})}
\left[ u( \sigma^{(l)}_{\colon i} \cup \{i\} \mid S^{(k)}_{-y_i} )
- u( \sigma^{(l)}_{\colon i} \mid S^{(k)}_{-y_i} ) \right]
$$

where $K$ is the number of subsets $S^{(k)}_{-y_i}$ sampled from $T_{-y_i}$, the set
of indices with a class different from $y_i$, and $L$ is the number of permutations
sampled from $T_{y_i}$, the set of indices with class $y_i$. The scoring function
used has the form

$$u(S_{y_i}|S_{-y_i}) = a_S(D_{y_i}) \exp\{a_S(D_{-y_i})\}.$$

This can be further customised, but that form is shown by the authors to have certain
desirable properties.
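This form can be sanity-checked in a couple of lines. The following is a minimal
sketch, where the helper ``cs_utility`` and the numbers are purely illustrative:

.. code-block:: python

   import numpy as np

   def cs_utility(in_class_score: float, out_of_class_score: float) -> float:
       # u(S_{y_i} | S_{-y_i}) = a_S(D_{y_i}) * exp(a_S(D_{-y_i}))
       return in_class_score * np.exp(out_of_class_score)

   cs_utility(0.8, 0.5)  # in-class 0.8, out-of-class 0.5 -> ~1.319

The method can then be used as follows: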

.. code-block:: python

   from pydvl.utils import Dataset, Utility
   from pydvl.utils.score import ClasswiseScorer
   from pydvl.value import classwise_shapley, HistoryDeviation

   model = ...
   data = Dataset(...)
   scoring = ClasswiseScorer("accuracy")
   utility = Utility(model, data, scoring)
   values = classwise_shapley(
       utility,
       done=HistoryDeviation(n_steps=500, rtol=1e-3),
       n_resample_complement_sets=10,
       normalize_values=True,
   )
Exact Shapley for KNN
^^^^^^^^^^^^^^^^^^^^^
1 change: 1 addition & 0 deletions src/pydvl/utils/config.py
@@ -25,6 +25,7 @@ class ParallelConfig:
address: Optional[Union[str, Tuple[str, int]]] = None
n_cpus_local: Optional[int] = None
logging_level: int = logging.WARNING
_temp_dir: Optional[str] = None

def __post_init__(self) -> None:
if self.address is not None and self.n_cpus_local is not None:
9 changes: 4 additions & 5 deletions src/pydvl/utils/dataset.py
@@ -222,6 +222,10 @@ def indices(self):
"""
return self._indices

@indices.setter
def indices(self, indices: np.ndarray):
"""Sets the indices of the dataset."""
self._indices = indices

@property
def data_names(self):
"""Names of each individual datapoint.
@@ -410,11 +414,6 @@ def __init__(
def __len__(self):
return len(self.groups)

@property
def indices(self):
"""Indices of the groups."""
return self._indices

# FIXME this is a misnomer, should be `names` in `Dataset` so that here it
# makes sense
@property
84 changes: 83 additions & 1 deletion src/pydvl/utils/numeric.py
@@ -4,8 +4,22 @@
"""
from __future__ import annotations

import logging
import os
import random
import time
from itertools import chain, combinations
from typing import Collection, Generator, Iterator, Optional, Tuple, TypeVar, overload
from typing import (
Collection,
Generator,
Iterator,
List,
Optional,
Tuple,
TypeVar,
cast,
overload,
)

import numpy as np
from numpy.typing import NDArray
@@ -17,10 +31,15 @@
"random_matrix_with_condition_number",
"random_subset",
"random_powerset",
"random_powerset_group_conditional",
"random_subset_of_size",
"top_k_value_accuracy",
]


logger = logging.getLogger(__name__)


T = TypeVar("T", bound=np.generic)


@@ -110,6 +129,69 @@ def random_powerset(
total += 1


def random_powerset_group_conditional(
s: NDArray[T],
groups: NDArray[np.int_],
min_elements_per_group: int = 1,
) -> Generator[NDArray[T], None, None]:
"""
Draw infinite random group-conditional subsets from the passed set s. It is ensured
that in each sampled set, each unique group is represented at least ``min_elements``
times. The groups are specified as integers for all elements of the set separately.
:param s: Vector of size N representing the set to sample elements from.
:param groups: Vector of size N containing the group as an integer for each element.
:param min_elements_per_group: The minimum number of elements for each group.
:return: Generated draw from the power set of s with ``min_elements`` of each group.
:raises: TypeError: If the data ``s`` or ``groups`` is not a NumPy array.
:raises: ValueError: If the length of ``s``and ``groups`` different or
``min_elements`` is smaller than 0.
"""
if not isinstance(s, np.ndarray):
raise TypeError("Set must be an NDArray")

if not isinstance(groups, np.ndarray):
raise TypeError("Labels must be an NDArray")

if len(groups) != len(s):
raise ValueError("Set and labels have to be of same size.")

if min_elements_per_group < 0:
raise ValueError(
f"Parameter min_elements_per_group={min_elements_per_group} must be non-negative."
)

if min_elements_per_group == 0:
logger.warning(
"min_elements_per_group is 0: sampled subsets are not guaranteed to"
" contain an element of each group."
)

rng = np.random.default_rng()
unique_labels = np.unique(groups)

while True:
subsets: List[NDArray[T]] = []
for label in unique_labels:
label_indices = np.asarray(np.where(groups == label)[0])
subset_length = int(
rng.integers(
min(min_elements_per_group, len(label_indices)),
len(label_indices) + 1,
)
)
if subset_length > 0:
subsets.append(random_subset_of_size(s[label_indices], subset_length))

if len(subsets) > 0:
subset = np.concatenate(tuple(subsets))
rng.shuffle(subset)
yield subset
else:
yield np.array([])


def random_subset_of_size(s: NDArray[T], size: int) -> NDArray[T]:
"""Samples a random subset of given size uniformly from the powerset
of ``s``.
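For reference, a minimal usage sketch of the new sampler (the toy data is made up
for illustration; the import path follows this diff):

import numpy as np
from pydvl.utils.numeric import random_powerset_group_conditional

s = np.arange(10)                     # the set to sample from
groups = np.array([0] * 5 + [1] * 5)  # group label for each element

sampler = random_powerset_group_conditional(s, groups, min_elements_per_group=1)
subset = next(sampler)  # contains at least one element of each group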
151 changes: 147 additions & 4 deletions src/pydvl/utils/score.py
@@ -2,7 +2,7 @@
This module provides a :class:`Scorer` class that wraps scoring functions with
additional information.
Scorers can be constructed in the same way as in scikit-learn: either from
known strings or from a callable. Greater values must be better. If they are not,
a negated version can be used, see scikit-learn's `make_scorer()
<https://scikit-learn.org/stable/modules/generated/sklearn.metrics.make_scorer.html>`_.
@@ -17,11 +17,17 @@
import numpy as np
from numpy.typing import NDArray
from scipy.special import expit
from sklearn.metrics import get_scorer
from sklearn.metrics import accuracy_score, get_scorer, make_scorer

from pydvl.utils.types import SupervisedModel

__all__ = ["Scorer", "compose_score", "squashed_r2", "squashed_variance"]
__all__ = [
"Scorer",
"ClasswiseScorer",
"compose_score",
"squashed_r2",
"squashed_variance",
]


class ScorerCallable(Protocol):
@@ -58,7 +64,7 @@ class Scorer:
def __init__(
self,
scoring: Union[str, ScorerCallable],
default: float = np.nan,
default: float = 0.0,
range: Tuple = (-np.inf, np.inf),
name: Optional[str] = None,
):
@@ -81,6 +87,143 @@ def __repr__(self):
return f"{capitalized_name} (scorer={self._scorer})"


class ClasswiseScorer(Scorer):
"""A Scorer which is applicable for valuation in classification problems. Its value
is based on in-cls and out-of-cls score :footcite:t:`schoch_csshapley_2022`. For
each class ``label`` it separates the elements into two groups, namely in-cls
instances and out-of-cls instances. The value function itself than estimates the
in-cls metric discounted by the out-of-cls metric. In other words the value function
for each element of one class is conditioned on the out-of-cls instances (or a
subset of it). The form of the value function can be written as
.. math::
v_{y_i}(D) = f(a_S(D_{y_i}))) * g(a_S(D_{-y_i})))
where f and g are continuous, monotonic functions and D is the test set.
in order to produce meaningful results. For further reference see also section four
of :footcite:t:`schoch_csshapley_2022`.
:param default: Score used when a model cannot be fit, e.g. when too little data is
passed, or errors arise.
:param range: Numerical range of the score function. Some Monte Carlo methods can
use this to estimate the number of samples required for a certain quality of
approximation. If not provided, it can be read from the ``scoring`` object if it
provides it, for instance if it was constructed with
:func:`~pydvl.utils.types.compose_score`.
:param in_class_discount_fn: Continuous, monotonic increasing function used to
discount the in-class score.
:param out_of_class_discount_fn: Continuous, monotonic increasing function used to
discount the out-of-class score.
:param initial_label: Set initial label (Doesn't require to set parameter ``label``
on ``ClassWiseDiscountedScorer`` in first iteration)
:param name: Name of the scorer. If not provided, the name of the passed
function will be prefixed by 'classwise '.
.. versionadded:: 0.7.0
"""

def __init__(
self,
scoring: str = "accuracy",
default: float = 0.0,
range: Tuple[float, float] = (-np.inf, np.inf),
in_class_discount_fn: Callable[[float], float] = lambda x: x,
out_of_class_discount_fn: Callable[[float], float] = np.exp,
initial_label: Optional[int] = None,
name: Optional[str] = None,
):
disc_score_in_cls = in_class_discount_fn(range[1])
disc_score_out_of_cls = out_of_class_discount_fn(range[1])
transformed_range = (0, disc_score_in_cls * disc_score_out_of_cls)
super().__init__(
scoring,  # forward the requested scoring function instead of hard-coding "accuracy"
range=transformed_range,
default=default,
name=name or f"classwise {scoring}",
)
self._in_cls_discount_fn = in_class_discount_fn
self._out_of_cls_discount_fn = out_of_class_discount_fn
self.label = initial_label

def __str__(self):
return self._name

def __call__(
self: "ClasswiseScorer",
model: SupervisedModel,
x_test: NDArray[np.float_],
y_test: NDArray[np.int_],
) -> float:
"""
:param model: Model used for computing the score on the validation set.
:param x_test: Array containing the features of the classification problem.
:param y_test: Array containing the labels of the classification problem.
:return: Calculated score.
"""
in_cls_score, out_of_cls_score = self.estimate_in_cls_and_out_of_cls_score(
model, x_test, y_test
)
disc_score_in_cls = self._in_cls_discount_fn(in_cls_score)
disc_score_out_of_cls = self._out_of_cls_discount_fn(out_of_cls_score)
return disc_score_in_cls * disc_score_out_of_cls

def estimate_in_cls_and_out_of_cls_score(
self,
model: SupervisedModel,
x_test: NDArray[np.float_],
y_test: NDArray[np.int_],
rescale_scores: bool = True,
) -> Tuple[float, float]:
r"""
Computes in-class and out-of-class scores using the provided scoring function,
which can be expressed as:
.. math::
a_S(D=\{(\hat{x}_1, \hat{y}_1), \dots, (\hat{x}_K, \hat{y}_K)\}) =
\frac{1}{K} \sum_{k=1}^K s(y(\hat{x}_k), \hat{y}_k)
In this context, the computation is performed twice: once on :math:`D_{y_i}` and
once on :math:`D_{-y_i}`, to calculate the in-class and out-of-class scores. Here,
:math:`D_{y_i}` contains only samples with the specified ``label`` from the
validation set, while :math:`D_{-y_i}` contains all other samples. By default, the
scores are scaled to have the same order of magnitude. In such cases, the raw
scores are multiplied by
.. math::
N_{y_i} = \frac{|D_{y_i}|}{|D_{y_i}| + |D_{-y_i}|} \quad \text{and} \quad
N_{-y_i} = \frac{|D_{-y_i}|}{|D_{y_i}| + |D_{-y_i}|}
:param model: Model used for computing the score on the validation set.
:param x_test: Array containing the features of the classification problem.
:param y_test: Array containing the labels of the classification problem.
:param rescale_scores: If set to True, the scores are rescaled by the relative
sizes of :math:`D_{y_i}` and :math:`D_{-y_i}`. This is particularly useful when
the inner score is an average of the form :math:`\frac{1}{N} \sum_i x_i`.
:return: Tuple containing the in-class and out-of-class scores.
"""
scorer = self._scorer
label_set_match = y_test == self.label
label_set = np.where(label_set_match)[0]
num_classes = len(np.unique(y_test))

if len(label_set) == 0:
return 0, 1 / (num_classes - 1)

complement_label_set = np.where(~label_set_match)[0]
in_cls_score = scorer(model, x_test[label_set], y_test[label_set])
out_of_cls_score = scorer(
model, x_test[complement_label_set], y_test[complement_label_set]
)

if rescale_scores:
n_in_cls = np.count_nonzero(y_test == self.label)
n_out_of_cls = len(y_test) - n_in_cls
in_cls_score *= n_in_cls / (n_in_cls + n_out_of_cls)
out_of_cls_score *= n_out_of_cls / (n_in_cls + n_out_of_cls)

return in_cls_score, out_of_cls_score


def compose_score(
scorer: Scorer,
transformation: Callable[[float], float],
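For reference, a minimal usage sketch of the new scorer (the model and data are
random stand-ins for illustration):

import numpy as np
from sklearn.linear_model import LogisticRegression
from pydvl.utils.score import ClasswiseScorer

x, y = np.random.rand(100, 3), np.random.randint(0, 2, size=100)
model = LogisticRegression().fit(x, y)

scorer = ClasswiseScorer("accuracy", initial_label=0)
score = scorer(model, x, y)  # in-class score discounted by exp(out-of-class score)
in_cls, out_of_cls = scorer.estimate_in_cls_and_out_of_cls_score(model, x, y)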
14 changes: 14 additions & 0 deletions src/pydvl/utils/util.py
@@ -0,0 +1,14 @@
import numpy as np
from numpy.typing import NDArray


def arr_or_writeable_copy(arr: NDArray) -> NDArray:
"""Return a copy of ``arr`` if it's not writeable, otherwise return ``arr``.
:param arr: Array to copy if it's not writeable.
:return: Copy of ``arr`` if it's not writeable, otherwise ``arr``.
"""
if not arr.flags.writeable:
return np.copy(arr)

return arr
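A quick sketch of when this helper matters (the read-only array simulates, e.g., a
memory-mapped or shared input):

import numpy as np
from pydvl.utils.util import arr_or_writeable_copy

arr = np.arange(5)
arr.flags.writeable = False       # simulate a read-only input
out = arr_or_writeable_copy(arr)  # returns a writeable copy
out[0] = 42                       # safe: the original stays untouched
assert arr[0] == 0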
