WorkGraph scheduler

Background

The daemon.worker_process_slots configuration option (settable with verdi config set daemon.worker_process_slots <n>) is used as the task_prefetch_count in RMQ (RabbitMQ). The prefetch count limits the number of unacknowledged messages that can be in flight for a consumer at any given time. This prevents any single consumer from being overwhelmed with too many messages at once and helps balance the load across multiple consumers.
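
As a rough illustration of what the prefetch count means at the RabbitMQ level, here is a generic aio-pika consumer sketch (not AiiDA's actual consumer code; the URL and queue name are made up):

```python
import asyncio

import aio_pika


async def consume() -> None:
    connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")
    async with connection:
        channel = await connection.channel()
        # The prefetch count: at most 200 unacknowledged messages are delivered
        # to this consumer at a time; further messages stay in the queue (or go
        # to other consumers) until earlier ones are acknowledged.
        await channel.set_qos(prefetch_count=200)
        queue = await channel.declare_queue("task.queue", durable=True)
        async with queue.iterator() as messages:
            async for message in messages:
                async with message.process():  # acknowledges on successful exit
                    print(message.body)


asyncio.run(consume())
```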

Engine

(Figure: screenshot of the engine architecture, showing the message queues described below.)

The engine operates through a series of message queues managed by RMQ (RabbitMQ). Initially, a user creates a WorkGraph and submits it, leading to the creation of a WorkGraph process node in the database. A message is then sent to the workgraph queue.
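
For example, a submission could look like this (a minimal sketch; the task decorator and the add_task API follow recent aiida-workgraph releases and may differ in your version):

```python
from aiida import load_profile
from aiida_workgraph import WorkGraph, task

load_profile()


@task.calcfunction()
def add(x, y):
    return x + y


wg = WorkGraph("add_example")
wg.add_task(add, name="add1", x=1, y=2)
# Submitting stores a WorkGraph process node in the database and sends a
# message to the workgraph queue, where a scheduler will pick it up.
wg.submit()
```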

The workgraph queue distributes this message among the available schedulers, and exactly one scheduler picks it up. This scheduler loads the WorkGraph data from the database, analyzes the task dependencies, and identifies the tasks that are ready to execute. These tasks are submitted, which creates task process nodes in the database, and a message is then sent to the task queue.

A task worker subscribed to the task queue picks up the message, creates the necessary task process, and executes it. Upon completion, the worker broadcasts a message indicating that the task has finished. The scheduler receives this broadcast, re-evaluates the task dependencies, identifies any new tasks that are ready to run, and the cycle repeats.
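
In short, the scheduler repeatedly submits whatever has become ready. The following toy, in-memory sketch shows only that dependency logic (the real scheduler works on process nodes and RMQ messages; all names here are illustrative):

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    name: str
    parents: list = field(default_factory=list)
    state: str = "CREATED"


def ready_tasks(tasks):
    """Tasks that have not run yet and whose parents have all finished."""
    return [
        t for t in tasks
        if t.state == "CREATED" and all(p.state == "FINISHED" for p in t.parents)
    ]


def continue_workgraph(tasks):
    """What the scheduler does on submission and after every 'task finished' broadcast."""
    for t in ready_tasks(tasks):
        # In reality: create a task process node and send a message to the task queue.
        t.state = "SUBMITTED"
        print(f"submit {t.name}")


# Toy graph: c depends on a and b.
a, b = Task("a"), Task("b")
c = Task("c", parents=[a, b])
tasks = [a, b, c]

continue_workgraph(tasks)        # submits a and b
a.state = b.state = "FINISHED"   # task workers broadcast that a and b finished
continue_workgraph(tasks)        # c is now ready and gets submitted
```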

Daemon worker

The scheduler should not listen for incoming launch requests on the task queue; it only needs to support RPC. For comparison, a regular daemon worker is started like this:

@verdi_daemon.command('worker')
@decorators.with_dbenv()
@decorators.requires_broker
def worker():
    """Run a single daemon worker in the current interpreter."""
    from aiida.engine.daemon.worker import start_daemon_worker

    start_daemon_worker(foreground=True)

The task subscriber that listens for launch requests is registered in create_daemon_runner, which is what makes a regular daemon worker accept launch tasks; a scheduler runner must skip this step:

def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner':
        """Create and return a new daemon runner.

        This is used by workers when the daemon is running and in testing.

        :param loop: the (optional) asyncio event loop to use

        :return: a runner configured to work in the daemon configuration

        """
        from plumpy.persistence import LoadSaveContext

        from aiida.engine import persistence
        from aiida.engine.processes.launcher import ProcessLauncher

        runner = self.create_runner(broker_submit=True, loop=loop)
        runner_loop = runner.loop

        # Listen for incoming launch requests
        task_receiver = ProcessLauncher(
            loop=runner_loop,
            persister=self.get_persister(),
            load_context=LoadSaveContext(runner=runner),
            loader=persistence.get_object_loader(),
        )

        assert runner.communicator is not None, 'communicator not set for runner'
        runner.communicator.add_task_subscriber(task_receiver)

        return runner
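
By contrast, a runner for the scheduler could be created without registering the task subscriber, roughly along these lines (a hedged sketch modeled on the method above; create_scheduler_runner is not an existing AiiDA method):

```python
def create_scheduler_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner':
    """Create a runner for the scheduler.

    Unlike create_daemon_runner, no ProcessLauncher is added as a task
    subscriber, so this runner never receives launch requests from the task
    queue; RPC and broadcast subscriptions through the communicator still work.
    """
    runner = self.create_runner(broker_submit=True, loop=loop)

    assert runner.communicator is not None, 'communicator not set for runner'
    # Intentionally no runner.communicator.add_task_subscriber(...) here.

    return runner
```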