diff --git a/mpi4py_fft/mpifft.py b/mpi4py_fft/mpifft.py index 4abe5d8..509a7c4 100644 --- a/mpi4py_fft/mpifft.py +++ b/mpi4py_fft/mpifft.py @@ -128,6 +128,11 @@ class PFFT(object): fftn/ifftn for complex input arrays. Real-to-real transforms can be configured using this dictionary and real-to-real transforms from the :mod:`.fftw.xfftn` module. See Examples. + comm_backend : str, optional + Choose backend for communication. When using GPU based serial backends, + the "NCCL" backend can be be used in `Alltoallw` operations to speedup + GPU to GPU transfer. Keep in mind that this is used alongside MPI and + assumes one GPU per MPI rankMPI is used. Other Parameters ---------------- @@ -201,7 +206,7 @@ class PFFT(object): """ def __init__(self, comm, shape=None, axes=None, dtype=float, grid=None, padding=False, collapse=False, backend='fftw', - transforms=None, darray=None, **kw): + transforms=None, darray=None, comm_backend='MPI', **kw): # pylint: disable=too-many-locals # pylint: disable=too-many-branches # pylint: disable=too-many-statements @@ -311,6 +316,7 @@ def __init__(self, comm, shape=None, axes=None, dtype=float, grid=None, self.pencil = [None, None] axes = self.axes[-1] + Pencil.backend = comm_backend pencil = Pencil(self.subcomm, shape, axes[-1]) xfftn = FFT(pencil.subshape, axes, dtype, padding, backend=backend, transforms=transforms, **kw) diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index 6671d8c..3a8a7b2 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -179,8 +179,15 @@ def forward(self, arrayA, arrayB): assert self.subshapeB == arrayB.shape assert self.dtype == arrayA.dtype assert self.dtype == arrayB.dtype - self.comm.Alltoallw([arrayA, self._counts_displs, self._subtypesA], - [arrayB, self._counts_displs, self._subtypesB]) + self.Alltoallw(arrayA, self._subtypesA, arrayB, self._subtypesB) + + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): + """ + Redistribute arrayA to arrayB + """ + self.comm.Alltoallw([arrayA, self._counts_displs, subtypesA], + [arrayB, self._counts_displs, subtypesB]) + def backward(self, arrayB, arrayA): """Global redistribution from arrayB to arrayA @@ -197,8 +204,7 @@ def backward(self, arrayB, arrayA): assert self.subshapeB == arrayB.shape assert self.dtype == arrayA.dtype assert self.dtype == arrayB.dtype - self.comm.Alltoallw([arrayB, self._counts_displs, self._subtypesB], - [arrayA, self._counts_displs, self._subtypesA]) + self.Alltoallw(arrayB, self._subtypesB, arrayA, self._subtypesA) def destroy(self): for datatype in self._subtypesA: @@ -209,6 +215,134 @@ def destroy(self): datatype.Free() +class NCCLTransfer(Transfer): + """ + Transfer class which uses NCCL for `Alltoallw` operations. The NCCL + communicator will share rank and size attributes with the MPI communicator. + In particular, this assumes one GPU per MPI rank. + """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + from cupy.cuda import nccl + self.comm_nccl = nccl.NcclCommunicator(self.comm.size, self.comm.bcast(nccl.get_unique_id(), root=0), self.comm.rank) + + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): + """ + Redistribute arrayA to arrayB. + + As NCCL does not have all to all, we replicate it by a bunch of individual send and receives. + As NCCL also does not have complex datatypes, we have to send real and imaginary parts separately. + """ + import cupy as cp + from cupy.cuda import nccl + assert type(arrayA) == cp.ndarray + assert type(arrayB) == cp.ndarray + assert arrayA.dtype == arrayB.dtype + assert self.comm.rank == self.comm_nccl.rank_id(), f'The structure of the communicator has changed unexpectedly' + + rank, size, comm = self.comm.rank, self.comm.size, self.comm_nccl + stream = cp.cuda.Stream.null.ptr + iscomplex = cp.iscomplexobj(arrayA) + NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) + + for i in range(size): + for j in range(size): + + if rank == i: + local_slice, shape = self.get_slice_and_shape(subtypesB[j]) + buff = self.get_buffer(shape, iscomplex, real_dtype) + + if i == j: + send_slice, _ = self.get_slice_and_shape(subtypesA[i]) + self.fill_buffer(buff, arrayA, send_slice, iscomplex) + else: + comm.recv(buff.data.ptr, buff.size, NCCL_dtype, j, stream) + + self.unpack_buffer(buff, arrayB, local_slice, iscomplex) + + elif rank == j: + local_slice, shape = self.get_slice_and_shape(subtypesA[i]) + buff = self.get_buffer(shape, iscomplex, real_dtype) + self.fill_buffer(buff, arrayA, local_slice, iscomplex) + + comm.send(buff.data.ptr, buff.size, NCCL_dtype, i, stream) + + + @staticmethod + def get_slice_and_shape(subtype): + """ + Extract from the subtype object generated for MPI what shape the buffer + should have and what part of the array we want to send / receive. + """ + decoded = subtype.decode() + subsizes = decoded[2]['subsizes'] + starts = decoded[2]['starts'] + return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes + + @staticmethod + def get_nccl_and_real_dtypes(array): + """ + Translate the datatype of the array to a NCCL type for sending with NCCL. + As NCCL does not support complex types, we have to translate them to two + real values. + """ + from cupy.cuda import nccl + nccl_dtypes = { + np.dtype('float32'): nccl.NCCL_FLOAT32, + np.dtype('float64'): nccl.NCCL_FLOAT64, + np.dtype('complex64'): nccl.NCCL_FLOAT32, + np.dtype('complex128'): nccl.NCCL_FLOAT64, + } + real_dtypes = { + np.dtype('float32'): np.dtype('float32'), + np.dtype('float64'): np.dtype('float64'), + np.dtype('complex64'): np.dtype('float32'), + np.dtype('complex128'): np.dtype('float64'), + } + return nccl_dtypes[array.dtype], real_dtypes[array.dtype] + + @staticmethod + def get_buffer(shape, iscomplex, real_dtype): + """ + Get a buffer for communication. If complex numbers are used, we send + two real values instead. + """ + import cupy as cp + if iscomplex: + return cp.empty(shape=[2,] + shape, dtype=real_dtype) + else: + return cp.empty(shape=shape, dtype=real_dtype) + + @staticmethod + def fill_buffer(buff, array, local_slice, iscomplex): + """ + Fill buffer for communication. If complex numbers are used, we send + two real values instead. + """ + if iscomplex: + buff[0][:] = array[local_slice].real + buff[1][:] = array[local_slice].imag + else: + buff[:] = array[local_slice][:] + + @staticmethod + def unpack_buffer(buff, array, local_slice, iscomplex): + """ + Unpack buffer for communication. If complex numbers are used, we get + two real values instead. + """ + if iscomplex: + array[local_slice].real = buff[0][:] + array[local_slice].imag = buff[1][:] + else: + array[local_slice][:] = buff[:] + + def destroy(self): + super().destroy() + self.comm_nccl.destroy() + + class Pencil(object): """Class to represent a distributed array (pencil) @@ -274,6 +408,8 @@ class Pencil(object): aligned in axis 1. """ + backend = 'MPI' + def __init__(self, subcomm, shape, axis=-1): assert len(shape) >= 2 assert min(shape) >= 1 @@ -349,6 +485,13 @@ def transfer(self, pencil, dtype): shape = list(penA.subshape) shape[axis] = penA.shape[axis] - return Transfer(comm, shape, dtype, + if self.backend == 'MPI': + transfer_class = Transfer + elif self.backend == 'NCCL': + transfer_class = NCCLTransfer + else: + raise NotImplementedError(f'Don\'t have a transfer class for backend \"{self.backend}\"!') + + return transfer_class(comm, shape, dtype, penA.subshape, penA.axis, penB.subshape, penB.axis) diff --git a/tests/test_pencil.py b/tests/test_pencil.py index 9a74c40..c921706 100644 --- a/tests/test_pencil.py +++ b/tests/test_pencil.py @@ -11,53 +11,73 @@ def test_pencil(): sizes = (7, 8, 9) types = 'fdFD' #'hilfdgFDG' + backends = ['MPI'] + + xp = { + 'MPI': np, + } + + try: + import cupy as cp + from cupy.cuda import nccl + backends += ['NCCL'] + xp['NCCL'] = cp + except ImportError: + pass + for typecode in types: for dim in dims: for shape in product(*([sizes]*dim)): - axes = list(range(dim)) - for axis1, axis2, axis3 in product(axes, axes, axes): + for backend in backends: + + Pencil.backend = backend + axes = list(range(dim)) + + for axis1, axis2, axis3 in product(axes, axes, axes): + + if axis1 == axis2: continue + if axis2 == axis3: continue + axis3 -= len(shape) + #if comm.rank == 0: + # print(shape, axis1, axis2, axis3, typecode) - if axis1 == axis2: continue - if axis2 == axis3: continue - axis3 -= len(shape) - #if comm.rank == 0: - # print(shape, axis1, axis2, axis3, typecode) + for pdim in [None] + list(range(1, dim-1)): - for pdim in [None] + list(range(1, dim-1)): + subcomm = Subcomm(comm, pdim) + pencil0 = Pencil(subcomm, shape) - subcomm = Subcomm(comm, pdim) - pencil0 = Pencil(subcomm, shape) + pencilA = pencil0.pencil(axis1) + pencilB = pencilA.pencil(axis2) + pencilC = pencilB.pencil(axis3) - pencilA = pencil0.pencil(axis1) - pencilB = pencilA.pencil(axis2) - pencilC = pencilB.pencil(axis3) + assert pencilC.backend == backend - trans1 = Pencil.transfer(pencilA, pencilB, typecode) - trans2 = Pencil.transfer(pencilB, pencilC, typecode) + trans1 = Pencil.transfer(pencilA, pencilB, typecode) + trans2 = Pencil.transfer(pencilB, pencilC, typecode) - X = np.random.random(pencilA.subshape).astype(typecode) + X = xp[backend].random.random(pencilA.subshape).astype(typecode) - A = np.empty(pencilA.subshape, dtype=typecode) - B = np.empty(pencilB.subshape, dtype=typecode) - C = np.empty(pencilC.subshape, dtype=typecode) + A = xp[backend].empty(pencilA.subshape, dtype=typecode) + B = xp[backend].empty(pencilB.subshape, dtype=typecode) + C = xp[backend].empty(pencilC.subshape, dtype=typecode) - A[...] = X + A[...] = X - B.fill(0) - trans1.forward(A, B) - C.fill(0) - trans2.forward(B, C) + B.fill(0) + trans1.forward(A, B) + C.fill(0) + trans2.forward(B, C) - B.fill(0) - trans2.backward(C, B) - A.fill(0) - trans1.backward(B, A) + B.fill(0) + trans2.backward(C, B) + A.fill(0) + trans1.backward(B, A) - assert np.allclose(A, X) + assert xp[backend].allclose(A, X) - trans1.destroy() - trans2.destroy() - subcomm.destroy() + trans1.destroy() + trans2.destroy() + subcomm.destroy() if __name__ == '__main__':