Added custom MPI based Alltoallw implementation
Thomas committed Jan 4, 2024
1 parent 6b47e36 commit 21805de
Showing 3 changed files with 171 additions and 22 deletions.
108 changes: 87 additions & 21 deletions mpi4py_fft/pencil.py
@@ -155,6 +155,7 @@ def __init__(self,
comm, shape, dtype,
subshapeA, axisA,
subshapeB, axisB):

self.comm = comm
self.shape = tuple(shape)
self.dtype = dtype = np.dtype(dtype)
@@ -215,6 +216,62 @@ def destroy(self):
datatype.Free()


class CustomMPITransfer(Transfer):
    """
    Transfer class that implements the `Alltoallw` redistribution with
    pairwise point-to-point MPI communication instead of the collective
    from the MPI library.
    """

def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB):
"""
Redistribute arrayA to arrayB.
"""
        if isinstance(arrayA, np.ndarray):
            xp = np

            def synchronize_stream():
                pass
        else:
            import cupy as xp
            synchronize_stream = xp.cuda.get_current_stream().synchronize

rank, size, comm = self.comm.rank, self.comm.size, self.comm

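        # Pairwise schedule: in round i this rank sends to (rank + i) % size and
        # receives from (rank - i) % size, so every send posted in a round has a
        # matching receive on the partner rank in the same round.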
for i in range(size):
send_to = (rank + i) % size
            recv_from = (rank - i + size) % size

sliceA, shapeA = self.get_slice_and_shape(subtypesA[send_to])
sliceB, shapeB = self.get_slice_and_shape(subtypesB[recv_from])

if send_to == rank:
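                # the diagonal of the exchange stays local: copy directly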
arrayB[sliceB][:] = arrayA[sliceA][:]
else:
# send asynchronously
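                # (stage through a contiguous buffer: the slice is in general a
                # non-contiguous view that cannot be passed to MPI directly)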
sendbuff = xp.empty(shapeA, dtype=self.dtype)
sendbuff[:] = arrayA[sliceA][:]
synchronize_stream()
req = comm.Isend(sendbuff, dest=send_to)

# receive
recvbuff = xp.empty(shapeB, dtype=self.dtype)
comm.Recv(recvbuff, source=recv_from)
synchronize_stream()
arrayB[sliceB][:] = recvbuff[:]

# finish send and clean up
req.wait()
del sendbuff
del recvbuff

@staticmethod
def get_slice_and_shape(subtype):
"""
Extract from the subtype object generated for MPI what shape the buffer
should have and what part of the array we want to send / receive.
"""
decoded = subtype.decode()
subsizes = decoded[2]['subsizes']
starts = decoded[2]['starts']
return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes
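
For illustration, a rough sketch of what `decode` yields for an MPI subarray
datatype (the variable names are only for the example; the `params` keys are
the same ones used above):

    from mpi4py import MPI

    t = MPI.DOUBLE.Create_subarray(sizes=[4, 4], subsizes=[2, 4], starts=[2, 0])
    t.Commit()
    _, _, params = t.decode()
    # params['subsizes'] == [2, 4], params['starts'] == [2, 0]
    local_slice = tuple(slice(s, s + n)
                        for s, n in zip(params['starts'], params['subsizes']))
    # local_slice == (slice(2, 4), slice(0, 4)), i.e. the lower half of the rows
    t.Free()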



class NCCLTransfer(Transfer):
"""
Transfer class which uses NCCL for `Alltoallw` operations. The NCCL
@@ -245,41 +302,45 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB):
iscomplex = cp.iscomplexobj(arrayA)
NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA)

-        send_stream = cp.cuda.Stream(non_blocking=False)
-        recv_stream = cp.cuda.Stream(non_blocking=False)

        def send(array, subtype, send_to, iscomplex, stream):
            local_slice, shape = self.get_slice_and_shape(subtype)
-            buff = self.get_buffer(shape, iscomplex, real_dtype)
+            buff = self.get_buffer(shape, iscomplex, real_dtype, stream)
            self.fill_buffer(buff, array, local_slice, iscomplex)
            comm.send(buff.data.ptr, buff.size, NCCL_dtype, send_to, stream.ptr)

-        for i in range(size):
-            send_to = (rank + i) % size
-            recv_from = (rank - i + size) % size
+        events = []
+        streams = [cp.cuda.Stream(null=False) for _ in range(size)]
+        for i, stream in zip(range(size), streams):
+            with stream:

-            if send_to > rank:
-                with send_stream:
-                    send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream)
+                send_to = (rank + i) % size
+                recv_from = (rank - i + size) % size

+                if send_to > rank:
+                    send(arrayA, subtypesA[send_to], send_to, iscomplex, stream)

-            with recv_stream:
                local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from])
-                buff = self.get_buffer(shape, iscomplex, real_dtype)
+                buff = self.get_buffer(shape, iscomplex, real_dtype, stream)

                if recv_from == rank:
                    send_slice, _ = self.get_slice_and_shape(subtypesA[send_to])
                    self.fill_buffer(buff, arrayA, send_slice, iscomplex)
                else:
-                    comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, recv_stream.ptr)
+                    comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, stream.ptr)

                self.unpack_buffer(buff, arrayB, local_slice, iscomplex)

-            if send_to < rank:
-                with send_stream:
-                    send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream)
+                if send_to < rank:
+                    send(arrayA, subtypesA[send_to], send_to, iscomplex, stream)

+            events += [stream.record()]

-        send_stream.synchronize()
-        recv_stream.synchronize()
+        null_stream = cp.cuda.Stream(null=True)
+        null_stream.use()
+        for event in events:
+            null_stream.wait_event(event)

        cp.cuda.Device(0).synchronize()

@staticmethod
def get_slice_and_shape(subtype):
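
The rewritten loop above drives each pairwise exchange on its own CUDA stream
and records one event per stream; the null stream then waits on all of those
events. A minimal sketch of that record/wait pattern in plain CuPy (no NCCL,
names are illustrative only):

    import cupy as cp

    streams = [cp.cuda.Stream() for _ in range(4)]
    events = []
    for stream in streams:
        with stream:
            # ... enqueue kernels or copies on `stream` here ...
            events.append(stream.record())

    null_stream = cp.cuda.Stream(null=True)
    null_stream.use()
    for event in events:
        null_stream.wait_event(event)  # null stream now runs after all streams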
@@ -315,16 +376,17 @@ def get_nccl_and_real_dtypes(array):
return nccl_dtypes[array.dtype], real_dtypes[array.dtype]

    @staticmethod
-    def get_buffer(shape, iscomplex, real_dtype):
+    def get_buffer(shape, iscomplex, real_dtype, stream):
"""
Get a buffer for communication. If complex numbers are used, we send
two real values instead.
"""
import cupy as cp

        if iscomplex:
-            return cp.empty(shape=[2,] + shape, dtype=real_dtype)
+            return cp.ndarray(shape=[2,] + shape, dtype=real_dtype)
        else:
-            return cp.empty(shape=shape, dtype=real_dtype)
+            return cp.ndarray(shape=shape, dtype=real_dtype)

@staticmethod
def fill_buffer(buff, array, local_slice, iscomplex):
@@ -501,6 +563,10 @@ def transfer(self, pencil, dtype):
transfer_class = Transfer
elif self.backend == 'NCCL':
transfer_class = NCCLTransfer
elif self.backend == 'CUDAMemCpy':
transfer_class = CUDAMemCpy
elif self.backend == 'customMPI':
transfer_class = CustomMPITransfer
else:
raise NotImplementedError(f'Don\'t have a transfer class for backend \"{self.backend}\"!')
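
A minimal usage sketch for the new backend (assuming `backend` is a plain
attribute on the Pencil; how it is set is not shown in this diff):

    from mpi4py import MPI
    import numpy as np
    from mpi4py_fft.pencil import Pencil, Subcomm

    subcomm = Subcomm(MPI.COMM_WORLD, dims=None)
    p0 = Pencil(subcomm, (8, 8, 8), axis=0)
    p1 = p0.pencil(1)                # same decomposition, aligned in axis 1
    p0.backend = 'customMPI'         # assumption: not shown in this diff
    tr = p0.transfer(p1, dtype='d')  # dispatches to CustomMPITransfer
    a = np.zeros(tr.subshapeA, dtype='d')
    b = np.zeros(tr.subshapeB, dtype='d')
    tr.forward(a, b)                 # redistribute a (p0 layout) into b (p1 layout)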

4 changes: 3 additions & 1 deletion tests/test_pencil.py
@@ -11,17 +11,19 @@ def test_pencil():
    sizes = (7, 8, 9)
    types = 'fdFD'  # 'hilfdgFDG'

-    backends = ['MPI']
+    backends = ['MPI', 'customMPI']

    xp = {
        'MPI': np,
+        'customMPI': np,
    }

    try:
        import cupy as cp
        from cupy.cuda import nccl
        backends += ['NCCL']
        xp['NCCL'] = cp
+        cp.cuda.set_allocator(cp.cuda.MemoryAsyncPool().malloc)
    except ImportError:
        pass

81 changes: 81 additions & 0 deletions tests/test_transfer_classes.py
@@ -0,0 +1,81 @@
from mpi4py import MPI
from mpi4py_fft.pencil import Transfer, CustomMPITransfer, Pencil, Subcomm
import numpy as np


def get_args(comm, shape, dtype):
subcomm = Subcomm(comm=comm, dims=None)
pencilA = Pencil(subcomm, shape, 0)
pencilB = Pencil(subcomm, shape, 1)

kwargs = {
'comm': comm,
'shape': shape,
'subshapeA': pencilA.subshape,
'subshapeB': pencilB.subshape,
'axisA': 0,
'axisB': 1,
'dtype': dtype,
}
return kwargs


def get_arrays(kwargs):
arrayA = np.zeros(shape=kwargs['subshapeA'], dtype=kwargs['dtype'])
arrayB = np.zeros(shape=kwargs['subshapeB'], dtype=kwargs['dtype'])

arrayA[:] = np.random.random(arrayA.shape).astype(arrayA.dtype)
return arrayA, arrayB


def single_test_all_to_allw(transfer_class, shape, dtype, comm=None):
comm = comm if comm else MPI.COMM_WORLD
kwargs = get_args(comm, shape, dtype)
arrayA, arrayB = get_arrays(kwargs)
arrayB_ref = arrayB.copy()

transfer = transfer_class(**kwargs)
reference_transfer = Transfer(**kwargs)

transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB, transfer._subtypesB)
reference_transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB_ref, transfer._subtypesB)
assert np.allclose(arrayB, arrayB_ref), f'Did not get the same result from `alltoallw` with {transfer_class.__name__} transfer class as MPI implementation on rank {comm.rank}!'

comm.Barrier()
if comm.rank == 0:
print(f'{transfer_class.__name__} passed alltoallw test with shape {shape} and dtype {dtype}')


def single_test_forward_backward(transfer_class, shape, dtype, comm=None):
comm = comm if comm else MPI.COMM_WORLD
kwargs = get_args(comm, shape, dtype)
arrayA, arrayB = get_arrays(kwargs)
arrayA_ref = arrayA.copy()

transfer = transfer_class(**kwargs)

transfer.forward(arrayA, arrayB)
transfer.backward(arrayB, arrayA)
assert np.allclose(arrayA, arrayA_ref), f'Did not get the same result when transferring back and forth with {transfer_class.__name__} transfer class on rank {comm.rank}!'

comm.Barrier()
if comm.rank == 0:
print(f'{transfer_class.__name__} passed forward/backward test with shape {shape} and dtype {dtype}')


def test_transfer_class():
dims = (2, 3)
sizes = (7, 8, 9, 128)
dtypes = 'fFdD'
transfer_class = CustomMPITransfer

shapes = [[size] * dim for size in sizes for dim in dims] + [[32, 256, 129]]

for shape in shapes:
for dtype in dtypes:
single_test_all_to_allw(transfer_class, shape, dtype)
single_test_forward_backward(transfer_class, shape, dtype)


if __name__ == '__main__':
test_transfer_class()
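
The asserts are rank-aware, so the file is meant to be launched through MPI,
for example (launcher and process count are just an example):

    mpirun -np 4 python tests/test_transfer_classes.py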
