Skip to content

Commit

Permalink
Fix new scheduled tasks jumping the queue
Browse files Browse the repository at this point in the history
Currently, when a new scheduled task is added and its scheduled time has
already passed, we set it to ACTIVE. This is problematic, because it means it
will jump the queue ahead of all other SCHEDULED tasks; furthermore, if the
Synapse process gets restarted, it will jump ahead of any ACTIVE tasks which
have been started but are taking a while to run.

Instead, we leave it set to SCHEDULED, but kick off a call to
`_launch_scheduled_tasks`, which will decide if we actually have capacity to
start a new task, and start the newly-added task if so.
  • Loading branch information
richvdh committed Nov 26, 2024
1 parent cd7d90b commit afb7d08
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 39 deletions.
1 change: 1 addition & 0 deletions changelog.d/17962.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix new scheduled tasks jumping the queue.
2 changes: 1 addition & 1 deletion synapse/replication/tcp/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ def to_line(self) -> str:


class NewActiveTaskCommand(_SimpleCommand):
"""Sent to inform instance handling background tasks that a new active task is available to run.
"""Sent to inform instance handling background tasks that a new task is ready to run.
Format::
Expand Down
2 changes: 1 addition & 1 deletion synapse/replication/tcp/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -727,7 +727,7 @@ def on_NEW_ACTIVE_TASK(
) -> None:
"""Called when get a new NEW_ACTIVE_TASK command."""
if self._task_scheduler:
self._task_scheduler.launch_task_by_id(cmd.data)
self._task_scheduler.on_new_task(cmd.data)

def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection."""
Expand Down
30 changes: 13 additions & 17 deletions synapse/util/task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,10 @@ async def schedule_task(
The id of the scheduled task
"""
status = TaskStatus.SCHEDULED
start_now = False
if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec()
status = TaskStatus.ACTIVE
start_now = True

task = ScheduledTask(
random_string(16),
Expand All @@ -190,9 +191,11 @@ async def schedule_task(
)
await self._store.insert_scheduled_task(task)

if status == TaskStatus.ACTIVE:
# If the task is ready to run immediately, run the scheduling algorithm now
# rather than waiting
if start_now:
if self._run_background_tasks:
await self._launch_task(task)
self._launch_scheduled_tasks()
else:
self._hs.get_replication_command_handler().send_new_active_task(task.id)

Expand Down Expand Up @@ -300,23 +303,14 @@ async def delete_task(self, id: str) -> None:
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
await self._store.delete_scheduled_task(id)

def launch_task_by_id(self, id: str) -> None:
"""Try launching the task with the given ID."""
# Don't bother trying to launch new tasks if we're already at capacity.
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return

run_as_background_process("launch_task_by_id", self._launch_task_by_id, id)

async def _launch_task_by_id(self, id: str) -> None:
"""Helper async function for `launch_task_by_id`."""
task = await self.get_task(id)
if task:
await self._launch_task(task)
def on_new_task(self, task_id: str) -> None:
"""Handle a notification that a new ready-to-run task has been added to the queue"""
# Just run the scheduler
self._launch_scheduled_tasks()

@wrap_as_background_process("launch_scheduled_tasks")
async def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at that time."""
"""Retrieve and launch scheduled tasks that should be running."""
# Don't bother trying to launch new tasks if we're already at capacity.
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return
Expand All @@ -330,6 +324,8 @@ async def _launch_scheduled_tasks(self) -> None:
for task in await self.get_tasks(
statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS
):
# _launch_task will ignore tasks that we're already running, and
# will also do nothing if we're already at the maximum capacity.
await self._launch_task(task)
for task in await self.get_tasks(
statuses=[TaskStatus.SCHEDULED],
Expand Down
49 changes: 29 additions & 20 deletions tests/util/test_task_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
# [This file includes modifications made by New Vector Limited]
#
#

from typing import Optional, Tuple
from typing import List, Optional, Tuple

from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor
Expand Down Expand Up @@ -104,33 +103,43 @@ def test_schedule_lot_of_tasks(self) -> None:
)
)

# This is to give the time to the active tasks to finish
def get_tasks_of_status(status: TaskStatus) -> List[ScheduledTask]:
tasks = (
self.get_success(self.task_scheduler.get_task(task_id))
for task_id in task_ids
)
return [t for t in tasks if t is not None and t.status == status]

# At this point, there should be MAX_CONCURRENT_RUNNING_TASKS active tasks and
# one scheduled task.
self.assertEquals(
len(get_tasks_of_status(TaskStatus.ACTIVE)),
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
)
self.assertEquals(
len(get_tasks_of_status(TaskStatus.SCHEDULED)),
1,
)

# Give the time to the active tasks to finish
self.reactor.advance(1)

# Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one
# Check that MAX_CONCURRENT_RUNNING_TASKS tasks have run and that one
# is still scheduled.
tasks = [
self.get_success(self.task_scheduler.get_task(task_id))
for task_id in task_ids
]

self.assertEquals(
len(
[t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
),
len(get_tasks_of_status(TaskStatus.COMPLETE)),
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
)

scheduled_tasks = [
t for t in tasks if t is not None and t.status == TaskStatus.ACTIVE
]
scheduled_tasks = get_tasks_of_status(TaskStatus.SCHEDULED)
self.assertEquals(len(scheduled_tasks), 1)

# We need to wait for the next run of the scheduler loop
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
self.reactor.advance(1)
# The scheduled task should start 0.1s after the first of the active tasks
# finishes
self.reactor.advance(0.1)
self.assertEquals(len(get_tasks_of_status(TaskStatus.ACTIVE)), 1)

# Check that the last task has been properly executed after the next scheduler loop run
# ... and should finally complete after another second
self.reactor.advance(1)
prev_scheduled_task = self.get_success(
self.task_scheduler.get_task(scheduled_tasks[0].id)
)
Expand Down

0 comments on commit afb7d08

Please sign in to comment.