diff --git a/axlearn/cloud/common/bastion.py b/axlearn/cloud/common/bastion.py
index 6c8bdd28f..e47fd1cbd 100644
--- a/axlearn/cloud/common/bastion.py
+++ b/axlearn/cloud/common/bastion.py
@@ -750,7 +750,11 @@ def _append_to_project_history(
     ):
         now = datetime.now(timezone.utc)
         for project_id, limits in schedule_results.project_limits.items():
-            job_verdicts = schedule_results.job_verdicts.get(project_id, {})
+            job_verdicts = {
+                job_id: verdict
+                for job_id, verdict in schedule_results.job_verdicts.items()
+                if jobs[job_id].project_id == project_id
+            }
             verdicts = []
             for job_id, verdict in job_verdicts.items():
                 verdicts.append((job_id, verdict.should_run(), verdict.metadata))
@@ -1076,56 +1080,55 @@ def _update_jobs(self):
             verbosity=schedule_options["verbosity"],
         )
         self._append_to_project_history(schedulable_jobs, schedule_results)
-        for verdicts in schedule_results.job_verdicts.values():
-            for job_name, verdict in verdicts.items():
-                job = self._active_jobs[job_name]
-                assert job.state.status in {JobStatus.PENDING, JobStatus.ACTIVE}
-
-                if verdict:
-                    old_tier = job.state.metadata.get("tier")
-                    new_tier = verdict.metadata.get("tier")
-                    changed_tiers = old_tier != new_tier
-
-                    jobspec_changed = job.state.metadata.get("updated")
-
-                    # Jobspec changed, trigger a restart of the runner.
-                    if jobspec_changed:
-                        self._append_to_job_history(
-                            job,
-                            msg="UPDATING: Detected updated jobspec. Will restart the runner "
-                            "by sending to PENDING state",
-                            state=JobLifecycleState.UPDATING,
-                        )
-                        job.state.status = JobStatus.PENDING
-                    elif job.state.status == JobStatus.PENDING or not changed_tiers:
-                        # Resume if not running, or keep running if scheduling tier did not change.
-                        job.state.status = JobStatus.ACTIVE
-                    else:
-                        # Job changed scheduling tiers, and must be restarted on the new tier.
-                        # NOTE: this can possibly lead to thrashing of jobs that frequently switch
-                        # tiers. One option is track per-job tier changes and hold off on promoting
-                        # low priority to high priority if it was demoted recently.
-                        # TODO(markblee): Add instrumentation to track frequency of tier changes to
-                        # see whether this is necessary.
-                        assert job.state.status == JobStatus.ACTIVE and changed_tiers
-                        self._append_to_job_history(
-                            job,
-                            msg=f"Rescheduling at a different tier from {old_tier} to {new_tier}",
-                            state=JobLifecycleState.RESCHEDULING,
-                        )
-                        job.state.status = JobStatus.PENDING
+        for job_name, verdict in schedule_results.job_verdicts.items():
+            job = self._active_jobs[job_name]
+            assert job.state.status in {JobStatus.PENDING, JobStatus.ACTIVE}
+
+            if verdict:
+                old_tier = job.state.metadata.get("tier")
+                new_tier = verdict.metadata.get("tier")
+                changed_tiers = old_tier != new_tier
+
+                jobspec_changed = job.state.metadata.get("updated")
+
+                # Jobspec changed, trigger a restart of the runner.
+                if jobspec_changed:
+                    self._append_to_job_history(
+                        job,
+                        msg="UPDATING: Detected updated jobspec. Will restart the runner "
+                        "by sending to PENDING state",
+                        state=JobLifecycleState.UPDATING,
+                    )
+                    job.state.status = JobStatus.PENDING
+                elif job.state.status == JobStatus.PENDING or not changed_tiers:
+                    # Resume if not running, or keep running if scheduling tier did not change.
+                    job.state.status = JobStatus.ACTIVE
                 else:
-                    # Pre-empt/stay queued.
-                    if job.command_proc is not None and _is_proc_complete(job.command_proc):
-                        # As a slight optimization, we avoid pre-empting ACTIVE jobs that are
-                        # complete, since we can directly transition to CLEANING.
-                        job.state.status = JobStatus.ACTIVE
-                    else:
-                        job.state.status = JobStatus.PENDING
-                    # Pending jobs which are not rescheduled should have no tier information.
-                    verdict.metadata.pop("tier", None)
-
-                job.state.metadata = verdict.metadata
+                    # Job changed scheduling tiers, and must be restarted on the new tier.
+                    # NOTE: this can possibly lead to thrashing of jobs that frequently switch
+                    # tiers. One option is track per-job tier changes and hold off on promoting
+                    # low priority to high priority if it was demoted recently.
+                    # TODO(markblee): Add instrumentation to track frequency of tier changes to
+                    # see whether this is necessary.
+                    assert job.state.status == JobStatus.ACTIVE and changed_tiers
+                    self._append_to_job_history(
+                        job,
+                        msg=f"Rescheduling at a different tier from {old_tier} to {new_tier}",
+                        state=JobLifecycleState.RESCHEDULING,
+                    )
+                    job.state.status = JobStatus.PENDING
+            else:
+                # Pre-empt/stay queued.
+                if job.command_proc is not None and _is_proc_complete(job.command_proc):
+                    # As a slight optimization, we avoid pre-empting ACTIVE jobs that are
+                    # complete, since we can directly transition to CLEANING.
+                    job.state.status = JobStatus.ACTIVE
+                else:
+                    job.state.status = JobStatus.PENDING
+                # Pending jobs which are not rescheduled should have no tier information.
+                verdict.metadata.pop("tier", None)
+
+            job.state.metadata = verdict.metadata
 
         # TODO(markblee): Parallelize this.
         for job_name, job in self._active_jobs.items():
diff --git a/axlearn/cloud/common/cleaner.py b/axlearn/cloud/common/cleaner.py
index f697889e0..1cfbc530e 100644
--- a/axlearn/cloud/common/cleaner.py
+++ b/axlearn/cloud/common/cleaner.py
@@ -89,6 +89,6 @@ def sweep(self, jobs: dict[str, JobSpec]) -> Sequence[str]:
             schedule_result = scheduler.schedule(
                 dict(my_job=job_spec.metadata),
             )
-            if schedule_result.job_verdicts[job_spec.metadata.project_id]["my_job"].over_limits:
+            if schedule_result.job_verdicts["my_job"].over_limits:
                 result.append(job_name)
         return result
diff --git a/axlearn/cloud/common/scheduler.py b/axlearn/cloud/common/scheduler.py
index 9d3e1440b..ba340c634 100644
--- a/axlearn/cloud/common/scheduler.py
+++ b/axlearn/cloud/common/scheduler.py
@@ -154,12 +154,15 @@ class ScheduleResults:
         Attributes:
             project_limits: The effective resource limits.
             project_usages: The resource usages.
-            job_verdicts: A mapping of project_id -> (job_id -> run_or_not).
+            job_verdicts: A mapping of job_id -> run_or_not.
+                Entries are ordered by descending scheduling priority (not necessarily
+                JobMetadata.priority): higher-priority jobs are scheduled before lower-priority
+                ones. Jobs that are not scheduled are also included, in priority order.
         """
 
         project_limits: ProjectResourceMap[int]
         project_usages: ProjectResourceMap[int]
-        job_verdicts: dict[str, dict[str, JobVerdict]]
+        job_verdicts: dict[str, JobVerdict]
 
     def schedule(
         self,
@@ -345,8 +348,8 @@ def traverse_tiers(
                     break
             return tier_usages
 
-        job_verdicts = collections.defaultdict(dict)
-        while not project_queue.empty() and remaining_limits:
+        job_verdicts = {}
+        while not project_queue.empty():
             project_usage_ratio, _, project_id = project_queue.get()
             job_id, job_metadata = project_jobs[project_id].popleft()
 
@@ -381,17 +384,10 @@ def traverse_tiers(
                 "Schedule %s(%s)/%s: %s", project_id, project_usage_ratio, job_id, verdict
             )
 
-            job_verdicts[project_id][job_id] = verdict
+            job_verdicts[job_id] = verdict
             if project_jobs[project_id]:
                 project_queue.put(project_queue_item(project_id))
 
-        # Remaining jobs are rejected.
-        for project_id, job_queue in project_jobs.items():
-            for job_id, job_metadata in job_queue:
-                job_verdicts[project_id][job_id] = JobVerdict(
-                    over_limits=set(job_metadata.resources.keys())
-                )
-
         return BaseScheduler.ScheduleResults(
             # Treat the usages as the limits.
             project_limits=_recursively_to_dict(project_usages),
@@ -472,14 +468,15 @@ def schedule(
             logging.info("")
             logging.info("==Begin scheduling report")
             logging.info("Total resource limits: %s", resource_limits)
-            for project_id, project_verdicts in schedule_results.job_verdicts.items():
+            for project_id, project_job_queue in project_jobs.items():
                 logging.info(
                     "Verdicts for Project [%s] Quota [%s] Effective limits [%s]:",
                     project_id,
                     project_quotas.get(project_id, {}),
                     schedule_results.project_limits.get(project_id, {}),
                 )
-                for job_name, job_verdict in project_verdicts.items():
+                for job_name, job_metadata in project_job_queue:
+                    job_verdict = schedule_results.job_verdicts[job_name]
                     logging.info(
                         "Job %s: Resources [%s] Over limits [%s] Should Run? [%s] Metadata [%s]",
                         job_name,
@@ -500,9 +497,6 @@ def schedule(
             schedule_results = BaseScheduler.ScheduleResults(
                 project_limits=schedule_results.project_limits,
                 project_usages=project_usages,
-                job_verdicts={
-                    project_id: {job_name: JobVerdict() for job_name in project_verdicts}
-                    for project_id, project_verdicts in schedule_results.job_verdicts.items()
-                },
+                job_verdicts={job_name: JobVerdict() for job_name in schedule_results.job_verdicts},
             )
         return schedule_results
diff --git a/axlearn/cloud/common/scheduler_test.py b/axlearn/cloud/common/scheduler_test.py
index 96e51202f..56530b26d 100644
--- a/axlearn/cloud/common/scheduler_test.py
+++ b/axlearn/cloud/common/scheduler_test.py
@@ -15,7 +15,6 @@
     JobMetadata,
     JobQueue,
     JobScheduler,
-    JobVerdict,
     ProjectJobSorter,
     TierScheduler,
     _compute_total_limits,
@@ -207,9 +206,10 @@ def test_validate_tiers(self):
             sched.schedule(resource_limits=[{"v4": 10, "v3": 3}], **common_kwargs)
 
-    @parameterized.parameters(
+    @parameterized.named_parameters(
         # Test a case where all jobs fit into tier 0.
         dict(
+            testcase_name="all_tier_0",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 5})),),
                 "b": (
@@ -219,11 +219,14 @@ def test_validate_tiers(self):
             },
             expected_project_limits={"a": {"v4": 5}, "b": {"v4": 5}},
             expected_verdicts={"a1": True, "b1": True, "b2": True},
-            expected_tiers={"a1": 0, "b1": 0, "b2": 0},
+            # The order of entries in `expected_tiers` should reflect the job priorities.
+            # Here "b1" is ahead of "a1" because it requests fewer resources.
+            expected_tiers={"b1": 0, "a1": 0, "b2": 0},
         ),
         # Test tie-break. Although both are requesting the same amount, "b" is requesting more
         # relative to its quota, so "a" will be prioritized.
         dict(
+            testcase_name="tiebreak_by_relative_demand",
             project_jobs={
                 "b": (("b1", _mock_job_metadata({"v4": 7})),),
                 "a": (("a1", _mock_job_metadata({"v4": 7})),),
@@ -236,6 +239,7 @@ def test_validate_tiers(self):
         # Test tie-break. Since both have the same quotas, we will tie-break using creation time.
         # In this case, a1 is created first.
         dict(
+            testcase_name="tiebreak_by_creation_time",
             project_jobs={
                 "a": (
                     (
@@ -258,6 +262,7 @@ def test_validate_tiers(self):
         # Test when a higher priority job does not fit into tier 0, thus allowing a lower priority
         # job to schedule onto tier 0.
         dict(
+            testcase_name="high_priority_forced_into_tier_1",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 12})),),
                 "b": (
@@ -267,11 +272,12 @@ def test_validate_tiers(self):
             },
             expected_project_limits={"a": {"v4": 12}, "b": {"v4": 3}},
             expected_verdicts={"a1": True, "b1": True, "b2": True},
-            expected_tiers={"a1": 1, "b1": 0, "b2": 0},
+            expected_tiers={"b1": 0, "b2": 0, "a1": 1},
         ),
         # In this case, "a" is requesting much more relative to its quota than "b", so "b" gets to
         # go first (so "a" doesn't fit).
         dict(
+            testcase_name="lower_demand_goes_first",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 13})),),
                 "b": (
@@ -281,10 +287,11 @@ def test_validate_tiers(self):
             },
             expected_project_limits={"a": {"v4": 0}, "b": {"v4": 6}},
             expected_verdicts={"a1": False, "b1": True, "b2": True},
-            expected_tiers={"a1": None, "b1": 0, "b2": 0},
+            expected_tiers={"b1": 0, "b2": 0, "a1": None},
         ),
         # Test that leftover resources from reserved tier are schedulable by subsequent tiers.
         dict(
+            testcase_name="leftover_from_reserved_tier",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 7})),),
                 "b": (
@@ -294,10 +301,12 @@ def test_validate_tiers(self):
             },
             expected_project_limits={"a": {"v4": 7}, "b": {"v4": 8}},
             expected_verdicts={"a1": True, "b1": True, "b2": True},
-            expected_tiers={"a1": 0, "b1": 0, "b2": 1},
+            # "b1" is ahead of "a1" since it requests fewer resources.
+            expected_tiers={"b1": 0, "a1": 0, "b2": 1},
         ),
         # Test load balance.
         dict(
+            testcase_name="load_balance",
             project_jobs={
                 "a": tuple((f"a{i}", _mock_job_metadata({"v4": 1})) for i in range(3)),
                 "b": tuple((f"b{i}", _mock_job_metadata({"v4": 1})) for i in range(3)),
@@ -321,14 +330,18 @@ def test_validate_tiers(self):
                 "c0": 0,
                 "a1": 1,
                 "b1": 1,
-                "a2": None,
-                "b2": None,
+                # While the rest of the jobs are not scheduled, the order still reflects
+                # their priorities.
+                # Since "c" gets the fewest resources, its jobs take priority over those from a/b.
                 "c1": None,
                 "c2": None,
+                "a2": None,
+                "b2": None,
             },
         ),
         # Test projects with no quotas.
         dict(
+            testcase_name="projects_with_no_quota",
             project_jobs={
                 "a": tuple((f"a{i}", _mock_job_metadata({"v4": 1})) for i in range(3)),
                 "b": tuple((f"b{i}", _mock_job_metadata({"v4": 1})) for i in range(3)),
@@ -348,19 +361,22 @@ def test_validate_tiers(self):
                 for i in range(3)
             },
             expected_tiers={
+                # "b" has quota for "v4", so its jobs get priority.
                 "b0": 0,
                 "b1": 0,
                 "b2": 1,
+                # "a" and "c" jobs are interleaved in scheduling.
                 "a0": 2,
                 "c0": 2,
                 "a1": None,
-                "a2": None,
                 "c1": None,
+                "a2": None,
                 "c2": None,
             },
         ),
         # Test quotas of different scales.
         dict(
+            testcase_name="quotas_of_different_scales",
             project_jobs={
                 "a": (
                     ("a1", _mock_job_metadata({"v3": 1})),
@@ -393,6 +409,7 @@ def test_validate_tiers(self):
         # Test that we cannot exceed total limit.
         # Note that missing resource types implicitly have limit 0.
         dict(
+            testcase_name="cannot_exceed_total_limit",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 1, "v3": 2})),),
                 "b": (
@@ -403,10 +420,11 @@ def test_validate_tiers(self):
             },
             resource_limits=[{"v4": 3}],
             expected_project_limits={"a": {"v4": 0, "v3": 0}, "b": {"v4": 2}},
             expected_verdicts={"a1": False, "b1": True, "b2": False},
-            expected_tiers={"a1": None, "b1": 0, "b2": None},
+            expected_tiers={"b1": 0, "b2": None, "a1": None},
         ),
         # Test that we can accumulate across tiers. Jobs should schedule onto the final tier.
         dict(
+            testcase_name="accumulation_across_tiers",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 1, "v3": 2})),),
                 "b": (
@@ -417,10 +435,13 @@ def test_validate_tiers(self):
             resource_limits=[{"v4": 1}, {"v4": 1}, {"v4": 1}],
             expected_project_limits={"a": {"v4": 0, "v3": 0}, "b": {"v4": 3}},
             expected_verdicts={"a1": False, "b1": True, "b2": False},
-            expected_tiers={"a1": None, "b1": 2, "b2": None},
+            # While both "a1" and "b2" are not scheduled, "b2" is ranked ahead of "a1" because
+            # its demand/limit ratio is lower (there's no v3 resource, so a1's ratio is infinite).
+            expected_tiers={"b1": 2, "b2": None, "a1": None},
         ),
         # Test that we can accumulate across tiers across resource types.
         dict(
+            testcase_name="accumulation_across_tiers_resource_types",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 1, "v3": 2})),),
                 "b": (
@@ -436,6 +457,7 @@ def test_validate_tiers(self):
         ),
         # Test that we acquire resources in reverse-tier-order.
         dict(
+            testcase_name="reverse_tier_order",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 1})),),
                 "b": (
@@ -453,6 +475,7 @@ def test_validate_tiers(self):
         ),
         # Test that we acquire resources in reverse-tier-order (multiple resources).
         dict(
+            testcase_name="reverse_tier_order_multi_resource",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({"v4": 1, "v3": 2})),),
                 "b": (
@@ -470,6 +493,7 @@ def test_validate_tiers(self):
         ),
         # Test scheduling jobs with no demands.
         dict(
+            testcase_name="jobs_with_no_demands",
             project_jobs={
                 "a": (("a1", _mock_job_metadata({})),),
                 "b": (
@@ -483,6 +507,7 @@ def test_validate_tiers(self):
         ),
         # Test a case where some resource types are invalid.
         dict(
+            testcase_name="invalid_resource_types",
             project_jobs={
                 "a": (
                     ("a1", _mock_job_metadata({"v4": 5})),
@@ -497,7 +522,7 @@ def test_validate_tiers(self):
             },
             expected_project_limits={"a": {"v4": 5}, "b": {"unknown": 0, "v4": 1}},
             expected_verdicts={"a1": True, "a2": False, "b1": False, "b2": True},
-            expected_tiers={"a1": 0, "a2": None, "b1": None, "b2": 0},
+            expected_tiers={"b2": 0, "a1": 0, "a2": None, "b1": None},
         ),
     )
     def test_schedule(
@@ -527,11 +552,7 @@ def test_schedule(
         )
         # project_limits should reflect limits across tiers.
         self.assertEqual(expected_project_limits, results.project_limits)
-        job_verdicts: dict[str, JobVerdict] = {
-            job_name: verdict
-            for project_verdicts in results.job_verdicts.values()
-            for job_name, verdict in project_verdicts.items()
-        }
+        job_verdicts = results.job_verdicts
         # Check that verdicts are expected.
         self.assertEqual(
             expected_verdicts,
@@ -545,6 +566,8 @@ def test_schedule(
                 for job_name, job_verdict in job_verdicts.items()
             },
         )
+        # Check that the order of jobs in `job_verdicts` matches that in `expected_tiers`.
+        self.assertEqual(list(job_verdicts.keys()), list(expected_tiers.keys()))
 
 
 def _mock_get_resource_limits(*args):
@@ -627,11 +650,7 @@ def test_init(self, dry_run: bool):
         results = sched.schedule(jobs, dry_run=dry_run, verbosity=1)
 
         # Get verdicts by job name.
-        job_verdicts: dict[str, JobVerdict] = {
-            job_name: verdict
-            for project_verdicts in results.job_verdicts.values()
-            for job_name, verdict in project_verdicts.items()
-        }
+        job_verdicts = results.job_verdicts
         if dry_run:
             # All of the jobs should be scheduled, regardless.
             expected = {"a": True, "b": True, "c": True, "d": True, "e": True, "f": True}
@@ -674,11 +693,7 @@ def test_leftover(self):
             for index, proj in enumerate(["a", "b", "c"])
         }
         results = sched.schedule(jobs)
-        job_verdicts: dict[str, JobVerdict] = {
-            job_name: verdict
-            for project_verdicts in results.job_verdicts.values()
-            for job_name, verdict in project_verdicts.items()
-        }
+        job_verdicts = results.job_verdicts
         # Two of the older jobs should run, even though every job's demand exceeds the project
        # limit.
         self.assertEqual(
@@ -726,11 +741,7 @@ def test_leftover(self):
         results = sched.schedule(jobs)
 
         # Get verdicts by job name.
-        job_verdicts: dict[str, JobVerdict] = {
-            job_name: verdict
-            for project_verdicts in results.job_verdicts.values()
-            for job_name, verdict in project_verdicts.items()
-        }
+        job_verdicts = results.job_verdicts
         expected = {
             # The first job of each project will get scheduled.
             "a1": True,
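
Note on the API change above: `ScheduleResults.job_verdicts` is flattened from a nested `project_id -> (job_id -> JobVerdict)` mapping into a single `job_id -> JobVerdict` mapping whose insertion order reflects scheduling priority, with rejected jobs included in the same order. The sketch below illustrates how calling code can consume the flat mapping, either directly in priority order or re-grouped by project (as `_append_to_project_history` now does). It is a minimal, self-contained sketch: the `JobVerdict` stand-in and the `job_projects`/`job_verdicts` sample data are hypothetical and not the axlearn implementations.

from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class JobVerdict:
    """Stand-in for axlearn's JobVerdict; only the fields used below."""

    over_limits: set = field(default_factory=set)
    metadata: dict = field(default_factory=dict)

    def should_run(self) -> bool:
        return not self.over_limits


# Hypothetical inputs: which project owns each job, plus the flat, priority-ordered verdicts.
job_projects = {"b1": "b", "a1": "a", "b2": "b"}
job_verdicts = {
    "b1": JobVerdict(metadata={"tier": 0}),
    "a1": JobVerdict(metadata={"tier": 0}),
    "b2": JobVerdict(over_limits={"v4"}),
}

# 1) Iterate in scheduling-priority order (dict insertion order).
for job_id, verdict in job_verdicts.items():
    print(job_id, "runs" if verdict.should_run() else "stays queued")

# 2) Re-group verdicts by project, mirroring what _append_to_project_history now does.
verdicts_by_project = defaultdict(dict)
for job_id, verdict in job_verdicts.items():
    verdicts_by_project[job_projects[job_id]][job_id] = verdict
print({project_id: list(verdicts) for project_id, verdicts in verdicts_by_project.items()})

Relying on dict insertion order (guaranteed in Python 3.7+) lets a caller recover both the run/queue decision and the relative priority of unscheduled jobs from one structure, without the per-project nesting the old API required.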