Fix rounding error in htex block scale in #3721

Draft: wants to merge 5 commits into master
12 changes: 10 additions & 2 deletions parsl/jobs/strategy.py
@@ -275,6 +275,14 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_
logger.debug(f"Strategy case 2b: active_blocks {active_blocks} < max_blocks {max_blocks} so scaling out")
excess_slots = math.ceil((active_tasks * parallelism) - active_slots)
excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
print(f"BENC: active_tasks {active_tasks}")
print(f"BENC: active_slots {active_slots}")
print(f"BENC: excess slots {excess_slots}")
print(f"BENC: tpn {tasks_per_node}")
print(f"BENC: npb {nodes_per_block}")
print(f"BENC: excess blocks {excess_blocks}")
print(f"BENC: max blocks {max_blocks}")
print(f"BENC: active blocks {active_blocks}")
excess_blocks = min(excess_blocks, max_blocks - active_blocks)
logger.debug(f"Requesting {excess_blocks} more blocks")
executor.scale_out_facade(excess_blocks)
@@ -298,8 +306,8 @@ def _general_strategy(self, executors: List[BlockProviderExecutor], *, strategy_
# Scale in for htex
if isinstance(executor, HighThroughputExecutor):
    if active_blocks > min_blocks:
-       excess_slots = math.ceil(active_slots - (active_tasks * parallelism))
-       excess_blocks = math.ceil(float(excess_slots) / (tasks_per_node * nodes_per_block))
+       excess_slots = math.floor(active_slots - (active_tasks * parallelism))
+       excess_blocks = math.floor(float(excess_slots) / (tasks_per_node * nodes_per_block))
        excess_blocks = min(excess_blocks, active_blocks - min_blocks)
        logger.debug(f"Requesting scaling in by {excess_blocks} blocks with idle time {self.max_idletime}s")
        executor.scale_in_facade(excess_blocks, max_idletime=self.max_idletime)
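For context on the rounding change above, here is a minimal standalone sketch (illustrative only, not part of the diff) of the arithmetic behind the oscillation reported in issue #3696, using that report's numbers: 14 outstanding tasks, 48 workers per node, 1 node per block, parallelism 1. The variable names mirror those used in `_general_strategy`.

```python
import math

# Numbers from issue #3696 (illustrative sketch, not part of this PR's diff):
active_tasks = 14        # outstanding tasks
tasks_per_node = 48      # workers per node
nodes_per_block = 1
parallelism = 1

# Scale out from zero blocks: 14 needed slots round up to one block.
active_slots = 0
excess_slots = math.ceil(active_tasks * parallelism - active_slots)    # 14
print(math.ceil(excess_slots / (tasks_per_node * nodes_per_block)))    # 1 block scaled out

# Next strategy pass: that one block provides 48 slots for 14 tasks.
active_slots = 48
idle_slots = active_slots - active_tasks * parallelism                 # 34

# Old behaviour: ceil rounds 34/48 up to 1, so the partially used block is
# scaled in, and the following pass scales it out again -- the oscillation.
print(math.ceil(idle_slots / (tasks_per_node * nodes_per_block)))      # 1 block scaled in

# New behaviour: floor rounds 34/48 down to 0, so the block is kept and the
# strategy converges on one block.
print(math.floor(idle_slots / (tasks_per_node * nodes_per_block)))     # 0 blocks scaled in
```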
92 changes: 92 additions & 0 deletions parsl/tests/test_scaling/test_regression_3696_oscillation.py
@@ -0,0 +1,92 @@
import math
from unittest.mock import MagicMock

import pytest

from parsl.executors.high_throughput.executor import HighThroughputExecutor
from parsl.jobs.states import JobState, JobStatus
from parsl.jobs.strategy import Strategy


@pytest.mark.local
@pytest.mark.parametrize("ns", [(14, 48, 1),    # issue #3696 regression
                                (1, 1, 1),      # basic one task/one block behaviour
                                (100, 1, 20),   # many one-task blocks, hitting max blocks
                                (47, 48, 1),    # edge cases around #3696
                                (48, 48, 1),    # "
                                (49, 48, 2),    # "
                                (149, 50, 3)])  # "
def test_htex_strategy_does_not_oscillate(ns):
"""Check for oscillations in htex scaling.
In issue 3696, with a large number of workers per block
and a smaller number of active tasks, the htex scaling
strategy oscillates between 0 and 1 active block, rather
than converging to 1 active block.

The choices of 14 tasks and 48 workers per node are taken
from issue #3696.
"""

n_tasks, n_workers, n_blocks = ns

s = Strategy(strategy='htex_auto_scale', max_idletime=0)

provider = MagicMock()
executor = MagicMock(spec=HighThroughputExecutor)

statuses = {}

executor.provider = provider
executor.outstanding = n_tasks
executor.status_facade = statuses
executor.workers_per_node = n_workers

provider.parallelism = 1
provider.init_blocks = 0
provider.min_blocks = 0
provider.max_blocks = 20
provider.nodes_per_block = 1

def scale_out(n):
for _ in range(n):
statuses[len(statuses)] = JobStatus(state=JobState.PENDING)

executor.scale_out_facade.side_effect = scale_out

def scale_in(n, max_idletime=None):
# find n PENDING jobs and set them to CANCELLED
for k in statuses:
if n == 0:
return
if statuses[k].state == JobState.PENDING:
statuses[k].state = JobState.CANCELLED
n -= 1

executor.scale_in_facade.side_effect = scale_in

s.add_executors([executor])

# In issue #3696, this first strategise does initial and load based
# scale outs, because n_tasks > n_workers*0
s.strategize([executor])

executor.scale_out_facade.assert_called()
assert len(statuses) == n_blocks, "Should have launched n_blocks"
assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks
# there might be several calls to scale_out_facade inside strategy,
# but the end effect should be that exactly one block is scaled out.

executor.scale_in_facade.assert_not_called()

# In issue #3696, this second strategize does a scale in, because n_tasks < n_workers*1
s.strategize([executor])

# assert that there should still be n_blocks pending blocks
assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == n_blocks
# this assert fails due to issue #3696

# Now check scale in happens with 0 load
executor.outstanding = 0
s.strategize([executor])
executor.scale_in_facade.assert_called()
assert len([k for k in statuses if statuses[k].state == JobState.PENDING]) == 0
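The assertions above exercise a single scale-out/scale-in cycle. As a rough illustration of the behaviour the test guards against, a hand-rolled simulation (a hypothetical toy model, not the real strategy code) of repeated strategy passes shows the old ceil-based scale-in bouncing between 0 and 1 blocks, while the floor-based version settles at 1 block:

```python
import math

def simulate(rounds, scale_in_round, tasks=14, workers=48, parallelism=1):
    """Toy model of repeated strategy passes (hypothetical, not _general_strategy
    itself): return the block count after each pass, using scale_in_round to
    round the scale-in decision."""
    blocks = 0
    history = []
    for _ in range(rounds):
        slots = blocks * workers
        if tasks * parallelism > slots:
            # scale out always rounds up, as in the real strategy
            blocks += math.ceil((tasks * parallelism - slots) / workers)
        elif slots > tasks * parallelism:
            # scale in, rounded with the supplied function
            blocks -= min(scale_in_round((slots - tasks * parallelism) / workers), blocks)
        history.append(blocks)
    return history

print(simulate(6, math.ceil))   # [1, 0, 1, 0, 1, 0] -- oscillates (issue #3696)
print(simulate(6, math.floor))  # [1, 1, 1, 1, 1, 1] -- converges
```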