-
Notifications
You must be signed in to change notification settings - Fork 57
/
hello_signal.py
74 lines (58 loc) · 2.21 KB
/
hello_signal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import asyncio
from typing import List
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
@workflow.defn
class GreetingWorkflow:
    """Workflow that turns signalled names into greetings until told to exit.

    Names arrive through the ``submit_greeting`` signal and are buffered in
    an in-memory queue. The ``exit`` signal sets a flag; the run loop returns
    once it sees that flag, after emptying whatever is buffered.
    """

    def __init__(self) -> None:
        self._exit = False
        self._pending_greetings: asyncio.Queue[str] = asyncio.Queue()

    @workflow.run
    async def run(self) -> List[str]:
        """Build one ``"Hello, <name>"`` string per buffered name.

        Returns:
            The greetings in the order their names were dequeued, handed
            back once the exit flag has been observed.
        """
        queue = self._pending_greetings
        collected: List[str] = []
        while True:
            # Sleep until there is work: either exit was requested or at
            # least one name is waiting in the buffer.
            await workflow.wait_condition(
                lambda: self._exit or not queue.empty()
            )

            # Drain everything currently buffered before checking the exit
            # flag, so names already queued when exit is set are greeted.
            while True:
                try:
                    name = queue.get_nowait()
                except asyncio.QueueEmpty:
                    break
                collected.append(f"Hello, {name}")

            if self._exit:
                return collected

    @workflow.signal
    async def submit_greeting(self, name: str) -> None:
        """Signal handler: buffer ``name`` for the run loop to greet."""
        await self._pending_greetings.put(name)

    @workflow.signal
    def exit(self) -> None:
        """Signal handler: set the flag that lets the run loop return."""
        self._exit = True
async def main():
    """Run the signal sample end to end and print the workflow result.

    Connects a client to ``localhost:7233``, hosts a worker for
    ``GreetingWorkflow``, starts the workflow, signals three names followed
    by the exit signal, then waits for and prints the returned greetings.
    """
    task_queue = "hello-signal-task-queue"

    # Start client
    client = await Client.connect("localhost:7233")

    # The worker only processes workflow tasks while its context is active.
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=[GreetingWorkflow],
    )
    async with worker:
        # The same client that backs the worker also starts the workflow.
        # Note, in many production setups, the client would live in a
        # completely separate process from the worker.
        handle = await client.start_workflow(
            GreetingWorkflow.run,
            id="hello-signal-workflow-id",
            task_queue=task_queue,
        )

        # Signal each name in turn, then signal the workflow to exit.
        for name in ("user1", "user2", "user3"):
            await handle.signal(GreetingWorkflow.submit_greeting, name)
        await handle.signal(GreetingWorkflow.exit)

        # Show result
        greetings = await handle.result()
        print(f"Result: {greetings}")
# Run the sample only when this file is executed as a script; importing the
# module (e.g. to reuse GreetingWorkflow) must not start anything.
if __name__ == "__main__":
    asyncio.run(main())