Generalize (log)likelihood implementation for all models
michaelosthege committed Nov 17, 2021
1 parent f0b0220 commit 38738a5
Showing 3 changed files with 194 additions and 106 deletions.
103 changes: 3 additions & 100 deletions calibr8/contrib/base.py
@@ -2,111 +2,14 @@
 This module implements generic, reusable calibration models that can be subclassed to
 implement custom calibration models.
 """
-import numpy
-import scipy
-import typing
-import warnings
 
+from . import noise
 from .. import core
 from .. import utils
 
-try:
-    try:
-        import pymc3 as pm
-    except ModuleNotFoundError:
-        import pymc as pm
-except ModuleNotFoundError:
-    pm = utils.ImportWarner('pymc3')
-
-
-class BaseModelT(core.CalibrationModel):
-    def loglikelihood(self, *, y, x, name: str=None, replicate_id: str=None, dependent_key: str=None, theta=None, **dist_kwargs):
-        """ Loglikelihood of observation (dependent variable) given the independent variable.
-
-        If both x and y are 1D-vectors, they must have the same length and the likelihood will be evaluated elementwise.
-
-        For a 2-dimensional `x`, the implementation *should* broadcast and return a result that has
-        the same length as the first dimension of `x`.
-
-        Parameters
-        ----------
-        y : scalar or array-like
-            observed measurements (dependent variable)
-        x : scalar, array-like or TensorVariable
-            assumed independent variable
-        name : str
-            Name for the likelihood variable in a PyMC3 model (tensor mode).
-            Previously this was `f'{replicate_id}.{dependent_key}'`.
-        replicate_id : optional, str
-            Deprecated; pass the `name` kwarg instead.
-        dependent_key : optional, str
-            Deprecated; pass the `name` kwarg instead.
-        theta : optional, array-like
-            Parameters for the calibration model to use instead of `theta_fitted`.
-            The vector must have the correct length, but can have numeric and/or symbolic entries.
-            Use this kwarg to run MCMC on calibration model parameters.
-        **dist_kwargs : dict
-            Additional keyword arguments are forwarded to the `pm.StudentT` distribution.
-            Most prominent example: `dims`.
-
-        Returns
-        -------
-        L : float or TensorVariable
-            sum of log-likelihoods
-        """
-        if theta is None:
-            if self.theta_fitted is None:
-                raise Exception('No parameter vector was provided and the model is not fitted with data yet.')
-            theta = self.theta_fitted
-
-        if not isinstance(x, (list, numpy.ndarray, float, int)) and not utils.istensor(x):
-            raise ValueError(
-                f'Input x must be a scalar, TensorVariable or an array-like object, but not {type(x)}'
-            )
-        if not isinstance(y, (list, numpy.ndarray, float, int)) and not utils.istensor(y):
-            raise ValueError(
-                f'Input y must be a scalar or an array-like object, but not {type(y)}'
-            )
-
-        mu, scale, df = self.predict_dependent(x, theta=theta)
-        if utils.istensor(x) or utils.istensor(theta):
-            if pm.Model.get_context(error_if_none=False) is not None:
-                if replicate_id and dependent_key:
-                    warnings.warn(
-                        "The `replicate_id` and `dependent_key` parameters are deprecated. Use `name` instead.",
-                        DeprecationWarning
-                    )
-                    name = f'{replicate_id}.{dependent_key}'
-                if not name:
-                    raise ValueError("A `name` must be specified for the PyMC likelihood.")
-                rv = pm.StudentT(
-                    name,
-                    mu=mu,
-                    sigma=scale,
-                    nu=df,
-                    observed=y,
-                    **dist_kwargs or {}
-                )
-            else:
-                rv = pm.StudentT.dist(
-                    mu=mu,
-                    sigma=scale,
-                    nu=df,
-                    **dist_kwargs or {}
-                )
-            # The API to get log-likelihood tensors differs between PyMC versions
-            if pm.__version__[0] == "3":
-                if isinstance(rv, pm.model.ObservedRV):
-                    return rv.logpt.sum()
-                elif isinstance(rv, pm.Distribution):
-                    return rv.logp(y).sum()
-            else:
-                return pm.logpt(rv, y, sum=True)
-        else:
-            # If `x` is given as a column vector, this model can broadcast automatically.
-            # This gives considerable performance benefits for the `likelihood(..., scan_x=True)`
-            # case which is relevant for `infer_independent`.
-            return numpy.sum(scipy.stats.t.logpdf(x=y, loc=mu, scale=scale, df=df), axis=-1)
+class BaseModelT(core.CalibrationModel, noise.StudentTNoise):
+    pass
 
 
 class BasePolynomialModelT(BaseModelT):
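Note: `BaseModelT` now inherits its entire (log)likelihood machinery from the `noise.StudentTNoise` mixin. `calibr8/contrib/noise.py` is not part of this diff, so the sketch below only reconstructs the interface that the generalized `CalibrationModel.loglikelihood` in `calibr8/core.py` (next file) evidently relies on; the attribute and method names are inferred from those call sites, not verbatim:

import scipy.stats

from calibr8.utils import pm  # resolves to pymc3/pymc, or an ImportWarner if neither is installed


class StudentTNoise:
    """Assumed mixin contract: map predict_dependent() outputs
    (mu, scale, df) onto a Student-t distribution for both backends."""

    # NumPy mode: core.py calls `self.scipy_dist.logpdf(x=y, **self.to_scipy(*params))`
    scipy_dist = scipy.stats.t

    # Tensor mode: core.py calls `self.pymc_dist(name, **self.to_pymc(*params), observed=y)`
    @property
    def pymc_dist(self):
        return pm.StudentT

    def to_pymc(self, mu, scale, df):
        # predict_dependent() outputs -> PyMC StudentT kwargs
        return dict(mu=mu, sigma=scale, nu=df)

    def to_scipy(self, mu, scale, df):
        # predict_dependent() outputs -> scipy.stats.t kwargs
        return dict(loc=mu, scale=scale, df=df)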
78 changes: 73 additions & 5 deletions calibr8/core.py
@@ -16,6 +16,7 @@
 from typing import Union
 
 from . import utils
+from .utils import pm
 
 
 __version__ = '6.1.3'
@@ -558,7 +559,17 @@ def infer_independent(
             ci_prob=ci_prob,
         )
 
-    def loglikelihood(self, *, y, x, theta=None):
+    def loglikelihood(
+        self,
+        *,
+        y,
+        x,
+        name: str=None,
+        replicate_id: str=None,
+        dependent_key: str=None,
+        theta=None,
+        **dist_kwargs
+    ):
         """ Loglikelihood of observation (dependent variable) given the independent variable.
 
         If both x and y are 1D-vectors, they must have the same length and the likelihood will be evaluated elementwise.
@@ -572,15 +583,72 @@ def loglikelihood(self, *, y, x, theta=None):
             observed measurements (dependent variable)
         x : scalar, array-like or TensorVariable
             assumed independent variable
+        name : str
+            Name for the likelihood variable in a PyMC model (tensor mode).
+            Previously this was `f'{replicate_id}.{dependent_key}'`.
+        replicate_id : optional, str
+            Deprecated; pass the `name` kwarg instead.
+        dependent_key : optional, str
+            Deprecated; pass the `name` kwarg instead.
         theta : optional, array-like
-            model parameters
+            Parameters for the calibration model to use instead of `theta_fitted`.
+            The vector must have the correct length, but can have numeric and/or symbolic entries.
+            Use this kwarg to run MCMC on calibration model parameters.
+        **dist_kwargs : dict
+            Additional keyword arguments are forwarded to the PyMC distribution.
+            Most prominent example: `dims`.
 
         Returns
         -------
         L : float or TensorVariable
             sum of log-likelihoods
         """
-        raise NotImplementedError('The loglikelihood function should be implemented by the inheriting class.')
+        if theta is None:
+            if self.theta_fitted is None:
+                raise Exception('No parameter vector was provided and the model is not fitted with data yet.')
+            theta = self.theta_fitted
+
+        if not isinstance(x, (list, numpy.ndarray, float, int)) and not utils.istensor(x):
+            raise ValueError(
+                f'Input x must be a scalar, TensorVariable or an array-like object, but not {type(x)}'
+            )
+        if not isinstance(y, (list, numpy.ndarray, float, int)) and not utils.istensor(y):
+            raise ValueError(
+                f'Input y must be a scalar or an array-like object, but not {type(y)}'
+            )
+
+        params = self.predict_dependent(x, theta=theta)
+        if utils.istensor(x) or utils.istensor(theta):
+            if pm.Model.get_context(error_if_none=False) is not None:
+                if replicate_id and dependent_key:
+                    warnings.warn(
+                        "The `replicate_id` and `dependent_key` parameters are deprecated. Use `name` instead.",
+                        DeprecationWarning
+                    )
+                    name = f'{replicate_id}.{dependent_key}'
+                if not name:
+                    raise ValueError("A `name` must be specified for the PyMC likelihood.")
+                rv = self.pymc_dist(
+                    name,
+                    **self.to_pymc(*params),
+                    observed=y,
+                    **dist_kwargs or {}
+                )
+            else:
+                rv = self.pymc_dist.dist(
+                    **self.to_pymc(*params),
+                    **dist_kwargs or {}
+                )
+            # The API to get log-likelihood tensors differs between PyMC versions
+            if pm.__version__[0] == "3":
+                if isinstance(rv, pm.model.ObservedRV):
+                    return rv.logpt.sum()
+                elif isinstance(rv, pm.Distribution):
+                    return rv.logp(y).sum()
+            else:
+                return pm.logpt(rv, y, sum=True)
+        else:
+            return self.scipy_dist.logpdf(x=y, **self.to_scipy(*params)).sum(axis=-1)

     def likelihood(self, *, y, x, theta=None, scan_x: bool=False):
         """ Likelihood of observation (dependent variable) given the independent variable.
@@ -607,8 +675,8 @@ def likelihood(self, *, y, x, theta=None, scan_x: bool=False):
             try:
                 # Try to pass `x` as a column vector to benefit from broadcasting
                 # if that's implemented by the underlying model.
-                result = numpy.exp(self.loglikelihood(y=y, x=x[:, None], theta=theta))
-                if not result.shape == x.shape:
+                result = numpy.exp(self.loglikelihood(y=y, x=x[..., None], theta=theta))
+                if not numpy.shape(result) == numpy.shape(x):
                     raise ValueError("The underlying model does not seem to implement broadcasting.")
                 return result
             except:
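Migration note for the two deprecated kwargs: inside a PyMC model context the shim above dot-joins them into the variable name, so the two calls below are equivalent (the model instance and identifiers are illustrative, not from this repository):

# Deprecated -- still works, but emits a DeprecationWarning:
cmodel.loglikelihood(y=y_obs, x=x_hat, replicate_id="A01", dependent_key="OD600")

# Equivalent explicit form, per `name = f'{replicate_id}.{dependent_key}'`:
cmodel.loglikelihood(y=y_obs, x=x_hat, name="A01.OD600")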
119 changes: 118 additions & 1 deletion calibr8/tests.py
@@ -44,6 +44,18 @@ def __init__(self, independent_key=None, dependent_key=None, theta_names=None):
         super().__init__(independent_key='I', dependent_key='D', theta_names=theta_names)
 
 
+class _TestBivariateLinearModel(calibr8.CalibrationModel, calibr8.NormalNoise):
+    def __init__(self, independent_key: str=None, dependent_key: str=None, theta_names=None):
+        super().__init__(independent_key="x1,x2", dependent_key="y", theta_names="i,s1,s2,sd".split(","), ndim=2)
+
+    def predict_dependent(self, x, *, theta=None):
+        x = numpy.array(x)
+        if theta is None:
+            theta = self.theta_fitted
+        i, s1, s2, sd = theta
+        return i + s1 * x[..., 0] + s2 * x[..., 1], sd
+
+
 class _TestPolynomialModel(calibr8.BasePolynomialModelT):
     def __init__(self, independent_key=None, dependent_key=None, theta_names=None, *, scale_degree=0, mu_degree=1):
         super().__init__(independent_key='I', dependent_key='D', mu_degree=mu_degree, scale_degree=scale_degree)
@@ -191,7 +203,7 @@ def test_exceptions(self):
             cmodel.predict_dependent(x)
         with pytest.raises(NotImplementedError, match="predict_independent function"):
             cmodel.predict_independent(x)
-        with pytest.raises(NotImplementedError, match="loglikelihood function"):
+        with pytest.raises(NotImplementedError, match="predict_dependent function"):
             cmodel.loglikelihood(y=y, x=x, theta=[1, 2, 3])
         with pytest.raises(ValueError, match=r"Unexpected `ci_prob`"):
             cmodel.loglikelihood = lambda x, y, theta: 1
@@ -286,6 +298,44 @@ def test_objective(self):
         assert -obj_max(theta) == obj_min(theta)
         pass
 
+    def test_likelihood_multivariate(self):
+        cm = _TestBivariateLinearModel()
+        cm.theta_fitted = (0.5, 1, 2, 0.4)
+
+        X = [
+            [1, 2],
+            [0.5, 1],
+            [1.5, 2.5],
+        ]
+        Y = [5.5, 3.0, 7.0]
+        y_obs = [5.6, 3.0, 6.9]
+
+        # Check the expected output for one coordinate
+        assert cm.predict_dependent(X[0]) == (Y[0], 0.4)
+
+        # And its broadcasting for multiple coordinates
+        numpy.testing.assert_array_equal(cm.predict_dependent(X)[0], Y)
+
+        # Expected log-likelihoods of coordinate/observation pairs
+        LL_elemwise = cm.scipy_dist.logpdf(x=y_obs, loc=Y, scale=0.4).sum()
+
+        # Expected log-likelihoods of all observations at each coordinate
+        LL_scan = numpy.array([
+            cm.scipy_dist.logpdf(x=y_obs, loc=y, scale=0.4).sum()
+            for y in Y
+        ])
+
+        assert LL_elemwise.shape == ()
+        assert LL_scan.shape == (3,)
+
+        # Testing the underlying loglikelihood
+        numpy.testing.assert_array_equal(cm.loglikelihood(y=y_obs, x=X), LL_elemwise)
+
+        # Now check if likelihood wraps it correctly
+        numpy.testing.assert_array_equal(cm.likelihood(y=y_obs, x=X, scan_x=False), numpy.exp(LL_elemwise))
+        numpy.testing.assert_array_equal(cm.likelihood(y=y_obs, x=X, scan_x=True), numpy.exp(LL_scan))
+        pass
+

 class TestUnivariateInferenceHelpers:
     def test_get_eti(self):
@@ -1004,6 +1054,73 @@ def test_likelihood(self):
         assert x_dense[numpy.argmax(actual)] == 2
         pass
 
+    @pytest.mark.xfail(reason="Draft test case, see https://github.com/JuBiotech/calibr8/issues/15")
+    def test_likelihood_nobroadcasting_fallback(self):
+        class _TestSwitchableBroadcastingModel(calibr8.CalibrationModel, calibr8.NormalNoise):
+            def __init__(self):
+                self.x_shapes = []
+                self.can_broadcast = False
+                super().__init__(independent_key="I", dependent_key="D", theta_names="i,s,sd".split(","), ndim=1)
+
+            def predict_dependent(self, x, *, theta=None):
+                if theta is None:
+                    theta = self.theta_fitted
+                i, s, sd = theta
+                # This part broadcasts just fine
+                x = numpy.array(x)
+                mu = i + x * s
+                return mu, sd
+
+            def loglikelihood(self, *, y, x, **kwargs):
+                # This overrides the native CalibrationModel.loglikelihood with one
+                # that can be externally set to return non-broadcasted results.
+                LL_broadcasted = super().loglikelihood(y=y, x=x, **kwargs)
+                if not self.can_broadcast:
+                    return LL_broadcasted.sum()
+                return LL_broadcasted
+
+        cm = _TestSwitchableBroadcastingModel()
+        cm.theta_fitted = [0.5, 0.6, 0.7]
+
+        X = numpy.array([1, 2, 3])
+        Y = [1.1, 1.7, 2.3]
+        y_obs = [1.0, 1.7, 2.4]
+
+        # Check the prediction of the test model
+        for x, y in zip(X, Y):
+            assert cm.predict_dependent(x) == (y, 0.7)
+
+        # The predict_dependent can broadcast
+        mu, sd = cm.predict_dependent(X)
+        numpy.testing.assert_array_equal(mu, Y)
+        assert sd == 0.7
+
+        # Test the switching between can_broadcast modes
+        cm.can_broadcast = True
+        LL = cm.loglikelihood(x=X[..., None], y=y_obs)
+        assert numpy.shape(LL) == (3,)
+
+        cm.can_broadcast = False
+        LL = cm.loglikelihood(x=X[..., None], y=y_obs)
+        assert numpy.shape(LL) == ()
+
+        # The CalibrationModel.likelihood should give the same results either way.
+        cm.can_broadcast = True
+        L_broadcasted = cm.likelihood(x=X[..., None], y=y_obs, scan_x=True)
+        cm.can_broadcast = False
+        L_looped = cm.likelihood(x=X[..., None], y=y_obs, scan_x=True)
+
+        numpy.testing.assert_array_equal(L_broadcasted, L_looped)
+
+        # Of course the values should also be correct.
+        L_expected = numpy.exp([
+            cm.scipy_dist.logpdf(x=y_obs, loc=mui, scale=sd).sum()
+            for mui in mu
+        ])
+        assert numpy.shape(L_expected) == (3,)
+        numpy.testing.assert_array_equal(L_broadcasted, L_expected)
+        pass
+
 
 class TestBaseAsymmetricLogisticModelT:
     def test_predict_dependent(self):
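As the new tests demonstrate, a model class now gets `loglikelihood` and `likelihood` for free by combining `CalibrationModel` with a noise mixin. A minimal end-to-end sketch following the same pattern (the class and all numbers below are invented for illustration):

import numpy
import calibr8


class LinearNormalModel(calibr8.CalibrationModel, calibr8.NormalNoise):
    """Hypothetical univariate model: y ~ Normal(i + s * x, sd)."""

    def __init__(self):
        super().__init__(independent_key="x", dependent_key="y", theta_names=["i", "s", "sd"], ndim=1)

    def predict_dependent(self, x, *, theta=None):
        if theta is None:
            theta = self.theta_fitted
        i, s, sd = theta
        return i + s * numpy.array(x), sd


cmodel = LinearNormalModel()
cmodel.theta_fitted = [0.0, 2.0, 0.1]
y_obs = [2.1, 1.9, 2.0]

# Scalar sum of elementwise log-likelihoods (NumPy mode):
LL = cmodel.loglikelihood(y=y_obs, x=[1.0, 0.95, 1.02])

# Column-vector broadcasting evaluates all observations at each candidate x:
x_dense = numpy.linspace(0.5, 1.5, 100)
L_scan = cmodel.likelihood(y=y_obs, x=x_dense, scan_x=True)
assert numpy.shape(L_scan) == (100,)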
