Merge pull request EESSI#97 from smoors/hook_tasks_per
add hook to assign tasks per node
satishskamath authored Dec 13, 2023
2 parents 41e43ca + 91fa3b7 commit abfe896
Showing 4 changed files with 67 additions and 27 deletions.
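
To make the new hook concrete, here is a minimal usage sketch of how a ReFrame test in this suite might request a fixed number of tasks per node via the NODE compute unit. The test class, executable, time limit and the choice of num_per=2 are illustrative assumptions, not part of this commit; the scale/SCALES plumbing follows the pattern used by the existing tests in the suite.

# Hypothetical usage sketch (illustration only, not part of this commit).
import reframe as rfm

from eessi.testsuite import hooks
from eessi.testsuite.constants import COMPUTE_UNIT, NODE, SCALES


@rfm.simple_test
class EXAMPLE_NODE_EESSI(rfm.RunOnlyRegressionTest):
    scale = parameter(SCALES.keys())
    valid_systems = ['*']
    valid_prog_environs = ['default']
    executable = 'hostname'
    time_limit = '5m'

    @run_after('init')
    def run_after_init(self):
        # set num_nodes and related attributes for the selected scale,
        # as the other tests in this suite do
        hooks.set_tag_scale(self)

    @run_after('setup')
    def assign_resources(self):
        # request 2 tasks per node; the hook derives num_cpus_per_task
        # from the CPUs available per node
        hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT[NODE], num_per=2)
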
2 changes: 2 additions & 0 deletions eessi/testsuite/constants.py
@@ -9,6 +9,7 @@
GPU = 'GPU'
GPU_VENDOR = 'GPU_VENDOR'
INTEL = 'INTEL'
NODE = 'NODE'
NVIDIA = 'NVIDIA'

DEVICE_TYPES = {
@@ -20,6 +21,7 @@
CPU: 'cpu',
CPU_SOCKET: 'cpu_socket',
GPU: 'gpu',
NODE: 'node',
}

TAGS = {
67 changes: 50 additions & 17 deletions eessi/testsuite/hooks.py
@@ -33,7 +33,7 @@ def assign_default_num_cpus_per_node(test: rfm.RegressionTest):
log(f'default_num_cpus_per_node set to {test.default_num_cpus_per_node}')


def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str):
def assign_tasks_per_compute_unit(test: rfm.RegressionTest, compute_unit: str, num_per: int = 1):
"""
Assign one task per compute unit (COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET] or COMPUTE_UNIT[GPU]).
Automatically sets num_tasks, num_tasks_per_node, num_cpus_per_task, and num_gpus_per_node,
@@ -49,15 +49,19 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str
Examples:
On a single node with 2 sockets, 64 cores and 128 hyperthreads:
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task
- assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU]) will launch 64 tasks with 1 thread
- assign_tasks_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET]) will launch 2 tasks with 32 threads per task
Future work:
Currently, on a single node with 2 sockets, 64 cores and 128 hyperthreads, this:
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU], True) launches 128 tasks with 1 thread
- assign_one_task_per_compute_unit(test, COMPUTE_UNIT[CPU_SOCKET], True) launches 2 tasks with 64 threads per task
In the future, we'd like to add an argument that disables spawning tasks for hyperthreads.
"""
if num_per != 1 and compute_unit in [COMPUTE_UNIT[GPU], COMPUTE_UNIT[CPU], COMPUTE_UNIT[CPU_SOCKET]]:
raise NotImplementedError(
f'Non-default num_per {num_per} is not implemented for compute_unit {compute_unit}.')

check_proc_attribute_defined(test, 'num_cpus')
test.max_avail_cpus_per_node = test.current_partition.processor.num_cpus
log(f'max_avail_cpus_per_node set to {test.max_avail_cpus_per_node}')
@@ -74,17 +78,56 @@ def assign_one_task_per_compute_unit(test: rfm.RegressionTest, compute_unit: str

assign_default_num_cpus_per_node(test)


if compute_unit == COMPUTE_UNIT[GPU]:
_assign_one_task_per_gpu(test)
elif compute_unit == COMPUTE_UNIT[CPU]:
_assign_one_task_per_cpu(test)
elif compute_unit == COMPUTE_UNIT[CPU_SOCKET]:
_assign_one_task_per_cpu_socket(test)
elif compute_unit == COMPUTE_UNIT[NODE]:
_assign_num_tasks_per_node(test, num_per)
else:
raise ValueError(f'compute unit {compute_unit} is currently not supported')


def _assign_num_tasks_per_node(test: rfm.RegressionTest, num_per: int = 1):
"""
Sets num_tasks_per_node and num_cpus_per_task such that it will run
'num_per' tasks per node, unless specified with:
--setvar num_tasks_per_node=<x>
--setvar num_cpus_per_task=<y>.
In those cases, those take precedence, and the remaining variable, if any
(num_cpus_per_task or num_tasks_per_node respectively), is calculated based
on the equality test.num_tasks_per_node * test.num_cpus_per_task ==
test.default_num_cpus_per_node.
Default resources requested:
- num_tasks_per_node = num_per
- num_cpus_per_task = test.default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
test.num_tasks_per_node = num_per
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

# num_tasks_per_node is not set, but num_cpus_per_task is
elif not test.num_tasks_per_node:
test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_task is not set, but num_tasks_per_node is
elif not test.num_cpus_per_task:
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

else:
pass # both num_tasks_per_node and num_cpus_per_task are already set

test.num_tasks = test.num_nodes * test.num_tasks_per_node

log(f'num_tasks_per_node set to {test.num_tasks_per_node}')
log(f'num_cpus_per_task set to {test.num_cpus_per_task}')
log(f'num_tasks set to {test.num_tasks}')


def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
"""
Determines the number of tasks per node by dividing the default_num_cpus_per_node by
@@ -103,11 +146,6 @@ def _assign_one_task_per_cpu_socket(test: rfm.RegressionTest):
num_tasks_per_node respectively) is calculated based on the equality
test.num_tasks_per_node * test.num_cpus_per_task == test.default_num_cpus_per_node.
Variables:
- default_num_cpus_per_node: default number of CPUs per node as defined in the test
(e.g. by earlier hooks like set_tag_scale)
Default resources requested:
- num_tasks_per_node = default_num_cpus_per_node
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
@@ -147,25 +185,20 @@ def _assign_one_task_per_cpu(test: rfm.RegressionTest):
--setvar num_tasks_per_node=<x> and/or
--setvar num_cpus_per_task=<y>.
Variables:
- default_num_cpus_per_node: default number of CPUs per node as defined in the test
(e.g. by earlier hooks like set_tag_scale)
Default resources requested:
- num_tasks_per_node = default_num_cpus_per_node
- num_cpus_per_task = default_num_cpus_per_node / num_tasks_per_node
"""
# neither num_tasks_per_node nor num_cpus_per_node are set
# neither num_tasks_per_node nor num_cpus_per_task are set
if not test.num_tasks_per_node and not test.num_cpus_per_task:
test.num_tasks_per_node = test.default_num_cpus_per_node
test.num_cpus_per_task = 1

# num_tasks_per_node is not set, but num_cpus_per_node is
# num_tasks_per_node is not set, but num_cpus_per_task is
elif not test.num_tasks_per_node:
test.num_tasks_per_node = int(test.default_num_cpus_per_node / test.num_cpus_per_task)

# num_cpus_per_node is not set, but num_tasks_per_node is
# num_cpus_per_task is not set, but num_tasks_per_node is
elif not test.num_cpus_per_task:
test.num_cpus_per_task = int(test.default_num_cpus_per_node / test.num_tasks_per_node)

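As a quick check on the precedence logic of _assign_num_tasks_per_node above, here is a standalone sketch (plain Python, no ReFrame dependency; the single 128-core node is an assumed example) that mirrors the three branches and shows the resulting values:

# Standalone illustration of the branch logic (assumed example values, not part of the commit).
from types import SimpleNamespace

def assign(default_cpus_per_node, num_per=1, num_tasks_per_node=None, num_cpus_per_task=None, num_nodes=1):
    t = SimpleNamespace(
        default_num_cpus_per_node=default_cpus_per_node,
        num_tasks_per_node=num_tasks_per_node,
        num_cpus_per_task=num_cpus_per_task,
        num_nodes=num_nodes,
    )
    if not t.num_tasks_per_node and not t.num_cpus_per_task:
        # neither set: num_per tasks per node, split the CPUs evenly over them
        t.num_tasks_per_node = num_per
        t.num_cpus_per_task = int(t.default_num_cpus_per_node / t.num_tasks_per_node)
    elif not t.num_tasks_per_node:
        # only num_cpus_per_task given on the command line
        t.num_tasks_per_node = int(t.default_num_cpus_per_node / t.num_cpus_per_task)
    elif not t.num_cpus_per_task:
        # only num_tasks_per_node given on the command line
        t.num_cpus_per_task = int(t.default_num_cpus_per_node / t.num_tasks_per_node)
    t.num_tasks = t.num_nodes * t.num_tasks_per_node
    return t.num_tasks_per_node, t.num_cpus_per_task, t.num_tasks

print(assign(128, num_per=2))             # (2, 64, 2)
print(assign(128, num_cpus_per_task=32))  # (4, 32, 4)
print(assign(128, num_tasks_per_node=8))  # (8, 16, 8)
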
2 changes: 1 addition & 1 deletion eessi/testsuite/tests/apps/gromacs.py
@@ -89,7 +89,7 @@ def run_after_setup(self):
# Calculate default requested resources based on the scale:
# 1 task per CPU for CPU-only tests, 1 task per GPU for GPU tests.
# Also support setting the resources on the cmd line.
hooks.assign_one_task_per_compute_unit(test=self, compute_unit=self.nb_impl)
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.nb_impl)

@run_after('setup')
def set_omp_num_threads(self):
23 changes: 14 additions & 9 deletions eessi/testsuite/tests/apps/tensorflow/tensorflow.py
@@ -8,7 +8,7 @@
import reframe.utility.sanity as sn

from eessi.testsuite import hooks, utils
from eessi.testsuite.constants import *
from eessi.testsuite.constants import * # noqa

@rfm.simple_test
class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
@@ -23,7 +23,7 @@ class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):

# Make CPU and GPU versions of this test
device_type = parameter(['cpu', 'gpu'])

executable = 'python tf_test.py'

time_limit = '30m'
@@ -34,22 +34,23 @@ class TENSORFLOW_EESSI(rfm.RunOnlyRegressionTest):
@deferrable
def assert_tf_config_ranks(self):
'''Assert that each rank sets a TF_CONFIG'''
n_ranks = sn.count(sn.extractall('^Rank [0-9]+: Set TF_CONFIG for rank (?P<rank>[0-9]+)', self.stdout, tag='rank'))
n_ranks = sn.count(sn.extractall(
'^Rank [0-9]+: Set TF_CONFIG for rank (?P<rank>[0-9]+)', self.stdout, tag='rank'))
return sn.assert_eq(n_ranks, self.num_tasks)

@deferrable
def assert_completion(self):
'''Assert that the test ran until completion'''
n_fit_completed = sn.count(sn.extractall('^Rank [0-9]+: Keras fit completed', self.stdout))

return sn.all([
sn.assert_eq(n_fit_completed, self.num_tasks),
])

@deferrable
def assert_convergence(self):
'''Assert that the network learned _something_ during training'''
accuracy=sn.extractsingle('^Final accuracy: (?P<accuracy>\S+)', self.stdout, 'accuracy', float)
accuracy = sn.extractsingle('^Final accuracy: (?P<accuracy>\S+)', self.stdout, 'accuracy', float)
# mnist is a 10-class classification problem, so if accuracy >> 0.2 the network 'learned' something
return sn.assert_gt(accuracy, 0.2)

@@ -91,12 +92,13 @@ def run_after_setup(self):
"""hooks to run after the setup phase"""
# TODO: implement
# It should bind to socket, but different MPIs may have different arguments to do that...
# We should at very least prevent that it binds to single core per process, as that results in many threads being scheduled to one core
# We should at very least prevent that it binds to single core per process,
# as that results in many threads being scheduled to one core.
# binding may also differ per launcher used. It'll be hard to support a wide range and still get proper binding
if self.device_type == 'cpu':
hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['CPU_SOCKET'])
elif self.device_type == 'gpu':
hooks.assign_one_task_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=COMPUTE_UNIT['GPU'])
else:
raise NotImplementedError(f'Failed to set number of tasks and cpus per task for device {self.device_type}')

Expand All @@ -110,5 +112,8 @@ def set_thread_count_args(self):

@run_after('setup')
def set_binding_policy(self):
"""Sets a binding policy for tasks. We don't bind threads because of https://github.com/tensorflow/tensorflow/issues/60843"""
"""
Sets a binding policy for tasks. We don't bind threads because of
https://github.com/tensorflow/tensorflow/issues/60843
"""
hooks.set_compact_process_binding(self)
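
For the TensorFlow test above, a small worked example of what the socket- and GPU-based assignments amount to; the node topology (2 sockets, 64 cores, 4 GPUs per node) is an assumption chosen for illustration, not something this PR prescribes:

# Assumed example topology: 2 sockets, 64 cores, 4 GPUs per node.
num_cpus_per_node = 64
num_sockets = 2
num_gpus_per_node = 4

# device_type == 'cpu': one task per CPU socket
cpu_tasks_per_node = num_sockets                             # 2 tasks per node
cpu_cpus_per_task = num_cpus_per_node // cpu_tasks_per_node  # 32 cpus per task

# device_type == 'gpu': one task per GPU
gpu_tasks_per_node = num_gpus_per_node                       # 4 tasks per node
gpu_cpus_per_task = num_cpus_per_node // gpu_tasks_per_node  # 16 cpus per task

print(cpu_tasks_per_node, cpu_cpus_per_task)  # 2 32
print(gpu_tasks_per_node, gpu_cpus_per_task)  # 4 16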
