Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GPU parallel optimized #2

Open
wants to merge 8 commits into
base: graph_arrays
Choose a base branch
from
20 changes: 16 additions & 4 deletions nums/core/application_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

import logging
import sys

import time
from nums.core import settings
from nums.core.systems.filesystem import FileSystem
from nums.core.systems import numpy_compute
from nums.core.systems.systems import System, SerialSystem, RaySystem
from nums.core.systems.gpu_systems import CupyParallelSystem
from nums.core.systems.schedulers import RayScheduler, TaskScheduler, BlockCyclicScheduler
from nums.core.array.application import ArrayApplication

Expand Down Expand Up @@ -71,6 +72,11 @@ def create():
use_head=settings.use_head)
system: System = RaySystem(compute_module=compute_module,
scheduler=scheduler)
elif system_name == "cupy-parallel":
system = CupyParallelSystem()
system.optimizer = settings.optimizer
system.num_gpus = settings.num_gpus
system.cluster_shape = (settings.num_gpus, 1)
else:
raise Exception()
system.init()
Expand All @@ -82,10 +88,16 @@ def destroy():
if _instance is None:
return
# This will shutdown ray if ray was started by NumS.
_instance.system.shutdown()
del _instance
_instance = None

system = _instance.system
del _instance.one_half
del _instance.two
del _instance.one
del _instance.zero
del _instance
system.shutdown()
# _instance.system.shutdown()


def configure_logging():
root = logging.getLogger()
Expand Down
7 changes: 6 additions & 1 deletion nums/core/array/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@
from nums.core.storage.storage import ArrayGrid, StoredArray, StoredArrayS3
# TODO(hme): Remove dependence on specific system and scheduler implementations.
from nums.core.systems.systems import System, RaySystem, SerialSystem
from nums.core.systems.gpu_systems import CupyParallelSystem
from nums.core.systems.schedulers import BlockCyclicScheduler
from nums.core.systems import utils as systems_utils
from nums.core.systems.filesystem import FileSystem
from nums.core.array.random import NumsRandomState


# pylint: disable = too-many-lines


