Skip to content

Commit

Permalink
Drop Tornado
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Feb 23, 2022
1 parent 5c16fde commit a32a71c
Show file tree
Hide file tree
Showing 14 changed files with 96 additions and 226 deletions.
30 changes: 0 additions & 30 deletions examples/embedding/inprocess_qtconsole.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
import sys

import tornado

from qtconsole.rich_ipython_widget import RichIPythonWidget
from qtconsole.inprocess import QtInProcessKernelManager
from IPython.lib import guisupport
Expand All @@ -12,38 +10,10 @@ def print_process_id():
print('Process ID is:', os.getpid())


def init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if sys.platform.startswith("win") and sys.version_info >= (3, 8) and tornado.version_info < (6, 1):
import asyncio
try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())

def main():
# Print the ID of the main process
print_process_id()

init_asyncio_patch()
app = guisupport.get_app_qt4()

# Create an in-process kernel
Expand Down
31 changes: 0 additions & 31 deletions examples/embedding/inprocess_terminal.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import os
import sys

import tornado

from ipykernel.inprocess import InProcessKernelManager
from jupyter_console.ptshell import ZMQTerminalInteractiveShell

Expand All @@ -11,41 +9,12 @@ def print_process_id():
print('Process ID is:', os.getpid())


def init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if sys.platform.startswith("win") and sys.version_info >= (3, 8) and tornado.version_info < (6, 1):
import asyncio
try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def main():
print_process_id()

# Create an in-process kernel
# >>> print_process_id()
# will print the same process ID as the main process
init_asyncio_patch()
kernel_manager = InProcessKernelManager()
kernel_manager.start_kernel()
kernel = kernel_manager.kernel
Expand Down
20 changes: 9 additions & 11 deletions ipykernel/control.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,23 @@
from threading import Thread
import zmq
if zmq.pyzmq_version_info() >= (17, 0):
from tornado.ioloop import IOLoop
else:
# deprecated since pyzmq 17
from zmq.eventloop.ioloop import IOLoop
from threading import Thread, Event
import asyncio


class ControlThread(Thread):

def __init__(self, **kwargs):
Thread.__init__(self, name="Control", **kwargs)
self.io_loop = IOLoop(make_current=False)
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True
self.io_loop = None
self.loop_ready = Event()

def run(self):
self.name = "Control"
self.io_loop.make_current()
self.io_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.io_loop)
self.loop_ready.set()
try:
self.io_loop.start()
self.io_loop.run_forever()
finally:
self.io_loop.close()

Expand All @@ -28,4 +26,4 @@ def stop(self):
This method is threadsafe.
"""
self.io_loop.add_callback(self.io_loop.stop)
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
12 changes: 5 additions & 7 deletions ipykernel/debugger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import sys
import os
import re
Expand All @@ -6,9 +7,6 @@
import zmq
from zmq.utils import jsonapi

from tornado.queues import Queue
from tornado.locks import Event

from IPython.core.getipython import get_ipython
from IPython.core.inputtransformer2 import leading_empty_lines

Expand Down Expand Up @@ -86,7 +84,7 @@ def __init__(self, event_callback, log):
self.tcp_buffer = ''
self._reset_tcp_pos()
self.event_callback = event_callback
self.message_queue = Queue()
self.message_queue = asyncio.Queue()
self.log = log

def _reset_tcp_pos(self):
Expand Down Expand Up @@ -166,7 +164,7 @@ def __init__(self, log, debugpy_stream, event_callback):
self.debugpy_port = -1
self.routing_id = None
self.wait_for_attach = True
self.init_event = Event()
self.init_event = asyncio.Event()
self.init_event_seq = -1

def _get_endpoint(self):
Expand Down Expand Up @@ -238,7 +236,7 @@ def connect_tcp_socket(self):
def disconnect_tcp_socket(self):
self.debugpy_stream.socket.disconnect(self._get_endpoint())
self.routing_id = None
self.init_event = Event()
self.init_event = asyncio.Event()
self.init_event_seq = -1
self.wait_for_attach = True

Expand Down Expand Up @@ -282,7 +280,7 @@ def __init__(self, log, debugpy_stream, event_callback, shell_socket, session, j
self.is_started = False
self.event_callback = event_callback
self.just_my_code = just_my_code
self.stopped_queue = Queue()
self.stopped_queue = asyncio.Queue()

self.started_debug_handlers = {}
for msg_type in Debugger.started_debug_msg_types:
Expand Down
4 changes: 2 additions & 2 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def on_timer(self, event):
self.func()

# We need a custom wx.App to create our Frame subclass that has the
# wx.Timer to defer back to the tornado event loop.
# wx.Timer to defer back to the event loop.
class IPWxApp(wx.App):
def OnInit(self):
self.frame = TimerFrame(wake)
Expand Down Expand Up @@ -367,7 +367,7 @@ def loop_asyncio(kernel):
'''Start a kernel with asyncio event loop support.'''
import asyncio
loop = asyncio.get_event_loop()
# loop is already running (e.g. tornado 5), nothing left to do
# loop is already running, nothing left to do
if loop.is_running():
return

Expand Down
36 changes: 0 additions & 36 deletions ipykernel/inprocess/tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import pytest

import tornado

from ipykernel.inprocess.blocking import BlockingInProcessKernelClient
from ipykernel.inprocess.manager import InProcessKernelManager
from ipykernel.inprocess.ipkernel import InProcessKernel
Expand All @@ -17,43 +15,9 @@



def _init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if sys.platform.startswith("win") and sys.version_info >= (3, 8) and tornado.version_info < (6, 1):
import asyncio
try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


class InProcessKernelTestCase(unittest.TestCase):

def setUp(self):
_init_asyncio_patch()
self.km = InProcessKernelManager()
self.km.start_kernel()
self.kc = self.km.client()
Expand Down
42 changes: 22 additions & 20 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import atexit
from binascii import b2a_hex
from collections import deque
Expand All @@ -17,15 +18,10 @@
import io

import zmq
if zmq.pyzmq_version_info() >= (17, 0):
from tornado.ioloop import IOLoop
else:
# deprecated since pyzmq 17
from zmq.eventloop.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

from jupyter_client.session import extract_header

from .zmqstream import ZMQStream


#-----------------------------------------------------------------------------
# Globals
Expand Down Expand Up @@ -63,24 +59,30 @@ def __init__(self, socket, pipe=False):
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
self._pipe_flag = pipe
self.io_loop = IOLoop(make_current=False)
self.io_loop = None
self.loop_ready = threading.Event()
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
self.thread.pydev_do_not_trace = True
self.thread.is_pydev_daemon_thread = True
self.thread.name = "IOPub"
self.thread.start()
if pipe:
self._setup_pipe_in()
self._local = threading.local()
self._events = deque()
self._event_pipes = WeakSet()
self._setup_event_pipe()
self.thread = threading.Thread(target=self._thread_main, name="IOPub")
self.thread.daemon = True
self.thread.pydev_do_not_trace = True
self.thread.is_pydev_daemon_thread = True
self.thread.name = "IOPub"

def _thread_main(self):
"""The inner loop that's actually run in a thread"""
self.io_loop.make_current()
self.io_loop.start()
self.io_loop.close(all_fds=True)
self.io_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.io_loop)
self.loop_ready.set()
try:
self.io_loop.run_forever()
finally:
self.io_loop.close()

