Skip to content

Commit

Permalink
Refine NautilusKernel and TradingNode
Browse files Browse the repository at this point in the history
  • Loading branch information
cjdsellers committed Aug 13, 2023
1 parent 20afaa3 commit 0084ba4
Show file tree
Hide file tree
Showing 2 changed files with 303 additions and 272 deletions.
300 changes: 108 additions & 192 deletions nautilus_trader/live/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
from datetime import timedelta

from nautilus_trader.cache.base import CacheFacade
from nautilus_trader.common.enums import LogColor
from nautilus_trader.common.logging import Logger
from nautilus_trader.config import TradingNodeConfig
from nautilus_trader.core.correctness import PyCondition
Expand Down Expand Up @@ -273,111 +272,49 @@ def run(self) -> None:
except RuntimeError as e:
self.kernel.log.exception("Error on run", e)

def stop(self) -> None:
"""
Stop the trading node gracefully.
After a specified delay the internal `Trader` residual state will be checked.
If save strategy is configured, then strategy states will be saved.
"""
try:
if self.kernel.loop.is_running():
self.kernel.loop.create_task(self.stop_async())
else:
self.kernel.loop.run_until_complete(self.stop_async())
except RuntimeError as e:
self.kernel.log.exception("Error on stop", e)

def dispose(self) -> None: # noqa: C901
async def run_async(self) -> None:
"""
Dispose of the trading node.
Gracefully shuts down the executor and event loop.
Start and run the trading node asynchronously.
"""
try:
timeout = self.kernel.clock.utc_now() + timedelta(
seconds=self._config.timeout_disconnection,
)
while self._is_running:
time.sleep(0.1)
if self.kernel.clock.utc_now() >= timeout:
self.kernel.log.warning(
f"Timed out ({self._config.timeout_disconnection}s) waiting for node to stop."
f"\nStatus"
f"\n------"
f"\nDataEngine.check_disconnected() == {self.kernel.data_engine.check_disconnected()}"
f"\nExecEngine.check_disconnected() == {self.kernel.exec_engine.check_disconnected()}",
)
break

self.kernel.log.info("DISPOSING...")

self.kernel.log.debug(str(self.kernel.data_engine.get_cmd_queue_task()))
self.kernel.log.debug(str(self.kernel.data_engine.get_req_queue_task()))
self.kernel.log.debug(str(self.kernel.data_engine.get_res_queue_task()))
self.kernel.log.debug(str(self.kernel.data_engine.get_data_queue_task()))
self.kernel.log.debug(str(self.kernel.exec_engine.get_cmd_queue_task()))
self.kernel.log.debug(str(self.kernel.exec_engine.get_evt_queue_task()))
self.kernel.log.debug(str(self.kernel.risk_engine.get_cmd_queue_task()))
self.kernel.log.debug(str(self.kernel.risk_engine.get_evt_queue_task()))

if self.kernel.trader.is_running:
self.kernel.trader.stop()
if self.kernel.data_engine.is_running:
self.kernel.data_engine.stop()
if self.kernel.risk_engine.is_running:
self.kernel.risk_engine.stop()
if self.kernel.exec_engine.is_running:
self.kernel.exec_engine.stop()
if self.kernel.emulator.is_running:
self.kernel.emulator.stop()

self.kernel.trader.dispose()
self.kernel.data_engine.dispose()
self.kernel.risk_engine.dispose()
self.kernel.exec_engine.dispose()
self.kernel.emulator.dispose()

# Cleanup writer
if self.kernel.writer is not None:
self.kernel.writer.close()

if self.kernel.executor:
self.kernel.log.info("Shutting down executor...")
self.kernel.executor.shutdown(wait=True, cancel_futures=True)
if not self._is_built:
raise RuntimeError(
"The trading nodes clients have not been built. "
"Run `node.build()` prior to start.",
)

self.kernel.log.info("Stopping event loop...")
self.kernel.cancel_all_tasks()
self.kernel.loop.stop()
except (asyncio.CancelledError, RuntimeError) as e:
self.kernel.log.exception("Error on dispose", e)
finally:
if self.kernel.loop.is_running():
self.kernel.log.warning("Cannot close a running event loop.")
else:
self.kernel.log.info("Closing event loop...")
self.kernel.loop.close()
self._is_running = True
await self.kernel.start_async()

# Check and log if event loop is running
if self.kernel.loop.is_running():
self.kernel.log.warning(f"loop.is_running={self.kernel.loop.is_running()}")
self.kernel.log.info("RUNNING.")
else:
self.kernel.log.info(f"loop.is_running={self.kernel.loop.is_running()}")
self.kernel.log.warning("Event loop is not running.")

