From 746536c59c32c8fb7998e06d066a6186709365a1 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet <32258950+brichet@users.noreply.github.com> Date: Wed, 17 Apr 2024 09:08:11 +0200 Subject: [PATCH] rename the docprovider destination name to avoid conflicts (#285) --- .../jupyter_server_ydoc/handlers.py | 185 ++++++++++++------ .../jupyter_server_ydoc/loaders.py | 34 +++- .../jupyter_server_ydoc/rooms.py | 42 +++- 3 files changed, 183 insertions(+), 78 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 59849058..634fe218 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -7,7 +7,7 @@ import json import time import uuid -from typing import Any +from typing import Any, Awaitable from jupyter_server.auth import authorized from jupyter_server.base.handlers import APIHandler, JupyterHandler @@ -70,52 +70,84 @@ def create_task(self, aw): task.add_done_callback(self._background_tasks.discard) async def prepare(self): - if not self._websocket_server.started.is_set(): - self.create_task(self._websocket_server.start()) - await self._websocket_server.started.wait() - - # Get room - self._room_id: str = room_id_from_encoded_path(self.request.path) - - async with self._room_lock(self._room_id): - if self._websocket_server.room_exists(self._room_id): - self.room: YRoom = await self._websocket_server.get_room(self._room_id) - else: - if self._room_id.count(":") >= 2: - # DocumentRoom - file_format, file_type, file_id = decode_file_path(self._room_id) - if file_id in self._file_loaders: - self._emit( - LogLevel.WARNING, - None, - "There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.", - ) - - file = self._file_loaders[file_id] - updates_file_path = f".{file_type}:{file_id}.y" - ystore = 
self._ystore_class(path=updates_file_path, log=self.log) - self.room = DocumentRoom( - self._room_id, - file_format, - file_type, - file, - self.event_logger, - ystore, - self.log, - self._document_save_delay, + res = super().prepare() + if isinstance(res, Awaitable): + await res + if not self._websocket_server.started.is_set(): + self.create_task(self._websocket_server.start()) + await self._websocket_server.started.wait() + + # Get room + self._room_id: str = self.request.path.split("/")[-1] + + async with self._room_lock(self._room_id): + if self._websocket_server.room_exists(self._room_id): + self.room: YRoom = await self._websocket_server.get_room( + self._room_id ) - + self.log.info("Get an room from websocket server") else: - # TransientRoom - # it is a transient document (e.g. awareness) - self.room = TransientRoom(self._room_id, self.log) - - await self._websocket_server.start_room(self.room) - self._websocket_server.add_room(self._room_id, self.room) + if self._room_id.count(":") >= 2: + # DocumentRoom + file_format, file_type, file_id = decode_file_path( + self._room_id + ) + if file_id in self._file_loaders: + self.log.info( + "There is another collaborative session accessing the same file" + ) + self._emit( + LogLevel.WARNING, + None, + "There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.", + ) + + file = self._file_loaders[file_id] + updates_file_path = f".{file_type}:{file_id}.y" + ystore = self._ystore_class( + path=updates_file_path, log=self.log + ) + self.room = DocumentRoom( + self._room_id, + file_format, + file_type, + file, + self.event_logger, + ystore, + self.log, + self._document_save_delay, + ) - res = super().prepare() - if res is not None: - return await res + else: + # TransientRoom + # it is a transient document (e.g. 
awareness) + self.room = TransientRoom(self._room_id, self.log) + + self.log.info("About to start a room") + try: + await self._websocket_server.start_room(self.room) + except Exception as e: + self.log.error( + "Room %s failed to start on websocket server", self._room_id + ) + # Clean room + self.room.stop() + self.log.info("Room %s deleted", self._room_id) + self._emit(LogLevel.INFO, "clean", "Room deleted.") + + # if websocket server failed, then room and file will be garbage collected and file + # Clean the file loader in file loader mapping if there are not rooms using it + _, _, file_id = decode_file_path(self._room_id) + file = self._file_loaders[file_id] + if file.number_of_subscriptions == 0 or ( + file.number_of_subscriptions == 1 + and self._room_id in file._subscriptions + ): + self.log.info("Deleting file %s", file.path) + await self._file_loaders.remove(file_id) + self._emit(LogLevel.INFO, "clean", "file loader removed.") + raise e + self._websocket_server.add_room(self._room_id, self.room) def initialize( self, @@ -203,9 +235,12 @@ async def open(self, room_id): self.log.error(f"File {file.path} not found.\n{e!r}", exc_info=e) self.close(1004, f"File {file.path} not found.") else: - self.log.error(f"Error initializing: {file.path}\n{e!r}", exc_info=e) + self.log.error( + f"Error initializing: {file.path}\n{e!r}", exc_info=e + ) self.close( - 1003, f"Error initializing: {file.path}. You need to close the document." + 1003, + f"Error initializing: {file.path}. 
You need to close the document.", ) # Clean up the room and delete the file loader @@ -272,16 +307,24 @@ async def on_message(self, message): user = self.current_user data = json.dumps( - {"sender": user.username, "timestamp": time.time(), "content": json.loads(msg)} + { + "sender": user.username, + "timestamp": time.time(), + "content": json.loads(msg), + } ).encode("utf8") for client in self.room.clients: if client != self: task = asyncio.create_task( - client.send(bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data) + client.send( + bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data + ) ) self._websocket_server.background_tasks.add(task) - task.add_done_callback(self._websocket_server.background_tasks.discard) + task.add_done_callback( + self._websocket_server.background_tasks.discard + ) self._message_queue.put_nowait(message) self._websocket_server.ypatch_nb += 1 @@ -292,15 +335,19 @@ def on_close(self) -> None: """ # stop serving this client self._message_queue.put_nowait(b"") - if isinstance(self.room, DocumentRoom) and self.room.clients == [self]: - # no client in this room after we disconnect - # keep the document for a while in case someone reconnects - self.log.info("Cleaning room: %s", self._room_id) - self.room.cleaner = asyncio.create_task(self._clean_room()) - if self._room_id != "JupyterLab:globalAwareness": - self._emit_awareness_event(self.current_user.username, "leave") - - def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: + if hasattr(self, "room"): + if isinstance(self.room, DocumentRoom) and self.room.clients == [self]: + # no client in this room after we disconnect + # keep the document for a while in case someone reconnects + self.log.info("Cleaning room: %s", self._room_id) + self.room.cleaner = asyncio.create_task(self._clean_room()) + if hasattr(self, "_room_id"): + if self._room_id != "JupyterLab:globalAwareness": + self._emit_awareness_event(self.current_user.username, 
"leave") + + def _emit( + self, level: LogLevel, action: str | None = None, msg: str | None = None + ) -> None: _, _, file_id = decode_file_path(self._room_id) path = self._file_id_manager.get_path(file_id) @@ -312,12 +359,16 @@ def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = No self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data) - def _emit_awareness_event(self, username: str, action: str, msg: str | None = None) -> None: + def _emit_awareness_event( + self, username: str, action: str, msg: str | None = None + ) -> None: data = {"roomid": self._room_id, "username": username, "action": action} if msg: data["msg"] = msg - self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data) + self.event_logger.emit( + schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data + ) async def _clean_room(self) -> None: """ @@ -387,7 +438,12 @@ async def put(self, path): # index already exists self.log.info("Request for Y document '%s' with room ID: %s", path, idx) data = json.dumps( - {"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION} + { + "format": format, + "type": content_type, + "fileId": idx, + "sessionId": SERVER_SESSION, + } ) self.set_status(200) return self.finish(data) @@ -401,7 +457,12 @@ async def put(self, path): # index successfully created self.log.info("Request for Y document '%s' with room ID: %s", path, idx) data = json.dumps( - {"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION} + { + "format": format, + "type": content_type, + "fileId": idx, + "sessionId": SERVER_SESSION, + } ) self.set_status(201) return self.finish(data) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py index 62cc0a0a..967fd253 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py +++ 
b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py @@ -40,7 +40,9 @@ def __init__( self._log = log or getLogger(__name__) self._subscriptions: dict[str, Callable[[], Coroutine[Any, Any, None]]] = {} - self._watcher = asyncio.create_task(self._watch_file()) if self._poll_interval else None + self._watcher = ( + asyncio.create_task(self._watch_file()) if self._poll_interval else None + ) self.last_modified = None @property @@ -74,9 +76,14 @@ async def clean(self) -> None: if self._watcher is not None: if not self._watcher.cancelled(): self._watcher.cancel() - await self._watcher + try: + await self._watcher + except asyncio.CancelledError: + self._log.info(f"file watcher for '{self.file_id}' is cancelled now") - def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None: + def observe( + self, id: str, callback: Callable[[], Coroutine[Any, Any, None]] + ) -> None: """ Subscribe to the file to get notified about out-of-band file changes. @@ -109,7 +116,9 @@ async def load_content(self, format: str, file_type: str) -> dict[str, Any]: """ async with self._lock: model = await ensure_async( - self._contents_manager.get(self.path, format=format, type=file_type, content=True) + self._contents_manager.get( + self.path, format=format, type=file_type, content=True + ) ) self.last_modified = model["last_modified"] return model @@ -156,7 +165,9 @@ async def maybe_save_content(self, model: dict[str, Any]) -> None: self.last_modified = m["last_modified"] raise OutOfBandChanges - async def _save_content(self, model: dict[str, Any], done_saving: asyncio.Event) -> None: + async def _save_content( + self, model: dict[str, Any], done_saving: asyncio.Event + ) -> None: try: m = await ensure_async(self._contents_manager.save(model, self.path)) self.last_modified = m["last_modified"] @@ -178,7 +189,9 @@ async def _watch_file(self) -> None: try: await self.maybe_notify() except Exception as e: - self._log.error(f"Error watching file: 
{self.path}\n{e!r}", exc_info=e) + self._log.error( + f"Error watching file: {self.path}\n{e!r}", exc_info=e + ) except asyncio.CancelledError: break @@ -189,9 +202,14 @@ async def maybe_notify(self) -> None: do_notify = False async with self._lock: # Get model metadata; format and type are not need - model = await ensure_async(self._contents_manager.get(self.path, content=False)) + model = await ensure_async( + self._contents_manager.get(self.path, content=False) + ) - if self.last_modified is not None and self.last_modified < model["last_modified"]: + if ( + self.last_modified is not None + and self.last_modified < model["last_modified"] + ): do_notify = True self.last_modified = model["last_modified"] diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index e88e6d39..27643aaf 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -123,7 +123,9 @@ async def initialize(self) -> None: if self._document.source != model["content"]: # TODO: Delete document from the store. self._emit( - LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." 
+ LogLevel.INFO, + "initialize", + "The file is out-of-sync with the ystore.", ) self.log.info( "Content in file %s is out-of-sync with the ystore %s", @@ -135,7 +137,9 @@ async def initialize(self) -> None: if read_from_source: self._emit(LogLevel.INFO, "load", "Content loaded from disk.") self.log.info( - "Content in room %s loaded from file %s", self._room_id, self._file.path + "Content in room %s loaded from file %s", + self._room_id, + self._file.path, ) self._document.source = model["content"] @@ -146,7 +150,9 @@ async def initialize(self) -> None: self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") - def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: + def _emit( + self, level: LogLevel, action: str | None = None, msg: str | None = None + ) -> None: data = {"level": level.value, "room": self._room_id, "path": self._file.path} if action: data["action"] = action @@ -161,7 +167,10 @@ def stop(self) -> None: Cancels the save task and unsubscribes from the file. """ - super().stop() + try: + super().stop() + except RuntimeError: + pass # TODO: Should we cancel or wait ? if self._saving_document: self._saving_document.cancel() @@ -180,8 +189,12 @@ async def _on_outofband_change(self) -> None: """ Called when the file got out-of-band changes. """ - self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) - self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.") + self.log.info( + "Out-of-band changes. Overwriting the content in room %s", self._room_id + ) + self._emit( + LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room." + ) try: model = await self._file.load_content(self._file_format, self._file_type) @@ -257,9 +270,13 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No return except OutOfBandChanges: - self.log.info("Out-of-band changes. 
Overwriting the content in room %s", self._room_id) + self.log.info( + "Out-of-band changes. Overwriting the content in room %s", self._room_id + ) try: - model = await self._file.load_content(self._file_format, self._file_type) + model = await self._file.load_content( + self._file_format, self._file_type + ) except Exception as e: msg = f"Error loading content from file: {self._file.path}\n{e!r}" self.log.error(msg, exc_info=e) @@ -299,3 +316,12 @@ async def _broadcast_updates(self): await super()._broadcast_updates() except asyncio.CancelledError: pass + + def stop(self) -> None: + """ + Stop the room. + """ + try: + super().stop() + except RuntimeError: + pass