From a32a71c1d48c0ec272b3e82fb2ac89666eb6436f Mon Sep 17 00:00:00 2001 From: David Brochart Date: Wed, 23 Feb 2022 14:11:25 +0100 Subject: [PATCH] Drop Tornado --- examples/embedding/inprocess_qtconsole.py | 30 ---------- examples/embedding/inprocess_terminal.py | 31 ---------- ipykernel/control.py | 20 +++---- ipykernel/debugger.py | 12 ++-- ipykernel/eventloops.py | 4 +- ipykernel/inprocess/tests/test_kernel.py | 36 ------------ ipykernel/iostream.py | 42 ++++++------- ipykernel/ipkernel.py | 8 +-- ipykernel/kernelapp.py | 72 +++-------------------- ipykernel/kernelbase.py | 30 +++++----- ipykernel/tests/test_eventloop.py | 2 - ipykernel/trio_runner.py | 4 +- ipykernel/zmqstream.py | 30 ++++++++++ setup.py | 1 - 14 files changed, 96 insertions(+), 226 deletions(-) create mode 100644 ipykernel/zmqstream.py diff --git a/examples/embedding/inprocess_qtconsole.py b/examples/embedding/inprocess_qtconsole.py index 5d9998bff..e8da8b81e 100644 --- a/examples/embedding/inprocess_qtconsole.py +++ b/examples/embedding/inprocess_qtconsole.py @@ -1,8 +1,6 @@ import os import sys -import tornado - from qtconsole.rich_ipython_widget import RichIPythonWidget from qtconsole.inprocess import QtInProcessKernelManager from IPython.lib import guisupport @@ -12,38 +10,10 @@ def print_process_id(): print('Process ID is:', os.getpid()) -def init_asyncio_patch(): - """set default asyncio policy to be compatible with tornado - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - do this as early as possible to make it a low priority and overrideable - ref: https://github.com/tornadoweb/tornado/issues/2608 - FIXME: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - """ - if sys.platform.startswith("win") and sys.version_info >= (3, 8) and tornado.version_info < (6, 1): - import asyncio - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - def main(): # Print the ID of the main process print_process_id() - init_asyncio_patch() app = guisupport.get_app_qt4() # Create an in-process kernel diff --git a/examples/embedding/inprocess_terminal.py b/examples/embedding/inprocess_terminal.py index 5ccab1654..862a3582d 100644 --- a/examples/embedding/inprocess_terminal.py +++ b/examples/embedding/inprocess_terminal.py @@ -1,8 +1,6 @@ import os import sys -import tornado - from ipykernel.inprocess import InProcessKernelManager from jupyter_console.ptshell import ZMQTerminalInteractiveShell @@ -11,41 +9,12 @@ def print_process_id(): print('Process ID is:', os.getpid()) -def init_asyncio_patch(): - """set default asyncio policy to be compatible with tornado - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - do this as early as possible to make it a low priority and overrideable - ref: https://github.com/tornadoweb/tornado/issues/2608 - FIXME: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - """ - if sys.platform.startswith("win") and sys.version_info >= (3, 8) and tornado.version_info < (6, 1): - import asyncio - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - - def main(): print_process_id() # Create an in-process kernel # >>> print_process_id() # will print the same process ID as the main process - init_asyncio_patch() kernel_manager = InProcessKernelManager() kernel_manager.start_kernel() kernel = kernel_manager.kernel diff --git a/ipykernel/control.py b/ipykernel/control.py index cba8b0994..a4e515a58 100644 --- a/ipykernel/control.py +++ b/ipykernel/control.py @@ -1,25 +1,23 @@ -from threading import Thread -import zmq -if zmq.pyzmq_version_info() >= (17, 0): - from tornado.ioloop import IOLoop -else: - # deprecated since pyzmq 17 - from zmq.eventloop.ioloop import IOLoop +from threading import Thread, Event +import asyncio class ControlThread(Thread): def __init__(self, **kwargs): Thread.__init__(self, name="Control", **kwargs) - self.io_loop = IOLoop(make_current=False) self.pydev_do_not_trace = True self.is_pydev_daemon_thread = True + self.io_loop = None + self.loop_ready = Event() def run(self): self.name = "Control" - self.io_loop.make_current() + self.io_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.io_loop) + self.loop_ready.set() try: - self.io_loop.start() + self.io_loop.run_forever() finally: self.io_loop.close() @@ -28,4 +26,4 @@ def stop(self): This method is threadsafe. """ - self.io_loop.add_callback(self.io_loop.stop) + self.io_loop.call_soon_threadsafe(self.io_loop.stop) diff --git a/ipykernel/debugger.py b/ipykernel/debugger.py index dfe5d0a9b..4223fcb27 100644 --- a/ipykernel/debugger.py +++ b/ipykernel/debugger.py @@ -1,3 +1,4 @@ +import asyncio import sys import os import re @@ -6,9 +7,6 @@ import zmq from zmq.utils import jsonapi -from tornado.queues import Queue -from tornado.locks import Event - from IPython.core.getipython import get_ipython from IPython.core.inputtransformer2 import leading_empty_lines @@ -86,7 +84,7 @@ def __init__(self, event_callback, log): self.tcp_buffer = '' self._reset_tcp_pos() self.event_callback = event_callback - self.message_queue = Queue() + self.message_queue = asyncio.Queue() self.log = log def _reset_tcp_pos(self): @@ -166,7 +164,7 @@ def __init__(self, log, debugpy_stream, event_callback): self.debugpy_port = -1 self.routing_id = None self.wait_for_attach = True - self.init_event = Event() + self.init_event = asyncio.Event() self.init_event_seq = -1 def _get_endpoint(self): @@ -238,7 +236,7 @@ def connect_tcp_socket(self): def disconnect_tcp_socket(self): self.debugpy_stream.socket.disconnect(self._get_endpoint()) self.routing_id = None - self.init_event = Event() + self.init_event = asyncio.Event() self.init_event_seq = -1 self.wait_for_attach = True @@ -282,7 +280,7 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session, j self.is_started = False self.event_callback = event_callback self.just_my_code = just_my_code - self.stopped_queue = Queue() + self.stopped_queue = asyncio.Queue() self.started_debug_handlers = {} for msg_type in Debugger.started_debug_msg_types: diff --git a/ipykernel/eventloops.py b/ipykernel/eventloops.py index 29434bb41..0af1c4905 100644 --- a/ipykernel/eventloops.py +++ b/ipykernel/eventloops.py @@ -186,7 +186,7 @@ def on_timer(self, event): self.func() # We need a custom wx.App to create our Frame subclass that has the - # wx.Timer to defer back to the tornado event loop. + # wx.Timer to defer back to the event loop. class IPWxApp(wx.App): def OnInit(self): self.frame = TimerFrame(wake) @@ -367,7 +367,7 @@ def loop_asyncio(kernel): '''Start a kernel with asyncio event loop support.''' import asyncio loop = asyncio.get_event_loop() - # loop is already running (e.g. tornado 5), nothing left to do + # loop is already running, nothing left to do if loop.is_running(): return diff --git a/ipykernel/inprocess/tests/test_kernel.py b/ipykernel/inprocess/tests/test_kernel.py index 952c66905..c12e706d1 100644 --- a/ipykernel/inprocess/tests/test_kernel.py +++ b/ipykernel/inprocess/tests/test_kernel.py @@ -7,8 +7,6 @@ import pytest -import tornado - from ipykernel.inprocess.blocking import BlockingInProcessKernelClient from ipykernel.inprocess.manager import InProcessKernelManager from ipykernel.inprocess.ipkernel import InProcessKernel @@ -17,43 +15,9 @@ -def _init_asyncio_patch(): - """set default asyncio policy to be compatible with tornado - - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - - do this as early as possible to make it a low priority and overrideable - - ref: https://github.com/tornadoweb/tornado/issues/2608 - - FIXME: if/when tornado supports the defaults in asyncio, - remove and bump tornado requirement for py38 - """ - if sys.platform.startswith("win") and sys.version_info >= (3, 8) and tornado.version_info < (6, 1): - import asyncio - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - - class InProcessKernelTestCase(unittest.TestCase): def setUp(self): - _init_asyncio_patch() self.km = InProcessKernelManager() self.km.start_kernel() self.kc = self.km.client() diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index 50f86d243..1463a9eca 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit from binascii import b2a_hex from collections import deque @@ -17,15 +18,10 @@ import io import zmq -if zmq.pyzmq_version_info() >= (17, 0): - from tornado.ioloop import IOLoop -else: - # deprecated since pyzmq 17 - from zmq.eventloop.ioloop import IOLoop -from zmq.eventloop.zmqstream import ZMQStream - from jupyter_client.session import extract_header +from .zmqstream import ZMQStream + #----------------------------------------------------------------------------- # Globals @@ -63,24 +59,30 @@ def __init__(self, socket, pipe=False): self.background_socket = BackgroundSocket(self) self._master_pid = os.getpid() self._pipe_flag = pipe - self.io_loop = IOLoop(make_current=False) + self.io_loop = None + self.loop_ready = threading.Event() + self.thread = threading.Thread(target=self._thread_main, name="IOPub") + self.thread.daemon = True + self.thread.pydev_do_not_trace = True + self.thread.is_pydev_daemon_thread = True + self.thread.name = "IOPub" + self.thread.start() if pipe: self._setup_pipe_in() self._local = threading.local() self._events = deque() self._event_pipes = WeakSet() self._setup_event_pipe() - self.thread = threading.Thread(target=self._thread_main, name="IOPub") - self.thread.daemon = True - self.thread.pydev_do_not_trace = True - self.thread.is_pydev_daemon_thread = True - self.thread.name = "IOPub" def _thread_main(self): """The inner loop that's actually run in a thread""" - self.io_loop.make_current() - self.io_loop.start() - self.io_loop.close(all_fds=True) + self.io_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.io_loop) + self.loop_ready.set() + try: + self.io_loop.run_forever() + finally: + self.io_loop.close() def _setup_event_pipe(self): """Create the PULL socket listening for events that should fire in this thread.""" @@ -91,7 +93,7 @@ def _setup_event_pipe(self): _uuid = b2a_hex(os.urandom(16)).decode('ascii') iface = self._event_interface = 'inproc://%s' % _uuid pipe_in.bind(iface) - self._event_puller = ZMQStream(pipe_in, self.io_loop) + self._event_puller = ZMQStream(pipe_in, self) self._event_puller.on_recv(self._handle_event) @property @@ -145,7 +147,7 @@ def _setup_pipe_in(self): self._pipe_flag = False pipe_in.close() return - self._pipe_in = ZMQStream(pipe_in, self.io_loop) + self._pipe_in = ZMQStream(pipe_in, self) self._pipe_in.on_recv(self._handle_pipe_msg) def _handle_pipe_msg(self, msg): @@ -187,7 +189,7 @@ def stop(self): """Stop the IOPub thread""" if not self.thread.is_alive(): return - self.io_loop.add_callback(self.io_loop.stop) + self.io_loop.call_soon_threadsafe(self.io_loop.stop) self.thread.join() # close *all* event pipes, created in any thread # event pipes can only be used from other threads while self.thread.is_alive() @@ -447,7 +449,7 @@ def _schedule_flush(self): # add_timeout has to be handed to the io thread via event pipe def _schedule_in_thread(): - self._io_loop.call_later(self.flush_interval, self._flush) + self._io_loop.call_soon_threadsafe(self._flush) self.pub_thread.schedule(_schedule_in_thread) def flush(self): diff --git a/ipykernel/ipkernel.py b/ipykernel/ipkernel.py index 26276ca91..3b8092f48 100644 --- a/ipykernel/ipkernel.py +++ b/ipykernel/ipkernel.py @@ -11,7 +11,6 @@ from IPython.core import release from IPython.utils.tokenutil import token_at_cursor, line_at_cursor from traitlets import Instance, Type, Any, List, Bool, observe, observe_compat -from zmq.eventloop.zmqstream import ZMQStream from .comm import CommManager from .kernelbase import Kernel as KernelBase @@ -19,6 +18,7 @@ from .eventloops import _use_appnope from .compiler import XCachingCompiler from .debugger import Debugger, _is_debugpy_available +from .zmqstream import ZMQStream try: from IPython.core.interactiveshell import _asyncio_runner @@ -177,7 +177,7 @@ def start(self): self.debugpy_stream.on_recv(self.dispatch_debugpy, copy=False) super().start() if self.debugpy_stream: - asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop.asyncio_loop) + asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop) def set_parent(self, ident, parent, channel='shell'): """Overridden from parent to tell the display hook and output streams @@ -282,8 +282,8 @@ def set_sigint_result(): if sigint_future.cancelled() or sigint_future.done(): return sigint_future.set_result(1) - # use add_callback for thread safety - self.io_loop.add_callback(set_sigint_result) + # use call_soon_threadsage for thread safety + self.io_loop.call_soon_threadsafe(set_sigint_result) # set the custom sigint hander during this context save_sigint = signal.signal(signal.SIGINT, handle_sigint) diff --git a/ipykernel/kernelapp.py b/ipykernel/kernelapp.py index dc33530f8..59940e720 100644 --- a/ipykernel/kernelapp.py +++ b/ipykernel/kernelapp.py @@ -3,6 +3,7 @@ # Copyright (c) IPython Development Team. # Distributed under the terms of the Modified BSD License. +import asyncio import atexit import os import sys @@ -14,10 +15,8 @@ from io import TextIOWrapper, FileIO from logging import StreamHandler -from tornado import ioloop - import zmq -from zmq.eventloop.zmqstream import ZMQStream +import zmq.asyncio from IPython.core.application import ( BaseIPythonApplication, base_flags, base_aliases, catch_config_error @@ -45,6 +44,7 @@ Session, session_flags, session_aliases, ) from .zmqshell import ZMQInteractiveShell +from .zmqstream import ZMQStream #----------------------------------------------------------------------------- # Flags and Aliases @@ -274,7 +274,7 @@ def init_sockets(self): # Create a context, a session, and the kernel sockets. self.log.info("Starting the kernel at pid: %i", os.getpid()) assert self.context is None, "init_sockets cannot be called twice!" - self.context = context = zmq.Context() + self.context = context = zmq.asyncio.Context() atexit.register(self.close) self.shell_socket = context.socket(zmq.ROUTER) @@ -324,9 +324,7 @@ def init_iopub(self, context): self.iopub_socket.linger = 1000 self.iopub_port = self._bind_socket(self.iopub_socket, self.iopub_port) self.log.debug("iopub PUB Channel on port: %i" % self.iopub_port) - self.configure_tornado_logger() self.iopub_thread = IOPubThread(self.iopub_socket, pipe=True) - self.iopub_thread.start() # backward-compat: wrap iopub socket API in background thread self.iopub_socket = self.iopub_thread.background_socket @@ -491,8 +489,8 @@ def init_signal(self): def init_kernel(self): """Create the Kernel object itself""" shell_stream = ZMQStream(self.shell_socket) - control_stream = ZMQStream(self.control_socket, self.control_thread.io_loop) - debugpy_stream = ZMQStream(self.debugpy_socket, self.control_thread.io_loop) + control_stream = ZMQStream(self.control_socket, self.control_thread) + debugpy_stream = ZMQStream(self.debugpy_socket, self.control_thread) self.control_thread.start() kernel_factory = self.kernel_class.instance @@ -551,59 +549,6 @@ def init_shell(self): if self.shell: self.shell.configurables.append(self) - def configure_tornado_logger(self): - """ Configure the tornado logging.Logger. - - Must set up the tornado logger or else tornado will call - basicConfig for the root logger which makes the root logger - go to the real sys.stderr instead of the capture streams. - This function mimics the setup of logging.basicConfig. - """ - logger = logging.getLogger('tornado') - handler = logging.StreamHandler() - formatter = logging.Formatter(logging.BASIC_FORMAT) - handler.setFormatter(formatter) - logger.addHandler(handler) - - def _init_asyncio_patch(self): - """set default asyncio policy to be compatible with tornado - - Tornado 6 (at least) is not compatible with the default - asyncio implementation on Windows - - Pick the older SelectorEventLoopPolicy on Windows - if the known-incompatible default policy is in use. - - Support for Proactor via a background thread is available in tornado 6.1, - but it is still preferable to run the Selector in the main thread - instead of the background. - - do this as early as possible to make it a low priority and overrideable - - ref: https://github.com/tornadoweb/tornado/issues/2608 - - FIXME: if/when tornado supports the defaults in asyncio without threads, - remove and bump tornado requirement for py38. - Most likely, this will mean a new Python version - where asyncio.ProactorEventLoop supports add_reader and friends. - - """ - if sys.platform.startswith("win") and sys.version_info >= (3, 8): - import asyncio - try: - from asyncio import ( - WindowsProactorEventLoopPolicy, - WindowsSelectorEventLoopPolicy, - ) - except ImportError: - pass - # not affected - else: - if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy: - # WindowsProactorEventLoopPolicy is not compatible with tornado 6 - # fallback to the pre-3.8 default of Selector - asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy()) - def init_pdb(self): """Replace pdb with IPython's version that is interruptible. @@ -620,7 +565,6 @@ def init_pdb(self): @catch_config_error def initialize(self, argv=None): - self._init_asyncio_patch() super().initialize(argv) if self.subapp is not None: return @@ -663,7 +607,7 @@ def start(self): if self.poller is not None: self.poller.start() self.kernel.start() - self.io_loop = ioloop.IOLoop.current() + self.io_loop = asyncio.get_event_loop() if self.trio_loop: from ipykernel.trio_runner import TrioRunner tr = TrioRunner() @@ -674,7 +618,7 @@ def start(self): pass else: try: - self.io_loop.start() + self.io_loop.run_forever() except KeyboardInterrupt: pass diff --git a/ipykernel/kernelbase.py b/ipykernel/kernelbase.py index c775398fc..e0970ccdf 100644 --- a/ipykernel/kernelbase.py +++ b/ipykernel/kernelbase.py @@ -37,16 +37,14 @@ import zmq from IPython.core.error import StdinNotImplementedError from jupyter_client.session import Session -from tornado import ioloop -from tornado.queues import Queue, QueueEmpty from traitlets import (Any, Bool, Dict, Float, Instance, Integer, List, Set, Unicode, default, observe) from traitlets.config.configurable import SingletonConfigurable -from zmq.eventloop.zmqstream import ZMQStream from ipykernel.jsonutil import json_clean from ._version import kernel_protocol_version +from .zmqstream import ZMQStream class Kernel(SingletonConfigurable): @@ -61,9 +59,9 @@ class Kernel(SingletonConfigurable): @observe('eventloop') def _update_eventloop(self, change): """schedule call to eventloop from IOLoop""" - loop = ioloop.IOLoop.current() + loop = asyncio.get_event_loop() if change.new is not None: - loop.add_callback(self.enter_eventloop) + loop.call_soon(self.enter_eventloop()) session = Instance(Session, allow_none=True) profile_dir = Instance('IPython.core.profiledir.ProfileDir', allow_none=True) @@ -233,7 +231,7 @@ def __init__(self, **kwargs): for msg_type in self.control_msg_types: self.control_handlers[msg_type] = getattr(self, msg_type) - self.control_queue = Queue() + self.control_queue = asyncio.Queue() def dispatch_control(self, msg): self.control_queue.put_nowait(msg) @@ -266,7 +264,7 @@ def _flush(): # so we can wait for all queued messages to be processed self.control_queue.put(tracer_future) - control_loop.add_callback(_flush) + control_loop.call_soon_threadsafe(_flush) return awaitable_future async def process_control(self, msg): @@ -457,7 +455,7 @@ async def process_one(self, wait=True): else: try: t, dispatch, args = self.msg_queue.get_nowait() - except (asyncio.QueueEmpty, QueueEmpty): + except asyncio.QueueEmpty: return None await dispatch(*args) @@ -493,14 +491,13 @@ def schedule_dispatch(self, dispatch, *args): args, ) ) - # ensure the eventloop wakes up - self.io_loop.add_callback(lambda: None) def start(self): """register dispatchers for streams""" - self.io_loop = ioloop.IOLoop.current() - self.msg_queue = Queue() - self.io_loop.add_callback(self.dispatch_queue) + self.msg_queue = asyncio.Queue() + self.io_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.io_loop) + asyncio.ensure_future(self.dispatch_queue()) self.control_stream.on_recv(self.dispatch_control, copy=False) @@ -509,7 +506,7 @@ def start(self): else: control_loop = self.io_loop - asyncio.run_coroutine_threadsafe(self.poll_control_queue(), control_loop.asyncio_loop) + asyncio.run_coroutine_threadsafe(self.poll_control_queue(), control_loop) self.shell_stream.on_recv( partial( @@ -837,11 +834,12 @@ async def shutdown_request(self, stream, ident, parent): self.log.debug('Stopping control ioloop') control_io_loop = self.control_stream.io_loop - control_io_loop.add_callback(control_io_loop.stop) + if control_io_loop: + control_io_loop.call_soon_threadsafe(control_io_loop.stop) self.log.debug('Stopping shell ioloop') shell_io_loop = self.shell_stream.io_loop - shell_io_loop.add_callback(shell_io_loop.stop) + shell_io_loop.call_soon_threadsafe(shell_io_loop.stop) def do_shutdown(self, restart): """Override in subclasses to do things when the frontend shuts down the diff --git a/ipykernel/tests/test_eventloop.py b/ipykernel/tests/test_eventloop.py index 1f25d97e2..5da49fb28 100644 --- a/ipykernel/tests/test_eventloop.py +++ b/ipykernel/tests/test_eventloop.py @@ -3,7 +3,6 @@ import sys import pytest -import tornado from .utils import flush_channels, start_new_kernel, execute @@ -28,7 +27,6 @@ def teardown(): """ -@pytest.mark.skipif(tornado.version_info < (5,), reason="only relevant on tornado 5") def test_asyncio_interrupt(): flush_channels(KC) msg_id, content = execute('%gui asyncio', KC) diff --git a/ipykernel/trio_runner.py b/ipykernel/trio_runner.py index d78e516aa..5788e6c4f 100644 --- a/ipykernel/trio_runner.py +++ b/ipykernel/trio_runner.py @@ -19,8 +19,8 @@ def initialize(self, kernel, io_loop): kernel.shell.magics_manager.magics['line']['autoawait'] = \ lambda _: warnings.warn("Autoawait isn't allowed in Trio " "background loop mode.") - bg_thread = threading.Thread(target=io_loop.start, daemon=True, - name='TornadoBackground') + bg_thread = threading.Thread(target=io_loop.run_forever, daemon=True, + name='AsyncioBackground') bg_thread.start() def interrupt(self, signum, frame): diff --git a/ipykernel/zmqstream.py b/ipykernel/zmqstream.py new file mode 100644 index 000000000..d7ae4ef40 --- /dev/null +++ b/ipykernel/zmqstream.py @@ -0,0 +1,30 @@ +import asyncio + + +class ZMQStream: + def __init__(self, socket, thread=None): + self.socket = socket + self.thread = thread + if thread: + self.io_loop = thread.io_loop + else: + self.io_loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.io_loop) + + def on_recv(self, callback, copy=True): + if self.thread: + self.thread.loop_ready.wait() + asyncio.run_coroutine_threadsafe(self.recv_task(callback, copy), self.thread.io_loop) + else: + asyncio.ensure_future(self.recv_task(callback, copy)) + + async def recv_task(self, callback, copy): + while True: + msg_list = await self.socket.recv_multipart(copy=copy) + callback(msg_list) + + def send_multipart(self, msg_list, flags=0, copy=True, track=False, **kwargs): + return self.socket.send_multipart(msg_list, copy=copy) + + def flush(self, flag=None, limit = None): + pass diff --git a/setup.py b/setup.py index 71289881e..31ec80769 100644 --- a/setup.py +++ b/setup.py @@ -65,7 +65,6 @@ def run(self): 'ipython>=7.23.1', 'traitlets>=5.1.0,<6.0', 'jupyter_client<8.0', - 'tornado>=4.2,<7.0', 'matplotlib-inline>=0.1.0,<0.2.0', 'appnope;platform_system=="Darwin"', 'psutil',