Generalize noise models #16

Merged · 3 commits · Nov 17, 2021
8 changes: 8 additions & 0 deletions calibr8/__init__.py
@@ -1,3 +1,10 @@

from .contrib.noise import (
NormalNoise,
LaplaceNoise,
LogNormalNoise,
StudentTNoise,
)
from .contrib.base import (
BaseAsymmetricLogisticT,
BaseLogIndependentAsymmetricLogisticT,
@@ -6,6 +13,7 @@
)
from .core import (
CalibrationModel,
DistributionMixin,
InferenceResult,
UnivariateInferenceResult,
NumericPosterior,
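Net effect of this hunk: the noise classes and `DistributionMixin` become part of the package root. A minimal sketch of the resulting import surface (assuming a `calibr8` install that includes these changes):

    # Sketch: the four noise models plus the mixin base are importable from the top level.
    from calibr8 import (
        DistributionMixin,
        LaplaceNoise,
        LogNormalNoise,
        NormalNoise,
        StudentTNoise,
    )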
103 changes: 3 additions & 100 deletions calibr8/contrib/base.py
@@ -2,111 +2,14 @@
This module implements generic, reusable calibration models that can be subclassed to
implement custom calibration models.
"""
import numpy
import scipy
import typing
import warnings

from . import noise
from .. import core
from .. import utils

try:
try:
import pymc3 as pm
except ModuleNotFoundError:
import pymc as pm
except ModuleNotFoundError:
pm = utils.ImportWarner('pymc3')


class BaseModelT(core.CalibrationModel):
def loglikelihood(self, *, y, x, name: str=None, replicate_id: str=None, dependent_key: str=None, theta=None, **dist_kwargs):
""" Loglikelihood of observation (dependent variable) given the independent variable.

If both x and y are 1D-vectors, they must have the same length and the likelihood will be evaluated elementwise.

For a 2-dimensional `x`, the implementation *should* broadcast and return a result that has
the same length as the first dimension of `x`.

Parameters
----------
y : scalar or array-like
observed measurements (dependent variable)
x : scalar, array-like or TensorVariable
assumed independent variable
name : str
Name for the likelihood variable in a PyMC3 model (tensor mode).
Previously this was `f'{replicate_id}.{dependent_key}'`.
replicate_id : optional, str
Deprecated; pass the `name` kwarg instead.
dependent_key : optional, str
Deprecated; pass the `name` kwarg instead.
theta : optional, array-like
Parameters for the calibration model to use instead of `theta_fitted`.
The vector must have the correct length, but can have numeric and/or symbolic entries.
Use this kwarg to run MCMC on calibration model parameters.
**dist_kwargs : dict
Additional keyword arguments are forwarded to the `pm.StudentT` distribution.
Most prominent example: `dims`.

Returns
-------
L : float or TensorVariable
sum of log-likelihoods
"""
if theta is None:
if self.theta_fitted is None:
raise Exception('No parameter vector was provided and the model is not fitted with data yet.')
theta = self.theta_fitted

if not isinstance(x, (list, numpy.ndarray, float, int)) and not utils.istensor(x):
raise ValueError(
f'Input x must be a scalar, TensorVariable or an array-like object, but not {type(x)}'
)
if not isinstance(y, (list, numpy.ndarray, float, int)) and not utils.istensor(y):
raise ValueError(
f'Input y must be a scalar or an array-like object, but not {type(y)}'
)

mu, scale, df = self.predict_dependent(x, theta=theta)
if utils.istensor(x) or utils.istensor(theta):
if pm.Model.get_context(error_if_none=False) is not None:
if replicate_id and dependent_key:
warnings.warn(
"The `replicate_id` and `dependent_key` parameters are deprecated. Use `name` instead.",
DeprecationWarning
)
name = f'{replicate_id}.{dependent_key}'
if not name:
raise ValueError("A `name` must be specified for the PyMC likelihood.")
rv = pm.StudentT(
name,
mu=mu,
sigma=scale,
nu=df,
observed=y,
**dist_kwargs or {}
)
else:
rv = pm.StudentT.dist(
mu=mu,
sigma=scale,
nu=df,
**dist_kwargs or {}
)
# The API to get log-likelihood tensors differs between PyMC versions
if pm.__version__[0] == "3":
if isinstance(rv, pm.model.ObservedRV):
return rv.logpt.sum()
elif isinstance(rv, pm.Distribution):
return rv.logp(y).sum()
else:
return pm.logpt(rv, y, sum=True)
else:
# If `x` is given as a column vector, this model can broadcast automatically.
# This gives considerable performance benefits for the `likelihood(..., scan_x=True)`
# case which is relevant for `infer_independent`.
return numpy.sum(scipy.stats.t.logpdf(x=y, loc=mu, scale=scale, df=df), axis=-1)
class BaseModelT(core.CalibrationModel, noise.StudentTNoise):
pass


class BasePolynomialModelT(BaseModelT):
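With the likelihood logic hoisted into `core.CalibrationModel`, `BaseModelT` shrinks to a pure mixin composition. The same pattern works for any other noise model; a hedged sketch of a user-defined model (the class name, parameter vector, and `predict_dependent` body are illustrative, not part of this PR):

    import numpy
    import calibr8

    class LinearLaplaceModel(calibr8.CalibrationModel, calibr8.LaplaceNoise):
        """Hypothetical linear model with Laplace-distributed noise."""

        def __init__(self, *, independent_key="x", dependent_key="y"):
            super().__init__(
                independent_key=independent_key,
                dependent_key=dependent_key,
                theta_names=("intercept", "slope", "scale"),
            )

        def predict_dependent(self, x, *, theta=None):
            if theta is None:
                theta = self.theta_fitted
            intercept, slope, scale = theta
            # Return order matches LaplaceNoise.to_scipy/to_pymc: (location, scale).
            return intercept + slope * numpy.asarray(x), scale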
64 changes: 64 additions & 0 deletions calibr8/contrib/noise.py
@@ -0,0 +1,64 @@
import numpy
import scipy.stats

from ..core import DistributionMixin
from ..utils import HAS_PYMC, pm


class NormalNoise(DistributionMixin):
"""Normal noise, predicted in terms of mean and standard deviation."""
scipy_dist = scipy.stats.norm
pymc_dist = pm.Normal if HAS_PYMC else None

@staticmethod
def to_scipy(*params):
return dict(loc=params[0], scale=params[1])

@staticmethod
def to_pymc(*params):
return dict(mu=params[0], sigma=params[1])


class LaplaceNoise(DistributionMixin):
"""Normal noise, predicted in terms of mean and scale."""
scipy_dist = scipy.stats.laplace
pymc_dist = pm.Laplace if HAS_PYMC else None

@staticmethod
def to_scipy(*params):
return dict(loc=params[0], scale=params[1])

@staticmethod
def to_pymc(*params):
return dict(mu=params[0], b=params[1])


class LogNormalNoise(DistributionMixin):
"""Log-Normal noise, predicted in logarithmic mean and standard deviation.
⚠ This corresponds to the NumPy/Aesara/PyMC parametrization!
"""
scipy_dist = scipy.stats.lognorm
pymc_dist = pm.Lognormal if HAS_PYMC else None

@staticmethod
def to_scipy(*params):
# SciPy wants the linear-scale median exp(mu) as `scale` and the log-scale standard deviation as `s`!
return dict(scale=numpy.exp(params[0]), s=params[1])

@staticmethod
def to_pymc(*params):
return dict(mu=params[0], sigma=params[1])


class StudentTNoise(DistributionMixin):
"""Student-t noise, predicted in terms of mean, scale and degree of freedom."""
scipy_dist = scipy.stats.t
pymc_dist = pm.StudentT if HAS_PYMC else None

@staticmethod
def to_scipy(*params):
return dict(loc=params[0], scale=params[1], df=params[2])

@staticmethod
def to_pymc(*params):
return dict(mu=params[0], sigma=params[1], nu=params[2])
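The ⚠ warning on `LogNormalNoise` deserves a concrete check: NumPy/Aesara/PyMC parametrize the log-normal by the mean `mu` and standard deviation `sigma` of the underlying normal, whereas `scipy.stats.lognorm` takes the log-scale standard deviation as shape `s` and the linear-scale median `exp(mu)` as `scale`. A self-contained sanity check of the mapping in `to_scipy` (numbers arbitrary):

    import numpy
    import scipy.stats

    mu, sigma = 1.5, 0.4  # log-scale mean and standard deviation (NumPy/PyMC convention)

    # LogNormalNoise.to_scipy maps (mu, sigma) -> dict(scale=exp(mu), s=sigma)
    dist = scipy.stats.lognorm(scale=numpy.exp(mu), s=sigma)

    # Compare against NumPy's parametrization of the same distribution.
    samples = numpy.random.default_rng(42).lognormal(mean=mu, sigma=sigma, size=100_000)
    assert abs(numpy.log(samples).mean() - mu) < 0.01
    assert abs(dist.mean() - samples.mean()) < 0.05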
115 changes: 109 additions & 6 deletions calibr8/core.py
@@ -16,6 +16,7 @@
from typing import Union

from . import utils
from .utils import pm


__version__ = '6.1.3'
@@ -369,7 +370,34 @@ def _infer_univariate_independent(
)


class CalibrationModel:
class DistributionMixin:
"""Maps the values returned by `CalibrationModel.predict_dependent`
to a SciPy distribution and its parameters, and optionally also to
a PyMC distribution and its parameters.
"""
scipy_dist = None
pymc_dist = None

def to_scipy(*args) -> dict:
raise NotImplementedError("This model does not implement a mapping to SciPy distribution parameters.")

def to_pymc(*args) -> dict:
raise NotImplementedError("This model does not implement a mapping to PyMC distribution parameters.")


def _inherits_noisemodel(cls):
"""Determines if cls is a sub-type of DistributionMixin that's not DistributionMixin or a calibration model."""
for m in cls.__mro__:
if (
issubclass(m, DistributionMixin)
and m is not DistributionMixin
and not issubclass(m, CalibrationModel)
):
return True
return False
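Why the extra `issubclass(m, CalibrationModel)` guard: `CalibrationModel` itself inherits from `DistributionMixin` (next hunk), so without the guard every model would appear to carry a noise model. A hypothetical illustration of the intended semantics:

    class PlainModel(CalibrationModel):               # no noise mixin
        ...

    class NoisyModel(CalibrationModel, NormalNoise):  # noise mixin present
        ...

    assert not _inherits_noisemodel(PlainModel)  # the inherited DistributionMixin base does not count
    assert _inherits_noisemodel(NoisyModel)      # NormalNoise sits outside the CalibrationModel hierarchy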


class CalibrationModel(DistributionMixin):
"""A parent class providing the general structure of a calibration model."""

def __init__(self, independent_key:str, dependent_key:str, *, theta_names:typing.Tuple[str], ndim=1):
@@ -388,6 +416,14 @@ def __init__(self, independent_key:str, dependent_key:str, *, theta_names:typing
Most calibrations are univariate (ndim=1).
Multivariate calibrations (ndim > 1) may need to override the `infer_independent` implementation.
"""
if not _inherits_noisemodel(type(self)):
warnings.warn(
"This model does not implement a noise model yet."
"\nAdd a noise model mixin to your class definition. For example:"
"\n`class MyModel(CalibrationModel, LaplaceNoise)`",
DeprecationWarning,
stacklevel=2
)
# make sure that the inheriting type has no required constructor (kw)args
args, varargs, varkw, defaults, kwonlyargs, kwonlydefaults, annotations = inspect.getfullargspec(type(self).__init__)
n_defaults = 0 if not defaults else len(defaults)
@@ -523,7 +559,17 @@ def infer_independent(
ci_prob=ci_prob,
)

def loglikelihood(self, *, y, x, theta=None):
def loglikelihood(
self,
*,
y,
x,
name: str=None,
replicate_id: str=None,
dependent_key: str=None,
theta=None,
**dist_kwargs
):
""" Loglikelihood of observation (dependent variable) given the independent variable.

If both x and y are 1D-vectors, they must have the same length and the likelihood will be evaluated elementwise.
@@ -537,15 +583,72 @@
observed measurements (dependent variable)
x : scalar, array-like or TensorVariable
assumed independent variable
name : str
Name for the likelihood variable in a PyMC model (tensor mode).
Previously this was `f'{replicate_id}.{dependent_key}'`.
replicate_id : optional, str
Deprecated; pass the `name` kwarg instead.
dependent_key : optional, str
Deprecated; pass the `name` kwarg instead.
theta : optional, array-like
model parameters
Parameters for the calibration model to use instead of `theta_fitted`.
The vector must have the correct length, but can have numeric and/or symbolic entries.
Use this kwarg to run MCMC on calibration model parameters.
**dist_kwargs : dict
Additional keyword arguments are forwarded to the PyMC distribution.
Most prominent example: `dims`.

Returns
-------
L : float or TensorVariable
sum of log-likelihoods
"""
raise NotImplementedError('The loglikelihood function should be implemented by the inheriting class.')
if theta is None:
if self.theta_fitted is None:
raise Exception('No parameter vector was provided and the model is not fitted with data yet.')
theta = self.theta_fitted

if not isinstance(x, (list, numpy.ndarray, float, int)) and not utils.istensor(x):
raise ValueError(
f'Input x must be a scalar, TensorVariable or an array-like object, but not {type(x)}'
)
if not isinstance(y, (list, numpy.ndarray, float, int)) and not utils.istensor(y):
raise ValueError(
f'Input y must be a scalar or an array-like object, but not {type(y)}'
)

params = self.predict_dependent(x, theta=theta)
if utils.istensor(x) or utils.istensor(theta):
if pm.Model.get_context(error_if_none=False) is not None:
if replicate_id and dependent_key:
warnings.warn(
"The `replicate_id` and `dependent_key` parameters are deprecated. Use `name` instead.",
DeprecationWarning
)
name = f'{replicate_id}.{dependent_key}'
if not name:
raise ValueError("A `name` must be specified for the PyMC likelihood.")
rv = self.pymc_dist(
name,
**self.to_pymc(*params),
observed=y,
**dist_kwargs or {}
)
else:
rv = self.pymc_dist.dist(
**self.to_pymc(*params),
**dist_kwargs or {}
)
# The API to get log-likelihood tensors differs between PyMC versions
if pm.__version__[0] == "3":
if isinstance(rv, pm.model.ObservedRV):
return rv.logpt.sum()
elif isinstance(rv, pm.Distribution):
return rv.logp(y).sum()
else:
return pm.logpt(rv, y, sum=True)
else:
return self.scipy_dist.logpdf(x=y, **self.to_scipy(*params)).sum(axis=-1)

def likelihood(self, *, y, x, theta=None, scan_x: bool=False):
""" Likelihood of observation (dependent variable) given the independent variable.
@@ -572,8 +675,8 @@ def likelihood(self, *, y, x, theta=None, scan_x: bool=False):
try:
# Try to pass `x` as a column vector to benefit from broadcasting
# if that's implemented by the underlying model.
result = numpy.exp(self.loglikelihood(y=y, x=x[:, None], theta=theta))
if not result.shape == x.shape:
result = numpy.exp(self.loglikelihood(y=y, x=x[..., None], theta=theta))
if not numpy.shape(result) == numpy.shape(x):
raise ValueError("The underlying model does not seem to implement broadcasting.")
return result
except:
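End-to-end, the generalized `loglikelihood` now dispatches on the mixin's class attributes: tensor inputs inside a PyMC model go through `pymc_dist`/`to_pymc`, numeric inputs through `scipy_dist`/`to_scipy`, and the `x[..., None]` change above lets `likelihood(..., scan_x=True)` reuse the broadcasting fast path. A hedged usage sketch for the SciPy path, building on the hypothetical `LinearLaplaceModel` from the `base.py` note above (numbers arbitrary; `theta_fitted` is normally set by fitting):

    import numpy

    model = LinearLaplaceModel()
    model.theta_fitted = (0.5, 2.0, 0.1)  # intercept, slope, scale

    x = numpy.array([1.0, 2.0, 3.0])
    y = numpy.array([2.4, 4.6, 6.5])

    # Scalar result: scipy.stats.laplace.logpdf(y, **to_scipy(*params)).sum(axis=-1)
    L = model.loglikelihood(x=x, y=y)

    # Column-vector broadcasting: one summed loglikelihood per row of x[..., None],
    # which is exactly what likelihood(..., scan_x=True) exploits.
    L_scan = model.loglikelihood(x=x[..., None], y=y)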