From 9eed731897790f967a2457be0560db0a3e6f0a77 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet Date: Thu, 10 Oct 2024 10:38:32 +0200 Subject: [PATCH 1/7] Update jupyter_ydoc and pycrdt_websocket dependencies (and indirectly pycrdt) --- .../jupyter_server_ydoc/handlers.py | 25 ------------------- .../jupyter_server_ydoc/rooms.py | 22 +++++++++++++++- projects/jupyter-server-ydoc/pyproject.toml | 4 +-- 3 files changed, 23 insertions(+), 28 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index f04004fe..8c9f46d4 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -286,31 +286,6 @@ async def on_message(self, message): """ message_type = message[0] - if message_type == YMessageType.AWARENESS: - # awareness - skip = False - changes = self.room.awareness.get_changes(message[1:]) - added_users = changes["added"] - removed_users = changes["removed"] - for i, user in enumerate(added_users): - u = changes["states"][i] - if "user" in u: - name = u["user"]["name"] - self._websocket_server.connected_users[user] = name - self.log.debug("Y user joined: %s", name) - for user in removed_users: - if user in self._websocket_server.connected_users: - name = self._websocket_server.connected_users[user] - del self._websocket_server.connected_users[user] - self.log.debug("Y user left: %s", name) - # filter out message depending on changes - if skip: - self.log.debug( - "Filtered out Y message of type: %s", - YMessageType(message_type).name, - ) - return skip - if message_type == MessageType.CHAT: msg = message[2:].decode("utf-8") diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 0fdb6d8e..692b305c 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -41,7 +41,7 @@ def __init__( self._file_format: str = file_format self._file_type: str = file_type self._file: FileLoader = file - self._document = YDOCS.get(self._file_type, YFILE)(self.ydoc) + self._document = YDOCS.get(self._file_type, YFILE)(self.ydoc, self.awareness) self._document.path = self._file.path self._logger = logger @@ -57,6 +57,9 @@ def __init__( self._document.observe(self._on_document_change) self._file.observe(self.room_id, self._on_outofband_change, self._on_filepath_change) + # Listen for awareness changes + self.awareness.observe(self._on_awareness_change) + @property def file_format(self) -> str: """Document file format.""" @@ -316,6 +319,23 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No self.log.error(msg, exc_info=e) self._emit(LogLevel.ERROR, None, msg) + def _on_awareness_change(self, type: str, changes: tuple[dict[str, Any], Any]): + if type != "change": + return + added_users = changes[0]["added"] + removed_users = changes[0]["removed"] + for i, user in enumerate(added_users): + u = self.awareness.states[user] + if "user" in u: + name = u["user"]["name"] + self._websocket_server.connected_users[user] = name + self.log.debug("Y user joined: %s", name) + for user in removed_users: + if user in self._websocket_server.connected_users: + name = self._websocket_server.connected_users[user] + del self._websocket_server.connected_users[user] + self.log.debug("Y user left: %s", name) + class TransientRoom(YRoom): """A Y room for sharing state (e.g. awareness).""" diff --git a/projects/jupyter-server-ydoc/pyproject.toml b/projects/jupyter-server-ydoc/pyproject.toml index 3348d07d..7007980e 100644 --- a/projects/jupyter-server-ydoc/pyproject.toml +++ b/projects/jupyter-server-ydoc/pyproject.toml @@ -29,9 +29,9 @@ authors = [ ] dependencies = [ "jupyter_server>=2.11.1,<3.0.0", - "jupyter_ydoc>=2.0.0,<4.0.0", + "jupyter_ydoc>=2.1.2,<4.0.0", "pycrdt", - "pycrdt-websocket>=0.14.2,<0.15.0", + "pycrdt-websocket>=0.15.0,<0.16.0", "jupyter_events>=0.10.0", "jupyter_server_fileid>=0.7.0,<1", "jsonschema>=4.18.0" From a6aeb8644cfe8a117ec2942261d858492219e337 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet Date: Thu, 10 Oct 2024 13:40:20 +0200 Subject: [PATCH 2/7] linting --- projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py | 2 +- projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 8c9f46d4..2b8ccf76 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -15,7 +15,7 @@ from jupyter_server.base.handlers import APIHandler, JupyterHandler from jupyter_server.utils import ensure_async from jupyter_ydoc import ydocs as YDOCS -from pycrdt import Doc, UndoManager, YMessageType, write_var_uint +from pycrdt import Doc, UndoManager, write_var_uint from pycrdt_websocket.websocket_server import YRoom from pycrdt_websocket.ystore import BaseYStore from tornado import web diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 692b305c..409f2025 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -319,12 +319,12 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No self.log.error(msg, exc_info=e) self._emit(LogLevel.ERROR, None, msg) - def _on_awareness_change(self, type: str, changes: tuple[dict[str, Any], Any]): + def _on_awareness_change(self, type: str, changes: tuple[dict[str, Any], Any]) -> None: if type != "change": return added_users = changes[0]["added"] removed_users = changes[0]["removed"] - for i, user in enumerate(added_users): + for _, user in enumerate(added_users): u = self.awareness.states[user] if "user" in u: name = u["user"]["name"] From a38c80e7c15a2c7173fcbf2cf8118fee556ab176 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet Date: Thu, 10 Oct 2024 13:43:40 +0200 Subject: [PATCH 3/7] Remove useless enumerate --- projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 409f2025..d1c6ec3d 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -324,7 +324,7 @@ def _on_awareness_change(self, type: str, changes: tuple[dict[str, Any], Any]) - return added_users = changes[0]["added"] removed_users = changes[0]["removed"] - for _, user in enumerate(added_users): + for user in added_users: u = self.awareness.states[user] if "user" in u: name = u["user"]["name"] From 83713417f1eabd3a5d096911c2caa0bf9bc67052 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet Date: Thu, 10 Oct 2024 16:24:31 +0200 Subject: [PATCH 4/7] Update the connected users on global awareness changes --- .../jupyter_server_ydoc/handlers.py | 28 +++++++++++++++++++ .../jupyter_server_ydoc/rooms.py | 20 ------------- 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 2b8ccf76..dda9f51c 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -137,6 +137,9 @@ def exception_logger(exception: Exception, log: Logger) -> bool: exception_handler=exception_logger, ) + # Listen for the changes in GlobalAwareness to update users + self.room.awareness.observe(self._on_global_awareness_event) + try: await self._websocket_server.start_room(self.room) except Exception as e: @@ -380,6 +383,31 @@ async def _clean_room(self) -> None: self._emit(LogLevel.INFO, "clean", "Loader deleted.") del self._room_locks[self._room_id] + def _on_global_awareness_event(self, type: str, changes: tuple[dict[str, Any], Any]) -> None: + """ + Update the users when the global awareness changes. + + + Parameters: + type: 'update' or 'change' (change is triggered only if the states are modified) + changes: the changes + """ + if type != "change": + return + added_users = changes[0]["added"] + removed_users = changes[0]["removed"] + for user in added_users: + u = self.room.awareness.states[user] + if "user" in u: + name = u["user"]["name"] + self._websocket_server.connected_users[user] = name + self.log.debug("Y user joined: %s", name) + for user in removed_users: + if user in self._websocket_server.connected_users: + name = self._websocket_server.connected_users[user] + del self._websocket_server.connected_users[user] + self.log.debug("Y user left: %s", name) + def check_origin(self, origin): """ Check origin diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index d1c6ec3d..62dae0fc 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -57,9 +57,6 @@ def __init__( self._document.observe(self._on_document_change) self._file.observe(self.room_id, self._on_outofband_change, self._on_filepath_change) - # Listen for awareness changes - self.awareness.observe(self._on_awareness_change) - @property def file_format(self) -> str: """Document file format.""" @@ -319,23 +316,6 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No self.log.error(msg, exc_info=e) self._emit(LogLevel.ERROR, None, msg) - def _on_awareness_change(self, type: str, changes: tuple[dict[str, Any], Any]) -> None: - if type != "change": - return - added_users = changes[0]["added"] - removed_users = changes[0]["removed"] - for user in added_users: - u = self.awareness.states[user] - if "user" in u: - name = u["user"]["name"] - self._websocket_server.connected_users[user] = name - self.log.debug("Y user joined: %s", name) - for user in removed_users: - if user in self._websocket_server.connected_users: - name = self._websocket_server.connected_users[user] - del self._websocket_server.connected_users[user] - self.log.debug("Y user left: %s", name) - class TransientRoom(YRoom): """A Y room for sharing state (e.g. awareness).""" From c1f8ebdf7250756ada1d029cbdd1b8b5e6c8b686 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet <32258950+brichet@users.noreply.github.com> Date: Mon, 14 Oct 2024 22:41:05 +0200 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: David Brochart --- .../jupyter_server_ydoc/handlers.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index dda9f51c..bf10ffac 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -383,14 +383,13 @@ async def _clean_room(self) -> None: self._emit(LogLevel.INFO, "clean", "Loader deleted.") del self._room_locks[self._room_id] - def _on_global_awareness_event(self, type: str, changes: tuple[dict[str, Any], Any]) -> None: + def _on_global_awareness_event(self, topic: str, changes: tuple[dict[str, Any], Any]) -> None: """ Update the users when the global awareness changes. - Parameters: - type: 'update' or 'change' (change is triggered only if the states are modified) - changes: the changes + topic (str): `"update"` or `"change"` (`"change"` is triggered only if the states are modified). + changes (tuple[dict[str, Any], Any]): The changes and the origin of the changes. """ if type != "change": return @@ -404,8 +403,7 @@ def _on_global_awareness_event(self, type: str, changes: tuple[dict[str, Any], A self.log.debug("Y user joined: %s", name) for user in removed_users: if user in self._websocket_server.connected_users: - name = self._websocket_server.connected_users[user] - del self._websocket_server.connected_users[user] + name = self._websocket_server.connected_users.pop(user) self.log.debug("Y user left: %s", name) def check_origin(self, origin): From 080e51aa6e018dcf9a77d48c495c25a221892091 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet Date: Mon, 14 Oct 2024 22:55:28 +0200 Subject: [PATCH 6/7] Test if the transcient room is the global awareness to observe it --- projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index bf10ffac..b5a6bb12 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -137,8 +137,9 @@ def exception_logger(exception: Exception, log: Logger) -> bool: exception_handler=exception_logger, ) - # Listen for the changes in GlobalAwareness to update users - self.room.awareness.observe(self._on_global_awareness_event) + if self._room_id == "JupyterLab:globalAwareness": + # Listen for the changes in GlobalAwareness to update users + self.room.awareness.observe(self._on_global_awareness_event) try: await self._websocket_server.start_room(self.room) From 218179bbacd4f2dd7d22f4d293602e5e0021d703 Mon Sep 17 00:00:00 2001 From: Nicolas Brichet Date: Mon, 14 Oct 2024 23:26:32 +0200 Subject: [PATCH 7/7] typing --- .../jupyter-server-ydoc/jupyter_server_ydoc/handlers.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index b5a6bb12..586e4dc1 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -8,7 +8,7 @@ import time import uuid from logging import Logger -from typing import Any +from typing import Any, Literal from uuid import uuid4 from jupyter_server.auth import authorized @@ -384,7 +384,9 @@ async def _clean_room(self) -> None: self._emit(LogLevel.INFO, "clean", "Loader deleted.") del self._room_locks[self._room_id] - def _on_global_awareness_event(self, topic: str, changes: tuple[dict[str, Any], Any]) -> None: + def _on_global_awareness_event( + self, topic: Literal["change", "update"], changes: tuple[dict[str, Any], Any] + ) -> None: """ Update the users when the global awareness changes. @@ -392,7 +394,7 @@ def _on_global_awareness_event(self, topic: str, changes: tuple[dict[str, Any], topic (str): `"update"` or `"change"` (`"change"` is triggered only if the states are modified). changes (tuple[dict[str, Any], Any]): The changes and the origin of the changes. """ - if type != "change": + if topic != "change": return added_users = changes[0]["added"] removed_users = changes[0]["removed"]