Update Sunspot Configurations and Tests (#133)
* Switch to using torch and lightning 2

* Update lightning code to 2.0 compliant

* Install lammps with apt

* Ensure all tensors are on same device

Reverses a 2x performance loss

* Update cuda runtime environment to 11.8

* Ensure model gets to XPU

Automatic placement on an accelerator only works
for CUDA, maybe AMD, and MPS

* Add CPU affinity appropriate for Sapphire Rapids (SR) chips

* Flake8 fixes

* Switch to using @coreyjadams's branch

* Update build instructions for Aurora

* Fix lammps test
WardLT authored May 22, 2024
1 parent de15092 commit f0ace80
Showing 11 changed files with 64 additions and 74 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-package-conda.yml
@@ -29,7 +29,7 @@ jobs:
run: |
if [ "$RUNNER_OS" == "Linux" ]; then
sudo apt update
sudo apt install -y cp2k
sudo apt install -y cp2k lammps
else
echo "$RUNNER_OS not supported"
exit 1
13 changes: 12 additions & 1 deletion component-tests/generation/run_test.py
@@ -96,6 +96,7 @@ def test_function(model_path: Path, n_atoms: int, template: Path, n_samples: int
)
])
elif args.config.startswith("sunspot"):
# Map processes to specific tiles or whole devices
if args.config == "sunspot":
accel_ids = [
f"{gid}.{tid}"
@@ -109,13 +110,23 @@ def test_function(model_path: Path, n_atoms: int, template: Path, n_samples: int
]
else:
raise ValueError(f'Not supported: {args.config}')

# Ensures processes are mapped to physical cores
workers_per_socket = len(accel_ids) // 2
cores_per_socket = 52
cores_per_worker = cores_per_socket // workers_per_socket
assigned_cores = []
for socket in range(2):
start = cores_per_socket * socket
assigned_cores.extend(f"{start + w * cores_per_worker}-{start + (w + 1) * cores_per_worker - 1}" for w in range(workers_per_socket))

config = Config(
retries=2,
executors=[
HighThroughputExecutor(
label="sunspot_test",
available_accelerators=accel_ids, # Ensures one worker per accelerator
cpu_affinity="block", # Assigns cpus in sequential order
cpu_affinity='list:' + ":".join(assigned_cores),  # Pin each worker to an explicit block of physical cores
prefetch_capacity=0,
max_workers=len(accel_ids),
cores_per_worker=208 // len(accel_ids),
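For reference, here is a minimal standalone sketch (not part of the commit) of the core-binding arithmetic above, assuming the Sunspot layout of 12 tile-level accelerators (6 GPUs x 2 tiles, the elided part of the hunk) across two 52-core Sapphire Rapids sockets:

# Sketch only: reproduce the affinity list the Sunspot config above would build
accel_ids = [f"{gid}.{tid}" for gid in range(6) for tid in range(2)]  # 12 tiles (assumed)

workers_per_socket = len(accel_ids) // 2                    # 6 workers per socket
cores_per_socket = 52
cores_per_worker = cores_per_socket // workers_per_socket   # 8 cores per worker

assigned_cores = []
for socket in range(2):
    start = cores_per_socket * socket
    assigned_cores.extend(
        f"{start + w * cores_per_worker}-{start + (w + 1) * cores_per_worker - 1}"
        for w in range(workers_per_socket)
    )

print("list:" + ":".join(assigned_cores))
# list:0-7:8-15:16-23:24-31:32-39:40-47:52-59:60-67:68-75:76-83:84-91:92-99

Each worker is pinned to a contiguous 8-core block on the socket nearest its tile; the four leftover cores on each socket stay unassigned.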
13 changes: 13 additions & 0 deletions envs/build-aurora.sh
@@ -1,10 +1,23 @@
#! /bin/bash

# Activate the modules needed to build libraries later
module reset
module use /soft/modulefiles/
module use /home/ftartagl/graphics-compute-runtime/modulefiles
module load oneapi/release/2023.12.15.001
module load intel_compute_runtime/release/775.20
module load gcc/12.2.0

# Build then activate the environment
conda env create --file envs/environment-aurora.yml --force -p ./env
conda activate ./env

# Build torch_ccl locally
# Clone from: https://github.com/intel/torch-ccl
cd libs/torch_ccl
COMPUTE_BACKEND=dpcpp pip install -e .

# Now install Corey Adams' Lightning fork (adds XPU support)
# Clone from: https://github.com/coreyjadams/lightning
cd ../lightning
PACKAGE_NAME=pytorch pip install -e .
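As a quick post-build sanity check (not part of the commit, and assuming the IPEX and oneCCL bindings installed above), one might confirm the XPU stack imports and sees devices:

# Hypothetical check after running build-aurora.sh
import torch
import intel_extension_for_pytorch as ipex  # noqa: F401 - registers the torch.xpu backend
import oneccl_bindings_for_pytorch  # noqa: F401 - needed later for distributed runs

print("torch", torch.__version__, "| ipex", ipex.__version__)
print("XPU available:", torch.xpu.is_available())
print("XPU device count:", torch.xpu.device_count())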
1 change: 0 additions & 1 deletion envs/environment-aurora.yml
@@ -40,7 +40,6 @@ dependencies:

- pip
- pip:
- pytorch-lightning<2
- git+https://gitlab.com/ase/ase.git
- git+https://github.com/exalearn/colmena.git # Fixes for streaming not yet on PyPI
- -e ..[test]
4 changes: 2 additions & 2 deletions envs/environment-cpu.yml
@@ -17,7 +17,7 @@ dependencies:

# Tools used as part of the ML packages
- imageio
- pytorch-lightning<2
- pytorch-lightning
- wandb

# Tools to work with crystalline and molecular data
@@ -32,7 +32,7 @@ dependencies:
- lammps

# Use Conda PyTorch to avoid OpenMP disagreement with other libraries
- pytorch==1.13.*=*cpu*
- pytorch==2.3.*=*cpu*

- pip
- pip:
8 changes: 4 additions & 4 deletions envs/environment-cuda117.yml → envs/environment-cuda11.yml
@@ -18,7 +18,7 @@ dependencies:

# Tools used as part of the ML packages
- imageio
- pytorch-lightning<2
- pytorch-lightning
- wandb

# Tools to work with crystalline and molecular data
@@ -30,9 +30,9 @@ dependencies:
- conda-forge::raspa2

# Use Conda PyTorch to avoid OpenMP disagreement with other libraries
- pytorch::pytorch==1.13.1
- pytorch::pytorch-cuda=11.7
- nvidia::cudatoolkit=11.7
- pytorch::pytorch==2.3.*
- pytorch::pytorch-cuda=11.8
- cuda-version==11.8

# Services used by the workflow
- redis==5.*
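A short check (not part of the commit) to confirm the solver picked up the intended PyTorch 2.3 / CUDA 11.8 build after creating this environment:

# Hypothetical verification of the CUDA environment above
import torch

print("torch:", torch.__version__)        # expect a 2.3.x build
print("cuda:", torch.version.cuda)        # expect '11.8'
print("cuda available:", torch.cuda.is_available())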
67 changes: 13 additions & 54 deletions mofa/difflinker_train.py
@@ -2,62 +2,27 @@
import os
from contextlib import redirect_stderr, redirect_stdout
from pathlib import Path
from typing import Any

import torch
from pytorch_lightning import Trainer, callbacks
from pytorch_lightning.accelerators import Accelerator
from pytorch_lightning.callbacks import TQDMProgressBar

try:
import intel_extension_for_pytorch as ipex # noqa: F401
import oneccl_bindings_for_pytorch # noqa: F401
except ImportError:
pass

from mofa.utils.src.const import NUMBER_OF_ATOM_TYPES, GEOM_NUMBER_OF_ATOM_TYPES
from mofa.utils.src.lightning import DDPM
from mofa.utils.src.utils import disable_rdkit_logging


def _intel_on_train_start(trainer: Trainer):
"""Hook for optimizing the model and optimizer before training"""
import intel_extension_for_pytorch as ipex
assert len(trainer.optimizers) == 1, 'We only support one optimizer for now'
trainer.model, trainer.optimizers[0] = ipex.optimize(trainer.model, optimizer=trainer.optimizers[0])


# Placeholder until XPU support merged: https://github.com/Lightning-AI/pytorch-lightning/pull/17700
class XPUAccelerator(Accelerator):
"""Shim for XPU support"""

# See: https://lightning.ai/docs/pytorch/stable/extensions/accelerator.html#create-a-custom-accelerator

def setup(self, trainer: Trainer) -> None:
pass

def setup_device(self, device: torch.device) -> None:
return

def teardown(self) -> None:
return

@staticmethod
def parse_devices(devices: Any) -> Any:
return devices

@staticmethod
def get_parallel_devices(devices: Any) -> Any:
return [torch.device("xpu", idx) for idx in devices]

@staticmethod
def auto_device_count() -> int:
# Return a value for auto-device selection when `Trainer(devices="auto")`
raise NotImplementedError()

@staticmethod
def is_available() -> bool:
return True

def get_device_stats(self, device: str | torch.device) -> dict[str, Any]:
# Return optional device statistics for loggers
return {}


def get_args(args: list[str]) -> argparse.Namespace:
"""Assemble arguments form model training
@@ -185,14 +150,6 @@ def main(
if '.' in args.train_data_prefix:
context_node_nf += 1

# Make an XPU acceleator, if needed
if 'xpu' in args.device:
pl_device = XPUAccelerator()
devices = [0]
else:
pl_device = args.device
devices = "auto"

checkpoint_callback = [callbacks.ModelCheckpoint(
dirpath=checkpoints_dir,
filename='difflinker_{epoch:02d}',
@@ -204,15 +161,14 @@
default_root_dir=log_directory,
max_epochs=args.n_epochs,
callbacks=checkpoint_callback,
accelerator=pl_device,
devices=devices,
accelerator=args.device,
num_sanity_val_steps=0,
enable_progress_bar=args.enable_progress_bar,
)

# Add a callback for fit setup
if args.device == "xpu":
trainer.on_train_start = _intel_on_train_start
trainer.on_fit_start = _intel_on_train_start

# Get the model
if args.resume is None:
@@ -259,6 +215,7 @@ def main(
last_checkpoint = find_last_checkpoint(checkpoints_dir)
ddpm = DDPM.load_from_checkpoint(
last_checkpoint,
map_location=args.device,
strict=False,
data_path=args.data,
train_data_prefix=args.train_data_prefix,
@@ -297,10 +254,12 @@ def main(
anchors_context=anchors_context,
dataset_override=args.dataset_override)

# Force loading of the dataset now before we start distributed training
# Force converting the dataset now before we start distributed training
# There might be issues with each training rank writing to disk at the same time
# TODO (wardlt): Separate the data loader from the model code so it's clearer how to set up
# TODO (wardlt): Separate the data loader from the model code so we can avoid these problems
ddpm.to('cpu')
ddpm.setup('fit')
ddpm.train_dataset = ddpm.val_dataset = None # Unload the dataset to avoid needing to copy it to subprocesses
trainer.fit(model=ddpm)

# Save the last model
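The commit message notes that Lightning only auto-places models on CUDA (and possibly AMD and MPS), which is why XPU runs go through the _intel_on_train_start hook retained above. A minimal standalone sketch of the underlying IPEX pattern, outside the Trainer and purely illustrative:

# Illustrative only: explicit XPU placement plus ipex.optimize, the same step the
# _intel_on_train_start hook applies to the trainer's model and optimizer
import torch
import intel_extension_for_pytorch as ipex

model = torch.nn.Linear(16, 1).to("xpu")                      # move the model by hand
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
model, optimizer = ipex.optimize(model, optimizer=optimizer)

x = torch.randn(8, 16, device="xpu")                          # inputs must land on XPU too
optimizer.zero_grad()
loss = model(x).sum()
loss.backward()
optimizer.step()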
2 changes: 1 addition & 1 deletion mofa/utils/difflinker_sample_and_analyze.py
@@ -53,7 +53,7 @@ def generate_animation(ddpm, chain_batch, node_mask, n_mol):
@lru_cache(maxsize=1) # Keep only one model in memory
def load_model(path, device) -> DDPM:
"""Load the DDPM model from disk"""
return DDPM.load_from_checkpoint(path, torch_device=device).eval().to(device)
return DDPM.load_from_checkpoint(path, map_location=device).eval().to(device)


def main_run(templates: list[LigandTemplate],
11 changes: 7 additions & 4 deletions mofa/utils/src/egnn.py
@@ -371,7 +371,7 @@ def forward(self, t, xh, node_mask, linker_mask, edge_mask, context):
"""

bs, n_nodes = xh.shape[0], xh.shape[1]
edges = self.get_edges(n_nodes, bs) # (2, B*N)
edges = self.get_edges(n_nodes, bs, device=xh.device) # (2, B*N)
node_mask = node_mask.view(bs * n_nodes, 1) # (B*N, 1)

if linker_mask is not None:
@@ -434,7 +434,7 @@ def forward(self, t, xh, node_mask, linker_mask, edge_mask, context):

return torch.cat([vel, h_final], dim=2)

def get_edges(self, n_nodes, batch_size):
def get_edges(self, n_nodes, batch_size, device):
if n_nodes in self.edge_cache:
edges_dic_b = self.edge_cache[n_nodes]
if batch_size in edges_dic_b:
@@ -447,12 +447,15 @@ def get_edges(self, n_nodes, batch_size):
for j in range(n_nodes):
rows.append(i + batch_idx * n_nodes)
cols.append(j + batch_idx * n_nodes)
edges = [torch.LongTensor(rows).to(self.device), torch.LongTensor(cols).to(self.device)]
edges = [
torch.LongTensor(rows).to(device),
torch.LongTensor(cols).to(device)
]
edges_dic_b[batch_size] = edges
return edges
else:
self.edge_cache[n_nodes] = {}
return self.get_edges(n_nodes, batch_size)
return self.get_edges(n_nodes, batch_size, device)


class DynamicsWithPockets(Dynamics):
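The get_edges fix above follows the usual pattern of deriving the target device from the incoming tensors (xh.device) rather than a module attribute. A small self-contained sketch of the same idea, with the nested loops vectorized for brevity:

# Sketch (simplified from the diff): cache fully connected intra-graph edge indices
# per (n_nodes, batch_size) and return them on whatever device the caller requests
import torch

_edge_cache: dict[tuple[int, int], torch.Tensor] = {}

def get_edges(n_nodes: int, batch_size: int, device: torch.device) -> torch.Tensor:
    key = (n_nodes, batch_size)
    if key not in _edge_cache:
        idx = torch.arange(n_nodes)
        rows = idx.repeat_interleave(n_nodes).repeat(batch_size)  # i of every (i, j) pair
        cols = idx.repeat(n_nodes).repeat(batch_size)             # j of every (i, j) pair
        offsets = torch.arange(batch_size).repeat_interleave(n_nodes * n_nodes) * n_nodes
        _edge_cache[key] = torch.stack([rows + offsets, cols + offsets])  # (2, B*N*N)
    return _edge_cache[key].to(device)

# Usage mirrors the forward() call above: edges follow the node features' device
# edges = get_edges(n_nodes, bs, device=xh.device)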
13 changes: 9 additions & 4 deletions mofa/utils/src/lightning.py
@@ -112,6 +112,8 @@ def __init__(
)
self.linker_size_sampler = DistributionNodes(LINKER_SIZE_DIST)

self.validation_step_outputs = []

def setup(self, stage: Optional[str] = None):
dataset_type = MOADDataset if '.' in self.train_data_prefix else ZincDataset
if self.dataset_override == "MOFA":
@@ -236,7 +238,7 @@ def validation_step(self, data, *args):
loss = vlb_loss
else:
raise NotImplementedError(self.loss_type)
return {
validation_step = {
'loss': loss,
'delta_log_px': delta_log_px,
'kl_prior': kl_prior,
Expand All @@ -247,6 +249,7 @@ def validation_step(self, data, *args):
'noise_t': noise_t,
'noise_0': noise_0
}
self.validation_step_outputs.append(validation_step)

def test_step(self, data, *args):
delta_log_px, kl_prior, loss_term_t, loss_term_0, l2_loss, noise_t, noise_0 = self.forward(data, training=False)
@@ -275,9 +278,9 @@ def train_epoch_end(self, training_step_outputs):
self.metrics.setdefault(f'{metric}/train', []).append(avg_metric)
self.log(f'{metric}/train', avg_metric, prog_bar=True)

def validation_epoch_end(self, validation_step_outputs):
for metric in validation_step_outputs[0].keys():
avg_metric = self.aggregate_metric(validation_step_outputs, metric)
def on_validation_epoch_end(self):
for metric in self.validation_step_outputs[0].keys():
avg_metric = self.aggregate_metric(self.validation_step_outputs, metric)
self.metrics.setdefault(f'{metric}/val', []).append(avg_metric)
self.log(f'{metric}/val', avg_metric, prog_bar=True)

@@ -293,6 +296,8 @@ def validation_epoch_end(self, validation_step_outputs):
for metric, value in best_metrics.items():
self.log(f'best_{metric}', value, prog_bar=True, batch_size=self.batch_size)

self.validation_step_outputs.clear()

def test_epoch_end(self, test_step_outputs):
for metric in test_step_outputs[0].keys():
avg_metric = self.aggregate_metric(test_step_outputs, metric)
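For reference, a minimal sketch of the Lightning 2.x migration applied above (assuming a plain LightningModule with a scalar loss): validation_step appends to a buffer owned by the module, and on_validation_epoch_end aggregates and clears it, replacing the old validation_epoch_end(outputs) signature:

# Sketch of the Lightning 2.x validation-output pattern used in the diff above
import torch
from pytorch_lightning import LightningModule


class ExampleModule(LightningModule):
    def __init__(self):
        super().__init__()
        self.layer = torch.nn.Linear(4, 1)
        self.validation_step_outputs = []

    def validation_step(self, batch, batch_idx):
        loss = self.layer(batch).pow(2).mean()           # stand-in validation loss
        self.validation_step_outputs.append({'loss': loss})

    def on_validation_epoch_end(self):
        losses = [out['loss'] for out in self.validation_step_outputs]
        self.log('loss/val', torch.stack(losses).mean(), prog_bar=True)
        self.validation_step_outputs.clear()             # reset for the next epoch

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=1e-3)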
4 changes: 2 additions & 2 deletions tests/simulation/test_lammps.py
@@ -10,7 +10,7 @@
def test_lammps_runner(cif_name, cif_dir, tmpdir):
# Make a LAMMPS simulator that reads and writes to a temporary directory
lmprunner = LAMMPSRunner(
lammps_command=["lmp"],
lammps_command=["lmp_serial"],
lmp_sims_root_path=tmpdir / "lmp_sims",
lammps_environ={'OMP_NUM_THREADS': '1'}
)
@@ -28,7 +28,7 @@ def test_lammps_runner(cif_name, cif_dir, tmpdir):

# Make sure that it runs
ret = lmprunner.invoke_lammps(lmp_path)
assert ret.returncode == 0
assert ret.returncode == 0, Path(lmp_path).joinpath('stdout.lmp').read_text()

# Test the full pipeline, forcing deletion on the end
lmprunner.delete_finished = True
