Skip to content

Commit

Permalink
Backport 'handle exception when websocket server start room failed' #289
Browse files Browse the repository at this point in the history
  • Loading branch information
Jialin Zhang committed May 1, 2024
1 parent f284083 commit dc6bf82
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 9 deletions.
33 changes: 26 additions & 7 deletions jupyter_collaboration/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from jupyter_server.auth import authorized
from jupyter_server.base.handlers import APIHandler, JupyterHandler
from jupyter_server.utils import ensure_async
from jupyter_ydoc import ydocs as YDOCS
from pycrdt_websocket.websocket_server import YRoom
from pycrdt_websocket.ystore import BaseYStore
Expand All @@ -26,6 +27,7 @@
LogLevel,
MessageType,
decode_file_path,
room_id_from_encoded_path,
)
from .websocketserver import JupyterWebsocketServer

Expand Down Expand Up @@ -69,12 +71,13 @@ def create_task(self, aw):
task.add_done_callback(self._background_tasks.discard)

async def prepare(self):
await ensure_async(super().prepare())
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()

# Get room
self._room_id: str = self.request.path.split("/")[-1]
self._room_id: str = room_id_from_encoded_path(self.request.path)

async with self._room_lock(self._room_id):
if self._websocket_server.room_exists(self._room_id):
Expand Down Expand Up @@ -109,12 +112,26 @@ async def prepare(self):
# it is a transient document (e.g. awareness)
self.room = TransientRoom(self._room_id, self.log)

await self._websocket_server.start_room(self.room)
self._websocket_server.add_room(self._room_id, self.room)
try:
await self._websocket_server.start_room(self.room)
except Exception as e:
self.log.error("Room %s failed to start on websocket server", self._room_id)
# Clean room
await self.room.stop()
self.log.info("Room %s deleted", self._room_id)
self._emit(LogLevel.INFO, "clean", "Room deleted.")

res = super().prepare()
if res is not None:
return await res
# Clean the file loader in file loader mapping if there are not any rooms using it
_, _, file_id = decode_file_path(self._room_id)
file = self._file_loaders[file_id]
if file.number_of_subscriptions == 0 or (
file.number_of_subscriptions == 1 and self._room_id in file._subscriptions
):
self.log.info("Deleting file %s", file.path)
await self._file_loaders.remove(file_id)
self._emit(LogLevel.INFO, "clean", "file loader removed.")
raise e
self._websocket_server.add_room(self._room_id, self.room)

def initialize(
self,
Expand All @@ -133,6 +150,8 @@ def initialize(
self._document_save_delay = document_save_delay
self._websocket_server = ywebsocket_server
self._message_queue = asyncio.Queue()
self._room_id = ""
self.room = None

@property
def path(self):
Expand Down Expand Up @@ -226,7 +245,7 @@ async def send(self, message):
try:
self.write_message(message, binary=True)
except Exception as e:
self.log.debug("Failed to write message", exc_info=e)
self.log.error("Failed to write message", exc_info=e)

async def recv(self):
"""
Expand Down
5 changes: 4 additions & 1 deletion jupyter_collaboration/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@ async def clean(self) -> None:
if self._watcher is not None:
if not self._watcher.cancelled():
self._watcher.cancel()
await self._watcher
try:
await self._watcher
except asyncio.CancelledError:
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")

def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
"""
Expand Down
14 changes: 13 additions & 1 deletion jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,10 @@ async def stop(self) -> None:
Cancels the save task and unsubscribes from the file.
"""
await super().stop()
try:
await super().stop()
except RuntimeError:
pass
# TODO: Should we cancel or wait ?
if self._saving_document:
self._saving_document.cancel()
Expand Down Expand Up @@ -299,3 +302,12 @@ async def _broadcast_updates(self):
await super()._broadcast_updates()
except asyncio.CancelledError:
pass

async def stop(self) -> None:
"""
Stop the room.
"""
try:
await super().stop()
except RuntimeError:
pass
5 changes: 5 additions & 0 deletions jupyter_collaboration/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ def encode_file_path(format: str, file_type: str, file_id: str) -> str:
path (str): File path.
"""
return f"{format}:{file_type}:{file_id}"


def room_id_from_encoded_path(encoded_path: str) -> str:
"""Transforms the encoded path into a stable room identifier."""
return encoded_path.split("/")[-1]
84 changes: 84 additions & 0 deletions tests/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,87 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
assert collected_data[1]["action"] == "leave"
assert collected_data[1]["roomid"] == "text:file:" + fim.get_id("test.txt")
assert collected_data[1]["username"] is not None


async def test_room_handler_doc_client_should_cleanup_room_file(
rtc_create_file, rtc_connect_doc_client, jp_serverapp
):
path, _ = await rtc_create_file("test.txt", "test")

event = Event()

def _on_document_change(target: str, e: Any) -> None:
if target == "source":
event.set()

doc = YUnicode()
doc.observe(_on_document_change)

async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider(
doc.ydoc, ws
):
await event.wait()
await sleep(0.1)

# kill websocketserver to mimic task group inactive failure
await jp_serverapp.web_app.settings["jupyter_collaboration"].ywebsocket_server.stop()

listener_was_called = False
collected_data = []

async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None:
nonlocal listener_was_called
collected_data.append(data)
listener_was_called = True

event_logger = jp_serverapp.event_logger
event_logger.add_listener(
schema_id="https://schema.jupyter.org/jupyter_collaboration/session/v1",
listener=my_listener,
)

path2, _ = await rtc_create_file("test2.txt", "test2")

try:
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
doc.ydoc, ws
):
await event.wait()
await sleep(0.1)
except Exception:
pass

try:
async with await rtc_connect_doc_client("text2", "file2", path2) as ws, WebsocketProvider(
doc.ydoc, ws
):
await event.wait()
await sleep(0.1)
except Exception:
pass

fim = jp_serverapp.web_app.settings["file_id_manager"]

assert listener_was_called is True
assert len(collected_data) == 4
# no two collaboration events are emitted.
# [{'level': 'WARNING', 'msg': 'There is another collaborative session accessing the same file.\nThe synchronization bet...ou might lose some of your changes.', 'path': 'test2.txt', 'room': 'text2:file2:51b7e24f-f534-47fb-882f-5cc45ba867fe'}]
assert collected_data[0]["path"] == "test2.txt"
assert collected_data[0]["room"] == "text2:file2:" + fim.get_id("test2.txt")
assert collected_data[0]["action"] == "clean"
assert collected_data[0]["msg"] == "Room deleted."
assert collected_data[1]["path"] == "test2.txt"
assert collected_data[1]["room"] == "text2:file2:" + fim.get_id("test2.txt")
assert collected_data[1]["action"] == "clean"
assert collected_data[1]["msg"] == "file loader removed."
assert collected_data[2]["path"] == "test2.txt"
assert collected_data[2]["room"] == "text2:file2:" + fim.get_id("test2.txt")
assert collected_data[2]["action"] == "clean"
assert collected_data[2]["msg"] == "Room deleted."
assert collected_data[3]["path"] == "test2.txt"
assert collected_data[3]["room"] == "text2:file2:" + fim.get_id("test2.txt")
assert collected_data[3]["action"] == "clean"
assert collected_data[3]["msg"] == "file loader removed."

await jp_serverapp.web_app.settings["jupyter_collaboration"].stop_extension()
del jp_serverapp.web_app.settings["file_id_manager"]

0 comments on commit dc6bf82

Please sign in to comment.