Skip to content

Commit

Permalink
Refine ActorExecutor
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Aug 5, 2023
1 parent 077772d commit f237ad1
Show file tree
Hide file tree
Showing 3 changed files with 203 additions and 32 deletions.
2 changes: 2 additions & 0 deletions nautilus_trader/common/actor.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ cdef class Actor(Component):
cpdef void update_synthetic(self, SyntheticInstrument synthetic)
cpdef run_in_executor(self, func, tuple args=*, dict kwargs=*)
cpdef queue_for_executor(self, func, tuple args=*, dict kwargs=*)
cpdef list queued_task_ids(self)
cpdef list active_task_ids(self)
cpdef bint has_queued_tasks(self)
cpdef bint has_active_tasks(self)
cpdef void cancel_task(self, task_id)
cpdef void cancel_all_tasks(self)
Expand Down
63 changes: 61 additions & 2 deletions nautilus_trader/common/actor.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ cdef class Actor(Component):
----------
func : Callable
The function to be executed.
args : variable length argument list
args : positional arguments
The positional arguments for the call to `func`.
kwargs : arbitrary keyword arguments
The keyword arguments for the call to `func`.
Expand Down Expand Up @@ -793,7 +793,7 @@ cdef class Actor(Component):
----------
func : Callable
The function to be executed.
args : variable length argument list
args : positional arguments
The positional arguments for the call to `func`.
kwargs : arbitrary keyword arguments
The keyword arguments for the call to `func`.
Expand Down Expand Up @@ -832,25 +832,84 @@ cdef class Actor(Component):
self._log.info(f"Queued {task_id}.", LogColor.BLUE)
return task_id

cpdef list queued_task_ids(self):
"""
Return the queued task identifiers.
Returns
-------
list[TaskId]
"""
if self._executor is None:
return [] # Tasks are immediately executed

return self._executor.queued_task_ids()

cpdef list active_task_ids(self):
"""
Return the active task identifiers.
Returns
-------
list[TaskId]
"""
if self._executor is None:
return [] # Tasks are immediately executed

return self._executor.active_task_ids()

cpdef bint has_queued_tasks(self):
"""
Return a value indicating whether there are any queued tasks.
Returns
-------
bool
"""
if self._executor is None:
return False

return self._executor.has_queued_tasks()

cpdef bint has_active_tasks(self):
"""
Return a value indicating whether there are any active tasks.
Returns
-------
bool
"""
if self._executor is None:
return False

return self._executor.has_active_tasks()

cpdef void cancel_task(self, task_id: TaskId):
"""
Cancel the task with the given `task_id` (if queued or active).
If the task is not found then a warning is logged.
Parameters
----------
task_id : TaskId
The task identifier.
"""
if self._executor is None:
self._log.warning(f"Executor: {task_id} not found.")
return

self._executor.cancel_task(task_id)

cpdef void cancel_all_tasks(self):
"""
Cancel all queued and active tasks.
"""
if self._executor is None:
return

