From f237ad1df2dac0b46891745ff6be17722c4abf43 Mon Sep 17 00:00:00 2001 From: Chris Sellers Date: Sat, 5 Aug 2023 23:32:10 +1000 Subject: [PATCH] Refine ActorExecutor --- nautilus_trader/common/actor.pxd | 2 + nautilus_trader/common/actor.pyx | 63 ++++++++++- nautilus_trader/common/executor.py | 170 ++++++++++++++++++++++++----- 3 files changed, 203 insertions(+), 32 deletions(-) diff --git a/nautilus_trader/common/actor.pxd b/nautilus_trader/common/actor.pxd index 028f85c8929..d2699335a85 100644 --- a/nautilus_trader/common/actor.pxd +++ b/nautilus_trader/common/actor.pxd @@ -113,7 +113,9 @@ cdef class Actor(Component): cpdef void update_synthetic(self, SyntheticInstrument synthetic) cpdef run_in_executor(self, func, tuple args=*, dict kwargs=*) cpdef queue_for_executor(self, func, tuple args=*, dict kwargs=*) + cpdef list queued_task_ids(self) cpdef list active_task_ids(self) + cpdef bint has_queued_tasks(self) cpdef bint has_active_tasks(self) cpdef void cancel_task(self, task_id) cpdef void cancel_all_tasks(self) diff --git a/nautilus_trader/common/actor.pyx b/nautilus_trader/common/actor.pyx index 3a2a5ba77c9..cd0fc3469dd 100644 --- a/nautilus_trader/common/actor.pyx +++ b/nautilus_trader/common/actor.pyx @@ -735,7 +735,7 @@ cdef class Actor(Component): ---------- func : Callable The function to be executed. - args : variable length argument list + args : positional arguments The positional arguments for the call to `func`. kwargs : arbitrary keyword arguments The keyword arguments for the call to `func`. @@ -793,7 +793,7 @@ cdef class Actor(Component): ---------- func : Callable The function to be executed. - args : variable length argument list + args : positional arguments The positional arguments for the call to `func`. kwargs : arbitrary keyword arguments The keyword arguments for the call to `func`. @@ -832,25 +832,84 @@ cdef class Actor(Component): self._log.info(f"Queued {task_id}.", LogColor.BLUE) return task_id + cpdef list queued_task_ids(self): + """ + Return the queued task identifiers. + + Returns + ------- + list[TaskId] + + """ + if self._executor is None: + return [] # Tasks are immediately executed + + return self._executor.queued_task_ids() + cpdef list active_task_ids(self): + """ + Return the active task identifiers. + + Returns + ------- + list[TaskId] + + """ if self._executor is None: return [] # Tasks are immediately executed return self._executor.active_task_ids() + cpdef bint has_queued_tasks(self): + """ + Return a value indicating whether there are any queued tasks. + + Returns + ------- + bool + + """ + if self._executor is None: + return False + + return self._executor.has_queued_tasks() + cpdef bint has_active_tasks(self): + """ + Return a value indicating whether there are any active tasks. + + Returns + ------- + bool + + """ if self._executor is None: return False return self._executor.has_active_tasks() cpdef void cancel_task(self, task_id: TaskId): + """ + Cancel the task with the given `task_id` (if queued or active). + + If the task is not found then a warning is logged. + + Parameters + ---------- + task_id : TaskId + The task identifier. + + """ if self._executor is None: + self._log.warning(f"Executor: {task_id} not found.") return self._executor.cancel_task(task_id) cpdef void cancel_all_tasks(self): + """ + Cancel all queued and active tasks. + """ if self._executor is None: return diff --git a/nautilus_trader/common/executor.py b/nautilus_trader/common/executor.py index f76473f3367..a5479fe2dde 100644 --- a/nautilus_trader/common/executor.py +++ b/nautilus_trader/common/executor.py @@ -34,7 +34,7 @@ class TaskId: Represents the identifier for a task executing as a `asyncio.Future`. This also corresponds to the future objects memory address, unless the task was - queued, in which case it is a random integer. + queued, in which case it is a pre-assigned random integer. """ @@ -72,6 +72,7 @@ def __init__( self._log: LoggerAdapter = logger self._active_tasks: dict[TaskId, Future[Any]] = {} + self._queued_tasks: set[TaskId] = set() self._queue: Queue = Queue() self._worker_task = self._loop.create_task(self._worker()) @@ -80,17 +81,46 @@ async def _worker(self) -> None: try: while True: task_id, func, args, kwargs = await self._queue.get() - partial_func = functools.partial(func, *args, **kwargs) - task: Future[Any] = self._loop.run_in_executor(self._executor, partial_func) - task.add_done_callback(self._remove_done_task) - self._log.debug(f"Executor: scheduled {task} ...") + if task_id not in self._queued_tasks: + continue # Already canceled + task = self._submit_to_executor(func, *args, **kwargs) + + # Use pre-assigned task_id self._active_tasks[task_id] = task - await asyncio.wrap_future(task) + self._log.debug(f"Executor: scheduled {task_id}, {task} ...") + + # Sequentially execute tasks + await asyncio.wrap_future(self._active_tasks[task_id]) self._queue.task_done() except asyncio.CancelledError: self._log.debug("Executor: worker task canceled.") + def _remove_done_task(self, task: Future[Any]) -> None: + task_id = TaskId(id(task)) + for active_task_id, active_task in self._active_tasks.items(): + if task == active_task: + task_id = active_task_id + + if task.done(): + try: + if task.exception() is not None: + self._log.error(f"Exception in {task_id}: {task.exception()}") + except asyncio.CancelledError: + self._log.info(f"Task {task_id} was canceled.") + self._active_tasks.pop(task_id, None) + + def _submit_to_executor( + self, + func: Callable[..., Any], + *args: Any, + **kwargs: Any, + ) -> Future[Any]: + partial_func = functools.partial(func, *args, **kwargs) + task: Future[Any] = self._loop.run_in_executor(self._executor, partial_func) + task.add_done_callback(self._remove_done_task) + return task + def queue_for_executor( self, func: Callable[..., Any], @@ -99,9 +129,25 @@ def queue_for_executor( ) -> TaskId: """ Enqueue the given callable to be executed sequentially. + + Parameters + ---------- + func : Callable + The function to be executed. + args : positional arguments + The positional arguments for the call to `func`. + kwargs : arbitrary keyword arguments + The keyword arguments for the call to `func`. + + Returns + ------- + TaskId + """ task_id = TaskId(id(uuid.uuid4())) self._queue.put_nowait((task_id, func, args, kwargs)) + self._queued_tasks.add(task_id) + return task_id def run_in_executor( @@ -110,48 +156,112 @@ def run_in_executor( *args: Any, **kwargs: Any, ) -> TaskId: + """ + Arrange for the given callable to be executed. + + Parameters + ---------- + func : Callable + The function to be executed. + args : positional arguments + The positional arguments for the call to `func`. + kwargs : arbitrary keyword arguments + The keyword arguments for the call to `func`. + + Returns + ------- + TaskId + + """ self._log.info(f"Executor: {type(func).__name__}({args=}, {kwargs=})") - partial_func = functools.partial(func, *args, **kwargs) - task: Future[Any] = self._loop.run_in_executor(self._executor, partial_func) - task.add_done_callback(self._remove_done_task) - self._log.debug(f"Executor: scheduled {task} ...") + task = self._submit_to_executor(func, *args, **kwargs) task_id = TaskId(id(task)) self._active_tasks[task_id] = task + self._log.debug(f"Executor: scheduled {task_id}, {task} ...") return task_id + def queued_task_ids(self) -> list[TaskId]: + """ + Return the queued task identifiers. + + Returns + ------- + list[TaskId] + + """ + return list(self._queued_tasks) + + def active_task_ids(self) -> list[TaskId]: + """ + Return the active task identifiers. + + Returns + ------- + list[TaskId] + + """ + return list(self._active_tasks.keys()) + + def has_queued_tasks(self) -> bool: + """ + Return a value indicating whether there are any queued tasks. + + Returns + ------- + bool + + """ + return bool(self._queued_tasks) + + def has_active_tasks(self) -> bool: + """ + Return a value indicating whether there are any active tasks. + + Returns + ------- + bool + + """ + return bool(self._active_tasks) + def cancel_task(self, task_id: TaskId) -> None: + """ + Cancel the task with the given `task_id` (if queued or active). + + If the task is not found then a warning is logged. + + Parameters + ---------- + task_id : TaskId + The active task identifier. + + """ + if task_id in self._queued_tasks: + self._queued_tasks.discard(task_id) + self._log.info(f"Executor: {task_id} canceled prior to execution.") + return + future: Future | None = self._active_tasks.get(task_id) if not future: self._log.warning(f"Executor: {task_id} not found.") return + result = future.cancel() self._log.info(f"Executor: {task_id} canceled {result}.") def cancel_all_tasks(self) -> None: + """ + Cancel all active and queued tasks. + """ + # Drain queue + while not self._queue.empty(): + task_id, _, _, _ = self._queue.get_nowait() + self._log.info(f"Executor: {task_id} dequeued prior to execution.") + if self._worker_task is not None: self._worker_task.cancel() for task in self._active_tasks: self.cancel_task(task) - - def active_task_ids(self) -> list[TaskId]: - return list(self._active_tasks.keys()) - - def has_active_tasks(self) -> bool: - return bool(self._active_tasks) - - def _remove_done_task(self, task: Future[Any]) -> None: - task_id = TaskId(id(task)) - for active_task_id, active_task in self._active_tasks.items(): - if task == active_task: - task_id = active_task_id - - if task.done(): - try: - if task.exception() is not None: - self._log.error(f"Exception in {task_id}: {task.exception()}") - except asyncio.CancelledError: - self._log.info(f"Task {task_id} was cancelled.") - self._active_tasks.pop(task_id, None)