From a50137f3bc971fce51be83ad3280d529409aab0e Mon Sep 17 00:00:00 2001 From: Thomas Date: Fri, 8 Dec 2023 12:31:56 +0100 Subject: [PATCH] Implemented CuPy version of `DistArray` --- mpi4py_fft/distarray.py | 473 +++++++++++++++++++++++++++------------- 1 file changed, 316 insertions(+), 157 deletions(-) diff --git a/mpi4py_fft/distarray.py b/mpi4py_fft/distarray.py index 3a69e5c..e2275f1 100644 --- a/mpi4py_fft/distarray.py +++ b/mpi4py_fft/distarray.py @@ -5,106 +5,24 @@ from .pencil import Pencil, Subcomm from .io import HDF5File, NCFile, FileBase -comm = MPI.COMM_WORLD - -class DistArray(np.ndarray): - """Distributed Numpy array - - This Numpy array is part of a larger global array. Information about the - distribution is contained in the attributes. - - Parameters - ---------- - global_shape : sequence of ints - Shape of non-distributed global array - subcomm : None, :class:`.Subcomm` object or sequence of ints, optional - Describes how to distribute the array - val : Number or None, optional - Initialize array with this number if buffer is not given - dtype : np.dtype, optional - Type of array - buffer : Numpy array, optional - Array of correct shape. The buffer owns the memory that is used for - this array. - alignment : None or int, optional - Make sure array is aligned in this direction. Note that alignment does - not take rank into consideration. - rank : int, optional - Rank of tensor (number of free indices, a scalar is zero, vector one, - matrix two) - - - For more information, see `numpy.ndarray `_ - - Note - ---- - Tensors of rank higher than 0 are not distributed in the first ``rank`` - indices. For example, - - >>> from mpi4py_fft import DistArray - >>> a = DistArray((3, 8, 8, 8), rank=1) - >>> print(a.pencil.shape) - (8, 8, 8) +try: + import cupy as cp +except ImportError: + # TODO: move cupy stuff to seperate file as to avoid confusion! + import numpy as cp - The array ``a`` cannot be distributed in the first axis of length 3 since - rank is 1 and this first index represent the vector component. The ``pencil`` - attribute of ``a`` thus only considers the last three axes. +comm = MPI.COMM_WORLD - Also note that the ``alignment`` keyword does not take rank into - consideration. Setting alignment=2 for the array above means that the last - axis will be aligned, also when rank>0. +class DistArrayBase: """ - def __new__(cls, global_shape, subcomm=None, val=None, dtype=float, - buffer=None, strides=None, alignment=None, rank=0): - if len(global_shape[rank:]) < 2: # 1D case - obj = np.ndarray.__new__(cls, global_shape, dtype=dtype, buffer=buffer, strides=strides) - if buffer is None and isinstance(val, Number): - obj.fill(val) - obj._rank = rank - obj._p0 = None - return obj + Abstract class for distributed arrays in numpy or cupy implementation - if isinstance(subcomm, Subcomm): - pass - else: - if isinstance(subcomm, (tuple, list)): - assert len(subcomm) == len(global_shape[rank:]) - # Do nothing if already containing communicators. A tuple of subcommunicators is not necessarily a Subcomm - if not np.all([isinstance(s, MPI.Comm) for s in subcomm]): - subcomm = Subcomm(comm, subcomm) - else: - assert subcomm is None - subcomm = [0] * len(global_shape[rank:]) - if alignment is not None: - subcomm[alignment] = 1 - else: - subcomm[-1] = 1 - alignment = len(subcomm)-1 - subcomm = Subcomm(comm, subcomm) - sizes = [s.Get_size() for s in subcomm] - if alignment is not None: - assert isinstance(alignment, (int, np.integer)) - assert sizes[alignment] == 1 - else: - # Decide that alignment is the last axis with size 1 - alignment = np.flatnonzero(np.array(sizes) == 1)[-1] - p0 = Pencil(subcomm, global_shape[rank:], axis=alignment) - subshape = p0.subshape - if rank > 0: - subshape = global_shape[:rank] + subshape - obj = np.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer) - if buffer is None and isinstance(val, Number): - obj.fill(val) - obj._p0 = p0 - obj._rank = rank - return obj + Attributes: + - xp: Numerical library, a.k.a. numpy or cupy as class attribute + """ - def __array_finalize__(self, obj): - if obj is None: - return - self._p0 = getattr(obj, '_p0', None) - self._rank = getattr(obj, '_rank', None) + xp = None @property def alignment(self): @@ -120,17 +38,17 @@ def alignment(self): @property def global_shape(self): """Return global shape of ``self``""" - return self.shape[:self.rank] + self._p0.shape + return self.shape[: self.rank] + self._p0.shape @property def substart(self): """Return starting indices of local ``self`` array""" - return (0,)*self.rank + self._p0.substart + return (0,) * self.rank + self._p0.substart @property def subcomm(self): """Return tuple of subcommunicators for all axes of ``self``""" - return (MPI.COMM_SELF,)*self.rank + self._p0.subcomm + return (MPI.COMM_SELF,) * self.rank + self._p0.subcomm @property def commsizes(self): @@ -152,32 +70,78 @@ def dimensions(self): """Return dimensions of array not including rank""" return len(self._p0.shape) + @property + def v(self): + """Return local ``self`` array as an ``ndarray`` object""" + return self.__array__() + + @staticmethod + def getSubcomm(subcomm, global_shape, rank, alignment): + if isinstance(subcomm, Subcomm): + pass + else: + if isinstance(subcomm, (tuple, list)): + assert len(subcomm) == len(global_shape[rank:]) + # Do nothing if already containing communicators. A tuple of subcommunicators is not necessarily a Subcomm + if not np.all([isinstance(s, MPI.Comm) for s in subcomm]): + subcomm = Subcomm(comm, subcomm) + else: + assert subcomm is None + subcomm = [0] * len(global_shape[rank:]) + if alignment is not None: + subcomm[alignment] = 1 + else: + subcomm[-1] = 1 + alignment = len(subcomm) - 1 + subcomm = Subcomm(comm, subcomm) + return subcomm + + @classmethod + def getPencil(cls, subcomm, rank, global_shape, alignment): + sizes = [s.Get_size() for s in subcomm] + if alignment is not None: + assert isinstance(alignment, (int, cls.xp.integer)) + assert sizes[alignment] == 1 + else: + # Decide that alignment is the last axis with size 1 + alignment = cls.xp.flatnonzero(cls.xp.array(sizes) == 1)[-1] + + p0 = Pencil(subcomm, global_shape[rank:], axis=alignment) + + if rank > 0: + subshape = global_shape[:rank] + subshape + else: + subshape = p0.subshape + + return p0, subshape + + def __array_finalize__(self, obj): + if obj is None: + return + self._p0 = getattr(obj, "_p0", None) + self._rank = getattr(obj, "_rank", None) + def __getitem__(self, i): # Return DistArray if the result is a component of a tensor # Otherwise return ndarray view if self.ndim == 1: - return np.ndarray.__getitem__(self, i) + return self.xp.ndarray.__getitem__(self, i) if isinstance(i, (Integral, slice)) and self.rank > 0: - v0 = np.ndarray.__getitem__(self, i) + v0 = self.xp.ndarray.__getitem__(self, i) v0._rank = self.rank - (self.ndim - v0.ndim) return v0 if isinstance(i, (Integral, slice)) and self.rank == 0: - return np.ndarray.__getitem__(self.v, i) + return self.xp.ndarray.__getitem__(self.v, i) assert isinstance(i, tuple) if len(i) <= self.rank: - v0 = np.ndarray.__getitem__(self, i) + v0 = self.xp.ndarray.__getitem__(self, i) v0._rank = self.rank - (self.ndim - v0.ndim) return v0 - return np.ndarray.__getitem__(self.v, i) - - @property - def v(self): - """ Return local ``self`` array as an ``ndarray`` object""" - return self.__array__() + return self.xp.ndarray.__getitem__(self.v, i) def get(self, gslice): """Return global slice of ``self`` @@ -215,14 +179,19 @@ def get(self, gslice): # global MPI. We create a global file with MPI, but then open it without # MPI and only on rank 0. import h5py - f = h5py.File('tmp.h5', 'w', driver="mpio", comm=comm) + + f = h5py.File("tmp.h5", "w", driver="mpio", comm=comm) s = self.local_slice() - sp = np.nonzero([isinstance(x, slice) for x in gslice])[0] - sf = tuple(np.take(s, sp)) - f.require_dataset('data', shape=tuple(np.take(self.global_shape, sp)), dtype=self.dtype) + sp = self.xp.nonzero([isinstance(x, slice) for x in gslice])[0] + sf = tuple(self.xp.take(s, sp)) + f.require_dataset( + "data", shape=tuple(self.xp.take(self.global_shape, sp)), dtype=self.dtype + ) gslice = list(gslice) # We are required to check if the indices in si are on this processor - si = np.nonzero([isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)])[0] + si = self.xp.nonzero( + [isinstance(x, int) and not z == slice(None) for x, z in zip(gslice, s)] + )[0] on_this_proc = True for i in si: if gslice[i] >= s[i].start and gslice[i] < s[i].stop: @@ -234,10 +203,10 @@ def get(self, gslice): f.close() c = None if comm.Get_rank() == 0: - h = h5py.File('tmp.h5', 'r') - c = h['data'].__array__() + h = h5py.File("tmp.h5", "r") + c = h["data"].__array__() h.close() - os.remove('tmp.h5') + os.remove("tmp.h5") return c def local_slice(self): @@ -273,27 +242,11 @@ def local_slice(self): (slice(0, 16, None), slice(7, 14, None), slice(0, 6, None)) (slice(0, 16, None), slice(7, 14, None), slice(6, 12, None)) """ - v = [slice(start, start+shape) for start, shape in zip(self._p0.substart, - self._p0.subshape)] - return tuple([slice(0, s) for s in self.shape[:self.rank]] + v) - - def get_pencil_and_transfer(self, axis): - """Return pencil and transfer objects for alignment along ``axis`` - - Parameters - ---------- - axis : int - The new axis to align data with - - Returns - ------- - 2-tuple - 2-tuple where first item is a :class:`.Pencil` aligned in ``axis``. - Second item is a :class:`.Transfer` object for executing the - redistribution of data - """ - p1 = self._p0.pencil(axis) - return p1, self._p0.transfer(p1, self.dtype) + v = [ + slice(start, start + shape) + for start, shape in zip(self._p0.substart, self._p0.subshape) + ] + return tuple([slice(0, s) for s in self.shape[: self.rank]] + v) def redistribute(self, axis=None, out=None): """Global redistribution of local ``self`` array @@ -322,7 +275,7 @@ def redistribute(self, axis=None, out=None): # Check if self is already aligned along axis. In that case just switch # axis of pencil (both axes are undivided) and return if axis is not None: - if self.commsizes[self.rank+axis] == 1: + if self.commsizes[self.rank + axis] == 1: self.pencil.axis = axis return self @@ -343,11 +296,13 @@ def redistribute(self, axis=None, out=None): p1, transfer = self.get_pencil_and_transfer(axis) if out is None: - out = DistArray(self.global_shape, - subcomm=p1.subcomm, - dtype=self.dtype, - alignment=axis, - rank=self.rank) + out = type(self)( + self.global_shape, + subcomm=p1.subcomm, + dtype=self.dtype, + alignment=axis, + rank=self.rank, + ) if self.rank == 0: transfer.forward(self, out) @@ -362,8 +317,33 @@ def redistribute(self, axis=None, out=None): transfer.destroy() return out - def write(self, filename, name='darray', step=0, global_slice=None, - domain=None, as_scalar=False): + def get_pencil_and_transfer(self, axis): + """Return pencil and transfer objects for alignment along ``axis`` + + Parameters + ---------- + axis : int + The new axis to align data with + + Returns + ------- + 2-tuple + 2-tuple where first item is a :class:`.Pencil` aligned in ``axis``. + Second item is a :class:`.Transfer` object for executing the + redistribution of data + """ + p1 = self._p0.pencil(axis) + return p1, self._p0.transfer(p1, self.dtype) + + def write( + self, + filename, + name="darray", + step=0, + global_slice=None, + domain=None, + as_scalar=False, + ): """Write snapshot ``step`` of ``self`` to file ``filename`` Parameters @@ -396,14 +376,14 @@ def write(self, filename, name='darray', step=0, global_slice=None, >>> u.write('h5file.h5', 'u', (slice(None), 4)) """ if isinstance(filename, str): - writer = HDF5File if filename.endswith('.h5') else NCFile - f = writer(filename, domain=domain, mode='a') + writer = HDF5File if filename.endswith(".h5") else NCFile + f = writer(filename, domain=domain, mode="a") elif isinstance(filename, FileBase): f = filename field = [self] if global_slice is None else [(self, global_slice)] f.write(step, {name: field}, as_scalar=as_scalar) - def read(self, filename, name='darray', step=0): + def read(self, filename, name="darray", step=0): """Read data ``name`` at index ``step``from file ``filename`` into ``self`` @@ -432,14 +412,181 @@ def read(self, filename, name='darray', step=0): """ if isinstance(filename, str): - writer = HDF5File if filename.endswith('.h5') else NCFile - f = writer(filename, mode='r') + writer = HDF5File if filename.endswith(".h5") else NCFile + f = writer(filename, mode="r") elif isinstance(filename, FileBase): f = filename f.read(self, name, step=step) -def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): +class DistArray(DistArrayBase, np.ndarray): + """Distributed Numpy array + + This Numpy array is part of a larger global array. Information about the + distribution is contained in the attributes. + + Parameters + ---------- + global_shape : sequence of ints + Shape of non-distributed global array + subcomm : None, :class:`.Subcomm` object or sequence of ints, optional + Describes how to distribute the array + val : Number or None, optional + Initialize array with this number if buffer is not given + dtype : np.dtype, optional + Type of array + buffer : Numpy array, optional + Array of correct shape. The buffer owns the memory that is used for + this array. + alignment : None or int, optional + Make sure array is aligned in this direction. Note that alignment does + not take rank into consideration. + rank : int, optional + Rank of tensor (number of free indices, a scalar is zero, vector one, + matrix two) + + + For more information, see `numpy.ndarray `_ + + Note + ---- + Tensors of rank higher than 0 are not distributed in the first ``rank`` + indices. For example, + + >>> from mpi4py_fft import DistArray + >>> a = DistArray((3, 8, 8, 8), rank=1) + >>> print(a.pencil.shape) + (8, 8, 8) + + The array ``a`` cannot be distributed in the first axis of length 3 since + rank is 1 and this first index represent the vector component. The ``pencil`` + attribute of ``a`` thus only considers the last three axes. + + Also note that the ``alignment`` keyword does not take rank into + consideration. Setting alignment=2 for the array above means that the last + axis will be aligned, also when rank>0. + + """ + + xp = np + + def __new__( + cls, + global_shape, + subcomm=None, + val=None, + dtype=float, + buffer=None, + strides=None, + alignment=None, + rank=0, + ): + if len(global_shape[rank:]) < 2: # 1D case + obj = cls.xp.ndarray.__new__( + cls, global_shape, dtype=dtype, buffer=buffer, strides=strides + ) + if buffer is None and isinstance(val, Number): + obj.fill(val) + obj._rank = rank + obj._p0 = None + return obj + + subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) + p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) + + obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, buffer=buffer) + if buffer is None and isinstance(val, Number): + obj.fill(val) + obj._p0 = p0 + obj._rank = rank + return obj + + +class DistArrayCuPy(DistArrayBase, cp.ndarray): + """Distributed CuPy array + + This Numpy array is part of a larger global array. Information about the + distribution is contained in the attributes. + + Parameters + ---------- + global_shape : sequence of ints + Shape of non-distributed global array + subcomm : None, :class:`.Subcomm` object or sequence of ints, optional + Describes how to distribute the array + val : Number or None, optional + Initialize array with this number if buffer is not given + dtype : np.dtype, optional + Type of array + buffer : Numpy array, optional + Array of correct shape. The buffer owns the memory that is used for + this array. + alignment : None or int, optional + Make sure array is aligned in this direction. Note that alignment does + not take rank into consideration. + rank : int, optional + Rank of tensor (number of free indices, a scalar is zero, vector one, + matrix two) + + + For more information, see `numpy.ndarray `_ + + Note + ---- + Tensors of rank higher than 0 are not distributed in the first ``rank`` + indices. For example, + + >>> from mpi4py_fft import DistArray + >>> a = DistArray((3, 8, 8, 8), rank=1) + >>> print(a.pencil.shape) + (8, 8, 8) + + The array ``a`` cannot be distributed in the first axis of length 3 since + rank is 1 and this first index represent the vector component. The ``pencil`` + attribute of ``a`` thus only considers the last three axes. + + Also note that the ``alignment`` keyword does not take rank into + consideration. Setting alignment=2 for the array above means that the last + axis will be aligned, also when rank>0. + + """ + + # TODO: docs + xp = cp + + def __new__( + cls, + global_shape, + subcomm=None, + val=None, + dtype=float, + memptr=None, + strides=None, + alignment=None, + rank=0, + ): + if len(global_shape[rank:]) < 2: # 1D case + obj = cls.xp.ndarray.__new__( + cls, global_shape, dtype=dtype, memptr=memptr, strides=strides + ) + if memptr is None and isinstance(val, Number): + obj.fill(val) + obj._rank = rank + obj._p0 = None + return obj + + subcomm = cls.getSubcomm(subcomm, global_shape, rank, alignment) + p0, subshape = cls.getPencil(subcomm, rank, global_shape, alignment) + + obj = cls.xp.ndarray.__new__(cls, subshape, dtype=dtype, memptr=memptr) + if memptr is None and isinstance(val, Number): + obj.fill(val) + obj._p0 = p0 + obj._rank = rank + return obj + + +def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False, useCuPy=False): """Return a new :class:`.DistArray` object for provided :class:`.PFFT` object Parameters @@ -457,6 +604,8 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): If True return view of the underlying Numpy array, i.e., return cls.view(np.ndarray). Note that the DistArray still will be accessible through the base attribute of the view. + useCuPy : bool, optional + If True, returns DistArrayCuPy instead of DistArray Returns ------- @@ -479,15 +628,25 @@ def newDistArray(pfft, forward_output=True, val=0, rank=0, view=False): dtype = pfft.forward.output_array.dtype else: dtype = pfft.forward.input_array.dtype - global_shape = (len(global_shape),)*rank + global_shape - z = DistArray(global_shape, subcomm=p0.subcomm, val=val, dtype=dtype, - alignment=p0.axis, rank=rank) + global_shape = (len(global_shape),) * rank + global_shape + + darraycls = DistArrayCuPy if useCuPy else DistArray + z = darraycls( + global_shape, + subcomm=p0.subcomm, + val=val, + dtype=dtype, + alignment=p0.axis, + rank=rank, + ) return z.v if view else z -def Function(*args, **kwargs): #pragma: no cover + +def Function(*args, **kwargs): # pragma: no cover import warnings + warnings.warn("Function() is deprecated; use newDistArray().", FutureWarning) - if 'tensor' in kwargs: - kwargs['rank'] = 1 - del kwargs['tensor'] + if "tensor" in kwargs: + kwargs["rank"] = 1 + del kwargs["tensor"] return newDistArray(*args, **kwargs)