Skip to content

WorkGraph scheduler

Xing Wang edited this page Aug 27, 2024 · 4 revisions

Background

Daemon worker

The scheduler should not listen for incoming launch requests; it only supports RPC. For comparison, the regular daemon worker shown below does subscribe to incoming launch requests:

@verdi_daemon.command('worker')
@decorators.with_dbenv()
@decorators.requires_broker
def worker():
    """Run a single daemon worker in the current interpreter."""
    # The import stays inside the command body, so it only happens when the
    # command is actually invoked.
    from aiida.engine.daemon.worker import start_daemon_worker as run_worker

    # ``foreground=True`` is passed through unchanged to the worker entry point.
    run_worker(foreground=True)
def create_daemon_runner(self, loop: Optional['asyncio.AbstractEventLoop'] = None) -> 'Runner':
        """Build a runner configured for daemon operation and return it.

        This is used by workers when the daemon is running and in testing. Besides
        creating the runner (with ``broker_submit=True``), it registers a
        ``ProcessLauncher`` as a task subscriber on the runner's communicator so that
        incoming launch requests are received.

        :param loop: the (optional) asyncio event loop, forwarded to ``create_runner``

        :return: the newly created runner, with the launcher subscribed to its communicator

        :raises AssertionError: if the created runner has no communicator

        """
        from plumpy.persistence import LoadSaveContext

        from aiida.engine import persistence
        from aiida.engine.processes.launcher import ProcessLauncher

        daemon_runner = self.create_runner(broker_submit=True, loop=loop)

        # Collect the launcher's collaborators; the dict literal evaluates its
        # values top to bottom, i.e. in the same order as the keyword arguments.
        launcher_kwargs = {
            'loop': daemon_runner.loop,
            'persister': self.get_persister(),
            'load_context': LoadSaveContext(runner=daemon_runner),
            'loader': persistence.get_object_loader(),
        }

        # The launcher is what listens for incoming launch requests.
        launcher = ProcessLauncher(**launcher_kwargs)

        assert daemon_runner.communicator is not None, 'communicator not set for runner'
        daemon_runner.communicator.add_task_subscriber(launcher)

        return daemon_runner
Clone this wiki locally