Skip to content

Commit

Permalink
Merge pull request #503 from TeskaLabs/docs/task-proactor
Browse files Browse the repository at this point in the history
Documentation for TaskService and ProactorService
  • Loading branch information
mejroslav authored Sep 19, 2023
2 parents cbf6093 + 29c3c8b commit 26093ad
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 107 deletions.
23 changes: 14 additions & 9 deletions asab/proactor/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@


class ProactorService(asab.Service):
"""
Proactor service is useful for running CPU bound operations from asynchronous part of the code that would potentially block the main thread.
It allows to run these processes from different threads.
"""

def __init__(self, app, service_name):
super().__init__(app, service_name)
Expand All @@ -17,7 +21,8 @@ def __init__(self, app, service_name):
max_workers = None

self.Executor = concurrent.futures.ThreadPoolExecutor(
max_workers=max_workers,
max_workers=max_workers, # The maximum number of threads that can be used to execute the given calls.
# If None, ThreadPoolExecutor will determine the number itself based on number of CPU's.
thread_name_prefix="AsabProactorThread"
)

Expand All @@ -27,18 +32,18 @@ def __init__(self, app, service_name):

# There was the method run, which is obsolete
def execute(self, func, *args):
'''
The `execute` method executes func(*args) in the thread from the Proactor Service pool.
The method returns the future/task that MUST BE awaited and it provides the result of the func() call.
'''
"""
Execute `func(*args)` in the thread from the Proactor Service pool.
Return Future or Task that must be awaited and it provides the result of the `func()` call.
"""
return self.Loop.run_in_executor(self.Executor, func, *args)


def schedule(self, func, *args):
'''
The `schedule` method executes func(*args) in the thread from the Proactor Service pool.
The result of the future is discarted (using Task Service)
'''
"""
Execute `func(*args)` in the thread from the Proactor Service pool.
The result of the future is discarded (using Task Service).
"""

future = self.execute(func, *args)
self.App.TaskService.schedule(future)
38 changes: 17 additions & 21 deletions asab/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@


class TaskService(asab.Service):

