From eebc7820cbe505471531c6b07eb56954dedb3944 Mon Sep 17 00:00:00 2001 From: bsaakash <11618528+bsaakash@users.noreply.github.com> Date: Mon, 30 Sep 2024 14:47:58 -0700 Subject: [PATCH 1/3] abs - gp_ab algorithm implementation --- modules/performUQ/common/adaptive_doe.py | 64 +++ modules/performUQ/common/common_datamodels.py | 114 ++++ .../performUQ/common/convergence_metrics.py | 120 ++++ modules/performUQ/common/gp_ab_algorithm.py | 535 ++++++++++++++++++ modules/performUQ/common/gp_model.py | 72 +++ .../common/principal_component_analysis.py | 48 ++ modules/performUQ/common/space_filling_doe.py | 33 ++ modules/performUQ/common/tmcmc.py | 236 ++++++++ 8 files changed, 1222 insertions(+) create mode 100644 modules/performUQ/common/adaptive_doe.py create mode 100644 modules/performUQ/common/common_datamodels.py create mode 100644 modules/performUQ/common/convergence_metrics.py create mode 100644 modules/performUQ/common/gp_ab_algorithm.py create mode 100644 modules/performUQ/common/gp_model.py create mode 100644 modules/performUQ/common/principal_component_analysis.py create mode 100644 modules/performUQ/common/space_filling_doe.py create mode 100644 modules/performUQ/common/tmcmc.py diff --git a/modules/performUQ/common/adaptive_doe.py b/modules/performUQ/common/adaptive_doe.py new file mode 100644 index 000000000..caef0177c --- /dev/null +++ b/modules/performUQ/common/adaptive_doe.py @@ -0,0 +1,64 @@ +import numpy as np + +from space_filling_doe import LatinHypercubeSampling + + +class AdaptiveDesignOfExperiments: + def __init__(self, gp_model, pca, domain): + self.gp_model = gp_model + self.pca = pca + self.domain = domain + + self._lengthscale_for_doe() + self._kernel_for_doe() + self._gp_for_doe() + + def _lengthscale_for_doe(self): + eigenvalues = self.pca.explained_variance_ + w = eigenvalues / np.sum(eigenvalues) + lengthscales = np.atleast_2d([m.kernel.lengthscale for m in self.gp_model]) + self.lengthscale_star = np.sum(w * lengthscales) + return self.lengthscale_star + + def _kernel_for_doe(self): + self.kernel = self.gp_model.kernel.copy() + self.kernel.lengthscale = self.lengthscale_star + return self.kernel + + def _gp_for_doe(self): + self.gp_model_for_doe = self.gp_model[0].copy() + self.gp_model_for_doe.kernel = self.kernel + return self.gp_model_for_doe + + def _imse_w_approximation(self, X_train, mci_samples, candidate_training_points): + self.gp_model_for_doe.set_XY( + X_train, + np.zeros((X_train.shape[0], 1)), + ) + _, pred_var = self.gp_model_for_doe.predict(mci_samples) + n_theta = X_train.shape[1] + beta = 2.0 * n_theta + imse = np.zeros((candidate_training_points.shape[0], 1)) + for i, candidate in enumerate(candidate_training_points): + correlation_vector = self.gp_model_for_doe.kern.K(mci_samples, candidate) + imse[i] = ( + 1 + / (mci_samples.shape[0]) + * np.dot(correlation_vector**beta, pred_var) + ) + return imse + + def select_training_points(self, X_train, n_points, mci_samples, n_candidates): + dimension = X_train.shape[1] + for _ in range(n_points): + lhs = LatinHypercubeSampling( + n_samples=n_candidates, n_dimensions=dimension + ) + candidate_training_points = lhs.generate(self.domain) + imse = self._imse_w_approximation( + X_train, mci_samples, candidate_training_points + ) + next_training_point = candidate_training_points[np.argmax(imse)] + X_train = np.vstack((X_train, next_training_point)) + new_training_points = X_train[-n_points:, :] + return new_training_points diff --git a/modules/performUQ/common/common_datamodels.py 
b/modules/performUQ/common/common_datamodels.py new file mode 100644 index 000000000..57538ecdc --- /dev/null +++ b/modules/performUQ/common/common_datamodels.py @@ -0,0 +1,114 @@ +import sys +from pathlib import Path +from typing import Any, Dict, List, Literal, Optional + +from pydantic import BaseModel, Field, validator + +sys.path.append( + "/Users/aakash/SimCenter/SimCenterBackendApplications/modules/performUQ/common" +) + + +class EDPItem(BaseModel): + name: str + type: Literal["scalar", "field"] + length: int = Field(gt=0) + + +# class EDP(BaseModel): +# __root__: list[EDPItem] + + +class CorrelationValue(BaseModel): + values: float = Field(..., ge=-1, le=1) + + # class CorrelationMatrix(BaseModel): + # CorrelationMatrix: list[CorrelationValue] + + # class CorrelationMatrix(BaseModel): + # __root__: Annotated[ + # List[Annotated[float, Field(ge=-1, le=1)]], Field() + # ] # List of floats, each between -1 and 1 + + # @model_validator(mode="after") + # def check_square_matrix(cls, model): + # matrix = model.correlationMatrix # Access the matrix directly from the model + # if matrix: + # length = len(matrix) + # sqrt_len = math.isqrt(length) # Integer square root + # if sqrt_len * sqrt_len != length: + # raise ValueError( + # "The length of the correlationMatrix must be a perfect square." + # ) + # return model + + # @classmethod + # def from_list(cls, correlation_matrix: List[float]): + # """Construct the model directly from a list.""" + # return cls(correlationMatrix=correlation_matrix) + + +class ApplicationData(BaseModel): + MS_Path: Path + mainScript: str + postprocessScript: str + + +class FEM_App(BaseModel): + Application: str + ApplicationData: ApplicationData + + +class UQ_App(BaseModel): + Application: str + ApplicationData: dict[str, Any] + + +class Applications(BaseModel): + FEM: FEM_App + UQ: UQ_App + + +class GP_AB_UQData(BaseModel): + calibration_data_file_name: str + calibration_data_path: Path + log_likelihood_file_name: Optional[str] + log_likelihood_path: Optional[Path] + sample_size: int + seed: int + + +# class Model(BaseModel): +# Applications: Applications +# EDP: EDP +# FEM: dict[str, Any] +# UQ: GP_AB_UQData +# correlationMatrix: CorrelationMatrix +# localAppDir: str +# randomVariables: list[dict[str, Any]] +# remoteAppDir: str +# runType: str +# workingDir: str + + +class Model(BaseModel): + Applications: Applications + EDP: List[EDPItem] + FEM: Dict[str, str] # Empty dict + UQ: GP_AB_UQData + WorkflowType: str + correlationMatrix: List[float] + localAppDir: str + randomVariables: list[dict[str, Any]] + remoteAppDir: str + resultType: str + runDir: str + runType: str + summary: List[str] + workingDir: str + + @validator("correlationMatrix", each_item=True) + def check_correlation_matrix(cls, v): + if not (-1 <= v <= 1): + raise ValueError("Each correlation value must be between -1 and 1.") + return v diff --git a/modules/performUQ/common/convergence_metrics.py b/modules/performUQ/common/convergence_metrics.py new file mode 100644 index 000000000..5ccf2343d --- /dev/null +++ b/modules/performUQ/common/convergence_metrics.py @@ -0,0 +1,120 @@ +import numpy as np +from scipy.optimize import minimize +from scipy.special import logsumexp + + +def _calculate_normalization_constants( + current_function_values, previous_function_values +): + # Define the objective function to minimize the difference from 1 for c1 and c2 + def objective_function(log_alphas): + log_alpha_1, log_alpha_2 = log_alphas + + numerator_current = current_function_values - log_alpha_1 + 
numerator_previous = previous_function_values - log_alpha_2 + + log_normalized_proposal = np.log(0.5) + np.logaddexp( + numerator_current, numerator_previous + ) + + c1 = ( + 1 + / len(numerator_current) + * np.exp(logsumexp(numerator_current - log_normalized_proposal)) + ) + c2 = ( + 1 + / len(numerator_previous) + * np.exp(logsumexp(numerator_previous - log_normalized_proposal)) + ) + + return (c1 - 1) ** 2 + (c2 - 1) ** 2 + + # Initial guesses for log(alpha_1) and log(alpha_2) + initial_log_alpha_1 = logsumexp(current_function_values) + initial_log_alpha_2 = logsumexp(previous_function_values) + initial_guess = [initial_log_alpha_1, initial_log_alpha_2] + + # Perform the optimization + result = minimize(objective_function, initial_guess, method="BFGS") + + # Extract optimized alpha_1 and alpha_2 + optimized_log_alpha_1, optimized_log_alpha_2 = result.x + alpha_1_optimized = np.exp(optimized_log_alpha_1) + alpha_2_optimized = np.exp(optimized_log_alpha_2) + + return (alpha_1_optimized, alpha_2_optimized) + + +def _calculate_kl_divergence( + current_log_target_function, previous_log_target_function, samples +): + current_function_values = current_log_target_function(samples) + previous_function_values = previous_log_target_function(samples) + alpha_1, alpha_2 = _calculate_normalization_constants( + current_function_values, previous_function_values + ) + kl_divergence_estimate = ( + 1 + / len(samples) + * np.sum( + np.log( + current_function_values + / previous_function_values + * alpha_2 + / alpha_1 + ) + * current_function_values + / ( + 1 + / 2 + * ( + current_function_values + + alpha_1 / alpha_2 * previous_function_values + ) + ) + ) + ) + + return kl_divergence_estimate + + +def calculate_gkl( + current_log_target_function, previous_log_target_function, samples +): + kl_divergence_estimate = _calculate_kl_divergence( + current_log_target_function, previous_log_target_function, samples + ) + n_theta = np.shape(samples)[1] + gkl = 1 / n_theta * kl_divergence_estimate + return gkl + + +def calculate_gmap( + current_log_target_function, + previous_log_target_function, + samples, + prior_variances, +): + current_function_values = current_log_target_function(samples) + previous_function_values = previous_log_target_function(samples) + current_map = np.argmax(current_function_values) + previous_map = np.argmax(previous_function_values) + + gmap = np.sqrt(np.sum((current_map - previous_map) ** 2 / prior_variances)) + return gmap + + +def calculate_loo_nrmse_w( + loo_predictions, + gp_surrogate_model_prediction, + weights=None, +): + if weights is None: + weights = np.ones_like(loo_predictions) + normalized_weights = weights / np.sum(weights) + g_i = np.linalg.norm( + loo_predictions - gp_surrogate_model_prediction, axis=1, keepdims=True + ) / np.linalg.norm(gp_surrogate_model_prediction, axis=1, keepdims=True) + g_cv = normalized_weights * g_i / len(loo_predictions) + return g_cv diff --git a/modules/performUQ/common/gp_ab_algorithm.py b/modules/performUQ/common/gp_ab_algorithm.py new file mode 100644 index 000000000..09c53013a --- /dev/null +++ b/modules/performUQ/common/gp_ab_algorithm.py @@ -0,0 +1,535 @@ +import importlib +import json +import os +import sys +import time +from functools import partial +from pathlib import Path +from typing import Literal + +import numpy as np +import pydantic + +# sys.path.append( +# "/Users/aakash/SimCenter/SimCenterBackendApplications/modules/performUQ/common" +# ) +import uq_utilities +from scipy.stats import invgamma, norm + +import common_datamodels 
+import convergence_metrics +from adaptive_doe import AdaptiveDesignOfExperiments +from gp_model import GaussianProcessModel +from principal_component_analysis import PrincipalComponentAnalysis +from space_filling_doe import LatinHypercubeSampling +from tmcmc import TMCMC, calculate_warm_start_stage + + +def log_likelihood(prediction_error_vector, prediction_error_variance): + return np.sum( + norm.logpdf(prediction_error_vector, 0, np.sqrt(prediction_error_variance)) + ) + + +def _response_approximation(current_gp, current_pca, model_parameters): + latent_predictions, _ = current_gp.predict(model_parameters) + gp_prediction = current_pca.project_back_to_original_space(latent_predictions) + return gp_prediction + + +class GP_AB_Algorithm: + def __init__( + self, + data, + output_length_list, + output_names_list, + input_dimension, + output_dimension, + domain, + model_evaluation_function, + sample_transformation_function, + prior_pdf_function, + log_likelihood_function, + prior_variances, + max_simulations=np.inf, + max_computational_time=np.inf, + pca_threshold=0.999, + run_type="runningLocal", + loocv_threshold=0.2, + recalibration_ratio=0.1, + gkl_threshold=0.01, + gmap_threshold=0.01, + ): + self.data = data + self.input_dimension = input_dimension + self.output_dimension = output_dimension + self.output_length_list = output_length_list + self.domain = domain + self.prior_variances = prior_variances + + self.inputs = None + self.outputs = None + self.latent_outputs = None + + self.prior_pdf_function = prior_pdf_function + self.log_likelihood_function = log_likelihood_function + + self.pca_threshold = pca_threshold + self.num_pca_components_list = [] + + self.gkl_threshold = gkl_threshold + self.gmap_threshold = gmap_threshold + + self.max_simulations = max_simulations + self.max_computational_time = max_computational_time + self.start_time = time.time() + + self.converged = False + self.budget_exceeded = False + self.terminate = False + + self.num_experiments = [0] + self.num_recalibration_experiments = 0 + self.recalibration_ratio = recalibration_ratio + + self.sample_transformation_function = sample_transformation_function + self.model_evaluation_function = model_evaluation_function + self.run_type = run_type + self.parallel_pool = uq_utilities.get_parallel_pool_instance(run_type) + self.parallel_evaluation_function = self.parallel_pool.pool.starmap + + self.loocv_threshold = loocv_threshold + + self.results = dict() + self.current_gp_model = GaussianProcessModel( + self.input_dimension, 1, ARD=True + ) + self.current_pca = PrincipalComponentAnalysis(self.pca_threshold) + + self.samples_dict = dict() + self.betas_dict = dict() + self.log_likelihoods_dict = dict() + self.log_target_density_values_dict = dict() + self.log_evidence_dict = dict() + + def _evaluate_in_parallel(self, func, samples): + transformed_samples = self.sample_transformation_function(samples) + simulation_numbers = np.arange(len(samples)) + iterable = zip(simulation_numbers, transformed_samples) + outputs = np.atleast_2d( + list(self.parallel_evaluation_function(func, iterable)) + ) + return outputs + + def _perform_initial_doe(self, n_samples): + self.initial_doe = LatinHypercubeSampling( + n_samples=n_samples, n_dimensions=self.input_dimension + ) + samples = self.initial_doe.generate(self.domain) + return samples + + def _get_initial_training_set(self, n_samples): + inputs = self._perform_initial_doe(n_samples) + outputs = self._evaluate_in_parallel(self.model_evaluation_function, inputs) + # 
initial_training_set = np.hstack((inputs, outputs)) + # return initial_training_set + return inputs, outputs + + def _log_likelihood_approximation(self, response_approximation, samples): + u_values = samples[:, : self.input_dimension] + model_parameters = self.sample_transformation_function(u_values).reshape( + 1, -1 + ) + predictions = response_approximation(model_parameters) + prediction_errors = self.data - predictions + + q = samples[:, self.input_dimension :] + + log_likes = [] + num_samples = samples.shape[0] + for i in range(num_samples): + ll = [] + start = 0 + for j in range(q.shape[1]): + ll.append( + self.log_likelihood_function( + prediction_errors[ + i, start : start + self.output_length_list[j] + ], + q[i, j], + ) + ) + start += self.output_length_list[j] + log_likes.append(np.sum(ll)) + return np.array(log_likes).reshape((num_samples, 1)) + + def _log_prior_pdf(self, samples): + u_values = samples[:, : self.input_dimension] + model_parameters = self.sample_transformation_function(u_values) + log_prior_model_parameters = np.log( + self.prior_pdf_function(model_parameters) + ).reshape((-1, 1)) + # TODO: (ABS) Decide what prior to use for q + q = samples[:, self.input_dimension :] + log_prior_q = np.sum(invgamma.logpdf(q, 1, scale=0.5), axis=1) + prior_log_pdf = log_prior_model_parameters + log_prior_q + return prior_log_pdf + + def _log_posterior_approximation(self, samples, log_likelihoods): + log_prior = self._log_prior_pdf(samples) + log_posterior = log_likelihoods + log_prior + return log_posterior + + def loocv_measure(self, log_likelihood_approximation): + lls = log_likelihood_approximation(self.inputs) + + # weights = ( + # 2 / 3 * np.ones((self.database.shape[0], 1)) + # + 1 / 3 * self.adaptive_weights_per_stage + # ) + loo_predictions = self.current_gp_model.loo_predictions(self.outputs) + loocv_measure = convergence_metrics.calculate_loo_nrmse_w( + loo_predictions, lls + ) + return loocv_measure + + def calibrate_gp(self, model_parameters, model_outputs): + latent_outputs = self.current_pca.project_to_latent_space(model_outputs) + self.num_pca_components_list.append(np.shape(latent_outputs)[1]) + self.current_gp_model = GaussianProcessModel( + self.input_dimension, self.num_pca_components_list[-1], ARD=True + ) + self.current_gp_model.fit(model_parameters, latent_outputs) + self.num_recalibration_experiments = self.num_experiments[-1] + + def run_iteration(self, k): + self.iteration_number = k + model_parameters = self.inputs + model_outputs = self.outputs + + # Step 1.1: GP calibration + if ( + self.num_experiments[-1] - self.num_recalibration_experiments + >= self.recalibration_ratio * self.num_recalibration_experiments + ): + self.previous_gp_model = self.current_gp_model + self.latent_outputs = self.current_pca.project_to_latent_space( + model_outputs + ) + self.num_pca_components_list.append(np.shape(self.latent_outputs)[1]) + self.current_gp_model = GaussianProcessModel( + self.input_dimension, self.num_pca_components_list[-1], ARD=True + ) + self.current_gp_model.fit(model_parameters, self.latent_outputs) + self.num_recalibration_experiments = self.num_experiments[-1] + else: + self.current_gp_model = self.previous_gp_model + self.latent_outputs = self.current_pca.project_to_latent_space( + model_outputs + ) + self.num_pca_components_list.append(np.shape(self.latent_outputs)[1]) + + # Step 1.2: GP predictive model + gp_prediction_latent_mean, gp_prediction_latent_variance = ( + self.current_gp_model.predict(model_parameters) + ) + gp_prediction_mean = 
self.current_pca.project_back_to_original_space( + gp_prediction_latent_mean + ) + loo_predictions_latent_space = self.current_gp_model.loo_predictions( + self.latent_outputs + ) + loo_predictions = self.current_pca.project_back_to_original_space( + loo_predictions_latent_space + ) + + # Step 1.3: Posterior distribution approximation + response_approximation = partial( + _response_approximation, self.current_gp_model, self.current_pca + ) + log_likelihood_approximation = partial( + self._log_likelihood_approximation, response_approximation + ) + log_posterior_approximation = self._log_posterior_approximation + + # Step 2.1: Evaluate warm-starting for TMCMC + tmcmc = TMCMC(log_likelihood_approximation, log_posterior_approximation) + + j_star = 0 + beta = 0 + + samples_dict = dict() + betas_dict = dict() + log_likelihoods_dict = dict() + log_target_density_values_dict = dict() + log_evidence_dict = dict() + + num_samples_per_stage = 50 + if k > 0: + # loocv_measure = self.loocv_measure() + # if loocv_measure < self.loocv_threshold: + # j_star = self._calculate_warm_start_stage( + # log_likelihood_approximation, self.betas_dict + # ) + j_star = calculate_warm_start_stage( + log_likelihood_approximation, current_tmcmc_results + ) + previous_log_posterior_approximation = log_posterior_approximation + + # Step 2.2: Sequential MC sampling + if j_star == 0: + samples = self._perform_initial_doe(num_samples_per_stage) + log_target_density_values = np.log( + self.prior_pdf_function(samples) + ).reshape((-1, 1)) + log_likelihood_values = np.ones_like(log_target_density_values) + log_evidence = 0 + beta = 0 + + samples_dict[j_star] = samples + betas_dict[j_star] = beta + log_likelihoods_dict[j_star] = log_likelihood_values + log_target_density_values_dict[j_star] = log_target_density_values + log_evidence_dict[j_star] = log_evidence + else: + for j in range(j_star): + samples_dict[j] = self.samples_dict[j] + betas_dict[j] = self.betas_dict[j] + log_likelihoods_dict[j] = self.log_likelihoods_dict[j] + log_target_density_values_dict[j] = ( + self.log_target_density_values_dict[j] + ) + log_evidence_dict[j] = self.log_evidence_dict[j] + + current_tmcmc_results = tmcmc.run( + samples_dict, + betas_dict, + log_likelihoods_dict, + log_target_density_values_dict, + log_evidence_dict, + np.random.default_rng(), + j_star, + num_burn_in=0, + ) + + # Step 3.1: Assess convergence + gkl = convergence_metrics.calculate_gkl( + log_posterior_approximation, + previous_log_posterior_approximation, + samples, + ) + gmap = convergence_metrics.calculate_gmap( + log_posterior_approximation, + previous_log_posterior_approximation, + samples, + self.prior_variances, + ) + self.converged = gkl < self.gkl_threshold and gmap < self.gmap_threshold + + # Step 3.2: Computational budget related termination + self.budget_exceeded = (len(self.inputs) >= self.max_simulations) or ( + time.time() - self.start_time >= self.max_computational_time + ) + self.terminate = self.converged or self.budget_exceeded + if self.terminate: + return self.terminate, current_tmcmc_results + + # Step 4.1: Select variance for DoE + n_training_points = 2 * self.input_dimension + self.exploitation_proportion = 0.5 + n_exploit = np.ceil(self.exploitation_proportion * n_training_points) + current_doe = AdaptiveDesignOfExperiments( + self.current_gp_model, self.current_pca, self.domain + ) + + # Step 4.2: Exploitation DoE + exploitation_training_points = current_doe.select_training_points( + self.inputs, n_exploit, samples, 1000 + ) + self.inputs = 
np.vstack([self.inputs, exploitation_training_points]) + + # Step 4.3: Exploration DoE + exploration_training_points = current_doe.select_training_points( + self.inputs, n_training_points - n_exploit, samples, 1000 + ) + self.inputs = np.vstack([self.inputs, exploration_training_points]) + + new_training_points = np.vstack( + [exploitation_training_points, exploration_training_points] + ) + self.num_experiments.append(len(self.inputs)) + + # Step 5: Response estimation + new_training_outputs = self._evaluate_in_parallel( + self.model_evaluation_function, new_training_points + ) + self.outputs = np.vstack([self.outputs, new_training_outputs]) + + return self.terminate, current_tmcmc_results + + def run_initial_doe(self, num_initial_doe_per_dim=2): + num_initial_doe_samples = num_initial_doe_per_dim * self.input_dimension + inputs, outputs = self._get_initial_training_set(num_initial_doe_samples) + return inputs, outputs, num_initial_doe_samples + + def write_results(self): + print(f"{self.iteration_number = }") + print("Results written to file") + + +def main(input_arguments): + gp_ab = GP_AB_Algorithm(*preprocess(input_arguments)) + inputs, outputs, num_initial_doe_samples = gp_ab.run_initial_doe( + num_initial_doe_per_dim=2 + ) + gp_ab.inputs = inputs + gp_ab.outputs = outputs + gp_ab.num_experiments.append(num_initial_doe_samples) + iteration = 0 + while not gp_ab.terminate: + gp_ab.terminate, gp_ab.results = gp_ab.run_iteration(iteration) + gp_ab.write_results() + iteration += 1 + + +def read_inputs(input_json_file): + with open(input_json_file, encoding="utf-8") as f: + inputs = json.load(f) + + # application_inputs = inputs["Applications"] + # uq_inputs = inputs["UQ"] + # rv_inputs = inputs["randomVariables"] + # correlation_matrix_inputs = inputs["correlationMatrix"] + # edp_inputs = inputs["EDP"] + + input_data = common_datamodels.Model.model_validate(inputs) + uq_inputs = input_data.UQ + rv_inputs = input_data.randomVariables + correlation_matrix_inputs = input_data.correlationMatrix + edp_inputs = inputs["EDP"] + application_inputs = input_data.Applications + + # application_inputs = common_datamodels.Applications.model_validate( + # inputs["Applications"] + # ) + # uq_inputs = GP_AB_UQData.model_validate(inputs["UQ"]) + # rv_inputs = inputs["randomVariables"] + # correlation_matrix_inputs = common_datamodels.CorrelationMatrix.from_list( + # inputs["correlationMatrix"] + # ) + # edp_inputs = common_datamodels.EDP(inputs["EDP"]) + + return ( + uq_inputs, + rv_inputs, + correlation_matrix_inputs, + edp_inputs, + application_inputs, + ) + + +def preprocess(input_arguments): + ( + uq_inputs, + rv_inputs, + correlation_matrix_inputs, + edp_inputs, + application_inputs, + ) = read_inputs(input_arguments.input_json_file) + + data_file = ( + uq_inputs.calibration_data_path / uq_inputs.calibration_data_file_name + ) + with open(data_file, "r") as f: + data = np.genfromtxt(f, delimiter=",") + + log_likelihood_file_name = uq_inputs.log_likelihood_file_name + log_likelihood_path = uq_inputs.log_likelihood_path + log_likelihood_function = log_likelihood + if log_likelihood_file_name: + sys.path.append(str(log_likelihood_path)) + ll_module = importlib.import_module(log_likelihood_file_name) + log_likelihood_function = getattr(ll_module, "log_likelihood") + + joint_distribution = uq_utilities.ERANatafJointDistribution( + rv_inputs, + correlation_matrix_inputs, + ) + domain = [(-3, 3) for _ in range(len(rv_inputs))] + prior_variances = [1 for _ in range(len(rv_inputs))] # TODO: (ABS) Validate 
this + # Transformation function from standard to physical space + sample_transformation_function = joint_distribution.u_to_x + + # Prior logpdf function + prior_pdf_function = joint_distribution.pdf + + main_script_path = str(application_inputs.FEM.ApplicationData.MS_Path) + + model = uq_utilities.get_default_model( + list_of_rv_data=rv_inputs, + edp_data=edp_inputs, + list_of_dir_names_to_copy_files_from=[main_script_path], + run_directory=input_arguments.path_to_working_directory, + driver_filename=str(input_arguments.driver_file_name), + workdir_prefix="workdir", + ) + model_evaluation_function = model.evaluate_model_once + + edp_names_list = [edp["name"] for edp in edp_inputs] + edp_lengths_list = [edp["length"] for edp in edp_inputs] + input_dimension = len(rv_inputs) + output_dimension = sum( + edp_lengths_list + ) # TODO: (ABS) Validate this against length of data + + max_simulations = np.inf + max_computational_time = np.inf + pca_threshold = 0.999 + run_type = input_arguments.run_type + loocv_threshold = 0.2 + recalibration_ratio = 0.1 + + return ( + data, + edp_lengths_list, + edp_names_list, + input_dimension, + output_dimension, + domain, + model_evaluation_function, + sample_transformation_function, + prior_pdf_function, + log_likelihood_function, + prior_variances, + max_simulations, + max_computational_time, + pca_threshold, + run_type, + loocv_threshold, + recalibration_ratio, + ) + + +class InputArguments(pydantic.BaseModel): + path_to_working_directory: Path + path_to_template_directory: Path + run_type: Literal["runningLocal", "runningRemote"] + driver_file_name: Path + input_json_file: Path + + model_config = pydantic.ConfigDict(revalidate_instances="always") + + +if __name__ == "__main__": + this = Path(__file__).resolve() + os.chdir("/Users/aakash/Documents/quoFEM/LocalWorkDir/tmp.SimCenter") + args = { + "path_to_working_directory": Path(sys.argv[1]).resolve(), + "path_to_template_directory": Path(sys.argv[2]).resolve(), + "run_type": sys.argv[3], + "driver_file_name": Path(sys.argv[4]).resolve(), + "input_json_file": Path(sys.argv[5]).resolve(), + } + input_arguments = InputArguments.model_validate(args) + main(input_arguments) + os.chdir(this.parent) diff --git a/modules/performUQ/common/gp_model.py b/modules/performUQ/common/gp_model.py new file mode 100644 index 000000000..b25391918 --- /dev/null +++ b/modules/performUQ/common/gp_model.py @@ -0,0 +1,72 @@ +import GPy +import numpy as np +from scipy.linalg import cho_solve + + +class GaussianProcessModel: + def __init__(self, input_dimension, output_dimension, ARD): + self.input_dimension = input_dimension + self.output_dimension = output_dimension + self.ARD = ARD + + self.kernel = self._create_kernel() + self.model = self._create_surrogate() + # self._add_nugget_parameter() + self._fix_nugget_parameter() + + def _create_kernel(self): + return GPy.kern.Matern52(input_dim=self.input_dimension, ARD=self.ARD) + + def _create_surrogate(self): + m_list = [] + for _ in range(self.output_dimension): + m_list += [ + GPy.models.GPRegression( + np.zeros((1, self.input_dimension)), + np.zeros((1, 1)), + self.kernel.copy(), + ) + ] + return m_list + + def _add_nugget_parameter(self): + for i in range(self.output_dimension): + self.model[i].Gaussian_noise.variance = 1e-6 + + def _fix_nugget_parameter(self, value=1e-8): + for i in range(self.output_dimension): + self.model[i].likelihood.variance = value + self.model[i].likelihood.variance.fix() + + def fit(self, X_train, Y_train): + for i in range(self.output_dimension): + 
self.model[i].set_XY(X_train, np.reshape(Y_train[:, i], (-1, 1))) + self.model[i].optimize() + + def predict(self, X_predict): + Y_mean = np.zeros((X_predict.shape[0], self.output_dimension)) + Y_var = np.zeros((X_predict.shape[0], self.output_dimension)) + for i in range(self.output_dimension): + mean, var = self.model[i].predict(X_predict) + Y_mean[:, i] = mean.flatten() + Y_var[:, i] = var.flatten() + + return Y_mean, Y_var + + def loo_predictions(self, Y_train): + loo_pred = np.zeros_like(Y_train) + for i in range(self.output_dimension): + cholesky_factor = self.model[i].posterior.K_chol + alpha = cho_solve( + (cholesky_factor, True), np.reshape(Y_train[:, i], (-1, 1)) + ) + cholesky_factor_inverse = cho_solve( + (cholesky_factor, True), np.eye(cholesky_factor.shape[0]) + ) + K_inv_diag = np.reshape( + np.sum(cholesky_factor_inverse**2, axis=1), (-1, 1) + ) + loo_pred[:, i] = ( + np.reshape(Y_train[:, i], (-1, 1)) - alpha / K_inv_diag + ).flatten() + return loo_pred diff --git a/modules/performUQ/common/principal_component_analysis.py b/modules/performUQ/common/principal_component_analysis.py new file mode 100644 index 000000000..e0b7a3643 --- /dev/null +++ b/modules/performUQ/common/principal_component_analysis.py @@ -0,0 +1,48 @@ +import numpy as np +from sklearn.decomposition import PCA + + +class PrincipalComponentAnalysis: + def __init__(self, pca_threshold) -> None: + self.pca_threshold = pca_threshold + + self.pca = None + self.mean_vec = None + self.proj_matrix = None + self.eigenvalues = None + self.explained_variance_ratio = None + self.n_components = None + + def project_to_latent_space(self, outputs): + # Perform PCA on the output data to reduce the dimensionality + pca = PCA() + pca.fit(outputs) + + # Determine the number of components to retain based on the threshold value + explained_variance_ratio = pca.explained_variance_ratio_ + self.n_components = ( + np.argmax(np.cumsum(explained_variance_ratio) >= self.pca_threshold) + 1 + ) + + # Perform PCA with the specified number of components and transform the output data + # TODO(ABS): Reimplement without the second PCA fit + self.pca = PCA(n_components=self.n_components) + self.pca.fit(outputs) + latent_outputs = self.pca.transform(outputs) + + # Store the PCA parameters for later use in inverse transformation + self.mean_vec = self.pca.mean_ + self.proj_matrix = self.pca.components_ + self.eigenvalues = self.pca.explained_variance_ + self.explained_variance_ratio = self.pca.explained_variance_ratio_ + + return latent_outputs + + def project_back_to_original_space(self, latent_outputs): + # Check if the PCA model has been fitted + if self.pca is None: + raise ValueError("No PCA model has been fitted yet.") + else: + # Inverse transform the latent outputs using the stored PCA parameters + outputs = self.pca.inverse_transform(latent_outputs) + return outputs diff --git a/modules/performUQ/common/space_filling_doe.py b/modules/performUQ/common/space_filling_doe.py new file mode 100644 index 000000000..62b3f86b7 --- /dev/null +++ b/modules/performUQ/common/space_filling_doe.py @@ -0,0 +1,33 @@ +from typing import Optional + +import numpy as np +from pydantic import BaseModel, PositiveInt +from scipy.stats import qmc + + +class LatinHypercubeSampling(BaseModel, validate_assignment=True): + n_samples: PositiveInt + n_dimensions: PositiveInt + seed: Optional[PositiveInt] = None + + def generate(self, domain=None): + lhs = qmc.LatinHypercube(self.n_dimensions, seed=self.seed) + samples = lhs.random(self.n_samples) + if domain is 
not None: + # Scale samples to the provided domain + lower_bounds = np.array([bound[0] for bound in domain]) + upper_bounds = np.array([bound[1] for bound in domain]) + samples = qmc.scale(samples, lower_bounds, upper_bounds) + return samples + + +if __name__ == "__main__": + initial_doe = LatinHypercubeSampling(n_samples=10, n_dimensions=2) + print(repr(initial_doe)) + samples = initial_doe.generate() + print(samples) + domain = [(0, 10), (-5, 5)] # Example of domain for each dimension + scaled_samples = initial_doe.generate(domain) + print(scaled_samples) + # initial_doe.seed = 0 + # idoe = InitialDesignOfExperiments(n_samples=0, n_dimensions=0, seed=-1) diff --git a/modules/performUQ/common/tmcmc.py b/modules/performUQ/common/tmcmc.py new file mode 100644 index 000000000..4edc8a1d4 --- /dev/null +++ b/modules/performUQ/common/tmcmc.py @@ -0,0 +1,236 @@ +import math + +import numpy as np +from scipy.special import logsumexp + + +def _calculate_weights_warm_start( + beta, current_loglikelihoods, previous_loglikelihoods +): + log_weights = beta * (current_loglikelihoods - previous_loglikelihoods) + log_sum_weights = logsumexp(log_weights) + normalized_log_weights = log_weights - log_sum_weights + normalized_weights = np.exp(normalized_log_weights) + weights = normalized_weights / np.sum(normalized_weights) + return weights + + +def calculate_warm_start_stage( + current_loglikelihood_approximation, previous_results, threshold_cov=1 +): + stage_nums = sorted(previous_results[0].keys(), reverse=True) + for stage_num in stage_nums: + current_loglikelihoods = current_loglikelihood_approximation( + previous_results[0][stage_num] + ) + previous_loglikelihoods = previous_results[2][stage_num] + beta = previous_results[1][stage_num] + weights = _calculate_weights_warm_start( + beta, current_loglikelihoods, previous_loglikelihoods + ) + cov_weights = np.nanstd(weights) / np.nanmean(weights) + if cov_weights < threshold_cov: + return stage_num + return 0 + + +def _calculate_weights(beta_increment, log_likelihoods): + log_weights = beta_increment * log_likelihoods + log_sum_weights = logsumexp(log_weights) + normalized_log_weights = log_weights - log_sum_weights + normalized_weights = np.exp(normalized_log_weights) + weights = normalized_weights / np.sum(normalized_weights) + return weights + + +def _calculate_log_evidence(beta_increment, log_likelihoods): + log_evidence = logsumexp(beta_increment * log_likelihoods) - np.log( + len(log_likelihoods) + ) + return log_evidence + + +def _increment_beta(log_likelihoods, beta, threshold_cov=1): + if beta >= 1: + return 1 + beta_increment = 1 - beta + weights = _calculate_weights(beta_increment, log_likelihoods) + cov_weights = np.nanstd(weights) / np.nanmean(weights) + while cov_weights > threshold_cov: + beta_increment = 0.99 * beta_increment + weights = _calculate_weights(beta_increment, log_likelihoods) + cov_weights = np.nanstd(weights) / np.nanmean(weights) + proposed_beta = beta + beta_increment + new_beta = min(proposed_beta, 1) + return new_beta + + +def _get_scaled_proposal_covariance(samples, weights, scale_factor=0.2): + return scale_factor**2 * np.cov( + samples, rowvar=False, aweights=weights.flatten() + ) + + +class TMCMC: + def __init__( + self, + log_likelihood_approximation_function, + log_target_density_approximation_function, + threshold_cov=1, + num_steps=1, + thinning_factor=10, + adapt_frequency=100, + ): + self._log_likelihood_approximation = log_likelihood_approximation_function + self._log_posterior_approximation = 
log_target_density_approximation_function + + self.num_steps = num_steps + self.threshold_cov = threshold_cov + self.thinning_factor = thinning_factor + self.adapt_frequency = adapt_frequency + + def _run_one_stage( + self, + samples, + log_likelihoods, + log_target_density_values, + beta, + rng, + log_likelihood_function, + log_target_density_function, + scale_factor, + target_acceptance_rate, + do_thinning=False, + burn_in_steps=0, + ): + new_beta = _increment_beta(log_likelihoods, beta) + log_evidence = _calculate_log_evidence(new_beta - beta, log_likelihoods) + weights = _calculate_weights(new_beta - beta, log_likelihoods) + + proposal_covariance = _get_scaled_proposal_covariance( + samples, weights, scale_factor + ) + + new_samples = np.zeros_like(samples) + new_log_likelihoods = np.zeros_like(log_likelihoods) + new_log_target_density_values = np.zeros_like(log_target_density_values) + + current_samples = samples.copy() + current_log_likelihoods = log_likelihoods.copy() + current_log_target_density_values = log_target_density_values.copy() + + num_samples = samples.shape[0] + num_accepts = 0 + n_adapt = 1 + step_count = 0 + for k in range(burn_in_steps + num_samples): + print(f"{k=}") + index = rng.choice(num_samples, p=weights.flatten()) + if k >= burn_in_steps: + if new_beta == 1 or do_thinning: + self.num_steps = self.num_steps * self.thinning_factor + for _ in range(self.num_steps): + step_count += 1 + if step_count % self.adapt_frequency == 0: + acceptance_rate = num_accepts / self.adapt_frequency + num_accepts = 0 + n_adapt += 1 + ca = (acceptance_rate - target_acceptance_rate) / ( + math.sqrt(n_adapt) + ) + scale_factor = scale_factor * np.exp(ca) + proposal_covariance = _get_scaled_proposal_covariance( + current_samples, weights, scale_factor + ) + + proposed_state = rng.multivariate_normal( + current_samples[index, :], proposal_covariance + ).reshape(1, -1) + log_likelihood_at_proposed_state = log_likelihood_function( + proposed_state + ) + log_target_density_at_proposed_state = log_target_density_function( + proposed_state, log_likelihood_at_proposed_state + ) + log_hastings_ratio = ( + log_target_density_at_proposed_state + - current_log_target_density_values[index] + ) + u = rng.uniform() + accept = np.log(u) <= log_hastings_ratio + if accept: + num_accepts += 1 + current_samples[index, :] = proposed_state + current_log_likelihoods[index] = log_likelihood_at_proposed_state + current_log_target_density_values[index] = ( + log_target_density_at_proposed_state + ) + if k >= burn_in_steps: + weights = _calculate_weights( + new_beta - beta, current_log_likelihoods + ) + if k >= burn_in_steps: + k_prime = k - burn_in_steps + new_samples[k_prime, :] = current_samples[index, :] + new_log_likelihoods[k_prime] = current_log_likelihoods[index] + new_log_target_density_values[k_prime] = ( + current_log_target_density_values[index] + ) + + return ( + new_samples, + new_log_likelihoods, + new_log_target_density_values, + new_beta, + log_evidence, + ) + + def run( + self, + samples_dict, + betas_dict, + log_likelihoods_dict, + log_target_density_values_dict, + log_evidence_dict, + rng, + stage_num, + num_burn_in=0, + ): + self.num_dimensions = samples_dict[0].shape[1] + self.target_acceptance_rate = 0.23 + 0.21 / self.num_dimensions + self.scale_factor = 2.4 / np.sqrt(self.num_dimensions) + while betas_dict[stage_num] < 1: + print(f"Stage {stage_num}") + ( + new_samples, + new_log_likelihoods, + new_log_target_density_values, + new_beta, + log_evidence, + ) = self._run_one_stage( + 
samples_dict[stage_num], + log_likelihoods_dict[stage_num], + log_target_density_values_dict[stage_num], + betas_dict[stage_num], + rng, + self._log_likelihood_approximation, + self._log_posterior_approximation, + self.scale_factor, + self.target_acceptance_rate, + do_thinning=False, + burn_in_steps=num_burn_in, + ) + stage_num += 1 + samples_dict[stage_num] = new_samples + betas_dict[stage_num] = new_beta + log_likelihoods_dict[stage_num] = new_log_likelihoods + log_target_density_values_dict[stage_num] = new_log_target_density_values + log_evidence_dict[stage_num] = log_evidence + + return ( + samples_dict, + betas_dict, + log_likelihoods_dict, + log_target_density_values_dict, + log_evidence_dict, + ) From 88842248906ef79289f6076fb6ef44edbfabf753 Mon Sep 17 00:00:00 2001 From: bsaakash <11618528+bsaakash@users.noreply.github.com> Date: Mon, 30 Sep 2024 15:31:45 -0700 Subject: [PATCH 2/3] abs - linting --- modules/performUQ/common/__init__.py | 1 + modules/performUQ/common/adaptive_doe.py | 116 +++++- modules/performUQ/common/common_datamodels.py | 206 +++++++---- .../performUQ/common/convergence_metrics.py | 89 ++++- modules/performUQ/common/gp_ab_algorithm.py | 343 +++++++++++++++--- modules/performUQ/common/gp_model.py | 100 ++++- .../common/principal_component_analysis.py | 58 ++- modules/performUQ/common/space_filling_doe.py | 53 ++- modules/performUQ/common/tmcmc.py | 147 +++++++- 9 files changed, 937 insertions(+), 176 deletions(-) create mode 100644 modules/performUQ/common/__init__.py diff --git a/modules/performUQ/common/__init__.py b/modules/performUQ/common/__init__.py new file mode 100644 index 000000000..6e031999e --- /dev/null +++ b/modules/performUQ/common/__init__.py @@ -0,0 +1 @@ +# noqa: D104 diff --git a/modules/performUQ/common/adaptive_doe.py b/modules/performUQ/common/adaptive_doe.py index caef0177c..3a636386d 100644 --- a/modules/performUQ/common/adaptive_doe.py +++ b/modules/performUQ/common/adaptive_doe.py @@ -1,10 +1,50 @@ -import numpy as np +"""Implements an adaptive design of experiments strategy using Gaussian Process (GP) and Principal Component Analysis (PCA).""" +import numpy as np from space_filling_doe import LatinHypercubeSampling class AdaptiveDesignOfExperiments: + """ + Adaptive Design of Experiments (DoE) using Gaussian Process (GP) and Principal Component Analysis (PCA). + + This class implements an adaptive design of experiments strategy to select new training points + based on the Integrated Mean Squared Error (IMSE) criterion. + + Attributes + ---------- + gp_model (list): List of Gaussian Process models. + pca (PCA): Principal Component Analysis object. + domain (array-like): The domain of the input space. + + Methods + ------- + _lengthscale_for_doe(): + Compute the lengthscale for the design of experiments. + + _kernel_for_doe(): + Create a kernel for the design of experiments. + + _gp_for_doe(): + Create a Gaussian Process model for the design of experiments. + + _imse_w_approximation(X_train, mci_samples, candidate_training_points): + Compute the IMSE approximation for candidate training points. + + select_training_points(X_train, n_points, mci_samples, n_candidates): + Select new training points based on the IMSE criterion. + """ + def __init__(self, gp_model, pca, domain): + """ + Initialize the AdaptiveDesignOfExperiments class. + + Parameters + ---------- + gp_model (list): List of Gaussian Process models. + pca (PCA): Principal Component Analysis object. + domain (array-like): The domain of the input space. 
+ """ self.gp_model = gp_model self.pca = pca self.domain = domain @@ -14,6 +54,16 @@ def __init__(self, gp_model, pca, domain): self._gp_for_doe() def _lengthscale_for_doe(self): + """ + Compute the lengthscale for the design of experiments. + + The lengthscale is computed as a weighted sum of the lengthscales of the individual GP models, + where the weights are the explained variances of the PCA components. + + Returns + ------- + float: The computed lengthscale. + """ eigenvalues = self.pca.explained_variance_ w = eigenvalues / np.sum(eigenvalues) lengthscales = np.atleast_2d([m.kernel.lengthscale for m in self.gp_model]) @@ -21,22 +71,53 @@ def _lengthscale_for_doe(self): return self.lengthscale_star def _kernel_for_doe(self): + """ + Create a kernel for the design of experiments. + + The kernel is a copy of the kernel of the first GP model, with the lengthscale set to the computed lengthscale. + + Returns + ------- + Kernel: The created kernel. + """ self.kernel = self.gp_model.kernel.copy() self.kernel.lengthscale = self.lengthscale_star return self.kernel def _gp_for_doe(self): + """ + Create a Gaussian Process model for the design of experiments. + + The GP model is a copy of the first GP model, with the kernel set to the created kernel. + + Returns + ------- + GaussianProcessRegressor: The created GP model. + """ self.gp_model_for_doe = self.gp_model[0].copy() self.gp_model_for_doe.kernel = self.kernel return self.gp_model_for_doe - def _imse_w_approximation(self, X_train, mci_samples, candidate_training_points): + def _imse_w_approximation(self, x_train, mci_samples, candidate_training_points): + """ + Compute the IMSE approximation for candidate training points. + + Parameters + ---------- + X_train (array-like): The current training data. + mci_samples (array-like): Monte Carlo integration samples. + candidate_training_points (array-like): Candidate training points. + + Returns + ------- + array: The IMSE values for the candidate training points. + """ self.gp_model_for_doe.set_XY( - X_train, - np.zeros((X_train.shape[0], 1)), + x_train, + np.zeros((x_train.shape[0], 1)), ) _, pred_var = self.gp_model_for_doe.predict(mci_samples) - n_theta = X_train.shape[1] + n_theta = x_train.shape[1] beta = 2.0 * n_theta imse = np.zeros((candidate_training_points.shape[0], 1)) for i, candidate in enumerate(candidate_training_points): @@ -48,17 +129,30 @@ def _imse_w_approximation(self, X_train, mci_samples, candidate_training_points) ) return imse - def select_training_points(self, X_train, n_points, mci_samples, n_candidates): - dimension = X_train.shape[1] + def select_training_points(self, x_train, n_points, mci_samples, n_candidates): + """ + Select new training points based on the IMSE criterion. + + Parameters + ---------- + X_train (array-like): The current training data. + n_points (int): The number of new training points to select. + mci_samples (array-like): Monte Carlo integration samples. + n_candidates (int): The number of candidate training points to generate. + + Returns + ------- + array: The selected new training points. 
+ """ + dimension = x_train.shape[1] for _ in range(n_points): lhs = LatinHypercubeSampling( n_samples=n_candidates, n_dimensions=dimension ) candidate_training_points = lhs.generate(self.domain) imse = self._imse_w_approximation( - X_train, mci_samples, candidate_training_points + x_train, mci_samples, candidate_training_points ) next_training_point = candidate_training_points[np.argmax(imse)] - X_train = np.vstack((X_train, next_training_point)) - new_training_points = X_train[-n_points:, :] - return new_training_points + x_train = np.vstack((x_train, next_training_point)) + return x_train[-n_points:, :] diff --git a/modules/performUQ/common/common_datamodels.py b/modules/performUQ/common/common_datamodels.py index 57538ecdc..f0afa9ad5 100644 --- a/modules/performUQ/common/common_datamodels.py +++ b/modules/performUQ/common/common_datamodels.py @@ -1,114 +1,194 @@ +""" +Defines data models for the performUQ application using Pydantic. + +Classes: + EDPItem: Represents an Engineering Demand Parameter item. + CorrelationValue: Represents a correlation value. + ApplicationData: Represents application data. + FEM_App: Represents a FEM application. + UQ_App: Represents a UQ application. + Applications: Represents a collection of applications. + GP_AB_UQData: Represents GP_AB_UQ data. + Model: Represents the main model. +""" + +from __future__ import annotations + import sys -from pathlib import Path -from typing import Any, Dict, List, Literal, Optional +from typing import TYPE_CHECKING, Any, Literal, Optional + +if TYPE_CHECKING: + from pathlib import Path from pydantic import BaseModel, Field, validator sys.path.append( - "/Users/aakash/SimCenter/SimCenterBackendApplications/modules/performUQ/common" + '/Users/aakash/SimCenter/SimCenterBackendApplications/modules/performUQ/common' ) class EDPItem(BaseModel): - name: str - type: Literal["scalar", "field"] - length: int = Field(gt=0) + """ + Represents an EDP (Engineering Demand Parameter) item. + Attributes + ---------- + name (str): The name of the EDP item. + type (Literal["scalar", "field"]): The type of the EDP item, either "scalar" or "field". + length (int): The length of the EDP item, must be greater than 0. + """ -# class EDP(BaseModel): -# __root__: list[EDPItem] + name: str + type: Literal['scalar', 'field'] + length: int = Field(gt=0) class CorrelationValue(BaseModel): - values: float = Field(..., ge=-1, le=1) + """ + Represents a correlation value. - # class CorrelationMatrix(BaseModel): - # CorrelationMatrix: list[CorrelationValue] + Attributes + ---------- + values (float): The correlation value, must be between -1 and 1. + """ - # class CorrelationMatrix(BaseModel): - # __root__: Annotated[ - # List[Annotated[float, Field(ge=-1, le=1)]], Field() - # ] # List of floats, each between -1 and 1 + values: float = Field(..., ge=-1, le=1) - # @model_validator(mode="after") - # def check_square_matrix(cls, model): - # matrix = model.correlationMatrix # Access the matrix directly from the model - # if matrix: - # length = len(matrix) - # sqrt_len = math.isqrt(length) # Integer square root - # if sqrt_len * sqrt_len != length: - # raise ValueError( - # "The length of the correlationMatrix must be a perfect square." - # ) - # return model - # @classmethod - # def from_list(cls, correlation_matrix: List[float]): - # """Construct the model directly from a list.""" - # return cls(correlationMatrix=correlation_matrix) +class ApplicationData(BaseModel): + """ + Represents application data. 
+ Attributes + ---------- + MS_Path (Path): The path to the MS. + mainScript (str): The main script name. + postprocessScript (str): The post-process script name. + """ -class ApplicationData(BaseModel): MS_Path: Path - mainScript: str - postprocessScript: str + mainScript: str # noqa: N815 + postprocessScript: str # noqa: N815 class FEM_App(BaseModel): + """ + Represents a FEM (Finite Element Method) application. + + Attributes + ---------- + Application (str): The name of the application. + ApplicationData (ApplicationData): The application data. + """ + Application: str ApplicationData: ApplicationData class UQ_App(BaseModel): + """ + Represents a UQ (Uncertainty Quantification) application. + + Attributes + ---------- + Application (str): The name of the application. + ApplicationData (dict[str, Any]): The application data. + """ + Application: str ApplicationData: dict[str, Any] class Applications(BaseModel): + """ + Represents a collection of applications. + + Attributes + ---------- + FEM (FEM_App): The FEM application. + UQ (UQ_App): The UQ application. + """ + FEM: FEM_App UQ: UQ_App class GP_AB_UQData(BaseModel): + """ + Represents GP_AB_UQ data. + + Attributes + ---------- + calibration_data_file_name (str): The name of the calibration data file. + calibration_data_path (Path): The path to the calibration data. + log_likelihood_file_name (Optional[str]): The name of the log likelihood file. + log_likelihood_path (Optional[Path]): The path to the log likelihood file. + sample_size (int): The sample size. + seed (int): The seed value. + """ + calibration_data_file_name: str calibration_data_path: Path - log_likelihood_file_name: Optional[str] - log_likelihood_path: Optional[Path] + log_likelihood_file_name: str | None + log_likelihood_path: Path | None sample_size: int seed: int -# class Model(BaseModel): -# Applications: Applications -# EDP: EDP -# FEM: dict[str, Any] -# UQ: GP_AB_UQData -# correlationMatrix: CorrelationMatrix -# localAppDir: str -# randomVariables: list[dict[str, Any]] -# remoteAppDir: str -# runType: str -# workingDir: str - - class Model(BaseModel): + """ + Represents the main model. + + Attributes + ---------- + Applications (Applications): The applications. + EDP (List[EDPItem]): The list of EDP items. + FEM (Dict[str, str]): The FEM data. + UQ (GP_AB_UQData): The UQ data. + WorkflowType (str): The workflow type. + correlationMatrix (List[float]): The correlation matrix. + localAppDir (str): The local application directory. + randomVariables (list[dict[str, Any]]): The list of random variables. + remoteAppDir (str): The remote application directory. + resultType (str): The result type. + runDir (str): The run directory. + runType (str): The run type. + summary (List[str]): The summary. + workingDir (str): The working directory. 
+ """ + Applications: Applications - EDP: List[EDPItem] - FEM: Dict[str, str] # Empty dict + EDP: list[EDPItem] + FEM: dict[str, str] # Empty dict UQ: GP_AB_UQData WorkflowType: str - correlationMatrix: List[float] - localAppDir: str - randomVariables: list[dict[str, Any]] - remoteAppDir: str - resultType: str - runDir: str - runType: str - summary: List[str] - workingDir: str - - @validator("correlationMatrix", each_item=True) - def check_correlation_matrix(cls, v): + correlationMatrix: list[float] # noqa: N815 + localAppDir: str # noqa: N815 + randomVariables: list[dict[str, Any]] # noqa: N815 + remoteAppDir: str # noqa: N815 + resultType: str # noqa: N815 + runDir: str # noqa: N815 + runType: str # noqa: N815 + summary: list[str] + workingDir: str # noqa: N815 + + @validator('correlationMatrix', each_item=True) + def check_correlation_matrix(cls, v): # noqa: N805 + """ + Validate each item in the correlation matrix. + + Args: + v (float): The correlation value. + + Raises + ------ + ValueError: If the correlation value is not between -1 and 1. + + Returns + ------- + float: The validated correlation value. + """ if not (-1 <= v <= 1): - raise ValueError("Each correlation value must be between -1 and 1.") + error_message = 'Each correlation value must be between -1 and 1.' + raise ValueError(error_message) return v diff --git a/modules/performUQ/common/convergence_metrics.py b/modules/performUQ/common/convergence_metrics.py index 5ccf2343d..ad3f06bb3 100644 --- a/modules/performUQ/common/convergence_metrics.py +++ b/modules/performUQ/common/convergence_metrics.py @@ -1,3 +1,10 @@ +""" +Functions to calculate various convergence metrics. + +These include generalized KL divergence (GKL), generalized MAP (GMAP), and +Leave-One-Out Normalized Root Mean Square Error (LOO NRMSE). +""" + import numpy as np from scipy.optimize import minimize from scipy.special import logsumexp @@ -6,8 +13,29 @@ def _calculate_normalization_constants( current_function_values, previous_function_values ): - # Define the objective function to minimize the difference from 1 for c1 and c2 + """ + Calculate normalization constants for the current and previous function values. + + Args: + current_function_values (np.ndarray): The current function values. + previous_function_values (np.ndarray): The previous function values. + + Returns + ------- + tuple: A tuple containing the optimized alpha_1 and alpha_2 values. + """ + def objective_function(log_alphas): + """ + Objective function to minimize the difference from 1 for c1 and c2. + + Args: + log_alphas (list): List containing log(alpha_1) and log(alpha_2). + + Returns + ------- + float: The value of the objective function. + """ log_alpha_1, log_alpha_2 = log_alphas numerator_current = current_function_values - log_alpha_1 @@ -36,7 +64,7 @@ def objective_function(log_alphas): initial_guess = [initial_log_alpha_1, initial_log_alpha_2] # Perform the optimization - result = minimize(objective_function, initial_guess, method="BFGS") + result = minimize(objective_function, initial_guess, method='BFGS') # Extract optimized alpha_1 and alpha_2 optimized_log_alpha_1, optimized_log_alpha_2 = result.x @@ -49,6 +77,18 @@ def objective_function(log_alphas): def _calculate_kl_divergence( current_log_target_function, previous_log_target_function, samples ): + """ + Calculate the KL divergence between the current and previous log target functions. + + Args: + current_log_target_function (callable): The current log target function. 
+ previous_log_target_function (callable): The previous log target function. + samples (np.ndarray): The samples to evaluate the functions. + + Returns + ------- + float: The KL divergence estimate. + """ current_function_values = current_log_target_function(samples) previous_function_values = previous_log_target_function(samples) alpha_1, alpha_2 = _calculate_normalization_constants( @@ -76,18 +116,30 @@ def _calculate_kl_divergence( ) ) - return kl_divergence_estimate + return kl_divergence_estimate # noqa: RET504 def calculate_gkl( current_log_target_function, previous_log_target_function, samples ): + """ + Calculate the generalized KL divergence (GKL). + + Args: + current_log_target_function (callable): The current log target function. + previous_log_target_function (callable): The previous log target function. + samples (np.ndarray): The samples to evaluate the functions. + + Returns + ------- + float: The GKL value. + """ kl_divergence_estimate = _calculate_kl_divergence( current_log_target_function, previous_log_target_function, samples ) n_theta = np.shape(samples)[1] gkl = 1 / n_theta * kl_divergence_estimate - return gkl + return gkl # noqa: RET504 def calculate_gmap( @@ -96,13 +148,26 @@ def calculate_gmap( samples, prior_variances, ): + """ + Calculate the generalized MAP (GMAP). + + Args: + current_log_target_function (callable): The current log target function. + previous_log_target_function (callable): The previous log target function. + samples (np.ndarray): The samples to evaluate the functions. + prior_variances (np.ndarray): The prior variances. + + Returns + ------- + float: The GMAP value. + """ current_function_values = current_log_target_function(samples) previous_function_values = previous_log_target_function(samples) current_map = np.argmax(current_function_values) previous_map = np.argmax(previous_function_values) gmap = np.sqrt(np.sum((current_map - previous_map) ** 2 / prior_variances)) - return gmap + return gmap # noqa: RET504 def calculate_loo_nrmse_w( @@ -110,6 +175,18 @@ def calculate_loo_nrmse_w( gp_surrogate_model_prediction, weights=None, ): + """ + Calculate the Leave-One-Out Normalized Root Mean Square Error (LOO NRMSE) with weights. + + Args: + loo_predictions (np.ndarray): The LOO predictions. + gp_surrogate_model_prediction (np.ndarray): The GP surrogate model predictions. + weights (np.ndarray, optional): The weights for the predictions. Defaults to None. + + Returns + ------- + np.ndarray: The LOO NRMSE values. + """ if weights is None: weights = np.ones_like(loo_predictions) normalized_weights = weights / np.sum(weights) @@ -117,4 +194,4 @@ def calculate_loo_nrmse_w( loo_predictions - gp_surrogate_model_prediction, axis=1, keepdims=True ) / np.linalg.norm(gp_surrogate_model_prediction, axis=1, keepdims=True) g_cv = normalized_weights * g_i / len(loo_predictions) - return g_cv + return g_cv # noqa: RET504 diff --git a/modules/performUQ/common/gp_ab_algorithm.py b/modules/performUQ/common/gp_ab_algorithm.py index 09c53013a..801a7e721 100644 --- a/modules/performUQ/common/gp_ab_algorithm.py +++ b/modules/performUQ/common/gp_ab_algorithm.py @@ -1,3 +1,10 @@ +""" +Module implementing the GP-AB Algorithm for Bayesian calibration. + +It includes classes and functions for performing Gaussian Process modeling, +Principal Component Analysis, and various convergence metrics. 
+""" + import importlib import json import os @@ -7,6 +14,8 @@ from pathlib import Path from typing import Literal +import common_datamodels +import convergence_metrics import numpy as np import pydantic @@ -14,35 +23,96 @@ # "/Users/aakash/SimCenter/SimCenterBackendApplications/modules/performUQ/common" # ) import uq_utilities -from scipy.stats import invgamma, norm - -import common_datamodels -import convergence_metrics from adaptive_doe import AdaptiveDesignOfExperiments from gp_model import GaussianProcessModel from principal_component_analysis import PrincipalComponentAnalysis +from scipy.stats import invgamma, norm from space_filling_doe import LatinHypercubeSampling from tmcmc import TMCMC, calculate_warm_start_stage def log_likelihood(prediction_error_vector, prediction_error_variance): + """ + Calculate the log-likelihood of the prediction errors given the variance. + + Args: + prediction_error_vector (np.ndarray): The vector of prediction errors. + prediction_error_variance (float): The variance of the prediction errors. + + Returns + ------- + float: The log-likelihood value. + """ return np.sum( norm.logpdf(prediction_error_vector, 0, np.sqrt(prediction_error_variance)) ) def _response_approximation(current_gp, current_pca, model_parameters): + """ + Approximate the response using the current GP model and PCA. + + Args: + current_gp (GaussianProcessModel): The current Gaussian Process model. + current_pca (PrincipalComponentAnalysis): The current PCA model. + model_parameters (np.ndarray): The model parameters. + + Returns + ------- + np.ndarray: The approximated response. + """ latent_predictions, _ = current_gp.predict(model_parameters) gp_prediction = current_pca.project_back_to_original_space(latent_predictions) - return gp_prediction + return gp_prediction # noqa: RET504 class GP_AB_Algorithm: - def __init__( + """ + A class to represent the GP-AB Algorithm for Bayesian calibration. + + Attributes + ---------- + data (np.ndarray): The observed data. + input_dimension (int): The input dimension of the model. + output_dimension (int): The output dimension of the model. + output_length_list (list[int]): The list of output lengths. + domain (list[tuple[float, float]]): The domain for each dimension. + prior_variances (np.ndarray): The prior variances. + prior_pdf_function (callable): The prior PDF function. + log_likelihood_function (callable): The log-likelihood function. + pca_threshold (float): The threshold for PCA. + gkl_threshold (float): The threshold for GKL. + gmap_threshold (float): The threshold for GMAP. + max_simulations (int): The maximum number of simulations. + max_computational_time (float): The maximum computational time. + start_time (float): The start time of the algorithm. + converged (bool): Whether the algorithm has converged. + budget_exceeded (bool): Whether the budget has been exceeded. + terminate (bool): Whether to terminate the algorithm. + num_experiments (list[int]): The number of experiments. + num_recalibration_experiments (int): The number of recalibration experiments. + recalibration_ratio (float): The recalibration ratio. + sample_transformation_function (callable): The sample transformation function. + model_evaluation_function (callable): The model evaluation function. + run_type (str): The run type (e.g., "runningLocal"). + parallel_pool (uq_utilities.ParallelPool): The parallel pool instance. + parallel_evaluation_function (callable): The parallel evaluation function. + loocv_threshold (float): The threshold for LOOCV. 
+ results (dict): The results dictionary. + current_gp_model (GaussianProcessModel): The current GP model. + current_pca (PrincipalComponentAnalysis): The current PCA model. + samples_dict (dict): The dictionary of samples. + betas_dict (dict): The dictionary of betas. + log_likelihoods_dict (dict): The dictionary of log-likelihoods. + log_target_density_values_dict (dict): The dictionary of log-target density values. + log_evidence_dict (dict): The dictionary of log evidence. + """ + + def __init__( # noqa: PLR0913 self, data, output_length_list, - output_names_list, + output_names_list, # noqa: ARG002 input_dimension, output_dimension, domain, @@ -54,12 +124,36 @@ def __init__( max_simulations=np.inf, max_computational_time=np.inf, pca_threshold=0.999, - run_type="runningLocal", + run_type='runningLocal', loocv_threshold=0.2, recalibration_ratio=0.1, gkl_threshold=0.01, gmap_threshold=0.01, ): + """ + Initialize the GP_AB_Algorithm class. + + Args: + data (np.ndarray): The observed data. + output_length_list (list[int]): The list of output lengths. + output_names_list (list[str]): The list of output names. + input_dimension (int): The input dimension of the model. + output_dimension (int): The output dimension of the model. + domain (list[tuple[float, float]]): The domain for each dimension. + model_evaluation_function (callable): The model evaluation function. + sample_transformation_function (callable): The sample transformation function. + prior_pdf_function (callable): The prior PDF function. + log_likelihood_function (callable): The log-likelihood function. + prior_variances (np.ndarray): The prior variances. + max_simulations (int, optional): The maximum number of simulations. Defaults to np.inf. + max_computational_time (float, optional): The maximum computational time. Defaults to np.inf. + pca_threshold (float, optional): The threshold for PCA. Defaults to 0.999. + run_type (str, optional): The run type (e.g., "runningLocal"). Defaults to "runningLocal". + loocv_threshold (float, optional): The threshold for LOOCV. Defaults to 0.2. + recalibration_ratio (float, optional): The recalibration ratio. Defaults to 0.1. + gkl_threshold (float, optional): The threshold for GKL. Defaults to 0.01. + gmap_threshold (float, optional): The threshold for GMAP. Defaults to 0.01. + """ self.data = data self.input_dimension = input_dimension self.output_dimension = output_dimension @@ -100,42 +194,82 @@ def __init__( self.loocv_threshold = loocv_threshold - self.results = dict() + self.results = {} self.current_gp_model = GaussianProcessModel( self.input_dimension, 1, ARD=True ) self.current_pca = PrincipalComponentAnalysis(self.pca_threshold) - self.samples_dict = dict() - self.betas_dict = dict() - self.log_likelihoods_dict = dict() - self.log_target_density_values_dict = dict() - self.log_evidence_dict = dict() + self.samples_dict = {} + self.betas_dict = {} + self.log_likelihoods_dict = {} + self.log_target_density_values_dict = {} + self.log_evidence_dict = {} def _evaluate_in_parallel(self, func, samples): + """ + Evaluate the model in parallel using the provided function and samples. + + Args: + func (callable): The function to evaluate. + samples (np.ndarray): The samples to evaluate. + + Returns + ------- + np.ndarray: The evaluated outputs. 
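[Editor's note: illustrative sketch, not part of the patch. It mirrors the map-over-(simulation_number, sample) pattern used by _evaluate_in_parallel, with a plain thread pool standing in for the uq_utilities parallel pool; toy_model and evaluate_in_parallel below are hypothetical names.]

import numpy as np
from concurrent.futures import ThreadPoolExecutor


def toy_model(simulation_number, parameters):
    # Hypothetical stand-in for the FEM evaluation; returns one response vector.
    return np.array([np.sum(parameters), float(simulation_number)])


def evaluate_in_parallel(func, samples):
    # Pair each sample with its simulation number, evaluate, stack the outputs.
    iterable = zip(range(len(samples)), samples)
    with ThreadPoolExecutor() as pool:
        outputs = list(pool.map(lambda args: func(*args), iterable))
    return np.atleast_2d(outputs)


results = evaluate_in_parallel(toy_model, np.random.default_rng(0).normal(size=(4, 3)))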
+ """ transformed_samples = self.sample_transformation_function(samples) simulation_numbers = np.arange(len(samples)) iterable = zip(simulation_numbers, transformed_samples) outputs = np.atleast_2d( list(self.parallel_evaluation_function(func, iterable)) ) - return outputs + return outputs # noqa: RET504 def _perform_initial_doe(self, n_samples): + """ + Perform the initial Design of Experiments (DoE) using Latin Hypercube Sampling. + + Args: + n_samples (int): The number of samples to generate. + + Returns + ------- + np.ndarray: The generated samples. + """ self.initial_doe = LatinHypercubeSampling( n_samples=n_samples, n_dimensions=self.input_dimension ) samples = self.initial_doe.generate(self.domain) - return samples + return samples # noqa: RET504 def _get_initial_training_set(self, n_samples): + """ + Get the initial training set by performing DoE and evaluating the model. + + Args: + n_samples (int): The number of samples to generate. + + Returns + ------- + tuple: A tuple containing the inputs and outputs of the initial training set. + """ inputs = self._perform_initial_doe(n_samples) outputs = self._evaluate_in_parallel(self.model_evaluation_function, inputs) - # initial_training_set = np.hstack((inputs, outputs)) - # return initial_training_set return inputs, outputs def _log_likelihood_approximation(self, response_approximation, samples): + """ + Approximate the log-likelihood for the given samples. + + Args: + response_approximation (callable): The response approximation function. + samples (np.ndarray): The samples to evaluate. + + Returns + ------- + np.ndarray: The approximated log-likelihood values. + """ u_values = samples[:, : self.input_dimension] model_parameters = self.sample_transformation_function(u_values).reshape( 1, -1 @@ -164,36 +298,68 @@ def _log_likelihood_approximation(self, response_approximation, samples): return np.array(log_likes).reshape((num_samples, 1)) def _log_prior_pdf(self, samples): + """ + Calculate the log-prior PDF for the given samples. + + Args: + samples (np.ndarray): The samples to evaluate. + + Returns + ------- + np.ndarray: The log-prior PDF values. + """ u_values = samples[:, : self.input_dimension] model_parameters = self.sample_transformation_function(u_values) log_prior_model_parameters = np.log( self.prior_pdf_function(model_parameters) ).reshape((-1, 1)) - # TODO: (ABS) Decide what prior to use for q q = samples[:, self.input_dimension :] log_prior_q = np.sum(invgamma.logpdf(q, 1, scale=0.5), axis=1) prior_log_pdf = log_prior_model_parameters + log_prior_q - return prior_log_pdf + return prior_log_pdf # noqa: RET504 def _log_posterior_approximation(self, samples, log_likelihoods): + """ + Approximate the log-posterior for the given samples and log-likelihoods. + + Args: + samples (np.ndarray): The samples to evaluate. + log_likelihoods (np.ndarray): The log-likelihood values. + + Returns + ------- + np.ndarray: The approximated log-posterior values. + """ log_prior = self._log_prior_pdf(samples) log_posterior = log_likelihoods + log_prior - return log_posterior + return log_posterior # noqa: RET504 def loocv_measure(self, log_likelihood_approximation): - lls = log_likelihood_approximation(self.inputs) + """ + Calculate the Leave-One-Out Cross-Validation (LOOCV) measure. - # weights = ( - # 2 / 3 * np.ones((self.database.shape[0], 1)) - # + 1 / 3 * self.adaptive_weights_per_stage - # ) + Args: + log_likelihood_approximation (callable): The log-likelihood approximation function. + + Returns + ------- + float: The LOOCV measure. 
+ """ + lls = log_likelihood_approximation(self.inputs) loo_predictions = self.current_gp_model.loo_predictions(self.outputs) loocv_measure = convergence_metrics.calculate_loo_nrmse_w( loo_predictions, lls ) - return loocv_measure + return loocv_measure # noqa: RET504 def calibrate_gp(self, model_parameters, model_outputs): + """ + Calibrate the Gaussian Process (GP) model using the given parameters and outputs. + + Args: + model_parameters (np.ndarray): The model parameters. + model_outputs (np.ndarray): The model outputs. + """ latent_outputs = self.current_pca.project_to_latent_space(model_outputs) self.num_pca_components_list.append(np.shape(latent_outputs)[1]) self.current_gp_model = GaussianProcessModel( @@ -203,6 +369,16 @@ def calibrate_gp(self, model_parameters, model_outputs): self.num_recalibration_experiments = self.num_experiments[-1] def run_iteration(self, k): + """ + Run a single iteration of the GP-AB Algorithm. + + Args: + k (int): The iteration number. + + Returns + ------- + tuple: A tuple containing a boolean indicating whether to terminate and the current TMCMC results. + """ self.iteration_number = k model_parameters = self.inputs model_outputs = self.outputs @@ -233,13 +409,13 @@ def run_iteration(self, k): gp_prediction_latent_mean, gp_prediction_latent_variance = ( self.current_gp_model.predict(model_parameters) ) - gp_prediction_mean = self.current_pca.project_back_to_original_space( + gp_prediction_mean = self.current_pca.project_back_to_original_space( # noqa: F841 gp_prediction_latent_mean ) loo_predictions_latent_space = self.current_gp_model.loo_predictions( self.latent_outputs ) - loo_predictions = self.current_pca.project_back_to_original_space( + loo_predictions = self.current_pca.project_back_to_original_space( # noqa: F841 loo_predictions_latent_space ) @@ -258,11 +434,11 @@ def run_iteration(self, k): j_star = 0 beta = 0 - samples_dict = dict() - betas_dict = dict() - log_likelihoods_dict = dict() - log_target_density_values_dict = dict() - log_evidence_dict = dict() + samples_dict = {} + betas_dict = {} + log_likelihoods_dict = {} + log_target_density_values_dict = {} + log_evidence_dict = {} num_samples_per_stage = 50 if k > 0: @@ -272,7 +448,8 @@ def run_iteration(self, k): # log_likelihood_approximation, self.betas_dict # ) j_star = calculate_warm_start_stage( - log_likelihood_approximation, current_tmcmc_results + log_likelihood_approximation, + current_tmcmc_results, # noqa: F821 ) previous_log_posterior_approximation = log_posterior_approximation @@ -368,16 +545,33 @@ def run_iteration(self, k): return self.terminate, current_tmcmc_results def run_initial_doe(self, num_initial_doe_per_dim=2): + """ + Run the initial Design of Experiments (DoE). + + Args: + num_initial_doe_per_dim (int, optional): Number of initial DoE samples per dimension. Defaults to 2. + + Returns + ------- + tuple: A tuple containing the inputs, outputs, and number of initial DoE samples. + """ num_initial_doe_samples = num_initial_doe_per_dim * self.input_dimension inputs, outputs = self._get_initial_training_set(num_initial_doe_samples) return inputs, outputs, num_initial_doe_samples def write_results(self): - print(f"{self.iteration_number = }") - print("Results written to file") + """Write the results of the GP-AB Algorithm to a file.""" + # print(f'{self.iteration_number = }') + # print('Results written to file') def main(input_arguments): + """ + Run the GP-AB Algorithm. + + Args: + input_arguments (InputArguments): The input arguments for the algorithm. 
+ """ gp_ab = GP_AB_Algorithm(*preprocess(input_arguments)) inputs, outputs, num_initial_doe_samples = gp_ab.run_initial_doe( num_initial_doe_per_dim=2 @@ -393,7 +587,17 @@ def main(input_arguments): def read_inputs(input_json_file): - with open(input_json_file, encoding="utf-8") as f: + """ + Read and parse the input JSON file. + + Args: + input_json_file (str): Path to the input JSON file. + + Returns + ------- + tuple: A tuple containing UQ inputs, random variables, correlation matrix, EDP inputs, and application inputs. + """ + with Path(input_json_file).open(encoding='utf-8') as f: inputs = json.load(f) # application_inputs = inputs["Applications"] @@ -406,7 +610,7 @@ def read_inputs(input_json_file): uq_inputs = input_data.UQ rv_inputs = input_data.randomVariables correlation_matrix_inputs = input_data.correlationMatrix - edp_inputs = inputs["EDP"] + edp_inputs = inputs['EDP'] application_inputs = input_data.Applications # application_inputs = common_datamodels.Applications.model_validate( @@ -429,6 +633,16 @@ def read_inputs(input_json_file): def preprocess(input_arguments): + """ + Preprocess the input arguments for the GP-AB Algorithm. + + Args: + input_arguments (InputArguments): The input arguments for the algorithm. + + Returns + ------- + tuple: A tuple containing the preprocessed data required for the GP-AB Algorithm. + """ ( uq_inputs, rv_inputs, @@ -440,8 +654,8 @@ def preprocess(input_arguments): data_file = ( uq_inputs.calibration_data_path / uq_inputs.calibration_data_file_name ) - with open(data_file, "r") as f: - data = np.genfromtxt(f, delimiter=",") + with data_file.open() as f: + data = np.genfromtxt(f, delimiter=',') log_likelihood_file_name = uq_inputs.log_likelihood_file_name log_likelihood_path = uq_inputs.log_likelihood_path @@ -449,14 +663,14 @@ def preprocess(input_arguments): if log_likelihood_file_name: sys.path.append(str(log_likelihood_path)) ll_module = importlib.import_module(log_likelihood_file_name) - log_likelihood_function = getattr(ll_module, "log_likelihood") + log_likelihood_function = ll_module.log_likelihood joint_distribution = uq_utilities.ERANatafJointDistribution( rv_inputs, correlation_matrix_inputs, ) domain = [(-3, 3) for _ in range(len(rv_inputs))] - prior_variances = [1 for _ in range(len(rv_inputs))] # TODO: (ABS) Validate this + prior_variances = [1 for _ in range(len(rv_inputs))] # TODO(ABS): Validate this # Transformation function from standard to physical space sample_transformation_function = joint_distribution.u_to_x @@ -471,16 +685,16 @@ def preprocess(input_arguments): list_of_dir_names_to_copy_files_from=[main_script_path], run_directory=input_arguments.path_to_working_directory, driver_filename=str(input_arguments.driver_file_name), - workdir_prefix="workdir", + workdir_prefix='workdir', ) model_evaluation_function = model.evaluate_model_once - edp_names_list = [edp["name"] for edp in edp_inputs] - edp_lengths_list = [edp["length"] for edp in edp_inputs] + edp_names_list = [edp['name'] for edp in edp_inputs] + edp_lengths_list = [edp['length'] for edp in edp_inputs] input_dimension = len(rv_inputs) output_dimension = sum( edp_lengths_list - ) # TODO: (ABS) Validate this against length of data + ) # TODO(ABS): Validate this against length of data max_simulations = np.inf max_computational_time = np.inf @@ -511,24 +725,41 @@ def preprocess(input_arguments): class InputArguments(pydantic.BaseModel): + """ + A class to represent the input arguments for the GP-AB Algorithm. 
+ + Attributes + ---------- + path_to_working_directory : Path + The path to the working directory. + path_to_template_directory : Path + The path to the template directory. + run_type : Literal['runningLocal', 'runningRemote'] + The type of run (local or remote). + driver_file_name : Path + The name of the driver file. + input_json_file : Path + The path to the input JSON file. + """ + path_to_working_directory: Path path_to_template_directory: Path - run_type: Literal["runningLocal", "runningRemote"] + run_type: Literal['runningLocal', 'runningRemote'] driver_file_name: Path input_json_file: Path - model_config = pydantic.ConfigDict(revalidate_instances="always") + model_config = pydantic.ConfigDict(revalidate_instances='always') -if __name__ == "__main__": +if __name__ == '__main__': this = Path(__file__).resolve() - os.chdir("/Users/aakash/Documents/quoFEM/LocalWorkDir/tmp.SimCenter") + os.chdir('/Users/aakash/Documents/quoFEM/LocalWorkDir/tmp.SimCenter') args = { - "path_to_working_directory": Path(sys.argv[1]).resolve(), - "path_to_template_directory": Path(sys.argv[2]).resolve(), - "run_type": sys.argv[3], - "driver_file_name": Path(sys.argv[4]).resolve(), - "input_json_file": Path(sys.argv[5]).resolve(), + 'path_to_working_directory': Path(sys.argv[1]).resolve(), + 'path_to_template_directory': Path(sys.argv[2]).resolve(), + 'run_type': sys.argv[3], + 'driver_file_name': Path(sys.argv[4]).resolve(), + 'input_json_file': Path(sys.argv[5]).resolve(), } input_arguments = InputArguments.model_validate(args) main(input_arguments) diff --git a/modules/performUQ/common/gp_model.py b/modules/performUQ/common/gp_model.py index b25391918..3d8713b3c 100644 --- a/modules/performUQ/common/gp_model.py +++ b/modules/performUQ/common/gp_model.py @@ -1,10 +1,32 @@ +"""Module for creating and using Gaussian Process models with the GaussianProcessModel class.""" + import GPy import numpy as np from scipy.linalg import cho_solve class GaussianProcessModel: - def __init__(self, input_dimension, output_dimension, ARD): + """ + A class to represent a Gaussian Process Model. + + Attributes + ---------- + input_dimension (int): The input dimension of the model. + output_dimension (int): The output dimension of the model. + ARD (bool): Automatic Relevance Determination flag. + kernel (GPy.kern.Matern52): The kernel used in the model. + model (list[GPy.models.GPRegression]): The list of GP regression models. + """ + + def __init__(self, input_dimension, output_dimension, ARD): # noqa: N803 + """ + Initialize the GaussianProcessModel. + + Args: + input_dimension (int): The input dimension of the model. + output_dimension (int): The output dimension of the model. + ARD (bool): Automatic Relevance Determination flag. + """ self.input_dimension = input_dimension self.output_dimension = output_dimension self.ARD = ARD @@ -15,9 +37,23 @@ def __init__(self, input_dimension, output_dimension, ARD): self._fix_nugget_parameter() def _create_kernel(self): + """ + Create the kernel for the Gaussian Process. + + Returns + ------- + GPy.kern.Matern52: The Matern52 kernel. + """ return GPy.kern.Matern52(input_dim=self.input_dimension, ARD=self.ARD) def _create_surrogate(self): + """ + Create the surrogate GP regression models. + + Returns + ------- + list[GPy.models.GPRegression]: The list of GP regression models. 
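[Editor's note: a compact, standalone sketch of the per-output GP construction this class wraps, using the same GPy calls that appear in the patch (Matern52 kernel, one GPRegression per output, fixed nugget). The training data are synthetic.]

import GPy
import numpy as np

rng = np.random.default_rng(1)
x_train = rng.uniform(-3, 3, size=(15, 2))
y_train = np.column_stack(
    [np.sin(x_train[:, 0]), np.cos(x_train[:, 1])]
)  # two outputs -> two independent GPs

models = []
for i in range(y_train.shape[1]):
    kernel = GPy.kern.Matern52(input_dim=x_train.shape[1], ARD=True)
    m = GPy.models.GPRegression(x_train, y_train[:, [i]], kernel)
    m.likelihood.variance = 1e-8   # fixed nugget, as in _fix_nugget_parameter
    m.likelihood.variance.fix()
    m.optimize()
    models.append(m)

mean_0, var_0 = models[0].predict(np.zeros((1, 2)))  # per-output mean and variance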
+ """ m_list = [] for _ in range(self.output_dimension): m_list += [ @@ -30,43 +66,77 @@ def _create_surrogate(self): return m_list def _add_nugget_parameter(self): + """Add a nugget parameter to the GP models to improve numerical stability.""" for i in range(self.output_dimension): self.model[i].Gaussian_noise.variance = 1e-6 def _fix_nugget_parameter(self, value=1e-8): + """ + Fix the nugget parameter to a specific value. + + Args: + value (float): The value to fix the nugget parameter to. + """ for i in range(self.output_dimension): self.model[i].likelihood.variance = value self.model[i].likelihood.variance.fix() - def fit(self, X_train, Y_train): + def fit(self, x_train, y_train): + """ + Fit the GP models to the training data. + + Args: + x_train (np.ndarray): The input training data. + y_train (np.ndarray): The output training data. + """ for i in range(self.output_dimension): - self.model[i].set_XY(X_train, np.reshape(Y_train[:, i], (-1, 1))) + self.model[i].set_XY(x_train, np.reshape(y_train[:, i], (-1, 1))) self.model[i].optimize() - def predict(self, X_predict): - Y_mean = np.zeros((X_predict.shape[0], self.output_dimension)) - Y_var = np.zeros((X_predict.shape[0], self.output_dimension)) + def predict(self, x_predict): + """ + Predict the output for the given input data. + + Args: + x_predict (np.ndarray): The input data for prediction. + + Returns + ------- + tuple: A tuple containing the mean and variance of the predictions. + """ + y_mean = np.zeros((x_predict.shape[0], self.output_dimension)) + y_var = np.zeros((x_predict.shape[0], self.output_dimension)) for i in range(self.output_dimension): - mean, var = self.model[i].predict(X_predict) - Y_mean[:, i] = mean.flatten() - Y_var[:, i] = var.flatten() + mean, var = self.model[i].predict(x_predict) + y_mean[:, i] = mean.flatten() + y_var[:, i] = var.flatten() + + return y_mean, y_var + + def loo_predictions(self, y_train): + """ + Calculate the Leave-One-Out (LOO) predictions. - return Y_mean, Y_var + Args: + y_train (np.ndarray): The output training data. - def loo_predictions(self, Y_train): - loo_pred = np.zeros_like(Y_train) + Returns + ------- + np.ndarray: The LOO predictions. + """ + loo_pred = np.zeros_like(y_train) for i in range(self.output_dimension): cholesky_factor = self.model[i].posterior.K_chol alpha = cho_solve( - (cholesky_factor, True), np.reshape(Y_train[:, i], (-1, 1)) + (cholesky_factor, True), np.reshape(y_train[:, i], (-1, 1)) ) cholesky_factor_inverse = cho_solve( (cholesky_factor, True), np.eye(cholesky_factor.shape[0]) ) - K_inv_diag = np.reshape( + k_inv_diag = np.reshape( np.sum(cholesky_factor_inverse**2, axis=1), (-1, 1) ) loo_pred[:, i] = ( - np.reshape(Y_train[:, i], (-1, 1)) - alpha / K_inv_diag + np.reshape(y_train[:, i], (-1, 1)) - alpha / k_inv_diag ).flatten() return loo_pred diff --git a/modules/performUQ/common/principal_component_analysis.py b/modules/performUQ/common/principal_component_analysis.py index e0b7a3643..55f80a100 100644 --- a/modules/performUQ/common/principal_component_analysis.py +++ b/modules/performUQ/common/principal_component_analysis.py @@ -1,9 +1,31 @@ +"""Provides a class for performing Principal Component Analysis (PCA) for dimensionality reduction.""" + import numpy as np from sklearn.decomposition import PCA class PrincipalComponentAnalysis: + """ + A class to perform Principal Component Analysis (PCA) for dimensionality reduction. 
+ + Attributes + ---------- + pca_threshold (float): The threshold for the cumulative explained variance ratio to determine the number of components. + pca (PCA): The PCA model. + mean_vec (np.ndarray): The mean vector of the original data. + proj_matrix (np.ndarray): The projection matrix (PCA components). + eigenvalues (np.ndarray): The eigenvalues of the PCA components. + explained_variance_ratio (np.ndarray): The explained variance ratio of the PCA components. + n_components (int): The number of components to retain. + """ + def __init__(self, pca_threshold) -> None: + """ + Initialize the PrincipalComponentAnalysis class. + + Args: + pca_threshold (float): The threshold for the cumulative explained variance ratio to determine the number of components. + """ self.pca_threshold = pca_threshold self.pca = None @@ -14,6 +36,16 @@ def __init__(self, pca_threshold) -> None: self.n_components = None def project_to_latent_space(self, outputs): + """ + Perform PCA on the output data to reduce the dimensionality. + + Args: + outputs (np.ndarray): The output data to be projected to the latent space. + + Returns + ------- + np.ndarray: The latent outputs after PCA transformation. + """ # Perform PCA on the output data to reduce the dimensionality pca = PCA() pca.fit(outputs) @@ -25,7 +57,7 @@ def project_to_latent_space(self, outputs): ) # Perform PCA with the specified number of components and transform the output data - # TODO(ABS): Reimplement without the second PCA fit + # TODO (ABS): Reimplement without the second PCA fit self.pca = PCA(n_components=self.n_components) self.pca.fit(outputs) latent_outputs = self.pca.transform(outputs) @@ -39,10 +71,24 @@ def project_to_latent_space(self, outputs): return latent_outputs def project_back_to_original_space(self, latent_outputs): + """ + Inverse transform the latent outputs back to the original space using the stored PCA parameters. + + Args: + latent_outputs (np.ndarray): The latent outputs to be projected back to the original space. + + Returns + ------- + np.ndarray: The outputs in the original space. + + Raises + ------ + ValueError: If the PCA model has not been fitted yet. + """ # Check if the PCA model has been fitted if self.pca is None: - raise ValueError("No PCA model has been fitted yet.") - else: - # Inverse transform the latent outputs using the stored PCA parameters - outputs = self.pca.inverse_transform(latent_outputs) - return outputs + error_message = 'No PCA model has been fitted yet.' + raise ValueError(error_message) + # Inverse transform the latent outputs using the stored PCA parameters + outputs = self.pca.inverse_transform(latent_outputs) + return outputs # noqa: RET504 diff --git a/modules/performUQ/common/space_filling_doe.py b/modules/performUQ/common/space_filling_doe.py index 62b3f86b7..becd826fd 100644 --- a/modules/performUQ/common/space_filling_doe.py +++ b/modules/performUQ/common/space_filling_doe.py @@ -1,4 +1,10 @@ -from typing import Optional +""" +Provides a class for performing Latin Hypercube Sampling (LHS). + +This module generates space-filling designs. +""" + +from __future__ import annotations import numpy as np from pydantic import BaseModel, PositiveInt @@ -6,11 +12,31 @@ class LatinHypercubeSampling(BaseModel, validate_assignment=True): + """ + A class to perform Latin Hypercube Sampling (LHS) for generating space-filling designs. + + Attributes + ---------- + n_samples (PositiveInt): The number of samples to generate. + n_dimensions (PositiveInt): The number of dimensions for each sample. 
+ seed (Optional[PositiveInt]): The seed for random number generation. + """ + n_samples: PositiveInt n_dimensions: PositiveInt - seed: Optional[PositiveInt] = None + seed: PositiveInt | None = None def generate(self, domain=None): + """ + Generate samples using Latin Hypercube Sampling. + + Args: + domain (list[tuple[float, float]], optional): The domain for each dimension. Defaults to None. + + Returns + ------- + np.ndarray: The generated samples. + """ lhs = qmc.LatinHypercube(self.n_dimensions, seed=self.seed) samples = lhs.random(self.n_samples) if domain is not None: @@ -21,13 +47,16 @@ def generate(self, domain=None): return samples -if __name__ == "__main__": - initial_doe = LatinHypercubeSampling(n_samples=10, n_dimensions=2) - print(repr(initial_doe)) - samples = initial_doe.generate() - print(samples) - domain = [(0, 10), (-5, 5)] # Example of domain for each dimension - scaled_samples = initial_doe.generate(domain) - print(scaled_samples) - # initial_doe.seed = 0 - # idoe = InitialDesignOfExperiments(n_samples=0, n_dimensions=0, seed=-1) +if __name__ == '__main__': + """ + Example usage of the LatinHypercubeSampling class. + """ + # initial_doe = LatinHypercubeSampling(n_samples=10, n_dimensions=2) + # print(repr(initial_doe)) + # samples = initial_doe.generate() + # print(samples) + # domain = [(0, 10), (-5, 5)] # Example of domain for each dimension + # scaled_samples = initial_doe.generate(domain) + # print(scaled_samples) + # # initial_doe.seed = 0 + # # idoe = InitialDesignOfExperiments(n_samples=0, n_dimensions=0, seed=-1) diff --git a/modules/performUQ/common/tmcmc.py b/modules/performUQ/common/tmcmc.py index 4edc8a1d4..a375db6e5 100644 --- a/modules/performUQ/common/tmcmc.py +++ b/modules/performUQ/common/tmcmc.py @@ -1,3 +1,5 @@ +"""Implementation of the Transitional Markov Chain Monte Carlo (TMCMC) algorithm.""" + import math import numpy as np @@ -7,17 +9,41 @@ def _calculate_weights_warm_start( beta, current_loglikelihoods, previous_loglikelihoods ): + """ + Calculate the weights for the warm start stage. + + Args: + beta (float): The current beta value. + current_loglikelihoods (np.ndarray): The current log-likelihood values. + previous_loglikelihoods (np.ndarray): The previous log-likelihood values. + + Returns + ------- + np.ndarray: The calculated weights. + """ log_weights = beta * (current_loglikelihoods - previous_loglikelihoods) log_sum_weights = logsumexp(log_weights) normalized_log_weights = log_weights - log_sum_weights normalized_weights = np.exp(normalized_log_weights) weights = normalized_weights / np.sum(normalized_weights) - return weights + return weights # noqa: RET504 def calculate_warm_start_stage( current_loglikelihood_approximation, previous_results, threshold_cov=1 ): + """ + Calculate the warm start stage number based on the coefficient of variation of weights. + + Args: + current_loglikelihood_approximation (callable): Function to approximate current log-likelihoods. + previous_results (tuple): The previous results containing samples, betas, and log-likelihoods. + threshold_cov (float, optional): The threshold for the coefficient of variation. Defaults to 1. + + Returns + ------- + int: The stage number for the warm start. 
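[Editor's note: the warm-start criterion reduces to a coefficient-of-variation check on importance weights between the new and old log-likelihoods, as in _calculate_weights_warm_start above. A small numpy illustration with made-up values:]

import numpy as np
from scipy.special import logsumexp

beta = 0.6
previous_loglikes = np.array([-3.2, -1.1, -2.4, -0.9])
current_loglikes = previous_loglikes + np.array([0.05, -0.02, 0.01, 0.03])

log_weights = beta * (current_loglikes - previous_loglikes)
weights = np.exp(log_weights - logsumexp(log_weights))
weights /= np.sum(weights)

cov_weights = np.nanstd(weights) / np.nanmean(weights)
reusable = cov_weights <= 1.0   # threshold_cov=1: this stage can seed the new run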
+ """ stage_nums = sorted(previous_results[0].keys(), reverse=True) for stage_num in stage_nums: current_loglikelihoods = current_loglikelihood_approximation( @@ -35,22 +61,56 @@ def calculate_warm_start_stage( def _calculate_weights(beta_increment, log_likelihoods): + """ + Calculate the weights for the given beta increment and log-likelihoods. + + Args: + beta_increment (float): The increment in beta. + log_likelihoods (np.ndarray): The log-likelihood values. + + Returns + ------- + np.ndarray: The calculated weights. + """ log_weights = beta_increment * log_likelihoods log_sum_weights = logsumexp(log_weights) normalized_log_weights = log_weights - log_sum_weights normalized_weights = np.exp(normalized_log_weights) weights = normalized_weights / np.sum(normalized_weights) - return weights + return weights # noqa: RET504 def _calculate_log_evidence(beta_increment, log_likelihoods): + """ + Calculate the log evidence for the given beta increment and log-likelihoods. + + Args: + beta_increment (float): The increment in beta. + log_likelihoods (np.ndarray): The log-likelihood values. + + Returns + ------- + float: The calculated log evidence. + """ log_evidence = logsumexp(beta_increment * log_likelihoods) - np.log( len(log_likelihoods) ) - return log_evidence + return log_evidence # noqa: RET504 def _increment_beta(log_likelihoods, beta, threshold_cov=1): + """ + Increment the beta value based on the coefficient of variation of weights. + + Args: + log_likelihoods (np.ndarray): The log-likelihood values. + beta (float): The current beta value. + threshold_cov (float, optional): The threshold for the coefficient of variation. Defaults to 1. + + Returns + ------- + float: The new beta value. + """ if beta >= 1: return 1 beta_increment = 1 - beta @@ -62,16 +122,41 @@ def _increment_beta(log_likelihoods, beta, threshold_cov=1): cov_weights = np.nanstd(weights) / np.nanmean(weights) proposed_beta = beta + beta_increment new_beta = min(proposed_beta, 1) - return new_beta + return new_beta # noqa: RET504 def _get_scaled_proposal_covariance(samples, weights, scale_factor=0.2): + """ + Calculate the scaled proposal covariance matrix. + + Args: + samples (np.ndarray): The samples. + weights (np.ndarray): The weights. + scale_factor (float, optional): The scale factor. Defaults to 0.2. + + Returns + ------- + np.ndarray: The scaled proposal covariance matrix. + """ return scale_factor**2 * np.cov( samples, rowvar=False, aweights=weights.flatten() ) class TMCMC: + """ + A class to perform Transitional Markov Chain Monte Carlo (TMCMC) sampling. + + Attributes + ---------- + _log_likelihood_approximation (callable): Function to approximate log-likelihoods. + _log_posterior_approximation (callable): Function to approximate log-posterior densities. + num_steps (int): The number of steps for each stage. + threshold_cov (float): The threshold for the coefficient of variation. + thinning_factor (int): The thinning factor for the MCMC chain. + adapt_frequency (int): The frequency of adaptation for the proposal distribution. + """ + def __init__( self, log_likelihood_approximation_function, @@ -81,6 +166,17 @@ def __init__( thinning_factor=10, adapt_frequency=100, ): + """ + Initialize the TMCMC class. + + Args: + log_likelihood_approximation_function (callable): Function to approximate log-likelihoods. + log_target_density_approximation_function (callable): Function to approximate log-posterior densities. + threshold_cov (float, optional): The threshold for the coefficient of variation. Defaults to 1. 
+ num_steps (int, optional): The number of steps for each stage. Defaults to 1. + thinning_factor (int, optional): The thinning factor for the MCMC chain. Defaults to 10. + adapt_frequency (int, optional): The frequency of adaptation for the proposal distribution. Defaults to 100. + """ self._log_likelihood_approximation = log_likelihood_approximation_function self._log_posterior_approximation = log_target_density_approximation_function @@ -100,9 +196,29 @@ def _run_one_stage( log_target_density_function, scale_factor, target_acceptance_rate, - do_thinning=False, + do_thinning=False, # noqa: FBT002 burn_in_steps=0, ): + """ + Run one stage of the TMCMC algorithm. + + Args: + samples (np.ndarray): The samples. + log_likelihoods (np.ndarray): The log-likelihood values. + log_target_density_values (np.ndarray): The log-target density values. + beta (float): The current beta value. + rng (np.random.Generator): The random number generator. + log_likelihood_function (callable): Function to calculate log-likelihoods. + log_target_density_function (callable): Function to calculate log-target densities. + scale_factor (float): The scale factor for the proposal distribution. + target_acceptance_rate (float): The target acceptance rate for the MCMC chain. + do_thinning (bool, optional): Whether to perform thinning. Defaults to False. + burn_in_steps (int, optional): The number of burn-in steps. Defaults to 0. + + Returns + ------- + tuple: A tuple containing the new samples, new log-likelihoods, new log-target density values, new beta, and log evidence. + """ new_beta = _increment_beta(log_likelihoods, beta) log_evidence = _calculate_log_evidence(new_beta - beta, log_likelihoods) weights = _calculate_weights(new_beta - beta, log_likelihoods) @@ -124,7 +240,7 @@ def _run_one_stage( n_adapt = 1 step_count = 0 for k in range(burn_in_steps + num_samples): - print(f"{k=}") + # print(f'{k=}') index = rng.choice(num_samples, p=weights.flatten()) if k >= burn_in_steps: if new_beta == 1 or do_thinning: @@ -196,11 +312,28 @@ def run( stage_num, num_burn_in=0, ): + """ + Run the TMCMC algorithm. + + Args: + samples_dict (dict): Dictionary of samples for each stage. + betas_dict (dict): Dictionary of beta values for each stage. + log_likelihoods_dict (dict): Dictionary of log-likelihood values for each stage. + log_target_density_values_dict (dict): Dictionary of log-target density values for each stage. + log_evidence_dict (dict): Dictionary of log evidence values for each stage. + rng (np.random.Generator): The random number generator. + stage_num (int): The current stage number. + num_burn_in (int, optional): The number of burn-in steps. Defaults to 0. + + Returns + ------- + tuple: A tuple containing the updated dictionaries for samples, betas, log-likelihoods, log-target density values, and log evidence. 
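[Editor's note: for context on the tempering loop in run, a hedged sketch of how the next beta is commonly chosen from the weight COV. The halving rule is an assumption; the patch only shows the surrounding lines of _increment_beta, not the adaptation step itself.]

import numpy as np
from scipy.special import logsumexp


def increment_beta_sketch(log_likelihoods, beta, threshold_cov=1.0):
    # Assumed rule: shrink the increment until the COV of the incremental
    # importance weights drops below the threshold, then cap beta at 1.
    if beta >= 1:
        return 1.0
    beta_increment = 1.0 - beta
    while beta_increment > 1e-8:
        log_w = beta_increment * log_likelihoods
        weights = np.exp(log_w - logsumexp(log_w))
        weights /= np.sum(weights)
        if np.nanstd(weights) / np.nanmean(weights) <= threshold_cov:
            break
        beta_increment /= 2.0
    return min(beta + beta_increment, 1.0)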
+ """ self.num_dimensions = samples_dict[0].shape[1] self.target_acceptance_rate = 0.23 + 0.21 / self.num_dimensions self.scale_factor = 2.4 / np.sqrt(self.num_dimensions) while betas_dict[stage_num] < 1: - print(f"Stage {stage_num}") + # print(f'Stage {stage_num}') ( new_samples, new_log_likelihoods, From f7ded1ee51e552e9bd9a1a526b05181ef4f84a5b Mon Sep 17 00:00:00 2001 From: bsaakash <11618528+bsaakash@users.noreply.github.com> Date: Mon, 30 Sep 2024 15:37:44 -0700 Subject: [PATCH 3/3] abs - formatting files --- .../shakeMapEvent/shakeMapEvent.py | 21 +++---- .../GISSpecifiedEvents/GISSpecifiedEvent.py | 21 +++---- .../GISSpecifiedEvents/RasterEvent.py | 56 ++++++++----------- .../NearestNeighborEvents/NNE.py | 43 +++++++++----- 4 files changed, 73 insertions(+), 68 deletions(-) diff --git a/modules/createEVENT/shakeMapEvent/shakeMapEvent.py b/modules/createEVENT/shakeMapEvent/shakeMapEvent.py index 7956743a4..d6f21e1b5 100644 --- a/modules/createEVENT/shakeMapEvent/shakeMapEvent.py +++ b/modules/createEVENT/shakeMapEvent/shakeMapEvent.py @@ -45,10 +45,9 @@ def create_shakemap_event(eventDirectory, eventPath, IMTypes): # noqa: D103 + IMTypesList = eval(IMTypes) - IMTypesList = eval(IMTypes) - - print("Creating shakemap event") + print('Creating shakemap event') xml_file_path = Path(eventDirectory) / eventPath / 'grid.xml' @@ -69,7 +68,7 @@ def create_shakemap_event(eventDirectory, eventPath, IMTypes): # noqa: D103 lon, lat = float(values[0]), float(values[1]) point = Point(lon, lat) points.append(point) - + # Store only the specified attributes attr = {} attribute_mapping = { @@ -78,37 +77,35 @@ def create_shakemap_event(eventDirectory, eventPath, IMTypes): # noqa: D103 'MMI': 4, 'PSA03': 5, 'PSA10': 6, - 'PSA30': 7 + 'PSA30': 7, } - + for im_type in IMTypesList: if im_type in attribute_mapping: attr[im_type] = float(values[attribute_mapping[im_type]]) - + attributes.append(attr) # Create GeoDataFrame - gdf = gpd.GeoDataFrame(attributes, geometry=points, crs="EPSG:4326") + gdf = gpd.GeoDataFrame(attributes, geometry=points, crs='EPSG:4326') # Display the first few rows - print("Saving shakemap to gpkg") + print('Saving shakemap to gpkg') # Save as a GeoPackage file gdf_path = Path(eventDirectory) / 'EventGrid.gpkg' - gdf.to_file(gdf_path, driver="GPKG") + gdf.to_file(gdf_path, driver='GPKG') return if __name__ == '__main__': - parser = argparse.ArgumentParser() parser.add_argument('--input', help='Input file') parser.add_argument('--Directory', help='Directory path') parser.add_argument('--EventPath', help='Event path') parser.add_argument('--IntensityMeasureType', help='types of intensity measures') - args = parser.parse_args() create_shakemap_event(args.Directory, args.EventPath, args.IntensityMeasureType) diff --git a/modules/performRegionalMapping/GISSpecifiedEvents/GISSpecifiedEvent.py b/modules/performRegionalMapping/GISSpecifiedEvents/GISSpecifiedEvent.py index 67aaef869..c23ad992d 100644 --- a/modules/performRegionalMapping/GISSpecifiedEvents/GISSpecifiedEvent.py +++ b/modules/performRegionalMapping/GISSpecifiedEvents/GISSpecifiedEvent.py @@ -43,21 +43,23 @@ from RasterEvent import create_event as create_raster_event + def is_raster_file(filename): # Define a set of common raster file extensions raster_extensions = {'.jpg', '.jpeg', '.png', '.bmp', '.gif', '.tiff', '.tif'} - + # Create a Path object from the filename file_path = Path(filename) - + # Extract the file extension and check if it is in the set of raster extensions return file_path.suffix.lower() in raster_extensions + 
def is_xml_file(filename): # Check if the file has an .xml extension if not filename.lower().endswith('.xml'): return False - + # Try to parse the file as XML try: ET.parse(filename) @@ -65,17 +67,18 @@ def is_xml_file(filename): except ET.ParseError: return False -def create_event(asset_file : str, event_grid_file: str): # noqa: C901, N803, D103 +def create_event(asset_file: str, event_grid_file: str): # noqa: C901, N803, D103 if is_raster_file(event_grid_file): return create_raster_event(asset_file, event_grid_file) elif is_xml_file(event_grid_file): # Here you would call a function to handle XML files # For now, we'll just raise a NotImplementedError - raise NotImplementedError("XML file handling is not yet implemented.") + raise NotImplementedError('XML file handling is not yet implemented.') else: - raise ValueError(f"{event_grid_file} is not a raster. Only rasters are currently supported.") - + raise ValueError( + f'{event_grid_file} is not a raster. Only rasters are currently supported.' + ) if __name__ == '__main__': @@ -84,6 +87,4 @@ def create_event(asset_file : str, event_grid_file: str): # noqa: C901, N803, D parser.add_argument('--filenameEVENTgrid') args = parser.parse_args() - create_event( - args.assetFile, args.filenameEVENTgrid - ) + create_event(args.assetFile, args.filenameEVENTgrid) diff --git a/modules/performRegionalMapping/GISSpecifiedEvents/RasterEvent.py b/modules/performRegionalMapping/GISSpecifiedEvents/RasterEvent.py index a7dcddbc0..eb2ecad25 100644 --- a/modules/performRegionalMapping/GISSpecifiedEvents/RasterEvent.py +++ b/modules/performRegionalMapping/GISSpecifiedEvents/RasterEvent.py @@ -51,16 +51,15 @@ def sample_raster_at_latlon(src, lat, lon): # Ensure the indices are within the bounds of the raster if row < 0 or row >= src.height or col < 0 or col >= src.width: - raise IndexError("Transformed coordinates are out of raster bounds") + raise IndexError('Transformed coordinates are out of raster bounds') # Read the raster value at the given row and column raster_value = src.read(1)[row, col] return raster_value -def create_event(asset_file, event_grid_file): # noqa: C901, N803, D103 - +def create_event(asset_file, event_grid_file): # noqa: C901, N803, D103 # read the event grid data file event_grid_path = Path(event_grid_file).resolve() event_dir = event_grid_path.parent @@ -82,22 +81,21 @@ def create_event(asset_file, event_grid_file): # noqa: C901, N803, D103 asset_dict = json.load(f) data_final = [ - ['GP_file','Latitude','Longitude'], - ] + ['GP_file', 'Latitude', 'Longitude'], + ] # Iterate through each asset for asset in asset_dict: asset_id = asset['id'] asset_file_path = asset['file'] - - # Load the corresponding file for each asset - with open(asset_file_path, encoding='utf-8') as asset_file : + # Load the corresponding file for each asset + with open(asset_file_path, encoding='utf-8') as asset_file: # Load the asset data asset_data = json.load(asset_file) im_tag = asset_data['RegionalEvent']['intensityMeasures'][0] - + # Extract the latitude and longitude lat = float(asset_data['GeneralInformation']['location']['latitude']) lon = float(asset_data['GeneralInformation']['location']['longitude']) @@ -107,34 +105,33 @@ def create_event(asset_file, event_grid_file): # noqa: C901, N803, D103 # Check if the transformed coordinates are within the raster bounds bounds = src.bounds - if (bounds.left <= lon_transformed <= bounds.right and - bounds.bottom <= lat_transformed <= bounds.top): + if ( + bounds.left <= lon_transformed <= bounds.right + and 
bounds.bottom <= lat_transformed <= bounds.top + ): try: - val = sample_raster_at_latlon(src=src, - lat=lat_transformed, - lon=lon_transformed) - - data = [ - [im_tag], - [val] - ] - + val = sample_raster_at_latlon( + src=src, lat=lat_transformed, lon=lon_transformed + ) + + data = [[im_tag], [val]] + # Save the simcenter file name file_name = f'Site_{asset_id}.csvx{0}x{int(asset_id):05d}' - data_final.append([file_name,lat,lon]) + data_final.append([file_name, lat, lon]) csv_save_path = event_dir / f'Site_{asset_id}.csv' with open(csv_save_path, 'w', newline='') as file: # Create a CSV writer object writer = csv.writer(file) - + # Write the data to the CSV file writer.writerows(data) # prepare a dictionary of events event_list_json = [[file_name, 1.0]] - + asset_data['Events'] = [{}] asset_data['Events'][0] = { 'EventFolderPath': str(event_dir), @@ -145,12 +142,10 @@ def create_event(asset_file, event_grid_file): # noqa: C901, N803, D103 with open(asset_file_path, 'w', encoding='utf-8') as f: # noqa: PTH123 json.dump(asset_data, f, indent=2) - except IndexError as e: - print(f"Error for asset ID {asset_id}: {e}") + print(f'Error for asset ID {asset_id}: {e}') else: - print(f"Asset ID: {asset_id} is outside the raster bounds") - + print(f'Asset ID: {asset_id} is outside the raster bounds') # # save the event dictionary to the BIM # asset_data['Events'] = [{}] @@ -165,13 +160,12 @@ def create_event(asset_file, event_grid_file): # noqa: C901, N803, D103 # with open(asset_file, 'w', encoding='utf-8') as f: # noqa: PTH123 # json.dump(asset_data, f, indent=2) - # Save the final event grid csv_save_path = event_dir / 'EventGrid.csv' with open(csv_save_path, 'w', newline='') as file: # Create a CSV writer object writer = csv.writer(file) - + # Write the data to the CSV file writer.writerows(data_final) @@ -185,6 +179,4 @@ def create_event(asset_file, event_grid_file): # noqa: C901, N803, D103 parser.add_argument('--filenameEVENTgrid') args = parser.parse_args() - create_event( - args.assetFile, args.filenameEVENTgrid - ) + create_event(args.assetFile, args.filenameEVENTgrid) diff --git a/modules/performRegionalMapping/NearestNeighborEvents/NNE.py b/modules/performRegionalMapping/NearestNeighborEvents/NNE.py index bd4e953e5..2d2a0cb26 100644 --- a/modules/performRegionalMapping/NearestNeighborEvents/NNE.py +++ b/modules/performRegionalMapping/NearestNeighborEvents/NNE.py @@ -48,6 +48,7 @@ from sklearn.neighbors import NearestNeighbors import geopandas as gpd + def find_neighbors( # noqa: C901, D103 asset_file, event_grid_file, @@ -89,7 +90,7 @@ def find_neighbors( # noqa: C901, D103 if file_extension == '.csv': # Existing code for CSV files grid_df = pd.read_csv(event_dir / event_grid_file, header=0) - + # store the locations of the grid points in X lat_E = grid_df['Latitude'] # noqa: N806 lon_E = grid_df['Longitude'] # noqa: N806 @@ -103,15 +104,15 @@ def find_neighbors( # noqa: C901, D103 else: # Else assume GIS files - works will all gis files that geopandas supports gdf = gpd.read_file(event_dir / event_grid_file) - + # Ensure the GIS file is in a geographic coordinate system if not gdf.crs.is_geographic: gdf = gdf.to_crs(epsg=4326) # Convert to WGS84 - + # Extract coordinates from the geometry gdf['Longitude'] = gdf.geometry.x gdf['Latitude'] = gdf.geometry.y - + # store the locations of the grid points in X lat_E = gdf['Latitude'] # noqa: N806 lon_E = gdf['Longitude'] # noqa: N806 @@ -121,7 +122,7 @@ def find_neighbors( # noqa: C901, D103 grid_extra_keys = list( gdf.drop(['geometry', 
'Longitude', 'Latitude'], axis=1).columns ) - + # Convert GeoDataFrame to regular DataFrame for consistency with the rest of the code grid_df = pd.DataFrame(gdf.drop(columns='geometry')) @@ -131,7 +132,9 @@ def find_neighbors( # noqa: C901, D103 else: neighbors_to_get = neighbors - nbrs = NearestNeighbors(n_neighbors=neighbors_to_get, algorithm='ball_tree').fit(X) + nbrs = NearestNeighbors(n_neighbors=neighbors_to_get, algorithm='ball_tree').fit( + X + ) # load the building data file with open(asset_file, encoding='utf-8') as f: # noqa: PTH123 @@ -307,34 +310,46 @@ def find_neighbors( # noqa: C901, D103 ] * e scale_list = np.ones(len(event_list)) - else : + else: event_list = [] scale_list = [] event_type = 'intensityMeasure' # Determine event_count (number of IMs per grid point) - im_columns = [col for col in grid_df.columns if col not in ['geometry', 'Longitude', 'Latitude']] + im_columns = [ + col + for col in grid_df.columns + if col not in ['geometry', 'Longitude', 'Latitude'] + ] event_count = len(im_columns) # for each neighbor for sample_j, nbr in enumerate(nbr_samples): - # make sure we resample events if samples > event_count event_j = sample_j % event_count # get the index of the nth neighbor nbr_index = ind_list[nbr] - + # For GIS files, create a new CSV file csv_filename = f'Site_{sample_j}.csv' csv_path = event_dir / csv_filename - + # Create a CSV file with data from the GIS file # Use actual data from the GIS file if available, otherwise use dummy data - im_columns = [col for col in grid_df.columns if col not in ['geometry', 'Longitude', 'Latitude']] - - im_data = pd.DataFrame({col: [grid_df.iloc[nbr_index][col]] * event_count for col in im_columns}) + im_columns = [ + col + for col in grid_df.columns + if col not in ['geometry', 'Longitude', 'Latitude'] + ] + + im_data = pd.DataFrame( + { + col: [grid_df.iloc[nbr_index][col]] * event_count + for col in im_columns + } + ) im_data.to_csv(csv_path, index=False) # save the collection file name and the IM row id
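[Editor's note: to summarize the lookup driving this loop, a self-contained sketch of the ball-tree nearest-neighbour query between asset locations and event-grid points, using the same scikit-learn calls as the patch. The coordinates below are synthetic.]

import numpy as np
from sklearn.neighbors import NearestNeighbors

grid_points = np.array([[37.87, -122.27], [37.88, -122.26], [37.86, -122.25]])
asset_points = np.array([[37.872, -122.268], [37.858, -122.252]])

nbrs = NearestNeighbors(n_neighbors=2, algorithm='ball_tree').fit(grid_points)
distances, indices = nbrs.kneighbors(asset_points)
# indices[i] lists the grid rows whose intensity-measure columns are sampled
# (and written to Site_<i>.csv) for asset i.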