def _setup_event_pipe(self):
"""Create the PULL socket listening for events that should fire in this thread."""
Expand All @@ -91,7 +93,7 @@ def _setup_event_pipe(self):
_uuid = b2a_hex(os.urandom(16)).decode('ascii')
iface = self._event_interface = 'inproc://%s' % _uuid
pipe_in.bind(iface)
self._event_puller = ZMQStream(pipe_in, self.io_loop)
self._event_puller = ZMQStream(pipe_in, self)
self._event_puller.on_recv(self._handle_event)

@property
Expand Down Expand Up @@ -145,7 +147,7 @@ def _setup_pipe_in(self):
self._pipe_flag = False
pipe_in.close()
return
self._pipe_in = ZMQStream(pipe_in, self.io_loop)
self._pipe_in = ZMQStream(pipe_in, self)
self._pipe_in.on_recv(self._handle_pipe_msg)

def _handle_pipe_msg(self, msg):
Expand Down Expand Up @@ -187,7 +189,7 @@ def stop(self):
"""Stop the IOPub thread"""
if not self.thread.is_alive():
return
self.io_loop.add_callback(self.io_loop.stop)
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
self.thread.join()
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
Expand Down Expand Up @@ -447,7 +449,7 @@ def _schedule_flush(self):

# add_timeout has to be handed to the io thread via event pipe
def _schedule_in_thread():
self._io_loop.call_later(self.flush_interval, self._flush)
self._io_loop.call_soon_threadsafe(self._flush)
self.pub_thread.schedule(_schedule_in_thread)

def flush(self):
Expand Down
8 changes: 4 additions & 4 deletions ipykernel/ipkernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
from IPython.core import release
from IPython.utils.tokenutil import token_at_cursor, line_at_cursor
from traitlets import Instance, Type, Any, List, Bool, observe, observe_compat
from zmq.eventloop.zmqstream import ZMQStream

from .comm import CommManager
from .kernelbase import Kernel as KernelBase
from .zmqshell import ZMQInteractiveShell
from .eventloops import _use_appnope
from .compiler import XCachingCompiler
from .debugger import Debugger, _is_debugpy_available
from .zmqstream import ZMQStream

try:
from IPython.core.interactiveshell import _asyncio_runner
Expand Down Expand Up @@ -177,7 +177,7 @@ def start(self):
self.debugpy_stream.on_recv(self.dispatch_debugpy, copy=False)
super().start()
if self.debugpy_stream:
asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop.asyncio_loop)
asyncio.run_coroutine_threadsafe(self.poll_stopped_queue(), self.control_thread.io_loop)

def set_parent(self, ident, parent, channel='shell'):
"""Overridden from parent to tell the display hook and output streams
Expand Down Expand Up @@ -282,8 +282,8 @@ def set_sigint_result():
if sigint_future.cancelled() or sigint_future.done():
return
sigint_future.set_result(1)
# use add_callback for thread safety
self.io_loop.add_callback(set_sigint_result)
# use call_soon_threadsage for thread safety
self.io_loop.call_soon_threadsafe(set_sigint_result)

# set the custom sigint hander during this context
save_sigint = signal.signal(signal.SIGINT, handle_sigint)
Expand Down
Loading

0 comments on commit a32a71c

Please sign in to comment.