From 21805dea9a4cad8ca4d66b2d4367699a4a3a8b12 Mon Sep 17 00:00:00 2001 From: Thomas Date: Thu, 4 Jan 2024 13:12:57 +0100 Subject: [PATCH] Added custom MPI based Alltoallw implementation --- mpi4py_fft/pencil.py | 108 ++++++++++++++++++++++++++------- tests/test_pencil.py | 4 +- tests/test_transfer_classes.py | 81 +++++++++++++++++++++++++ 3 files changed, 171 insertions(+), 22 deletions(-) create mode 100644 tests/test_transfer_classes.py diff --git a/mpi4py_fft/pencil.py b/mpi4py_fft/pencil.py index f97b28b..52a2b7d 100644 --- a/mpi4py_fft/pencil.py +++ b/mpi4py_fft/pencil.py @@ -155,6 +155,7 @@ def __init__(self, comm, shape, dtype, subshapeA, axisA, subshapeB, axisB): + self.comm = comm self.shape = tuple(shape) self.dtype = dtype = np.dtype(dtype) @@ -215,6 +216,62 @@ def destroy(self): datatype.Free() +class CustomMPITransfer(Transfer): + + def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): + """ + Redistribute arrayA to arrayB. + """ + if type(arrayA) == np.ndarray: + xp = np + def synchronize_stream(): + pass + else: + import cupy as xp + synchronize_stream = xp.cuda.get_current_stream().synchronize + + rank, size, comm = self.comm.rank, self.comm.size, self.comm + + for i in range(size): + send_to = (rank + i) % size + recv_from = (rank -i + size) % size + + sliceA, shapeA = self.get_slice_and_shape(subtypesA[send_to]) + sliceB, shapeB = self.get_slice_and_shape(subtypesB[recv_from]) + + if send_to == rank: + arrayB[sliceB][:] = arrayA[sliceA][:] + else: + # send asynchronously + sendbuff = xp.empty(shapeA, dtype=self.dtype) + sendbuff[:] = arrayA[sliceA][:] + synchronize_stream() + req = comm.Isend(sendbuff, dest=send_to) + + # receive + recvbuff = xp.empty(shapeB, dtype=self.dtype) + comm.Recv(recvbuff, source=recv_from) + synchronize_stream() + arrayB[sliceB][:] = recvbuff[:] + + # finish send and clean up + req.wait() + del sendbuff + del recvbuff + + @staticmethod + def get_slice_and_shape(subtype): + """ + Extract from the subtype object generated for MPI what shape the buffer + should have and what part of the array we want to send / receive. + """ + decoded = subtype.decode() + subsizes = decoded[2]['subsizes'] + starts = decoded[2]['starts'] + return tuple(slice(starts[i], starts[i] + subsizes[i]) for i in range(len(starts))), subsizes + + + class NCCLTransfer(Transfer): """ Transfer class which uses NCCL for `Alltoallw` operations. The NCCL @@ -245,41 +302,45 @@ def Alltoallw(self, arrayA, subtypesA, arrayB, subtypesB): iscomplex = cp.iscomplexobj(arrayA) NCCL_dtype, real_dtype = self.get_nccl_and_real_dtypes(arrayA) - send_stream = cp.cuda.Stream(non_blocking=False) - recv_stream = cp.cuda.Stream(non_blocking=False) - def send(array, subtype, send_to, iscomplex, stream): local_slice, shape = self.get_slice_and_shape(subtype) - buff = self.get_buffer(shape, iscomplex, real_dtype) + buff = self.get_buffer(shape, iscomplex, real_dtype, stream) self.fill_buffer(buff, array, local_slice, iscomplex) comm.send(buff.data.ptr, buff.size, NCCL_dtype, send_to, stream.ptr) - for i in range(size): - send_to = (rank + i) % size - recv_from = (rank -i + size) % size + events = [] + streams = [cp.cuda.Stream(null=False) for _ in range(size)] + for i, stream in zip(range(size), streams): + with stream: - if send_to > rank: - with send_stream: - send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream) + send_to = (rank + i) % size + recv_from = (rank -i + size) % size + + if send_to > rank: + send(arrayA, subtypesA[send_to], send_to, iscomplex, stream) - with recv_stream: local_slice, shape = self.get_slice_and_shape(subtypesB[recv_from]) - buff = self.get_buffer(shape, iscomplex, real_dtype) + buff = self.get_buffer(shape, iscomplex, real_dtype, stream) if recv_from == rank: send_slice, _ = self.get_slice_and_shape(subtypesA[send_to]) self.fill_buffer(buff, arrayA, send_slice, iscomplex) else: - comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, recv_stream.ptr) + comm.recv(buff.data.ptr, buff.size, NCCL_dtype, recv_from, stream.ptr) self.unpack_buffer(buff, arrayB, local_slice, iscomplex) - if send_to < rank: - with send_stream: - send(arrayA, subtypesA[send_to], send_to, iscomplex, send_stream) + if send_to < rank: + send(arrayA, subtypesA[send_to], send_to, iscomplex, stream) + + events += [stream.record()] - send_stream.synchronize() - recv_stream.synchronize() + null_stream = cp.cuda.Stream(null=True) + null_stream.use() + for event in events: + null_stream.wait_event(event) + + cp.cuda.Device(0).synchronize() @staticmethod def get_slice_and_shape(subtype): @@ -315,16 +376,17 @@ def get_nccl_and_real_dtypes(array): return nccl_dtypes[array.dtype], real_dtypes[array.dtype] @staticmethod - def get_buffer(shape, iscomplex, real_dtype): + def get_buffer(shape, iscomplex, real_dtype, stream): """ Get a buffer for communication. If complex numbers are used, we send two real values instead. """ import cupy as cp + if iscomplex: - return cp.empty(shape=[2,] + shape, dtype=real_dtype) + return cp.ndarray(shape=[2,] + shape, dtype=real_dtype) else: - return cp.empty(shape=shape, dtype=real_dtype) + return cp.ndarray(shape=shape, dtype=real_dtype) @staticmethod def fill_buffer(buff, array, local_slice, iscomplex): @@ -501,6 +563,10 @@ def transfer(self, pencil, dtype): transfer_class = Transfer elif self.backend == 'NCCL': transfer_class = NCCLTransfer + elif self.backend == 'CUDAMemCpy': + transfer_class = CUDAMemCpy + elif self.backend == 'customMPI': + transfer_class = CustomMPITransfer else: raise NotImplementedError(f'Don\'t have a transfer class for backend \"{self.backend}\"!') diff --git a/tests/test_pencil.py b/tests/test_pencil.py index c921706..1675e0f 100644 --- a/tests/test_pencil.py +++ b/tests/test_pencil.py @@ -11,10 +11,11 @@ def test_pencil(): sizes = (7, 8, 9) types = 'fdFD' #'hilfdgFDG' - backends = ['MPI'] + backends = ['MPI', 'customMPI'] xp = { 'MPI': np, + 'customMPI': np, } try: @@ -22,6 +23,7 @@ def test_pencil(): from cupy.cuda import nccl backends += ['NCCL'] xp['NCCL'] = cp + cp.cuda.set_allocator(cp.cuda.MemoryAsyncPool().malloc) except ImportError: pass diff --git a/tests/test_transfer_classes.py b/tests/test_transfer_classes.py new file mode 100644 index 0000000..6534807 --- /dev/null +++ b/tests/test_transfer_classes.py @@ -0,0 +1,81 @@ +from mpi4py import MPI +from mpi4py_fft.pencil import Transfer, CustomMPITransfer, Pencil, Subcomm +import numpy as np + + +def get_args(comm, shape, dtype): + subcomm = Subcomm(comm=comm, dims=None) + pencilA = Pencil(subcomm, shape, 0) + pencilB = Pencil(subcomm, shape, 1) + + kwargs = { + 'comm': comm, + 'shape': shape, + 'subshapeA': pencilA.subshape, + 'subshapeB': pencilB.subshape, + 'axisA': 0, + 'axisB': 1, + 'dtype': dtype, + } + return kwargs + + +def get_arrays(kwargs): + arrayA = np.zeros(shape=kwargs['subshapeA'], dtype=kwargs['dtype']) + arrayB = np.zeros(shape=kwargs['subshapeB'], dtype=kwargs['dtype']) + + arrayA[:] = np.random.random(arrayA.shape).astype(arrayA.dtype) + return arrayA, arrayB + + +def single_test_all_to_allw(transfer_class, shape, dtype, comm=None): + comm = comm if comm else MPI.COMM_WORLD + kwargs = get_args(comm, shape, dtype) + arrayA, arrayB = get_arrays(kwargs) + arrayB_ref = arrayB.copy() + + transfer = transfer_class(**kwargs) + reference_transfer = Transfer(**kwargs) + + transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB, transfer._subtypesB) + reference_transfer.Alltoallw(arrayA, transfer._subtypesA, arrayB_ref, transfer._subtypesB) + assert np.allclose(arrayB, arrayB_ref), f'Did not get the same result from `alltoallw` with {transfer_class.__name__} transfer class as MPI implementation on rank {comm.rank}!' + + comm.Barrier() + if comm.rank == 0: + print(f'{transfer_class.__name__} passed alltoallw test with shape {shape} and dtype {dtype}') + + +def single_test_forward_backward(transfer_class, shape, dtype, comm=None): + comm = comm if comm else MPI.COMM_WORLD + kwargs = get_args(comm, shape, dtype) + arrayA, arrayB = get_arrays(kwargs) + arrayA_ref = arrayA.copy() + + transfer = transfer_class(**kwargs) + + transfer.forward(arrayA, arrayB) + transfer.backward(arrayB, arrayA) + assert np.allclose(arrayA, arrayA_ref), f'Did not get the same result when transferring back and forth with {transfer_class.__name__} transfer class on rank {comm.rank}!' + + comm.Barrier() + if comm.rank == 0: + print(f'{transfer_class.__name__} passed forward/backward test with shape {shape} and dtype {dtype}') + + +def test_transfer_class(): + dims = (2, 3) + sizes = (7, 8, 9, 128) + dtypes = 'fFdD' + transfer_class = CustomMPITransfer + + shapes = [[size] * dim for size in sizes for dim in dims] + [[32, 256, 129]] + + for shape in shapes: + for dtype in dtypes: + single_test_all_to_allw(transfer_class, shape, dtype) + single_test_forward_backward(transfer_class, shape, dtype) + + +if __name__ == '__main__': + test_transfer_class()