diff --git a/docs/.buildinfo b/docs/.buildinfo
new file mode 100644
index 0000000..c823e82
--- /dev/null
+++ b/docs/.buildinfo
@@ -0,0 +1,4 @@
+# Sphinx build info version 1
+# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
+config: 33da0a7c7b169e6713563d9e953a03e8
+tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/docs/Makefile b/docs/Makefile
index d0c3cbf..582e706 100644
--- a/docs/Makefile
+++ b/docs/Makefile
@@ -6,7 +6,7 @@
SPHINXOPTS ?=
SPHINXBUILD ?= sphinx-build
SOURCEDIR = source
-BUILDDIR = build
+BUILDDIR = . #build
# Put it first so that "make" without argument is like "make help".
help:
diff --git a/docs/_images/New_bached_Algo_row3.png b/docs/_images/New_bached_Algo_row3.png
new file mode 100644
index 0000000..2ad2957
Binary files /dev/null and b/docs/_images/New_bached_Algo_row3.png differ
diff --git a/docs/_images/benchmark_GPU.png b/docs/_images/benchmark_GPU.png
new file mode 100644
index 0000000..36397aa
Binary files /dev/null and b/docs/_images/benchmark_GPU.png differ
diff --git a/docs/_images/benchmark_strongweak.png b/docs/_images/benchmark_strongweak.png
new file mode 100644
index 0000000..3c4a35b
Binary files /dev/null and b/docs/_images/benchmark_strongweak.png differ
diff --git a/docs/_images/pyDNMFk_RD500.png b/docs/_images/pyDNMFk_RD500.png
new file mode 100644
index 0000000..478a881
Binary files /dev/null and b/docs/_images/pyDNMFk_RD500.png differ
diff --git a/docs/_modules/index.html b/docs/_modules/index.html
index 23daacb..c9d0b5f 100644
--- a/docs/_modules/index.html
+++ b/docs/_modules/index.html
@@ -1,155 +1,237 @@
-
-
+class GetTopology():
+    r"""
+    Identifies the system topology in the context of MPI.
+
+    The constructor determines the global and local ranks, identifies the host
+    system, and determines GPU availability and assignments.
+
+    Args:
+    - comm (MPI Comm): The MPI communicator. Defaults to MPI.COMM_WORLD.
+    - gmasterRank (int): Rank of the global master. Defaults to 0.
+    """
+    def __init__(self, comm=None, gmasterRank=0):
+        if comm is None: comm = MPI.COMM_WORLD
+        rank = comm.Get_rank()
+        size = comm.Get_size()
+        myHost = os.uname()[1]
+        GMASTER = (rank == gmasterRank)
+        nGPUs_local = cp.cuda.runtime.getDeviceCount()
+        self.comm, self.rank, self.size, self.myHost, self.GMASTER, self.nGPU_local = comm, rank, size, myHost, GMASTER, nGPUs_local
+
+        if GMASTER:
+            hosts, local_size, members = [myHost], {myHost: 1}, {myHost: [gmasterRank]}
+            for r in range(size):
+                if r != gmasterRank:
+                    rh = comm.recv(source=r, tag=10*r)
+                    if rh not in hosts:
+                        hosts.append(rh)
+                        local_size[rh] = 0
+                        members[rh] = []
+                    local_size[rh] += 1
+                    if r not in members[rh]: members[rh].append(r)
+        else:
+            hosts, local_size = None, None
+            comm.send(myHost, dest=gmasterRank, tag=10*rank)
+        comm.barrier()
+
+        if GMASTER:
+            global_uID = None
+            if NCCL_backend:
+                try:
+                    global_uID = nccl.get_unique_id()
+                except:
+                    if rank == 0: print("[!] UNABLE to get NCCL GLOBAL_UID")
+            nHosts = len(hosts)
+            stride = int(size / nHosts)
+            MPI_TOPO = {'GMASTER': rank, 'hosts': hosts, 'nHosts': nHosts, 'stride': stride, 'nccl_GUID': global_uID, 'local_size': local_size, 'members': members}
+            MPI_TOPO['LMASTER'] = {}
+            for host in hosts:
+                idx = hosts.index(host)
+                MPI_TOPO['LMASTER'][host] = idx
+        else:
+            MPI_TOPO = None
+        comm.barrier()
+
+        MPI_TOPO = comm.bcast(MPI_TOPO, root=0)
+        nHosts = MPI_TOPO['nHosts']
+        stride = MPI_TOPO['stride']
+        hosts = MPI_TOPO['hosts']
+        local_size = MPI_TOPO['local_size']
+        idx = hosts.index(myHost)
+        lrank = rank // nHosts
+        myMaster = MPI_TOPO['LMASTER'][myHost]
+        LMASTER = (lrank == 0)
+        self.topology, self.nHost, self.hosts, self.host_idx, self.mpi_stride = MPI_TOPO, nHosts, hosts, idx, stride
+        self.local_size, self.lrank, self.myMaster, self.LMASTER = local_size, lrank, myMaster, LMASTER
+
+        if GMASTER:
+            gIDs = {myHost: np.arange(nGPUs_local, dtype=np.int32)}
+            nGPUs = nGPUs_local
+            lastID = gIDs[myHost][-1]
+            for r in list(MPI_TOPO['LMASTER'].keys()):
+                if MPI_TOPO['LMASTER'][r] != rank:
+                    ng = comm.recv(source=MPI_TOPO['LMASTER'][r], tag=100*MPI_TOPO['LMASTER'][r])
+                    nGPUs += ng
+                    gIDs[r] = np.arange(ng, dtype=np.int32) + lastID + 1
+                    lastID = gIDs[r][-1]
+            nGPUs = min(nGPUs, size)
+            gIDs['nGPUs'] = min(nGPUs, size)
+        elif LMASTER and (not GMASTER):
+            gIDs = None
+            ng = None
+            comm.send(nGPUs_local, dest=MPI_TOPO['GMASTER'], tag=100*rank)
+        else:
+            ng = None
+            gIDs = None
+        comm.barrier()
+        gIDs = comm.bcast(gIDs, root=MPI_TOPO['GMASTER'])
+        nGPUs = gIDs['nGPUs']
+        del gIDs['nGPUs']
+        MPI_TOPO['gIDs'] = gIDs
+        MPI_TOPO['nGPUs'] = nGPUs
+        if lrank < nGPUs_local:
+            self.gID = MPI_TOPO['gIDs'][myHost][lrank]
+
+
+    def showTopology(self):
+        """
+        Display the detected topology, for debugging purposes.
+        """
+        if self.GMASTER:
+            role = f"<{red('GMASTER')}>"
+        elif self.LMASTER and (not self.GMASTER):
+            role = f"<{blue('LMASTER')}>"
+        else:
+            role = f"<{amber('WORKER')}>"
+        log(msg=f"{role} on host {self.myHost}", rank=self.rank, lrank=self.lrank)
+        self.comm.barrier()
+
+    def getReduceCommTree(self, VRBZ=False):
+        """
+        Retrieve a communication tree for efficient reduce operations.
+
+        Args:
+        - VRBZ (bool): Verbosity level for debug outputs. Defaults to False.
+        """
+        x = np.arange(self.topology['nGPUs'])
+        self.reduceCommTree = getReduceCommTree(x=x, VRBZ=VRBZ)
+
+
+
+
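A minimal usage sketch of the topology helper above, assuming an `mpirun` launch, the package import path `pyDNMFk.comm_utils` (an assumption based on the imports shown later in this diff), and at least one visible CUDA device per host:

# Editor's sketch, not part of the library source.
from mpi4py import MPI
from pyDNMFk.comm_utils import GetTopology   # import path is an assumption

comm = MPI.COMM_WORLD
topo = GetTopology(comm=comm, gmasterRank=0)  # every rank participates
topo.showTopology()                           # logs GMASTER / LMASTER / WORKER role per rank
if comm.rank == 0:
    print("hosts:", topo.hosts)
    print("GPUs visible to the job:", topo.topology['nGPUs'])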
+
+class superCommunicator():
+    r"""
+    Wrapper around the MPI and NCCL communication protocols.
+
+    This class enables simplified and efficient communication setup based on the
+    detected system topology.
+
+    Args:
+    - comm (MPI Comm): The MPI communicator.
+    - gmasterRank (int): Rank of the global master. Defaults to 0.
+    """
+    def __init__(self, comm, gmasterRank=0):
+        self.globalComm = comm
+        self.topology = GetTopology(comm=comm, gmasterRank=gmasterRank)
+        self.comm = {'GLOBAL': comm}
+        self.comms = ['GLOBAL']
+        self.rank = self.topology.rank
+        self.GMASTER = self.topology.GMASTER
+        self.LMASTER = self.topology.LMASTER
+        self.myMaster = self.topology.myMaster
+        self.inContext = {'GLOBAL': True}
+        self.contexts = list(self.inContext.keys())
+
+    def createComm(self, root, members, name, backEnd='NCCL'):
+        """
+        Create a new communication context from the provided parameters.
+
+        Args:
+        - root (int): Root rank for this communicator.
+        - members (list): Member ranks included in this communicator.
+        - name (str): Name for this communicator.
+        - backEnd (str): Communication backend (e.g., 'NCCL'). Defaults to 'NCCL'.
+        """
+        if name in self.comms:
+            if self.GMASTER:
+                log(msg=f"[{red('!')}] Communicator {name} already exists", rank=self.rank, lrank=self.lrank)
+            return
+        self.comm[name] = NCCL_COMM(comm=self.globalComm, root=root, members=members, name=name)
+        if self.rank in members:
+            self.inContext[name] = True
+        else:
+            self.inContext[name] = False
+        self.contexts = list(self.inContext.keys())
+
+
+
+class NCCL_COMM():
+    r"""
+    NCCL-specific communicator.
+
+    This class encapsulates NCCL-specific communication setup, enabling
+    efficient GPU-to-GPU communication.
+
+    Args:
+    - comm (MPI Comm): The MPI communicator.
+    - root (int): Root rank for this communicator.
+    - members (list): Member ranks included in this communicator.
+    - name (str): Name for this communicator.
+    """
+    def __init__(self, comm, root, members, name):
+        if comm.rank in members:
+            self.rank = comm.rank
+            self.lrank = members.index(comm.rank)
+            self.size = len(members)
+            self.ROOT = root
+            uID = None
+            self.MASTER = (comm.rank == root)
+            if self.MASTER:
+                uID = nccl.get_unique_id()
+                print(f"[+] <LMASTER>[{comm.rank}]: uID = {uID[:8]}")
+                for m in members:
+                    if m != root: comm.send(uID, dest=m, tag=1000*m)
+            else:
+                uID = comm.recv(source=root, tag=1000*self.rank)
+                print(f"[+] <WORKER>[{comm.rank}]: uID = {uID[:8]}")
+                #comm.barrier()
+            if uID is not None:
+                print(f" -> Building NCCL comm [{len(members)}, uID:{uID[:8]}, lrank:{self.lrank}]")
+            comm.barrier()
+            ncclcomm = nccl.NcclCommunicator.initAll(members)
+            if self.MASTER: log(msg=f"[{green('+')}] NCCL comm {name} built OK", rank=self.rank, lrank=self.lrank)
+        else:
+            ncclcomm = None
+        self.comm = ncclcomm
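A hedged sketch of how the superCommunicator wrapper above might be driven; the import path is an assumption and, to keep the internal MPI barrier collective, the example puts every rank in the new context:

# Editor's sketch, not part of the library source.
from mpi4py import MPI
from pyDNMFk.comm_utils import superCommunicator   # import path is an assumption

comm = MPI.COMM_WORLD
sc = superCommunicator(comm, gmasterRank=0)
sc.createComm(root=0, members=list(range(comm.size)), name='GPU_ALL')
if sc.inContext['GPU_ALL']:
    nccl_ctx = sc.comm['GPU_ALL']   # NCCL_COMM wrapper built over all ranks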
+# @author: Ismael Boureima
+import sys
+from mpi4py import MPI
+
+# Attempt to import the cupy library for GPU array operations.
+try:
+    import cupy as cp
+except ImportError:
+    print("Unable to import Cupy.", file=sys.stderr)
+    cp = None
+
+# Attempt to import NCCL for GPU-to-GPU communication support from cupy.
+try:
+    from cupy.cuda import nccl
+except ImportError:
+    print("NCCL is not supported.", file=sys.stderr)
+    nccl = None
+
+
+
+def get_NCCL_unique_id():
+    """
+    Fetch a unique ID for NCCL group communication.
+
+    Returns:
+    - Unique ID if NCCL is available, otherwise None.
+    """
+    if nccl is not None: return nccl.get_unique_id()
+
+
+
+
+class MPIComm():
+    """
+    Class to handle basic MPI communication.
+
+    This class acts as a simple wrapper around basic MPI communication
+    functions for simplification and clarity.
+    """
+    def __init__(self):
+        self.comm = MPI.COMM_WORLD
+        self.size = self.comm.Get_size()
+        self.rank = self.comm.Get_rank()
+
+    def Allreduce(self, send_arr, op=MPI.SUM, stream=None):
+        """
+        Perform an all-reduce operation across all MPI processes.
+
+        Args:
+            send_arr (array-like): Data to be reduced.
+            op (MPI.Op): The reduction operation to apply. Default is MPI.SUM.
+            stream: CUDA stream for async operations (currently unused).
+
+        Returns:
+            array-like: The reduced array.
+        """
+        # Pass the requested reduction operation through to mpi4py.
+        return self.comm.allreduce(send_arr, op=op)
+
+    def Reduce(self, send_arr, op=MPI.SUM, root=0, stream=None):
+        """
+        Perform a reduce operation over MPI processes.
+
+        Args:
+            send_arr (array-like): Data to be reduced.
+            op (MPI.Op): The reduction operation to apply. Default is MPI.SUM.
+            root (int): Rank of the root process. Default is 0.
+            stream: CUDA stream for async operations (currently unused).
+        """
+        # The reduced result is only meaningful on the root rank.
+        return self.comm.reduce(send_arr, op=op, root=root)
+
+    def Bcast(self, arr, root=0, stream=None):
+        """
+        Broadcast data to all MPI processes.
+
+        Args:
+            arr (array-like): Data to be broadcast.
+            root (int): Rank of the root process initiating the broadcast. Default is 0.
+            stream: CUDA stream for async operations (currently unused).
+        """
+        # mpi4py's bcast returns the broadcast object on every rank.
+        return self.comm.bcast(arr, root=root)
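A hedged example of the MPIComm wrapper: summing a per-rank partial result across all processes. The import path is an assumption.

# Editor's sketch, not part of the library source.
import numpy as np
from mpi4py import MPI
from pyDNMFk.communicators import MPIComm   # import path is an assumption

mpi = MPIComm()
local_err = np.square(np.random.rand(1000)).sum()   # per-rank partial Frobenius term
global_err = mpi.Allreduce(local_err, op=MPI.SUM)   # identical value on every rank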
+
+
+
+
+
+class NCCLComm(nccl.NcclCommunicator):
+    """
+    Class to handle NCCL GPU communication.
+
+    This class acts as a wrapper around NCCL's communication functions,
+    optimized for GPU-to-GPU communication.
+    """
+    def __init__(self, ndev, commId, rank):
+        """
+        Args:
+            ndev (int): Number of devices (GPUs) involved in the communication.
+            commId (ncclUniqueId): A unique NCCL ID for group communication.
+            rank (int): Rank of the current process within the group.
+        """
+        super(NCCLComm, self).__init__(ndev, commId, rank)
+        self.comm = nccl.NcclCommunicator
+        self.size = ndev
+        self.rank = rank
+        self.commId = commId
+
+    def get_unique_id(self):
+        """
+        Returns:
+            ncclUniqueId: The unique NCCL ID for this communicator.
+        """
+        return self.commId
+
+    def get_NCCL_count_dtype(self, arr):
+        """
+        Determine the element count and NCCL data type for communication based on the array dtype.
+        """
+        if arr.dtype == cp.complex64:
+            return arr.size * 2, nccl.NCCL_FLOAT32
+        elif arr.dtype == cp.complex128:
+            return arr.size * 2, nccl.NCCL_FLOAT64
+        elif arr.dtype == cp.float32:
+            return arr.size, nccl.NCCL_FLOAT32
+        elif arr.dtype == cp.float64:
+            return arr.size, nccl.NCCL_FLOAT64
+        else:
+            raise ValueError("This dtype is not supported by NCCL.")
+
+    def Allreduce(self, send_arr, recv_arr, op=nccl.NCCL_SUM, stream=None):
+        """
+        Perform an all-reduce operation across all GPU processes using NCCL.
+        """
+        sendbuf = send_arr.__cuda_array_interface__['data'][0]
+        recvbuf = recv_arr.__cuda_array_interface__['data'][0]
+        count, datatype = self.get_NCCL_count_dtype(send_arr)
+        if stream is None:
+            stream = cp.cuda.Stream.null.ptr
+        else:
+            stream = stream.ptr
+        super(NCCLComm, self).allReduce(sendbuf, recvbuf, count, datatype, op, stream)
+
+    def Reduce(self, send_arr, recv_arr, op=nccl.NCCL_SUM, root=0, stream=None):
+        """
+        Perform a reduce operation across all GPU processes using NCCL.
+        """
+        sendbuf = send_arr.__cuda_array_interface__['data'][0]
+        recvbuf = recv_arr.__cuda_array_interface__['data'][0]
+        count, datatype = self.get_NCCL_count_dtype(send_arr)
+        if stream is None:
+            stream = cp.cuda.Stream.null.ptr
+        else:
+            stream = stream.ptr
+        super(NCCLComm, self).reduce(sendbuf, recvbuf, count, datatype, op, root, stream)
+
+    def Bcast(self, arr, root=0, stream=None):
+        """
+        Perform a broadcast operation from root to all GPU processes using NCCL.
+        """
+        buff = arr.__cuda_array_interface__['data'][0]
+        count, datatype = self.get_NCCL_count_dtype(arr)
+        if stream is None:
+            stream = cp.cuda.Stream.null.ptr
+        else:
+            stream = stream.ptr
+        super(NCCLComm, self).bcast(buff, count, datatype, root, stream)
+
+
+
+
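A hedged sketch of an NCCL all-reduce with the NCCLComm wrapper above. It assumes one GPU per MPI rank and that the NCCL unique id is created on rank 0 and broadcast with MPI before every rank constructs the communicator; the import path is an assumption.

# Editor's sketch, not part of the library source.
import cupy as cp
from mpi4py import MPI
from cupy.cuda import nccl
from pyDNMFk.communicators import NCCLComm   # import path is an assumption

comm = MPI.COMM_WORLD
uid = nccl.get_unique_id() if comm.rank == 0 else None
uid = comm.bcast(uid, root=0)

cp.cuda.Device(comm.rank % cp.cuda.runtime.getDeviceCount()).use()
nc = NCCLComm(ndev=comm.size, commId=uid, rank=comm.rank)

send = cp.ones(1024, dtype=cp.float32)
recv = cp.empty_like(send)
nc.Allreduce(send, recv)   # recv now holds comm.size on every GPU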
+
+
+class SUPER_COMM():
+    """
+    A unified communicator that supports both MPI (for CPUs) and NCCL (for GPUs).
+    """
+    def __init__(self, ndev=None, commId=None, rank=None, backend="MPI"):
+        """
+        Args:
+            ndev (int, optional): Number of devices (GPUs) involved in the communication.
+            commId (ncclUniqueId, optional): A unique NCCL ID for group communication.
+            rank (int, optional): Rank of the current process within the group.
+            backend (str): Communication backend to use. Can be "MPI" or "NCCL".
+        """
+        self.backend = backend.upper()
+        if self.backend in ["MPI", "CPU", "DEFAULT"]:
+            self.comm = MPIComm()
+        elif self.backend in ["NCCL", "GPU", "CUDA", "BEST", "PERFORMANCE"]:
+            assert ndev is not None, "[!][COMM ERROR] SUPER_COMM NCCL backend requires valid ndev"
+            assert commId is not None, "[!][COMM ERROR] SUPER_COMM NCCL backend requires valid commId"
+            assert rank is not None, "[!][COMM ERROR] SUPER_COMM NCCL backend requires valid rank"
+            # Uses the NCCLComm wrapper defined above.
+            self.comm = NCCLComm(ndev=ndev, commId=commId, rank=rank)
+        else:
+            raise Exception("[!][COMM ERROR] Backend {} not supported".format(self.backend))
+        self.size = self.comm.size
+        self.rank = self.comm.rank
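A hedged usage sketch of SUPER_COMM: the same call site can target CPUs via MPI or GPUs via NCCL depending on the backend string. It assumes one GPU per rank and the import path shown.

# Editor's sketch, not part of the library source.
from mpi4py import MPI
from pyDNMFk.communicators import SUPER_COMM, get_NCCL_unique_id   # assumed path

comm = MPI.COMM_WORLD
cpu_comm = SUPER_COMM(backend="MPI")                  # wraps MPI.COMM_WORLD

uid = get_NCCL_unique_id() if comm.rank == 0 else None
uid = comm.bcast(uid, root=0)
gpu_comm = SUPER_COMM(ndev=comm.size, commId=uid, rank=comm.rank, backend="NCCL")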
 def init(arg):
- """ Global variables declaration here. The variables declared within this function in this file are shared across other files and functions during import."""
+""" Global variables declaration here. The variables declared within this function in this file are shared across other files and functions during import."""globaltime,flagtime={}flag=0
+# @author: Ismael Boureima
+# Necessary imports for the module
+import os, time
+import numpy as np
+import numpy
+from queue import Queue
+from .toolz import log, blue, green, red, amber
+
+# Try importing cupy modules for GPU support
+try:
+    import cupy as cp
+    import cupy, cupyx
+except:
+    raise Exception(f"[{red('!!')}] Unable to import Cupy")
+
+# Try importing the necessary cupy backend modules
+try:
+    from cupy.cuda import nccl
+except:
+    raise Exception(f"[{red('!!')}] Unable to import NCCL Cupy backend")
+
+# Import specific cupy functions used throughout the module
+from cupy import asarray as _asarray
+from cupy import asnumpy as _asnumpy
+from cupy import divide as _divide
+from cupy import matmul as _matmul
+from cupy import multiply as _multiply
+from cupy import zeros as _zeros
+
+# Additional imports for the module
+from .communicators import NCCLComm
+from .comm_utils import GetTopology
+from mpi4py import MPI
+from .cupyCuSPARSELib import spMM as _spMM
+from .cupyCuSPARSELib import spMat, spRandMat, spMM
+from .cupy_utils import pin_memory as _pin_mem
+from scipy.sparse import coo_matrix, csc_matrix, csr_matrix, dia_matrix, issparse
+import scipy
+from .utils import data_operations
+
+
+# Constants used by the module
+PRECISIONS = {'float32': np.float32, 'float64': np.float64, 'int16': np.int16, 'int32': np.int32, 'int64': np.int64}
+WORKLOADS = {'nmf': 'NMF', 'nmfk': 'NMFk', 'bench': 'BENCHMARK'}
+NMF_NORMS = {'fro': 'FROBENIUS', 'kl': 'KL'}
+NMF_METHODS = {'mu': 'Multiplicative Update'}
+NMF_INITS = {'rand': 'RANDOM', 'random': 'RANDOM', 'nnsvd': 'Non-Negative SVD', 'svd': 'SVD'}
+
+# Utility function to raise an exception for unsupported operations
+def noSupportFor(X):
+    raise Exception(f"[{red('!!')}] {amber(X)} is not yet supported on GPU")
+
+
+# The main class to perform NMF on GPU
+class cudaNMF():
+
+    def __init__(self, A_ij, k, params, factors=None, save_factors=False):
+r"""
+ Initialize the cudaNMF class.
+
+ Parameters
+ ----------
+ A_ij : array-like
+ The matrix to be factorized.
+ k : int
+ Rank of the factorization.
+ params : dict
+ Configuration parameters for NMF.
+ factors : array-like, optional
+ Initial guess for the factors.
+ save_factors : bool, optional
+ Whether to save factors or not.
+
+ """
+ self.A_ij=A_ij
+ self.params=params
+ self.k=k
+
+ self.factors=factors
+ self.save_factors=save_factors
+ self.VRBZ=params.verbose
+ try:
+ self.comm=params.comm1
+ except:
+ print(f"{149*'#'}")
+ print("[{}] --[{} comm {}]--".format(red("!"),blue('MPI'),red("NOT FOUND")))
+ self.comm=MPI.COMM_WORLD
+ self.rank=self.comm.rank
+ ifself.rank==0:
+ print(f"{149*'#'}")
+ log("[{}] --[{} comm built {}]--".format(green("+"),blue('MPI'),green("OK")))
+ self.rank=self.comm.rank
+ self.eps=np.finfo(self.A_ij.dtype).eps
+ self.params.eps=self.eps
+ self.precision=self.params.precision
+ self.topo_dim=self.params.topo_dim
+ self.local_m,self.local_n=self.A_ij.shape
+ self.p_r,self.p_c=self.params.p_r,self.params.p_c
+ #[1] Identify distributed system's topology
+ self.NCCL_COMM_BUILT=False
+ self.identify_distributed_system_compute_topology()
+ #[2] Identify Global compute grid parameters
+ self.data_op=data_operations(self.A_ij,self.params)
+ self.params=self.data_op.params
+ self.GPU_GRID_BUILT=False
+ #[3] Identify A matrix partition method
+ self.find_optimat_partition_strategy()
+ #[4] Build GPU communication COMM
+ self.checkGPUContext()
+ self.buildSubCommunicators()
+ METHOD=f"NMF_{self.GPU_PARTITION}_cuda"
+ #[5] Check format (dense/sparse) of A matrix
+ self.SPARSE_BUFF_PINNED_MEM_PARAMS_CONFIGURED=False
+ self.check_A_matrix_format()
+ ifself.A_IS_SPARSE:
+ METHOD=f"sparse_{METHOD}"
+ else:
+ METHOD=f"dense_{METHOD}"
+ self.NMFMethod=METHOD
+ #[6] Identify arrays that will be cached on GPU
+ self.A_IS_BATCHED=self.params.A_is_batched
+ self.A_is_cached,self.W_is_cached,self.H_is_cached=None,None,None
+ try:
+ gpu_arrays=params.GPU_arrays
+ except:
+ ifself.rank==0:
+ self.log(msg=f"[!][CACHING] GPU arrays unspecified")
+ self.log(msg=f"[!][CACHING] Assuming A, W, and H are cached on GPU")
+ gpu_arrays=[]
+ self.gpu_arrays=[]
+ self.automatic_gpu_array_selection=False
+ forARRingpu_arrays:
+ arr=ARR.lower()
+ ifarrin['auto','optim']:
+ self.automatic_gpu_array_selection=True
+ else:
+ self.gpu_arrays.append(arr)
+ ifself.automatic_gpu_array_selection:
+ ifself.rank==0:self.log(msg=f"[{red('!')}][{amber('COMING SOON')}] Automatic/Optimal GPU array selection is {red('Not yet supported')}")
+ #[7] Build GPU MEM grid
+ self.nBatch=self.params.nBatch
+ self.batchQeueSize=self.params.batchQeueSize
+ self.qs=self.batchQeueSize
+ self.SQ=None
+ self.MAXSTREAMS=self.params.MAXSTREAMS
+ ifnotself.GPU_GRID_BUILT:self.get_gpu_grid_params()
+ self.BATCHING_AXIS=None
+ self.COL_BATCHING_AXIS=None
+ self.ROW_BATCHING_AXIS=None
+ self.find_optimal_baching_axis()
+ self.set_batching_params()
+ ifself.A_IS_BATCHED:
+ self.NMFMethod="{}_BATCHED".format(self.NMFMethod)
+ else:
+ self.cache_A_on_device()
+ self.X_per_initialized=False
+ self.show_A_info()
+ #[8] Prepare events
+ self.GB=1024**3
+ self.TB=1024*self.GB
+ self.dt={'NMF':0.0,'W_up':0.0,'H_up':0.0,
+ "allRed_XHT":0.0,"allRed_WHHT":0.0,
+ 'allRed_WTX':0.0,'allRed_WTWH':0.0,
+ 'allRed_WTW':0.0,'allRed_HHT':0.0,
+ 'H2D_A':0.0,'H2D_H':0.0,'H2D_W':0.0,
+ 'D2H_A':0.0,'D2H_H':0.0,'D2H_W':0.0,
+ 'units':'[ms]'}
+ self.events={}
+ self.events['start']=cp.cuda.Event()
+ self.events['end']=cp.cuda.Event()
+ self.events['h2d_s']=cp.cuda.Event()
+ self.events['h2d_e']=cp.cuda.Event()
+ self.events['d2h_s']=cp.cuda.Event()
+ self.events['d2h_e']=cp.cuda.Event()
+ self.events['reduce_s']=cp.cuda.Event()
+ self.events['reduce_e']=cp.cuda.Event()
+ self.events['nmf_start']=cp.cuda.Event()
+ self.events['nmf_end']=cp.cuda.Event()
+ #[9] Build random-generation initial state seeds. This is used so that cofactors are initialized
+ # locally, thereby avoiding broadcast communications
+ ifself.rank==0:
+ iter_seeds=np.random.randint(100*self.params.end_k,size=2*self.params.end_k)
+ else:
+ iter_seeds=None
+ iter_seeds=self.comm.bcast(iter_seeds,root=0)
+ self.iter_seeds=iter_seeds
+ #[10] Configure Pinned Memory management Pool
+ #self.PINNEDMEMPOOL = cp.cuda.PinnedMemoryPool()
+ #cp.cuda.set_pinned_memory_allocator(self.PINNEDMEMPOOL.malloc)
+ ifself.A_IS_BATCHED:
+ self.MEMPOOL=cp.get_default_memory_pool()
+ self.PINNEDMEMPOOL=cp.cuda.PinnedMemoryPool()
+ cp.cuda.set_pinned_memory_allocator(self.PINNEDMEMPOOL.malloc)
+ else:
+ self.MEMPOOL=cp.get_default_memory_pool()
+ self.PINNEDMEMPOOL=cp.get_default_pinned_memory_pool()
+ self.showMemStats()
+ #[11] Build queues of CUDA streams for async copy management
+ self.STREAMS={}
+ self.stream=[]
+ self.FREE_STREAM={}
+ self.REDUCE_STREAM=cp.cuda.stream.Stream()
+ self.MAXSTREAMS=min(self.nBatch,self.MAXSTREAMS)
+ forninrange(self.MAXSTREAMS):
+ self.stream.append("s%04d"%n)
+ self.STREAMS[self.stream[-1]]=cp.cuda.stream.Stream()
+ self.FREE_STREAM[self.stream[-1]]=cp.cuda.Event()
+
+
+
+    def log(self, msg):
+r"""
+ Prints a log message.
+
+ Parameters
+ ----------
+ msg : str
+ The message to be logged.
+
+ """
+ log(msg=msg,rank=self.rank,lrank=self.lrank)
+
+
+
+    def isCupyObject(self, x):
+r"""
+ Checks if the input object is a Cupy object.
+
+ Parameters
+ ----------
+ x : object
+ The object to be checked.
+
+ Returns
+ -------
+ bool
+ True if x is a Cupy object, False otherwise.
+
+ """
+ returntype(x)in[cp._core.core.ndarray]
+
+
+
+    def getArrayType(self, x):
+r"""
+ Returns the type of the array (CPU/GPU, dense/sparse, etc.).
+
+ Parameters
+ ----------
+ x : array-like
+ The array whose type needs to be determined.
+
+ Returns
+ -------
+ str
+ Type of the array (e.g., "CPU_DENSE", "GPU_SPARSE").
+
+ """
+ T=type(x)
+ ifTin[list]:
+ return"LIST[{}]".format(self.getObjectType(x[0]))
+ elifTin[np.ndarray]:
+ return"CPU_DENSE"
+ elifTin[cp._core.core.ndarray]:
+ return"GPU_DENSE"
+ elifTin[scipy.sparse.coo.coo_matrix,scipy.sparse.csr.csr_matrix,scipy.sparse.csc.csc_matrix]:
+ return"CPU_SPARSE"
+ elifTin[cupyx.scipy.sparse.coo.coo_matrix,cupyx.scipy.sparse.csr.csr_matrix,cupyx.scipy.sparse.csc.csc_matrix]:
+ return"GPU_SPARSE"
+ else:
+ return"UNKNOWN"
+
+
+
+    def identify_distributed_system_compute_topology(self):
+r"""
+ Identifies the distributed system's compute topology and sets up the necessary
+ attributes related to topology.
+ """
+ self.topology=GetTopology(self.comm)
+ ifself.VRBZ:self.show_topology()
+ ifself.rank==0:
+ global_uID=nccl.get_unique_id()
+ else:
+ global_uID=None
+ self.global_uID=self.comm.bcast(global_uID,root=0)
+ self.comm.barrier()
+ self.myHost=self.topology.myHost
+ self.lrank=self.topology.lrank
+ self.nGPU_local=self.topology.nGPU_local
+ self.nGPUs=self.topology.topology['nGPUs']
+
+
+
+    def compute_global_dim(self):  ##[IDB]: This is needed here
+r"""Computes global dimensions m and n from given chunk sizes for any grid configuration"""
+ ifself.p_r!=1andself.p_c==1:
+ self.global_n=self.local_n
+ self.global_m=self.comm.allreduce(self.local_m)
+ elifself.p_c!=1andself.p_r==1:
+ self.global_n=self.comm.allreduce(self.local_n)
+ self.global_m=self.local_m
+ else:
+ ifself.rank%self.p_c==0:
+ self.global_m=self.local_m
+ else:
+ self.global_m=0
+ self.global_m=self.comm.allreduce(self.global_m)
+ ifself.rank//self.p_c==0:
+ self.global_n=self.local_n
+ else:
+ self.global_n=0
+ self.global_n=self.comm.allreduce(self.global_n)
+ self.comm.barrier()
+
+
+
+    def checkGPUContext(self):
+r"""
+ Determines if the current instance is operating within a GPU context and sets up
+ GPU-related attributes.
+ """
+ ifself.lrank<self.nGPU_local:
+ self.IN_GPU_CONTEXT=True
+ self.lID=self.lrank# Local GPU ID
+ self.gID=self.topology.gID# Global GPU ID
+ cp.cuda.Device(self.lrank).use()
+ #self.device = cp.cuda.Device()
+ self.cudaDevice=cp.cuda.runtime.getDevice()
+ ifself.VRBZ:self.log(msg=f"I am using <Global Device{self.gID}> | <Local Device{self.cudaDevice}> on {self.myHost}")
+ else:
+ self.IN_GPU_CONTEXT=False
+ self.lID=None
+ self.gID=None
+ ifself.VRBZ:self.log(msg=f"I am not using CUDA")
+ self.comm.barrier()
+ self.nccl_group_size=self.comm.allreduce(int(self.IN_GPU_CONTEXT),op=MPI.SUM)
+ ifself.rank==0:self.log(msg=f"[+][NCCL] GROUP size = {self.nccl_group_size}")
+
+
+
+    def check_A_matrix_format(self):
+r"""
+ Checks the format of matrix A, decides if it's sparse or dense, and sets the
+ necessary attributes related to the matrix's format.
+ """
+ iftype(self.A_ij)in[scipy.sparse.coo.coo_matrix,scipy.sparse.csr.csr_matrix,scipy.sparse.csc.csc_matrix]:
+ ifself.params.densify_A:# Dense A and Array format
+ self.A_ij=self.A_ij.toarray()# Converting to dense for now (Fix Not Efficient)
+ self.A_in_sparse_format=False
+ self.A_IS_SPARSE=False
+ else:# Sparse A and sparse format
+ self.A_in_sparse_format=True
+ self.A_IS_SPARSE=True
+ self.A_nnz=self.A_ij.nnz
+ self.A_density=self.A_nnz/(self.global_m*self.global_n)
+ self.MM=_spMM
+ else:
+ self.A_in_sparse_format=False
+ ifnotself.A_in_sparse_format:
+ self.A_nnz=np.count_nonzero(self.A_ij)
+ self.A_nnz=self.comm.allreduce(self.A_nnz)
+ self.A_density=self.A_nnz/(self.global_m*self.global_n)
+ ifself.A_density>self.params.sparsity_tresh:# Dense A and Array format
+ #self.NMFMethod = 'dense_{}'.format(self.NMFMethod)
+ self.A_IS_SPARSE=False
+ self.MM=_matmul
+ else:# Dense A and sparse format
+ self.A_IS_SPARSE=True
+ ifself.params.sparsify_A:
+ self.A_ij=csr_matrix(self.A_ij)
+ self.A_in_sparse_format=True
+ self.MM=_spMM
+ else:
+ self.A_IS_SPARSE=False
+ self.MM=_matmul
+ self.A_IS_DENSE=notself.A_IS_SPARSE
+    def cache_H_on_device(self):  # W.I.P
+r"""
+ This method sets the flag indicating that the matrix H is cached on the device.
+ """
+ self.H_is_cached=True
+
+
+
+    def cache_W_on_device(self):  # W.I.P
+r"""
+ This method sets the flag indicating that the matrix W is cached on the device.
+ """
+ self.W_is_cached=True
+
+
+
+
+    def find_optimat_partition_strategy(self):
+        r"""
+        This method determines the best GPU partitioning strategy based on the provided parameters.
+        It supports partitioning in 'row', 'column' or 'auto' modes.
+        """
+        # Attempt to set the partition type from parameters; default to 'auto' on failure.
+        try:
+            partition_type = self.params.gpu_partition.lower()
+        except:
+            partition_type = 'auto'
+
+        # Check the partition type and set the GPU_PARTITION attribute accordingly.
+        # Multiple alias names are supported for each partitioning strategy.
+        if partition_type in ['row', 'row1', '1row', 'row_1d', '1d_row']:
+            self.GPU_PARTITION = 'row_1d'
+        elif partition_type in ['col', 'col1', '1col', 'col_1d', '1d_col', 'column', 'column_1d', '1d_column']:
+            self.GPU_PARTITION = 'col_1d'
+        # If 'auto' mode, choose the partition from the global matrix shape.
+        elif partition_type in ['auto', 'optim', 'optimal']:
+            self.GPU_PARTITION = 'auto'
+            if self.rank == 0:
+                log(msg=f"[{amber('!')}][PARTITION]: GPU_PARTITION is set to {amber(self.GPU_PARTITION)}", rank=self.rank, lrank=self.lrank)
+                log(msg=f"[{amber('.')}][PARTITION]: Optimizing for grid partition ...", rank=self.rank, lrank=self.lrank)
+            if self.global_m >= self.global_n:
+                self.GPU_PARTITION = 'row_1d'
+            else:
+                self.GPU_PARTITION = 'col_1d'
+            if self.rank == 0: log(msg=f"[{green('+')}][PARTITION]: GPU_PARTITION is set to {green(self.GPU_PARTITION)}", rank=self.rank, lrank=self.lrank)
+        else:
+            raise Exception(f"[!!] GPU grid partition {red(self.params.gpu_partition)} is not supported. Try {green('auto')}, {green('column')}, {green('row')}")
+        # Set ROW and COL partition flags based on GPU_PARTITION.
+        if self.GPU_PARTITION in ['row_1d']:
+            self.COL_PARTITION = True
+        else:
+            self.COL_PARTITION = False
+        self.ROW_PARTITION = not self.COL_PARTITION
+
+
+
+
+    def find_optimal_baching_axis(self):
+        r"""
+        Determines the batching axis ('row', 'column' or 'auto') used when A is streamed in batches.
+        """
+        try:
+            self.BATCHING_AXIS = self.params.batching_axis.lower()
+        except:
+            self.BATCHING_AXIS = 'auto'
+        if self.BATCHING_AXIS in ['row', 'row1', '1row', 'row_1d', '1d_row']:
+            self.BATCHING_AXIS = 'row_1d'
+        elif self.BATCHING_AXIS in ['col', 'col1', '1col', 'col_1d', '1d_col', 'column', 'column_1d', '1d_column']:
+            self.BATCHING_AXIS = 'col_1d'
+        elif self.BATCHING_AXIS in ['auto', 'optim', 'optimal']:
+            self.BATCHING_AXIS = 'auto'
+            if self.rank == 0:
+                log(msg=f"[{amber('!')}][BATCHING]: STRATEGY is set to {amber(self.BATCHING_AXIS)}", rank=self.rank, lrank=self.lrank)
+                log(msg=f"[{amber('.')}][BATCHING]: Optimizing for BATCHING STRATEGY ...", rank=self.rank, lrank=self.lrank)
+            if self.grid_loc_m >= self.grid_loc_n:
+                self.BATCHING_AXIS = 'row_1d'
+            else:
+                self.BATCHING_AXIS = 'col_1d'
+            if self.rank == 0: log(msg=f"[{green('+')}][BATCHING]: STRATEGY is set to {green(self.BATCHING_AXIS)}", rank=self.rank, lrank=self.lrank)
+        else:
+            raise Exception(f"[!!] BATCHING STRATEGY {red(self.params.batching_axis)} is not supported. Try {green('auto')}, {green('column')}, {green('row')}")
+        if self.BATCHING_AXIS in ['row_1d']:
+            self.ROW_BATCHING_AXIS = True
+        else:
+            self.ROW_BATCHING_AXIS = False
+        self.COL_BATCHING_AXIS = not self.ROW_BATCHING_AXIS
+    def get_managed_stream_queue(self):
+"""
+ Initializes and returns a dictionary for managing stream queues for GPU operations.
+ """
+ SQ={}
+ SQ['Queue']=Queue(maxsize=self.qs)
+ SQ['READY']={}
+ #[1] En-queue working streams and set their states to ready to work
+ forjinrange(self.qs):
+ i=j+1
+ SQ['READY'][i]=cp.cuda.stream.Stream()
+ SQ['Queue'].put(i)
+ SQ['REDUCE']=cp.cuda.stream.Stream()
+ SQ['COMPLETED']=[]
+ returnSQ
+
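A hedged sketch (not part of the class) of one way the managed stream queue returned above could be consumed to overlap host-to-device copies with compute; `process_batches` is a hypothetical helper introduced here only for illustration.

# Editor's sketch, not part of the library source.
import cupy as cp

def process_batches(nmf, batches):
    SQ = nmf.get_managed_stream_queue()
    for batch in batches:
        sid = SQ['Queue'].get()            # block until a stream slot is free
        stream = SQ['READY'][sid]
        with stream:
            x_d = cp.asarray(batch)        # copy enqueued on this stream
            y = x_d.sum()                  # placeholder compute for the batch
        stream.synchronize()               # the library would use events instead
        SQ['COMPLETED'].append(sid)
        SQ['Queue'].put(sid)               # return the slot to the pool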
+
+
+    def configure_sparse_buffers_pinned_mem_params(self):
+r"""
+ Configures the parameters for pinned memory buffers when dealing with sparse matrices.
+ """
+ ifself.SPARSE_BUFF_PINNED_MEM_PARAMS_CONFIGURED:return
+ assertself.BATCHING_AXISisnotNone,"[!] PINNED MEM BUFFERS are used only for BATCHING"
+ assertself.A_IS_SPARSE,"[!] SPARSE PINNED MEM BUFFERS are used only for BATCHING A in SPARSE FORMAT"
+ self.buff_idx,self.buff_ptr=[],[]
+ self.sparse_vect_size={'dat':[],'idx':[],'ptr':[]}
+ self.sparseData_max_vect_size,self.sparseIdx_max_vect_size,self.sparsePtr_max_vect_size=0,0,0
+ forbinrange(self.nBatch):
+ #if self.ROW_PARTITION:
+ ifself.ROW_BATCHING_AXIS:
+ i0,i1=b*self.batch_size,(b+1)*self.batch_size
+ self.sparse_vect_size['dat'].append(len(self.A_ij[i0:i1,:].data))
+ self.sparse_vect_size['idx'].append(len(self.A_ij[i0:i1,:].indices))
+ self.sparse_vect_size['ptr'].append(len(self.A_ij[i0:i1,:].indptr))
+ self.sparseData_max_vect_size=max(self.sparseData_max_vect_size,len(self.A_ij[i0:i1,:].data))
+ self.sparseIdx_max_vect_size=max(self.sparseIdx_max_vect_size,len(self.A_ij[i0:i1,:].indices))
+ self.sparsePtr_max_vect_size=max(self.sparsePtr_max_vect_size,len(self.A_ij[i0:i1,:].indptr))
+ else:# Col Partition
+ j0,j1=b*self.batch_size,(b+1)*self.batch_size
+ self.sparse_vect_size['dat'].append(len(self.A_ij[:,j0:j1].data))
+ self.sparse_vect_size['idx'].append(len(self.A_ij[:,j0:j1].indices))
+ self.sparse_vect_size['ptr'].append(len(self.A_ij[:,j0:j1].indptr))
+ self.sparseData_max_vect_size=max(self.sparseData_max_vect_size,len(self.A_ij[:,j0:j1].data))
+ self.sparseIdx_max_vect_size=max(self.sparseIdx_max_vect_size,len(self.A_ij[:,j0:j1].indices))
+ self.sparsePtr_max_vect_size=max(self.sparsePtr_max_vect_size,len(self.A_ij[:,j0:j1].indptr))
+ #print("[b%04d] lenData, lenIdx, lenPtr = {},{},{}".format(self.sparseData_max_vect_size, self.sparseIdx_max_vect_size, self.sparsePtr_max_vect_size) %b)
+ self.SPARSE_BUFF_PINNED_MEM_PARAMS_CONFIGURED=True
+
+
+
+
+
+    def allocate_sparse_batch_buffers(self):
+r"""
+ Allocate memory for sparse batch buffers.
+
+ This method configures memory parameters if not already set. Based on the specified
+ batching axis, it initializes the appropriate buffers to store the data.
+ """
+ # Check if memory parameters for sparse buffers are configured
+ ifnotself.SPARSE_BUFF_PINNED_MEM_PARAMS_CONFIGURED:
+ self.configure_sparse_buffers_pinned_mem_params()
+
+ # Allocate memory based on column batching axis
+ ifself.COL_BATCHING_AXIS:
+ self.H_d,self.X_d=[],[]
+ forninrange(self.batchQeueSize):
+ self.H_d.append(cp.empty((self.k,self.batch_size),dtype=self.A_ij.dtype))
+ self.X_d.append(cp.empty(self.sparseData_max_vect_size,dtype=self.A_ij.dtype))
+ self.buff_idx.append(cp.empty(self.sparseIdx_max_vect_size,dtype=cp.int32))
+ self.buff_ptr.append(cp.empty(self.sparsePtr_max_vect_size,dtype=cp.int32))
+ else:# ROW_BATCHING_AXIS
+ self.W_d,self.X_d=[],[]
+ forninrange(self.batchQeueSize):
+ self.W_d.append(cp.empty((self.batch_size,self.k),dtype=self.A_ij.dtype))
+ self.X_d.append(cp.empty(self.sparseData_max_vect_size,dtype=self.A_ij.dtype))
+ self.buff_idx.append(cp.empty(self.sparseIdx_max_vect_size,dtype=cp.int32))
+ self.buff_ptr.append(cp.empty(self.sparsePtr_max_vect_size,dtype=cp.int32))
+
+
+
+    def allocate_dense_batch_buffers(self):
+r"""
+ Allocate memory for dense batch buffers.
+
+ Based on the specified batching axis, this method initializes the appropriate
+ buffers to store dense data.
+ """
+
+ # Allocate memory based on column batching axis
+ ifself.COL_BATCHING_AXIS:
+ self.H_d,self.X_d=[],[]
+ forninrange(self.batchQeueSize):
+ self.H_d.append(cp.empty((self.k,self.batch_size),dtype=self.A_ij.dtype))
+ #self.X_d.append(cp.empty((self.grid_loc_m, self.batch_size), dtype=self.A_ij.dtype))
+ self.X_d.append(cp.empty((self.grid_glob_m,self.batch_size),dtype=self.A_ij.dtype))
+ else:# ROW_BATCHING_AXIS
+ self.W_d,self.X_d=[],[]
+ forninrange(self.batchQeueSize):
+ self.W_d.append(cp.empty((self.batch_size,self.k),dtype=self.A_ij.dtype))
+ #self.X_d.append(cp.empty((self.batch_size, self.grid_loc_n), dtype=self.A_ij.dtype))
+ self.X_d.append(cp.empty((self.batch_size,self.grid_glob_n),dtype=self.A_ij.dtype))
+
+
+
+    def allocate_gpu_batch_buffers(self):
+r"""
+ Allocate memory for batch buffers on GPU.
+
+ Depending on whether the data is sparse or dense, it calls the appropriate
+ method to initialize the buffers.
+ """
+ # Check if the data is batched
+ ifself.A_IS_BATCHED:
+ ifself.A_IS_SPARSE:
+ self.allocate_sparse_batch_buffers()
+ else:
+ self.allocate_dense_batch_buffers()
+
+
+    def showMemStats(self, msg=None):
+r"""
+ Display memory statistics.
+
+ It logs information about used and available memory, including details about
+ the pinned memory pool.
+
+ Parameters:
+ msg (str): Optional message to include in the log.
+ """
+ # Only process with rank 0 displays the memory stats
+ ifself.rank==0:
+ ifmsgisnotNone:log(msg=f"[{amber('!')}][MEM INFO] {msg}",rank=self.rank,lrank=self.lrank)
+ # Log info based on whether the data is batched or not
+ ifself.A_IS_BATCHED:
+ ifmsgisnotNone:log(msg=f"[{amber('!')}][MEM INFO] {msg}",rank=self.rank,lrank=self.lrank)
+ log(msg=f"[{amber('!')}][PINNEDMEM]: FREE BLOCKS = {amber(self.PINNEDMEMPOOL.n_free_blocks())}",rank=self.rank,lrank=self.lrank)
+ else:
+ log(msg=f"[{amber('!')}][MEMPOOL]: USED = {amber(self.MEMPOOL.used_bytes()/self.GB)} GB || TOTAL = {red(self.MEMPOOL.total_bytes()/self.GB)} GB",rank=self.rank,lrank=self.lrank)
+ log(msg=f"[{amber('!')}][PINNEDMEM]: FREE BLOCKS = {amber(self.PINNEDMEMPOOL.n_free_blocks())}",rank=self.rank,lrank=self.lrank)
+
+
+    def sampleA(self, noise_var, method, seed=None):
+r"""
+ Samples matrix A based on a specified method and noise variance.
+
+ Parameters:
+ - noise_var (float): Variance of the noise.
+ - method (str): Sampling method, either 'uniform' or 'poisson'.
+ - seed (int, optional): Random seed for reproducibility.
+
+ Raises:
+ - Exception if an unsupported sampling method is provided.
+ """
+ self.noise_var=noise_var
+ self.seed=seed
+ ifself.seed!=None:
+ cp.random.seed(self.seed)
+ self.sampling_method=method
+ ifself.sampling_method=='uniform':
+ self.randM()
+ elifself.sampling_method=='poisson':
+ self.poisson()
+
+
+
+    def randM(self):
+r"""
+ Perturbs the elements of X by multiplying them with a uniform random number in the range (1-epsilon, 1+epsilon).
+ """
+
+ # Check if X_per has been initialized, if not, initialize the arrays
+ ifnotself.X_per_initialized:self.X_per,self.X_idx,self.X_ptr=[],[],[]
+ #self.showMemStats(msg = " RandM() 0")
+
+ # Free up memory blocks in the memory pool
+ self.MEMPOOL.free_all_blocks()
+ #self.showMemStats(msg = " RandM() MEMPOOL Freed OK")
+
+ # Get the max available memory in bytes
+ MAXMEM=self.MEMPOOL.total_bytes()*1.0
+ ifself.A_IS_BATCHED:#CPU Array
+ forbinrange(self.nBatch):
+ ifself.A_IS_SPARSE:
+ ifself.ROW_BATCHING_AXIS:
+ i0,i1=b*self.batch_size,(b+1)*self.batch_size
+ X=self.A_ij[i0:i1,:]
+ else:# Col Axis
+ j0,j1=b*self.batch_size,(b+1)*self.batch_size
+ X=self.A_ij[:,j0:j1]
+ M=np.random.random_sample(X.data.shape).astype(self.A_ij.dtype)
+ M=2.0*self.noise_var*M+self.noise_var+1.0
+ X.data*=M
+ ifnotself.X_per_initialized:# Initialize sparse array pinned mem buffers
+ self.X_per.append(_pin_mem(np.append(X.data,np.zeros(self.sparseData_max_vect_size-len(X.data),dtype=X.data.dtype))))
+ self.X_idx.append(_pin_mem(np.append(X.indices,np.zeros(self.sparseIdx_max_vect_size-len(X.indices),dtype=X.indices.dtype))))
+ self.X_ptr.append(_pin_mem(np.append(X.indptr,np.zeros(self.sparsePtr_max_vect_size-len(X.indptr),dtype=X.indptr.dtype))))
+ else:
+ #print(f"Before : type(X_per) = {self.X_per[b].dtype}")
+ self.X_per[b]=np.append(X.data,np.zeros(self.sparseData_max_vect_size-len(X.data),dtype=X.data.dtype)).astype(self.A_ij.dtype)
+ self.X_idx[b]=np.append(X.indices,np.zeros(self.sparseIdx_max_vect_size-len(X.indices),dtype=X.indices.dtype))
+ self.X_ptr[b]=np.append(X.indptr,np.zeros(self.sparsePtr_max_vect_size-len(X.indptr),dtype=X.indptr.dtype))
+ #print(f"Afterpe : type(X_per) = {self.X_per[b].dtype}")
+
+ else:# A in Dense
+ ifself.ROW_BATCHING_AXIS:
+ i0,i1=b*self.batch_size,(b+1)*self.batch_size
+ M=np.random.random_sample(self.A_ij[i0:i1,:].shape).astype(self.A_ij.dtype)
+ M=2.0*self.noise_var*M+self.noise_var+1.0
+ ifnotself.X_per_initialized:
+ self.X_per.append(_pin_mem(np.multiply(self.A_ij[i0:i1,:],M)))
+ else:
+ self.X_per[b]=np.multiply(self.A_ij[i0:i1,:],M)
+ else:# Row Axis
+ j0,j1=b*self.batch_size,(b+1)*self.batch_size
+ M=np.random.random_sample(self.A_ij[:,j0:j1].shape).astype(self.A_ij.dtype)
+ M=2.0*self.noise_var*M+self.noise_var+1.0
+ ifnotself.X_per_initialized:
+ self.X_per.append(_pin_mem(np.multiply(self.A_ij[:,j0:j1],M)))
+ else:
+ self.X_per[b]=np.multiply(self.A_ij[:,j0:j1],M)
+
+ else:#GPU Array
+ forbinrange(self.nBatch):
+ ifself.A_IS_SPARSE:
+ X=self.A_d[b]*1.0
+ M=cp.random.random_sample(self.A_d[b].data.shape,dtype=self.A_d[b].dtype)
+ M=2.0*self.noise_var*M+self.noise_var+1.0
+ X.data*=M
+ #self.X_per.append(X)
+ ifnotself.X_per_initialized:
+ self.X_per.append(X)
+ else:
+ self.X_per[b]=X
+ else:# A in Dense
+ self.showMemStats(msg=" RandM() BEFORE X_per init b%04d"%b)
+ MAXMEM=max(MAXMEM,self.MEMPOOL.total_bytes()*1.0)
+ ifnotself.X_per_initialized:
+ self.X_per.append(cp.random.random_sample(self.A_d[b].shape,dtype=self.A_d[b].dtype))
+ self.showMemStats(msg=" RandM() AFTER X_per init 1 b%04d"%b)
+ MAXMEM=max(MAXMEM,self.MEMPOOL.total_bytes()*1.0)
+ else:
+ self.X_per[b]=cp.random.random_sample(self.A_d[b].shape,dtype=self.A_d[b].dtype)
+ self.showMemStats(msg=" RandM() AFTER X_per set 1 b%04d"%b)
+ MAXMEM=max(MAXMEM,self.MEMPOOL.total_bytes()*1.0)
+ self.X_per[b]=2.0*self.noise_var
+ self.X_per[b]+=self.noise_var+1.0
+ self.X_per[b]=cp.multiply(self.A_d[b],self.X_per[b])
+ self.showMemStats(msg=" RandM() X_per init OK b%04d"%b)
+ ifself.rank==0:log(msg=f"[{amber('!')}][MEMPOOL]: randM(): MEM Peak = {blue(MAXMEM/self.GB)} GB",rank=self.rank,lrank=self.lrank)
+ self.MEMPOOL.free_all_blocks()
+
+ ifnotself.X_per_initialized:self.X_per_initialized=True
+
+
+
+    def init_factors(self):
+r"""
+ Initializes NMF factor matrices W and H based on the selected initialization method.
+ """
+ # Random initialization
+ ifself.params.init=='rand':
+ ifself.topo_dim=='2d':
+ #raise Exception(f"[{red('!!')}] 2D topology is not yet supported on GPU")
+ noSupportFor("2D topology")
+ elifself.topo_dimin['1d']:
+ rng_h=np.random.RandomState(self.iter_seeds[self.k])
+ rng_d=cp.random.RandomState(self.iter_seeds[self.k])
+ ifself.ROW_BATCHING_AXIS:
+ self.H_d=cp.random.rand(self.k,self.grid_glob_n).astype(self.A_ij.dtype)# H [k, J]
+ self.H_is_cached=True
+ ifself.A_IS_BATCHED:# W is distributed, H is replicate on Hostd
+ self.W_h=[]
+ forninrange(self.nBatch):
+ self.W_h.append(_pin_mem(rng_h.rand(self.batch_size,self.k).astype(self.A_ij.dtype)))
+ self.W_is_cached=False
+ else:
+ self.W_d=rng_d.rand(self.grid_glob_m,self.k).astype(self.A_ij.dtype)# W_h [m, k]
+ self.W_is_cached=True
+ else:# COL_BATCHING_AXIS:
+ self.W_d=cp.random.rand(self.grid_glob_m,self.k).astype(self.A_ij.dtype)# W_d [I, k]
+ self.W_is_cached=True
+ ifself.A_IS_BATCHED:# H is distributed, W is replicate on Hostd
+ self.H_h=[]
+ forninrange(self.nBatch):
+ self.H_h.append(_pin_mem(rng_h.rand(self.k,self.batch_size).astype(self.A_ij.dtype)))
+ self.H_is_cached=False
+ else:
+ self.H_d=rng_d.rand(self.k,self.grid_glob_n).astype(self.A_ij.dtype)# W_h [m, k]
+ self.H_is_cached=True
+ # NNSVD-based initialization
+ elifself.params.initin['nnsvd','svd','singular']:
+ ifself.topo_dim=='1d':
+ self.AA=[]
+ ifself.A_IS_BATCHED:
+ foriinrange(self.nGPUs):
+ self.AA.append(np.zeros(self.grid_L,self.grid_L))
+ else:
+ foriinrange(self.nGPUs):
+ self.AA.append(_zeros((self.grid_L,self.grid_L)))
+ self.svdSoFar=[[1,0,0]]
+ self.W_i,self.H_j=self.nnsvd(flag=1,verbose=0)
+ elifself.topo_dim=='2d':
+ raiseException('NNSVD init only available for 1D topology, please try with 1d topo.')
+ else:
+ # Raise an error if an unsupported initialization method is chosen
+ raiseException(f"[{red('!!')}] Only Random and nnsvd init is supported on GPU")
+
+
+
+
+
+    def normalize_features(self):
+r"""
+ Normalizes the NMF factor matrices W and H.
+ """
+ # If A is batched
+ ifself.A_IS_BATCHED:# W.I.P
+ ifself.COL_PARTITION:# W.I.P
+ pass
+ else:
+ Wall_norm=self.W_d.sum(axis=0,keepdims=True)
+ ifself.params.p_r!=1:self.nccl_comm.Allreduce(Wall_norm,Wall_norm,op=nccl.NCCL_SUM,stream=None)
+ self.W_d/=Wall_norm+self.eps
+ self.H_h*=cp.asnumpy(Wall_norm.T)
+
+ else:# If A is not batched
+ Wall_norm=self.W_d.sum(axis=0,keepdims=True)
+ ifself.topo_dim=='2d':
+ raiseException(f"[{red('!!')}] 2D topology is not yet supported on GPU")
+ elifself.topo_dim=='1d':
+ ifself.params.p_r!=1:
+ self.nccl_comm.Allreduce(Wall_norm,Wall_norm,op=nccl.NCCL_SUM,stream=None)
+ else:
+ raiseException(f"[{red('!!')}] Topology {self.topo_dim}is not yet supported")
+ self.W_d/=Wall_norm+self.eps
+ self.H_d*=Wall_norm.T
+
+
+
+    def relative_err(self):
+r"""
+ Computes the relative reconstruction error of the NMF decomposition.
+ """
+
+ # Check topology and raise an error for unsupported topologies
+ ifself.topo_dim=='2d':raiseException(f"[{red('!!')}] 2D topology is not yet supported on GPU")
+ err=0.0
+ # Get the max available memory in bytes
+ MAXMEM=self.MEMPOOL.total_bytes()*1.0
+
+ # Compute the error for batched CPU arrays
+ ifself.A_IS_BATCHED:#CPU Array
+ forbinrange(self.nBatch):
+ ifself.ROW_BATCHING_AXIS:# ROW AXIS
+ self.H_h=cp.asnumpy(self.H_d)# Download H
+ ifself.A_IS_SPARSE:
+ err+=np.square(self.X_per[b].toarray()-(self.W_h[b]@self.H_h)).sum()
+ else:
+ err+=np.square(self.X_per[b]-(self.W_h[b]@self.H_h)).sum()
+ else:# COL AXIS
+ self.W_h = cp.asnumpy(self.W_d)  # Download W
+ ifself.A_IS_SPARSE:
+ #err += np.square( self.X_per[b].toarray() - (self.W_h @ self.H_h[b]) ).sum()
+ err+=0.0
+ else:
+ err+=np.square(self.X_per[b]-(self.W_h@self.H_h[b])).sum()
+ self.glob_norm_A=self.dist_norm(self.A_ij)
+ else:# Compute the error for GPU arrays
+ #self.showMemStats(msg = " relative_err() STARTING BATCHES")
+ MAXMEM=max(MAXMEM,self.MEMPOOL.total_bytes()*1.0)
+ forbinrange(self.nBatch):
+ ifself.ROW_BATCHING_AXIS:# Row Partition
+ i0,i1=b*self.grid_I,(b+1)*self.grid_I
+ ifself.A_IS_SPARSE:
+ err+=cp.square(self.A_d[b].toarray()-(_matmul(self.W_d[i0:i1],self.H_d))).sum()
+ else:
+
+ err+=cp.square(self.A_d[b]-(_matmul(self.W_d[i0:i1],self.H_d))).sum()
+ MAXMEM=max(MAXMEM,self.MEMPOOL.total_bytes()*1.0)
+ self.MEMPOOL.free_all_blocks()
+
+ else:# Col Partition
+ j0,j1=b*self.grid_J,(b+1)*self.grid_J
+ ifself.A_IS_SPARSE:
+ err+=cp.square(self.A_d[b].toarray()-(_matmul(self.W_d,self.H_d[:,j0:j1]))).sum()
+ else:
+ err+=cp.square(self.A_d[b]-(_matmul(self.W_d,self.H_d[:,j0:j1]))).sum()
+ err=err.get()
+ self.glob_norm_A=self.dist_norm(self.A_d)
+ #self.glob_norm_A = self.dist_norm(self.X_per)
+ #del err
+ err=self.sub_comm.allreduce(err)
+ self.glob_norm_err=np.sqrt(err)
+ self.recon_err=self.glob_norm_err/self.glob_norm_A
+ ifself.rank==0:log(msg=f"[{amber('!')}][MEMPOOL]: relative_err(): MEM Peak = {blue(MAXMEM/self.GB)} GB",rank=self.rank,lrank=self.lrank)
+
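For reference, the quantity assembled above is the standard relative Frobenius reconstruction error, err = ||A - WH||_F / ||A||_F; a minimal single-device sketch of the same formula (no batching, no MPI reduction):

# Editor's sketch: single-GPU reference for the distributed error above.
import cupy as cp

def relative_error_single(A, W, H):
    err = cp.sqrt(cp.square(A - W @ H).sum())   # ||A - W H||_F
    return float(err / cp.sqrt(cp.square(A).sum()))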
+
+    def dist_norm(self, X, proc=-1, norm='fro', axis=None):
+r"""Computes the distributed norm of an array.
+
+ Args:
+ - X (list/array): The input array or list of arrays.
+ - proc (int, optional): Processor ID. Default is -1.
+ - norm (str, optional): Type of matrix norm. Default is 'fro' (Frobenius norm).
+ - axis (optional): Axis for norm computation. Default is None.
+
+ Returns:
+ - float: The computed distributed norm.
+ """
+
+ # If X is a list, this is a batched computation.
+ if type(X) in [list]:  # BATCHED
+ array_type=self.getArrayType(X[0])
+ N=len(X)
+ err=0.0
+ try:
+ device,density=array_type.split('_')
+ except:
+ raiseException('[!][dist_norm()] UNABLE TO IDENTIFY ARRAY OBJECT TYPE {}!!'.format(array_type))
+ ifdevicein['CPU']:
+ ifdensityin['DENSE']:
+ forninrange(N):err+=np.square(X[n]).sum()
+ elifdensityin['SPARSE']:
+ forninrange(N):err+=np.square(X[n].data).sum()
+ else:
+ raiseException('[!][dist_norm()] UNABLE TO IDENTIFY ARRAY OBJECT TYPE {}!!'.format(array_type))
+ elifdevicein['GPU']:
+ ifdensityin['DENSE']:
+ forninrange(N):err+=cp.square(X[n]).sum()
+ elifdensityin['SPARSE']:
+ forninrange(N):err+=cp.square(X[n].data).sum()
+ else:
+ raiseException('[!][dist_norm()] UNABLE TO IDENTIFY ARRAY OBJECT TYPE {}!!'.format(array_type))
+ err=err.get()
+
+ else:
+ raiseException('[!][dist_norm()] UNABLE TO IDENTIFY ARRAY OBJECT TYPE {}!!'.format(array_type))
+ else:# If X is not a list, this is a non-batched computation.
+ array_type=self.getArrayType(X)
+ try:
+ device,density=array_type.split('_')
+ except:
+ raiseException('[!][dist_norm()] UNABLE TO IDENTIFY ARRAY OBJECT TYPE {}!!'.format(array_type))
+ ifdevicein['CPU']:
+ ifdensityin['DENSE']:
+ err=np.square(X).sum()
+ elifdensityin['SPARSE']:
+ err=np.square(X.data).sum()
+ elifdevicein['GPU']:
+ ifdensityin['DENSE']:
+ err=cp.square(X).sum()
+ elifdensityin['SPARSE']:
+ err=cp.square(X.data).sum()
+ else:
+ raiseException('[!][dist_norm()] UNABLE TO IDENTIFY ARRAY OBJECT TYPE {}!!'.format(array_type))
+ err=err.get()
+ else:
+ raiseException('[!][dist_norm()] UNABLE TO IDENTIFY ARRAY OBJECT TYPE {}!!'.format(array_type))
+
+ err=self.sub_comm.allreduce(err)
+ returnnp.sqrt(err)
+
+
+
+    def fit(self, factors=None, save_factors=False, rtrn=['W', 'H']):
+r"""Fits the NMF model based on provided parameters.
+
+ Args:
+ - factors (list, optional): Initial factor matrices. Default is None.
+ - save_factors (bool, optional): Flag to decide whether to save factor matrices. Default is False.
+ - rtrn (list, optional): List of matrix names to return. Default is ['W','H'].
+
+ Returns:
+ - tuple: Factor matrices and meta information.
+ """
+ ifself.params.norm.upper()=='FRO':
+ ifself.params.method.upper()=='MU':
+ self.Fro_MU_update(factors=factors,save_factors=save_factors,rtrn=rtrn)
+ else:
+ raiseException('Not a valid method: Choose (mu)')
+ elifself.params.norm.upper()=='KL':
+ ifself.params.method.upper()=='MU':
+ self.KL_MU_update(factors=factors,save_factors=save_factors,rtrn=rtrn)
+ else:
+ raiseException('Not a valid method: Choose (mu)')
+ else:
+ raiseException('Not a valid norm: Choose (fro/kl)')
+ #return self.W_i, self.H_j
+ metha={}
+ metha['dt']=self.dt
+ metha['err']=self.recon_err
+ ifself.A_IS_BATCHED:
+ ifself.ROW_BATCHING_AXIS:
+ returnself.W_h,cp.asnumpy(self.H_d),metha
+ else:
+ returncp.asnumpy(self.W_d),self.H_h,metha
+ #return self.W_h, self.H_h, self.recon_err
+ else:
+ returncp.asnumpy(self.W_d),cp.asnumpy(self.H_d),metha#self.recon_err.get()
+
+
+
+    def Fro_MU_update(self, factors=None, save_factors=False, rtrn=['W', 'H']):
+r"""Performs updates for NMF using the Frobenius norm and MU optimization method.
+
+ Args:
+ - factors (list, optional): Initial factor matrices. Default is None.
+ - save_factors (bool, optional): Flag to decide whether to save factor matrices. Default is False.
+ - rtrn (list, optional): List of matrix names to return. Default is ['W','H'].
+
+ Returns:
+ - None
+ """
+ # Memory management and initialization steps.
+ self.showMemStats(msg=" Fro_MU_update() 0")
+ self.MEMPOOL.free_all_blocks()
+ #self.showMemStats(msg = " Fro_MU_update() MEMPOOL Freed OK")
+ self.PINNEDMEMPOOL.free_all_blocks()
+ # If the matrix is batched, perform the updates using the appropriate method based on partitioning axis.
+ ifself.A_IS_BATCHED:# BATCHED CPU -> GPU
+ ifself.COL_BATCHING_AXIS:
+ self.FroNMF_1D_row(factors=factors,save_factors=save_factors,rtrn=rtrn)
+ elifself.ROW_BATCHING_AXIS:
+ #self.FroNMF_1D_col(factors=factors, save_factors=save_factors, rtrn=rtrn)
+ self.FroNMF_1D_row_batched(factors=factors,save_factors=save_factors,rtrn=rtrn)
+ else:
+ raiseException(f"[{red('!!')}] Grid Partition UNDEFINED!! ")
+ else:# LOCAL BATCHING on GPU Mem
+ ifself.COL_BATCHING_AXIS:
+ self.FroNMF_1D_col_partion(factors=factors,save_factors=save_factors,rtrn=rtrn)
+ elifself.ROW_BATCHING_AXIS:
+ self.FroNMF_1D_row_partion(factors=factors,save_factors=save_factors,rtrn=rtrn)
+ else:
+ raiseException(f"[{red('!!')}] Grid Partition UNDEFINED!! ")
+def spMat(A, Format='coo', shape=None, dtype=None, copy=False):
+r"""
+ Converts a given matrix A into a specified sparse format using CuPy.
+
+ Parameters:
+ -----------
+ A : ndarray or sparse matrix
+ Input matrix to be converted.
+ Format : {'coo', 'csr', 'csc', 'dia'}
+ Desired sparse format. Default is 'coo'.
+ shape : tuple, optional
+ Desired shape for the sparse matrix.
+ dtype : data-type, optional
+ Data type of the result.
+ copy : bool, optional
+ If true, it guarantees the input data A is not modified. Default is False.
+
+ Returns:
+ --------
+ Sparse matrix in the desired format and library (cupy).
+ """
+    fmt = Format.lower()
+    assert fmt in ['coo', 'csr', 'csc', 'dia'], f"[{red('!')}] Format must be in [{green('coo')}, {green('csr')}, {green('csc')}, {green('dia')}] "
+    isNdArr = False
+    if type(A) in [numpy.ndarray]: isNdArr = True
+    if fmt in ['coo']:
+        if isNdArr:
+            return cupyx.scipy.sparse.coo_matrix(coo_matrix(A), shape=shape, dtype=dtype, copy=copy)
+        else:
+            return cupyx.scipy.sparse.coo_matrix(A, shape=shape, dtype=dtype, copy=copy)
+    elif fmt in ['csc']:
+        if isNdArr:
+            return cupyx.scipy.sparse.csc_matrix(coo_matrix(A), shape=shape, dtype=dtype, copy=copy)
+        else:
+            return cupyx.scipy.sparse.csc_matrix(A, shape=shape, dtype=dtype, copy=copy)
+    elif fmt in ['csr']:
+        if isNdArr:
+            return cupyx.scipy.sparse.csr_matrix(coo_matrix(A), shape=shape, dtype=dtype, copy=copy)
+        else:
+            return cupyx.scipy.sparse.csr_matrix(A, shape=shape, dtype=dtype, copy=copy)
+    elif fmt in ['dia']:
+        if isNdArr:
+            return cupyx.scipy.sparse.dia_matrix(coo_matrix(A), shape=shape, dtype=dtype, copy=copy)
+        else:
+            return cupyx.scipy.sparse.dia_matrix(A, shape=shape, dtype=dtype, copy=copy)
+    else:
+        print(f"[!] sparse matrix format '{fmt}' is not supported")
+
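A hedged example of spMat: moving a SciPy CSR matrix (or a dense array) to the GPU in the requested sparse format. The import path is an assumption.

# Editor's sketch, not part of the library source.
import numpy as np
from scipy.sparse import random as sp_random
from pyDNMFk.cupyCuSPARSELib import spMat   # import path is an assumption

A_host = sp_random(1000, 500, density=0.01, format='csr', dtype=np.float32)
A_gpu = spMat(A_host, Format='csr')          # cupyx.scipy.sparse.csr_matrix on the GPU
coo_gpu = spMat(A_host.toarray(), 'coo')     # dense ndarray input is also accepted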
+
+
+
+def spCu2Sc(A, copy=False):
+r"""
+ Converts a CuPy sparse matrix to a SciPy sparse matrix.
+
+ Parameters:
+ -----------
+ A : cupyx sparse matrix
+ Input matrix to be converted.
+ copy : bool, optional
+ If true, it guarantees the input data A is not modified. Default is False.
+
+ Returns:
+ --------
+ Sparse matrix in the desired format and library (scipy).
+ """
+ fmt=A.format.lower()
+    assert fmt in ['coo', 'csr', 'csc', 'dia'], f"[{red('!')}] Format must be in [{green('coo')}, {green('csr')}, {green('csc')}, {green('dia')}] "
+ iffmtin['coo']:
+ returnscipy.sparse.coo_matrix((A.data.get(),A.indices.get(),A.indptr.get()),shape=A.shape,dtype=A.dtype)
+ eliffmtin['csc']:
+ returnscipy.sparse.csc_matrix((A.data.get(),A.indices.get(),A.indptr.get()),shape=A.shape,dtype=A.dtype)
+ eliffmtin['csr']:
+ returnscipy.sparse.csr_matrix((A.data.get(),A.indices.get(),A.indptr.get()),shape=A.shape,dtype=A.dtype)
+ eliffmtin['dia']:
+ returnscipy.sparse.dia_matrix((A.data.get(),A.indices.get(),A.indptr.get()),shape=A.shape,dtype=A.dtype)
+ else:
+ print(f"[!] sparse matrix format '{fmt}' is not supported")
+
+
+def spSc2Cu(A, copy=False):
+r"""
+ Converts a SciPy sparse matrix to a CuPy sparse matrix.
+
+ Parameters:
+ -----------
+ A : scipy sparse matrix
+ Input matrix to be converted.
+ copy : bool, optional
+ If true, it guarantees the input data A is not modified. Default is False.
+
+ Returns:
+ --------
+ Sparse matrix in the desired format and library (cupy).
+ """
+ fmt=A.format.lower()
+    assert fmt in ['coo', 'csr', 'csc', 'dia'], f"[{red('!')}] Format must be in [{green('coo')}, {green('csr')}, {green('csc')}, {green('dia')}] "
+ iffmtin['coo']:
+ returncupyx.scipy.sparse.coo_matrix((cp.array(A.data),cp.array(A.indices),cp.array(A.indptr)),shape=A.shape,dtype=A.dtype)
+ eliffmtin['csc']:
+ returncupyx.scipy.sparse.csc_matrix((cp.array(A.data),cp.array(A.indices),cp.array(A.indptr)),shape=A.shape,dtype=A.dtype)
+ eliffmtin['csr']:
+ returncupyx.scipy.sparse.csr_matrix((cp.array(A.data),cp.array(A.indices),cp.array(A.indptr)),shape=A.shape,dtype=A.dtype)
+ eliffmtin['dia']:
+ returncupyx.scipy.sparse.dia_matrix((cp.array(A.data),cp.array(A.indices),cp.array(A.indptr)),shape=A.shape,dtype=A.dtype)
+ else:
+ print(f"[!] sparse matrix format '{fmt}' is not supported")
+
+
+
+def spRandMat(nr, nc, density, Format='coo', dtype=cupy.float32):
+r"""
+ Generates a random sparse matrix using CuPy.
+
+ Parameters:
+ -----------
+ nr : int
+ Number of rows.
+ nc : int
+ Number of columns.
+ density : float
+ Desired density for the sparse matrix. Values between 0 and 1.
+ Format : {'coo', 'csr', 'csc', 'dia'}, optional
+ Desired sparse format. Default is 'coo'.
+ dtype : data-type, optional
+ Data type of the result. Default is cupy.float32.
+
+ Returns:
+ --------
+ Random sparse matrix in the desired format and library (cupy).
+ """
+ fmt=Format.lower()
+ returncupyx.scipy.sparse.random(nr,nc,density=density,format=fmt,dtype=dtype)
+
+
+
+def spMM(A, B, alpha=1.0, beta=0.0, transpA=False, transpB=False):
+r"""
+ Performs matrix multiplication of sparse matrix A and dense matrix B using CuPy.
+
+ Parameters:
+ -----------
+ A : cupyx sparse matrix
+ Left sparse matrix.
+ B : ndarray
+ Right dense matrix.
+ alpha : float, optional
+ Scalar multiplier for the product of A and B. Default is 1.0.
+ beta : float, optional
+ Scalar multiplier for the initial matrix C (if provided). Default is 0.0.
+ transpA : bool, optional
+ If True, transpose matrix A before multiplication. Default is False.
+ transpB : bool, optional
+ If True, transpose matrix B before multiplication. Default is False.
+
+ Returns:
+ --------
+ Resultant matrix after multiplication.
+ """
+    assert cupy.cusparse.check_availability('spmm'), "[!] spmm is not available"
+    #if transpA : A = A.T
+    #if transpB : B = B.T
+    if not A.has_canonical_format: A.sum_duplicates()
+    B = cupy.array(B, order='f')
+    return cupy.cusparse.spmm(A, B, alpha=alpha, beta=beta, transa=transpA, transb=transpB)
+
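A hedged example of spMM: multiplying a random sparse GPU matrix by a dense factor, the pattern used for products with a sparse A. Import paths are assumptions.

# Editor's sketch, not part of the library source.
import cupy as cp
from pyDNMFk.cupyCuSPARSELib import spRandMat, spMM   # assumed import path

A = spRandMat(2048, 1024, density=0.01, Format='csr', dtype=cp.float32)
H = cp.random.rand(1024, 16, dtype=cp.float32)
AH = spMM(A, H)   # dense (2048, 16) result of A @ H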
+
+
+def spMM_with_C(A, B, C, alpha=1.0, beta=0.0, transpA=False, transpB=False):
+r"""
+ Performs matrix multiplication of sparse matrix A and dense matrix B, and adds it to matrix C using CuPy.
+
+ Parameters:
+ -----------
+ A : cupyx sparse matrix
+ Left sparse matrix.
+ B : ndarray
+ Right dense matrix.
+ C : ndarray
+ Dense matrix to which the result is added.
+ alpha : float, optional
+ Scalar multiplier for the product of A and B. Default is 1.0.
+ beta : float, optional
+ Scalar multiplier for the initial matrix C. Default is 0.0.
+ transpA : bool, optional
+ If True, transpose matrix A before multiplication. Default is False.
+ transpB : bool, optional
+ If True, transpose matrix B before multiplication. Default is False.
+
+ Returns:
+ --------
+ Resultant matrix after multiplication and addition to C.
+ """
+    assert cupy.cusparse.check_availability('spmm'), "[!] spmm is not available"
+    #if transpA : A = A.T
+    #if transpB : B = B.T
+    if not A.has_canonical_format: A.sum_duplicates()
+    B = cupy.array(B, order='f')
+    C = cupy.array(C, order='f')
+    return cupy.cusparse.spmm(A, B, c=C, alpha=alpha, beta=beta, transa=transpA, transb=transpB)
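An illustrative accumulation sketch, C_out = alpha * (A @ B) + beta * C (all values are arbitrary):

A = spRandMat(64, 32, 0.1, Format='csr')
B = cupy.random.rand(32, 8, dtype=cupy.float32)
C = cupy.random.rand(64, 8, dtype=cupy.float32)
C_out = spMM_with_C(A, B, C, alpha=1.0, beta=0.5)
assert cupy.allclose(C_out, A.toarray() @ B + 0.5 * C, atol=1e-4)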
+# @author: Ismael Boureima
+
+TPB = 32  # CUDA threads per block used by the custom kernels
+
+import cupy as cp
+import numpy as np
+#from sgemm import sgemm
+from .toolz import amber, blue, green, purple, red
+
+# Initializing a pinned memory pool in CuPy to speed up host-device memory transfers.
+# However, the following two lines are commented out and thus inactive.
+#pinned_memory_pool = cupy.cuda.PinnedMemoryPool()
+#cupy.cuda.set_pinned_memory_allocator(pinned_memory_pool.malloc)
+#mem = cp.cuda.alloc_pinned_memory(array.nbytes)
+
+
[docs]def pin_memory(array):
+"""Allocate memory in the pinned (or "page-locked") area. Data in this memory can be transferred to the GPU faster."""
+    mem = cp.cuda.alloc_pinned_memory(array.nbytes)
+    ret = np.frombuffer(mem, array.dtype, array.size).reshape(array.shape)
+    ret[...] = array
+    return ret
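A minimal usage sketch (array size is arbitrary): staging the host array in pinned memory lets the subsequent device copy use the faster page-locked transfer path.

x = np.random.rand(1 << 20).astype(np.float32)   # host array
x_pinned = pin_memory(x)                         # page-locked host copy
x_gpu = cp.asarray(x_pinned)                     # host-to-device transfer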
+
+
[docs]def read_code(code_filename, params):
+"""Read a code file and prepend it with CUDA kernel parameter definitions."""
+    with open(code_filename, 'r') as f:
+        code = f.read()
+    for k, v in params.items():
+        code = '#define ' + k + ' ' + str(v) + '\n' + code
+    return code
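A hedged sketch of the intended use (the kernel file name is hypothetical):

# With params = {'TPB': 32, 'K': 8}, the returned source begins with
#   #define K 8
#   #define TPB 32
# followed by the file contents, ready for e.g. cp.RawModule(code=...).
kernel_src = read_code('nmf_update_kernel.cu', {'TPB': TPB, 'K': 8})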
+
+
+
[docs]def benchmark(func, args, n_run=1):
+"""Benchmark a given function with provided arguments over n_run runs. Return a list of execution times."""
+    times = []
+    for _ in range(n_run):
+        start = cp.cuda.Event()
+        end = cp.cuda.Event()
+        start.record()
+        func(*args)
+        end.record()
+        end.synchronize()
+        times.append(cp.cuda.get_elapsed_time(start, end))  # milliseconds
+    return times
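Illustrative timing of a dense GPU matmul over ten runs (sizes are arbitrary):

a = cp.random.rand(2048, 2048, dtype=cp.float32)
b = cp.random.rand(2048, 2048, dtype=cp.float32)
ms = benchmark(cp.matmul, (a, b), n_run=10)      # per-run times in milliseconds
print(f"matmul: best {min(ms):.3f} ms over {len(ms)} runs")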
+
+
[docs]def timeExecution(func):
+"""Decorator to time the execution of a function and print it."""
+    def wrapper(*args, **kwargs):
+        start, end = cp.cuda.Event(), cp.cuda.Event()
+        start.record()
+        out = func(*args, **kwargs)
+        end.record()
+        end.synchronize()
+        t = cp.cuda.get_elapsed_time(start, end)  # milliseconds
+        print(f"[INFO]: {blue(func.__name__)} ran in {red(round(t, 4))} [ms]")
+        return out
+    return wrapper
+
+@timeExecution
+def _asarray(x):
+    """Convert the input to a CuPy array."""
+    return cp.asarray(x, dtype=x.dtype)
+
+@timeExecution
+def _zeros(*args, **kwargs):
+    """Return a new CuPy array of given shape and type, filled with zeros."""
+    return cp.zeros(*args, **kwargs)
+
+@timeExecution
+def _asnumpy(x):
+    """Convert a CuPy array to a NumPy array."""
+    return cp.asnumpy(x)
+
+@timeExecution
+def _divide(a, b):
+    """Divide two CuPy arrays element-wise."""
+    return cp.divide(a, b, out=None)
+
+@timeExecution
+def _matmul(a, b):
+    """Matrix multiplication of two CuPy arrays."""
+    return cp.matmul(a, b, out=None)
+
+@timeExecution
+def _multiply(a, b):
+    """Multiply two CuPy arrays element-wise."""
+    return cp.multiply(a, b)
+
+
+
+
+
# @Author: Manish Bhattarai, Erik Skau
import argparse
@@ -160,7 +235,7 @@
Source code for pyDNMFk.data_generator
[docs]def parser():
- r"""
+r"""
    Reads the input arguments from the user and parses the parameters to the data generator module.
    """
    parser = argparse.ArgumentParser(description='Data generator arguments')
@@ -175,7 +250,7 @@
Source code for pyDNMFk.data_generator
[docs]class data_generator():
- r"""
+r"""
    Generates synthetic data in a distributed manner, where each MPI process generates a chunk of the data in parallel.
    The W matrix is generated with a Gaussian distribution whereas the H matrix is random.
@@ -211,7 +286,7 @@
Source code for pyDNMFk.data_generator
# self.factor = k
[docs]def gauss_matrix_generator(self, n, k):
- r"""
+r"""
    Construct a matrix of dimensions n by k where the ith column is a Gaussian kernel corresponding to approximately N(i*n/k, 0.01*n^2)
    Parameters
@@ -230,28 +305,28 @@
[docs]def determine_block_index_range_asymm(self):
- '''Determines the start and end indices for the Data block for each rank'''
+'''Determines the start and end indices for the Data block for each rank (worked example below)'''
        chunk_ind = np.unravel_index(self.rank, self.pgrid)
        start_inds = [i * (n // k) + min(i, n % k) for n, k, i in zip(self.shape, self.pgrid, chunk_ind)]
        end_inds = [(i + 1) * (n // k) + min((i + 1), n % k) - 1 for n, k, i in zip(self.shape, self.pgrid, chunk_ind)]
        return start_inds, end_inds
[docs]defdetermine_block_shape_asymm(self):
- '''Determines the shape for the Data block for each rank'''
+'''Determines the shape for the Data block for each rank'''
        start_inds, end_inds = self.determine_block_index_range_asymm()
        return [(j - i + 1) for (i, j) in zip(start_inds, end_inds)]
[docs]defrandom_matrix_generator(self,n,k,seed):
- '''Generator for random matric with given seed'''
+'''Generator for a random matrix with given seed'''
        np.random.seed(seed)
        return np.random.rand(n, k)
[docs]defdist_fromfunction(self,func,shape,pgrid,*args,unravel_index=np.unravel_index,**kwargs):
- """
+""" produces X_{i,j} = func(i,j) in a distributed manner, so that each processor has an array_split section of X according to the grid. """grid_index=unravel_index()
@@ -260,7 +335,7 @@
[docs]defunravel_column(self):
- '''finds the column rank for 2d grid'''
+'''finds the column rank for 2d grid'''defwrapper(*args,**kwargs):row,col=np.unravel_index(self.rank,self.pgrid)
@@ -269,29 +344,29 @@
Source code for pyDNMFk.data_generator
returnwrapper
[docs]defunravel_row(self):# ,ind, shape):
- '''finds the row rank for 2d grid'''
+'''finds the row rank for 2d grid'''row,col=np.unravel_index(self.rank,self.pgrid)return(row//self.pgrid[0],col)
[docs]defcreate_folder_dir(self,fpath):
- '''Create a folder if doesn't exist'''
+'''Create a folder if it doesn't exist'''
        try:
            os.mkdir(fpath)
        except:
            pass
[docs]defgenerate_factors_data(self):
- """Generates the chunk of factors W,H and data X for each MPI process"""
+"""Generates the chunk of factors W,H and data X for each MPI process"""W_gen=self.dist_fromfunction(self.gauss_matrix_generator(self.m,self.k),(self.m,self.k),(self.p_r,1),
- unravel_index=self.unravel_column())
+ unravel_index=self.unravel_column()).astype(np.float32)H_gen=self.random_matrix_generator(self.k,self.determine_block_shape_asymm()[1],
- self.unravel_row()[1])
+ self.unravel_row()[1]).astype(np.float32)X_gen=W_gen@H_genprint('For rank=',self.rank,' dimensions of W,H and X are ',W_gen.shape,H_gen.shape,X_gen.shape)returnW_gen,H_gen,X_gen
[docs]deffit(self):
- '''generates and save factors'''
+'''Generates and saves the factors'''
        W_gen, H_gen, X_gen = self.generate_factors_data()
        self.create_folder_dir(self.fpath)
        self.create_folder_dir(self.fpath + 'W_factors')
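To make the factor generation above concrete, a hedged serial sketch of what the distributed generator produces overall (the exact kernel width and normalisation used by gauss_matrix_generator may differ):

import numpy as np

m, n, k = 100, 80, 4
rows = np.arange(m)[:, None]
cols = np.arange(k)[None, :]
W = np.exp(-((rows - cols * m / k) ** 2) / (2 * 0.01 * m ** 2))  # Gaussian-kernel columns, approx. N(j*m/k, 0.01*m^2)
H = np.random.rand(k, n)                                         # uniform random H
X = W @ H                                                        # synthetic data chunk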
@@ -311,47 +386,73 @@
[docs]@comm_timing()defread_file_npy(self):
- r"""Numpy data read function"""
+r"""Numpy data read function"""self.data=np.load(self.file_path)
[docs]@comm_timing()defread_file_csv(self):
- r"""CSV data read function"""
+r"""CSV data read function"""self.data=pd.read_csv(self.file_path,header=None).values
[docs]@comm_timing()defdata_partition(self):
- r"""
+r""" This function divides the input matrix into chunks as specified by grid configuration. Return n array of shape (nrows_i, ncols_i) where i is the index of each chunk.
@@ -233,16 +315,19 @@
Source code for pyDNMFk.data_io
[docs]@comm_timing()defsave_data_to_file(self,fpath):
- r"""This function saves the splitted data to numpy array indexed with chunk number"""
+r"""This function saves the splitted data to numpy array indexed with chunk number"""fname=fpath+'A_'+self.comm.rank+'.npy'np.save(fname,self.data)
[docs]@comm_timing()defread_dat(self):
- r"""Function for reading the data and split into chunks to be reach by each MPI rank"""
+r"""Function for reading the data and split into chunks to be reach by each MPI rank"""ifself.ftype=='npy':self.read_file_npy()self.data_partition()
+ elifself.ftype=="npz":
+ self.read_file_npz()
+ self.data_partition()elifself.ftype=='csv'orself.ftype=='txt':self.read_file_csv()self.data_partition()
@@ -251,11 +336,11 @@
[docs]classsplit_files_save():
- r"""Rank 0 based data read, split and save"""
+r"""Rank 0 based data read, split and save"""@comm_timing()def__init__(self,data,pgrid,fpath):
@@ -267,14 +352,14 @@
Source code for pyDNMFk.data_io
[docs]@comm_timing()defsplit_files(self):
- r"""Compute the index range for each block and partition the data as per the chunk"""
+r"""Compute the index range for each block and partition the data as per the chunk"""dtr_blk_idx=[determine_block_params(rank,self.pgrid,self.data.shape).determine_block_index_range_asymm()forrankinrange(np.product(self.pgrid))]self.split=[self.data[i[0][0]:i[1][0]+1,i[0][1]:i[1][1]+1]foriindtr_blk_idx]
[docs]@comm_timing()defsave_data_to_file(self):
- r"""Function to save the chunks into numpy files"""
+r"""Function to save the chunks into numpy files"""s=0self.split=self.split_files()foriinrange(self.p_r*self.p_c):
@@ -286,7 +371,7 @@
Source code for pyDNMFk.data_io
[docs]classdata_write():
- r"""Class for writing data/results.
+r"""Class for writing data/results. Parameters ----------
@@ -310,7 +395,7 @@
Source code for pyDNMFk.data_io
[docs]@comm_timing()defcreate_folder_dir(self,fpath):
- r"""Create directory if not present"""
+r"""Create directory if not present"""try:os.mkdir(fpath)except:
@@ -318,7 +403,7 @@
Source code for pyDNMFk.data_io
[docs]@comm_timing()defsave_factors(self,factors,reg=False):
- r"""Save the W and H factors for each MPI process"""
+r"""Save the W and H factors for each MPI process"""self.create_folder_dir(self.fpath)ifreg==True:W_factors_pth=self.fpath+'W_reg_factors/'
@@ -342,7 +427,7 @@
Source code for pyDNMFk.data_io
[docs]@comm_timing()defsave_cluster_results(self,params):
- r"""Save cluster results to a h5 file with rank 0"""
+r"""Save cluster results to a h5 file with rank 0"""ifself.rank==0:withh5py.File(self.fpath+'results.h5','w')ashf:hf.create_dataset('clusterSilhouetteCoefficients',data=params['clusterSilhouetteCoefficients'])
@@ -350,11 +435,12 @@
[docs]@comm_timing()defread_factor(self,fpath):
- """Read factors as chunks and stack them"""
+"""Read factors as chunks and stack them"""files=glob.glob(fpath)data=[]iflen(files)==1:
- data=self.custom_read_npy(files)
+ data=self.custom_read_npy(files[0])else:forfileinnp.sort(files):data.append(self.custom_read_npy(file))
@@ -392,7 +478,7 @@
Source code for pyDNMFk.data_io
[docs]@comm_timing()defload_factors(self):
- r"""Load the final stacked factors for visualization"""
+r"""Load the final stacked factors for visualization"""W_data,ct_W=self.read_factor(self.W_path)H_data,ct_H=self.read_factor(self.H_path)ifct_W>1:W_data=np.vstack((W_data))
@@ -405,47 +491,73 @@
# @Author: Manish Bhattarai, Erik Skaufrom.utilsimport*
[docs]classcustom_clustering():
- r"""
+r""" Greedy algorithm to approximate a quadratic assignment problem to cluster vectors. Given p groups of k vectors, construct k clusters, each cluster containing a single vector from each of the p groups. This clustering approximation uses cos distances and mean centroids. Parameters
@@ -182,7 +257,7 @@
Source code for pyDNMFk.dist_clustering
[docs]@comm_timing()defnormalize_by_W(self):
- r'''Normalize the factors W and H'''
+r'''Normalize the factors W and H'''Wall_norm=(self.W_all*self.W_all).sum(axis=0)ifself.p_r!=1:Wall_norm=self.comm1.allreduce(Wall_norm)
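A hedged serial sketch of the normalization performed here: each column of W is scaled to unit norm and H is rescaled so the product W @ H is unchanged (in the distributed code the column norms are allreduced across row ranks first).

import numpy as np

W = np.abs(np.random.rand(100, 4))
H = np.abs(np.random.rand(4, 50))
norms = np.sqrt((W * W).sum(axis=0))    # per-column norms of W
W_n = W / norms
H_n = H * norms[:, None]
assert np.allclose(W @ H, W_n @ H_n)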
@@ -193,7 +268,7 @@
Source code for pyDNMFk.dist_clustering
[docs]@comm_timing()defmad(self,data,flag=1,axis=-1):
- r'''Compute the median/mean absolute deviation'''
+r'''Compute the median/mean absolute deviation'''ifflag==1:# the median absolute deviationreturnnp.nanmedian(np.absolute(data-np.nanmedian(data,axis=axis,keepdims=True)),axis=axis)else:# flag = 0 the mean absolute deviation
@@ -202,7 +277,7 @@
Source code for pyDNMFk.dist_clustering
[docs]@comm_timing()defchange_order(self,tens):
- r'''change the order of features'''
+r'''change the order of features'''ans=list(range(len(tens)))forpintens:ans[p[0]]=p[1]
@@ -210,7 +285,7 @@
Source code for pyDNMFk.dist_clustering
[docs]@comm_timing()defgreedy_lsa(self,A):
- r"""Return the permutation order"""
+r"""Return the permutation order"""X=A.copy()pairs=[]foriinrange(X.shape[0]):
@@ -223,7 +298,7 @@
Source code for pyDNMFk.dist_clustering
[docs]@comm_timing()defdist_feature_ordering(self,centroids,W_sub):
- r'''return the features in proper order'''
+r'''return the features in proper order'''k=W_sub.shape[1]dist=centroids.T@W_subifself.p_r!=1:
@@ -235,7 +310,7 @@
[docs]@comm_timing()defdist_silhouettes(self):
- """
+""" Computes the cosine distances silhouettes of a distributed clustering of vectors. Returns
@@ -314,7 +389,7 @@
Source code for pyDNMFk.dist_clustering
[docs]@comm_timing()deffit(self):
- r"""
+r""" Calls the sub routines to perform distributed custom clustering and compute silhouettes Returns
@@ -341,47 +416,73 @@
[docs]classMPI_comm():
- """Initialization of MPI communicator to construct the cartesian topology and sub communicators
+"""Initialization of MPI communicator to construct the cartesian topology and sub communicators Parameters ----------
@@ -176,7 +251,7 @@
[docs]defcart_1d_row(self):
- """
+""" Constructs a cartesian row communicator through construction of a sub communicator across rows Returns
@@ -190,7 +265,7 @@
Source code for pyDNMFk.dist_comm
returnself.cartesian1d_row
[docs]defcart_1d_column(self):
- """
+""" Constructs a cartesian column communicator through construction of a sub communicator across columns Returns
@@ -204,52 +279,78 @@
Source code for pyDNMFk.dist_comm
returnself.cartesian1d_column
[docs]defFree(self):
- """ Frees the sub communicators"""
+""" Frees the sub communicators"""self.cart_1d_row().Free()self.cart_1d_column().Free()
[docs]classnmf_algorithms_2D():
- """
+""" Performs the distributed NMF operation along 2D cartesian grid Parameters
@@ -217,7 +292,7 @@
Source code for pyDNMFk.dist_nmf
self.local_H_n=self.H_ij.shape[1]
[docs]defupdate(self):
- """Performs 1 step Update for factors W and H based on NMF method and corresponding norm minimization
+"""Performs 1 step Update for factors W and H based on NMF method and corresponding norm minimization Returns -------
@@ -247,7 +322,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defglobal_gram(self,A):
- r""" Distributed gram computation
+r""" Distributed gram computation Computes the global gram operation of matrix A .. math:: A^TA
@@ -271,7 +346,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defglobal_mm(self,A,B):
- r""" Distributed matrix multiplication
+r""" Distributed matrix multiplication Computes the global matrix multiplication of matrix A and B .. math:: AB
@@ -292,12 +367,12 @@
Source code for pyDNMFk.dist_nmf
self.comm1.barrier()returnAB_glob
- '''Functions for Fro MU NMF update'''
+'''Functions for Fro MU NMF update'''
[docs]@comm_timing()defATW_glob(self):
- r""" Distributed computation of W^TA
+r""" Distributed computation of W^TA Computes the global matrix multiplication of matrix W and A .. math:: W^TA
@@ -327,7 +402,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defAH_glob(self,H_ij=None):
- r""" Distributed computation of AH^T
+r""" Distributed computation of AH^T Computes the global matrix multiplication of matrix A and H .. math:: AH^T
@@ -359,7 +434,7 @@
Source code for pyDNMFk.dist_nmf
[docs]defFro_MU_update_H(self):
- r"""
+r""" Frobenius norm based multiplicative update of H parameter Function computes updated H parameter for each mpi rank
@@ -379,7 +454,7 @@
Source code for pyDNMFk.dist_nmf
[docs]defFro_MU_update_W(self):
- r"""
+r""" Frobenius norm based multiplicative update of W parameter Function computes updated H parameter for each mpi rank
@@ -398,7 +473,7 @@
Source code for pyDNMFk.dist_nmf
self.W_ij*=AH/WHTH
[docs]defFro_MU_update(self,W_update=True):
- r"""
+r""" Frobenius norm based multiplicative update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -415,11 +490,11 @@
Source code for pyDNMFk.dist_nmf
self.Fro_MU_update_W()self.Fro_MU_update_H()
- '''Functions for KL MU NMF update'''
+'''Functions for KL MU NMF update'''
[docs]@comm_timing()defgather_W_H(self,gW=True,gH=True):
- r"""
+r""" Gathers W and H factors across cartesian groups i.e H_ij -> H_j if gH=True and W_ij -> W_i and gW=True
@@ -445,7 +520,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defWTU_glob(self):
- r""" Distributed computation of W^TU
+r""" Distributed computation of W^TU Computes the global matrix multiplication of matrix W and U for KL .. math:: W^TU
@@ -472,7 +547,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defUHT_glob(self):
- r""" Distributed computation of UH^T
+r""" Distributed computation of UH^T Computes the global matrix multiplication of matrix W and U for KL .. math:: UH^T
@@ -502,7 +577,7 @@
Source code for pyDNMFk.dist_nmf
returntmp
[docs]defKL_MU_update_W(self):
- r"""
+r""" KL divergence based multiplicative update of W parameter Function computes updated W parameter for each mpi rank
@@ -522,7 +597,7 @@
Source code for pyDNMFk.dist_nmf
self.W_ij*=sk/(X2+self.eps)
[docs]defKL_MU_update_H(self):
- r"""
+r""" Frobenius norm based multiplicative update of H parameter Function computes updated H parameter for each mpi rank
@@ -542,7 +617,7 @@
Source code for pyDNMFk.dist_nmf
self.H_ij*=ks/(X1+self.eps)
[docs]defKL_MU_update(self,W_update=True):
- r"""
+r""" KL divergence based multiplicative update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -559,10 +634,10 @@
Source code for pyDNMFk.dist_nmf
self.KL_MU_update_W()self.KL_MU_update_H()
- '''Functions for FRO HALS NMF update'''
+'''Functions for FRO HALS NMF update'''
[docs]defFRO_HALS_update_W(self):
- r"""
+r""" Frobenius norm minimization based HALS update of W parameter Function computes updated W parameter for each mpi rank
@@ -585,7 +660,7 @@
Source code for pyDNMFk.dist_nmf
self.W_ij[:,kk]/=ss
[docs]defFRO_HALS_update_H(self):
- r"""
+r""" Frobenius norm minimization based HALS update of H parameter Function computes updated H parameter for each mpi rank
@@ -605,7 +680,7 @@
Source code for pyDNMFk.dist_nmf
self.H_ij[kk,:]=np.maximum(temp_vec,self.eps)
[docs]defFRO_HALS_update(self,W_update=True):
- r"""
+r""" Frobenius norm minimization based HALS update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -622,11 +697,11 @@
Source code for pyDNMFk.dist_nmf
self.FRO_HALS_update_W()self.FRO_HALS_update_H()
- '''Functions for FRO BCD NMF update'''
+'''Functions for FRO BCD NMF update'''
[docs]@comm_timing()defglobalSqNorm(self,comm,X):
- """ Calc global squared norm of any matrix"""
+""" Calc global squared norm of any matrix"""normX=np.linalg.norm(X)sqnormX=normX*normXXnorm=self.comm1.allreduce(sqnormX,op=MPI.SUM)
@@ -634,7 +709,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()definitWandH(self):
- """ Initialize the parameters for BCD updates"""
+""" Initialize the parameters for BCD updates"""# global Frobenius norm u calculated beforeXnorm=self.globalSqNorm(self.comm1,self.A_ij)
@@ -654,7 +729,7 @@
Source code for pyDNMFk.dist_nmf
returnWm,Hm,HHT,AHT,W_old,H_old,obj_old,Xnorm
[docs]defFRO_BCD_update(self,W_update=True,itr=1000):
- r"""
+r""" Frobenius norm minimization based BCD update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -680,7 +755,7 @@
Source code for pyDNMFk.dist_nmf
# relerr1 = relerr2 = []# Iterate for max iterations:foriinrange(max_iters):
- """
+""" W update """HHTnorm_old=HHTnorm#
@@ -694,7 +769,7 @@
[docs]classnmf_algorithms_1D():
- """
+""" Performs the distributed NMF operation along 1D cartesian grid Parameters
@@ -785,7 +860,7 @@
Source code for pyDNMFk.dist_nmf
self.local_H_n=self.H_j.shape[1]
[docs]defupdate(self):
- """Performs 1 step Update for factors W and H based on NMF method and corresponding norm minimization
+"""Performs 1 step Update for factors W and H based on NMF method and corresponding norm minimization Returns -------
@@ -814,7 +889,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defglobal_gram(self,A,p=1):
- r""" Distributed gram computation
+r""" Distributed gram computation Computes the global gram operation of matrix A .. math:: A^TA
@@ -839,7 +914,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defglobal_mm(self,A,B,p=-1):
- r""" Distributed matrix multiplication
+r""" Distributed matrix multiplication Computes the global matrix multiplication of matrix A and B .. math:: AB
@@ -855,7 +930,8 @@
- '''Functions for Fro MU NMF update'''
+'''Functions for Fro MU NMF update'''
[docs]@comm_timing()defFro_MU_update_W(self):
- r"""
+r""" Frobenius norm based multiplicative update of W parameter Function computes updated H parameter for each mpi rank
@@ -879,15 +955,16 @@
[docs]@comm_timing()defFro_MU_update_H(self):
- r"""
+r""" Frobenius norm based multiplicative update of H parameter Function computes updated H parameter for each mpi rank
@@ -898,15 +975,21 @@
[docs]@comm_timing()defFro_MU_update(self,W_update=True):
- r"""
+r""" Frobenius norm based multiplicative update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -923,11 +1006,11 @@
Source code for pyDNMFk.dist_nmf
self.Fro_MU_update_W()self.Fro_MU_update_H()
- '''Functions for KL MU NMF update'''
+'''Functions for KL MU NMF update'''
[docs]@comm_timing()defsum_along_axis(self,X,p=1,axis=0):
- r"""
+r""" Performs sum of the matrix along given axis Parameters
@@ -955,7 +1038,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()defglob_UX(self,axis):
- """Perform a global operation UX for W and H update with KL"""
+"""Perform a global operation UX for W and H update with KL"""UX=self.A_ij/(self.W_i@self.H_j+self.eps)ifaxis==1:UX=self.global_mm(self.W_i.T,UX,p=self.p_r)
@@ -964,7 +1047,7 @@
Source code for pyDNMFk.dist_nmf
returnUX
[docs]defKL_MU_update_W(self):
- r"""
+r""" KL divergence based multiplicative update of W parameter Function computes updated W parameter for each mpi rank
@@ -983,7 +1066,7 @@
Source code for pyDNMFk.dist_nmf
self.W_i*=sk/(X2+self.eps)
[docs]defKL_MU_update_H(self):
- r"""
+r""" KL divergence based multiplicative update of H parameter Function computes updated H parameter for each mpi rank
@@ -1002,7 +1085,7 @@
Source code for pyDNMFk.dist_nmf
self.H_j*=sk/(X2+self.eps)
[docs]defKL_MU_update(self,W_update=True):
- r"""
+r""" KL divergence based multiplicative update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -1021,10 +1104,10 @@
Source code for pyDNMFk.dist_nmf
self.KL_MU_update_W()self.KL_MU_update_H()
- '''Functions for FRO HALS NMF update'''
+'''Functions for FRO HALS NMF update'''
[docs]defFRO_HALS_update_W(self):
- r"""
+r""" Frobenius norm minimization based HALS update of W parameter Function computes updated W parameter for each mpi rank
@@ -1046,7 +1129,7 @@
Source code for pyDNMFk.dist_nmf
self.W_i[:,kk]/=ss
[docs]defFRO_HALS_update_H(self):
- r"""
+r""" Frobenius norm minimization based HALS update of H parameter Function computes updated H parameter for each mpi rank
@@ -1067,7 +1150,7 @@
Source code for pyDNMFk.dist_nmf
# self.H_j = self.H_j.T
[docs]defFRO_HALS_update(self,W_update=True):
- r"""
+r""" Frobenius norm minimizatio based HALS update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -1087,11 +1170,11 @@
Source code for pyDNMFk.dist_nmf
self.FRO_HALS_update_H()
- '''Functions for FRO BCD NMF update'''
+'''Functions for FRO BCD NMF update'''
[docs]@comm_timing()defglobalSqNorm(self,X,p=-1):
- """ Calc global squared norm of any matrix"""
+""" Calc global squared norm of any matrix"""normX=np.linalg.norm(X)sqnormX=normX*normXifp!=1:
@@ -1103,7 +1186,7 @@
Source code for pyDNMFk.dist_nmf
[docs]@comm_timing()definitWandH(self):
- """ Initialize the parameters for BCD updates"""
+""" Initialize the parameters for BCD updates"""# global Frobenius norm u calculated beforeXnorm=self.globalSqNorm(self.A_ij)
@@ -1122,7 +1205,7 @@
Source code for pyDNMFk.dist_nmf
returnWm,Hm,HHT,AHT,W_old,H_old,obj_old,Xnorm
[docs]defFRO_BCD_update(self,W_update=True,itr=1000):
- r"""
+r""" Frobenius norm minimization based BCD update of W and H parameter Function computes updated W and H parameter for each mpi rank
@@ -1147,7 +1230,7 @@
Source code for pyDNMFk.dist_nmf
# relerr1 = relerr2 = []# Iterate for max iterations:foriinrange(max_iters):
- """
+""" W update """HHTnorm_old=HHTnorm#
@@ -1163,7 +1246,7 @@
[docs]classDistSVD():
- r"""
+r""" Distributed Computation of SVD along 1D distribution of the data. Only U or V is distributed based on data size. Parameters
@@ -220,7 +295,7 @@
Source code for pyDNMFk.dist_svd
[docs]@comm_timing()defnormalize_by_W(self,Wall,Hall,comm1):
- """Normalize the factors W and H"""
+"""Normalize the factors W and H"""Wall_norm=Wall.sum(axis=0,keepdims=True)ifself.proc_rows!=1:Wall_norm=comm1.allreduce(Wall_norm,op=MPI.SUM)
@@ -232,7 +307,7 @@
Source code for pyDNMFk.dist_svd
[docs]@comm_timing()defrandomUnitVector(self,d):
- """
+""" Construnct a rondom unit vector """unnormalized=[normalvariate(0,1)for_inrange(d)]
@@ -241,14 +316,14 @@
Source code for pyDNMFk.dist_svd
[docs]@comm_timing()defglobalGram(self,X,Y):
- """Compute the global gram betwee X and Y"""
+"""Compute the global gram betwee X and Y"""B=X@YB=self.grid_comm.allreduce(B)returnB
[docs]@comm_timing()defcalc_norm(self,vec):
- """Compute the norm of vector"""
+"""Compute the norm of vector"""partial_sq_sum=sum(vec*vec)global_sq_sum=self.grid_comm.allreduce(partial_sq_sum)# norm = torch.sqrt(global_sq_sum)
@@ -300,7 +375,7 @@
Source code for pyDNMFk.dist_svd
[docs]@comm_timing()defsvd(self):
- """
+""" Computes the SVD for a given matrix Returns
@@ -340,7 +415,7 @@
Source code for pyDNMFk.dist_svd
[docs]@comm_timing()defrel_error(self,U,S,V):
- """Computes the relative error between the reconstructed data with factors vs original data"""
+"""Computes the relative error between the reconstructed data with factors vs original data"""X_recon=U@S@Verr_num=np.sum((self.A-X_recon)**2)norm_deno=np.sum(self.A**2)
@@ -351,7 +426,7 @@
Source code for pyDNMFk.dist_svd
[docs]@comm_timing()defnnsvd(self,flag=1,verbose=1):
- r"""
+r""" Computes the distributed Non-Negative SVD(NNSVD) components from the computed SVD factors. Parameters
@@ -420,47 +495,73 @@
[docs]defplot_err(err):
- """Plots the relative error for NMF decomposition as a function of number of iterations"""
+"""Plots the relative error for NMF decomposition as a function of number of iterations"""idx=np.linspace(1,len(err),len(err))plt.plot(idx,err)plt.xlabel('Iterations')
@@ -169,7 +244,7 @@
Source code for pyDNMFk.plot_results
[docs]defread_plot_factors(factors_path,pgrid):
- """Reads the factors W and H and Plots them"""
+"""Reads the factors W and H and Plots them"""W,H=read_factors(factors_path,pgrid)plot_W(W)plt.savefig(factors_path+'W.png')
@@ -178,7 +253,7 @@
Source code for pyDNMFk.plot_results
[docs]defplot_W(W):
- """Reads a factor and plots into subplots for each component"""
+"""Reads a factor and plots into subplots for each component"""m,k=W.shapeparams={'legend.fontsize':60,
@@ -215,10 +290,10 @@
Source code for pyDNMFk.plot_results
plt.show()
-
[docs]defplot_results(startProcess,endProcess,RECON,RECON1,SILL_MIN,out_put,name):
- """Plots the relative error and Silhouette results for estimation of k"""
+
[docs]def plot_results(startProcess, endProcess, stepProcess, RECON, RECON1, SILL_MIN, out_put, name):
+"""Plots the relative error and Silhouette results for estimation of k"""
    ######################################## Plotting ####################################################
- t=range(startProcess,endProcess+1)
+    t = range(startProcess, endProcess + 1, stepProcess)
    fig, ax1 = plt.subplots(num=None, figsize=(10, 6), dpi=300, facecolor='w', edgecolor='k')
    title = 'Num'
    color = 'tab:red'
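An illustrative call using the new stepProcess argument (all values below are dummy placeholders and the output directory is assumed to exist): sweeping k = 2, 4, 6, 8, 10 needs five entries per curve.

errs    = [0.31, 0.22, 0.15, 0.12, 0.11]   # mean L2 error per k (dummy)
errs2   = [0.30, 0.21, 0.14, 0.12, 0.11]   # relative error per k (dummy)
sil_min = [0.99, 0.97, 0.90, 0.60, 0.40]   # minimum silhouette per k (dummy)
plot_results(2, 10, 2, errs, errs2, sil_min, 'results', 'demo')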
@@ -235,9 +310,7 @@
Source code for pyDNMFk.plot_results
# manipulate the y-axis values into percentage vals=ax1.get_yticks()ax1.set_yticklabels(['{:,.0%}'.format(x)forxinvals])
-
# ax1.legend(loc=0)
-
ax2=ax1.twinx()# instantiate a second axes that shares the same x-axiscolor='tab:blue'ax2.set_ylabel('Minimum Stability',color=color)# we already handled the x-label with ax1
@@ -246,19 +319,61 @@
Source code for pyDNMFk.plot_results
# ax2.legend(loc=1)fig.tight_layout()# otherwise the right y-label is slightly clipped# plt.show()
-
# added these three lineslns=lns1+lns2+lns3labs=[l.get_label()forlinlns]ax1.legend(lns,labs,loc=0)
-
plt.savefig(out_put+'/'+name+'_selection_plot.pdf')
-
plt.close()
+
[docs]def plot_results_fpath(params):
+"""Plots the relative error and Silhouette results for estimation of k from given folder location"""
+ ######################################## Plotting ####################################################
+ t=range(params.start_k,params.end_k+1,params.step)
+ fig,ax1=plt.subplots(num=None,figsize=(10,6),dpi=300,facecolor='w',edgecolor='k')
+ title='Num'
+ color='tab:red'
+ ax1.set_xlabel('Total Signatures')
+ ax1.set_ylabel('Mean L2 %',color=color)
+ ax1.set_title(title)
+ RECON=[]
+ RECON1=[]
+ SILL_MIN=[]
+    for k in t:
+ results_paths=params.results_path+str(k)+'/'
+ data=File(results_paths+'/results.h5','r')
+ RECON.append(np.array(data['L_errDist']))
+ RECON1.append(np.array(data['avgErr']))
+ SILL_MIN.append(round(np.min(np.array(data['clusterSilhouetteCoefficients'])),2))
+
+ lns1=ax1.plot(t,RECON,marker='o',linestyle=':',color=color,label='Mean L2 %')
+ lns3=ax1.plot(t,RECON1,marker='X',linestyle=':',color='tab:green',label="Relative error %")
+ ax1.tick_params(axis='y',labelcolor=color)
+ ax1.xaxis.set_ticks(np.arange(min(t),max(t)+1,1))
+ # ax1.axvspan(shadow_start, shadow_end, alpha=0.20, color='#ADD8E6')
+ # ax1.axvspan(shadow_alternative_start, shadow_alternative_end, alpha=0.20, color='#696969')
+ # manipulate the y-axis values into percentage
+ vals=ax1.get_yticks()
+    ax1.set_yticklabels(['{:,.0%}'.format(x) for x in vals])
+ # ax1.legend(loc=0)
+ ax2=ax1.twinx()# instantiate a second axes that shares the same x-axis
+ color='tab:blue'
+ ax2.set_ylabel('Minimum Stability',color=color)# we already handled the x-label with ax1
+ lns2=ax2.plot(t,SILL_MIN,marker='s',linestyle="-.",color=color,label='Minimum Stability')
+ ax2.tick_params(axis='y',labelcolor=color)
+ # ax2.legend(loc=1)
+ fig.tight_layout()# otherwise the right y-label is slightly clipped
+ # plt.show()
+ # added these three lines
+ lns=lns1+lns2+lns3
+    labs = [l.get_label() for l in lns]
+ ax1.legend(lns,labs,loc=0)
+ plt.savefig(params.results_path+'/'+params.fname+'_selection_plot.pdf')
+ plt.close()
+
[docs]defbox_plot(dat,respath):
- """Plots the boxplot from the given data and saves the results"""
+"""Plots the boxplot from the given data and saves the results"""dat.plot.bar()plt.xlabel('operation')plt.ylabel('timing(sec)')
@@ -268,7 +383,7 @@
Source code for pyDNMFk.plot_results
[docs]deftiming_stats(fpath):
- """Reads the timing stats dictionary from the stored file and parses the data. """
+"""Reads the timing stats dictionary from the stored file and parses the data. """importcopydata=pd.read_csv(fpath).iloc[0,1:]breakdown_level_2={'init':['__init__','init_factors'],
@@ -286,7 +401,7 @@
[docs]defplot_timing_stats(fpath,respath):
- ''' Plots the timing stats for the MPI operation.
+''' Plots the timing stats for the MPI operation. fpath: Stats data path respath: Path to save graph'''res1,res2=timing_stats(fpath)
@@ -327,47 +442,73 @@
[docs]@comm_timing()deffit(self):
- r"""
+r""" Calls the sub routines to perform distributed NMF decomposition with initialization for a given norm minimization and update method Returns
@@ -317,6 +382,7 @@
[docs]@comm_timing()defnormalize_features(self,Wall,Hall):
- """Normalizes features Wall and Hall"""
- Wall_norm=Wall.sum(axis=0,keepdims=True)+self.eps
+"""Normalizes features Wall and Hall"""
+        Wall_norm = Wall.sum(axis=0, keepdims=True)
        if self.topo == '2d':
            Wall_norm = self.comm1.allreduce(Wall_norm, op=MPI.SUM)
        elif self.topo == '1d':
            if self.p_r != 1:
                Wall_norm = self.comm1.allreduce(Wall_norm, op=MPI.SUM)
- Wall/=Wall_norm
+        Wall /= Wall_norm + self.eps
        Hall *= Wall_norm.T
        return Wall, Hall
[docs]@comm_timing()defcart_2d_collect_factors(self):
- """Collects factors along each sub communicators"""
+"""Collects factors along each sub communicators"""self.H_j=self.cart_1d_row.allgather(self.H_ij)self.H_j=np.hstack((self.H_j))self.W_i=self.cart_1d_column.allgather(self.W_ij)
@@ -368,7 +440,7 @@
Source code for pyDNMFk.pyDNMF
[docs]@comm_timing()defrelative_err(self):
- """Computes the relative error for NMF decomposition"""
+"""Computes the relative error for NMF decomposition"""ifself.topo=='2d':self.cart_2d_collect_factors()self.glob_norm_err=self.dist_norm(self.A_ij-self.W_i@self.H_j)self.glob_norm_A=self.dist_norm(self.A_ij)
@@ -376,15 +448,20 @@
Source code for pyDNMFk.pyDNMF
[docs]@comm_timing()defdist_norm(self,X,proc=-1,norm='fro',axis=None):
- """Computes the distributed norm"""
- nm=np.linalg.norm(X,axis=axis,ord=norm)
+"""Computes the distributed norm"""
+        if type(X) in [numpy.matrix, numpy.ndarray]:
+            nm = np.linalg.norm(X, axis=axis, ord=norm)
+        elif type(X) in [scipy.sparse.coo.coo_matrix, scipy.sparse.csr.csr_matrix, scipy.sparse.csc.csc_matrix]:
+            nm = scipy_sparse_norm(X, ord=norm, axis=axis)
+        else:
+            raise Exception("[!!] type(X): {} is not understood".format(type(X)))
        if proc != 1:
            nm = self.comm1.allreduce(nm ** 2)
        return np.sqrt(nm)
[docs]@comm_timing()defcolumn_err(self):
- """Computes the distributed column wise norm"""
+"""Computes the distributed column wise norm"""dtr_blk=determine_block_params(self.comm1,(self.p_r,self.p_c),(self.params.m,self.params.n))dtr_blk_idx=dtr_blk.determine_block_index_range_asymm()dtr_blk_shp=dtr_blk.determine_block_shape_asymm()
@@ -394,8 +471,12 @@
[docs]classsample():
- """
+""" Generates perturbed version of data based on sampling distribution. Parameters
@@ -188,7 +287,7 @@
Source code for pyDNMFk.pyDNMFk
[docs]@comm_timing()defrandM(self):
- """
+""" Multiplies each element of X by a uniform random number in (1-epsilon, 1+epsilon). """
@@ -198,13 +297,13 @@
Source code for pyDNMFk.pyDNMFk
[docs]@comm_timing()defpoisson(self):
- """Resamples each element of a matrix from a Poisson distribution with the mean set by that element. Y_{i,j} = Poisson(X_{i,j}"""
+"""Resamples each element of a matrix from a Poisson distribution with the mean set by that element. Y_{i,j} = Poisson(X_{i,j}"""self.X_per=np.random.poisson(self.X).astype(self.X.dtype)
[docs]@comm_timing()deffit(self):
- r"""
+r""" Calls the sub routines to perform resampling on data Returns
@@ -221,7 +320,7 @@
Source code for pyDNMFk.pyDNMFk
[docs]classPyNMFk():
- r"""
+r""" Performs the distributed NMF decomposition with custom clustering for estimating hidden factors k Parameters
@@ -278,10 +377,21 @@