Skip to content

Commit

Permalink
Drop Tornado
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Jun 20, 2022
1 parent b1da6d0 commit dfa991e
Show file tree
Hide file tree
Showing 16 changed files with 121 additions and 230 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ jobs:
- name: Install the Python dependencies
run: |
pip install .[test] codecov
pip install https://github.com/davidbrochart/jupyter_client/archive/async_stream.zip
- name: Install matplotlib
if: ${{ !startsWith(matrix.os, 'macos') && !startsWith(matrix.python-version, 'pypy') }}
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/downstream.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ jobs:
git clone https://github.com/jupyter/jupyter_kernel_test.git
cd jupyter_kernel_test
pip install -e ".[test]"
pip install https://github.com/davidbrochart/jupyter_client/archive/async_stream.zip
python test_ipykernel.py
36 changes: 0 additions & 36 deletions examples/embedding/inprocess_qtconsole.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import sys

import tornado
from IPython.lib import guisupport
from qtconsole.inprocess import QtInProcessKernelManager
from qtconsole.rich_ipython_widget import RichIPythonWidget
Expand All @@ -11,44 +9,10 @@ def print_process_id():
print("Process ID is:", os.getpid())


def init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if (
sys.platform.startswith("win")
and sys.version_info >= (3, 8)
and tornado.version_info < (6, 1)
):
import asyncio

try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def main():
# Print the ID of the main process
print_process_id()

init_asyncio_patch()
app = guisupport.get_app_qt4()

# Create an in-process kernel
Expand Down
36 changes: 0 additions & 36 deletions examples/embedding/inprocess_terminal.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import os
import sys

import tornado
from jupyter_console.ptshell import ZMQTerminalInteractiveShell

from ipykernel.inprocess.manager import InProcessKernelManager
Expand All @@ -11,46 +9,12 @@ def print_process_id():
print("Process ID is:", os.getpid())


def init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if (
sys.platform.startswith("win")
and sys.version_info >= (3, 8)
and tornado.version_info < (6, 1)
):
import asyncio

try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def main():
print_process_id()

# Create an in-process kernel
# >>> print_process_id()
# will print the same process ID as the main process
init_asyncio_patch()
kernel_manager = InProcessKernelManager()
kernel_manager.start_kernel()
kernel = kernel_manager.kernel
Expand Down
10 changes: 5 additions & 5 deletions ipykernel/control.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import asyncio
from threading import Thread

from tornado.ioloop import IOLoop


class ControlThread(Thread):
def __init__(self, **kwargs):
Thread.__init__(self, name="Control", **kwargs)
self.io_loop = IOLoop(make_current=False)
self.pydev_do_not_trace = True
self.is_pydev_daemon_thread = True
self.io_loop = asyncio.new_event_loop()

def run(self):
self.name = "Control"
asyncio.set_event_loop(self.io_loop)
try:
self.io_loop.start()
self.io_loop.run_forever()
finally:
self.io_loop.close()

