Run particle filter. init_model is the function which initialises the model, input_file_path is the path to the YAML file with the input parameters and observation_file_path is the path to the HDF5 file containing the observation sequence to perform filtering for. filter_type is the particle filter type to use; see ParticleFilter for the possible values. summary_stat_type is a type specifying the summary statistics of the particles to compute at each time step; see AbstractSummaryStat for the possible values. rng is a random number generator to use to generate random variates while filtering; a seeded random number generator may be specified to ensure reproducible results. If running with multiple threads a thread-safe generator such as Random.TaskLocalRNG (the default) must be used.
Returns a tuple containing the state particles representing an estimate of the filtering distribution at the final observation time (with each particle a column of the returned matrix) and a named tuple containing the estimated summary statistics of this final filtering distribution. If running on multiple ranks using MPI, the returned states array will correspond only to the particles local to this rank and the summary statistics will be returned only on the master rank with all other ranks returning nothing for their second return value.
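For illustration, a call might look like the following sketch. This is not taken from the package documentation: init_model stands for a model initialisation function (such as LLW2d.init used later in this document), the file paths are placeholders, and passing rng as a keyword argument is an assumption based on the description above.
using Random, ParticleDA

rng = Random.TaskLocalRNG()  # thread-safe generator, safe to use with multiple threads
Random.seed!(rng, 1234)      # seed it so repeated runs give reproducible results

final_states, final_statistics = run_particle_filter(
    init_model, "input.yaml", "observations.h5", OptimalFilter, MeanAndVarSummaryStat;
    rng=rng,
)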
To simulate observations from the model (which can be used, for example, to test the filtering algorithms) you can use the function simulate_observations_from_model.
Simulate observations from the state space model initialised by the init_model function with parameters specified by the model key in the input YAML file at input_file_path and save the simulated observation and state sequences to a HDF5 file at output_file_path. rng is a random number generator to use to generate random variates while simulating from the model - a seeded random number generator may be specified to ensure reproducible results.
The input YAML file at input_file_path should have a simulate_observations key whose value is a dictionary with keys n_time_step and seed, corresponding respectively to the number of time steps to generate observations for from the model and the seed used to initialise the state of the random number generator used to simulate the observations.
The simulated observation sequence is returned as a matrix with columns corresponding to the observation vectors at each time step.
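For example, the relevant part of such an input file might look like the following sketch (key names as described above, values illustrative only):
simulate_observations:
  n_time_step: 100
  seed: 456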
Singleton type OptimalFilter. This can be used as the filter_type argument of run_particle_filter to select the optimal proposal filter (for conditionally linear-Gaussian models).
Abstract type for summary statistics of the particle ensemble. Concrete subtypes can be passed as the summary_stat_type argument to run_particle_filter to specify which summary statistics to record and how they are computed.
Abstract type for summary statistics computed using custom MPI reductions. Allows greater flexibility in computing statistics, which can support more numerically stable implementations, but at the cost of not being compatible with all CPU architectures. In particular, MPI.jl does not currently support custom operators on PowerPC and ARM architectures.
Abstract type for summary statistics computed using standard MPI sum reductions. Compatible with a wider range of CPU architectures but may require less numerically stable implementations.
Custom reduction based summary statistic type which computes the means and variances of the particle ensemble for each state dimension. On CPU architectures which do not support custom reductions NaiveMeanAndVarSummaryStat can be used instead.
Custom reduction based summary statistic type which computes the means of the particle ensemble for each state dimension. On CPU architectures which do not support custom reductions NaiveMeanSummaryStat can be used instead.
Sum reduction based summary statistic type which computes the means and variances of the particle ensemble for each state dimension. The mean and variance are computed by directly accumulating the sums of the particle values, the squared particle values and number of particles on each rank, with the variance computed as the scaled difference between the sum of the squares and square of the sums. This 'naive' implementation avoids custom MPI reductions but can be numerically unstable for large ensembles or state components with large values. If custom reductions are supported by the CPU architecture in use the more numerically stable MeanAndVarSummaryStat should be used instead.
Sum reduction based summary statistic type which computes the means of the particle ensemble for each state dimension. The mean is computed by directly accumulating the sums of the particle values and number of particles on each rank. If custom reductions are supported by the CPU architecture in use the more numerically stable MeanSummaryStat should be used instead.
The model needs to define a custom data structure and a few functions that will be used by run_particle_filter:
a custom structure which holds the data about the model. This will be used to dispatch the methods to be defined, listed below;
an initialisation function with the following signature:
init(params_dict::Dict, n_tasks::Integer) -> model
with params_dict a dictionary with the parameters of the model and n_tasks an integer specifying the maximum number of tasks (coroutines) that parallelisable operations will be scheduled over. This initialisation function should create an instance of the model data structure and return it. The value of n_tasks can be used to create task-specific buffers for writing to when computing the model updates, to avoid reallocating data structures on each function call. As tasks may be run in parallel over multiple threads, any buffers used in functions called within tasks should be unique to the task; to facilitate this, functions in the model interface (see below) which may be called within tasks scheduled in parallel are passed a task_index argument, an integer index in 1:n_tasks which is guaranteed to be unique to a particular task and so can be used to index into task-specific buffers.
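As an illustration, a minimal sketch of such a structure and initialisation function is given below; MyModel, its fields and the parameter keys are hypothetical and not part of ParticleDA.
struct MyModel
    state_dimension::Int
    obs_noise_std::Float64
    task_buffers::Vector{Vector{Float64}}  # one scratch vector per task
end

function init(params_dict::Dict, n_tasks::Integer)
    state_dimension = params_dict["state_dimension"]
    # Allocate one buffer per task so concurrently scheduled tasks can each write to
    # task_buffers[task_index] without clashing
    return MyModel(
        state_dimension,
        params_dict["obs_noise_std"],
        [zeros(state_dimension) for _ in 1:n_tasks],
    )
end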
The model needs to extend the following methods, using the model type for dispatch:
Sample a value for the state vector from its initial distribution for the model described by model, using random number generator rng to generate random draws and writing the result to the state argument.
This method is intended to be extended by the user with the above signature, specifying the type of model.
Apply the deterministic component of the state time update at discrete time index time_index for the model described by model to the state vector state, writing the updated state back to the state argument.
This method is intended to be extended by the user with the above signature, specifying the type of model.
Apply the stochastic component of the state time update for the model described by model to the state vector state, using random number generator rng to generate random draws and writing the updated state back to the state argument.
This method is intended to be extended by the user with the above signature, specifying the type of model.
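As a sketch for the hypothetical MyModel above, extensions of these methods might look as follows. The argument order shown mirrors the get_log_density_observation_given_state signature below and should be checked against the package docstrings; the dynamics are purely illustrative.
using ParticleDA, Random

function ParticleDA.sample_initial_state!(state, model::MyModel, rng, task_index=1)
    randn!(rng, state)  # e.g. a standard normal initial distribution
    return state
end

function ParticleDA.update_state_deterministic!(state, model::MyModel, time_index, task_index=1)
    state .*= 0.9  # e.g. a linear decay of the state towards zero
    return state
end

function ParticleDA.update_state_stochastic!(state, model::MyModel, rng, task_index=1)
    state .+= 0.1 .* randn(rng, length(state))  # additive zero-mean Gaussian state noise
    return state
end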
ParticleDA.get_log_density_observation_given_state(
observation, state, model, task_index=1
) -> Real
Return the logarithm of the probability density of an observation vector observation given a state vector state for the model associated with model. Any additive terms that are constant with respect to the state may be neglected.
This method is intended to be extended by the user with the above signature, specifying the type of model.
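Continuing the hypothetical MyModel sketch, and assuming each observation component directly observes one state component with independent Gaussian noise of standard deviation obs_noise_std, an extension might look as follows (the normalising constant is dropped, as permitted above).
function ParticleDA.get_log_density_observation_given_state(
    observation, state, model::MyModel, task_index=1
)
    return -sum(
        abs2, (observation .- @view(state[1:length(observation)])) ./ model.obs_noise_std
    ) / 2
end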
Optionally, if the model has additive Gaussian observation and state noise, it may also extend the following methods, again using the model type for dispatch, to allow using the more statistically efficient OptimalFilter for filtering:
ParticleDA.get_observation_mean_given_state!(
observation_mean, state, model, task_index=1
)
Compute the mean of the multivariate normal distribution on the observations given the current state and write to the first argument.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter.
ParticleDA.get_covariance_state_noise(model, i, j) -> Real
Return covariance cov(U[i], U[j]) between components of the zero-mean Gaussian state noise vector U.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter.
ParticleDA.get_covariance_observation_noise(model, i, j) -> Real
Return covariance cov(V[i], V[j]) between components of the zero-mean Gaussian observation noise vector V.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter.
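For the hypothetical MyModel sketch, with independent (diagonal) state and observation noise, these might be extended as follows; the state noise standard deviation 0.1 matches the update_state_stochastic! sketch above.
function ParticleDA.get_covariance_state_noise(model::MyModel, i, j)
    return i == j ? 0.1^2 : 0.0  # diagonal state noise covariance
end

function ParticleDA.get_covariance_observation_noise(model::MyModel, i, j)
    return i == j ? model.obs_noise_std^2 : 0.0  # diagonal observation noise covariance
end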
ParticleDA.get_covariance_state_observation_given_previous_state(
model, i, j
) -> Real
Return the covariance cov(X[i], Y[j]) between components of the state vector X = F(x) + U and observation vector Y = H * X + V where H is the linear observation operator, F the (potentially non-linear) forward operator describing the deterministic state dynamics, U is a zero-mean Gaussian state noise vector, V is a zero-mean Gaussian observation noise vector and x is the state at the previous observation time.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter.
ParticleDA.get_covariance_observation_observation_given_previous_state(
model, i, j
) -> Real
Return covariance cov(Y[i], Y[j]) between components of the observation vector Y = H * (F(x) + U) + V where H is the linear observation operator, F the (potentially non-linear) forward operator describing the deterministic state dynamics, U is a zero-mean Gaussian state noise vector, V is a zero-mean Gaussian observation noise vector and x is the state at the previous observation time.
This method is intended to be extended by the user with the above signature, specifying the type of model. Only required for filtering conditionally Gaussian models with the optimal proposal filter implementation in OptimalFilter.
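When the state noise U and observation noise V are assumed independent (as is typical for the additive Gaussian model class described above), these last two covariances follow from the quantities already defined, since the deterministic part F(x) is fixed given the previous state x:
cov(X, Y) = cov(U) * H'
cov(Y, Y) = H * cov(U) * H' + cov(V)
where H' denotes the transpose of the linear observation operator H.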
You can store the input parameters in a YAML file with the following structure:
filter:
  key1: value1

model:
  model_name1:
    key2: value2
    key3: value3
  model_name2:
    key4: value4
    key5: value5
The parameters under filter are related to the particle filter; under model you can specify the parameters for different models.
The particle filter parameters are saved in the following data structure (an illustrative filter section is sketched after the list):
master_rank : Id of MPI rank that performs serial computations
verbose::Bool : Flag to control whether to write output
output_filename::String : Name of output file
nprt::Int : Number of particles for particle filter
enable_timers::Bool : Flag to control run time measurements
particle_save_time_indices: Set of time indices to save particles at
seed: Seed to initialise state of random number generator used for filtering
n_tasks: Number of tasks to use for running parallelisable operations. Positive integers indicate the number of tasks directly, while the absolute value of a negative integer indicates the number of tasks to use per thread (as reported by Threads.nthreads()). Using multiple tasks per thread will improve the ability of the scheduler to balance load across threads but potentially increase overheads. If the simulation of the model being filtered itself uses multiple threads, it may be beneficial to set n_tasks = 1 to avoid too much contention between threads.
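For illustration, a filter section using these parameters might look like the following sketch (the values shown are examples, not defaults):
filter:
  master_rank: 0
  verbose: true
  output_filename: particle_da.h5
  nprt: 64
  enable_timers: true
  particle_save_time_indices: [1, 10, 20]
  seed: 1234
  n_tasks: -1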
A full example of a model interfacing ParticleDA is available in test/models/llw2d.jl. This model implements a simple two-dimensional simulation of tsunami dynamics and is partly based on the tsunami data assimilation code by Takuto Maeda. The particle filter can be run with observations simulated from the model as follows:
# Load ParticleDA
using ParticleDA
# Save some variables for later use
# Set paths: the example model source from the test suite, an input parameter file
# (replace parameters.yaml with your own) and a file to hold simulated observations
test_dir = joinpath(dirname(pathof(ParticleDA)), "..", "test")
input_file = "parameters.yaml"
observation_file = tempname()
# Include the example model source and load it, then simulate observations to filter
include(joinpath(test_dir, "models", "llw2d.jl"))
using .LLW2d
simulate_observations_from_model(LLW2d.init, input_file, observation_file)
# Run the (optimal proposal) particle filter computing the ensemble mean and variance;
# on architectures without custom MPI reduction support you may need
# to use NaiveMeanAndVarSummaryStat instead
final_states, final_statistics = run_particle_filter(
LLW2d.init, input_file, observation_file, OptimalFilter, MeanAndVarSummaryStat
)
The Input parameters section shows how to pass the input parameters for the filter and the model. For the model included in the test suite, called llw2d, you can set the following parameters:
lambda::AbstractFloat : Length scale for Matérn covariance kernel in background noise
nu::AbstractFloat : Smoothness parameter for Matérn covariance kernel in background noise
sigma::AbstractFloat : Marginal standard deviation for Matérn covariance kernel in background noise
lambda_initial_state::AbstractFloat : Length scale for Matérn covariance kernel in initial state of particles
nu_initial_state::AbstractFloat : Smoothness parameter for Matérn covariance kernel in initial state of particles
sigma_initial_state::AbstractFloat : Marginal standard deviation for Matérn covariance kernel in initial state of particles
padding::Int : Min padding for circulant embedding Gaussian random field generator
primes::Bool: Whether the size of the minimum circulant embedding of the covariance matrix can be written as a product of small primes (2, 3, 5 and 7). Default is true.
use_peak_initial_state_mean::Bool: Whether to set mean of initial height field to a wave peak (true) or to all zeros (false). In both cases the initial mean of the other state variables is zero.
absorber_thickness_fraction::Float : Thickness of absorber for sponge absorbing boundary conditions, fraction of grid size
boundary_damping::Float : Damping for boundaries
cutoff_depth::Float : Shallowest water depth
obs_noise_std::Vector: Standard deviations of noise added to observations of the true state
observed_state_var_indices::Vector: Vector containing the indices of the observed state variables (1: height, 2: velocity x-component, 3: velocity y-component)
The coordinates of the observation stations can be set in two different ways.
Provide the coordinates in an input file. Set the parameter station_filename to the name of your input file. The input file is plain text; the format is one row per station containing the x and y coordinates in metres. Here is a simple example with two stations:
# Comment lines starting with '#' will be ignored by the code
# This file contains two stations: at [1km, 1km] and [2km, 2km]
1.0e3, 1.0e3
2.0e3, 2.0e3
Provide parameters for an equispaced rectilinear grid of observation stations. The following parameters should then be set (an illustrative example follows the list):
n_stations_x : Number of observation stations in the x direction
n_stations_y : Number of observation stations in the y direction
station_distance_x : Distance between stations in the x direction (m)
station_distance_y : Distance between stations in the y direction (m)
station_boundary_x : Distance between bottom left edge of box and first station in the x direction (m)
station_boundary_y : Distance between bottom left edge of box and first station in the y direction (m)
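For example, a hypothetical llw2d model section placing a 4 x 4 grid of stations might look like the following sketch (key nesting and values are illustrative only):
model:
  llw2d:
    n_stations_x: 4
    n_stations_y: 4
    station_distance_x: 20.0e3
    station_distance_y: 20.0e3
    station_boundary_x: 10.0e3
    station_boundary_y: 10.0e3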
If the filter parameter verbose is set to true, run_particle_filter will produce an HDF5 file in the run directory. The file name is particle_da.h5 by default (this is configurable using the output_filename filter parameter). The file contains the summary statistics of the estimated state distribution (by default the mean and variance), particle weights, parameters used, and other metadata at each time step observations were assimilated. To read the output file, use the HDF5 library.
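For example, the output file can be opened and its top-level groups listed with HDF5.jl; this is only a sketch, and the exact group and dataset names depend on the configuration and package version.
using HDF5
h5open("particle_da.h5", "r") do file
    foreach(println, keys(file))  # print the names of the top-level groups in the output
end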
A basic plotting tool is provided in a Jupyter notebook. This is intended as a template to build more sophisticated postprocessing tools, but can be used for some simple analysis. Set the variable timestamp in the third cell to plot different time slices from the output file. More functionality may be added as the package develops.
The particle state update is parallelised using both MPI and threading. According to our preliminary tests both methods work well at small scale. To use threading, set the environment variable JULIA_NUM_THREADS to the number of threads you want to use before starting Julia and then call the run_particle_filter function normally. You can check the number of threads Julia has available by calling, in Julia's REPL,
Threads.nthreads()
To use the MPI parallelisation, write a Julia script that calls the run_particle_filter function for the relevant model and observations and run it in a Unix shell with
mpirun -np <your_number_of_processes> julia <your_julia_script>
Note that the parallel performance may vary depending on the performance of the algorithm. In general, a degeneracy of the particle weights will lead to poor load balance and parallel performance. See this issue for more details.
We have a basic test suite for ParticleDA.jl. You can run the tests by entering the package manager mode in Julia's REPL with ] and running the command