Message-processing design patterns #114

Draft: wants to merge 41 commits into base: main

Commits (41):
5b44135  reuse policy (dandavison, May 14, 2024)
abec6c0  Signal and update design patterns (dandavison, May 22, 2024)
6d2df90  Carry state over CAN (dandavison, May 22, 2024)
0f91210  Serialized, non-ordered handling of n messages (dandavison, May 22, 2024)
8060e50  Add return values to WashAndDryCycle sample (drewhoskins, May 22, 2024)
06e56c2  rename directory (dandavison, May 23, 2024)
53fd1af  Whitespace (dandavison, May 23, 2024)
67f70ee  Appease type checker (dandavison, May 23, 2024)
3790247  Silence warnings in 3rd party code (dandavison, May 23, 2024)
cdd24e8  Tests (dandavison, May 23, 2024)
511ce73  Use non-blocking queue and drain it before CAN (dandavison, May 23, 2024)
59d0c69  Cleanup (dandavison, May 23, 2024)
cccaf6e  Queue can be created before workflow start (dandavison, May 23, 2024)
4ba0d00  Cleanup (dandavison, May 23, 2024)
2988e44  cleanup (dandavison, May 23, 2024)
62f5848  Add explanatory comment (dandavison, May 23, 2024)
6bba635  Clarify and abstract (dandavison, May 23, 2024)
59e53a5  Clarify and abstract II (dandavison, May 23, 2024)
dcb8f38  Atomic Message Handlers w/ Stateful Workflow sample (drewhoskins, May 24, 2024)
d5c1e51  Rename variables task -> job for consistency with API (dandavison, May 24, 2024)
c0b1dc4  Fix type annotations (dandavison, May 24, 2024)
965f1f9  Add some documentation of ClusterManager (dandavison, May 24, 2024)
ee47f1c  Add @cretz's example solution (dandavison, May 24, 2024)
2c82d05  New APIs (dandavison, May 24, 2024)
5166484  Delete non-current work (dandavison, May 29, 2024)
efd8a52  Job runner notes (dandavison, May 26, 2024)
00c77a8  Job runner base (dandavison, May 28, 2024)
16835af  Job runner I1 (dandavison, May 28, 2024)
887b729  Job runner I2 (dandavison, May 29, 2024)
4f86b4c  Modify I2 for native (dandavison, May 30, 2024)
a75fb69  Job runner I2 native base (dandavison, May 30, 2024)
158eb78  Start sketching I2 native (dandavison, May 30, 2024)
87052fb  Job runner I1 native base (dandavison, May 30, 2024)
1473af6  Start sketching I1 native (dandavison, May 30, 2024)
c73b8b7  Try to prototype new decorator (dandavison, May 30, 2024)
7064b7a  Revert "Try to prototype new decorator" (dandavison, May 30, 2024)
0f335b4  Continue sketching (dandavison, May 30, 2024)
d2cf21f  Add top-level explanatory comments to files (dandavison, May 30, 2024)
bd9fc67  rm notes (dandavison, May 30, 2024)
569eed1  Introduce max_concurrent (dandavison, May 30, 2024)
139100d  Resurrect potentially useful additional sample with test (dandavison, May 31, 2024)
4 changes: 4 additions & 0 deletions pyproject.toml
@@ -81,6 +81,10 @@ asyncio_mode = "auto"
log_cli = true
log_cli_level = "INFO"
log_cli_format = "%(asctime)s [%(levelname)8s] %(message)s (%(filename)s:%(lineno)s)"
filterwarnings = [
    "ignore::DeprecationWarning:google\\..*",
    "ignore::DeprecationWarning:importlib\\..*",
]

[tool.isort]
profile = "black"
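Each of these entries uses the standard warnings-filter syntax, action:message:category:module[:lineno], where the module field is a regular expression matched against the module a warning originates from; the two entries silence DeprecationWarnings raised from google.* and importlib.* modules during test runs (the "Silence warnings in 3rd party code" commit above). A programmatic equivalent, shown only for reference:

import warnings

# Same effect as the two pytest ini entries above: ignore DeprecationWarning
# when the originating module matches the given regex.
warnings.filterwarnings("ignore", category=DeprecationWarning, module=r"google\..*")
warnings.filterwarnings("ignore", category=DeprecationWarning, module=r"importlib\..*")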
95 changes: 95 additions & 0 deletions tests/update/serialized_handling_of_n_messages.py
@@ -0,0 +1,95 @@
import asyncio
import logging
import uuid
from dataclasses import dataclass
from unittest.mock import patch

import temporalio.api.common.v1
import temporalio.api.enums.v1
import temporalio.api.update.v1
import temporalio.api.workflowservice.v1
from temporalio.client import Client, WorkflowHandle
from temporalio.worker import Worker
from temporalio.workflow import UpdateMethodMultiParam

from update.serialized_handling_of_n_messages import (
    MessageProcessor,
    Result,
    get_current_time,
)


async def test_continue_as_new_doesnt_lose_updates(client: Client):
    # Pretend continue-as-new is always suggested, so that the workflow will
    # continue-as-new while updates are in flight.
    with patch(
        "temporalio.workflow.Info.is_continue_as_new_suggested", return_value=True
    ):
        tq = str(uuid.uuid4())
        wf = await client.start_workflow(
            MessageProcessor.run, id=str(uuid.uuid4()), task_queue=tq
        )
        update_requests = [
            UpdateRequest(wf, MessageProcessor.process_message, i) for i in range(10)
        ]
        # No worker is polling yet; wait until the server has admitted every
        # update before any of them can be processed.
        for req in update_requests:
            await req.wait_until_admitted()

        async with Worker(
            client,
            task_queue=tq,
            workflows=[MessageProcessor],
            activities=[get_current_time],
        ):
            for req in update_requests:
                update_result = await req.task
                assert update_result.startswith(req.expected_result_prefix())


@dataclass
class UpdateRequest:
    wf_handle: WorkflowHandle
    update: UpdateMethodMultiParam
    sequence_number: int

    def __post_init__(self):
        # Start the update in the background; the test awaits the result later,
        # once a worker is running.
        self.task: asyncio.Task[Result] = asyncio.create_task(
            self.wf_handle.execute_update(self.update, args=[self.arg], id=self.id)
        )

    async def wait_until_admitted(self):
        while True:
            try:
                return await self._poll_update_non_blocking()
            except Exception as err:
                logging.warning(err)
                await asyncio.sleep(0.1)  # avoid a hot polling loop

    async def _poll_update_non_blocking(self):
        req = temporalio.api.workflowservice.v1.PollWorkflowExecutionUpdateRequest(
            namespace=self.wf_handle._client.namespace,
            update_ref=temporalio.api.update.v1.UpdateRef(
                workflow_execution=temporalio.api.common.v1.WorkflowExecution(
                    workflow_id=self.wf_handle.id,
                    run_id="",
                ),
                update_id=self.id,
            ),
            identity=self.wf_handle._client.identity,
        )
        res = await self.wf_handle._client.workflow_service.poll_workflow_execution_update(
            req
        )
        # TODO: @cretz how do we work with these raw proto objects?
        # (A typed check would presumably compare res.stage against the
        # UpdateWorkflowExecutionLifecycleStage enum; the string match below is
        # a stopgap.)
        assert "stage: UPDATE_WORKFLOW_EXECUTION_LIFECYCLE_STAGE_ADMITTED" in str(res)

    @property
    def arg(self) -> str:
        return str(self.sequence_number)

    @property
    def id(self) -> str:
        return str(self.sequence_number)

    def expected_result_prefix(self) -> str:
        # TODO: Currently the server does not send updates to the worker in
        # order of admission. When this is fixed
        # (https://github.com/temporalio/temporal/pull/5831), we can make a
        # stronger assertion about the activity numbers used to construct each
        # result.
        return f"{self.arg}-result"
222 changes: 222 additions & 0 deletions update/atomic_message_handlers_with_stateful_workflow.py
@@ -0,0 +1,222 @@
import asyncio
import logging
from datetime import timedelta
from typing import Dict, List, Optional

from temporalio import activity, common, workflow
from temporalio.client import Client, WorkflowHandle
from temporalio.worker import Worker

@activity.defn
async def allocate_nodes_to_job(nodes: List[int], job_name: str):
    print(f"Assigning nodes {nodes} to job {job_name}")
    await asyncio.sleep(0.1)

@activity.defn
async def deallocate_nodes_for_job(nodes: List[int], job_name: str):
    print(f"Deallocating nodes {nodes} from job {job_name}")
    await asyncio.sleep(0.1)

@activity.defn
async def find_bad_nodes(nodes: List[int]) -> List[int]:
    await asyncio.sleep(0.1)
    bad_nodes = [n for n in nodes if n % 5 == 0]
    print(f"Found bad nodes: {bad_nodes}")
    return bad_nodes

# This sample shows off:
# - Making signal and update handlers operate only while the workflow is within a certain
#   state (here, between cluster_started and cluster_shutdown).
# - Using a lock to protect state shared by the workflow and its signal and update handlers
#   from interleaved writes.
# - Calling start_workflow with an initializer signal that runs before anything else.
@workflow.defn
class ClusterManager:
    """
    A workflow to manage a cluster of compute nodes.

    The cluster is transitioned between operational and non-operational states by two signals:
    `start_cluster` and `shutdown_cluster`.

    While it is active, the workflow maintains a mapping from node to assigned job, and
    exposes the following API (implemented as updates):

    - allocate_n_nodes_to_job: attempt to find n free nodes and assign them to the job;
      return assigned node IDs
    - delete_job: unassign any nodes assigned to the job; return a success acknowledgement
    - resize_job: assign or unassign nodes as needed; return assigned node IDs

    An API call made while the cluster is non-operational will block until the cluster is
    operational.

    If an API call is made while another is in progress, it will block until all other
    thus-enqueued requests are complete.
    """

    def __init__(self) -> None:
        self.cluster_started = False
        self.cluster_shutdown = False
        self.nodes_lock = asyncio.Lock()

    @workflow.signal
    async def start_cluster(self):
        self.cluster_started = True
        self.nodes: Dict[int, Optional[str]] = {k: None for k in range(25)}
        workflow.logger.info("Cluster started")

    @workflow.signal
    async def shutdown_cluster(self):
        await workflow.wait_condition(lambda: self.cluster_started)
        self.cluster_shutdown = True
        workflow.logger.info("Cluster shut down")

    @workflow.update
    async def allocate_n_nodes_to_job(self, job_name: str, num_nodes: int) -> List[int]:
        """
        Attempt to find n free nodes, assign them to the job, return assigned node IDs.
        """
        await workflow.wait_condition(lambda: self.cluster_started)
        assert not self.cluster_shutdown

        async with self.nodes_lock:
            unassigned_nodes = [k for k, v in self.nodes.items() if v is None]
            if len(unassigned_nodes) < num_nodes:
                raise ValueError(
                    f"Cannot allocate {num_nodes} nodes; have only {len(unassigned_nodes)} available"
                )
            assigned_nodes = unassigned_nodes[:num_nodes]
            await self._allocate_nodes_to_job(assigned_nodes, job_name)
            return assigned_nodes

    async def _allocate_nodes_to_job(self, assigned_nodes: List[int], job_name: str):
        await workflow.execute_activity(
            allocate_nodes_to_job,
            args=[assigned_nodes, job_name],
            start_to_close_timeout=timedelta(seconds=10),
        )
        for node in assigned_nodes:
            self.nodes[node] = job_name

    @workflow.update
    async def delete_job(self, job_name: str) -> str:
        """
        Unassign any nodes assigned to job; return a success acknowledgement.
        """
        await workflow.wait_condition(lambda: self.cluster_started)
        assert not self.cluster_shutdown
        async with self.nodes_lock:
            nodes_to_free = [k for k, v in self.nodes.items() if v == job_name]
            await self._deallocate_nodes_for_job(nodes_to_free, job_name)
            return "Done"

    async def _deallocate_nodes_for_job(self, nodes_to_free: List[int], job_name: str):
        await workflow.execute_activity(
            deallocate_nodes_for_job,
            args=[nodes_to_free, job_name],
            start_to_close_timeout=timedelta(seconds=10),
        )
        for node in nodes_to_free:
            self.nodes[node] = None


    @workflow.update
    async def resize_job(self, job_name: str, new_size: int) -> List[int]:
        """
        Assign or unassign nodes as needed; return assigned node IDs.
        """
        await workflow.wait_condition(lambda: self.cluster_started)
        assert not self.cluster_shutdown
        async with self.nodes_lock:
            allocated_nodes = [k for k, v in self.nodes.items() if v == job_name]
            delta = new_size - len(allocated_nodes)
            if delta == 0:
                return allocated_nodes
            elif delta > 0:
                unassigned_nodes = [k for k, v in self.nodes.items() if v is None]
                if len(unassigned_nodes) < delta:
                    raise ValueError(
                        f"Cannot allocate {delta} nodes; have only {len(unassigned_nodes)} available"
                    )
                nodes_to_assign = unassigned_nodes[:delta]
                await self._allocate_nodes_to_job(nodes_to_assign, job_name)
                return allocated_nodes + nodes_to_assign
            else:
                nodes_to_deallocate = allocated_nodes[delta:]
                await self._deallocate_nodes_for_job(nodes_to_deallocate, job_name)
                return [n for n in allocated_nodes if n not in nodes_to_deallocate]

    async def perform_health_checks(self):
        async with self.nodes_lock:
            assigned_nodes = [k for k, v in self.nodes.items() if v is not None]
            bad_nodes = await workflow.execute_activity(
                find_bad_nodes,
                assigned_nodes,
                start_to_close_timeout=timedelta(seconds=10),
            )
            for node in bad_nodes:
                self.nodes[node] = "BAD!"

    @workflow.run
    async def run(self):
        await workflow.wait_condition(lambda: self.cluster_started)
        # Now we can start allocating jobs to nodes.

        # Perform health checks every second until the cluster is shut down.
        while True:
            try:
                await workflow.wait_condition(
                    lambda: self.cluster_shutdown, timeout=timedelta(seconds=1)
                )
                break
            except asyncio.TimeoutError:
                await self.perform_health_checks()


async def do_cluster_lifecycle(wf: WorkflowHandle):
    allocation_updates = []
    for i in range(6):
        allocation_updates.append(
            wf.execute_update(ClusterManager.allocate_n_nodes_to_job, args=[f"job-{i}", 2])
        )
    await asyncio.gather(*allocation_updates)

    resize_updates = []
    for i in range(6):
        resize_updates.append(
            wf.execute_update(ClusterManager.resize_job, args=[f"job-{i}", 4])
        )
    await asyncio.gather(*resize_updates)

    deletion_updates = []
    for i in range(6):
        deletion_updates.append(wf.execute_update(ClusterManager.delete_job, f"job-{i}"))
    await asyncio.gather(*deletion_updates)

    await wf.signal(ClusterManager.shutdown_cluster)
    print("Cluster shut down")



async def main():
    client = await Client.connect("localhost:7233")

    async with Worker(
        client,
        task_queue="tq",
        workflows=[ClusterManager],
        activities=[allocate_nodes_to_job, deallocate_nodes_for_job, find_bad_nodes],
    ):
        wf = await client.start_workflow(
            ClusterManager.run,
            id="wid",
            task_queue="tq",
            id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
            start_signal="start_cluster",
        )
        await do_cluster_lifecycle(wf)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(main())
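A note on the lifecycle here: the start_signal argument delivers start_cluster via signal-with-start, before any update is processed, which is what satisfies the wait_condition(lambda: self.cluster_started) guard at the top of each handler. And since run() returns once shutdown_cluster has been received, a caller that needs to block until the workflow itself closes can follow do_cluster_lifecycle(wf) with await wf.result().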


