diff --git a/python/dp_accounting/dp_accounting/pld/privacy_loss_distribution.py b/python/dp_accounting/dp_accounting/pld/privacy_loss_distribution.py
index 7a691d13..4da3fd30 100644
--- a/python/dp_accounting/dp_accounting/pld/privacy_loss_distribution.py
+++ b/python/dp_accounting/dp_accounting/pld/privacy_loss_distribution.py
@@ -1330,6 +1330,138 @@ def single_discrete_gaussian_pld(
     sampling_prob)
 
 
+def from_double_mixture_gaussian_mechanism(
+    standard_deviation: float,
+    sensitivities_upper: Sequence[float],
+    sensitivities_lower: Sequence[float],
+    sampling_probs_upper: Sequence[float],
+    sampling_probs_lower: Sequence[float],
+    pessimistic_estimate: bool = True,
+    value_discretization_interval: float = 1e-4,
+    log_mass_truncation_bound: float = -50,
+    use_connect_dots: bool = True,
+) -> PrivacyLossDistribution:
+  """Creates the PLD of a Double Mixture of Gaussians mechanism.
+
+  This method supports two algorithms for constructing the privacy loss
+  distribution: one given by the "Privacy Buckets" algorithm and the other
+  given by the "Connect the Dots" algorithm. See Sections 2.1 and 2.2 of the
+  supplementary material for more details.
+
+  Args:
+    standard_deviation: the standard_deviation of the Gaussian distribution.
+    sensitivities_upper: the support of the first mixture's sensitivity
+      distribution. Must be the same length as sampling_probs_upper, and both
+      should be 1D.
+    sensitivities_lower: the support of the second mixture's sensitivity
+      distribution. Must be the same length as sampling_probs_lower, and both
+      should be 1D.
+    sampling_probs_upper: the probabilities associated with sensitivities_upper.
+    sampling_probs_lower: the probabilities associated with sensitivities_lower.
+    pessimistic_estimate: a value indicating whether the rounding is done in
+      such a way that the resulting epsilon-hockey stick divergence computation
+      gives an upper estimate to the real value.
+    value_discretization_interval: the length of the discretization interval for
+      the privacy loss distribution. The values will be rounded up/down to be
+      integer multiples of this number. Smaller value results in more accurate
+      estimates of the privacy loss, at the cost of increased run-time / memory
+      usage.
+    log_mass_truncation_bound: the ln of the probability mass that might be
+      discarded from the noise distribution. The larger this number, the more
+      error it may introduce in divergence calculations.
+    use_connect_dots: When True (default), the connect-the-dots algorithm will
+      be used to construct the privacy loss distribution. When False, the
+      privacy buckets algorithm will be used.
+
+  Returns:
+    The privacy loss distribution corresponding to the Double Mixture of
+    Gaussians mechanism with given parameters.
+ """ + + pmf = _create_pld_pmf_from_additive_noise( + privacy_loss_mechanism.DoubleMixtureGaussianPrivacyLoss( + standard_deviation, + sensitivities_upper, + sensitivities_lower, + sampling_probs_upper, + sampling_probs_lower, + pessimistic_estimate=pessimistic_estimate, + log_mass_truncation_bound=log_mass_truncation_bound, + ), + pessimistic_estimate=pessimistic_estimate, + value_discretization_interval=value_discretization_interval, + use_connect_dots=use_connect_dots, + ) + + return PrivacyLossDistribution(pmf) + + +def from_double_mixture_laplace_mechanism( + scale: float, + sensitivities_upper: Sequence[float], + sensitivities_lower: Sequence[float], + sampling_probs_upper: Sequence[float], + sampling_probs_lower: Sequence[float], + pessimistic_estimate: bool = True, + value_discretization_interval: float = 1e-4, + log_mass_truncation_bound: float = -50, + use_connect_dots: bool = True, +) -> PrivacyLossDistribution: + """Creates the pld of a Double Mixture of Laplace mechanism. + + This method supports two algorithms for constructing the privacy loss + distribution. One given by the "Privacy Buckets" algorithm and other given by + "Connect the Dots" algorithm. See Sections 2.1 and 2.2 of supplementary + material for more details. + + Args: + scale: the scale of the Laplace distribution. + sensitivities_upper: the support of the first mixture's + sensitivity distribution. + Must be the same length as sampling_probs_upper, and both should be 1D. + sensitivities_lower: the support of the second mixture's + sensitivity distribution. + Must be the same length as sampling_probs_lower, and both should be 1D. + sampling_probs_upper: the probabilities associated with sensitivities_upper. + sampling_probs_lower: the probabilities associated with sensitivities_lower. + pessimistic_estimate: a value indicating whether the rounding is done in + such a way that the resulting epsilon-hockey stick divergence computation + gives an upper estimate to the real value. + value_discretization_interval: the length of the dicretization interval for + the privacy loss distribution. The values will be rounded up/down to be + integer multiples of this number. Smaller value results in more accurate + estimates of the privacy loss, at the cost of increased run-time / memory + usage. + log_mass_truncation_bound: the ln of the probability mass that might be + discarded from the noise distribution. The larger this number, the more + error it may introduce in divergence calculations. + use_connect_dots: When True (default), the connect-the-dots algorithm will + be used to construct the privacy loss distribution. When False, the + privacy buckets algorithm will be used. + + Returns: + The privacy loss distribution corresponding to the Mixture of Gaussians + mechanism with given parameters. 
+ """ + + pmf = _create_pld_pmf_from_additive_noise( + privacy_loss_mechanism.DoubleMixtureLaplacePrivacyLoss( + scale, + sensitivities_upper, + sensitivities_lower, + sampling_probs_upper, + sampling_probs_lower, + pessimistic_estimate=pessimistic_estimate, + log_mass_truncation_bound=log_mass_truncation_bound, + ), + pessimistic_estimate=pessimistic_estimate, + value_discretization_interval=value_discretization_interval, + use_connect_dots=use_connect_dots, + ) + + return PrivacyLossDistribution(pmf) + + def from_mixture_gaussian_mechanism( standard_deviation: float, sensitivities: Sequence[float], diff --git a/python/dp_accounting/dp_accounting/pld/privacy_loss_mechanism.py b/python/dp_accounting/dp_accounting/pld/privacy_loss_mechanism.py index 62858d7e..b56cf767 100644 --- a/python/dp_accounting/dp_accounting/pld/privacy_loss_mechanism.py +++ b/python/dp_accounting/dp_accounting/pld/privacy_loss_mechanism.py @@ -26,7 +26,7 @@ import functools import math import numbers -from typing import Iterable, Mapping, Optional, Sequence, Union +from typing import Iterable, Mapping, Optional, Sequence, Tuple, Union import numpy as np import scipy @@ -1693,151 +1693,214 @@ def _pl_without_sampling_remove(pl, sampling_prob): return -_pl_without_sampling_add(-pl, sampling_prob) -class MixtureGaussianPrivacyLoss(AdditiveNoisePrivacyLoss): - """Privacy loss of the Mixture of Gaussians mechanism. - - This class gives the privacy loss for a scalar Gaussian mechanism where the - sensitivity is a random variable equal to sensitivities[i] with probability - sampling_probs[i]. +class DoubleMixturePrivacyLoss(AdditiveNoisePrivacyLoss): + """Privacy loss of a Double Mixture mechanism. - The privacy loss distribution of the Mixture of Gaussians mechanism is - equivalent to the privacy loss distribution between the Gaussian distribution - and the same distribution convolved with the discrete distribution specified - by sensitivities and sampling_probs. That is, let mu be the Gaussian noise PDF - with sigma = standard_deviation. The privacy loss distribution is generated as - follows: + It is the privacy loss distribution between two mixtures, each of which + is some base continuous distribution convolved with + a discrete distribution specified by sensitivities and sampling_probs. + That is, let mu be some continuous PDF. + The privacy loss distribution is generated as follows: For ADD adjacency type: - - Let mu_lower(x) := sum over i of sampling_probs[i] * - mu(x - sensitivities[i]) - - Sample x ~ mu_upper = mu and let the privacy loss be - ln(mu_upper(x) / mu_lower(x)). - For REMOVE adjacency type: - - Let mu_upper(x) := sum over i of sampling_probs[i] * - mu(x + sensitivities[i]) - - Sample x ~ mu_lower = mu and let the privacy loss be + - Let mu_upper(x) := sum over i of sampling_probs_upper[i] * + mu(x + sensitivities_upper[i]) + - Let mu_lower(x) := sum over j of sampling_probs_lower[j] * + mu(x - sensitivities_lower[j]) + - Sample x ~ mu_upper and let the privacy loss be ln(mu_upper(x) / mu_lower(x)). - For example: - sensitivities = [1.0], sampling_probs = [1.0] captures the sensitivity-1 - Gaussian mechanism. - sensitivities = [0.0, 1.0], sampling_probs = [0.9, 0.1] captures the - sensitivity-1 subsampled Gaussian mechanism with sampling probability - 0.1. - sensitivities = [0.0, 1.0, 2.0], sampling_probs = [0.81, 0.18, 0.01] - captures a generalization of the subsampled Gaussian mechanism where - we can potentially sample the sensitive example twice, each time - with sampling probability 0.1. 
-
-  This class can be used for vector-valued mechanisms via Corollary 4.7 from the
-  following paper:
-    Title: Privacy Amplification for Matrix Mechanisms
-    Authors: C. A. Choquette-Choo, A. Ganesh, T. Steinke, A. Thakurta
-    Link: https://arxiv.org/abs/2310.15526
-  In particular, this corollary shows the vector-valued mixture of Gaussians
-  mechanism is dominated by the scalar-valued mechanism given by replacing each
-  sensitivity vector with its norm.
-
-  For almost all methods (the exception is inverse_privacy_losses, for reasons
-  discussed in the docstring for that method), we reimplement the corresponding
-  method in GaussianPrivacyLoss, but with some minor changes to account for the
-  fact that this class is more general.
+  For example, when mu is a zero-mean Gaussian distribution:
+    sensitivities_upper = [1.0], sampling_probs_upper = [1.0],
+    sensitivities_lower = [0.0], sampling_probs_lower = [1.0]
+      captures the sensitivity-1 Gaussian mechanism.
+    sensitivities_upper = [0.0, 1.0], sampling_probs_upper = [0.9, 0.1],
+    sensitivities_lower = [0.0], sampling_probs_lower = [1.0]
+      captures the sensitivity-1 subsampled Gaussian mechanism.
+    sensitivities_upper = [0.0, 1.0], sampling_probs_upper = [0.9, 0.1],
+    sensitivities_lower = [0.0, 2.0, 2.0],
+    sampling_probs_lower = [0.8, 0.1, 0.1]
+      captures a generalization where mu_upper and mu_lower can sample
+      different sensitivities, and mu_lower samples the sensitive example
+      twice, each time with sampling probability 0.1.
+
+  The concrete methods in this base class make the following assumptions
+  about the privacy loss l(x) = log(mu_upper(x) / mu_lower(x)):
+    - l is monotonically decreasing
+    - l is strictly monotonic on some interval [a, b] with a < b
+  These assumptions hold for common mu (Gaussian, Laplace, ...), but methods
+  may have to be overridden for atypical distributions.
 
   Attributes:
-    sensitivities: the support of the sensitivity distribution.
-    sampling_probs: the probabilities associated with the sensitivities.
+    sensitivities_upper: the support of the upper sensitivity distribution.
+    sensitivities_lower: the support of the lower sensitivity distribution.
+    sampling_probs_upper: the probabilities associated with sensitivities_upper.
+    sampling_probs_lower: the probabilities associated with sensitivities_lower.
   """
 
   def __init__(  # pylint: disable=super-init-not-called
       self,
-      standard_deviation: float,
-      sensitivities: Sequence[float],
-      sampling_probs: Sequence[float],
+      sensitivities_upper: Sequence[float],
+      sensitivities_lower: Sequence[float],
+      sampling_probs_upper: Sequence[float],
+      sampling_probs_lower: Sequence[float],
       pessimistic_estimate: bool = True,
       log_mass_truncation_bound: float = -50,
-      adjacency_type: AdjacencyType = AdjacencyType.REMOVE,
   ) -> None:
-    """Initializes the privacy loss of the MoG mechanism.
+    """Initializes the privacy loss of a DoubleMixture mechanism.
 
     Args:
-      standard_deviation: The standard_deviation of the Gaussian distribution.
-      sensitivities: The support of the sensitivity distribution. Must be the
-        same length as sampling_probs, and both should be 1D.
-      sampling_probs: The probabilities associated with the sensitivities.
+      sensitivities_upper: The support of the upper sensitivity distribution.
+        Must be the same length as sampling_probs_upper, and both should be 1D.
+      sensitivities_lower: The support of the lower sensitivity distribution.
+        Must be the same length as sampling_probs_lower, and both should be 1D.
+      sampling_probs_upper: Probabilities associated with sensitivities_upper.
+      sampling_probs_lower: Probabilities associated with sensitivities_lower.
       pessimistic_estimate: A value indicating whether the rounding is done in
         such a way that the resulting epsilon-hockey stick divergence
         computation gives an upper estimate to the real value.
       log_mass_truncation_bound: The ln of the probability mass that might be
         discarded from the noise distribution. The larger this number, the more
         error it may introduce in divergence calculations.
-      adjacency_type: Type of adjacency relation to be used for defining the
-        privacy loss distribution.
 
     Raises:
-      ValueError: If args are invalid, e.g. standard_deviation is negative or
-        sensitivities and sampling_probs are different lengths.
+      ValueError: If args are invalid, e.g. sensitivities and sampling_probs
+        are different lengths.
     """
-    if standard_deviation <= 0:
-      raise ValueError(
-          'Standard deviation is not a positive real number: '
-          f'{standard_deviation}'
-      )
+
     if log_mass_truncation_bound > 0:
       raise ValueError(
           'Log mass truncation bound is not a non-positive real '
           f'number: {log_mass_truncation_bound}'
       )
-    if len(sampling_probs) != len(sensitivities):
+
+    if len(sampling_probs_upper) != len(sensitivities_upper):
       raise ValueError(
           'sensitivities and sampling_probs must have the same '
-          f'length. Got {sampling_probs=} of length {len(sampling_probs)}, '
-          f'{sensitivities=} of length {len(sensitivities)}.'
+          f'length. Got {sampling_probs_upper=} '
+          f'of length {len(sampling_probs_upper)}, '
+          f'{sensitivities_upper=} of length {len(sensitivities_upper)}.'
       )
-    non_zero_indices = np.asarray(sampling_probs) != 0.0
-    sensitivities = np.asarray(sensitivities)[non_zero_indices]
-    sampling_probs = np.asarray(sampling_probs)[non_zero_indices]
-    if np.any(sensitivities < 0):
-      raise ValueError(
-          f'Sensitivities contains a negative number: {sensitivities}.'
-      )
-    if sensitivities.max() == 0.0:
-      raise ValueError('Must have at least one positive sensitivity.')
-    if not math.isclose(sum(sampling_probs), 1):
+
+    if len(sampling_probs_lower) != len(sensitivities_lower):
       raise ValueError(
-          f'Probabilities do not add up to 1: {sum(sampling_probs)}'
+          'sensitivities and sampling_probs must have the same '
+          f'length. Got {sampling_probs_lower=} '
+          f'of length {len(sampling_probs_lower)}, '
+          f'{sensitivities_lower=} of length {len(sensitivities_lower)}.'
       )
-    for sampling_prob in sampling_probs:
-      if sampling_prob <= 0 or sampling_prob > 1:
-        raise ValueError(
-            f'Sampling probability is not in (0,1] : {sampling_prob}'
-        )
+
+    non_zero_indices_upper = np.asarray(sampling_probs_upper) != 0.0
+    sensitivities_upper = np.asarray(sensitivities_upper)[
+        non_zero_indices_upper]
+    sampling_probs_upper = np.asarray(sampling_probs_upper)[
+        non_zero_indices_upper]
+
+    non_zero_indices_lower = np.asarray(sampling_probs_lower) != 0.0
+    sensitivities_lower = np.asarray(sensitivities_lower)[
+        non_zero_indices_lower]
+    sampling_probs_lower = np.asarray(sampling_probs_lower)[
+        non_zero_indices_lower]
+
+    if np.any(sensitivities_upper < 0) or np.any(sensitivities_lower < 0):
+      raise ValueError(
+          'Sensitivities contain a negative number. '
+          f'Got {sensitivities_upper=}, '
+          f'{sensitivities_lower=}.'
+      )
+
+    if not (math.isclose(sum(sampling_probs_upper), 1)
+            and math.isclose(sum(sampling_probs_lower), 1)):
+      raise ValueError(
+          'Probabilities do not add up to 1. '
+          f'sum(sampling_probs_upper)={sum(sampling_probs_upper)}, '
+          f'sum(sampling_probs_lower)={sum(sampling_probs_lower)}.'
+      )
+
+    if (np.any((sampling_probs_upper <= 0) | (sampling_probs_upper > 1))
+        or np.any((sampling_probs_lower <= 0) | (sampling_probs_lower > 1))):
+      raise ValueError(
+          'Sampling probabilities are not in (0,1].'
+      )
+
     self.discrete_noise = False
-    self.adjacency_type = adjacency_type
-    self.sampling_probs = sampling_probs
-    self.sensitivities = sensitivities
-    self._standard_deviation = standard_deviation
-    self._variance = standard_deviation**2
+
+    self.sampling_probs_upper = sampling_probs_upper
+    self.sensitivities_upper = sensitivities_upper
+    self.sampling_probs_lower = sampling_probs_lower
+    self.sensitivities_lower = sensitivities_lower
+
     self._pessimistic_estimate = pessimistic_estimate
     self._log_mass_truncation_bound = log_mass_truncation_bound
     # Constant properties.
-    self._log_sampling_probs = np.log(self.sampling_probs)
-    self._pos_sampling_probs = self.sampling_probs[self.sensitivities > 0.0]
-    self._sampling_prob = np.clip(self._pos_sampling_probs.sum(), 0, 1)
-    nonzero_sens = self.sensitivities[self.sensitivities > 0.0]
-    self._min_sens = np.min(nonzero_sens)
-    self._max_sens = np.max(nonzero_sens)
-    self._gaussian_random_variable = stats.norm(scale=standard_deviation)
+    self._log_sampling_probs_upper = np.log(self.sampling_probs_upper)
+    self._pos_sampling_probs_upper = self.sampling_probs_upper[
+        self.sensitivities_upper > 0.0]
+    self._sampling_prob_upper = np.clip(self._pos_sampling_probs_upper.sum(),
+                                        0, 1)
+    self._max_sens_upper = self.sensitivities_upper.max()
+
+    self._log_sampling_probs_lower = np.log(self.sampling_probs_lower)
+    self._pos_sampling_probs_lower = self.sampling_probs_lower[
+        self.sensitivities_lower > 0.0]
+    self._sampling_prob_lower = np.clip(self._pos_sampling_probs_lower.sum(),
+                                        0, 1)
+    self._max_sens_lower = self.sensitivities_lower.max()
+
+    if (self._max_sens_upper <= 0) and (self._max_sens_lower <= 0):
+      raise ValueError('Must have at least one positive sensitivity, '
+                       f'but {self._max_sens_upper=} and '
+                       f'{self._max_sens_lower=}.')
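
A quick sketch of how this validation behaves, using the Gaussian subclass
added further down in this diff (values chosen only to trigger the check):

from dp_accounting.pld import privacy_loss_mechanism

try:
  privacy_loss_mechanism.DoubleMixtureGaussianPrivacyLoss(
      standard_deviation=1.0,
      sensitivities_upper=[0.0, 1.0],
      sensitivities_lower=[0.0],
      sampling_probs_upper=[0.5, 0.4],  # sums to 0.9, rejected
      sampling_probs_lower=[1.0],
  )
except ValueError as e:
  print(e)  # 'Probabilities do not add up to 1. ...'
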
+ """ + if ((self._strictly_monotonic_interval is None) + or (self._strictly_monotonic_interval[1] + <= self._strictly_monotonic_interval[0])): + raise ValueError('Privacy loss must be strictly monotonically ' + 'decreasing on some non-empty interval, but ' + f'{self._strictly_monotonic_interval=}.') def mu_upper_cdf( self, x: Union[float, Iterable[float]] ) -> Union[float, np.ndarray]: """Computes the cumulative density function of the mu_upper distribution. - For ADD adjacency type: - mu_upper(x) := mu - For REMOVE adjacency type: - mu_upper(x) := sum of sampling_probs[i] * mu(x + sensitivities[i]) + mu_upper(x) := sum of sampling_probs_upper[i] * + mu(x + sensitivities_upper[i]) Args: x: the point or points at which the cumulative density function is to be @@ -1847,27 +1910,21 @@ def mu_upper_cdf( The cumulative density function of the mu_upper distribution at x, i.e., the probability that mu_upper is less than or equal to x. """ - if self.adjacency_type == AdjacencyType.ADD: - return self.noise_cdf(x) - else: # Case: self.adjacency_type == AdjacencyType.REMOVE - points_per_sens = np.add.outer(np.atleast_1d(x), self.sensitivities) - output = (self.noise_cdf(points_per_sens) * self.sampling_probs).sum( - axis=1 - ) - if isinstance(x, numbers.Number): - return output[0] - else: - return output + points_per_sens = np.add.outer(np.atleast_1d(x), self.sensitivities_upper) + output = (self.noise_cdf(points_per_sens) * self.sampling_probs_upper).sum( + axis=1 + ) + if isinstance(x, numbers.Number): + return output[0] + else: + return output def mu_lower_log_cdf( - self, x: Union[float, Iterable[float]] - ) -> Union[float, np.ndarray]: + self, x: Union[float, Iterable[float]]) -> Union[float, np.ndarray]: """Computes log cumulative density function of the mu_lower distribution. - For ADD adjacency type: - mu_lower(x) := sum of sampling_probs[i] * mu(x - sensitivities[i]) - For REMOVE adjacency type: - mu_lower(x) := mu + mu_lower(x) := sum of sampling_probs_lower[i] * + mu(x - sensitivities_lower[i]) Args: x: the point or points at which the log of the cumulative density function @@ -1878,22 +1935,19 @@ def mu_lower_log_cdf( x, i.e., the log of the probability that mu_lower is less than or equal to x. """ - if self.adjacency_type == AdjacencyType.ADD: - points_per_sens = np.add.outer(np.atleast_1d(x), -self.sensitivities) - logcdf_per_sens = self.noise_log_cdf(points_per_sens) - output = scipy.special.logsumexp( - logcdf_per_sens, axis=1, b=self.sampling_probs - ) - if isinstance(x, numbers.Number): - return output[0] - else: - return output - else: # Case: self.adjacency_type == AdjacencyType.REMOVE - return self.noise_log_cdf(x) + points_per_sens = np.add.outer(np.atleast_1d(x), -self.sensitivities_lower) + logcdf_per_sens = self.noise_log_cdf(points_per_sens) + output = scipy.special.logsumexp( + logcdf_per_sens, axis=1, b=self.sampling_probs_lower + ) + if isinstance(x, numbers.Number): + return output[0] + else: + return output def get_delta_for_epsilon( - self, epsilon: Union[float, Sequence[float]] - ) -> Union[float, list[float]]: + self, epsilon: Union[float, Sequence[float]], + ) -> Union[float, Sequence[float]]: """Computes the epsilon-hockey stick divergence of the mechanism. Args: @@ -1903,43 +1957,51 @@ def get_delta_for_epsilon( Returns: A non-negative real number which is the epsilon-hockey stick divergence of the mechanism, or a numpy array if epsilon is list-like. + + Raises: + ValueError: If monotonicity requirements for privacy loss or epsilons + are violated. 
""" - epsilons = np.atleast_1d(epsilon) + self._verify_monotonicity() + + is_scalar = isinstance(epsilon, numbers.Number) + epsilons = np.array([epsilon]) if is_scalar else np.asarray(epsilon) if not np.all(epsilons[1:] >= epsilons[:-1]): raise ValueError(f'Epsilon values must be non-decreasing: {epsilons}') deltas = np.zeros_like(epsilons, dtype=float) - if self._sampling_prob == 1.0: - inverse_indices = np.full_like(epsilons, True, dtype=bool) - elif self.adjacency_type == AdjacencyType.ADD: - inverse_indices = epsilons < -np.log1p(-self._sampling_prob) - else: # Case: self.adjacency_type == AdjacencyType.REMOVE - inverse_indices = epsilons > np.log1p(-self._sampling_prob) - other_indices = np.logical_not(inverse_indices) - deltas[other_indices] = -np.expm1(epsilons[other_indices]) - x_cutoffs = self.inverse_privacy_losses(epsilons[inverse_indices]) - deltas[inverse_indices] = self.mu_upper_cdf(x_cutoffs) - np.exp( - epsilons[inverse_indices] + self.mu_lower_log_cdf(x_cutoffs) - ) + + left_boundary_mask = (epsilons >= self._privacy_loss_at_boundaries[0]) + x_left_boundary = self._strictly_monotonic_interval[0] + deltas[left_boundary_mask] = ( + self.mu_upper_cdf(x_left_boundary) - + np.exp(epsilons[left_boundary_mask] + + self.mu_lower_log_cdf(x_left_boundary))) + + right_boundary_mask = (epsilons <= self._privacy_loss_at_boundaries[1]) + deltas[right_boundary_mask] = -np.expm1(epsilons[right_boundary_mask]) + + inverse_indices = np.logical_not( + np.logical_or(left_boundary_mask, right_boundary_mask)) + + x_cutoffs = np.array([ + self.inverse_privacy_loss(eps) for eps in epsilons[inverse_indices] + ]) + deltas[inverse_indices] = ( + self.mu_upper_cdf(x_cutoffs) - + np.exp(epsilons[inverse_indices] + self.mu_lower_log_cdf(x_cutoffs))) # Clip delta values to lie in [0,1] (to avoid numerical errors) deltas = np.clip(deltas, 0, 1) - if isinstance(epsilon, numbers.Number): - return float(deltas) - else: - # For numerical stability reasons, deltas may not be non-increasing. This - # is fixed post-hoc at small cost in accuracy. - for i in reversed(range(deltas.shape[0] - 1)): - deltas[i] = max(deltas[i], deltas[i + 1]) - return deltas + return float(deltas[0]) if is_scalar else deltas def privacy_loss_tail( - self, precision: float = 1e-4 + self, precision: float = 1e-4 ) -> TailPrivacyLossDistribution: """Computes the privacy loss at the tail of the random-sensitivity Gaussian. - For ADD adjacency type: The upper distribution is a single Gaussian and we - can exactly compute the tails easily. + If max(sensitivity_upper) = 0: The upper distribution has a single component + and we can exactly compute the tails easily. - For REMOVE adjacency type: We set upper_x_truncation such that + Otherwise: We set upper_x_truncation such that CDF(upper_x_truncation) = 1 - 0.5 * exp(log_mass_truncation_bound). It is worthwhile to spend some up-front computation getting a more precise value for lower_x_truncation to save computation later on. So we binary search @@ -1952,8 +2014,7 @@ def privacy_loss_tail( Args: precision: The additive error we will compute the truncation values - within. That is, when we binary search for log_mass_truncation_bound in - the REMOVE case, we terminate the binary search when the interval has + within. That is, we terminate the binary search when the interval has length at most precision, and then use the more conservative endpoint of the interval as our truncation value. @@ -1962,16 +2023,16 @@ def privacy_loss_tail( privacy loss distribution. 
""" tail_mass = 0.5 * np.exp(self._log_mass_truncation_bound) - z_value = self._gaussian_random_variable.ppf(tail_mass) + z_value = self.noise_ppf(tail_mass) upper_x_truncation = -z_value - if self.adjacency_type == AdjacencyType.ADD: + if self._max_sens_upper == 0.0: lower_x_truncation = z_value else: # Case: self.adjacency_type == AdjacencyType.REMOVE lower_x_truncation = common.inverse_monotone_function( self.mu_upper_cdf, tail_mass, common.BinarySearchParameters( - z_value - self._max_sens, + z_value - self._max_sens_upper, z_value, tolerance=precision ), @@ -1995,7 +2056,7 @@ def privacy_loss_tail( ) def connect_dots_bounds(self) -> ConnectDotsBounds: - """Computes the bounds on epsilon values to use in connect-the-dots algorithm. + """Computes bounds on epsilon values to use in connect-the-dots algorithm. Returns: A ConnectDotsBounds instance containing upper and lower values of @@ -2008,103 +2069,23 @@ def connect_dots_bounds(self) -> ConnectDotsBounds: epsilon_lower=self.privacy_loss(tail_pld.upper_x_truncation), ) - @functools.cached_property - def _precompute_privacy_loss_constants(self) -> np.ndarray: - """(Pre-)computes the constants in the expression for the privacy loss.""" - sens_loss = self.privacy_loss_for_single_gaussian( - np.repeat(0, len(self.sensitivities)), self.sensitivities - ) - if self.adjacency_type == AdjacencyType.ADD: - return self._log_sampling_probs - sens_loss - else: # Case: self.adjacency_type == AdjacencyType.REMOVE - return self._log_sampling_probs + sens_loss - def privacy_loss(self, x: float) -> float: """Computes the privacy loss at a given point `x`.""" - # Because this method is called many times, we opt to precompute as much - # as possible. It can be see that the computation below is equivalent to: - # - # x_rep = np.repeat(x, len(self.sensitivities)) - # privacy_losses_without_subsampling = ( - # self.privacy_loss_for_single_gaussian(x_rep, self.sensitivities) - # ) - # if self.adjacency_type == AdjacencyType.ADD: - # summands = self._log_sampling_probs - privacy_losses_without_subsampling - # return -np.logaddexp.reduce(summands) - # - # and similarly for .REMOVE. - x_loss = self.sensitivities * x / (self._variance) - if self.adjacency_type == AdjacencyType.ADD: - summands = self._precompute_privacy_loss_constants + x_loss - return -np.logaddexp.reduce(summands) - else: # Case: self.adjacency_type == AdjacencyType.REMOVE - summands = self._precompute_privacy_loss_constants - x_loss - return np.logaddexp.reduce(summands) - - def privacy_loss_without_subsampling(self, x: float) -> float: - raise NotImplementedError( - 'MixtureGaussianPrivacyLoss uses multiple sensitivities, so ' - 'privacy loss without subsampling is ill-defined. Use ' - 'privacy_loss_for_single_gaussian instead.' - ) - - def privacy_loss_for_single_gaussian( - self, - x: Union[float, np.ndarray], - sensitivity: Union[float, np.ndarray] = 1.0 - ) -> np.ndarray: - """Computes the privacy loss of the Gaussian mechanism. + p_upper = scipy.special.logsumexp( + self.noise_log_pdf(x + self.sensitivities_upper), + b=self.sampling_probs_upper) - Args: - x: the point(s) at which the privacy loss is computed. - sensitivity: The sensitivity/sensitivities of interest. 
 
-  def inverse_privacy_loss_without_subsampling(
-      self, privacy_loss: float
-  ) -> float:
+  def privacy_loss_without_subsampling(self, x: float) -> float:
     raise NotImplementedError(
-        'MixtureGaussianPrivacyLoss uses multiple sensitivities, so '
-        'inverse_privacy_loss_without_subsampling is ill-defined. Use '
-        'inverse_privacy_loss_for_single_gaussian instead.'
-    )
-
-  def inverse_privacy_loss_for_single_gaussian(
-      self,
-      privacy_loss: Union[float, np.ndarray],
-      sensitivity: Union[float, np.ndarray] = 1.0,
-  ) -> Union[float, np.ndarray]:
-    """Computes the inverse privacy loss for the Gaussian mechanism.
-
-    Args:
-      privacy_loss: the privacy loss value(s).
-      sensitivity: sensitivity/sensitivities of the Gaussian.
-
-    Returns:
-      The largest float(s) x such that the privacy loss at x is at least
-      privacy_loss. REMOVE is the same as ADD, minus sensitivity.
-    """
-    add_inverse_loss = (
-        0.5 * sensitivity - privacy_loss * (self._variance) / sensitivity
-    )
-    if self.adjacency_type == AdjacencyType.ADD:
-      return add_inverse_loss
-    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
-      return add_inverse_loss - sensitivity
+        'DoubleMixturePrivacyLoss uses multiple sensitivities, so '
+        'privacy loss without subsampling is ill-defined.'
+    )
 
   def inverse_privacy_loss(
       self, privacy_loss: float, precision: float = 1e-6
@@ -2131,6 +2112,14 @@ def inverse_privacy_loss(
         self.inverse_privacy_losses(np.atleast_1d(privacy_loss), precision)[0]
     )
 
+  def inverse_privacy_loss_without_subsampling(
+      self, privacy_loss: float
+  ) -> float:
+    raise NotImplementedError(
+        'DoubleMixturePrivacyLoss uses multiple sensitivities, so '
+        'inverse_privacy_loss_without_subsampling is ill-defined.'
+    )
+
   def inverse_privacy_losses(
       self,
       privacy_losses: np.ndarray,
@@ -2157,7 +2146,13 @@ def inverse_privacy_losses(
     Returns:
       For each l in privacy_losses, the smallest multiple of precision, x, such
      that the privacy loss at x is at most l.
+
+    Raises:
+      ValueError: If the monotonicity requirements for inverting the privacy
+        loss via binary search are violated.
     """
+    self._verify_monotonicity()
+
     if not (np.diff(privacy_losses) >= 0).all():
       raise ValueError(
           f'Expected non-decreasing privacy_losses, got: {privacy_losses}.'
@@ -2165,75 +2160,92 @@
       )
     if len(privacy_losses) == 0:  # pylint: disable=g-explicit-length-test
       return np.ndarray([])
 
-    # If we have a non-zero probability of choosing sensitivity = 0, then the
-    # privacy loss does not take on all values in [-inf, inf], and so we need to
-    # make sure all values in privacy_losses are in the proper range for the
-    # given adjacency type.
-
-    # Some privacy losses might be close to the privacy loss at x = +/-inf, in
-    # which case we report the corresponding infinity for them.
-    min_pl = privacy_losses[0]
+    # Some privacy losses might be close to the privacy loss at x = a or x = b,
+    # where [a, b] is the interval on which the privacy loss is strictly
+    # monotonic, in which case we report the corresponding a or infinity.
     max_pl = privacy_losses[-1]
-    log_1m_prob = (
-        math.log1p(-self._sampling_prob) if self._sampling_prob < 1 else -np.inf
-    )
-    if self.adjacency_type == AdjacencyType.ADD:
-      if max_pl > -log_1m_prob:
+    min_pl = privacy_losses[0]
+    output = np.empty_like(privacy_losses)
+
+    if max_pl > self._privacy_loss_at_boundaries[0]:
       raise ValueError(
           f'max of privacy_losses ({max_pl}) is larger than '
-          f'-log(1 - sampling_prob)={-log_1m_prob}.'
+          f'{self._privacy_loss_at_boundaries[0]=}.'
       )
-      finite_indices = np.logical_not(np.isclose(privacy_losses, -log_1m_prob))
-      max_pl = np.max(privacy_losses[finite_indices])
-    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
-      if min_pl <= log_1m_prob:
+
+    left_boundary_mask = np.isclose(privacy_losses,
+                                    self._privacy_loss_at_boundaries[0])
+    output[left_boundary_mask] = self._strictly_monotonic_interval[0]
+
+    if min_pl <= self._privacy_loss_at_boundaries[1]:
       raise ValueError(
           f'min of privacy_losses ({min_pl}) is smaller than '
-          f'log(1 - sampling_prob)={log_1m_prob}'
+          f'{self._privacy_loss_at_boundaries[1]=}'
       )
-      finite_indices = np.logical_not(np.isclose(privacy_losses, log_1m_prob))
-      min_pl = np.min(privacy_losses[finite_indices])
 
-    # Now, we need to determine a suitable range to binary search over. To do
-    # this, we consider the subsampled Gaussian mechanisms given by moving
-    # all the probability mass on non-zero sensitivities to either the
-    # smallest or largest sensitivity. We can show the privacy loss of the
-    # mixture is contained between these two mechanisms' privacy losses, and
-    # the subsampled Gaussian privacy loss is easy to invert. Then, we can
-    # compute the inverse privacy loss at min_pl and max_pl to get four
-    # candidate bounds, and take the min/max of these bounds.
-    loss_bounds = np.array([min_pl, min_pl, max_pl, max_pl])
-    sens_bounds = np.array([
-        self._min_sens,
-        self._max_sens,
-        self._min_sens,
-        self._max_sens,
-    ])
-    if self.adjacency_type == AdjacencyType.ADD:
-      pl_without_sampling = _pl_without_sampling_add
-    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
-      pl_without_sampling = _pl_without_sampling_remove
+    right_boundary_mask = np.isclose(privacy_losses,
+                                     self._privacy_loss_at_boundaries[1])
+    output[right_boundary_mask] = np.inf
 
-    possible_bounds = self.inverse_privacy_loss_for_single_gaussian(
-        pl_without_sampling(loss_bounds, self._sampling_prob), sens_bounds
-    )
-    bounds = (
-        np.floor(np.min(possible_bounds) / precision) * precision,
-        np.ceil(np.max(possible_bounds) / precision) * precision,
-    )
-    if self.adjacency_type == AdjacencyType.ADD:
-      output = self.privacy_loss_for_single_gaussian(
-          np.full_like(privacy_losses, np.inf)
-      )
-    else:
-      output = self.privacy_loss_for_single_gaussian(
-          np.full_like(privacy_losses, -np.inf)
-      )
-    output[finite_indices] = self._inverse_privacy_losses_with_range(
-        privacy_losses[finite_indices], bounds, precision
+    within_interval_mask = np.logical_not(
+        np.logical_or(
+            left_boundary_mask, right_boundary_mask))
+    max_pl = np.max(privacy_losses[within_interval_mask])
+    min_pl = np.min(privacy_losses[within_interval_mask])
+
+    search_bounds = self._binary_search_bounds(min_pl, max_pl, precision)
+    output[within_interval_mask] = self._inverse_privacy_losses_with_range(
+        privacy_losses[within_interval_mask], search_bounds, precision
     )
     return output
 
+  def _binary_search_bounds(
+      self, min_pl: float, max_pl: float,
+      precision: float = 1e-6,) -> Tuple[float, float]:
+    """Determines an interval on which the privacy loss attains min_pl, max_pl.
+
+    Since we have no additional assumptions about mu and the sensitivities,
+    we choose pessimistic bounds and then refine them by searching
+    for min_pl and max_pl
+    (which is preferable to performing binary search with pessimistic bounds
+    for many privacy losses in inverse_privacy_losses).
+
+    Args:
+      min_pl: Smallest privacy loss that must be attained within
+        the search bounds.
+      max_pl: Largest privacy loss that must be attained within
+        the search bounds.
+      precision: Precision of the refined bounds. In particular, for each of
+        min_pl and max_pl, we compute the smallest multiple of precision, x,
+        such that the privacy loss at x is at most the respective value. This
+        ensures (i) the returned bounds are ordered, and (ii) the
+        approximation results in an overestimate of epsilon, i.e. the final
+        epsilon reported is valid.
+
+    Returns:
+      Tuple[float, float]: Left and right boundary of a search interval on
+      which the privacy loss attains both min_pl and max_pl.
+    """
+    left_bound = max(-1, self._strictly_monotonic_interval[0])
+    while (left_bound < 0) and (self.privacy_loss(left_bound) < max_pl):
+      left_bound *= 2
+    if left_bound < self._strictly_monotonic_interval[0]:
+      left_bound = self._strictly_monotonic_interval[0]
+
+    right_bound = min(1, self._strictly_monotonic_interval[1])
+    while (right_bound > 0) and (self.privacy_loss(right_bound) > min_pl):
+      right_bound *= 2
+    if right_bound > self._strictly_monotonic_interval[1]:
+      right_bound = self._strictly_monotonic_interval[1]
+
+    refined_bounds = tuple(self._inverse_privacy_losses_with_range(
+        np.array([min_pl, max_pl]),
+        [left_bound, right_bound],
+        precision * 0.5,  # Higher precision to avoid off-by-one rounding errors.
+    ))
+
+    # Must reverse the order, because the inverse of max_pl is smaller.
+    return refined_bounds[1], refined_bounds[0]
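
The two while-loops above are a standard exponential-bracketing step before
binary search; in isolation the pattern looks like this (toy sketch for a
decreasing function f, not the library's API):

def bracket_left(f, target, left=-1.0):
  # Double the magnitude of the (negative) left endpoint until f(left)
  # reaches the target; f is assumed to be decreasing.
  while left < 0 and f(left) < target:
    left *= 2
  return left

assert bracket_left(lambda x: -x, 4.0) == -4.0
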
 
   def _inverse_privacy_losses_with_range(
       self,
       privacy_losses: np.ndarray,
@@ -2277,6 +2289,184 @@ def _inverse_privacy_losses_with_range(
     )
     return output
 
+  @abc.abstractmethod
+  def noise_cdf(self, x: Union[float,
+                               Iterable[float]]) -> Union[float, np.ndarray]:
+    """Computes the cumulative density function of the base distribution mu.
+
+    Args:
+      x: the point or points at which the cumulative density function is to be
+        calculated.
+
+    Returns:
+      The cumulative density function of the base noise at x, i.e., the
+      probability that the base noise is less than or equal to x.
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def noise_log_pdf(
+      self, x: Union[float, Iterable[float]]
+  ) -> Union[float, np.ndarray]:
+    """Computes the log probability density function of base distribution mu.
+
+    Args:
+      x: the point or points at which the log probability density function is
+        to be calculated.
+
+    Returns:
+      The log of the probability density function of the base noise at x.
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def noise_ppf(self, p: Union[float,
+                               Iterable[float]]) -> Union[float, np.ndarray]:
+    """Computes the probability point function of base distribution mu.
+
+    Args:
+      p: the probability or probabilities at which the probability point
+        function, i.e., the inverse cumulative density function, is to be
+        evaluated.
+
+    Returns:
+      The probability point function of the base noise at p, i.e., an x
+      such that the interval (-infty, x] has probability p.
+    """
+    raise NotImplementedError
+
+  @abc.abstractmethod
+  def noise_log_cdf(
+      self, x: Union[float, Iterable[float]]) -> Union[float, np.ndarray]:
+    """Computes log of cumulative density function of the base distribution mu.
+
+    Args:
+      x: the point or points at which the log cumulative density function is to
+        be calculated.
+
+    Returns:
+      The log cumulative density function of the base noise at x, i.e., the
+      log of the probability that the base noise is less than or equal to x.
+    """
+    raise NotImplementedError
+
+  @classmethod
+  def from_privacy_guarantee(
+      cls,
+      privacy_parameters: common.DifferentialPrivacyParameters,
+      sensitivity: float = 1,
+      pessimistic_estimate: bool = True,
+      sampling_prob: float = 1.0,
+      adjacency_type: AdjacencyType = AdjacencyType.REMOVE
+  ) -> 'AdditiveNoisePrivacyLoss':
+    raise NotImplementedError(
+        'DoubleMixturePrivacyLoss cannot be uniquely '
+        'instantiated from privacy parameters.'
+    )
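
To illustrate the contract between the base class and the noise primitives
(a sketch using the Gaussian subclass defined below): mu_upper_cdf composes
noise_cdf with the upper mixture,

import numpy as np
from scipy import stats
from dp_accounting.pld import privacy_loss_mechanism

mech = privacy_loss_mechanism.DoubleMixtureGaussianPrivacyLoss(
    1.0, [0.0, 1.0], [0.0], [0.9, 0.1], [1.0])
x = 0.25
expected = 0.9 * stats.norm.cdf(x + 0.0) + 0.1 * stats.norm.cdf(x + 1.0)
assert np.isclose(mech.mu_upper_cdf(x), expected)
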
+ """ + + def __init__( # pylint: disable=super-init-not-called + self, + standard_deviation: float, + sensitivities_upper: Sequence[float], + sensitivities_lower: Sequence[float], + sampling_probs_upper: Sequence[float], + sampling_probs_lower: Sequence[float], + pessimistic_estimate: bool = True, + log_mass_truncation_bound: float = -50, + ) -> None: + """Initializes the privacy loss of a DoubleMixtureGaussianPrivacyLoss. + + Args: + standard_deviation: The standard_deviation of the Gaussian distribution. + sensitivities_upper: The support of the upper sensitivity distribution. + Must be the ame length as sampling_probs_upper, and both should be 1D. + sampling_probs_upper: Probabilities associated with sensitivities_upper. + sensitivities_lower: The support of the lower sensitivity distribution. + Must be the ame length as sampling_probs_lower, and both should be 1D. + sampling_probs_lower: Probabilities associated with sensitivities_lower. + pessimistic_estimate: A value indicating whether the rounding is done in + such a way that the resulting epsilon-hockey stick divergence + computation gives an upper estimate to the real value. + log_mass_truncation_bound: The ln of the probability mass that might be + discarded from the noise distribution. The larger this number, the more + error it may introduce in divergence calculations. + + Raises: + ValueError: If args are invalid, e.g. sensitivities and sampling_probs + are different lengths. + """ + if standard_deviation <= 0: + raise ValueError( + 'Standard deviation is not a positive real number: ' + f'{standard_deviation}' + ) + + super().__init__( + sensitivities_upper, sensitivities_lower, + sampling_probs_upper, sampling_probs_lower, + pessimistic_estimate, log_mass_truncation_bound) + + self._gaussian_random_variable = stats.norm(scale=standard_deviation) + + @property + def _strictly_monotonic_interval(self) -> Optional[Tuple[float, float]]: + """Interval on which privacy loss is strictly decreasing. + + Returns: + Optional[Tuple[float, float]]: Left and right boundary of the interval. + None if no such interval exists or privacy loss is not + (non-strictly) monotonic everywhere. + """ + if (self._max_sens_upper == 0) and (self._max_sens_lower == 0): + # Distributions are identical, i.e., not strictly monotonic anywhere + return None + else: + # Privacy loss is strictly monotonic everywhere + return (-np.inf, np.inf) + + @property + def _privacy_loss_at_boundaries(self) -> Optional[Tuple[float, float]]: + """Privacy loss at left and right boundary of _strictly_monotonic_interval. + + Returns: + Optional[Tuple[float, float]]: Privacy loss l(a) and l(b) + when _strictly_monotonic_interval=[a, b]. + None if _strictly_monotonic_interval=None. + """ + if (self._max_sens_upper == 0) and (self._max_sens_lower == 0): + # Distributions are identical, i.e., not strictly monotonic anywhere + return None + elif (self._max_sens_upper == 0): + # Privacy loss converges for x -> -infty + return (-np.log1p(-self._sampling_prob_lower), -np.inf) + elif (self._max_sens_lower == 0): + # Privacy loss converges for x -> infty + return (np.inf, np.log1p(-self._sampling_prob_upper)) + else: + return (np.inf, -np.inf) + def noise_cdf( self, x: Union[float, Iterable[float]] ) -> Union[float, np.ndarray]: @@ -2292,6 +2482,35 @@ def noise_cdf( """ return self._gaussian_random_variable.cdf(x) + def noise_log_pdf( + self, x: Union[float, Iterable[float]] + ) -> Union[float, np.ndarray]: + """Computes the probability desnsity function of the Gaussian distribution. 
+
+    Args:
+      x: the point or points at which the log probability density function is
+        to be calculated.
+
+    Returns:
+      The log of the probability density function of the Gaussian noise at x.
+    """
+    return self._gaussian_random_variable.logpdf(x)
+
+  def noise_ppf(self, p: Union[float,
+                               Iterable[float]]) -> Union[float, np.ndarray]:
+    """Computes the probability point function of the Gaussian distribution.
+
+    Args:
+      p: the probability or probabilities at which the probability point
+        function, i.e., the inverse cumulative density function, is to be
+        evaluated.
+
+    Returns:
+      The probability point function of the Gaussian noise at p, i.e., an x
+      such that the interval (-infty, x] has probability p.
+    """
+    return self._gaussian_random_variable.ppf(p)
+
   def noise_log_cdf(
       self, x: Union[float, Iterable[float]]
   ) -> Union[float, np.ndarray]:
@@ -2307,16 +2526,436 @@ def noise_log_cdf(
     """
     return self._gaussian_random_variable.logcdf(x)
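
A usage sketch for this class (the mixture weights are illustrative, not
derived from the paper referenced in the class docstring):

from dp_accounting.pld import privacy_loss_mechanism

mech = privacy_loss_mechanism.DoubleMixtureGaussianPrivacyLoss(
    standard_deviation=1.0,
    sensitivities_upper=[0.0, 1.0, 2.0],
    sensitivities_lower=[0.0],
    sampling_probs_upper=[0.81, 0.18, 0.01],
    sampling_probs_lower=[1.0],
)
print(mech.get_delta_for_epsilon(1.0))
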
+ """ + if scale <= 0: + raise ValueError( + 'Scale is not a positive real number: ' + f'{scale}' + ) + + super().__init__( + sensitivities_upper, sensitivities_lower, + sampling_probs_upper, sampling_probs_lower, + pessimistic_estimate, log_mass_truncation_bound) + + self._laplace_random_variable = stats.laplace(scale=scale) + + @property + def _strictly_monotonic_interval(self) -> Optional[Tuple[float, float]]: + """Interval on which privacy loss is strictly decreasing. + + Returns: + Optional[Tuple[float, float]]: Left and right boundary of the interval. + None if no such interval exists or privacy loss is not + (non-strictly) monotonic everywhere. + """ + if (self._max_sens_upper == 0) and (self._max_sens_lower == 0): + # Distributions are identical, i.e., not strictly monotonic anywhere + return None + else: + # Outside this interval, both mu_upper and mu_lower + # increase/decrease with the same factor that only depends on x, + # i.e., privacy loss is constant. + return (-self._max_sens_upper, self._max_sens_lower) + + @property + def _privacy_loss_at_boundaries(self) -> Optional[Tuple[float, float]]: + """Privacy loss at left and right boundary of _strictly_monotonic_interval. + + Returns: + Optional[Tuple[float, float]]: Privacy loss l(a) and l(b) + when _strictly_monotonic_interval=[a, b]. + None if _strictly_monotonic_interval=None. + """ + if (self._max_sens_upper == 0) and (self._max_sens_lower == 0): + # Distributions are identical, i.e., not strictly monotonic anywhere + return None + else: + # Boundaries are finite, so we can just call privacy_loss instead of + # having to evaluate asymptotic bounds. + left_boundary, right_boundary = self._strictly_monotonic_interval + return (self.privacy_loss(left_boundary), + self.privacy_loss(right_boundary)) + + def noise_cdf( + self, x: Union[float, Iterable[float]] + ) -> Union[float, np.ndarray]: + """Computes the cumulative density function of the Gaussian distribution. + + Args: + x: the point or points at which the cumulative density function is to be + calculated. + + Returns: + The cumulative density function of the Gaussian noise at x, i.e., the + probability that the Gaussian noise is less than or equal to x. + """ + return self._laplace_random_variable.cdf(x) + + def noise_log_pdf( + self, x: Union[float, Iterable[float]] + ) -> Union[float, np.ndarray]: + """Computes the probability desnsity function of the Gaussian distribution. + + Args: + x: the point or points at which the cumulative density function is to be + calculated. + + Returns: + The cumulative density function of the Gaussian noise at x, i.e., the + probability that the Gaussian noise is less than or equal to x. + """ + return self._laplace_random_variable.logpdf(x) + + def noise_ppf(self, p: Union[float, + Iterable[float]]) -> Union[float, np.ndarray]: + """Computes the probability point function of the Gaussian distribution. + + Args: + x: the point or points at which the probability point function, i.e., + the inverse cumulative density function is to be evaluated. + + Returns: + The probability point function of the Gaussian noise at p, i.e., an x + such that the interval (-infty, x] has probability p. + """ + raise self._laplace_random_variable.ppf(p) + + def noise_log_cdf( + self, x: Union[float, Iterable[float]] + ) -> Union[float, np.ndarray]: + """Computes log of cumulative density function of the Gaussian distribution. + + Args: + x: the point or points at which the log cumulative density function is to + be calculated. 
+
+
+class MixtureGaussianPrivacyLoss(DoubleMixtureGaussianPrivacyLoss):
+  """Privacy loss of the Mixture of Gaussians mechanism.
+
+  This class gives the privacy loss for a scalar Gaussian mechanism where the
+  sensitivity is a random variable equal to sensitivities[i] with probability
+  sampling_probs[i].
+
+  The privacy loss distribution of the Mixture of Gaussians mechanism is
+  equivalent to the privacy loss distribution between the Gaussian distribution
+  and the same distribution convolved with the discrete distribution specified
+  by sensitivities and sampling_probs. That is, let mu be the Gaussian noise PDF
+  with sigma = standard_deviation. The privacy loss distribution is generated as
+  follows:
+  For ADD adjacency type:
+    - Let mu_lower(x) := sum over i of sampling_probs[i] *
+      mu(x - sensitivities[i])
+    - Sample x ~ mu_upper = mu and let the privacy loss be
+      ln(mu_upper(x) / mu_lower(x)).
+  For REMOVE adjacency type:
+    - Let mu_upper(x) := sum over i of sampling_probs[i] *
+      mu(x + sensitivities[i])
+    - Sample x ~ mu_lower = mu and let the privacy loss be
+      ln(mu_upper(x) / mu_lower(x)).
+
+  For example:
+    sensitivities = [1.0], sampling_probs = [1.0] captures the sensitivity-1
+      Gaussian mechanism.
+    sensitivities = [0.0, 1.0], sampling_probs = [0.9, 0.1] captures the
+      sensitivity-1 subsampled Gaussian mechanism with sampling probability
+      0.1.
+    sensitivities = [0.0, 1.0, 2.0], sampling_probs = [0.81, 0.18, 0.01]
+      captures a generalization of the subsampled Gaussian mechanism where
+      we can potentially sample the sensitive example twice, each time
+      with sampling probability 0.1.
+
+  This class can be used for vector-valued mechanisms via Corollary 4.7 from the
+  following paper:
+    Title: Privacy Amplification for Matrix Mechanisms
+    Authors: C. A. Choquette-Choo, A. Ganesh, T. Steinke, A. Thakurta
+    Link: https://arxiv.org/abs/2310.15526
+  In particular, this corollary shows the vector-valued mixture of Gaussians
+  mechanism is dominated by the scalar-valued mechanism given by replacing each
+  sensitivity vector with its norm.
+
+  For almost all methods (the exception is inverse_privacy_losses, for reasons
+  discussed in the docstring for that method), we reimplement the corresponding
+  method in GaussianPrivacyLoss, but with some minor changes to account for the
+  fact that this class is more general.
+
+  Attributes:
+    sensitivities: the support of the sensitivity distribution.
+    sampling_probs: the probabilities associated with the sensitivities.
+  """
+
+  def __init__(
+      self,
+      standard_deviation: float,
+      sensitivities: Sequence[float],
+      sampling_probs: Sequence[float],
+      pessimistic_estimate: bool = True,
+      log_mass_truncation_bound: float = -50,
       adjacency_type: AdjacencyType = AdjacencyType.REMOVE,
-  ) -> 'MixtureGaussianPrivacyLoss':
+  ) -> None:
+    """Initializes the privacy loss of the MoG mechanism.
+
+    Args:
+      standard_deviation: The standard_deviation of the Gaussian distribution.
+      sensitivities: The support of the sensitivity distribution. Must be the
+        same length as sampling_probs, and both should be 1D.
+      sampling_probs: The probabilities associated with the sensitivities.
+      pessimistic_estimate: A value indicating whether the rounding is done in
+        such a way that the resulting epsilon-hockey stick divergence
+        computation gives an upper estimate to the real value.
+      log_mass_truncation_bound: The ln of the probability mass that might be
+        discarded from the noise distribution. The larger this number, the more
+        error it may introduce in divergence calculations.
+      adjacency_type: Type of adjacency relation to be used for defining the
+        privacy loss distribution.
+
+    Raises:
+      ValueError: If args are invalid, e.g. standard_deviation is negative or
+        sensitivities and sampling_probs are different lengths.
+    """
+    if standard_deviation <= 0:
+      raise ValueError(
+          'Standard deviation is not a positive real number: '
+          f'{standard_deviation}'
+      )
+
+    self.adjacency_type = adjacency_type
+    if self.adjacency_type == AdjacencyType.ADD:
+      sensitivities_upper = [0.0]
+      sensitivities_lower = sensitivities
+      sampling_probs_upper = [1.0]
+      sampling_probs_lower = sampling_probs
+    else:
+      sensitivities_upper = sensitivities
+      sensitivities_lower = [0.0]
+      sampling_probs_upper = sampling_probs
+      sampling_probs_lower = [1.0]
+
+    super().__init__(
+        standard_deviation,
+        sensitivities_upper, sensitivities_lower,
+        sampling_probs_upper, sampling_probs_lower,
+        pessimistic_estimate, log_mass_truncation_bound
+    )
+
+    # Init does some cleanup like removing zero-prob entries, so assign here.
+    if self.adjacency_type == AdjacencyType.ADD:
+      self.sensitivities = self.sensitivities_lower
+      self.sampling_probs = self.sampling_probs_lower
+      # Constant properties.
+      self._log_sampling_probs = self._log_sampling_probs_lower
+      self._pos_sampling_probs = self._pos_sampling_probs_lower
+      self._sampling_prob = self._sampling_prob_lower
+    else:
+      self.sensitivities = self.sensitivities_upper
+      self.sampling_probs = self.sampling_probs_upper
+      # Constant properties.
+      self._log_sampling_probs = self._log_sampling_probs_upper
+      self._pos_sampling_probs = self._pos_sampling_probs_upper
+      self._sampling_prob = self._sampling_prob_upper
+
+    # Constant properties that are not present in the parent class.
+    self._standard_deviation = standard_deviation
+    self._variance = standard_deviation**2
+    nonzero_sens = self.sensitivities[self.sensitivities > 0.0]
+    self._min_sens = np.min(nonzero_sens)
+    self._max_sens = np.max(nonzero_sens)
+
+  @functools.cached_property
+  def _precompute_privacy_loss_constants(self) -> np.ndarray:
+    """(Pre-)computes the constants in the expression for the privacy loss."""
+    sens_loss = self.privacy_loss_for_single_gaussian(
+        np.repeat(0, len(self.sensitivities)), self.sensitivities
+    )
+    if self.adjacency_type == AdjacencyType.ADD:
+      return self._log_sampling_probs - sens_loss
+    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
+      return self._log_sampling_probs + sens_loss
+
+  def privacy_loss(self, x: float) -> float:
+    """Computes the privacy loss at a given point `x`."""
+    # Because this method is called many times, we opt to precompute as much
+    # as possible. It can be seen that the computation below is equivalent to:
+    #
+    # x_rep = np.repeat(x, len(self.sensitivities))
+    # privacy_losses_without_subsampling = (
+    #     self.privacy_loss_for_single_gaussian(x_rep, self.sensitivities)
+    # )
+    # if self.adjacency_type == AdjacencyType.ADD:
+    #   summands = self._log_sampling_probs - privacy_losses_without_subsampling
+    #   return -np.logaddexp.reduce(summands)
+    #
+    # and similarly for .REMOVE.
+    x_loss = self.sensitivities * x / (self._variance)
+    if self.adjacency_type == AdjacencyType.ADD:
+      summands = self._precompute_privacy_loss_constants + x_loss
+      return -np.logaddexp.reduce(summands)
+    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
+      summands = self._precompute_privacy_loss_constants - x_loss
+      return np.logaddexp.reduce(summands)
+
+  def privacy_loss_without_subsampling(self, x: float) -> float:
     raise NotImplementedError(
-        'MixtureGaussianPrivacy loss cannot be uniquely '
-        'instantiated from privacy parameters.'
+        'MixtureGaussianPrivacyLoss uses multiple sensitivities, so '
+        'privacy loss without subsampling is ill-defined. Use '
+        'privacy_loss_for_single_gaussian instead.'
+    )
+
+  def privacy_loss_for_single_gaussian(
+      self,
+      x: Union[float, np.ndarray],
+      sensitivity: Union[float, np.ndarray] = 1.0
+  ) -> np.ndarray:
+    """Computes the privacy loss of the Gaussian mechanism.
+
+    Args:
+      x: the point(s) at which the privacy loss is computed.
+      sensitivity: The sensitivity/sensitivities of interest.
+
+    Returns:
+      The privacy loss of the Gaussian mechanism without sub-sampling at
+      point(s) x with the given sensitivity, which is given as
+        For ADD adjacency type:
+          sensitivity * (0.5 * sensitivity - x) / standard_deviation^2.
+        For REMOVE adjacency type:
+          sensitivity * (- 0.5 * sensitivity - x) / standard_deviation^2.
+    """
+    x = np.atleast_1d(x)
+    if self.adjacency_type == AdjacencyType.ADD:
+      scale = 0.5
+    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
+      scale = -0.5
+    return sensitivity * (scale * sensitivity - x) / (self._variance)
+
+  def inverse_privacy_loss_without_subsampling(
+      self, privacy_loss: float
+  ) -> float:
+    raise NotImplementedError(
+        'MixtureGaussianPrivacyLoss uses multiple sensitivities, so '
+        'inverse_privacy_loss_without_subsampling is ill-defined. Use '
+        'inverse_privacy_loss_for_single_gaussian instead.'
+    )
+
+  def inverse_privacy_loss_for_single_gaussian(
+      self,
+      privacy_loss: Union[float, np.ndarray],
+      sensitivity: Union[float, np.ndarray] = 1.0,
+  ) -> Union[float, np.ndarray]:
+    """Computes the inverse privacy loss for the Gaussian mechanism.
+
+    Args:
+      privacy_loss: the privacy loss value(s).
+      sensitivity: sensitivity/sensitivities of the Gaussian.
+
+    Returns:
+      The largest float(s) x such that the privacy loss at x is at least
+      privacy_loss. REMOVE is the same as ADD, minus sensitivity.
+    """
+    add_inverse_loss = (
+        0.5 * sensitivity - privacy_loss * (self._variance) / sensitivity
+    )
+    if self.adjacency_type == AdjacencyType.ADD:
+      return add_inverse_loss
+    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
+      return add_inverse_loss - sensitivity
+
+  def _binary_search_bounds(
+      self, min_pl: float, max_pl: float,
+      precision: float = 1e-6,) -> Tuple[float, float]:
+    """Determines an interval on which the privacy loss attains min_pl, max_pl.
+
+    To do this, we consider the subsampled Gaussian mechanisms given by moving
+    all the probability mass on non-zero sensitivities to either the
+    smallest or largest sensitivity. We can show the privacy loss of the
+    mixture is contained between these two mechanisms' privacy losses, and
+    the subsampled Gaussian privacy loss is easy to invert. Then, we can
+    compute the inverse privacy loss at min_pl and max_pl to get four
+    candidate bounds, and take the min/max of these bounds.
+
+    Args:
+      min_pl: Smallest privacy loss that must be attained within
+        the search bounds.
+      max_pl: Largest privacy loss that must be attained within
+        the search bounds.
+      precision: Precision of the bounds. In particular, for each privacy
+        loss l, we output the smallest multiple of precision, x, such that
+        the privacy loss at x is at most l. This ensures (i) given a
+        monotonic privacy_losses, we return a monotonic list of xs, and (ii)
+        the approximation results in an overestimate of epsilon, i.e. the
+        final epsilon reported is valid.
+
+    Returns:
+      Tuple[float, float]: Left and right boundary of a search interval on
+      which the privacy loss attains both min_pl and max_pl.
+    """
+    loss_bounds = np.array([min_pl, min_pl, max_pl, max_pl])
+    sens_bounds = np.array([
+        self._min_sens,
+        self._max_sens,
+        self._min_sens,
+        self._max_sens,
+    ])
+    if self.adjacency_type == AdjacencyType.ADD:
+      pl_without_sampling = _pl_without_sampling_add
+    else:  # Case: self.adjacency_type == AdjacencyType.REMOVE
+      pl_without_sampling = _pl_without_sampling_remove
+
+    possible_bounds = self.inverse_privacy_loss_for_single_gaussian(
+        pl_without_sampling(loss_bounds, self._sampling_prob), sens_bounds
+    )
+
+    return (
+        np.floor(np.min(possible_bounds) / precision) * precision,
+        np.ceil(np.max(possible_bounds) / precision) * precision,
+    )
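
Since MixtureGaussianPrivacyLoss is now a thin wrapper over the
double-mixture class, the two should agree; a quick equivalence sketch:

import numpy as np
from dp_accounting.pld import privacy_loss_mechanism as plm

mog = plm.MixtureGaussianPrivacyLoss(
    1.0, sensitivities=[0.0, 1.0], sampling_probs=[0.9, 0.1],
    adjacency_type=plm.AdjacencyType.REMOVE)
dbl = plm.DoubleMixtureGaussianPrivacyLoss(
    1.0, [0.0, 1.0], [0.0], [0.9, 0.1], [1.0])
for x in np.linspace(-3, 3, 7):
  assert np.isclose(mog.privacy_loss(x), dbl.privacy_loss(x))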