From 59a011a25dfc0925fdec73fc08c0c0a4e7041e38 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 29 Apr 2024 12:01:24 -0700 Subject: [PATCH 1/4] Log (instead of raise) exceptions when running as a server extension --- .../jupyter_server_ydoc/app.py | 5 +- .../jupyter_server_ydoc/handlers.py | 68 ++++++++++++++++--- .../jupyter_server_ydoc/rooms.py | 28 ++++++-- .../jupyter_server_ydoc/websocketserver.py | 21 +++++- projects/jupyter-server-ydoc/pyproject.toml | 2 +- 5 files changed, 103 insertions(+), 21 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py index 8ee23ee3..2b8b0301 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py @@ -22,7 +22,7 @@ encode_file_path, room_id_from_encoded_path, ) -from .websocketserver import JupyterWebsocketServer, RoomNotFound +from .websocketserver import JupyterWebsocketServer, RoomNotFound, exception_logger class YDocExtension(ExtensionApp): @@ -107,6 +107,9 @@ def initialize_handlers(self): rooms_ready=False, auto_clean_rooms=False, ystore_class=self.ystore_class, + # Log exceptions, because we don't want the websocket server + # to _ever_ crash permanently in a live jupyter_server. + exception_handler=exception_logger, log=self.log, ) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index d74bd6df..3010904c 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -7,6 +7,7 @@ import json import time import uuid +from logging import Logger from typing import Any from jupyter_server.auth import authorized @@ -105,6 +106,26 @@ async def prepare(self): self._document_save_delay, ) + def exception_logger(exception: Exception, log: Logger) -> bool: + """A function that catches any exceptions raised in the websocket + server and logs them. + + The protects the y-room's task group from cancelling + anytime an exception is raised. + """ + room_id = "unknown" + if self.room.room_id: + room_id = self.room.room_id + log.error( + f"Document Room Exception, (room_id={room_id}: ", + exc_info=exception, + ) + return True + + # Logging exceptions, instead of raising them here to ensure + # that the y-rooms stay alive even after an exception is seen. + self.room.exception_handler = exception_logger + else: # TransientRoom # it is a transient document (e.g. awareness) @@ -203,9 +224,12 @@ async def open(self, room_id): self.log.error(f"File {file.path} not found.\n{e!r}", exc_info=e) self.close(1004, f"File {file.path} not found.") else: - self.log.error(f"Error initializing: {file.path}\n{e!r}", exc_info=e) + self.log.error( + f"Error initializing: {file.path}\n{e!r}", exc_info=e + ) self.close( - 1003, f"Error initializing: {file.path}. You need to close the document." + 1003, + f"Error initializing: {file.path}. You need to close the document.", ) # Clean up the room and delete the file loader @@ -272,16 +296,24 @@ async def on_message(self, message): user = self.current_user data = json.dumps( - {"sender": user.username, "timestamp": time.time(), "content": json.loads(msg)} + { + "sender": user.username, + "timestamp": time.time(), + "content": json.loads(msg), + } ).encode("utf8") for client in self.room.clients: if client != self: task = asyncio.create_task( - client.send(bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data) + client.send( + bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data + ) ) self._websocket_server.background_tasks.add(task) - task.add_done_callback(self._websocket_server.background_tasks.discard) + task.add_done_callback( + self._websocket_server.background_tasks.discard + ) self._message_queue.put_nowait(message) self._websocket_server.ypatch_nb += 1 @@ -300,7 +332,9 @@ def on_close(self) -> None: if self._room_id != "JupyterLab:globalAwareness": self._emit_awareness_event(self.current_user.username, "leave") - def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: + def _emit( + self, level: LogLevel, action: str | None = None, msg: str | None = None + ) -> None: _, _, file_id = decode_file_path(self._room_id) path = self._file_id_manager.get_path(file_id) @@ -312,12 +346,16 @@ def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = No self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data) - def _emit_awareness_event(self, username: str, action: str, msg: str | None = None) -> None: + def _emit_awareness_event( + self, username: str, action: str, msg: str | None = None + ) -> None: data = {"roomid": self._room_id, "username": username, "action": action} if msg: data["msg"] = msg - self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data) + self.event_logger.emit( + schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data + ) async def _clean_room(self) -> None: """ @@ -387,7 +425,12 @@ async def put(self, path): # index already exists self.log.info("Request for Y document '%s' with room ID: %s", path, idx) data = json.dumps( - {"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION} + { + "format": format, + "type": content_type, + "fileId": idx, + "sessionId": SERVER_SESSION, + } ) self.set_status(200) return self.finish(data) @@ -401,7 +444,12 @@ async def put(self, path): # index successfully created self.log.info("Request for Y document '%s' with room ID: %s", path, idx) data = json.dumps( - {"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION} + { + "format": format, + "type": content_type, + "fileId": idx, + "sessionId": SERVER_SESSION, + } ) self.set_status(201) return self.finish(data) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 691943c5..55dccad7 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -123,7 +123,9 @@ async def initialize(self) -> None: if self._document.source != model["content"]: # TODO: Delete document from the store. self._emit( - LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." + LogLevel.INFO, + "initialize", + "The file is out-of-sync with the ystore.", ) self.log.info( "Content in file %s is out-of-sync with the ystore %s", @@ -135,7 +137,9 @@ async def initialize(self) -> None: if read_from_source: self._emit(LogLevel.INFO, "load", "Content loaded from disk.") self.log.info( - "Content in room %s loaded from file %s", self._room_id, self._file.path + "Content in room %s loaded from file %s", + self._room_id, + self._file.path, ) self._document.source = model["content"] @@ -146,7 +150,9 @@ async def initialize(self) -> None: self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") - def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: + def _emit( + self, level: LogLevel, action: str | None = None, msg: str | None = None + ) -> None: data = {"level": level.value, "room": self._room_id, "path": self._file.path} if action: data["action"] = action @@ -180,8 +186,12 @@ async def _on_outofband_change(self) -> None: """ Called when the file got out-of-band changes. """ - self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) - self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.") + self.log.info( + "Out-of-band changes. Overwriting the content in room %s", self._room_id + ) + self._emit( + LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room." + ) try: model = await self._file.load_content(self._file_format, self._file_type) @@ -257,9 +267,13 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No return except OutOfBandChanges: - self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) + self.log.info( + "Out-of-band changes. Overwriting the content in room %s", self._room_id + ) try: - model = await self._file.load_content(self._file_format, self._file_type) + model = await self._file.load_content( + self._file_format, self._file_type + ) except Exception as e: msg = f"Error loading content from file: {self._file.path}\n{e!r}" self.log.error(msg, exc_info=e) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py index 365da078..58c4fec4 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py @@ -5,7 +5,7 @@ import asyncio from logging import Logger -from typing import Any +from typing import Any, Callable from pycrdt_websocket.websocket_server import WebsocketServer, YRoom from pycrdt_websocket.ystore import BaseYStore @@ -16,6 +16,17 @@ class RoomNotFound(LookupError): pass +def exception_logger(exception: Exception, log: Logger) -> bool: + """A function that catches any exceptions raised in the websocket + server and logs them. + + The protects the websocket server's task group from cancelling + anytime an exception is raised. + """ + log.error("Jupyter Websocket Server: ", exc_info=exception) + return True + + class JupyterWebsocketServer(WebsocketServer): """Ypy websocket server. @@ -30,9 +41,15 @@ def __init__( ystore_class: BaseYStore, rooms_ready: bool = True, auto_clean_rooms: bool = True, + exception_handler: Callable[[Exception, Logger], bool] | None = None, log: Logger | None = None, ): - super().__init__(rooms_ready, auto_clean_rooms, log) + super().__init__( + rooms_ready=rooms_ready, + auto_clean_rooms=auto_clean_rooms, + exception_handler=exception_handler, + log=log, + ) self.ystore_class = ystore_class self.ypatch_nb = 0 self.connected_users: dict[Any, Any] = {} diff --git a/projects/jupyter-server-ydoc/pyproject.toml b/projects/jupyter-server-ydoc/pyproject.toml index 9ab08633..d96af6cf 100644 --- a/projects/jupyter-server-ydoc/pyproject.toml +++ b/projects/jupyter-server-ydoc/pyproject.toml @@ -30,7 +30,7 @@ authors = [ dependencies = [ "jupyter_server>=2.4.0,<3.0.0", "jupyter_ydoc>=2.0.0,<3.0.0", - "pycrdt-websocket>=0.13.0,<0.14.0", + "pycrdt-websocket>=0.13.1,<0.14.0", "jupyter_events>=0.10.0", "jupyter_server_fileid>=0.7.0,<1", "jsonschema>=4.18.0" From 8d52c58e7fa44d5664c4427adad5a3ef3f1a5363 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 29 Apr 2024 14:35:13 -0700 Subject: [PATCH 2/4] pre-commit errors --- .../jupyter_server_ydoc/handlers.py | 24 +++++-------------- .../jupyter_server_ydoc/rooms.py | 20 ++++------------ 2 files changed, 11 insertions(+), 33 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 3010904c..e06a8204 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -224,9 +224,7 @@ async def open(self, room_id): self.log.error(f"File {file.path} not found.\n{e!r}", exc_info=e) self.close(1004, f"File {file.path} not found.") else: - self.log.error( - f"Error initializing: {file.path}\n{e!r}", exc_info=e - ) + self.log.error(f"Error initializing: {file.path}\n{e!r}", exc_info=e) self.close( 1003, f"Error initializing: {file.path}. You need to close the document.", @@ -306,14 +304,10 @@ async def on_message(self, message): for client in self.room.clients: if client != self: task = asyncio.create_task( - client.send( - bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data - ) + client.send(bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data) ) self._websocket_server.background_tasks.add(task) - task.add_done_callback( - self._websocket_server.background_tasks.discard - ) + task.add_done_callback(self._websocket_server.background_tasks.discard) self._message_queue.put_nowait(message) self._websocket_server.ypatch_nb += 1 @@ -332,9 +326,7 @@ def on_close(self) -> None: if self._room_id != "JupyterLab:globalAwareness": self._emit_awareness_event(self.current_user.username, "leave") - def _emit( - self, level: LogLevel, action: str | None = None, msg: str | None = None - ) -> None: + def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: _, _, file_id = decode_file_path(self._room_id) path = self._file_id_manager.get_path(file_id) @@ -346,16 +338,12 @@ def _emit( self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data) - def _emit_awareness_event( - self, username: str, action: str, msg: str | None = None - ) -> None: + def _emit_awareness_event(self, username: str, action: str, msg: str | None = None) -> None: data = {"roomid": self._room_id, "username": username, "action": action} if msg: data["msg"] = msg - self.event_logger.emit( - schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data - ) + self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data) async def _clean_room(self) -> None: """ diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 55dccad7..7b5e5ca1 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -150,9 +150,7 @@ async def initialize(self) -> None: self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") - def _emit( - self, level: LogLevel, action: str | None = None, msg: str | None = None - ) -> None: + def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: data = {"level": level.value, "room": self._room_id, "path": self._file.path} if action: data["action"] = action @@ -186,12 +184,8 @@ async def _on_outofband_change(self) -> None: """ Called when the file got out-of-band changes. """ - self.log.info( - "Out-of-band changes. Overwriting the content in room %s", self._room_id - ) - self._emit( - LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room." - ) + self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) + self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.") try: model = await self._file.load_content(self._file_format, self._file_type) @@ -267,13 +261,9 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No return except OutOfBandChanges: - self.log.info( - "Out-of-band changes. Overwriting the content in room %s", self._room_id - ) + self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) try: - model = await self._file.load_content( - self._file_format, self._file_type - ) + model = await self._file.load_content(self._file_format, self._file_type) except Exception as e: msg = f"Error loading content from file: {self._file.path}\n{e!r}" self.log.error(msg, exc_info=e) From 585e103cb330a09b124afc61e896ab28a0c382a9 Mon Sep 17 00:00:00 2001 From: Zach Sailer Date: Mon, 29 Apr 2024 14:47:12 -0700 Subject: [PATCH 3/4] Allow transient rooms to log exceptions --- .../jupyter_server_ydoc/handlers.py | 44 +++++++++---------- .../jupyter_server_ydoc/rooms.py | 14 ++++-- 2 files changed, 32 insertions(+), 26 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index e06a8204..1f976eca 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -82,6 +82,21 @@ async def prepare(self): if self._websocket_server.room_exists(self._room_id): self.room: YRoom = await self._websocket_server.get_room(self._room_id) else: + # Logging exceptions, instead of raising them here to ensure + # that the y-rooms stay alive even after an exception is seen. + def exception_logger(exception: Exception, log: Logger) -> bool: + """A function that catches any exceptions raised in the websocket + server and logs them. + + The protects the y-room's task group from cancelling + anytime an exception is raised. + """ + log.error( + f"Document Room Exception, (room_id={self._room_id or 'unknown'}): ", + exc_info=exception, + ) + return True + if self._room_id.count(":") >= 2: # DocumentRoom file_format, file_type, file_id = decode_file_path(self._room_id) @@ -103,33 +118,18 @@ async def prepare(self): self.event_logger, ystore, self.log, - self._document_save_delay, + exception_handler=exception_logger, + save_delay=self._document_save_delay, ) - def exception_logger(exception: Exception, log: Logger) -> bool: - """A function that catches any exceptions raised in the websocket - server and logs them. - - The protects the y-room's task group from cancelling - anytime an exception is raised. - """ - room_id = "unknown" - if self.room.room_id: - room_id = self.room.room_id - log.error( - f"Document Room Exception, (room_id={room_id}: ", - exc_info=exception, - ) - return True - - # Logging exceptions, instead of raising them here to ensure - # that the y-rooms stay alive even after an exception is seen. - self.room.exception_handler = exception_logger - else: # TransientRoom # it is a transient document (e.g. awareness) - self.room = TransientRoom(self._room_id, self.log) + self.room = TransientRoom( + self._room_id, + log=self.log, + exception_handler=exception_logger, + ) await self._websocket_server.start_room(self.room) self._websocket_server.add_room(self._room_id, self.room) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 7b5e5ca1..00787dbe 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -5,7 +5,7 @@ import asyncio from logging import Logger -from typing import Any +from typing import Any, Callable from jupyter_events import EventLogger from jupyter_ydoc import ydocs as YDOCS @@ -31,8 +31,9 @@ def __init__( ystore: BaseYStore | None, log: Logger | None, save_delay: float | None = None, + exception_handler: Callable[[Exception, Logger], bool] | None = None, ): - super().__init__(ready=False, ystore=ystore, log=log) + super().__init__(ready=False, ystore=ystore, exception_handler=exception_handler, log=log) self._room_id: str = room_id self._file_format: str = file_format @@ -285,8 +286,13 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No class TransientRoom(YRoom): """A Y room for sharing state (e.g. awareness).""" - def __init__(self, room_id: str, log: Logger | None): - super().__init__(log=log) + def __init__( + self, + room_id: str, + log: Logger | None = None, + exception_handler: Callable[[Exception, Logger], bool] | None = None, + ): + super().__init__(log=log, exception_handler=exception_handler) self._room_id = room_id From 93c3b7de483b486e1d9fecd9e43a9974d27b1eaf Mon Sep 17 00:00:00 2001 From: Zachary Sailer Date: Tue, 30 Apr 2024 16:31:28 -0700 Subject: [PATCH 4/4] Update projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py Co-authored-by: David Brochart --- .../jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py index 58c4fec4..3137e1f8 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/websocketserver.py @@ -20,7 +20,7 @@ def exception_logger(exception: Exception, log: Logger) -> bool: """A function that catches any exceptions raised in the websocket server and logs them. - The protects the websocket server's task group from cancelling + This protects the websocket server's task group from cancelling anytime an exception is raised. """ log.error("Jupyter Websocket Server: ", exc_info=exception)