Skip to content

Commit

Permalink
Merge pull request NHERI-SimCenter#82 from ioannis-vm/2024_11_UQ_rv_p…
Browse files Browse the repository at this point in the history
…arams

Remove `_raw_sample` from `Empirical` RVs.
  • Loading branch information
zsarnoczay authored Dec 13, 2024
2 parents 1b491c1 + 1c8b297 commit ac03c43
Showing 1 changed file with 23 additions and 39 deletions.
62 changes: 23 additions & 39 deletions pelicun/uq.py
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ def _get_std_samples(
uni_sample = norm.cdf(samples_i, loc=theta_i[0], scale=theta_i[1])

# replace 0 and 1 values with the nearest float
uni_sample[uni_sample == 0] = np.nextafter(0, 1)
uni_sample[uni_sample == 0] = FIRST_POSITIVE_NUMBER
uni_sample[uni_sample == 1] = np.nextafter(1, -1)

# consider truncation if needed
Expand Down Expand Up @@ -660,7 +660,7 @@ def _neg_log_likelihood( # noqa: C901
return 1e10

# make sure that the likelihood of censoring a sample is positive
cen_likelihood = max(1.0 - det_alpha, np.nextafter(0, 1))
cen_likelihood = max(1.0 - det_alpha, FIRST_POSITIVE_NUMBER)

else:
# If the data is not censored, use 1.0 for cen_likelihood to get a
Expand All @@ -679,7 +679,7 @@ def _neg_log_likelihood( # noqa: C901
# Zeros are a result of limited floating point precision. Replace them
# with the smallest possible positive floating point number to
# improve convergence.
likelihoods = np.clip(likelihoods, a_min=np.nextafter(0, 1), a_max=None)
likelihoods = np.clip(likelihoods, a_min=FIRST_POSITIVE_NUMBER, a_max=None)

# calculate the total negative log likelihood
negative_log_likelihood = -(
Expand Down Expand Up @@ -819,7 +819,9 @@ def fit_distribution_to_sample( # noqa: C901

# replace zero standard dev with negligible standard dev
sig_zero_id = np.where(sig_init == 0.0)[0]
sig_init[sig_zero_id] = 1e-6 * np.abs(mu_init[sig_zero_id]) + np.nextafter(0, 1)
sig_init[sig_zero_id] = (
1e-6 * np.abs(mu_init[sig_zero_id]) + FIRST_POSITIVE_NUMBER
)

# prepare a vector of initial values
# Note: The actual optimization uses zeros as initial parameters to
Expand Down Expand Up @@ -1244,7 +1246,7 @@ class RandomVariable(BaseRandomVariable):
def __init__(
self,
name: str,
theta: np.ndarray | None,
theta: np.ndarray,
truncation_limits: np.ndarray | None = None,
f_map: Callable | None = None,
anchor: BaseRandomVariable | None = None,
Expand Down Expand Up @@ -1390,12 +1392,10 @@ def _prepare_theta_and_truncation_limit_arrays(
of `values`.
"""
theta = self.theta
assert theta is not None
truncation_limits = self.truncation_limits
assert truncation_limits is not None
if self.constant_parameters():
theta = np.atleast_2d(theta)
assert theta is not None
elif len(values) != theta.shape[0]:
msg = (
'Number of elements in `values` variable should '
Expand Down Expand Up @@ -1547,7 +1547,6 @@ def __init__(
f_map=f_map,
anchor=anchor,
)
assert self.theta is not None, '`theta` is required for Normal RVs'
self.distribution = 'normal'

def cdf(self, values: np.ndarray) -> np.ndarray:
Expand Down Expand Up @@ -1724,7 +1723,6 @@ def __init__(
f_map=f_map,
anchor=anchor,
)
assert self.theta is not None, '`theta` is required for LogNormal RVs'
self.distribution = 'lognormal'

def cdf(self, values: np.ndarray) -> np.ndarray:
Expand All @@ -1751,7 +1749,7 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
a, b = truncation_limits.T

# Replace NaN values
a = np.nan_to_num(a, nan=np.nextafter(0, 1))
a = np.nan_to_num(a, nan=FIRST_POSITIVE_NUMBER)
b = np.nan_to_num(b, nan=np.inf)

p_a, p_b = (
Expand All @@ -1769,7 +1767,7 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
result = (p_vals - p_a) / (p_b - p_a)

else:
values = np.maximum(values, np.nextafter(0, 1))
values = np.maximum(values, FIRST_POSITIVE_NUMBER)

result = norm.cdf(np.log(values), loc=np.log(theta), scale=beta)

Expand Down Expand Up @@ -1802,8 +1800,8 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
a, b = truncation_limits.T

# Replace NaN values
a = np.nan_to_num(a, nan=np.nextafter(0, 1))
a[a <= 0] = np.nextafter(0, 1)
a = np.nan_to_num(a, nan=FIRST_POSITIVE_NUMBER)
a[a <= 0] = FIRST_POSITIVE_NUMBER
b = np.nan_to_num(b, nan=np.inf)

p_a, p_b = (
Expand Down Expand Up @@ -1852,7 +1850,6 @@ def __init__(
f_map=f_map,
anchor=anchor,
)
assert self.theta is not None, '`theta` is required for Uniform RVs'
self.distribution = 'uniform'

if self.theta.ndim != 1:
Expand All @@ -1877,7 +1874,6 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
1D float ndarray containing CDF values
"""
assert self.theta is not None
a, b = self.theta[:2]

if np.isnan(a):
Expand Down Expand Up @@ -1908,7 +1904,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
Inverse CDF values
"""
assert self.theta is not None
a, b = self.theta[:2]

if np.isnan(a):
Expand Down Expand Up @@ -1953,7 +1948,6 @@ def __init__(
f_map=f_map,
anchor=anchor,
)
assert self.theta is not None, '`theta` is required for Weibull RVs'
self.distribution = 'weibull'

if self.theta.ndim != 1:
Expand All @@ -1978,7 +1972,6 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
1D float ndarray containing CDF values
"""
assert self.theta is not None
lambda_, kappa = self.theta[:2]

if np.any(~np.isnan(self.truncation_limits)):
Expand Down Expand Up @@ -2029,7 +2022,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
Inverse CDF values
"""
assert self.theta is not None
lambda_, kappa = self.theta[:2]

if np.any(~np.isnan(self.truncation_limits)):
Expand Down Expand Up @@ -2096,7 +2088,6 @@ def __init__(
f_map=f_map,
anchor=anchor,
)
assert self.theta is not None, '`theta` is required for MultilinearCDF RVs'
self.distribution = 'multilinear_CDF'

if not np.all(np.isnan(truncation_limits)):
Expand Down Expand Up @@ -2159,7 +2150,6 @@ def cdf(self, values: np.ndarray) -> np.ndarray:
1D float ndarray containing CDF values
"""
assert self.theta is not None
x_i = [-np.inf] + [x[0] for x in self.theta] + [np.inf]
y_i = [0.00] + [x[1] for x in self.theta] + [1.00]

Expand All @@ -2184,7 +2174,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
Inverse CDF values
"""
assert self.theta is not None
x_i = [x[0] for x in self.theta]
y_i = [x[1] for x in self.theta]

Expand All @@ -2201,7 +2190,7 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
class EmpiricalRandomVariable(RandomVariable):
"""Empirical random variable."""

__slots__: list[str] = ['_raw_sample']
__slots__: list[str] = []

def __init__(
self,
Expand All @@ -2214,9 +2203,12 @@ def __init__(
"""Instantiate an Empirical random variable."""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))

theta = np.atleast_1d(theta)

super().__init__(
name=name,
theta=None,
theta=theta,
truncation_limits=truncation_limits,
f_map=f_map,
anchor=anchor,
Expand All @@ -2226,8 +2218,6 @@ def __init__(
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)

self._raw_sample = np.atleast_1d(theta)

def inverse_transform(self, values: np.ndarray) -> np.ndarray:
"""
Evaluate the inverse CDF.
Expand All @@ -2251,14 +2241,14 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
normalized positions.
"""
s_ids = (values * len(self._raw_sample)).astype(int)
return self._raw_sample[s_ids]
s_ids = (values * len(self.theta)).astype(int)
return self.theta[s_ids]


class CoupledEmpiricalRandomVariable(UtilityRandomVariable):
"""Coupled empirical random variable."""

__slots__: list[str] = ['_raw_sample']
__slots__: list[str] = ['theta']

def __init__(
self,
Expand Down Expand Up @@ -2295,19 +2285,17 @@ def __init__(
When truncation limits are provided
"""
if truncation_limits is None:
truncation_limits = np.array((np.nan, np.nan))
super().__init__(
name=name,
f_map=f_map,
anchor=anchor,
)
self.distribution = 'coupled_empirical'
if not np.all(np.isnan(truncation_limits)):
if truncation_limits is not None:
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)

self._raw_sample = np.atleast_1d(theta)
self.theta = np.atleast_1d(theta)

def inverse_transform(self, sample_size: int) -> np.ndarray:
"""
Expand All @@ -2332,10 +2320,8 @@ def inverse_transform(self, sample_size: int) -> np.ndarray:
dataset.
"""
raw_sample_count = len(self._raw_sample)
new_sample = np.tile(
self._raw_sample, int(sample_size / raw_sample_count) + 1
)
raw_sample_count = len(self.theta)
new_sample = np.tile(self.theta, int(sample_size / raw_sample_count) + 1)
return new_sample[:sample_size]


Expand Down Expand Up @@ -2450,7 +2436,6 @@ def __init__(
if not np.all(np.isnan(truncation_limits)):
msg = f'{self.distribution} RVs do not support truncation'
raise NotImplementedError(msg)
assert self.theta is not None, '`theta` is required for Multinomial RVs'
self.distribution = 'multinomial'
if np.sum(theta) > 1.00:
msg = (
Expand Down Expand Up @@ -2482,7 +2467,6 @@ def inverse_transform(self, values: np.ndarray) -> np.ndarray:
Discrete events corresponding to the input values.
"""
assert self.theta is not None
p_cum = np.cumsum(self.theta)[:-1]

for i, p_i in enumerate(p_cum):
Expand Down

0 comments on commit ac03c43

Please sign in to comment.