Expand Down
170 changes: 140 additions & 30 deletions nautilus_trader/common/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TaskId:
Represents the identifier for a task executing as a `asyncio.Future`.
This also corresponds to the future objects memory address, unless the task was
queued, in which case it is a random integer.
queued, in which case it is a pre-assigned random integer.
"""

Expand Down Expand Up @@ -72,6 +72,7 @@ def __init__(
self._log: LoggerAdapter = logger

self._active_tasks: dict[TaskId, Future[Any]] = {}
self._queued_tasks: set[TaskId] = set()

self._queue: Queue = Queue()
self._worker_task = self._loop.create_task(self._worker())
Expand All @@ -80,17 +81,46 @@ async def _worker(self) -> None:
try:
while True:
task_id, func, args, kwargs = await self._queue.get()
partial_func = functools.partial(func, *args, **kwargs)
task: Future[Any] = self._loop.run_in_executor(self._executor, partial_func)
task.add_done_callback(self._remove_done_task)
self._log.debug(f"Executor: scheduled {task} ...")
if task_id not in self._queued_tasks:
continue # Already canceled

task = self._submit_to_executor(func, *args, **kwargs)

# Use pre-assigned task_id
self._active_tasks[task_id] = task
await asyncio.wrap_future(task)
self._log.debug(f"Executor: scheduled {task_id}, {task} ...")

# Sequentially execute tasks
await asyncio.wrap_future(self._active_tasks[task_id])
self._queue.task_done()
except asyncio.CancelledError:
self._log.debug("Executor: worker task canceled.")

def _remove_done_task(self, task: Future[Any]) -> None:
task_id = TaskId(id(task))
for active_task_id, active_task in self._active_tasks.items():
if task == active_task:
task_id = active_task_id

if task.done():
try:
if task.exception() is not None:
self._log.error(f"Exception in {task_id}: {task.exception()}")
except asyncio.CancelledError:
self._log.info(f"Task {task_id} was canceled.")
self._active_tasks.pop(task_id, None)

def _submit_to_executor(
self,
func: Callable[..., Any],
*args: Any,
**kwargs: Any,
) -> Future[Any]:
partial_func = functools.partial(func, *args, **kwargs)
task: Future[Any] = self._loop.run_in_executor(self._executor, partial_func)
task.add_done_callback(self._remove_done_task)
return task

def queue_for_executor(
self,
func: Callable[..., Any],
Expand All @@ -99,9 +129,25 @@ def queue_for_executor(
) -> TaskId:
"""
Enqueue the given callable to be executed sequentially.
Parameters
----------
func : Callable
The function to be executed.
args : positional arguments
The positional arguments for the call to `func`.
kwargs : arbitrary keyword arguments
The keyword arguments for the call to `func`.
Returns
-------
TaskId
"""
task_id = TaskId(id(uuid.uuid4()))
self._queue.put_nowait((task_id, func, args, kwargs))
self._queued_tasks.add(task_id)

return task_id

def run_in_executor(
Expand All @@ -110,48 +156,112 @@ def run_in_executor(
*args: Any,
**kwargs: Any,
) -> TaskId:
"""
Arrange for the given callable to be executed.
Parameters
----------
func : Callable
The function to be executed.
args : positional arguments
The positional arguments for the call to `func`.
kwargs : arbitrary keyword arguments
The keyword arguments for the call to `func`.
Returns
-------
TaskId
"""
self._log.info(f"Executor: {type(func).__name__}({args=}, {kwargs=})")
partial_func = functools.partial(func, *args, **kwargs)
task: Future[Any] = self._loop.run_in_executor(self._executor, partial_func)
task.add_done_callback(self._remove_done_task)
self._log.debug(f"Executor: scheduled {task} ...")
task = self._submit_to_executor(func, *args, **kwargs)

task_id = TaskId(id(task))
self._active_tasks[task_id] = task
self._log.debug(f"Executor: scheduled {task_id}, {task} ...")

return task_id

def queued_task_ids(self) -> list[TaskId]:
"""
Return the queued task identifiers.
Returns
-------
list[TaskId]
"""
return list(self._queued_tasks)

def active_task_ids(self) -> list[TaskId]:
"""
Return the active task identifiers.
Returns
-------
list[TaskId]
"""
return list(self._active_tasks.keys())

def has_queued_tasks(self) -> bool:
"""
Return a value indicating whether there are any queued tasks.
Returns
-------
bool
"""
return bool(self._queued_tasks)

def has_active_tasks(self) -> bool:
"""
Return a value indicating whether there are any active tasks.
Returns
-------
bool
"""
return bool(self._active_tasks)

def cancel_task(self, task_id: TaskId) -> None:
"""
Cancel the task with the given `task_id` (if queued or active).
If the task is not found then a warning is logged.
Parameters
----------
task_id : TaskId
The active task identifier.
"""
if task_id in self._queued_tasks:
self._queued_tasks.discard(task_id)
self._log.info(f"Executor: {task_id} canceled prior to execution.")
return

future: Future | None = self._active_tasks.get(task_id)
if not future:
self._log.warning(f"Executor: {task_id} not found.")
return

result = future.cancel()
self._log.info(f"Executor: {task_id} canceled {result}.")

def cancel_all_tasks(self) -> None:
"""
Cancel all active and queued tasks.
"""
# Drain queue
while not self._queue.empty():
task_id, _, _, _ = self._queue.get_nowait()
self._log.info(f"Executor: {task_id} dequeued prior to execution.")

if self._worker_task is not None:
self._worker_task.cancel()

for task in self._active_tasks:
self.cancel_task(task)

def active_task_ids(self) -> list[TaskId]:
return list(self._active_tasks.keys())

def has_active_tasks(self) -> bool:
return bool(self._active_tasks)

def _remove_done_task(self, task: Future[Any]) -> None:
task_id = TaskId(id(task))
for active_task_id, active_task in self._active_tasks.items():
if task == active_task:
task_id = active_task_id

if task.done():
try:
if task.exception() is not None:
self._log.error(f"Exception in {task_id}: {task.exception()}")
except asyncio.CancelledError:
self._log.info(f"Task {task_id} was cancelled.")
self._active_tasks.pop(task_id, None)

0 comments on commit f237ad1

Please sign in to comment.