Skip to content

Commit

Permalink
Use per-element ML models for local descriptors (#39)
Browse files Browse the repository at this point in the history
* Implement a per-element version of SOAP

* Ensuring matrices are on same devices

* Scale scaling constants by number of atoms

* Use per-atom scaling in train, add patience to training

* Bug fix: Only scale forces one

I scaled the energy, so I don't need to scale the derivatives
of energy wrt descriptors

* Add lengthscale as option, remove HPO from notebook
  • Loading branch information
WardLT authored Dec 31, 2023
1 parent 0033eb9 commit 496c16f
Show file tree
Hide file tree
Showing 3 changed files with 321 additions and 428 deletions.
220 changes: 174 additions & 46 deletions jitterbug/model/dscribe/local.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Create a PyTorch-based model which uses features for each atom"""
from typing import Union, Optional, Callable
from typing import Union, Optional, Callable, Sequence

import ase
from ase.calculators.calculator import Calculator, all_changes
from dscribe.descriptors.descriptorlocal import DescriptorLocal
from torch.utils.data import TensorDataset, DataLoader
Expand All @@ -22,15 +23,21 @@ class InducedKernelGPR(torch.nn.Module):
inducing_x: Starting points for the reference points of the kernel
use_ard: Whether to employ a different length scale parameter for each descriptor,
a technique known as Automatic Relevance Detection (ARD)
initial_lengthscale: Initial value for the lengthscale parmaeter
"""

def __init__(self, inducing_x: torch.Tensor, use_ard: bool):
def __init__(self, inducing_x: torch.Tensor, use_ard: bool, initial_lengthscale: float = 10):
super().__init__()
n_points, n_desc = inducing_x.shape
self.inducing_x = torch.nn.Parameter(inducing_x.clone())
self.alpha = torch.nn.Parameter(torch.empty((n_points,), dtype=inducing_x.dtype))
torch.nn.init.normal_(self.alpha)
self.lengthscales = torch.nn.Parameter(-torch.ones((n_desc,), dtype=inducing_x.dtype) if use_ard else -torch.ones((1,), dtype=inducing_x.dtype))

# Initial value
ls = np.log(initial_lengthscale)
self.lengthscales = torch.nn.Parameter(
-torch.ones((n_desc,), dtype=inducing_x.dtype) * ls if use_ard else
-torch.ones((1,), dtype=inducing_x.dtype) * ls)

def forward(self, x) -> torch.Tensor:
# Compute an RBF kernel
Expand All @@ -40,64 +47,158 @@ def forward(self, x) -> torch.Tensor:
esd = torch.exp(-diff)

# Return the sum
return torch.tensordot(self.alpha, esd, dims=([0], [0]))
return torch.tensordot(self.alpha, esd, dims=([0], [0]))[:, None]


def make_gpr_model(train_descriptors: np.ndarray, num_inducing_points: int, use_ard_kernel: bool = False) -> InducedKernelGPR:
"""Make the GPR model for a certain atomic system
class PerElementModule(torch.nn.Module):
"""Fit a different model for each element
Args:
models: Map of atomic number to element to use
"""

def __init__(self, models: dict[int, torch.nn.Module]):
super().__init__()
self.models = torch.nn.ModuleDict(
dict((str(k), v) for k, v in models.items())
)

def forward(self, element: torch.IntTensor, desc: torch.Tensor) -> torch.Tensor:
output = torch.empty_like(desc[:, 0])
for elem, model in self.models.items():
elem_id = int(elem)
mask = element == elem_id
output[mask] = model(desc[mask, :])[:, 0]
return output


def make_nn_model(
elements: np.ndarray,
descriptors: np.ndarray,
hidden_layers: Sequence[int] = (),
activation: torch.nn.Module = torch.nn.Sigmoid()
) -> PerElementModule:
"""Make a neural network model for a certain atomic system
Assumes that the descriptors have already been scaled
Args:
train_descriptors: 3D array of all training points (num_configurations, num_atoms, num_descriptors)
num_inducing_points: Number of inducing points to use in the kernel. More points, more complex model
elements: Element for each atom in a structure (num_atoms,)
descriptors: 3D array of all training points (num_configurations, num_atoms, num_descriptors)
hidden_layers: Number of units in the hidden layers
activation: Activation function used for the hidden layers
"""

# Detect the dtype
dtype = torch.from_numpy(descriptors[0, 0, :1]).dtype

# Make a model for each element type
models: dict[int, torch.nn.Sequential] = {}
element_types = np.unique(elements)

for element in element_types:
# Make the neural network
nn_layers = []
input_size = descriptors.shape[2]
for hidden_size in hidden_layers:
nn_layers.extend([
torch.nn.Linear(input_size, hidden_size, dtype=dtype),
activation
])
input_size = hidden_size

# Make the last layer
nn_layers.append(torch.nn.Linear(input_size, 1, dtype=dtype))
models[element] = torch.nn.Sequential(*nn_layers)

return PerElementModule(models)


def make_gpr_model(elements: np.ndarray,
descriptors: np.ndarray,
num_inducing_points: int,
fix_inducing_points: bool = True,
use_ard_kernel: bool = False,
**kwargs) -> PerElementModule:
"""Make the GPR model for a certain atomic system
Assumes that the descriptors have already been scaled.
Passes additional kwargs to the :class:`InducedKernelGPR` constructor.
Args:
elements: Element for each atom in a structure (num_atoms,)
descriptors: 3D array of all training points (num_configurations, num_atoms, num_descriptors)
num_inducing_points: Number of inducing points to use in the kernel for each model. More points, more complex model
fix_inducing_points: Whether to fix the inducing points or allow them to be learned
use_ard_kernel: Whether to use a different length scale parameter for each descriptor
Returns:
Model which can predict energy given descriptors for a single configuration
"""

# Select a set of inducing points randomly
descriptors = np.reshape(train_descriptors, (-1, train_descriptors.shape[-1]))
num_inducing_points = min(num_inducing_points, descriptors.shape[0])
inducing_inds = np.random.choice(descriptors.shape[0], size=(num_inducing_points,), replace=False)
inducing_points = descriptors[inducing_inds, :]
# Make a model for each element type
models: dict[int, InducedKernelGPR] = {}
element_types = np.unique(elements)

for element in element_types:
# Select a set of inducing points from records of each atom
# TODO (wardlt): Use a method which ensures diversity, like KMeans
mask = elements == element
masked_descriptors = descriptors[:, mask, :]
masked_descriptors = np.reshape(masked_descriptors, (-1, masked_descriptors.shape[-1]))
num_inducing_points = min(num_inducing_points, masked_descriptors.shape[0])
inducing_inds = np.random.choice(masked_descriptors.shape[0], size=(num_inducing_points,), replace=False)
inducing_points = masked_descriptors[inducing_inds, :]

# Make the model
model = InducedKernelGPR(
inducing_x=torch.from_numpy(inducing_points),
use_ard=use_ard_kernel,
**kwargs
)
model.inducing_x.requires_grad = not fix_inducing_points
models[element] = model

# Make the model
return InducedKernelGPR(
inducing_x=torch.from_numpy(inducing_points),
use_ard=use_ard_kernel,
)
return PerElementModule(models)


def train_model(model: torch.nn.Module,
def train_model(model: PerElementModule,
train_e: np.ndarray,
train_x: np.ndarray,
train_y: np.ndarray,
steps: int,
batch_size: int = 4,
learning_rate: float = 0.01,
device: Union[str, torch.device] = 'cpu',
patience: Optional[int] = None,
verbose: bool = False) -> pd.DataFrame:
"""Train the kernel model over many iterations
Assumes that the descriptors have already been scaled
Args:
model: Model to be trained
train_e: Elements for each atom in a configuration (num_atoms,)
train_x: 3D array of all training points (num_configurations, num_atoms, num_descriptors)
train_y: Energies for each training point
steps: Number of interactions over all training points
batch_size: Number of conformers per batch
learning_rate: Learning rate used for the optimizer
device: Which device to use for training
patience: If provided, stop learning if train loss fails to improve after these many iterations
verbose: Whether to display a progress bar
Returns:
Mean loss over each iteration
"""

# Convert the data to Torches
# Convert the data to Tensors
n_conf, n_atoms = train_x.shape[:2]
train_x = torch.from_numpy(train_x)
train_y = torch.from_numpy(train_y)
train_e = torch.from_numpy(train_e).to(device)

# Duplicate the elements per batch size
train_e = train_e.repeat(batch_size)

# Make the data loader
dataset = TensorDataset(train_x, train_y)
Expand All @@ -115,7 +216,10 @@ def train_model(model: torch.nn.Module,

# Iterate over the data multiple times
losses = []
for _ in tqdm(range(steps), disable=not verbose, leave=False):
iterator = tqdm(range(steps), disable=not verbose, leave=False)
no_improvement = 0 # Number of epochs w/o improvement
best_loss = np.inf
for _ in iterator:
epoch_loss = 0
for batch_x, batch_y in loader:
# Prepare at the beginning of each batch
Expand All @@ -125,7 +229,7 @@ def train_model(model: torch.nn.Module,

# Predict on all configurations
batch_x = torch.reshape(batch_x, (-1, batch_x.shape[-1])) # Flatten from (n_confs, n_atoms, n_desc) -> (n_confs * n_atoms, n_desc)
pred_y_per_atoms_flat = model(batch_x)
pred_y_per_atoms_flat = model(train_e, batch_x)

# Get the mean sum for each atom
pred_y_per_atoms = torch.reshape(pred_y_per_atoms_flat, (batch_size, n_atoms))
Expand All @@ -140,8 +244,16 @@ def train_model(model: torch.nn.Module,
opt.step()
epoch_loss += batch_loss.item()

# Update the best loss
no_improvement = 0 if epoch_loss < best_loss else no_improvement + 1
best_loss = min(best_loss, epoch_loss)
iterator.set_description(f'Loss: {epoch_loss:.2e} - Patience: {no_improvement}')
losses.append(epoch_loss)

# Break if no improvement
if patience is not None and no_improvement > patience:
break

# Pull the model back off the GPU
model.to('cpu')
return pd.DataFrame({'loss': losses})
Expand All @@ -151,7 +263,7 @@ class DScribeLocalCalculator(Calculator):
"""Calculator which uses descriptors for each atom and PyTorch to compute energy
Keyword Args:
model (torch.nn.Module): A machine learning model which takes descriptors as inputs
model (PerElementModule): A machine learning model which takes atomic numbers and descriptors as inputs
desc (DescriptorLocal): Tool used to compute the descriptors
desc_scaling (tuple[np.ndarray, np.ndarray]): A offset and factor with which to adjust the energy per atom predictions,
which are typically he mean and standard deviation of energy per atom across the training set.
Expand All @@ -160,6 +272,8 @@ class DScribeLocalCalculator(Calculator):
device (str | torch.device): Device to use for inference
"""

# TODO (wardlt): Have scaling for descriptors and energies be per-element

implemented_properties = ['energy', 'forces', 'energies']
default_parameters = {
'model': None,
Expand All @@ -169,32 +283,38 @@ class DScribeLocalCalculator(Calculator):
'device': 'cpu'
}

def calculate(self, atoms=None, properties=('energy', 'forces', 'energies'),
def calculate(self, atoms: ase.Atoms = None, properties=('energy', 'forces', 'energies'),
system_changes=all_changes):
# Compute the descriptors for the atoms
d_desc_d_pos, desc = self.parameters['desc'].derivatives(atoms, attach=True)

# Scale the descriptors
offset, scale = self.parameters['desc_scaling']
desc = (desc - offset) / scale
d_desc_d_pos /= scale
desc_offset, desc_scale = self.parameters['desc_scaling']
desc = (desc - desc_offset) / desc_scale
d_desc_d_pos /= desc_scale

# Convert to pytorch
desc = torch.from_numpy(desc.astype(np.float32))
# TODO (wardlt): Make it possible to convert to float32 or lower
desc = torch.from_numpy(desc)
desc.requires_grad = True
d_desc_d_pos = torch.from_numpy(d_desc_d_pos.astype(np.float32))
d_desc_d_pos = torch.from_numpy(d_desc_d_pos)

# Move the model to device if need be
model: torch.nn.Module = self.parameters['model']
# Make sure the model has all the required elements
model: PerElementModule = self.parameters['model']
missing_elems = set(map(str, atoms.get_atomic_numbers())).difference(model.models.keys())
if len(missing_elems) > 0:
raise ValueError(f'Model lacks parameters for elements: {", ".join(missing_elems)}')
device = self.parameters['device']
model.to(device)

# Run inference
offset, scale = self.parameters['energy_scaling']
eng_offset, eng_scale = self.parameters['energy_scaling']
elements = torch.from_numpy(atoms.get_atomic_numbers())
model.eval() # Ensure we're in eval mode
elements = elements.to(device)
desc = desc.to(device)
pred_energies_dist = model(desc)
pred_energies = pred_energies_dist * scale + offset
pred_energies_dist = model(elements, desc)
pred_energies = pred_energies_dist * eng_scale + eng_offset
pred_energy = torch.sum(pred_energies)
self.results['energy'] = pred_energy.item()
self.results['energies'] = pred_energies.detach().cpu().numpy()
Expand All @@ -205,14 +325,19 @@ def calculate(self, atoms=None, properties=('energy', 'forces', 'energies'),
# Derivatives for the descriptors are for each center (which is the input to the model) with respect to each atomic coordinate changing.
# Energy is summed over the contributions from each center.
# The total force is therefore a sum over the effect of an atom moving on all centers
# Note: Forces are scaled because pred_energy was scaled
d_energy_d_desc = torch.autograd.grad(
outputs=pred_energy,
inputs=desc,
grad_outputs=torch.ones_like(pred_energy),
)[0] # Derivative of the energy with respect to the descriptors for each center
d_desc_d_pos = d_desc_d_pos.to(device)
d_energy_d_center_d_pos = torch.einsum('ijkl,il->ijk', d_desc_d_pos, d_energy_d_desc) # Derivative for each center with respect to each atom
pred_forces = -d_energy_d_center_d_pos.sum(dim=0) * scale # Total effect on each center from each atom

# Einsum is log-form for: dE_d(center:i from atom:j moving direction:k)
# = sum_(descriptors:l) d(descriptor:l)/d(i,j,k) * dE(center:i)/d(l)
# "Use the chain rule to get the change in energy for each center
d_energy_d_center_d_pos = torch.einsum('ijkl,il->ijk', d_desc_d_pos, d_energy_d_desc)
pred_forces = -d_energy_d_center_d_pos.sum(dim=0) # Total effect on each center from each atom

# Store the results
self.results['forces'] = pred_forces.detach().cpu().numpy()
Expand All @@ -238,7 +363,7 @@ class DScribeLocalEnergyModel(ASEEnergyModel):
def __init__(self,
reference: Atoms,
descriptors: DescriptorLocal,
model_fn: Callable[[np.ndarray], torch.nn.Module],
model_fn: Callable[[np.ndarray], PerElementModule],
num_calculators: int,
device: str = 'cpu',
train_options: Optional[dict] = None):
Expand All @@ -249,26 +374,29 @@ def __init__(self,
self.train_options = train_options or {'steps': 4}

def train_calculator(self, data: list[Atoms]) -> Calculator:
# Train it using the user-provided options
# Get the elements
elements = data[0].get_atomic_numbers()

# Prepare the training set, scaling the input
train_x = self.descriptors.create(data)
offset_x = train_x.mean(axis=(0, 1))
scale_x = np.clip(train_x.std(axis=(0, 1)), a_min=1e-6, a_max=None)
train_x -= offset_x
scale_x = np.clip(train_x.std(axis=(0, 1)), a_min=1e-6, a_max=None)
train_x /= scale_x

train_y = np.array([a.get_potential_energy() for a in data])
scale_y, offset_y = np.std(train_y), np.mean(train_y)
train_y = (train_y - offset_y) / scale_y
train_y_per_atom = np.array([a.get_potential_energy() / len(a) for a in data])
scale, offset = train_y_per_atom.std(), train_y_per_atom.mean()
train_y = np.array([(a.get_potential_energy() - len(a) * offset) / scale for a in data])

# Make then train the model
# Make the model and train it
model = self.model_fn(train_x)
train_model(model, train_x, train_y, device=self.device, **self.train_options)
train_model(model, elements, train_x, train_y, device=self.device, **self.train_options)

# Make the calculator
# Return the model
return DScribeLocalCalculator(
model=model,
desc=self.descriptors,
desc_scaling=(offset_x, scale_x),
energy_scaling=(offset_y, scale_y),
energy_scaling=(offset, scale),
device=self.device
)
Loading

0 comments on commit 496c16f

Please sign in to comment.