# Check and log if event loop is closed
if not self.kernel.loop.is_closed():
self.kernel.log.warning(f"loop.is_closed={self.kernel.loop.is_closed()}")
else:
self.kernel.log.info(f"loop.is_closed={self.kernel.loop.is_closed()}")
# Continue to run while engines are running...
tasks: list[asyncio.Task] = [
self.kernel.data_engine.get_cmd_queue_task(),
self.kernel.data_engine.get_req_queue_task(),
self.kernel.data_engine.get_res_queue_task(),
self.kernel.data_engine.get_data_queue_task(),
self.kernel.risk_engine.get_cmd_queue_task(),
self.kernel.risk_engine.get_evt_queue_task(),
self.kernel.exec_engine.get_cmd_queue_task(),
self.kernel.exec_engine.get_evt_queue_task(),
]

self.kernel.log.info("DISPOSED.")
if self._config.heartbeat_interval:
self._task_heartbeats = asyncio.create_task(
self.maintain_heartbeat(self._config.heartbeat_interval),
)
if self._config.cache and self._config.cache.snapshot_positions_interval:
self._task_position_snapshots = asyncio.create_task(
self.snapshot_open_positions(self._config.cache.snapshot_positions_interval),
)

def _loop_sig_handler(self, sig: signal.Signals) -> None:
self.kernel.log.warning(f"Received {sig!s}, shutting down...")
self.stop()
await asyncio.gather(*tasks)
except asyncio.CancelledError as e:
self.kernel.log.error(str(e))

async def maintain_heartbeat(self, interval: float) -> None:
"""
Expand Down Expand Up @@ -418,51 +355,24 @@ async def snapshot_open_positions(self, interval: float) -> None:
except asyncio.CancelledError:
pass

async def run_async(self) -> None:
"""
Start and run the trading node asynchronously.
def stop(self) -> None:
"""
try:
if not self._is_built:
raise RuntimeError(
"The trading nodes clients have not been built. "
"Run `node.build()` prior to start.",
)
Stop the trading node gracefully.
self._is_running = True
await self.kernel.start_async()
After a specified delay the internal `Trader` residual state will be checked.
If save strategy is configured, then strategy states will be saved.
"""
try:
if self.kernel.loop.is_running():
self.kernel.log.info("RUNNING.")
self.kernel.loop.create_task(self.stop_async())
else:
self.kernel.log.warning("Event loop is not running.")

# Continue to run while engines are running...
tasks: list[asyncio.Task] = [
self.kernel.data_engine.get_cmd_queue_task(),
self.kernel.data_engine.get_req_queue_task(),
self.kernel.data_engine.get_res_queue_task(),
self.kernel.data_engine.get_data_queue_task(),
self.kernel.risk_engine.get_cmd_queue_task(),
self.kernel.risk_engine.get_evt_queue_task(),
self.kernel.exec_engine.get_cmd_queue_task(),
self.kernel.exec_engine.get_evt_queue_task(),
]

if self._config.heartbeat_interval:
self._task_heartbeats = asyncio.create_task(
self.maintain_heartbeat(self._config.heartbeat_interval),
)
if self._config.cache and self._config.cache.snapshot_positions_interval:
self._task_position_snapshots = asyncio.create_task(
self.snapshot_open_positions(self._config.cache.snapshot_positions_interval),
)

await asyncio.gather(*tasks)
except asyncio.CancelledError as e:
self.kernel.log.error(str(e))
self.kernel.loop.run_until_complete(self.stop_async())
except RuntimeError as e:
self.kernel.log.exception("Error on stop", e)