Expand All @@ -51,6 +51,9 @@ def num_cores_total(self):
system: RaySystem = self.system
nodes = system.nodes()
num_cores = sum(map(lambda n: n["Resources"]["CPU"], nodes))
elif isinstance(self.system, CupyParallelSystem):
system: CupyParallelSystem = self.system
num_cores = system.num_gpus
else:
assert isinstance(self.system, SerialSystem)
num_cores = systems_utils.get_num_cores()
Expand Down Expand Up @@ -93,6 +96,8 @@ def compute_block_shape(self,
and isinstance(self.system.scheduler, BlockCyclicScheduler):
# This configuration is the default.
cluster_shape = self.system.scheduler.cluster_shape
elif isinstance(self.system, CupyParallelSystem):
cluster_shape = self.system.cluster_shape
else:
assert isinstance(self.system, SerialSystem)
cluster_shape = (1, 1)
Expand Down
5 changes: 2 additions & 3 deletions nums/core/array/blockarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def from_scalar(cls, val, system):
if isinstance(val, int):
dtype = np.int
elif isinstance(val, float):
dtype = np.float
dtype = np.float32
else:
assert isinstance(val, (np.int32, np.int64, np.float32, np.float64))
dtype = None
Expand Down Expand Up @@ -122,7 +122,6 @@ def touch(self):
for grid_entry in self.grid.get_entry_iterator():
block: Block = self.blocks[grid_entry]
oids.append(self.system.touch(block.oid, syskwargs=block.syskwargs()))
self.system.get(oids)
return self

def reshape(self, *shape, **kwargs):
Expand Down Expand Up @@ -714,7 +713,7 @@ def __rpow__(self, other):
return other ** self

def __neg__(self):
    """Unary negation, delegated to scalar multiplication by -1.0.

    Uses a float scalar so the result dtype stays floating-point
    (consistent with the float32 default used elsewhere in this file).
    """
    return -1.0 * self

def __pos__(self):
    """Unary plus: identity operation; returns the array unchanged."""
    return self
Expand Down
30 changes: 8 additions & 22 deletions nums/core/optimizer/cluster_sim.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,28 +52,14 @@ def get_cluster_node_ids(self):
def get_cluster_entry_iterator(self):
    """Yield every entry (coordinate tuple) of the cluster grid in row-major order."""
    axis_ranges = (range(dim) for dim in self.cluster_shape)
    return itertools.product(*axis_ranges)

def get_cluster_entry(self, grid_entry):
cluster_entry = []
num_grid_entry_axes = len(grid_entry)
num_cluster_axes = len(self.cluster_shape)
if num_grid_entry_axes <= num_cluster_axes:
# When array has fewer or equal # of axes than cluster.
for cluster_axis in range(num_cluster_axes):
if cluster_axis < num_grid_entry_axes:
cluster_dim = self.cluster_shape[cluster_axis]
grid_entry_dim = grid_entry[cluster_axis]
cluster_entry.append(grid_entry_dim % cluster_dim)
else:
cluster_entry.append(0)
elif num_grid_entry_axes > num_cluster_axes:
# When array has more axes then cluster.
for cluster_axis in range(num_cluster_axes):
cluster_dim = self.cluster_shape[cluster_axis]
grid_entry_dim = grid_entry[cluster_axis]
cluster_entry.append(grid_entry_dim % cluster_dim)
# Ignore trailing axes, as these are "cycled" to 0 by assuming
# the dimension of those cluster axes is 1.
return tuple(cluster_entry)
def get_cluster_entry(self, grid_entry, grid_shape):
ret = [0]
for i in range(len(grid_entry)):
dim = 1 if i == len(grid_entry) - 1 else grid_shape[i+1]
ret[0] = (ret[0] + grid_entry[i]) * dim
ret[0] = ret[0] % self.system.num_gpus
ret.append(0)
return tuple(ret)

# Block Ops.

Expand Down
2 changes: 1 addition & 1 deletion nums/core/optimizer/comp_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ def graphs_from_ba(ba: BlockArrayBase, cluster_state: ClusterState, copy_on_op)
for grid_entry in ba.grid.get_entry_iterator():
block: Block = ba.blocks[grid_entry]
# Allocate the block to the node on which it's created.
node_id = cluster_state.get_cluster_entry(block.true_grid_entry())
node_id = cluster_state.get_cluster_entry(block.true_grid_entry(), ba.grid.grid_shape)
cluster_state.add_block(block, node_ids=[node_id])
cluster_state.init_mem_load(node_id, block.id)

Expand Down
9 changes: 4 additions & 5 deletions nums/core/optimizer/tree_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,17 @@ def get_bc_action(self, tnode: TreeNode):
# This is hacky, but no good way to do it w/ current abstractions.
if isinstance(tnode, BinaryOp):
grid_entry = self.get_tnode_grid_entry(tnode)
node_id = self.arr.cluster_state.get_cluster_entry(grid_entry)
node_id = self.arr.cluster_state.get_cluster_entry(grid_entry, self.arr.grid.grid_shape)
actions = [(tnode.tree_node_id, {"node_id": node_id})]
elif isinstance(tnode, ReductionOp):
leaf_ids = tuple(tnode.leafs_dict.keys())[:2]
grid_entry = self.get_tnode_grid_entry(tnode)
node_id = self.arr.cluster_state.get_cluster_entry(grid_entry)
node_id = self.arr.cluster_state.get_cluster_entry(grid_entry, self.arr.grid.grid_shape)
actions = [(tnode.tree_node_id, {"node_id": node_id,
"leaf_ids": leaf_ids})]
elif isinstance(tnode, UnaryOp):
grid_entry = self.get_tnode_grid_entry(tnode)
node_id = self.arr.cluster_state.get_cluster_entry(grid_entry)
node_id = self.arr.cluster_state.get_cluster_entry(grid_entry, self.arr.grid.grid_shape)
actions = [(tnode.tree_node_id, {"node_id": node_id})]
else:
raise Exception()
Expand Down Expand Up @@ -120,9 +120,8 @@ def simulate_action(self, action):
return self.objective(new_resources)

def objective(self, resources):
    """Our simple objective: total load over `resources`, skipping the
    first entry along axis 0 (presumably a non-device resource row —
    TODO confirm against the cluster resource layout)."""
    tail = resources[1:]
    return np.sum(tail)

def get_tnode_grid_entry(self, tnode: TreeNode):
if tnode.parent is None:
Expand Down
8 changes: 7 additions & 1 deletion nums/core/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,13 @@


# System settings.
system_name = os.environ.get("NUMS_SYSTEM", "ray-cyclic")
# system_name = os.environ.get("NUMS_SYSTEM", "ray-cyclic")

# Parallel system only uses the following three values
system_name = os.environ.get("NUMS_SYSTEM", "cupy-parallel")
num_gpus = 4
optimizer = True

# TODO (hme):
# - Make cluster shape an environment variable. Default depends on available resources.
# - use_head => use_driver, and should be an environment variable.
Expand Down
Loading