-
Notifications
You must be signed in to change notification settings - Fork 381
Async
In developing the Azure IoT SDK for Python, we decided to design around the concept of asynchronous operations, providing APIs with methods that could be run simultaneously for better performance. This article assumes you are familiar with asynchronous concepts, as well as their implementation in Python
One of the trickier problems to solve during the implementation of the Python SDK was providing an asynchronous API utilizing the async/await syntax for Python 3.5+, while also maintaining a fully featured synchronous API for compatibility with Python 2.7 which does not support the relevant syntax and libraries to define and use async coroutines.
Providing an API itself was simple - we defined different clients for sync/async scenarios at the highest level of our application layer. However, completely different implementation all the way down the stack was not a viable option given the size and complexity of code in the transport layer - we already required a synchronous implementation of the transport for each of our supported protocols in order to support Python 2.7, re-implementing them all again to be natively async as well was simply too much work to build, and then also maintain in an ongoing basis. We needed to find a way to leverage the synchronous implementation into something that could be used asynchronously.
It is important to note here that Python (at least when using the default interpreter, CPython) has a Global Interpreter Lock, or GIL. What this means is that only one native thread can execute at a time in a given process when running Python code - even on a multi-core processor.
Multithreading does exist in Python, but it works differently than in most other languages. Traditionally, multithreaded applications can use multiple threads for distributed calculations by running them simultaneously, however in Python this is not possible due to the GIL. Instead, in Python the benefit of mulithreading is to take advantage of lost CPU clock cycles due to blocking I/O; the scheduler will automatically switch to a different thread when the current one is blocked. While very useful in some circumstances, this is not truly running threads simultaneously, as only one is ever running at a time.
Since asynchronous code cannot rely on multithreading, async code in Python is written using coroutines. By using the await
syntax in supported versions of Python, a developer can indicate manually when control of a single main thread should be suspended pending a result of another function (generally I/O related), allowing the control of the main thread to be passed to a different, unblocked operation. The crucial point here is that async code is single threaded, despite running multiple operations simultaneously.
In order to emulate async functionality with sync code, a function called emulate_async
was added to the azure.iot.common.asyncio_compat
module. When applied as a decorator to a sync method/function (or called with the method/function as an argument) it will make the method/function compatible with the await
syntax, as well as all relevant portions of the asyncio
library. For all intents and purposes, the function or method emulate_async
is applied to becomes seen to all other code as an async coroutine instead of a synchronous function/method.
This is accomplished by creating a coroutine wrapper that utilizes low-level asyncio
APIs to run the synchronous function/method in the async Event Loop's default ThreadPoolExecutor
. Multiple asynchronous calls mean multiple synchronous calls are each scheduled in a different worker thread in the ThreadPoolExecutor
.
Why is this able to largely emulate async behavior while the GIL is in effect? Because in a native async implementation of the transport methods, the main operations being awaited would be those surrounding network I/O - time spent waiting to send or receive data. When running the synchronous versions of these same methods, they would of course block in the same place regardless of the await
syntax, and due to the multithreading, the scheduler would automatically switch to a different thread, thus allowing multiple operations to occur at more or less the same time by using spare clock cycles.
Thus, using multithreading in this way we can emulate the high-level behavior of a native async code. However, it is only an emulation of high-level async coroutine behavior, wrapped up in a way that appears to be an async coroutine. It is NOT actually replicating the manner in which that behavior is achieved.
Because a native async implementation uses only a single thread and passes around control of the thread at developer designated await
points, an arbitrary number of async operations can be scheduled at once, and given sufficent blocking time, control flow could conceivably pass through all of them. However, under the multithread emulation we have implemented, the amount of operations that can truly be running at once is limited by the amount of worker threads in the ThreadPoolExecutor
, which is by default is CPU * 5
. What this means is that if there are 20 worker threads, and 40 scheduled operations, even if the 20 operations active in the worker threads are all blocked on I/O, none of the additional scheduled operations can utilize these clock cycles, because the ThreadPoolExecutor
is full and has no additional worker threads. One of the active worker threads must finish its operation before any additional operations can begin.
Represented visually:
Let computation time per operation be X
Let blocked I/O time per operation be Y
4 Operations using ThreadPoolExecutor with 4 Worker Threads
Operation 1 --X--> ------Y------>
Operation 2 --X--> ------Y------>
Operation 3 --X--> ------Y------>
Operation 4 --X--> ------Y------>
4 Operations using ThreadPoolExecutor with 2 Worker Threads
Operation 1 --X--> ------Y------>
Operation 2 --X--> ------Y------>
Operation 3 --X--> ------Y------>
Operation 4 --X--> ------Y------>
NOTE THAT OPERATION TIME IS NOT TO SCALE
This example, while extremely simplified from the more complex network I/O scenario we are using in practice, illustrates the way in which a ThreadPoolExecutor
's worker threads can affect the runtime when emulating async behavior. This is to say that, at scale of queued operations, and/or with sufficient value of Y, using async emulation is slower than using a native async implementation, sometimes significantly so. Consider the following code snippets:
OPERATION_COUNT = 800
BLOCKING_TIME = 10
def emulate_async(fn):
async def async_fn_wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, fn, *args, **kwargs)
return async_fn_wrapper
@emulate_async
def blocking_sync_fn():
time.sleep(BLOCKING_TIME) #represents I/O
async def main():
await asyncio.gather(*[asyncio.ensure_future(blocking_sync_fn()) for i in range(OPERATION_COUNT)])
OPERATION_COUNT = 800
BLOCKING_TIME = 10
async def async_fn():
await asyncio.sleep(BLOCKING_TIME) #represents I/O
async def main():
await asyncio.gather(*[asyncio.ensure_future(async_fn()) for i in range(OPERATION_COUNT)])
Each of these code snippets does the same thing: schedules 800 operations that sleep for 10 seconds each. On a machine with 12 CPU threads (60 worker threads per ThreadPoolExecutor
), async emulation took approximately 140 seconds. The native async implementation on the same machine took approximatley 11 seconds.
This could be remedied by not sharing a ThreadPoolExecutor
and thus having to deal with caps on the amount of worker threads. Instead each operation could be run in its own ThreadPoolExecutor
, which speeds up the runtime significantly to be on par with a native implementation in these examples, however this comes with an additional risk. Schedule too many operations under this paradigm and Python will crash from a stack overflow. This is too risky to expose.
So, if the emulation seems so poor, why are we using it? Because in practice, this is an extreme case, and not representative of the problems we face. We are doing network operations which are not the same as a series of 10 second sleeps. It turns out that due to the speed of network operations, there is no significant difference in runtime between our shared ThreadPoolExecutor
emulation approach, and the more accurate but less safe distributed ThreadPoolExecutor
approach.
The issues with the theoretical runtime complexity of our emulation approach are worth knowing however as the Python SDK develops, as they could rear their head as we add more and more features which may handle I/O in different ways. Adding performance tests in the gate to automatically catch any sudden and severe changes in expected runtime is recommended, as problems related to performance at scale may not be immediately apparent and may be hard to track down to their origin.
In the implementation of our SDK, making transports async isn't quite as simple as just applying the asyncio_compat.emulate_async
decorator, for transport code also relies on taking callbacks as arguments, which is not exactly fitting with the async design pattern. Thus, there each transport has an async adapter which provide a clean asynchronous interface, and then create the synchronous callbacks to pass down to the synchronous transport code.
This process is helped by an additional asyncio_compat
module construct, the AwaitableCallback
class. AwaitableCallback
s take a synchronous callback function as an argument, and using the running Event Loop, create a Future
. A wrapping function is then applied to the given callback, which calls the given callback and then also sets the Future
to indicate it has been completed, i.e. the callback has been called. This allows higher level code to await
the completion of an AwaitableCallback
. Furthermore, because the AwaitableCallback
is instantiated in the scope of the async adapter, where there is a running Event Loop, this callback can signal its completion even from down in the synchronous transport layer where there is no Event Loop.
By using multithreading we have successfully been able to emulate asynchronous behavior to leverage our existing synchronous logic. This has allowed us to provide our customers with both synchronous and asynchronous APIs without incurring large amounts of additional development or maintenance. However, the process has theoretical limitations, and we must be aware of them, so we only apply async emulation in domains where it is appropriate.