Rework parallel doc example using CalcJob's
The previous example relied on calcfunctions, which always run
sequentially. This example now uses CalcJobs to actually achieve
parallel execution.
agoscinski committed Sep 10, 2024
1 parent 3e1b35d commit c977986
Showing 1 changed file with 185 additions and 88 deletions.
docs/gallery/howto/autogen/parallel.py
"""
=====================
Run tasks in parallel
=====================
"""
# %%
# Introduction
# ============
# In this tutorial, you will learn how to run tasks and WorkGraphs in parallel.
# When you define the dependencies of a WorkGraph by linking tasks, the
# WorkGraph engine automatically parallelizes the independent tasks. One
# caveat is that we cannot use calcfunctions for this purpose, as they all run
# in the same runner environment and therefore block each other. For that
# reason we use ``CalcJob``s, which can run in different runner environments
# and therefore in parallel.

# Load the AiiDA profile.
from aiida import load_profile

load_profile()


# %%
# Parallel addition workflow
# ==========================
# Suppose we want to calculate ``x + y + u + v``. Instead of computing it
# sequentially as ``(((x + y) + u) + v)``, we compute it as
# ``((x + y) + (u + v))``, so that ``x + y`` and ``u + v`` run in parallel.
# aiida-core already provides the ArithmeticAddCalculation CalcJob for
# performing additions, which we will use in this example.

from aiida_workgraph import WorkGraph, task
from aiida.calculations.arithmetic.add import ArithmeticAddCalculation
from aiida.orm import InstalledCode, load_computer, load_code, load_node
from aiida.common.exceptions import NotExistent

# The ArithmeticAddCalculation needs to know where bash is stored
try:
    code = load_code("add@localhost")  # The computer label can also be omitted here
except NotExistent:
    code = InstalledCode(
        computer=load_computer("localhost"),
        filepath_executable="/bin/bash",
        label="add",
        default_calc_job_plugin="core.arithmetic.add",
    ).store()

wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(ArithmeticAddCalculation, x=x, y=y, code=code)
add_xy.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds
add_uv = wg.add_task(ArithmeticAddCalculation, x=u, y=v, code=code)
add_uv.set({"metadata.options.sleep": 5}) # the CalcJob will sleep 5 seconds
add_xyuv = wg.add_task(
    ArithmeticAddCalculation,
    x=add_xy.outputs["sum"],
    y=add_uv.outputs["sum"],
    code=code,
)
wg.submit(wait=True)

# %%
# We look at the ctime (the creation time, i.e. when the task was submitted)
# and the mtime (the time the task was last modified, which is when its state
# changed to finished).
print("add_xy created at:", add_xy.ctime.time(), "finished at:", add_xy.mtime.time())
print("add_uv created at:", add_uv.ctime.time(), "finished at:", add_uv.mtime.time())

# %%
# We can see that both CalcJobs were created at almost the same time and thus
# ran in parallel.
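
# %%
# As a quick sanity check (a sketch, relying on the 5 second sleep above): if
# the two CalcJobs ran in parallel, their execution windows overlap, i.e. each
# job was created before the other one finished.

assert add_xy.ctime < add_uv.mtime and add_uv.ctime < add_xy.mtime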

# %%
# Comparison with a calcfunction
# ------------------------------
#

# define a calcfunction that sleeps before adding
@task.calcfunction()
def add(x, y, sleep):
    import time

    time.sleep(sleep.value)
    return x + y


wg = WorkGraph("parallel")
x, y, u, v = (1, 2, 3, 4)
add_xy = wg.add_task(add, x=x, y=y, sleep=5)
add_uv = wg.add_task(add, x=u, y=v, sleep=5)
add_xyuv = wg.add_task(
    add, x=add_xy.outputs["result"], y=add_uv.outputs["result"], sleep=0
)

wg.submit(wait=True)

# %%
# Printing timings

X = {"a": Int(1), "b": Int(2), "c": Int(3)}
y = Int(2)
z = Int(3)
wg = WorkGraph("parallel_tasks")
multiply_parallel1 = wg.add_task(multiply_parallel, name="multiply_parallel1", X=X, y=y)
print("add_xy created at", add_xy.ctime.time(), "finished at", add_xy.mtime.time())
print("add_uv created at", add_uv.ctime.time(), "finished at", add_uv.mtime.time())

# %%
# We can see that the second calcfunction started about 5 seconds after the
# first one, i.e. they ran sequentially, because calcfunctions block the
# runner.
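
# %%
# A quick check (a sketch): in the sequential case, the second calcfunction
# only starts after the first one has finished, so this gap should be
# positive.

print("Gap between add_xy finishing and add_uv starting:", add_uv.ctime - add_xy.mtime)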


# %%
# Parallelizing WorkGraphs
# ========================
# We parallelize WorkGraphs in two ways: first we submit several WorkGraphs
# separately, then we use the graph builder to submit the whole workflow at
# once.


print("State of WorkGraph: {}".format(wg.state))
# This is our initial WorkGraph that we want to parallelize
@task.graph_builder(
    inputs=[{"name": "integer"}], outputs=[{"name": "sum", "from": "sum_task.result"}]
)
def add10_wg(integer):
    wg = WorkGraph()
    code = load_code("add@localhost")  # the code needs to be loaded inside the graph builder
    add = wg.add_task(
        ArithmeticAddCalculation, name="sum_task", x=10, y=integer, code=code
    )
    add.set({"metadata.options.sleep": 5})
    return wg


# %%
wgs = []
for i in range(2):
wg = WorkGraph(f"parallel_wg{i}")
wg.add_task(add10_wg, name=f"add10_{i}", integer=i)
wgs.append(wg)

# We use wait=False so we can continue submitting
wgs[0].submit(wait=False)
# we wait for the last WorkGraph to finish
wgs[1].submit(wait=True)

# %%
# We print the difference between the mtime (the time the WorkGraph was last
# changed) and the ctime (the time of creation). Since the WorkGraph's state
# changes when it finishes, this gives us a good estimate of the running time.
print(
    "WG0 created:",
    load_node(wgs[0].pk).ctime.time(),
    "finished:",
    load_node(wgs[0].pk).mtime.time(),
)
print("Time WG0", load_node(wgs[0].pk).mtime - load_node(wgs[0].pk).ctime)
print(
    "WG1 created:",
    load_node(wgs[1].pk).ctime.time(),
    "finished:",
    load_node(wgs[1].pk).mtime.time(),
)
print("Time WG1", load_node(wgs[1].pk).mtime - load_node(wgs[1].pk).ctime)


# %%
# Using graph builder
# -------------------


# This graph builder runs ``add10_wg`` in a loop
@task.graph_builder()
def parallel_add(nb_it):
    wg = WorkGraph()
    for i in range(nb_it):
        wg.add_task(add10_wg, name=f"add10_{i}", integer=i)
    return wg


# Submit a workflow that adds 10 to two different numbers in parallel
wg = WorkGraph("parallel_graph_builder")
add_task = wg.add_task(parallel_add, name="parallel_add", nb_it=2)
wg.submit(wait=True)

# %%
# We look at the times of creation and last change
print("Time for running with graph builder", add_task.mtime - add_task.ctime)

# %%
# We can see that the total time is only a bit more than 5 seconds (rather
# than 10), which means the two additions were indeed performed in parallel.
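
# %%
# The same conclusion expressed in code (a sketch, assuming each addition
# sleeps 5 seconds): a fully sequential run would take at least 10 seconds.

from datetime import timedelta

print("Ran in parallel:", add_task.mtime - add_task.ctime < timedelta(seconds=10))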

from aiida_workgraph import WorkGraph
from aiida.orm import Int, List
# %%
# Increasing number of daemon workers
# -----------------------------------
# Since each daemon worker can only manage one WorkGraph (handling its
# results) at a time, one can experience slowdowns when running many jobs
# that could run in parallel. The optimal number of workers depends highly on
# the jobs that are run.

from aiida.engine.daemon.client import get_daemon_client

client = get_daemon_client()
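
# We can inspect how many workers are currently active; the same
# ``get_numprocesses`` call is used in the timing prints below.
print("Current number of workers:", client.get_numprocesses()["numprocesses"])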

X = {"a": Int(1), "b": Int(2), "c": Int(3)}
y = Int(2)
z = Int(3)
wg = WorkGraph("parallel_tasks")
multiply_parallel_gather1 = wg.add_task(multiply_parallel_gather, X=X, y=y)
sum1 = wg.add_task(sum, name="sum1")
# wg.add_link(add1.outputs[0], multiply_parallel_gather1.inputs["uuids"])
wg.add_link(multiply_parallel_gather1.outputs[0], sum1.inputs[0])
# %%
# We rerun the last graph builder for 5 iterations

wg = WorkGraph("wg_daemon_worker_1")
wg.add_task(parallel_add, name="parallel_add", nb_it=5)
wg.submit(wait=True)
print(
    f"Time for running with {client.get_numprocesses()['numprocesses']} worker",
    load_node(wg.pk).mtime - load_node(wg.pk).ctime,
)

# %%
# We increase the number of workers by one. One can also do this in the workgraph GUI.

client = get_daemon_client()
client.increase_workers(1)

print("State of WorkGraph: {}".format(wg.state))
print("Result of task add1: {}".format(wg.tasks["sum1"].outputs["result"].value))
# %%
# Now we submit again, and the time shortens a bit.

wg = WorkGraph("wg_daemon_worker_2")
wg.add_task(parallel_add, name="parallel_add", nb_it=5)
wg.submit(wait=True)
print(
    f"Time for running with {client.get_numprocesses()['numprocesses']} workers",
    load_node(wg.pk).mtime - load_node(wg.pk).ctime,
)

# %%
# Note that on readthedocs you will not see a big difference due to the
# hardware: with a limited number of CPUs, the workers cannot actually run in
# parallel.

import multiprocessing

print("Number of CPUs", multiprocessing.cpu_count())

# %%
# Reset back to one worker
client.decrease_workers(1)

# %%
# Maximum number of active WorkGraphs
# -----------------------------------
# Be aware that, for the moment, each AiiDA daemon worker can only run 200
# WorkGraphs at the same time (the default of the
# ``daemon.worker_process_slots`` config option). To increase that limit, one
# can set this option to a higher value, e.g.
#
# .. code-block:: bash
#
#    verdi config set daemon.worker_process_slots 300
#    verdi daemon restart
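#
# One can inspect the current value with (assuming a recent aiida-core, where
# ``verdi config get`` is available):
#
# .. code-block:: bash
#
#    verdi config get daemon.worker_process_slots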
