From dfa991e47ca76fc68cc9a96c8a019d68ca5a02e9 Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 23 Feb 2022 14:11:25 +0100 Subject: [PATCH] Drop Tornado --- .github/workflows/ci.yml | 1 + .github/workflows/downstream.yml | 1 + examples/embedding/inprocess_qtconsole.py | 36 ----------- examples/embedding/inprocess_terminal.py | 36 ----------- ipykernel/control.py | 10 +-- ipykernel/debugger.py | 21 ++++--- ipykernel/eventloops.py | 4 +- ipykernel/inprocess/tests/test_kernel.py | 40 ------------ ipykernel/iostream.py | 25 +++++--- ipykernel/ipkernel.py | 10 ++- ipykernel/kernelapp.py | 77 +++++------------------ ipykernel/kernelbase.py | 44 +++++++------ ipykernel/tests/test_eventloop.py | 4 -- ipykernel/tests/test_io.py | 3 +- ipykernel/trio_runner.py | 4 +- ipykernel/zmqstream.py | 35 +++++++++++ 16 files changed, 121 insertions(+), 230 deletions(-) create mode 100644 ipykernel/zmqstream.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 151dd3e9f..f63ca9326 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -38,6 +38,7 @@ jobs: - name: Install the Python dependencies run: | pip install .[test] codecov + pip install https://github.com/davidbrochart/jupyter_client/archive/async_stream.zip - name: Install matplotlib if: ${{ !startsWith(matrix.os, 'macos') && !startsWith(matrix.python-version, 'pypy') }} diff --git a/.github/workflows/downstream.yml b/.github/workflows/downstream.yml index 061299943..181cfd9f5 100644 --- a/.github/workflows/downstream.yml +++ b/.github/workflows/downstream.yml @@ -68,4 +68,5 @@ jobs: git clone https://github.com/jupyter/jupyter_kernel_test.git cd jupyter_kernel_test pip install -e ".[test]" + pip install https://github.com/davidbrochart/jupyter_client/archive/async_stream.zip python test_ipykernel.py diff --git a/examples/embedding/inprocess_qtconsole.py b/examples/embedding/inprocess_qtconsole.py index 55e514343..e964d1356 100644 --- a/examples/embedding/inprocess_qtconsole.py +++ b/examples/embedding/inprocess_qtconsole.py @@ -1,7 +1,5 @@ import os -import sys -import tornado from IPython.lib import guisupport from qtconsole.inprocess import QtInProcessKernelManager from qtconsole.rich_ipython_widget import RichIPythonWidget @@ -11,44 +9,10 @@ def print_process_id(): print("Process ID is:", os.getpid()) -def init_asyncio_patch(): - """set default asyncio policy to be compatible with tornado - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. 
- do this as early as possible to make it a low priority and overrideable - ref: https://github.com/tornadoweb/tornado/issues/2608 - FIXME: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - """ - if ( - sys.platform.startswith("win") - and sys.version_info >= (3, 8) - and tornado.version_info < (6, 1) - ): - import asyncio - - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - - def main(): # Print the ID of the main process print_process_id() - init_asyncio_patch() app = guisupport.get_app_qt4() # Create an in-process kernel diff --git a/examples/embedding/inprocess_terminal.py b/examples/embedding/inprocess_terminal.py index 4121619d0..8cfaefc7c 100644 --- a/examples/embedding/inprocess_terminal.py +++ b/examples/embedding/inprocess_terminal.py @@ -1,7 +1,5 @@ import os -import sys -import tornado from jupyter_console.ptshell import ZMQTerminalInteractiveShell from ipykernel.inprocess.manager import InProcessKernelManager @@ -11,46 +9,12 @@ def print_process_id(): print("Process ID is:", os.getpid()) -def init_asyncio_patch(): - """set default asyncio policy to be compatible with tornado - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - do this as early as possible to make it a low priority and overrideable - ref: https://github.com/tornadoweb/tornado/issues/2608 - FIXME: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - """ - if ( - sys.platform.startswith("win") - and sys.version_info >= (3, 8) - and tornado.version_info < (6, 1) - ): - import asyncio - - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - - def main(): print_process_id() # Create an in-process kernel # >>> print_process_id() # will print the same process ID as the main process - init_asyncio_patch() kernel_manager = InProcessKernelManager() kernel_manager.start_kernel() kernel = kernel_manager.kernel diff --git a/ipykernel/control.py b/ipykernel/control.py index 1aaf9a7e8..a88ed23ca 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -1,19 +1,19 @@ +import asyncio from threading import Thread -from tornado.ioloop import IOLoop - class ControlThread(Thread): def __init__(self, **kwargs): Thread.__init__(self, name="Control", **kwargs) - self.io_loop = IOLoop(make_current=False) self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True + self.io_loop = asyncio.new_event_loop() def run(self): self.name = "Control" + asyncio.set_event_loop(self.io_loop) try: - self.io_loop.start() + self.io_loop.run_forever() finally: self.io_loop.close() @@ -22,4 +22,4 @@ def stop(self): This method is threadsafe. 
""" - self.io_loop.add_callback(self.io_loop.stop) + self.io_loop.call_soon_threadsafe(self.io_loop.stop) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index 7d0f3054e..db62a0214 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -1,3 +1,4 @@ +import asyncio import os import re import sys @@ -6,8 +7,6 @@ import zmq from IPython.core.getipython import get_ipython from IPython.core.inputtransformer2 import leading_empty_lines -from tornado.locks import Event -from tornado.queues import Queue from zmq.utils import jsonapi try: @@ -86,11 +85,13 @@ class DebugpyMessageQueue: SEPARATOR = "\r\n\r\n" SEPARATOR_LENGTH = 4 + message_queue: asyncio.Queue[t.Dict[str, t.Any]] + def __init__(self, event_callback, log): self.tcp_buffer = "" self._reset_tcp_pos() self.event_callback = event_callback - self.message_queue: Queue[t.Any] = Queue() + self.message_queue = asyncio.Queue() self.log = log def _reset_tcp_pos(self): @@ -171,7 +172,7 @@ def __init__(self, log, debugpy_stream, event_callback): self.debugpy_port = -1 self.routing_id = None self.wait_for_attach = True - self.init_event = Event() + self.init_event = asyncio.Event() self.init_event_seq = -1 def _get_endpoint(self): @@ -245,7 +246,7 @@ def connect_tcp_socket(self): def disconnect_tcp_socket(self): self.debugpy_stream.socket.disconnect(self._get_endpoint()) self.routing_id = None - self.init_event = Event() + self.init_event = asyncio.Event() self.init_event_seq = -1 self.wait_for_attach = True @@ -278,6 +279,8 @@ class Debugger: "configurationDone", ] + stopped_queue: asyncio.Queue[t.Dict[str, t.Any]] + # Requests that can be handled even if the debugger is not running static_debug_msg_types = ["debugInfo", "inspectVariables", "richInspectVariables", "modules"] @@ -291,7 +294,7 @@ def __init__( self.is_started = False self.event_callback = event_callback self.just_my_code = just_my_code - self.stopped_queue: Queue[t.Any] = Queue() + self.stopped_queue = asyncio.Queue() self.started_debug_handlers = {} for msg_type in Debugger.started_debug_msg_types: @@ -367,7 +370,7 @@ async def handle_stopped_event(self): def tcp_client(self): return self.debugpy_client - def start(self): + async def start(self): if not self.debugpy_initialized: tmp_dir = get_tmp_directory() if not os.path.exists(tmp_dir): @@ -384,7 +387,7 @@ def start(self): (self.shell_socket.getsockopt(ROUTING_ID)), ) - ident, msg = self.session.recv(self.shell_socket, mode=0) + ident, msg = await self.session.async_recv(self.shell_socket, mode=0) self.debugpy_initialized = msg["content"]["status"] == "ok" # Don't remove leading empty lines when debugging so the breakpoints are correctly positioned @@ -631,7 +634,7 @@ async def process_request(self, message): if self.is_started: self.log.info("The debugger has already started") else: - self.is_started = self.start() + self.is_started = await self.start() if self.is_started: self.log.info("The debugger has started") else: diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index fdc5ddd60..a8e227e10 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -190,7 +190,7 @@ def on_timer(self, event): self.func() # We need a custom wx.App to create our Frame subclass that has the - # wx.Timer to defer back to the tornado event loop. + # wx.Timer to defer back to the event loop. class IPWxApp(wx.App): def OnInit(self): self.frame = TimerFrame(wake) @@ -378,7 +378,7 @@ def loop_asyncio(kernel): import asyncio loop = asyncio.get_event_loop() - # loop is already running (e.g. 
tornado 5), nothing left to do + # loop is already running, nothing left to do if loop.is_running(): return diff --git a/ipykernel/inprocess/tests/test_kernel.py b/ipykernel/inprocess/tests/test_kernel.py index 2a2311fe0..b04ae94f5 100644 --- a/ipykernel/inprocess/tests/test_kernel.py +++ b/ipykernel/inprocess/tests/test_kernel.py @@ -8,7 +8,6 @@ from io import StringIO import pytest -import tornado from IPython.utils.io import capture_output from jupyter_client.session import Session @@ -20,44 +19,6 @@ orig_msg = Session.msg -def _init_asyncio_patch(): - """set default asyncio policy to be compatible with tornado - - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - - do this as early as possible to make it a low priority and overrideable - - ref: https://github.com/tornadoweb/tornado/issues/2608 - - FIXME: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - """ - if ( - sys.platform.startswith("win") - and sys.version_info >= (3, 8) - and tornado.version_info < (6, 1) - ): - import asyncio - - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - - def _inject_cell_id(_self, *args, **kwargs): """ This patch jupyter_client.session:Session.msg to add a cell_id to the return message metadata @@ -80,7 +41,6 @@ def patch_cell_id(): class InProcessKernelTestCase(unittest.TestCase): def setUp(self): - _init_asyncio_patch() self.km = InProcessKernelManager() self.km.start_kernel() self.kc = self.km.client() diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index aeaf38e02..edfdc6433 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. 
+import asyncio import atexit import io import os @@ -18,8 +19,8 @@ import zmq from jupyter_client.session import extract_header -from tornado.ioloop import IOLoop -from zmq.eventloop.zmqstream import ZMQStream + +from .zmqstream import ZMQStream # ----------------------------------------------------------------------------- # Globals @@ -57,7 +58,7 @@ def __init__(self, socket, pipe=False): self.background_socket = BackgroundSocket(self) self._master_pid = os.getpid() self._pipe_flag = pipe - self.io_loop = IOLoop(make_current=False) + self.io_loop = asyncio.new_event_loop() if pipe: self._setup_pipe_in() self._local = threading.local() @@ -72,8 +73,11 @@ def __init__(self, socket, pipe=False): def _thread_main(self): """The inner loop that's actually run in a thread""" - self.io_loop.start() - self.io_loop.close(all_fds=True) + asyncio.set_event_loop(self.io_loop) + try: + self.io_loop.run_forever() + finally: + self.io_loop.close() def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" @@ -181,13 +185,14 @@ def stop(self): """Stop the IOPub thread""" if not self.thread.is_alive(): return - self.io_loop.add_callback(self.io_loop.stop) - self.thread.join() + self.io_loop.call_soon_threadsafe(self.io_loop.stop) + # self.thread.join() # close *all* event pipes, created in any thread # event pipes can only be used from other threads while self.thread.is_alive() # so after thread.join, this should be safe for event_pipe in self._event_pipes: event_pipe.close() + self._event_puller.socket.close() def close(self): if self.closed: @@ -456,7 +461,11 @@ def _schedule_flush(self): # add_timeout has to be handed to the io thread via event pipe def _schedule_in_thread(): - self._io_loop.call_later(self.flush_interval, self._flush) + # FIXME: original code was: + # self._io_loop.call_later(self.flush_interval, self._flush) + self._io_loop.call_soon_threadsafe( + self._io_loop.call_later, self.flush_interval, self._flush + ) self.pub_thread.schedule(_schedule_in_thread) diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index c4ef68d3d..b5b5c1577 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -12,7 +12,6 @@ from IPython.core import release from IPython.utils.tokenutil import line_at_cursor, token_at_cursor from traitlets import Any, Bool, Instance, List, Type, observe, observe_compat -from zmq.eventloop.zmqstream import ZMQStream from .comm import CommManager from .compiler import XCachingCompiler @@ -21,6 +20,7 @@ from .kernelbase import Kernel as KernelBase from .kernelbase import _accepts_cell_id from .zmqshell import ZMQInteractiveShell +from .zmqstream import ZMQStream try: from IPython.core.interactiveshell import _asyncio_runner @@ -183,9 +183,7 @@ def start(self): self.debugpy_stream.on_recv(self.dispatch_debugpy, copy=False) super().start() if self.debugpy_stream: - asyncio.run_coroutine_threadsafe( - self.poll_stopped_queue(), self.control_thread.io_loop.asyncio_loop - ) + asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop) def set_parent(self, ident, parent, channel="shell"): """Overridden from parent to tell the display hook and output streams @@ -286,8 +284,8 @@ def set_sigint_result(): return sigint_future.set_result(1) - # use add_callback for thread safety - self.io_loop.add_callback(set_sigint_result) + # use call_soon_threadsafe for thread safety + self.io_loop.call_soon_threadsafe(set_sigint_result) # set the custom sigint hander during this context save_sigint = 
signal.signal(signal.SIGINT, handle_sigint) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index 8e6084671..440adabab 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit import errno import logging @@ -15,6 +16,7 @@ from logging import StreamHandler import zmq +import zmq.asyncio from IPython.core.application import ( BaseIPythonApplication, base_aliases, @@ -27,7 +29,6 @@ from jupyter_client.connect import ConnectionFileMixin from jupyter_client.session import Session, session_aliases, session_flags from jupyter_core.paths import jupyter_runtime_dir -from tornado import ioloop from traitlets.traitlets import ( Any, Bool, @@ -41,7 +42,6 @@ ) from traitlets.utils import filefind from traitlets.utils.importstring import import_item -from zmq.eventloop.zmqstream import ZMQStream from .control import ControlThread from .heartbeat import Heartbeat @@ -51,6 +51,7 @@ from .ipkernel import IPythonKernel from .parentpoller import ParentPollerUnix, ParentPollerWindows from .zmqshell import ZMQInteractiveShell +from .zmqstream import ZMQStream # ----------------------------------------------------------------------------- # Flags and Aliases @@ -130,6 +131,7 @@ class IPKernelApp(BaseIPythonApplication, InteractiveShellApp, ConnectionFileMix heartbeat = Instance(Heartbeat, allow_none=True) context = Any() + sync_context = Any() shell_socket = Any() control_socket = Any() debugpy_socket = Any() @@ -299,7 +301,8 @@ def init_sockets(self): # Create a context, a session, and the kernel sockets. self.log.info("Starting the kernel at pid: %i", os.getpid()) assert self.context is None, "init_sockets cannot be called twice!" - self.context = context = zmq.Context() + self.context = context = zmq.asyncio.Context() + self.sync_context = sync_context = zmq.Context() atexit.register(self.close) self.shell_socket = context.socket(zmq.ROUTER) @@ -307,7 +310,7 @@ def init_sockets(self): self.shell_port = self._bind_socket(self.shell_socket, self.shell_port) self.log.debug("shell ROUTER Channel on port: %i" % self.shell_port) - self.stdin_socket = context.socket(zmq.ROUTER) + self.stdin_socket = sync_context.socket(zmq.ROUTER) self.stdin_socket.linger = 1000 self.stdin_port = self._bind_socket(self.stdin_socket, self.stdin_port) self.log.debug("stdin ROUTER Channel on port: %i" % self.stdin_port) @@ -348,7 +351,6 @@ def init_iopub(self, context): self.iopub_socket.linger = 1000 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port) - self.configure_tornado_logger() self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True) self.iopub_thread.start() # backward-compat: wrap iopub socket API in background thread @@ -392,7 +394,9 @@ def close(self): if socket and not socket.closed: socket.close() self.log.debug("Terminating zmq context") - self.context.term() + # FIXME: this line breaks shutdown + # self.context.term() + self.sync_context.term() self.log.debug("Terminated zmq context") def log_connection_info(self): @@ -582,60 +586,6 @@ def init_shell(self): if self.shell: self.shell.configurables.append(self) - def configure_tornado_logger(self): - """Configure the tornado logging.Logger. 
- - Must set up the tornado logger or else tornado will call - basicConfig for the root logger which makes the root logger - go to the real sys.stderr instead of the capture streams. - This function mimics the setup of logging.basicConfig. - """ - logger = logging.getLogger("tornado") - handler = logging.StreamHandler() - formatter = logging.Formatter(logging.BASIC_FORMAT) - handler.setFormatter(formatter) - logger.addHandler(handler) - - def _init_asyncio_patch(self): - """set default asyncio policy to be compatible with tornado - - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - - Support for Proactor via a background thread is available in tornado 6.1, - but it is still preferable to run the Selector in the main thread - instead of the background. - - do this as early as possible to make it a low priority and overrideable - - ref: https://github.com/tornadoweb/tornado/issues/2608 - - FIXME: if/when tornado supports the defaults in asyncio without threads, - remove and bump tornado requirement for py38. - Most likely, this will mean a new Python version - where asyncio.ProactorEventLoop supports add_reader and friends. - - """ - if sys.platform.startswith("win") and sys.version_info >= (3, 8): - import asyncio - - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - def init_pdb(self): """Replace pdb with IPython's version that is interruptible. 
@@ -654,7 +604,6 @@ def init_pdb(self): @catch_config_error def initialize(self, argv=None): - self._init_asyncio_patch() super().initialize(argv) if self.subapp is not None: return @@ -697,7 +646,7 @@ def start(self): if self.poller is not None: self.poller.start() self.kernel.start() - self.io_loop = ioloop.IOLoop.current() + self.io_loop = asyncio.get_event_loop() if self.trio_loop: from ipykernel.trio_runner import TrioRunner @@ -709,9 +658,11 @@ def start(self): pass else: try: - self.io_loop.start() + self.io_loop.run_forever() except KeyboardInterrupt: pass + finally: + self.io_loop.close() launch_new_instance = IPKernelApp.launch_instance diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index e1cc99d71..44381c062 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -36,8 +36,6 @@ import zmq from IPython.core.error import StdinNotImplementedError from jupyter_client.session import Session -from tornado import ioloop -from tornado.queues import Queue, QueueEmpty from traitlets.config.configurable import SingletonConfigurable from traitlets.traitlets import ( Any, @@ -52,11 +50,11 @@ default, observe, ) -from zmq.eventloop.zmqstream import ZMQStream from ipykernel.jsonutil import json_clean from ._version import kernel_protocol_version +from .zmqstream import ZMQStream def _accepts_cell_id(meth): @@ -69,6 +67,11 @@ def _accepts_cell_id(meth): class Kernel(SingletonConfigurable): + control_queue: asyncio.Queue[ + t.Union[concurrent.futures.Future[t.Any], asyncio.Future[t.Any], bytes] + ] + msg_queue: asyncio.Queue[t.Tuple[int, t.Callable[..., t.Awaitable[t.Any]], t.Tuple[t.Any, ...]]] + # --------------------------------------------------------------------------- # Kernel interface # --------------------------------------------------------------------------- @@ -81,9 +84,9 @@ class Kernel(SingletonConfigurable): @observe("eventloop") def _update_eventloop(self, change): """schedule call to eventloop from IOLoop""" - loop = ioloop.IOLoop.current() + loop = asyncio.get_event_loop() if change.new is not None: - loop.add_callback(self.enter_eventloop) + loop.call_soon(self.enter_eventloop) session = Instance(Session, allow_none=True) profile_dir = Instance("IPython.core.profiledir.ProfileDir", allow_none=True) @@ -269,7 +272,7 @@ def __init__(self, **kwargs): for msg_type in self.control_msg_types: self.control_handlers[msg_type] = getattr(self, msg_type) - self.control_queue: Queue[t.Any] = Queue() + self.control_queue = asyncio.Queue() def dispatch_control(self, msg): self.control_queue.put_nowait(msg) @@ -296,14 +299,14 @@ async def _flush_control_queue(self): control_loop = self.io_loop tracer_future = awaitable_future = asyncio.Future() - def _flush(): + async def _flush(): # control_stream.flush puts messages on the queue self.control_stream.flush() # put Future on the queue after all of those, # so we can wait for all queued messages to be processed - self.control_queue.put(tracer_future) + await self.control_queue.put(tracer_future) - control_loop.add_callback(_flush) + asyncio.run_coroutine_threadsafe(_flush(), control_loop) return awaitable_future async def process_control(self, msg): @@ -494,7 +497,7 @@ async def process_one(self, wait=True): else: try: t, dispatch, args = self.msg_queue.get_nowait() - except (asyncio.QueueEmpty, QueueEmpty): + except asyncio.QueueEmpty: return None await dispatch(*args) @@ -520,9 +523,11 @@ async def dispatch_queue(self): def _message_counter_default(self): return itertools.count() - def schedule_dispatch(self, 
dispatch, *args): + def schedule_dispatch( + self, dispatch: t.Callable[..., t.Awaitable[t.Any]], *args: t.Any + ) -> None: """schedule a message for dispatch""" - idx = next(self._message_counter) + idx: int = next(self._message_counter) self.msg_queue.put_nowait( ( @@ -532,13 +537,13 @@ def schedule_dispatch(self, dispatch, *args): ) ) # ensure the eventloop wakes up - self.io_loop.add_callback(lambda: None) + self.io_loop.call_soon_threadsafe(lambda: None) def start(self): """register dispatchers for streams""" - self.io_loop = ioloop.IOLoop.current() - self.msg_queue: Queue[t.Any] = Queue() - self.io_loop.add_callback(self.dispatch_queue) + self.io_loop = asyncio.get_event_loop() + self.msg_queue = asyncio.Queue() + asyncio.ensure_future(self.dispatch_queue()) self.control_stream.on_recv(self.dispatch_control, copy=False) @@ -547,7 +552,7 @@ def start(self): else: control_loop = self.io_loop - asyncio.run_coroutine_threadsafe(self.poll_control_queue(), control_loop.asyncio_loop) + asyncio.run_coroutine_threadsafe(self.poll_control_queue(), control_loop) self.shell_stream.on_recv( partial( @@ -918,11 +923,12 @@ async def shutdown_request(self, stream, ident, parent): self.log.debug("Stopping control ioloop") control_io_loop = self.control_stream.io_loop - control_io_loop.add_callback(control_io_loop.stop) + control_io_loop.call_soon_threadsafe(control_io_loop.stop) self.log.debug("Stopping shell ioloop") shell_io_loop = self.shell_stream.io_loop - shell_io_loop.add_callback(shell_io_loop.stop) + # FIXME: shouldn't need to be threadsafe + shell_io_loop.call_soon_threadsafe(shell_io_loop.stop) def do_shutdown(self, restart): """Override in subclasses to do things when the frontend shuts down the diff --git a/ipykernel/tests/test_eventloop.py b/ipykernel/tests/test_eventloop.py index 35818515e..19ea7785b 100644 --- a/ipykernel/tests/test_eventloop.py +++ b/ipykernel/tests/test_eventloop.py @@ -1,8 +1,5 @@ """Test eventloop integration""" -import pytest -import tornado - from .utils import execute, flush_channels, start_new_kernel KC = KM = None @@ -26,7 +23,6 @@ def teardown(): """ -@pytest.mark.skipif(tornado.version_info < (5,), reason="only relevant on tornado 5") def test_asyncio_interrupt(): flush_channels(KC) msg_id, content = execute("%gui asyncio", KC) diff --git a/ipykernel/tests/test_io.py b/ipykernel/tests/test_io.py index 7e2950976..89cfd06b0 100644 --- a/ipykernel/tests/test_io.py +++ b/ipykernel/tests/test_io.py @@ -4,6 +4,7 @@ import pytest import zmq +import zmq.asyncio from jupyter_client.session import Session from ipykernel.iostream import IOPubThread, OutStream @@ -12,7 +13,7 @@ def test_io_api(): """Test that wrapped stdout has the same API as a normal TextIO object""" session = Session() - ctx = zmq.Context() + ctx = zmq.asyncio.Context() pub = ctx.socket(zmq.PUB) thread = IOPubThread(pub) thread.start() diff --git a/ipykernel/trio_runner.py b/ipykernel/trio_runner.py index 7b8f2166c..44dff1c57 100644 --- a/ipykernel/trio_runner.py +++ b/ipykernel/trio_runner.py @@ -19,7 +19,9 @@ def initialize(self, kernel, io_loop): kernel.shell.magics_manager.magics["line"]["autoawait"] = lambda _: warnings.warn( "Autoawait isn't allowed in Trio background loop mode." 
) - bg_thread = threading.Thread(target=io_loop.start, daemon=True, name="TornadoBackground") + bg_thread = threading.Thread( + target=io_loop.run_forever, daemon=True, name="AsyncioBackground" + ) bg_thread.start() def interrupt(self, signum, frame): diff --git a/ipykernel/zmqstream.py b/ipykernel/zmqstream.py new file mode 100644 index 000000000..22ef608b4 --- /dev/null +++ b/ipykernel/zmqstream.py @@ -0,0 +1,35 @@ +import asyncio +from inspect import isawaitable + + +class ZMQStream: + def __init__(self, socket, io_loop=None): + self.socket = socket + if io_loop is None: + self.io_loop = asyncio.get_event_loop() + self.is_thread = False + else: + self.io_loop = io_loop + self.is_thread = True + + def on_recv(self, callback, copy=True): + if self.is_thread: + # it is a loop in another thread + asyncio.run_coroutine_threadsafe(self.recv_task(callback, copy), self.io_loop) + else: + # it is the main thread's loop, schedule the task to launch when the loop runs + asyncio.ensure_future(self.recv_task(callback, copy)) + + async def recv_task(self, callback, copy): + while True: + msg_list = self.socket.recv_multipart(copy=copy) + if isawaitable(msg_list): + # in-process kernels are not async + msg_list = await msg_list + callback(msg_list) + + def send_multipart(self, msg_list, flags=0, copy=True, track=False, **kwargs): + return self.socket.send_multipart(msg_list, copy=copy) + + def flush(self, flag=None, limit=None): + pass
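
Usage note (not part of the patch): the new ipykernel/zmqstream.py shim above replaces tornado's zmq.eventloop.zmqstream.ZMQStream with a thin asyncio wrapper. A minimal sketch of how it is meant to be driven, assuming a zmq.asyncio.Context and an inproc PUSH/PULL pair chosen purely for illustration:

    import asyncio

    import zmq
    import zmq.asyncio

    from ipykernel.zmqstream import ZMQStream


    async def main():
        ctx = zmq.asyncio.Context()
        pull = ctx.socket(zmq.PULL)
        pull.bind("inproc://zmqstream-demo")   # demo endpoint, not from the patch
        push = ctx.socket(zmq.PUSH)
        push.connect("inproc://zmqstream-demo")

        received = asyncio.Event()

        def on_message(msg_list):
            # msg_list is the list of frames delivered by recv_task
            print("got:", msg_list)
            received.set()

        # io_loop=None: the stream schedules its recv loop on the running loop
        stream = ZMQStream(pull)
        stream.on_recv(on_message)

        await push.send_multipart([b"hello", b"world"])
        await received.wait()

        # closing ends the demo; any leftover recv_task is torn down
        # when asyncio.run() shuts the loop down
        pull.close(linger=0)
        push.close(linger=0)
        ctx.term()


    asyncio.run(main())

With an explicit io_loop argument (as the kernel does for the control thread), on_recv instead uses asyncio.run_coroutine_threadsafe so the recv loop runs on that other thread's loop.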
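
Threading note (illustrative, not part of the patch): ControlThread and IOPubThread now each own a plain asyncio loop instead of a tornado IOLoop, and other threads hand them work with loop.call_soon_threadsafe() or asyncio.run_coroutine_threadsafe() in place of IOLoop.add_callback(). A self-contained sketch of that pattern, with demo-only names:

    import asyncio
    import threading


    class LoopThread(threading.Thread):
        """Background thread that owns and runs its own asyncio loop."""

        def __init__(self, **kwargs):
            super().__init__(**kwargs)
            self.io_loop = asyncio.new_event_loop()

        def run(self):
            asyncio.set_event_loop(self.io_loop)
            try:
                self.io_loop.run_forever()
            finally:
                self.io_loop.close()

        def stop(self):
            # threadsafe, mirroring ControlThread.stop() in the patch
            self.io_loop.call_soon_threadsafe(self.io_loop.stop)


    async def say(msg):
        print(msg)


    t = LoopThread(daemon=True)
    t.start()

    # schedule a coroutine on the thread's loop and wait for its result
    fut = asyncio.run_coroutine_threadsafe(say("hello from the loop thread"), t.io_loop)
    fut.result(timeout=5)

    t.stop()
    t.join()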