Skip to content

Commit

Permalink
Don't create new session in stuck queue reschedule handler (apache#44192
Browse files Browse the repository at this point in the history
)

This is a fix-up / follow-up to apache#43520.

It does not really make a material difference; I'm just avoiding use of the session decorator, and its create / dispose session logic, when it is not needed. I also commit as I go along, since there's no reason to handle multiple distinct TIs in the same transaction.
  • Loading branch information
dstandish authored Nov 19, 2024
1 parent 931a6b9 commit a825c95
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 10 deletions.
8 changes: 4 additions & 4 deletions airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,7 @@ def _handle_tasks_stuck_in_queued(self, session: Session = NEW_SESSION) -> None:
ti=ti,
session=session,
)
session.commit()
except NotImplementedError:
# this block only gets entered if the executor has not implemented `revoke_task`.
# in which case, we try the fallback logic
Expand Down Expand Up @@ -1838,7 +1839,7 @@ def _maybe_requeue_stuck_ti(self, *, ti, session):
),
)
)
self._reschedule_stuck_task(ti)
self._reschedule_stuck_task(ti, session=session)
else:
self.log.info(
"Task requeue attempts exceeded max; marking failed. task_instance=%s",
Expand Down Expand Up @@ -1875,8 +1876,7 @@ def _stuck_in_queued_backcompat_logic(self, executor, stuck_tis):
ti_repr,
)

@provide_session
def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
def _reschedule_stuck_task(self, ti: TaskInstance, session: Session):
session.execute(
update(TI)
.where(TI.filter_for_tis([ti]))
Expand All @@ -1890,7 +1890,7 @@ def _reschedule_stuck_task(self, ti, session=NEW_SESSION):
@provide_session
def _get_num_times_stuck_in_queued(self, ti: TaskInstance, session: Session = NEW_SESSION) -> int:
"""
Check the Log table to see how many times a taskinstance has been stuck in queued.
Check the Log table to see how many times a task instance has been stuck in queued.
We can then use this information to determine whether to reschedule a task or fail it.
"""
Expand Down
9 changes: 3 additions & 6 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2276,8 +2276,7 @@ def _queue_tasks(tis):
scheduler._task_queued_timeout = -300 # always in violation of timeout

with _loader_mock(mock_executors):
scheduler._handle_tasks_stuck_in_queued(session=session)

scheduler._handle_tasks_stuck_in_queued()
# If the task gets stuck in queued once, we reset it to scheduled
tis = dr.get_task_instances(session=session)
assert [x.state for x in tis] == ["scheduled", "scheduled"]
Expand All @@ -2291,8 +2290,7 @@ def _queue_tasks(tis):
]

with _loader_mock(mock_executors):
scheduler._handle_tasks_stuck_in_queued(session=session)
session.commit()
scheduler._handle_tasks_stuck_in_queued()

log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()]
assert log_events == [
Expand All @@ -2307,8 +2305,7 @@ def _queue_tasks(tis):
_queue_tasks(tis=tis)

with _loader_mock(mock_executors):
scheduler._handle_tasks_stuck_in_queued(session=session)
session.commit()
scheduler._handle_tasks_stuck_in_queued()
log_events = [x.event for x in session.scalars(select(Log).where(Log.run_id == run_id)).all()]
assert log_events == [
"stuck in queued reschedule",
Expand Down

0 comments on commit a825c95

Please sign in to comment.