'''
"""
Task service is for managed execution of fire-and-forget, one-off, background tasks.
The task is a coroutine, future (asyncio.ensure_future) or task (asyncio.create_task).
The task is executed in the main event loop.
Task can be a coroutine, `asyncio.Future` or `asyncio.Task` and it is executed in the main event loop.
The result of the task is collected (and discarted) automatically
and if there was an exception, it will be printed to the log.
'''
The result of the task is collected (and discarded) automatically.
When the task raises Exception, it will be printed to the log.
"""

def __init__(self, app, service_name="asab.TaskService"):
super().__init__(app, service_name)
Expand Down Expand Up @@ -67,7 +65,7 @@ async def finalize(self, app):

total_tasks = len(self.PendingTasks) + self.NewTasks.qsize()
if total_tasks > 0:
L.warning("{}+{} pending and incompleted tasks".format(len(self.PendingTasks), self.NewTasks.qsize()))
L.warning("{}+{} pending and incomplete tasks".format(len(self.PendingTasks), self.NewTasks.qsize()))


def _main_task_exited(self, ctx):
Expand All @@ -86,41 +84,39 @@ def _main_task_exited(self, ctx):


def schedule(self, *tasks):
'''
Schedule a task (or tasks) for immediate fire-and-forget execution.
Task can be a simple coroutine, future or task.
"""
Schedule a task (or tasks) for immediate fire-and-forget execution (e.g. compressing files).
Example of use:
Examples:
```python
app.TaskService.schedule(self._start())
'''
```
"""
for task in tasks:
self.NewTasks.put_nowait(task)


def run_forever(self, *async_functions):
'''
"""
Schedule an async function (or functions) for immediate fire-and-forget execution.
The function is expected to run forever.
If function exits, the error is logged and the function is restarted.
Function is called without any argument.
Example of use:
Examples:
```python
class MyClass(object):
def __init__(self, app):
...
app.TaskService.run_forever(self.my_forever_method)
async def my_forever_method(self):
while True:
await ...
'''
```
"""
for async_fn in async_functions:
self.NewTasks.put_nowait(
forever(async_fn)
Expand Down
49 changes: 49 additions & 0 deletions docs/examples/config_getmultiline.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
author: mejroslav
commit: 347076ffa2182fde8a6292434cb60f2dcbf3d8c6
date: 2023-08-08 10:48:26+02:00
title: Config getmultiline

---

!!! example

```python title='config_getmultiline.py' linenums="1"
#!/usr/bin/env python3
import asab
import logging

#

L = logging.getLogger(__name__)

#

asab.Config.read_string(
"""
[places]
visited:
Praha
Brno
Pardubice Plzeň
"""
)


class MyApplication(asab.Application):

async def main(self):
visited = asab.Config.getmultiline("places", "visited")
unvisited = asab.Config.getmultiline("places", "unvisited", fallback=[])
nonexisting = asab.Config.getmultiline("places", "nonexisting", fallback=["Gottwaldov"])
L.log(asab.LOG_NOTICE, "Places I've already visited: {}".format(visited))
L.log(asab.LOG_NOTICE, "Places I want to visit: {}".format(unvisited))
L.log(asab.LOG_NOTICE, "Places that don't exist: {}".format(nonexisting))
self.stop()


if __name__ == "__main__":
app = MyApplication()
app.run()

```
78 changes: 57 additions & 21 deletions docs/examples/storage_elasticsearch.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
author: mejroslav
commit: 04a232b899de3bbe8c634361f5547865dea1a4c7
date: 2023-03-20 17:49:20+01:00
author: Ales Teska
commit: b4f862b4a414ed184ab7d9f27645934d07433860
date: 2023-07-11 20:25:43+02:00
title: Storage elasticsearch

---
Expand All @@ -19,7 +19,11 @@ title: Storage elasticsearch
{
'asab:storage': {
'type': 'elasticsearch',
'elasticsearch_url': 'http://localhost:9200/',
'elasticsearch_url': 'https://localhost:9200/', # enter one URL or list of URL's
'elasticsearch_username': '<username>',
'elasticsearch_password': '<password>',
# 'elasticsearch_api_key': '<your api key>',
# 'cafile': '<CA Certificate>',
}
}
)
Expand All @@ -35,51 +39,83 @@ title: Storage elasticsearch

async def main(self):
storage = self.get_service("asab.StorageService")
print("=" * 72)

# Check the connection
connected = await storage.is_connected()
if connected:
print("Connected to ElasticSearch on {}".format(storage.URL))
else:
print("Connection to {} failed".format(storage.URL))

# Obtain upsertor object which is associated with given "test-collection"
# To create new object we keep default `version` to zero
print("-" * 72)
print("Creating default id and version")
u = storage.upsertor("test-collection")
u.set("bar", {"data": "test"})
objid = await u.execute()
object_id = await u.execute()

obj = await storage.get("test-collection", objid)
print("Result of get by id '{}'".format(objid))
obj = await storage.get("test-collection", object_id)
print("-" * 72)
print("Result of get by id '{}'".format(object_id))
pprint.pprint(obj)

obj = await storage.get("test-collection", objid)
# Obtain upsertor object for update - specify existing `version` number
print("Specify version when updating")
u = storage.upsertor("test-collection", obj_id=objid, version=obj['_v'])
obj = await storage.get("test-collection", object_id)
u = storage.upsertor("test-collection", obj_id=object_id, version=obj['_v'])
print("-" * 72)
print("Updating an object with ID '{}' and version {}".format(object_id, obj['_v']))
u.set("foo", "buzz")
objid = await u.execute()
object_id = await u.execute()

obj = await storage.get("test-collection", objid)
print("Result of get by id '{}'".format(objid))
obj = await storage.get("test-collection", object_id)
print("-" * 72)
print("Result of get by id '{}'".format(object_id))
pprint.pprint(obj)

# Reindex the collection
print("-" * 72)
print("Reindexing the collection")
await storage.reindex("test-collection", "test-collection-reindex")
await storage.reindex("test-collection-reindex", "test-collection")

obj = await storage.get("test-collection-reindex", object_id)
print("-" * 72)
print("Result of get by id '{}'".format(object_id))
pprint.pprint(obj)

# Remove the reindexed collection
await storage.delete("test-collection-reindex")
print("-" * 72)
print("Deleting the entire reindexed collection")
await storage.delete("test-collection-reindex") # returns {'acknowledged': True}

# Delete the item
await storage.delete("test-collection", objid)
print("-" * 72)
print("Deleting the object with ID {}".format(object_id))
await storage.delete("test-collection", object_id)


# Insert the document with provided ObjId
print("Insert the document with provided ObjId")
print("-" * 72)
print("Insert the document with ID 'test'")
u = storage.upsertor("test-collection", "test")
u.set("foo", "bar")
objid = await u.execute()
object_id = await u.execute()

obj = await storage.get("test-collection", objid)
print("Result of get by id '{}'".format(objid))
obj = await storage.get("test-collection", object_id)
print("-" * 72)
print("Result of get by id '{}'".format(object_id))
pprint.pprint(obj)

print("-" * 72)
print("Delete the document with provided ObjId")
await storage.delete("test-collection", objid)
deleted_document = await storage.delete("test-collection", object_id)

print("-" * 72)
print("Deleted document:")
pprint.pprint(deleted_document)

print("=" * 72)

self.stop()

Expand Down
30 changes: 16 additions & 14 deletions docs/examples/task.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
author: eliska
commit: d83dcedb619098678100883d1faa15ad2b08e878
date: 2022-02-09 10:16:42+01:00
author: mejroslav
commit: b14beb6f77c41c9efef5c1a365b31057f8b29cb4
date: 2023-09-19 11:52:35+02:00
title: Task

---
Expand All @@ -11,30 +11,32 @@ title: Task
```python title='task.py' linenums="1"
#!/usr/bin/env python3
import asab
import asyncio


class MyApplication(asab.Application):

async def main(self):
task_service = app.get_service("asab.TaskService")

# Schedule tasks to be executed
# They will be executed in ~ 5 seconds
task_service.schedule(
print("Your tasks are scheduled. Meanwhile, take a deep breath and make yourself comfortable.")
self.TaskService.schedule(
self.task1(),
self.task2(),
self.task3(),
self.task3(), # throws Exception
)


async def task1(self):
print("Task1")
print("Task 1 started.")
await asyncio.sleep(5.0)
print("Task 1 is complete.")

async def task2(self):
print("Task2")
print("Task 2 started.")
await asyncio.sleep(5.0)
print("Task 2 is complete.")

async def task3(self):
print("Task3")
print("Task 3 started.")
await asyncio.sleep(5.0)
raise Exception("An exception occurred during Task 3.")


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit 26093ad

Please sign in to comment.