Expand All @@ -22,4 +22,4 @@ def stop(self):
This method is threadsafe.
"""
self.io_loop.add_callback(self.io_loop.stop)
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
21 changes: 12 additions & 9 deletions ipykernel/debugger.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os
import re
import sys
Expand All @@ -6,8 +7,6 @@
import zmq
from IPython.core.getipython import get_ipython
from IPython.core.inputtransformer2 import leading_empty_lines
from tornado.locks import Event
from tornado.queues import Queue
from zmq.utils import jsonapi

try:
Expand Down Expand Up @@ -86,11 +85,13 @@ class DebugpyMessageQueue:
SEPARATOR = "\r\n\r\n"
SEPARATOR_LENGTH = 4

message_queue: asyncio.Queue[t.Dict[str, t.Any]]

def __init__(self, event_callback, log):
self.tcp_buffer = ""
self._reset_tcp_pos()
self.event_callback = event_callback
self.message_queue: Queue[t.Any] = Queue()
self.message_queue = asyncio.Queue()
self.log = log

def _reset_tcp_pos(self):
Expand Down Expand Up @@ -171,7 +172,7 @@ def __init__(self, log, debugpy_stream, event_callback):
self.debugpy_port = -1
self.routing_id = None
self.wait_for_attach = True
self.init_event = Event()
self.init_event = asyncio.Event()
self.init_event_seq = -1

def _get_endpoint(self):
Expand Down Expand Up @@ -245,7 +246,7 @@ def connect_tcp_socket(self):
def disconnect_tcp_socket(self):
self.debugpy_stream.socket.disconnect(self._get_endpoint())
self.routing_id = None
self.init_event = Event()
self.init_event = asyncio.Event()
self.init_event_seq = -1
self.wait_for_attach = True

Expand Down Expand Up @@ -278,6 +279,8 @@ class Debugger:
"configurationDone",
]

stopped_queue: asyncio.Queue[t.Dict[str, t.Any]]

# Requests that can be handled even if the debugger is not running
static_debug_msg_types = ["debugInfo", "inspectVariables", "richInspectVariables", "modules"]

Expand All @@ -291,7 +294,7 @@ def __init__(
self.is_started = False
self.event_callback = event_callback
self.just_my_code = just_my_code
self.stopped_queue: Queue[t.Any] = Queue()
self.stopped_queue = asyncio.Queue()

self.started_debug_handlers = {}
for msg_type in Debugger.started_debug_msg_types:
Expand Down Expand Up @@ -367,7 +370,7 @@ async def handle_stopped_event(self):
def tcp_client(self):
return self.debugpy_client

def start(self):
async def start(self):
if not self.debugpy_initialized:
tmp_dir = get_tmp_directory()
if not os.path.exists(tmp_dir):
Expand All @@ -384,7 +387,7 @@ def start(self):
(self.shell_socket.getsockopt(ROUTING_ID)),
)

ident, msg = self.session.recv(self.shell_socket, mode=0)
ident, msg = await self.session.async_recv(self.shell_socket, mode=0)
self.debugpy_initialized = msg["content"]["status"] == "ok"

# Don't remove leading empty lines when debugging so the breakpoints are correctly positioned
Expand Down Expand Up @@ -631,7 +634,7 @@ async def process_request(self, message):
if self.is_started:
self.log.info("The debugger has already started")
else:
self.is_started = self.start()
self.is_started = await self.start()
if self.is_started:
self.log.info("The debugger has started")
else:
Expand Down
4 changes: 2 additions & 2 deletions ipykernel/eventloops.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def on_timer(self, event):
self.func()

# We need a custom wx.App to create our Frame subclass that has the
# wx.Timer to defer back to the tornado event loop.
# wx.Timer to defer back to the event loop.
class IPWxApp(wx.App):
def OnInit(self):
self.frame = TimerFrame(wake)
Expand Down Expand Up @@ -378,7 +378,7 @@ def loop_asyncio(kernel):
import asyncio

loop = asyncio.get_event_loop()
# loop is already running (e.g. tornado 5), nothing left to do
# loop is already running, nothing left to do
if loop.is_running():
return

Expand Down
40 changes: 0 additions & 40 deletions ipykernel/inprocess/tests/test_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from io import StringIO

import pytest
import tornado
from IPython.utils.io import capture_output
from jupyter_client.session import Session

Expand All @@ -20,44 +19,6 @@
orig_msg = Session.msg


def _init_asyncio_patch():
"""set default asyncio policy to be compatible with tornado
Tornado 6 (at least) is not compatible with the default
asyncio implementation on Windows
Pick the older SelectorEventLoopPolicy on Windows
if the known-incompatible default policy is in use.
do this as early as possible to make it a low priority and overrideable
ref: https://github.com/tornadoweb/tornado/issues/2608
FIXME: if/when tornado supports the defaults in asyncio,
remove and bump tornado requirement for py38
"""
if (
sys.platform.startswith("win")
and sys.version_info >= (3, 8)
and tornado.version_info < (6, 1)
):
import asyncio

try:
from asyncio import (
WindowsProactorEventLoopPolicy,
WindowsSelectorEventLoopPolicy,
)
except ImportError:
pass
# not affected
else:
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
# fallback to the pre-3.8 default of Selector
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())


def _inject_cell_id(_self, *args, **kwargs):
"""
This patch jupyter_client.session:Session.msg to add a cell_id to the return message metadata
Expand All @@ -80,7 +41,6 @@ def patch_cell_id():

class InProcessKernelTestCase(unittest.TestCase):
def setUp(self):
_init_asyncio_patch()
self.km = InProcessKernelManager()
self.km.start_kernel()
self.kc = self.km.client()
Expand Down
25 changes: 17 additions & 8 deletions ipykernel/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.

import asyncio
import atexit
import io
import os
Expand All @@ -18,8 +19,8 @@

import zmq
from jupyter_client.session import extract_header
from tornado.ioloop import IOLoop
from zmq.eventloop.zmqstream import ZMQStream

from .zmqstream import ZMQStream

# -----------------------------------------------------------------------------
# Globals
Expand Down Expand Up @@ -57,7 +58,7 @@ def __init__(self, socket, pipe=False):
self.background_socket = BackgroundSocket(self)
self._master_pid = os.getpid()
self._pipe_flag = pipe
self.io_loop = IOLoop(make_current=False)
self.io_loop = asyncio.new_event_loop()
if pipe:
self._setup_pipe_in()
self._local = threading.local()
Expand All @@ -72,8 +73,11 @@ def __init__(self, socket, pipe=False):

def _thread_main(self):
"""The inner loop that's actually run in a thread"""
self.io_loop.start()
self.io_loop.close(all_fds=True)
asyncio.set_event_loop(self.io_loop)
try:
self.io_loop.run_forever()
finally:
self.io_loop.close()

def _setup_event_pipe(self):
"""Create the PULL socket listening for events that should fire in this thread."""
Expand Down Expand Up @@ -181,13 +185,14 @@ def stop(self):
"""Stop the IOPub thread"""
if not self.thread.is_alive():
return
self.io_loop.add_callback(self.io_loop.stop)
self.thread.join()
self.io_loop.call_soon_threadsafe(self.io_loop.stop)
# self.thread.join()
# close *all* event pipes, created in any thread
# event pipes can only be used from other threads while self.thread.is_alive()
# so after thread.join, this should be safe
for event_pipe in self._event_pipes:
event_pipe.close()
self._event_puller.socket.close()

def close(self):
if self.closed:
Expand Down Expand Up @@ -456,7 +461,11 @@ def _schedule_flush(self):

# add_timeout has to be handed to the io thread via event pipe
def _schedule_in_thread():
self._io_loop.call_later(self.flush_interval, self._flush)
# FIXME: original code was:
# self._io_loop.call_later(self.flush_interval, self._flush)
self._io_loop.call_soon_threadsafe(
self._io_loop.call_later, self.flush_interval, self._flush
)

self.pub_thread.schedule(_schedule_in_thread)

Expand Down
Loading

0 comments on commit dfa991e

Please sign in to comment.