From aa09931a8323e23d8a2a2f8d64b63acce41a71ba Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 11 Dec 2024 13:06:22 +0000 Subject: [PATCH 1/5] mock-based reproducer for issue #3696 --- parsl/jobs/strategy.py | 8 +++ .../test_regression_3696_oscillation.py | 62 +++++++++++++++++++ 2 files changed, 70 insertions(+) create mode 100644 parsl/tests/test_scaling/test_regression_3696_oscillation.py diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py index e0898cccb1..c50bb21086 100644 --- a/parsl/jobs/strategy.py +++ b/parsl/jobs/strategy.py @@ -275,6 +275,14 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_ logger.debug(f"Strategy case 2b: active_blocks {active_blocks} < max_blocks {max_blocks} so scaling out") excess_slots = math.ceil((active_tasks * parallelism) - active_slots) excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block)) + print(f"BENC: active_tasks {active_tasks}") + print(f"BENC: active_slots {active_slots}") + print(f"BENC: excess slots {excess_slots}") + print(f"BENC: tpn {tasks_per_node}") + print(f"BENC: npb {nodes_per_block}") + print(f"BENC: excess blocks {excess_blocks}") + print(f"BENC: max blocks {max_blocks}") + print(f"BENC: active blocks {active_blocks}") excess_blocks = min(excess_blocks, max_blocks - active_blocks) logger.debug(f"Requesting {excess_blocks} more blocks") executor.scale_out_facade(excess_blocks) diff --git a/parsl/tests/test_scaling/test_regression_3696_oscillation.py b/parsl/tests/test_scaling/test_regression_3696_oscillation.py new file mode 100644 index 0000000000..2ede9df368 --- /dev/null +++ b/parsl/tests/test_scaling/test_regression_3696_oscillation.py @@ -0,0 +1,62 @@ +import math +from unittest.mock import MagicMock + +import pytest + +from parsl.executors.high_throughput.executor import HighThroughputExecutor +from parsl.jobs.states import JobState, JobStatus +from parsl.jobs.strategy import Strategy + + +@pytest.mark.local +def test_htex_strategy_does_not_oscillate(): + """Check for oscillations in htex scaling. + In issue 3696, with a large number of workers per block + and a smaller number of active tasks, the htex scaling + strategy oscillates between 0 and 1 active block, rather + than converging to 1 active block. + + The choices of 14 tasks and 48 workers per node are taken + from issue #3696. + """ + + s = Strategy(strategy='htex_auto_scale', max_idletime=math.inf) + + provider = MagicMock() + executor = MagicMock(spec=HighThroughputExecutor) + + statuses = {} + + executor.provider = provider + executor.outstanding = 14 + executor.status_facade = statuses + executor.workers_per_node = 48 + + provider.parallelism = 1 + provider.init_blocks = 0 + provider.min_blocks = 0 + provider.max_blocks = 2 + provider.nodes_per_block = 1 + + def f(n): + for _ in range(n): + statuses[len(statuses)] = JobStatus(state=JobState.PENDING) + + executor.scale_out_facade.side_effect = f + + s.add_executors([executor]) + + # In issue #3696, this first strategise does initial and load based + # scale outs, because 14 > 48*0 + s.strategize([executor]) + executor.scale_out_facade.assert_called() + executor.scale_in_facade.assert_not_called() + + # In issue #3696, this second strategize does a scale in, because 14 < 48*1 + s.strategize([executor]) + executor.scale_in_facade.assert_not_called() # this assert fails due to issue #3696 + + # Now check scale in happens with 0 load + executor.outstanding = 0 + s.strategize([executor]) + executor.scale_in_facade.assert_called() From 3b1fc2d8726b26acea384f41f41a86535054912b Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 11 Dec 2024 13:14:05 +0000 Subject: [PATCH 2/5] more asserting around number of blocks --- parsl/tests/test_scaling/test_regression_3696_oscillation.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/parsl/tests/test_scaling/test_regression_3696_oscillation.py b/parsl/tests/test_scaling/test_regression_3696_oscillation.py index 2ede9df368..bdfbb19aa9 100644 --- a/parsl/tests/test_scaling/test_regression_3696_oscillation.py +++ b/parsl/tests/test_scaling/test_regression_3696_oscillation.py @@ -49,7 +49,12 @@ def f(n): # In issue #3696, this first strategise does initial and load based # scale outs, because 14 > 48*0 s.strategize([executor]) + executor.scale_out_facade.assert_called() + assert len(statuses) == 1, "Only one block should have been launched" + # there might be several calls to scale_out_facade inside strategy, + # but the end effect should be that exactly one block is scaled out. + executor.scale_in_facade.assert_not_called() # In issue #3696, this second strategize does a scale in, because 14 < 48*1 From 74c845573a8d00b288f4d583e3aa542b6d78649d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 11 Dec 2024 13:25:01 +0000 Subject: [PATCH 3/5] add more assertions into test, around scale in call --- .../test_regression_3696_oscillation.py | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/parsl/tests/test_scaling/test_regression_3696_oscillation.py b/parsl/tests/test_scaling/test_regression_3696_oscillation.py index bdfbb19aa9..f20a14afff 100644 --- a/parsl/tests/test_scaling/test_regression_3696_oscillation.py +++ b/parsl/tests/test_scaling/test_regression_3696_oscillation.py @@ -38,11 +38,22 @@ def test_htex_strategy_does_not_oscillate(): provider.max_blocks = 2 provider.nodes_per_block = 1 - def f(n): + def scale_out(n): for _ in range(n): statuses[len(statuses)] = JobStatus(state=JobState.PENDING) - executor.scale_out_facade.side_effect = f + executor.scale_out_facade.side_effect = scale_out + + def scale_in(n, max_idletime=None): + # find n PENDING jobs and set them to CANCELLED + for k in statuses: + if statuses[k].state == JobState.PENDING: + statuses[k].state = JobState.CANCELLED + n -= 1 + if n == 0: + return + + executor.scale_in_facade.side_effect = scale_in s.add_executors([executor]) @@ -51,7 +62,9 @@ def f(n): s.strategize([executor]) executor.scale_out_facade.assert_called() + print(f"BENC: {statuses}") assert len(statuses) == 1, "Only one block should have been launched" + assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 1 # there might be several calls to scale_out_facade inside strategy, # but the end effect should be that exactly one block is scaled out. @@ -59,9 +72,13 @@ def f(n): # In issue #3696, this second strategize does a scale in, because 14 < 48*1 s.strategize([executor]) - executor.scale_in_facade.assert_not_called() # this assert fails due to issue #3696 + + # assert that there should still be 1 pending blocks + assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 1 + # this assert fails due to issue #3696 # Now check scale in happens with 0 load executor.outstanding = 0 s.strategize([executor]) executor.scale_in_facade.assert_called() + assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 0 From 5832d45b9982c4607a4d041fbbf987c12e1f3e2d Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 11 Dec 2024 13:30:53 +0000 Subject: [PATCH 4/5] WIP --- parsl/jobs/strategy.py | 4 ++-- .../test_regression_3696_oscillation.py | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/parsl/jobs/strategy.py b/parsl/jobs/strategy.py index c50bb21086..92d715302c 100644 --- a/parsl/jobs/strategy.py +++ b/parsl/jobs/strategy.py @@ -306,8 +306,8 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_ # Scale in for htex if isinstance(executor, HighThroughputExecutor): if active_blocks > min_blocks: - excess_slots = math.ceil(active_slots - (active_tasks * parallelism)) - excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block)) + excess_slots = math.floor(active_slots - (active_tasks * parallelism)) + excess_blocks = math.floor(float(excess_slots) / (tasks_per_node * nodes_per_block)) excess_blocks = min(excess_blocks, active_blocks - min_blocks) logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s") executor.scale_in_facade(excess_blocks, max_idletime=self.max_idletime) diff --git a/parsl/tests/test_scaling/test_regression_3696_oscillation.py b/parsl/tests/test_scaling/test_regression_3696_oscillation.py index f20a14afff..e5c4050308 100644 --- a/parsl/tests/test_scaling/test_regression_3696_oscillation.py +++ b/parsl/tests/test_scaling/test_regression_3696_oscillation.py @@ -9,7 +9,8 @@ @pytest.mark.local -def test_htex_strategy_does_not_oscillate(): +@pytest.mark.parametrize("ns", [ (14,48) ]) +def test_htex_strategy_does_not_oscillate(ns): """Check for oscillations in htex scaling. In issue 3696, with a large number of workers per block and a smaller number of active tasks, the htex scaling @@ -20,7 +21,9 @@ def test_htex_strategy_does_not_oscillate(): from issue #3696. """ - s = Strategy(strategy='htex_auto_scale', max_idletime=math.inf) + n_tasks, n_workers = ns + + s = Strategy(strategy='htex_auto_scale', max_idletime=0) provider = MagicMock() executor = MagicMock(spec=HighThroughputExecutor) @@ -28,9 +31,9 @@ def test_htex_strategy_does_not_oscillate(): statuses = {} executor.provider = provider - executor.outstanding = 14 + executor.outstanding = n_tasks executor.status_facade = statuses - executor.workers_per_node = 48 + executor.workers_per_node = n_workers provider.parallelism = 1 provider.init_blocks = 0 @@ -47,18 +50,18 @@ def scale_out(n): def scale_in(n, max_idletime=None): # find n PENDING jobs and set them to CANCELLED for k in statuses: + if n == 0: + return if statuses[k].state == JobState.PENDING: statuses[k].state = JobState.CANCELLED n -= 1 - if n == 0: - return executor.scale_in_facade.side_effect = scale_in s.add_executors([executor]) # In issue #3696, this first strategise does initial and load based - # scale outs, because 14 > 48*0 + # scale outs, because n_tasks > n_workers*0 s.strategize([executor]) executor.scale_out_facade.assert_called() @@ -70,7 +73,7 @@ def scale_in(n, max_idletime=None): executor.scale_in_facade.assert_not_called() - # In issue #3696, this second strategize does a scale in, because 14 < 48*1 + # In issue #3696, this second strategize does a scale in, because n_tasks < n_workers*1 s.strategize([executor]) # assert that there should still be 1 pending blocks From e8321451046d9cb92602736fab6610aea19d273a Mon Sep 17 00:00:00 2001 From: Ben Clifford Date: Wed, 11 Dec 2024 13:36:21 +0000 Subject: [PATCH 5/5] parameterization --- .../test_regression_3696_oscillation.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/parsl/tests/test_scaling/test_regression_3696_oscillation.py b/parsl/tests/test_scaling/test_regression_3696_oscillation.py index e5c4050308..e3af1ff121 100644 --- a/parsl/tests/test_scaling/test_regression_3696_oscillation.py +++ b/parsl/tests/test_scaling/test_regression_3696_oscillation.py @@ -9,7 +9,13 @@ @pytest.mark.local -@pytest.mark.parametrize("ns", [ (14,48) ]) +@pytest.mark.parametrize("ns", [(14, 48, 1), # issue #3696 regression + (1, 1, 1), # basic one task/one block behaviour + (100, 1, 20), # many one-task blocks, hitting max blocks + (47, 48, 1), # edge cases around #3696 + (48, 48, 1), # " + (49, 48, 2), # " + (149, 50, 3)]) # " def test_htex_strategy_does_not_oscillate(ns): """Check for oscillations in htex scaling. In issue 3696, with a large number of workers per block @@ -21,7 +27,7 @@ def test_htex_strategy_does_not_oscillate(ns): from issue #3696. """ - n_tasks, n_workers = ns + n_tasks, n_workers, n_blocks = ns s = Strategy(strategy='htex_auto_scale', max_idletime=0) @@ -38,7 +44,7 @@ def test_htex_strategy_does_not_oscillate(ns): provider.parallelism = 1 provider.init_blocks = 0 provider.min_blocks = 0 - provider.max_blocks = 2 + provider.max_blocks = 20 provider.nodes_per_block = 1 def scale_out(n): @@ -65,9 +71,8 @@ def scale_in(n, max_idletime=None): s.strategize([executor]) executor.scale_out_facade.assert_called() - print(f"BENC: {statuses}") - assert len(statuses) == 1, "Only one block should have been launched" - assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 1 + assert len(statuses) == n_blocks, "Should have launched n_blocks" + assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks # there might be several calls to scale_out_facade inside strategy, # but the end effect should be that exactly one block is scaled out. @@ -76,8 +81,8 @@ def scale_in(n, max_idletime=None): # In issue #3696, this second strategize does a scale in, because n_tasks < n_workers*1 s.strategize([executor]) - # assert that there should still be 1 pending blocks - assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 1 + # assert that there should still be n_blocks pending blocks + assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks # this assert fails due to issue #3696 # Now check scale in happens with 0 load