WorkGraph scheduler

Xing Wang edited this page Sep 2, 2024 · 4 revisions

Background

The daemon.worker_process_slots option is used as the task_prefetch_count in RabbitMQ (rmq). The prefetch count limits the number of unacknowledged messages that a consumer can have in flight at any given time. This prevents any one consumer from being overwhelmed with too many messages at once and helps balance the load across multiple consumers.
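
The effect of a prefetch limit can be illustrated with a small in-memory sketch. This is a toy model only, not the RabbitMQ or AiiDA API: the ToyQueue class, its methods, and the worker names are hypothetical and exist purely to show how unacknowledged messages cap further deliveries.

```python
from collections import deque


class ToyQueue:
    """Toy in-memory stand-in for a broker queue (not the RabbitMQ API)."""

    def __init__(self, messages, prefetch_count):
        self.pending = deque(messages)
        self.prefetch_count = prefetch_count
        self.unacked = {}  # consumer name -> set of in-flight messages

    def deliver(self, consumer):
        """Hand out messages until the consumer reaches its prefetch limit."""
        in_flight = self.unacked.setdefault(consumer, set())
        delivered = []
        while self.pending and len(in_flight) < self.prefetch_count:
            message = self.pending.popleft()
            in_flight.add(message)
            delivered.append(message)
        return delivered

    def ack(self, consumer, message):
        """Acknowledge a message, freeing one slot for that consumer."""
        self.unacked[consumer].remove(message)


queue = ToyQueue(messages=range(5), prefetch_count=2)
first_a = queue.deliver('worker-a')    # worker-a fills both of its slots
first_b = queue.deliver('worker-b')    # worker-b gets the next two messages
blocked = queue.deliver('worker-a')    # nothing more until worker-a acks
queue.ack('worker-a', 0)
after_ack = queue.deliver('worker-a')  # one freed slot, one new message
```

With a prefetch count of 2, neither worker can hold more than two unacknowledged messages, so the remaining message is only delivered once a slot is freed by an acknowledgement.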

Daemon worker

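The scheduler should not listen for incoming launch requests; it only supports RPC. For reference, a regular daemon worker is started by the verdi daemon worker command, and its runner, built in create_daemon_runner, subscribes a ProcessLauncher to incoming launch requests: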

@verdi_daemon.command('worker')
@decorators.with_dbenv()
@decorators.requires_broker
def worker():
    """Run a single daemon worker in the current interpreter."""
    from aiida.engine.daemon.worker import start_daemon_worker

    start_daemon_worker(foreground=True)

    def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner':
        """Create and return a new daemon runner.

        This is used by workers when the daemon is running and in testing.

        :param loop: the (optional) asyncio event loop to use

        :return: a runner configured to work in the daemon configuration

        """
        from plumpy.persistence import LoadSaveContext

        from aiida.engine import persistence
        from aiida.engine.processes.launcher import ProcessLauncher

        runner = self.create_runner(broker_submit=True, loop=loop)
        runner_loop = runner.loop

        # Listen for incoming launch requests
        task_receiver = ProcessLauncher(
            loop=runner_loop,
            persister=self.get_persister(),
            load_context=LoadSaveContext(runner=runner),
            loader=persistence.get_object_loader(),
        )

        assert runner.communicator is not None, 'communicator not set for runner'
        runner.communicator.add_task_subscriber(task_receiver)

        return runner
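
A scheduler variant of the method above could, as a minimal sketch, build the runner the same way but skip the task subscriber. The create_scheduler_runner name is hypothetical and not part of AiiDA; the sketch only reuses the create_runner call and the communicator check shown above, and it assumes that leaving out add_task_subscriber is enough to stop the runner from consuming launch requests.

```python
def create_scheduler_runner(manager, loop=None):
    """Hypothetical helper: build a runner that does not consume launch tasks.

    :param manager: an object exposing ``create_runner`` like the manager above
    :param loop: the (optional) asyncio event loop to use
    :return: a runner with a communicator but no task subscriber attached
    """
    runner = manager.create_runner(broker_submit=True, loop=loop)

    assert runner.communicator is not None, 'communicator not set for runner'

    # Unlike create_daemon_runner, no ProcessLauncher is created and
    # runner.communicator.add_task_subscriber(...) is deliberately not called,
    # so this runner never picks up incoming launch requests (assumption).
    return runner
```

The only difference from create_daemon_runner is the missing ProcessLauncher subscription, which keeps the comparison with a regular worker easy to follow.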