Encourage threaded activities, warn when max_workers too low, and other small changes (#387)
cretz authored Oct 6, 2023
1 parent d03f356 commit ded3747
Showing 6 changed files with 127 additions and 62 deletions.
75 changes: 48 additions & 27 deletions README.md
@@ -10,7 +10,9 @@ execute asynchronous, long-running business logic in a scalable and resilient wa
"Temporal Python SDK" is the framework for authoring workflows and activities using the Python programming language.

Also see:
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our [Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including information around Temporal core concepts.
* [Application Development Guide](https://docs.temporal.io/application-development?lang=python) - Once you've tried our
[Quick Start](#quick-start), check out our guide on how to use Temporal in your Python applications, including
information around Temporal core concepts.
* [Python Code Samples](https://github.com/temporalio/samples-python)
* [API Documentation](https://python.temporal.io) - Complete Temporal Python SDK Package reference.

@@ -84,10 +86,10 @@ informal introduction to the features and their implementation.
- [Activities](#activities)
  - [Definition](#definition-1)
  - [Types of Activities](#types-of-activities)
    - [Asynchronous Activities](#asynchronous-activities)
    - [Synchronous Activities](#synchronous-activities)
      - [Synchronous Multithreaded Activities](#synchronous-multithreaded-activities)
      - [Synchronous Multiprocess/Other Activities](#synchronous-multiprocessother-activities)
    - [Asynchronous Activities](#asynchronous-activities)
  - [Activity Context](#activity-context)
  - [Heartbeating and Cancellation](#heartbeating-and-cancellation)
  - [Worker Shutdown](#worker-shutdown)
@@ -111,7 +113,9 @@ informal introduction to the features and their implementation.

# Quick Start

We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is not intended as one of the ways to use Temporal, but in reality it is very simplified and decidedly not "the only way" to use Temporal. For more information, check out the docs references in "Next Steps" below the quick start.
We will guide you through the Temporal basics to create a "hello, world!" script on your machine. It is intended as
one of the ways to use Temporal, but it is very simplified and decidedly not "the only way" to use Temporal.
For more information, check out the docs references in "Next Steps" below the quick start.

## Installation

@@ -136,7 +140,7 @@ Create the following in `activities.py`:
from temporalio import activity

@activity.defn
async def say_hello(name: str) -> str:
def say_hello(name: str) -> str:
    return f"Hello, {name}!"
```

@@ -163,6 +167,7 @@ Create the following in `run_worker.py`:

```python
import asyncio
import concurrent.futures
from temporalio.client import Client
from temporalio.worker import Worker

@@ -175,8 +180,15 @@ async def main():
    client = await Client.connect("localhost:7233")

    # Run the worker
    worker = Worker(client, task_queue="my-task-queue", workflows=[SayHello], activities=[say_hello])
    await worker.run()
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[SayHello],
            activities=[say_hello],
            activity_executor=activity_executor,
        )
        await worker.run()

if __name__ == "__main__":
    asyncio.run(main())
@@ -235,8 +247,9 @@ give you much more information about how Temporal works with Python:

# Usage

From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around Temporal concepts.
*This section is not intended as a how-to guide* -- For more how-to oriented information, check out the links in the [Next Steps](#next-steps) section above.
From here, you will find reference documentation about specific pieces of the Temporal Python SDK that were built around
Temporal concepts. *This section is not intended as a how-to guide* -- For more how-to oriented information, check out
the links in the [Next Steps](#next-steps) section above.

### Client

@@ -269,6 +282,7 @@ Some things to note about the above code:
does the same thing
* Clients can have many more options not shown here (e.g. data converters and interceptors)
* A string can be used instead of the method reference to call a workflow by name (e.g. if defined in another language)
* Clients do not work across forks

Clients also provide a shallow copy of their config for use in making slightly different clients backed by the same
connection. For instance, given the `client` above, this is how to have a client in another namespace:
@@ -516,7 +530,7 @@ class GreetingInfo:
    name: str = "<unknown>"

@activity.defn
async def create_greeting_activity(info: GreetingInfo) -> str:
def create_greeting_activity(info: GreetingInfo) -> str:
    return f"{info.salutation}, {info.name}!"
```

@@ -1044,13 +1058,14 @@ Activities are decorated with `@activity.defn` like so:
from temporalio import activity

@activity.defn
async def say_hello_activity(name: str) -> str:
def say_hello_activity(name: str) -> str:
    return f"Hello, {name}!"
```

Some things to note about activity definitions:

* The `say_hello_activity` is `async` which is the recommended activity type (see "Types of Activities" below)
* The `say_hello_activity` is synchronous which is the recommended activity type (see "Types of Activities" below), but
it can be `async`
* A custom name for the activity can be set with a decorator argument, e.g. `@activity.defn(name="my activity")`
* Long running activities should regularly heartbeat and handle cancellation
* Activities can only have positional arguments. Best practice is to only take a single argument that is an
@@ -1066,19 +1081,8 @@ Some things to note about activity definitions:

#### Types of Activities

There are 3 types of activity callables accepted and described below: asynchronous, synchronous multithreaded, and
synchronous multiprocess/other. Only positional parameters are allowed in activity callables.

##### Asynchronous Activities

Asynchronous activities, i.e. functions using `async def`, are the recommended activity type. When using asynchronous
activities no special worker parameters are needed.

Cancellation for asynchronous activities is done via
[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that
`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). A non-local activity must
heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and
"Heartbeating and Cancellation" later).
There are 3 types of activity callables accepted and described below: synchronous multithreaded, synchronous
multiprocess/other, and asynchronous. Only positional parameters are allowed in activity callables.

##### Synchronous Activities

@@ -1102,8 +1106,9 @@ will fail and shutdown.
###### Synchronous Multithreaded Activities

If `activity_executor` is set to an instance of `concurrent.futures.ThreadPoolExecutor` then the synchronous activities
are considered multithreaded activities. Besides `activity_executor`, no other worker parameters are required for
synchronous multithreaded activities.
are considered multithreaded activities. If the executor's `max_workers` is lower than the worker's
`max_concurrent_activities` setting, a warning is issued. Besides `activity_executor`, no other worker parameters
are required for synchronous multithreaded activities.
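
The check behind that warning can be sketched with the standard library alone. This is only an illustration, not the SDK's code path: `warn_if_executor_too_small` is a hypothetical helper, and it reads the executor's private `_max_workers` attribute in the same defensive way the worker change in this commit does.

```python
import concurrent.futures
import warnings


def warn_if_executor_too_small(
    executor: concurrent.futures.Executor, max_concurrent_activities: int
) -> bool:
    """Hypothetical helper: warn if the executor has fewer workers than needed.

    Returns True if a warning was issued.
    """
    # _max_workers is a private attribute of the stdlib pool executors, so it
    # is read defensively; executors without it are skipped silently.
    max_workers = getattr(executor, "_max_workers", None)
    if isinstance(max_workers, int) and max_workers < max_concurrent_activities:
        warnings.warn(
            f"max_concurrent_activities is {max_concurrent_activities} "
            f"but the executor's max_workers is only {max_workers}",
            stacklevel=2,
        )
        return True
    return False


# A pool of 2 threads cannot serve 100 concurrent activities, so this warns.
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as small_pool:
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        warned = warn_if_executor_too_small(small_pool, max_concurrent_activities=100)
```

Sizing the pool to at least the concurrency limit avoids activities queueing behind busy threads.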

By default, cancellation of a synchronous multithreaded activity is done via a `temporalio.exceptions.CancelledError`
thrown into the activity thread. Activities that do not wish to have cancellation thrown can set
@@ -1118,6 +1123,8 @@ there is a return statement within, it will throw the cancellation if there was

If `activity_executor` is set to an instance of `concurrent.futures.Executor` that is _not_
`concurrent.futures.ThreadPoolExecutor`, then the synchronous activities are considered multiprocess/other activities.
Users should prefer threaded activities over multiprocess ones since, among other reasons, threaded activities can raise
on cancellation.

These require special primitives for heartbeating and cancellation. The `shared_state_manager` worker parameter must be
set to an instance of `temporalio.worker.SharedStateManager`. The most common implementation can be created by passing a
@@ -1127,6 +1134,20 @@ set to an instance of `temporalio.worker.SharedStateManager`. The most common im
Also, all of these activity functions must be
["picklable"](https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled).
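
A quick way to see what "picklable" means in practice is to try serializing a callable with the standard `pickle` module. The sketch below uses a hypothetical `is_picklable` helper and does not involve the SDK; it only illustrates why functions importable by name work across processes while lambdas do not.

```python
import math
import pickle
from typing import Any


def is_picklable(obj: Any) -> bool:
    """Hypothetical helper: return True if pickle can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


# Functions that can be imported by name are pickled by reference.
importable_ok = is_picklable(math.sqrt)
# Lambdas have no importable name, so pickling them fails.
lambda_ok = is_picklable(lambda name: f"Hello, {name}!")
```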

##### Asynchronous Activities

Asynchronous activities are functions defined with `async def`. Asynchronous activities are often much more performant
than synchronous ones. When using asynchronous activities no special worker parameters are needed.

**⚠️ WARNING: Do not block the thread in `async def` Python functions. This can stop the processing of the rest of the
Temporal worker.**
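
To illustrate the warning, here is a minimal standard-library sketch (no Temporal APIs; all names are hypothetical) in which a blocking call is handed to a thread so that other coroutines on the same event loop keep running. Calling `blocking_work()` directly inside `main` would instead freeze the ticker until it returned.

```python
import asyncio
import time
from typing import Tuple


def blocking_work() -> str:
    # Stand-in for blocking I/O or CPU-heavy code.
    time.sleep(0.2)
    return "done"


async def count_ticks(stop: asyncio.Event) -> int:
    # Counts how often the event loop gets a chance to run this coroutine.
    ticks = 0
    while not stop.is_set():
        ticks += 1
        await asyncio.sleep(0.01)
    return ticks


async def main() -> Tuple[str, int]:
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    # Offload the blocking call to the default thread pool so the loop
    # stays responsive while it runs.
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(None, blocking_work)
    stop.set()
    return result, await ticker


result, ticks = asyncio.run(main())
```

Defining the activity as a plain `def` and running it on a thread pool, as the README now recommends, sidesteps this problem entirely.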

Cancellation for asynchronous activities is done via
[`asyncio.Task.cancel`](https://docs.python.org/3/library/asyncio-task.html#asyncio.Task.cancel). This means that
`asyncio.CancelledError` will be raised (and can be caught, but it is not recommended). A non-local activity must
heartbeat to receive cancellation and there are other ways to be notified about cancellation (see "Activity Context" and
"Heartbeating and Cancellation" later).
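
The `asyncio` mechanics described above can be sketched without any Temporal APIs. In this hypothetical example, a coroutine catches `asyncio.CancelledError` only to clean up and then re-raises it, so the cancellation is not swallowed.

```python
import asyncio
from typing import List, Tuple


async def long_running(cleanup_log: List[str]) -> None:
    try:
        await asyncio.sleep(60)
    except asyncio.CancelledError:
        # Clean up, then re-raise so the task still ends up cancelled.
        cleanup_log.append("cleaned up")
        raise


async def main() -> Tuple[bool, List[str]]:
    cleanup_log: List[str] = []
    task = asyncio.create_task(long_running(cleanup_log))
    await asyncio.sleep(0)  # let the task start and reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), cleanup_log


was_cancelled, cleanup_log = asyncio.run(main())
```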

#### Activity Context

During activity execution, an implicit activity context is set as a
@@ -1155,7 +1176,7 @@ occurs. Synchronous activities cannot call any of the `async` functions.
In order for a non-local activity to be notified of cancellation requests, it must be given a `heartbeat_timeout` at
invocation time and invoke `temporalio.activity.heartbeat()` inside the activity. It is strongly recommended that all
but the fastest executing activities call this function regularly. "Types of Activities" has specifics on cancellation
for asynchronous and synchronous activities.
for synchronous and asynchronous activities.

In addition to obtaining cancellation information, heartbeats also support detail data that is persisted on the server
for retrieval during activity retry. If an activity calls `temporalio.activity.heartbeat(123, 456)` and then fails and
2 changes: 2 additions & 0 deletions temporalio/client.py
@@ -80,6 +80,8 @@ class Client:
than where it was created, make sure the event loop where it was created is
captured, and then call :py:func:`asyncio.run_coroutine_threadsafe` with the
client call and that event loop.
Clients do not work across forks since runtimes do not work across forks.
"""

@staticmethod
2 changes: 2 additions & 0 deletions temporalio/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class Runtime:
Users are encouraged to use :py:meth:`default`. It can be set with
:py:meth:`set_default`. Every time a new runtime is created, a new internal
thread pool is created.
Runtimes do not work across forks.
"""

@staticmethod
25 changes: 20 additions & 5 deletions temporalio/worker/_worker.py
@@ -7,6 +7,7 @@
import hashlib
import logging
import sys
import warnings
from datetime import timedelta
from typing import Any, Awaitable, Callable, List, Optional, Sequence, Type, cast

@@ -93,11 +94,14 @@ def __init__(
workflows: Set of workflow classes decorated with
:py:func:`@workflow.defn<temporalio.workflow.defn>`.
activity_executor: Concurrent executor to use for non-async
activities. This is required if any activities are non-async. If
this is a :py:class:`concurrent.futures.ProcessPoolExecutor`,
all non-async activities must be picklable. Note, a broken
executor failure from this executor will cause the worker to
fail and shutdown.
activities. This is required if any activities are non-async.
:py:class:`concurrent.futures.ThreadPoolExecutor` is
recommended. If this is a
:py:class:`concurrent.futures.ProcessPoolExecutor`, all
non-async activities must be picklable. ``max_workers`` on the
executor should at least be ``max_concurrent_activities`` or a
warning is issued. Note, a broken-executor failure from this
executor will cause the worker to fail and shutdown.
workflow_task_executor: Thread pool executor for workflow tasks. If
this is not present, a new
:py:class:`concurrent.futures.ThreadPoolExecutor` will be
@@ -262,6 +266,17 @@ def __init__(
        self._activity_worker: Optional[_ActivityWorker] = None
        runtime = bridge_client.config.runtime or temporalio.runtime.Runtime.default()
        if activities:
            # Issue warning here if executor max_workers is lower than max
            # concurrent activities. We do this here instead of in
            # _ActivityWorker so the stack level is predictable.
            max_workers = getattr(activity_executor, "_max_workers", None)
            if isinstance(max_workers, int) and max_workers < max_concurrent_activities:
                warnings.warn(
                    f"Worker max_concurrent_activities is {max_concurrent_activities} "
                    + f"but activity_executor's max_workers is only {max_workers}",
                    stacklevel=2,
                )

            self._activity_worker = _ActivityWorker(
                bridge_worker=lambda: self._bridge_worker,
                task_queue=task_queue,