async def stop_async(self) -> None: # noqa (too complex)
async def stop_async(self) -> None:
"""
Stop the trading node gracefully, asynchronously.
Expand All @@ -483,70 +393,76 @@ async def stop_async(self) -> None: # noqa (too complex)
self._task_position_snapshots.cancel()
self._task_position_snapshots = None

if self.kernel.trader.is_running:
self.kernel.trader.stop()
self.kernel.log.info(
f"Awaiting post stop ({self._config.timeout_post_stop}s timeout)...",
color=LogColor.BLUE,
)
await asyncio.sleep(self._config.timeout_post_stop)
self.kernel.trader.check_residuals()
await self.kernel.stop_async()

if self.kernel.save_state:
self.kernel.trader.save()
self._is_running = False

# Disconnect all clients
self.kernel.data_engine.disconnect()
self.kernel.exec_engine.disconnect()
def dispose(self) -> None:
"""
Dispose of the trading node.
self.kernel.log.info(
f"Awaiting engine disconnections "
f"({self._config.timeout_disconnection}s timeout)...",
color=LogColor.BLUE,
)
if not await self._await_engines_disconnected():
self.kernel.log.error(
f"Timed out ({self._config.timeout_disconnection}s) waiting for engines to disconnect."
f"\nStatus"
f"\n------"
f"\nDataEngine.check_disconnected() == {self.kernel.data_engine.check_disconnected()}"
f"\nExecEngine.check_disconnected() == {self.kernel.exec_engine.check_disconnected()}",
Gracefully shuts down the executor and event loop.
"""
try:
timeout = self.kernel.clock.utc_now() + timedelta(
seconds=self._config.timeout_disconnection,
)
while self._is_running:
time.sleep(0.1)
if self.kernel.clock.utc_now() >= timeout:
self.kernel.log.warning(
f"Timed out ({self._config.timeout_disconnection}s) waiting for node to stop."
f"\nStatus"
f"\n------"
f"\nDataEngine.check_disconnected() == {self.kernel.data_engine.check_disconnected()}"
f"\nExecEngine.check_disconnected() == {self.kernel.exec_engine.check_disconnected()}",
)
break

if self.kernel.data_engine.is_running:
self.kernel.data_engine.stop()
if self.kernel.risk_engine.is_running:
self.kernel.risk_engine.stop()
if self.kernel.exec_engine.is_running:
self.kernel.exec_engine.stop()
if self.kernel.emulator.is_running:
self.kernel.emulator.stop()
self.kernel.log.info("DISPOSING...")

# Clean up remaining timers
timer_names = self.kernel.clock.timer_names
self.kernel.clock.cancel_timers()
self.kernel.log.debug(str(self.kernel.data_engine.get_cmd_queue_task()))
self.kernel.log.debug(str(self.kernel.data_engine.get_req_queue_task()))
self.kernel.log.debug(str(self.kernel.data_engine.get_res_queue_task()))
self.kernel.log.debug(str(self.kernel.data_engine.get_data_queue_task()))
self.kernel.log.debug(str(self.kernel.exec_engine.get_cmd_queue_task()))
self.kernel.log.debug(str(self.kernel.exec_engine.get_evt_queue_task()))
self.kernel.log.debug(str(self.kernel.risk_engine.get_cmd_queue_task()))
self.kernel.log.debug(str(self.kernel.risk_engine.get_evt_queue_task()))

for name in timer_names:
self.kernel.log.info(f"Canceled Timer(name={name}).")
self.kernel.dispose()

# Flush writer
if self.kernel.writer is not None:
self.kernel.writer.flush()
if self.kernel.executor:
self.kernel.log.info("Shutting down executor...")
self.kernel.executor.shutdown(wait=True, cancel_futures=True)

self.kernel.log.info("STOPPED.")
self._is_running = False
self.kernel.log.info("Stopping event loop...")
self.kernel.cancel_all_tasks()
self.kernel.loop.stop()
except (asyncio.CancelledError, RuntimeError) as e:
self.kernel.log.exception("Error on dispose", e)
finally:
if self.kernel.loop.is_running():
self.kernel.log.warning("Cannot close a running event loop.")
else:
self.kernel.log.info("Closing event loop...")
self.kernel.loop.close()

async def _await_engines_disconnected(self) -> bool:
seconds = self._config.timeout_disconnection
timeout: timedelta = self.kernel.clock.utc_now() + timedelta(seconds=seconds)
while True:
await asyncio.sleep(0)
if self.kernel.clock.utc_now() >= timeout:
return False
if not self.kernel.data_engine.check_disconnected():
continue
if not self.kernel.exec_engine.check_disconnected():
continue
break

return True # Engines disconnected
# Check and log if event loop is running
if self.kernel.loop.is_running():
self.kernel.log.warning(f"loop.is_running={self.kernel.loop.is_running()}")
else:
self.kernel.log.info(f"loop.is_running={self.kernel.loop.is_running()}")

# Check and log if event loop is closed
if not self.kernel.loop.is_closed():
self.kernel.log.warning(f"loop.is_closed={self.kernel.loop.is_closed()}")
else:
self.kernel.log.info(f"loop.is_closed={self.kernel.loop.is_closed()}")

self.kernel.log.info("DISPOSED.")

def _loop_sig_handler(self, sig: signal.Signals) -> None:
self.kernel.log.warning(f"Received {sig!s}, shutting down...")
self.stop()
Loading

0 comments on commit 0084ba4

Please sign in to comment.