diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py index e286cc13..b49ee7af 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/app.py @@ -14,6 +14,7 @@ from traitlets import Bool, Float, Type from .handlers import ( + DocForkHandler, DocSessionHandler, TimelineHandler, UndoRedoHandler, @@ -25,6 +26,7 @@ from .utils import ( AWARENESS_EVENTS_SCHEMA_PATH, EVENTS_SCHEMA_PATH, + FORK_EVENTS_SCHEMA_PATH, encode_file_path, room_id_from_encoded_path, ) @@ -85,6 +87,7 @@ def initialize(self): super().initialize() self.serverapp.event_logger.register_event_schema(EVENTS_SCHEMA_PATH) self.serverapp.event_logger.register_event_schema(AWARENESS_EVENTS_SCHEMA_PATH) + self.serverapp.event_logger.register_event_schema(FORK_EVENTS_SCHEMA_PATH) def initialize_settings(self): self.settings.update( @@ -123,6 +126,13 @@ def initialize_handlers(self): self.handlers.extend( [ + ( + r"/api/collaboration/fork/(.*)", + DocForkHandler, + { + "ywebsocket_server": self.ywebsocket_server, + }, + ), ( r"/api/collaboration/room/(.*)", YDocWebSocketHandler, diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/events/fork.yaml b/projects/jupyter-server-ydoc/jupyter_server_ydoc/events/fork.yaml new file mode 100644 index 00000000..0125fdf2 --- /dev/null +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/events/fork.yaml @@ -0,0 +1,56 @@ +"$id": https://schema.jupyter.org/jupyter_collaboration/fork/v1 +"$schema": "http://json-schema.org/draft-07/schema" +version: 1 +title: Collaborative fork events +personal-data: true +description: | + Fork events emitted from server-side during a collaborative session. +type: object +required: + - fork_roomid + - fork_info + - username + - action +properties: + fork_roomid: + type: string + description: | + Fork root room ID. + fork_info: + type: object + description: | + Fork root room information. + required: + - root_roomid + - synchronize + - title + - description + properties: + root_roomid: + type: string + description: | + Root room ID. Usually composed by the file type, format and ID. + synchronize: + type: boolean + description: | + Whether the fork is kept in sync with the root. + title: + type: string + description: | + The title of the fork. + description: + type: string + description: | + The description of the fork. + username: + type: string + description: | + The name of the user who created or deleted the fork. + action: + enum: + - create + - delete + description: | + Possible values: + 1. create + 2. delete diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py index 586e4dc1..b76666d4 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py @@ -26,6 +26,7 @@ from .utils import ( JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, JUPYTER_COLLABORATION_EVENTS_URI, + JUPYTER_COLLABORATION_FORK_EVENTS_URI, LogLevel, MessageType, decode_file_path, @@ -39,6 +40,7 @@ SERVER_SESSION = str(uuid.uuid4()) FORK_DOCUMENTS = {} +FORK_ROOMS: dict[str, dict[str, str]] = {} class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): @@ -600,3 +602,101 @@ async def _cleanup_undo_manager(self, room_id: str) -> None: if room_id in FORK_DOCUMENTS: del FORK_DOCUMENTS[room_id] self.log.info(f"Fork Document for {room_id} has been removed.") + + +class DocForkHandler(APIHandler): + """ + Jupyter Server handler to: + - create a fork of a root document (optionally synchronizing with the root document), + - delete a fork of a root document (optionally merging back in the root document). + - get fork IDs of a root document. + """ + + auth_resource = "contents" + + def initialize( + self, + ywebsocket_server: JupyterWebsocketServer, + ) -> None: + self._websocket_server = ywebsocket_server + + @web.authenticated + @authorized + async def get(self, root_roomid): + """ + Returns a dictionary of fork room ID to fork room information for the given root room ID. + """ + self.write( + { + fork_roomid: fork_info + for fork_roomid, fork_info in FORK_ROOMS.items() + if fork_info["root_roomid"] == root_roomid + } + ) + + @web.authenticated + @authorized + async def put(self, root_roomid): + """ + Creates a fork of a root document and returns its ID. + Optionally keeps the fork in sync with the root. + """ + fork_roomid = uuid4().hex + root_room = await self._websocket_server.get_room(root_roomid) + update = root_room.ydoc.get_update() + fork_ydoc = Doc() + fork_ydoc.apply_update(update) + model = self.get_json_body() + synchronize = model.get("synchronize", False) + if synchronize: + root_room.ydoc.observe(lambda event: fork_ydoc.apply_update(event.update)) + FORK_ROOMS[fork_roomid] = fork_info = { + "root_roomid": root_roomid, + "synchronize": synchronize, + "title": model.get("title", ""), + "description": model.get("description", ""), + } + fork_room = YRoom(ydoc=fork_ydoc) + self._websocket_server.rooms[fork_roomid] = fork_room + await self._websocket_server.start_room(fork_room) + self._emit_fork_event(self.current_user.username, fork_roomid, fork_info, "create") + data = json.dumps( + { + "sessionId": SERVER_SESSION, + "fork_roomid": fork_roomid, + "fork_info": fork_info, + } + ) + self.set_status(201) + return self.finish(data) + + @web.authenticated + @authorized + async def delete(self, fork_roomid): + """ + Deletes a forked document, and optionally merges it back in the root document. + """ + fork_info = FORK_ROOMS[fork_roomid] + root_roomid = fork_info["root_roomid"] + del FORK_ROOMS[fork_roomid] + if self.get_query_argument("merge") == "true": + root_room = await self._websocket_server.get_room(root_roomid) + root_ydoc = root_room.ydoc + fork_room = await self._websocket_server.get_room(fork_roomid) + fork_ydoc = fork_room.ydoc + fork_update = fork_ydoc.get_update() + root_ydoc.apply_update(fork_update) + await self._websocket_server.delete_room(name=fork_roomid) + self._emit_fork_event(self.current_user.username, fork_roomid, fork_info, "delete") + self.set_status(200) + + def _emit_fork_event( + self, username: str, fork_roomid: str, fork_info: dict[str, str], action: str + ) -> None: + data = { + "username": username, + "fork_roomid": fork_roomid, + "fork_info": fork_info, + "action": action, + } + self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_FORK_EVENTS_URI, data=data) diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py index efdac129..008236c4 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/pytest_plugin.py @@ -145,6 +145,65 @@ async def _inner(format: str, type: str, path: str) -> Any: return _inner +@pytest.fixture +def rtc_connect_fork_client(jp_http_port, jp_base_url, rtc_fetch_session): + async def _inner(room_id: str) -> Any: + return connect( + f"ws://127.0.0.1:{jp_http_port}{jp_base_url}api/collaboration/room/{room_id}" + ) + + return _inner + + +@pytest.fixture +def rtc_get_forks_client(jp_fetch): + async def _inner(root_roomid: str) -> Any: + return await jp_fetch( + "/api/collaboration/fork", + root_roomid, + method="GET", + ) + + return _inner + + +@pytest.fixture +def rtc_create_fork_client(jp_fetch): + async def _inner( + root_roomid: str, + synchronize: bool, + title: str | None = None, + description: str | None = None, + ) -> Any: + return await jp_fetch( + "/api/collaboration/fork", + root_roomid, + method="PUT", + body=json.dumps( + { + "synchronize": synchronize, + "title": title, + "description": description, + } + ), + ) + + return _inner + + +@pytest.fixture +def rtc_delete_fork_client(jp_fetch): + async def _inner(fork_roomid: str, merge: bool) -> Any: + return await jp_fetch( + "/api/collaboration/fork", + fork_roomid, + method="DELETE", + params={"merge": str(merge).lower()}, + ) + + return _inner + + @pytest.fixture def rtc_add_doc_to_store(rtc_connect_doc_client): event = Event() diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/utils.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/utils.py index d1c74ce4..22c51d87 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/utils.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/utils.py @@ -11,7 +11,9 @@ JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI = ( "https://schema.jupyter.org/jupyter_collaboration/awareness/v1" ) +JUPYTER_COLLABORATION_FORK_EVENTS_URI = "https://schema.jupyter.org/jupyter_collaboration/fork/v1" AWARENESS_EVENTS_SCHEMA_PATH = EVENTS_FOLDER_PATH / "awareness.yaml" +FORK_EVENTS_SCHEMA_PATH = EVENTS_FOLDER_PATH / "fork.yaml" class MessageType(IntEnum): diff --git a/projects/jupyter-server-ydoc/pyproject.toml b/projects/jupyter-server-ydoc/pyproject.toml index 7007980e..2e5227c8 100644 --- a/projects/jupyter-server-ydoc/pyproject.toml +++ b/projects/jupyter-server-ydoc/pyproject.toml @@ -41,6 +41,7 @@ dynamic = ["version"] [project.optional-dependencies] test = [ "coverage", + "dirty-equals", "jupyter_server[test]>=2.4.0", "jupyter_server_fileid[test]", "pytest>=7.0", diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 96aaded5..17f3c95b 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -7,8 +7,10 @@ from asyncio import Event, sleep from typing import Any +from dirty_equals import IsStr from jupyter_events.logger import EventLogger from jupyter_ydoc import YUnicode +from pycrdt import Text from pycrdt_websocket import WebsocketProvider @@ -215,3 +217,147 @@ async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: await jp_serverapp.web_app.settings["jupyter_server_ydoc"].stop_extension() del jp_serverapp.web_app.settings["file_id_manager"] + + +async def test_fork_handler( + jp_serverapp, + rtc_create_file, + rtc_connect_doc_client, + rtc_connect_fork_client, + rtc_get_forks_client, + rtc_create_fork_client, + rtc_delete_fork_client, + rtc_fetch_session, +): + collected_data = [] + + async def my_listener(logger: EventLogger, schema_id: str, data: dict) -> None: + collected_data.append(data) + + event_logger = jp_serverapp.event_logger + event_logger.add_listener( + schema_id="https://schema.jupyter.org/jupyter_collaboration/fork/v1", + listener=my_listener, + ) + + path, _ = await rtc_create_file("test.txt", "Hello") + + root_connect_event = Event() + + def _on_root_change(topic: str, event: Any) -> None: + if topic == "source": + root_connect_event.set() + + root_ydoc = YUnicode() + root_ydoc.observe(_on_root_change) + + resp = await rtc_fetch_session("text", "file", path) + data = json.loads(resp.body.decode("utf-8")) + file_id = data["fileId"] + root_roomid = f"text:file:{file_id}" + + async with await rtc_connect_doc_client("text", "file", path) as ws, WebsocketProvider( + root_ydoc.ydoc, ws + ): + await root_connect_event.wait() + + resp = await rtc_create_fork_client(root_roomid, False, "my fork0", "is awesome0") + data = json.loads(resp.body.decode("utf-8")) + fork_roomid0 = data["fork_roomid"] + + resp = await rtc_get_forks_client(root_roomid) + data = json.loads(resp.body.decode("utf-8")) + expected_data0 = { + fork_roomid0: { + "root_roomid": root_roomid, + "synchronize": False, + "title": "my fork0", + "description": "is awesome0", + } + } + assert data == expected_data0 + + assert collected_data == [ + { + "username": IsStr(), + "fork_roomid": fork_roomid0, + "fork_info": expected_data0[fork_roomid0], + "action": "create", + } + ] + + resp = await rtc_create_fork_client(root_roomid, True, "my fork1", "is awesome1") + data = json.loads(resp.body.decode("utf-8")) + fork_roomid1 = data["fork_roomid"] + + resp = await rtc_get_forks_client(root_roomid) + data = json.loads(resp.body.decode("utf-8")) + expected_data1 = { + fork_roomid1: { + "root_roomid": root_roomid, + "synchronize": True, + "title": "my fork1", + "description": "is awesome1", + } + } + expected_data = dict(**expected_data0, **expected_data1) + assert data == expected_data + + assert len(collected_data) == 2 + assert collected_data[1] == { + "username": IsStr(), + "fork_roomid": fork_roomid1, + "fork_info": expected_data[fork_roomid1], + "action": "create", + } + + fork_ydoc = YUnicode() + fork_connect_event = Event() + + def _on_fork_change(topic: str, event: Any) -> None: + if topic == "source": + fork_connect_event.set() + + fork_ydoc.observe(_on_fork_change) + fork_text = fork_ydoc.ydoc.get("source", type=Text) + + async with await rtc_connect_fork_client(fork_roomid1) as ws, WebsocketProvider( + fork_ydoc.ydoc, ws + ): + await fork_connect_event.wait() + root_text = root_ydoc.ydoc.get("source", type=Text) + root_text += ", World!" + await sleep(0.1) + assert str(fork_text) == "Hello, World!" + fork_text += " Hi!" + + await sleep(0.1) + assert str(root_text) == "Hello, World!" + + await rtc_delete_fork_client(fork_roomid0, True) + await sleep(0.1) + assert str(root_text) == "Hello, World!" + resp = await rtc_get_forks_client(root_roomid) + data = json.loads(resp.body.decode("utf-8")) + assert data == expected_data1 + assert len(collected_data) == 3 + assert collected_data[2] == { + "username": IsStr(), + "fork_roomid": fork_roomid0, + "fork_info": expected_data[fork_roomid0], + "action": "delete", + } + + await rtc_delete_fork_client(fork_roomid1, True) + await sleep(0.1) + assert str(root_text) == "Hello, World! Hi!" + resp = await rtc_get_forks_client(root_roomid) + data = json.loads(resp.body.decode("utf-8")) + assert data == {} + assert len(collected_data) == 4 + assert collected_data[3] == { + "username": IsStr(), + "fork_roomid": fork_roomid1, + "fork_info": expected_data[fork_roomid1], + "action": "delete", + }