diff --git a/calibr8/contrib/base.py b/calibr8/contrib/base.py index 4f41501..da8518f 100644 --- a/calibr8/contrib/base.py +++ b/calibr8/contrib/base.py @@ -2,111 +2,14 @@ This module implements generic, reusable calibration models that can be subclassed to implement custom calibration models. """ -import numpy -import scipy import typing -import warnings +from . import noise from .. import core -from .. import utils -try: - try: - import pymc3 as pm - except ModuleNotFoundError: - import pymc as pm -except ModuleNotFoundError: - pm = utils.ImportWarner('pymc3') - -class BaseModelT(core.CalibrationModel): - def loglikelihood(self, *, y, x, name: str=None, replicate_id: str=None, dependent_key: str=None, theta=None, **dist_kwargs): - """ Loglikelihood of observation (dependent variable) given the independent variable. - - If both x and y are 1D-vectors, they must have the same length and the likelihood will be evaluated elementwise. - - For a 2-dimensional `x`, the implementation *should* broadcast and return a result that has - the same length as the first dimension of `x`. - - Parameters - ---------- - y : scalar or array-like - observed measurements (dependent variable) - x : scalar, array-like or TensorVariable - assumed independent variable - name : str - Name for the likelihood variable in a PyMC3 model (tensor mode). - Previously this was `f'{replicate_id}.{dependent_key}'`. - replicate_id : optional, str - Deprecated; pass the `name` kwarg instead. - dependent_key : optional, str - Deprecated; pass the `name` kwarg instead. - theta : optional, array-like - Parameters for the calibration model to use instead of `theta_fitted`. - The vector must have the correct length, but can have numeric and or symbolic entries. - Use this kwarg to run MCMC on calibration model parameters. - **dist_kwargs : dict - Additional keyword arguments are forwarded to the `pm.StudentT` distribution. - Most prominent example: `dims`. - - Returns - ------- - L : float or TensorVariable - sum of log-likelihoods - """ - if theta is None: - if self.theta_fitted is None: - raise Exception('No parameter vector was provided and the model is not fitted with data yet.') - theta = self.theta_fitted - - if not isinstance(x, (list, numpy.ndarray, float, int)) and not utils.istensor(x): - raise ValueError( - f'Input x must be a scalar, TensorVariable or an array-like object, but not {type(x)}' - ) - if not isinstance(y, (list, numpy.ndarray, float, int)) and not utils.istensor(x): - raise ValueError( - f'Input y must be a scalar or an array-like object, but not {type(y)}' - ) - - mu, scale, df = self.predict_dependent(x, theta=theta) - if utils.istensor(x) or utils.istensor(theta): - if pm.Model.get_context(error_if_none=False) is not None: - if replicate_id and dependent_key: - warnings.warn( - "The `replicate_id` and `dependent_key` parameters are deprecated. Use `name` instead.", - DeprecationWarning - ) - name = f'{replicate_id}.{dependent_key}' - if not name: - raise ValueError("A `name` must be specified for the PyMC likelihood.") - rv = pm.StudentT( - name, - mu=mu, - sigma=scale, - nu=df, - observed=y, - **dist_kwargs or {} - ) - else: - rv = pm.StudentT.dist( - mu=mu, - sigma=scale, - nu=df, - **dist_kwargs or {} - ) - # The API to get log-likelihood tensors differs between PyMC versions - if pm.__version__[0] == "3": - if isinstance(rv, pm.model.ObservedRV): - return rv.logpt.sum() - elif isinstance(rv, pm.Distribution): - return rv.logp(y).sum() - else: - return pm.logpt(rv, y, sum=True) - else: - # If `x` is given as a column vector, this model can broadcast automatically. - # This gives considerable performance benefits for the `likelihood(..., scan_x=True)` - # case which is relevant for `infer_independent`. - return numpy.sum(scipy.stats.t.logpdf(x=y, loc=mu, scale=scale, df=df), axis=-1) +class BaseModelT(core.CalibrationModel, noise.StudentTNoise): + pass class BasePolynomialModelT(BaseModelT): diff --git a/calibr8/core.py b/calibr8/core.py index 9c9102d..f9707a1 100644 --- a/calibr8/core.py +++ b/calibr8/core.py @@ -16,6 +16,7 @@ from typing import Union from . import utils +from . utils import pm __version__ = '6.1.3' @@ -558,7 +559,17 @@ def infer_independent( ci_prob=ci_prob, ) - def loglikelihood(self, *, y, x, theta=None): + def loglikelihood( + self, + *, + y, + x, + name: str=None, + replicate_id: str=None, + dependent_key: str=None, + theta=None, + **dist_kwargs + ): """ Loglikelihood of observation (dependent variable) given the independent variable. If both x and y are 1D-vectors, they must have the same length and the likelihood will be evaluated elementwise. @@ -572,15 +583,72 @@ def loglikelihood(self, *, y, x, theta=None): observed measurements (dependent variable) x : scalar, array-like or TensorVariable assumed independent variable + name : str + Name for the likelihood variable in a PyMC model (tensor mode). + Previously this was `f'{replicate_id}.{dependent_key}'`. + replicate_id : optional, str + Deprecated; pass the `name` kwarg instead. + dependent_key : optional, str + Deprecated; pass the `name` kwarg instead. theta : optional, array-like - model parameters + Parameters for the calibration model to use instead of `theta_fitted`. + The vector must have the correct length, but can have numeric and or symbolic entries. + Use this kwarg to run MCMC on calibration model parameters. + **dist_kwargs : dict + Additional keyword arguments are forwarded to the PyMC distribution. + Most prominent example: `dims`. Returns ------- L : float or TensorVariable sum of log-likelihoods """ - raise NotImplementedError('The loglikelihood function should be implemented by the inheriting class.') + if theta is None: + if self.theta_fitted is None: + raise Exception('No parameter vector was provided and the model is not fitted with data yet.') + theta = self.theta_fitted + + if not isinstance(x, (list, numpy.ndarray, float, int)) and not utils.istensor(x): + raise ValueError( + f'Input x must be a scalar, TensorVariable or an array-like object, but not {type(x)}' + ) + if not isinstance(y, (list, numpy.ndarray, float, int)) and not utils.istensor(x): + raise ValueError( + f'Input y must be a scalar or an array-like object, but not {type(y)}' + ) + + params = self.predict_dependent(x, theta=theta) + if utils.istensor(x) or utils.istensor(theta): + if pm.Model.get_context(error_if_none=False) is not None: + if replicate_id and dependent_key: + warnings.warn( + "The `replicate_id` and `dependent_key` parameters are deprecated. Use `name` instead.", + DeprecationWarning + ) + name = f'{replicate_id}.{dependent_key}' + if not name: + raise ValueError("A `name` must be specified for the PyMC likelihood.") + rv = self.pymc_dist( + name, + **self.to_pymc(*params), + observed=y, + **dist_kwargs or {} + ) + else: + rv = self.pymc_dist.dist( + **self.to_pymc(*params), + **dist_kwargs or {} + ) + # The API to get log-likelihood tensors differs between PyMC versions + if pm.__version__[0] == "3": + if isinstance(rv, pm.model.ObservedRV): + return rv.logpt.sum() + elif isinstance(rv, pm.Distribution): + return rv.logp(y).sum() + else: + return pm.logpt(rv, y, sum=True) + else: + return self.scipy_dist.logpdf(x=y, **self.to_scipy(*params)).sum(axis=-1) def likelihood(self, *, y, x, theta=None, scan_x: bool=False): """ Likelihood of observation (dependent variable) given the independent variable. @@ -607,8 +675,8 @@ def likelihood(self, *, y, x, theta=None, scan_x: bool=False): try: # Try to pass `x` as a column vector to benefit from broadcasting # if that's implemented by the underlying model. - result = numpy.exp(self.loglikelihood(y=y, x=x[:, None], theta=theta)) - if not result.shape == x.shape: + result = numpy.exp(self.loglikelihood(y=y, x=x[..., None], theta=theta)) + if not numpy.shape(result) == numpy.shape(x): raise ValueError("The underlying model does not seem to implement broadcasting.") return result except: diff --git a/calibr8/tests.py b/calibr8/tests.py index bffe133..f500522 100644 --- a/calibr8/tests.py +++ b/calibr8/tests.py @@ -44,6 +44,18 @@ def __init__(self, independent_key=None, dependent_key=None, theta_names=None): super().__init__(independent_key='I', dependent_key='D', theta_names=theta_names) +class _TestBivariateLinearModel(calibr8.CalibrationModel, calibr8.NormalNoise): + def __init__(self, independent_key: str=None, dependent_key: str=None, theta_names=None): + super().__init__(independent_key="x1,x2", dependent_key="y", theta_names="i,s1,s2,sd".split(","), ndim=2) + + def predict_dependent(self, x, *, theta=None): + x = numpy.array(x) + if theta is None: + theta = self.theta_fitted + i, s1, s2, sd = theta + return i + s1 * x[..., 0] + s2 * x[..., 1], sd + + class _TestPolynomialModel(calibr8.BasePolynomialModelT): def __init__(self, independent_key=None, dependent_key=None, theta_names=None, *, scale_degree=0, mu_degree=1): super().__init__(independent_key='I', dependent_key='D', mu_degree=mu_degree, scale_degree=scale_degree) @@ -191,7 +203,7 @@ def test_exceptions(self): cmodel.predict_dependent(x) with pytest.raises(NotImplementedError, match="predict_independent function"): cmodel.predict_independent(x) - with pytest.raises(NotImplementedError, match="loglikelihood function"): + with pytest.raises(NotImplementedError, match="predict_dependent function"): cmodel.loglikelihood(y=y, x=x, theta=[1, 2, 3]) with pytest.raises(ValueError, match=r"Unexpected `ci_prob`"): cmodel.loglikelihood = lambda x, y, theta: 1 @@ -286,6 +298,44 @@ def test_objective(self): assert -obj_max(theta) == obj_min(theta) pass + def test_likelihood_multivariate(self): + cm = _TestBivariateLinearModel() + cm.theta_fitted = (0.5, 1, 2, 0.4) + + X = [ + [1, 2], + [0.5, 1], + [1.5, 2.5], + ] + Y = [5.5, 3.0, 7.0] + y_obs = [5.6, 3.0, 6.9] + + # Check the expected output for one coordinate + assert cm.predict_dependent(X[0]) == (Y[0], 0.4) + + # And its broadcasting for multiple coordinates + numpy.testing.assert_array_equal(cm.predict_dependent(X)[0], Y) + + # Expected likelihoods of coordinate/observation paris + LL_elemwise = cm.scipy_dist.logpdf(x=y_obs, loc=Y, scale=0.4).sum() + + # Expected likelihoods of all observation at each coordinate + LL_scan = numpy.array([ + cm.scipy_dist.logpdf(x=y_obs, loc=y, scale=0.4).sum() + for y in Y + ]) + + assert LL_elemwise.shape == () + assert LL_scan.shape == (3,) + + # Testing the underlying loglikelihood + numpy.testing.assert_array_equal(cm.loglikelihood(y=y_obs, x=X), LL_elemwise) + + # Now check if likelihood wraps it correctly + numpy.testing.assert_array_equal(cm.likelihood(y=y_obs, x=X, scan_x=False), numpy.exp(LL_elemwise)) + numpy.testing.assert_array_equal(cm.likelihood(y=y_obs, x=X, scan_x=True), numpy.exp(LL_scan)) + pass + class TestUnivariateInferenceHelpers: def test_get_eti(self): @@ -1004,6 +1054,73 @@ def test_likelihood(self): assert x_dense[numpy.argmax(actual)] == 2 pass + @pytest.mark.xfail(reason="Draft test case, see https://github.com/JuBiotech/calibr8/issues/15") + def test_likelihood_nobroadcasting_fallback(self): + class _TestSwitchableBroadcastingModel(calibr8.CalibrationModel, calibr8.NormalNoise): + def __init__(self): + self.x_shapes = [] + self.can_broadcast = False + super().__init__(independent_key="I", dependent_key="D", theta_names="i,s,sd".split(","), ndim=1) + + def predict_dependent(self, x, *, theta=None): + if theta is None: + theta = self.theta_fitted + i, s, sd = self.theta_fitted + # This part broadcasts just fine + x = numpy.array(x) + mu = i + x * s + return mu, sd + + def loglikelihood(self, *, y, x, **kwargs): + # This overrides the native CalibrationModel.loglikelihood with one + # that can be externally set to return non-broadcasted results. + LL_broadcasted = super().loglikelihood(y=y, x=x, **kwargs) + if not self.can_broadcast: + return LL_broadcasted.sum() + return LL_broadcasted + + cm = _TestSwitchableBroadcastingModel() + cm.theta_fitted = [0.5, 0.6, 0.7] + + X = numpy.array([1, 2, 3]) + Y = [1.1, 1.7, 2.3] + y_obs = [1.0, 1.7, 2.4] + + # Check the prediction of the test model + for x, y in zip(X, Y): + assert cm.predict_dependent(x) == (y, 0.7) + + # The predict_dependent can broadcast + mu, sd = cm.predict_dependent(X) + numpy.testing.assert_array_equal(mu, Y) + assert sd == 0.7 + + # Test the switching between can_broadcast modes + cm.can_broadcast = True + LL = cm.loglikelihood(x=X[..., None], y=y_obs) + assert numpy.shape(LL) == (3,) + + cm.can_broadcast = False + LL = cm.loglikelihood(x=X[..., None], y=y_obs) + assert numpy.shape(LL) == () + + # The CalibrationModel.likelihood should give the same results either way. + cm.can_broadcast = True + L_broadcasted = cm.likelihood(x=X[..., None], y=y_obs, scan_x=True) + cm.can_broadcast = False + L_looped = cm.likelihood(x=X[..., None], y=y_obs, scan_x=True) + + numpy.testing.assert_array_equal(L_broadcasted, L_looped) + + # Of course the values should also be correct. + L_expected = numpy.exp([ + cm.scipy_dist.logpdf(x=y_obs, loc=mui, scale=sd).sum() + for mui in mu + ]) + assert numpy.shape(L_expected) == (3,) + numpy.testing.assert_array_equal(L_broadcasted, L_expected) + pass + class TestBaseAsymmetricLogisticModelT: def test_predict_dependent(self):