Commit
Implemented NCCL for Alltoallw operations
Thomas committed Dec 18, 2023
1 parent 202f30c commit e5d4b56
Showing 3 changed files with 207 additions and 38 deletions.
8 changes: 7 additions & 1 deletion mpi4py_fft/mpifft.py
@@ -128,6 +128,11 @@ class PFFT(object):
fftn/ifftn for complex input arrays. Real-to-real transforms can be
configured using this dictionary and real-to-real transforms from the
:mod:`.fftw.xfftn` module. See Examples.
comm_backend : str, optional
Choose the backend for communication. When using GPU-based serial backends,
the "NCCL" backend can be used in `Alltoallw` operations to speed up
GPU-to-GPU transfers. Keep in mind that NCCL is used alongside MPI and
one GPU per MPI rank is assumed.
Other Parameters
----------------
@@ -201,7 +206,7 @@ class PFFT(object):
"""
def __init__(self, comm, shape=None, axes=None, dtype=float, grid=None,
padding=False, collapse=False, backend='fftw',
transforms=None, darray=None, **kw):
transforms=None, darray=None, comm_backend='MPI', **kw):
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
# pylint: disable=too-many-statements
@@ -311,6 +316,7 @@ def __init__(self, comm, shape=None, axes=None, dtype=float, grid=None,
self.pencil = [None, None]

axes = self.axes[-1]
Pencil.backend = comm_backend
pencil = Pencil(self.subcomm, shape, axes[-1])
xfftn = FFT(pencil.subshape, axes, dtype, padding, backend=backend,
transforms=transforms, **kw)
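For reference, here is a minimal usage sketch of the new `comm_backend` option (an editor's illustration, not part of the commit). It assumes one GPU per MPI rank; the serial backend name 'cupy' is an assumption and may differ in your installation.

from mpi4py import MPI
from mpi4py_fft import PFFT

# Hedged sketch: route the global redistributions through NCCL.
# Assumes one GPU per MPI rank; 'cupy' as the serial FFT backend
# name is an assumption, not something this commit confirms.
fft = PFFT(MPI.COMM_WORLD, shape=(128, 128, 128), dtype=complex,
           backend='cupy',        # assumed GPU-based serial backend
           comm_backend='NCCL')   # new option introduced in this commit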
153 changes: 148 additions & 5 deletions mpi4py_fft/pencil.py
@@ -179,8 +179,15 @@ def forward(self, arrayA, arrayB):
assert self.subshapeB == arrayB.shape
assert self.dtype == arrayA.dtype
assert self.dtype == arrayB.dtype
self.comm.Alltoallw([arrayA, self._counts_displs, self._subtypesA],
[arrayB, self._counts_displs, self._subtypesB])
self.Alltoallw(arrayA, self._subtypesA, arrayB, self._subtypesB)

def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB):
"""
Redistribute arrayA to arrayB
"""
self.comm.Alltoallw([arrayA, self._counts_displs, subtypesA],
[arrayB, self._counts_displs, subtypesB])


def backward(self, arrayB, arrayA):
"""Global redistribution from arrayB to arrayA
@@ -197,8 +204,7 @@ def backward(self, arrayB, arrayA):
assert self.subshapeB == arrayB.shape
assert self.dtype == arrayA.dtype
assert self.dtype == arrayB.dtype
self.comm.Alltoallw([arrayB, self._counts_displs, self._subtypesB],
[arrayA, self._counts_displs, self._subtypesA])
self.Alltoallw(arrayB, self._subtypesB, arrayA, self._subtypesA)

def destroy(self):
for datatype in self._subtypesA:
@@ -209,6 +215,134 @@ def destroy(self):
datatype.Free()


class NCCLTransfer(Transfer):
"""
Transfer class which uses NCCL for `Alltoallw` operations. The NCCL
communicator will share rank and size attributes with the MPI communicator.
In particular, this assumes one GPU per MPI rank.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

from cupy.cuda import nccl
self.comm_nccl = nccl.NcclCommunicator(self.comm.size, self.comm.bcast(nccl.get_unique_id(), root=0), self.comm.rank)

def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB):
"""
Redistribute arrayA to arrayB.
As NCCL does not provide an all-to-all primitive, we emulate it with
pairwise sends and receives. As NCCL also lacks complex datatypes, the
real and imaginary parts are sent separately.
"""
import cupy as cp
from cupy.cuda import nccl
assert type(arrayA) == cp.ndarray
assert type(arrayB) == cp.ndarray
assert arrayA.dtype == arrayB.dtype
assert self.comm.rank == self.comm_nccl.rank_id(), 'The structure of the communicator has changed unexpectedly'

rank, size, comm = self.comm.rank, self.comm.size, self.comm_nccl
stream = cp.cuda.Stream.null.ptr
iscomplex = cp.iscomplexobj(arrayA)
NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA)

for i in range(size):
for j in range(size):

if rank == i:
local_slice, shape = self.get_slice_and_shape(subtypesB[j])
buff = self.get_buffer(shape, iscomplex, real_dtype)

if i == j:
send_slice, _ = self.get_slice_and_shape(subtypesA[i])
self.fill_buffer(buff, arrayA, send_slice, iscomplex)
else:
comm.recv(buff.data.ptr, buff.size, NCCL_dtype, j, stream)

self.unpack_buffer(buff, arrayB, local_slice, iscomplex)

elif rank == j:
local_slice, shape = self.get_slice_and_shape(subtypesA[i])
buff = self.get_buffer(shape, iscomplex, real_dtype)
self.fill_buffer(buff, arrayA, local_slice, iscomplex)

comm.send(buff.data.ptr, buff.size, NCCL_dtype, i, stream)


@staticmethod
def get_slice_and_shape(subtype):
"""
Extract from the subarray datatype generated for MPI the shape the buffer
should have and the part of the array we want to send / receive.
"""
decoded = subtype.decode()
subsizes = decoded[2]['subsizes']
starts = decoded[2]['starts']
return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes

@staticmethod
def get_nccl_and_real_dtypes(array):
"""
Translate the datatype of the array to an NCCL type for sending with NCCL.
As NCCL does not support complex types, we have to send them as pairs of
real values.
"""
from cupy.cuda import nccl
nccl_dtypes = {
np.dtype('float32'): nccl.NCCL_FLOAT32,
np.dtype('float64'): nccl.NCCL_FLOAT64,
np.dtype('complex64'): nccl.NCCL_FLOAT32,
np.dtype('complex128'): nccl.NCCL_FLOAT64,
}
real_dtypes = {
np.dtype('float32'): np.dtype('float32'),
np.dtype('float64'): np.dtype('float64'),
np.dtype('complex64'): np.dtype('float32'),
np.dtype('complex128'): np.dtype('float64'),
}
return nccl_dtypes[array.dtype], real_dtypes[array.dtype]

@staticmethod
def get_buffer(shape, iscomplex, real_dtype):
"""
Get a buffer for communication. If complex numbers are used, we send
two real values instead.
"""
import cupy as cp
if iscomplex:
return cp.empty(shape=[2,] + shape, dtype=real_dtype)
else:
return cp.empty(shape=shape, dtype=real_dtype)

@staticmethod
def fill_buffer(buff, array, local_slice, iscomplex):
"""
Fill buffer for communication. If complex numbers are used, we send
two real values instead.
"""
if iscomplex:
buff[0][:] = array[local_slice].real
buff[1][:] = array[local_slice].imag
else:
buff[:] = array[local_slice][:]

@staticmethod
def unpack_buffer(buff, array, local_slice, iscomplex):
"""
Unpack buffer for communication. If complex numbers are used, we get
two real values instead.
"""
if iscomplex:
array[local_slice].real = buff[0][:]
array[local_slice].imag = buff[1][:]
else:
array[local_slice][:] = buff[:]

def destroy(self):
super().destroy()
self.comm_nccl.destroy()


class Pencil(object):
"""Class to represent a distributed array (pencil)
@@ -274,6 +408,8 @@ class Pencil(object):
aligned in axis 1.
"""
backend = 'MPI'

def __init__(self, subcomm, shape, axis=-1):
assert len(shape) >= 2
assert min(shape) >= 1
@@ -349,6 +485,13 @@ def transfer(self, pencil, dtype):
shape = list(penA.subshape)
shape[axis] = penA.shape[axis]

return Transfer(comm, shape, dtype,
if self.backend == 'MPI':
transfer_class = Transfer
elif self.backend == 'NCCL':
transfer_class = NCCLTransfer
else:
raise NotImplementedError(f'Don\'t have a transfer class for backend "{self.backend}"!')

return transfer_class(comm, shape, dtype,
penA.subshape, penA.axis,
penB.subshape, penB.axis)
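To make the communication pattern in NCCLTransfer.Alltoallw concrete, below is a stripped-down, standalone sketch (an editor's illustration, not the commit's code) of emulating an all-to-all with pairwise NCCL sends and receives. It assumes one GPU per MPI rank and, for brevity, contiguous float64 blocks; the commit's implementation instead decodes each MPI subarray datatype via subtype.decode() to obtain per-block slices and shapes, and packs complex arrays into a [2, ...] real buffer (real part in channel 0, imaginary part in channel 1) exactly as fill_buffer / unpack_buffer do above.

import cupy as cp
from mpi4py import MPI
from cupy.cuda import nccl

comm = MPI.COMM_WORLD
rank, size = comm.rank, comm.size

# One NCCL communicator spanning all MPI ranks (one GPU per rank);
# the unique id is created on rank 0 and broadcast over MPI.
nccl_comm = nccl.NcclCommunicator(
    size, comm.bcast(nccl.get_unique_id(), root=0), rank)

def alltoall_pairwise(sendblocks, recvblocks):
    """Emulated all-to-all: rank j sends sendblocks[i] to rank i.

    sendblocks / recvblocks are lists of float64 cupy arrays, one per
    peer rank (simplified stand-ins for the commit's subarray slices).
    """
    stream = cp.cuda.Stream.null.ptr
    for i in range(size):          # receiving rank
        for j in range(size):      # sending rank
            if rank == i:
                if i == j:         # local block: plain device copy, no NCCL
                    recvblocks[j][...] = sendblocks[i]
                else:
                    buf = recvblocks[j]
                    nccl_comm.recv(buf.data.ptr, buf.size,
                                   nccl.NCCL_FLOAT64, j, stream)
            elif rank == j:
                buf = cp.ascontiguousarray(sendblocks[i])
                nccl_comm.send(buf.data.ptr, buf.size,
                               nccl.NCCL_FLOAT64, i, stream)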
84 changes: 52 additions & 32 deletions tests/test_pencil.py
@@ -11,53 +11,73 @@ def test_pencil():
sizes = (7, 8, 9)
types = 'fdFD' #'hilfdgFDG'

backends = ['MPI']

xp = {
'MPI': np,
}

try:
import cupy as cp
from cupy.cuda import nccl
backends += ['NCCL']
xp['NCCL'] = cp
except ImportError:
pass

for typecode in types:
for dim in dims:
for shape in product(*([sizes]*dim)):
axes = list(range(dim))
for axis1, axis2, axis3 in product(axes, axes, axes):
for backend in backends:

Pencil.backend = backend
axes = list(range(dim))

for axis1, axis2, axis3 in product(axes, axes, axes):

if axis1 == axis2: continue
if axis2 == axis3: continue
axis3 -= len(shape)
#if comm.rank == 0:
# print(shape, axis1, axis2, axis3, typecode)

if axis1 == axis2: continue
if axis2 == axis3: continue
axis3 -= len(shape)
#if comm.rank == 0:
# print(shape, axis1, axis2, axis3, typecode)
for pdim in [None] + list(range(1, dim-1)):

for pdim in [None] + list(range(1, dim-1)):
subcomm = Subcomm(comm, pdim)
pencil0 = Pencil(subcomm, shape)

subcomm = Subcomm(comm, pdim)
pencil0 = Pencil(subcomm, shape)
pencilA = pencil0.pencil(axis1)
pencilB = pencilA.pencil(axis2)
pencilC = pencilB.pencil(axis3)

pencilA = pencil0.pencil(axis1)
pencilB = pencilA.pencil(axis2)
pencilC = pencilB.pencil(axis3)
assert pencilC.backend == backend

trans1 = Pencil.transfer(pencilA, pencilB, typecode)
trans2 = Pencil.transfer(pencilB, pencilC, typecode)
trans1 = Pencil.transfer(pencilA, pencilB, typecode)
trans2 = Pencil.transfer(pencilB, pencilC, typecode)

X = np.random.random(pencilA.subshape).astype(typecode)
X = xp[backend].random.random(pencilA.subshape).astype(typecode)

A = np.empty(pencilA.subshape, dtype=typecode)
B = np.empty(pencilB.subshape, dtype=typecode)
C = np.empty(pencilC.subshape, dtype=typecode)
A = xp[backend].empty(pencilA.subshape, dtype=typecode)
B = xp[backend].empty(pencilB.subshape, dtype=typecode)
C = xp[backend].empty(pencilC.subshape, dtype=typecode)

A[...] = X
A[...] = X

B.fill(0)
trans1.forward(A, B)
C.fill(0)
trans2.forward(B, C)
B.fill(0)
trans1.forward(A, B)
C.fill(0)
trans2.forward(B, C)

B.fill(0)
trans2.backward(C, B)
A.fill(0)
trans1.backward(B, A)
B.fill(0)
trans2.backward(C, B)
A.fill(0)
trans1.backward(B, A)

assert np.allclose(A, X)
assert xp[backend].allclose(A, X)

trans1.destroy()
trans2.destroy()
subcomm.destroy()
trans1.destroy()
trans2.destroy()
subcomm.destroy()


if __name__ == '__main__':
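Condensed from the updated test, a hedged round-trip through the NCCL path looks like the following (requires CuPy built with NCCL support and one GPU per MPI rank; the shape and axes are illustrative):

import cupy as cp
from mpi4py import MPI
from mpi4py_fft.pencil import Pencil, Subcomm

Pencil.backend = 'NCCL'          # select NCCLTransfer for all transfers

subcomm = Subcomm(MPI.COMM_WORLD, None)
pencil0 = Pencil(subcomm, (8, 8, 8))
pencilA = pencil0.pencil(0)      # aligned in axis 0
pencilB = pencilA.pencil(1)      # realigned in axis 1
trans = Pencil.transfer(pencilA, pencilB, 'D')  # 'D' = complex128

A = cp.random.random(pencilA.subshape).astype('D')
B = cp.empty(pencilB.subshape, dtype='D')
trans.forward(A, B)              # redistribute A -> B over NCCL
A2 = cp.empty(pencilA.subshape, dtype='D')
trans.backward(B, A2)            # and back again
assert cp.allclose(A, A2)

trans.destroy()
subcomm.destroy()

As in the test, this would be launched under MPI, e.g. with one GPU visible per rank across four ranks.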
