diff --git a/docs/source/configuration.md b/docs/source/configuration.md index bfe0b144..8a1f6784 100644 --- a/docs/source/configuration.md +++ b/docs/source/configuration.md @@ -26,6 +26,6 @@ jupyter lab --YDocExtension.file_poll_interval=2 # If None, the document will be kept in memory forever. jupyter lab --YDocExtension.document_cleanup_delay=100 -# The YStore class to use for storing Y updates (default: JupyterSQLiteYStore). -jupyter lab --YDocExtension.ystore_class=ypy_websocket.ystore.TempFileYStore +# The Store class used for storing Y updates (default: SQLiteYStore). +jupyter lab --YDocExtension.ystore_class=jupyter_collaboration.stores.FileYStore ``` diff --git a/docs/source/developer/architecture.md b/docs/source/developer/architecture.md index c0506086..2a45304e 100644 --- a/docs/source/developer/architecture.md +++ b/docs/source/developer/architecture.md @@ -4,6 +4,15 @@ COMING... +### Opening a document +![initialization](../images/initialization_diagram.png) + +### Autosave +![autosave](../images/autosave_diagram.png) + +### Conflict +![conflict](../images/conflict_diagram.png) + ## Early attempts Prior to the current implementation based on [Yjs](https://docs.yjs.dev/), other attempts using diff --git a/docs/source/images/autosave_diagram.png b/docs/source/images/autosave_diagram.png new file mode 100644 index 00000000..365fe104 Binary files /dev/null and b/docs/source/images/autosave_diagram.png differ diff --git a/docs/source/images/conflict_diagram.png b/docs/source/images/conflict_diagram.png new file mode 100644 index 00000000..bec7ebfa Binary files /dev/null and b/docs/source/images/conflict_diagram.png differ diff --git a/docs/source/images/initialization_diagram.png b/docs/source/images/initialization_diagram.png new file mode 100644 index 00000000..b0e98a2a Binary files /dev/null and b/docs/source/images/initialization_diagram.png differ diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index 23133fd9..7bce58fe 100644 
--- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -12,7 +12,6 @@ from .rooms import RoomManager from .stores import BaseYStore, SQLiteYStore from .utils import EVENTS_SCHEMA_PATH -from .websocketserver import JupyterWebsocketServer class YDocExtension(ExtensionApp): @@ -80,14 +79,7 @@ def initialize_handlers(self): for k, v in self.config.get(self.ystore_class.__name__, {}).items(): setattr(self.ystore_class, k, v) - # NOTE: Initialize in the ExtensionApp.start_extension once - # https://github.com/jupyter-server/jupyter_server/issues/1329 - # is done. - # We are temporarily initializing the store here because the - # initialization is async self.store = self.ystore_class(log=self.log) - loop = asyncio.get_event_loop() - loop.run_until_complete(self.store.initialize()) # self.settings is local to the ExtensionApp but here we need # the global app settings in which the file id manager will register @@ -111,6 +103,7 @@ def initialize_handlers(self): YDocWebSocketHandler, { "document_cleanup_delay": self.document_cleanup_delay, + "store": self.store, "room_manager": self.room_manager, }, ), diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index dbe4a268..76f14a42 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -17,7 +17,8 @@ from tornado.websocket import WebSocketHandler from ypy_websocket.yutils import write_var_uint -from .rooms import DocumentRoom, RoomManager +from .rooms import BaseRoom, RoomManager +from .stores import BaseYStore from .utils import ( JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, @@ -50,29 +51,38 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): receiving a message. 
""" - _serve_task: asyncio.Task + _room_id: str + room: BaseRoom + _serve_task: asyncio.Task | None _message_queue: asyncio.Queue[Any] - async def prepare(self): - # Get room - self._room_id: str = self.request.path.split("/")[-1] - self.room = await self._room_manager.get_room(self._room_id) - return await super().prepare() - def initialize( - self, room_manager: RoomManager, document_cleanup_delay: float | None = 60.0 + self, + store: BaseYStore, + room_manager: RoomManager, + document_cleanup_delay: float | None = 60.0, ) -> None: # File ID manager cannot be passed as argument as the extension may load after this one self._file_id_manager = self.settings["file_id_manager"] + self._store = store self._room_manager = room_manager self._cleanup_delay = document_cleanup_delay - self._room_id = None - self.room = None - self._serve_task = None + self._serve_task: asyncio.Task | None = None self._message_queue = asyncio.Queue() + async def prepare(self): + # NOTE: Initialize in the ExtensionApp.start_extension once + # https://github.com/jupyter-server/jupyter_server/issues/1329 + # is done. + # We are temporarily initializing the store here because the + # initialization is async + if not self._store.initialized: + await self._store.initialize() + + return await super().prepare() + @property def path(self): """ @@ -112,47 +122,43 @@ async def open(self, room_id): """ On connection open. """ - # Start processing messages in the room - self._serve_task = asyncio.create_task(self.room.serve(self)) + self._room_id = self.request.path.split("/")[-1] + + # Close the connection if the document session expired + session_id = self.get_query_argument("sessionId", None) + if session_id and SERVER_SESSION != session_id: + self.close( + 1003, + f"Document session {session_id} expired. 
You need to reload this browser tab.", + ) + try: + # Get room + self.room = await self._room_manager.get_room(self._room_id) - if isinstance(self.room, DocumentRoom): - # Close the connection if the document session expired - session_id = self.get_query_argument("sessionId", "") - if SERVER_SESSION != session_id: - self.close( - 1003, - f"Document session {session_id} expired. You need to reload this browser tab.", - ) - - # TODO: Move initialization to RoomManager to make sure only one - # client calls initialize - try: - # Initialize the room - await self.room.initialize() - except Exception as e: - _, _, file_id = decode_file_path(self._room_id) - path = self._file_id_manager.get_path(file_id) - - # Close websocket and propagate error. - if isinstance(e, web.HTTPError): - self.log.error(f"File {path} not found.\n{e!r}", exc_info=e) - self.close(1004, f"File {path} not found.") - else: - self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e) - self.close(1003, f"Error initializing: {path}. You need to close the document.") - - # Clean up the room and delete the file loader - if ( - self.room is not None - and len(self.room.clients) == 0 - or self.room.clients == [self] - ): - self._message_queue.put_nowait(b"") + except Exception as e: + _, _, file_id = decode_file_path(self._room_id) + path = self._file_id_manager.get_path(file_id) + + # Close websocket and propagate error. + if isinstance(e, web.HTTPError): + self.log.error(f"File {path} not found.\n{e!r}", exc_info=e) + self.close(1004, f"File {path} not found.") + else: + self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e) + self.close(1003, f"Error initializing: {path}. 
You need to close the document.") + + # Clean up the room and delete the file loader + if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]: + self._message_queue.put_nowait(b"") + if self._serve_task: self._serve_task.cancel() - await self._room_manager.remove(self._room_id) + await self._room_manager.remove_room(self._room_id) self._emit(LogLevel.INFO, "initialize", "New client connected.") + # Start processing messages in the room + self._serve_task = asyncio.create_task(self.room.serve(self)) + async def send(self, message): """ Send a message to the client. @@ -201,15 +207,11 @@ def on_close(self) -> None: if self._serve_task is not None and not self._serve_task.cancelled(): self._serve_task.cancel() - if ( - self.room is not None - and isinstance(self.room, DocumentRoom) - and self.room.clients == [self] - ): + if self.room is not None and self.room.clients == [self]: # no client in this room after we disconnect # Remove the room with a delay in case someone reconnects IOLoop.current().add_callback( - self._room_manager.remove, self._room_id, self._cleanup_delay + self._room_manager.remove_room, self._room_id, self._cleanup_delay ) def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: diff --git a/jupyter_collaboration/rooms/base.py b/jupyter_collaboration/rooms/base.py index 014d8251..15a16d1f 100644 --- a/jupyter_collaboration/rooms/base.py +++ b/jupyter_collaboration/rooms/base.py @@ -23,6 +23,12 @@ def room_id(self) -> str: """ return self._room_id + async def initialize(self) -> None: + return + + async def handle_msg(self, data: bytes) -> None: + return + def broadcast_msg(self, msg: bytes) -> None: for client in self.clients: self._task_group.start_soon(client.send, msg) diff --git a/jupyter_collaboration/rooms/document.py b/jupyter_collaboration/rooms/document.py index c4e93348..fd9340d9 100644 --- a/jupyter_collaboration/rooms/document.py +++ 
b/jupyter_collaboration/rooms/document.py @@ -10,10 +10,10 @@ from jupyter_events import EventLogger from jupyter_ydoc import ydocs as YDOCS -from ypy_websocket.stores import BaseYStore from ypy_websocket.yutils import write_var_uint from ..loaders import FileLoader +from ..stores import BaseYStore from ..utils import ( JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, diff --git a/jupyter_collaboration/rooms/manager.py b/jupyter_collaboration/rooms/manager.py index 6a1bd885..c38e1498 100644 --- a/jupyter_collaboration/rooms/manager.py +++ b/jupyter_collaboration/rooms/manager.py @@ -43,7 +43,7 @@ def has_room(self, room_id: str) -> bool: """Test if an id has a room.""" return room_id in self._rooms - async def get_room(self, room_id: str) -> BaseRoom | None: + async def get_room(self, room_id: str) -> BaseRoom: """ Get the room for a given id. @@ -75,9 +75,12 @@ async def get_room(self, room_id: str) -> BaseRoom | None: if not room.started.is_set(): self._room_tasks[room_id] = asyncio.create_task(room.start()) + if not room.ready: + await room.initialize() + return room - async def remove(self, room_id: str, delay: float = 0) -> None: + async def remove_room(self, room_id: str, delay: float = 0) -> None: """Remove the room for a given id.""" # Use lock to make sure while a client is creating the # clean up task, no one else is accessing the room or trying to @@ -86,6 +89,8 @@ async def remove(self, room_id: str, delay: float = 0) -> None: if room_id in self._clean_up_tasks: return + # NOTE: Should we check if there is only one client? 
+ # if len(self._rooms[room_id].clients) <= 1: self._clean_up_tasks[room_id] = asyncio.create_task(self._clean_up_room(room_id, delay)) async def clear(self) -> None: @@ -118,7 +123,7 @@ def _create_document_room(self, room_id: str) -> DocumentRoom: self._document_save_delay, ) - async def _clean_up_room(self, room_id: str, delay: float): + async def _clean_up_room(self, room_id: str, delay: float) -> None: """ Async task for cleaning up the resources. diff --git a/jupyter_collaboration/stores/__init__.py b/jupyter_collaboration/stores/__init__.py index fb4893c0..d70e1605 100644 --- a/jupyter_collaboration/stores/__init__.py +++ b/jupyter_collaboration/stores/__init__.py @@ -2,5 +2,7 @@ # Distributed under the terms of the Modified BSD License. from .base_store import BaseYStore # noqa +from .file_store import FileYStore # noqa +from .sqlite_store import SQLiteYStore as _SQLiteYStore # noqa from .stores import SQLiteYStore, TempFileYStore # noqa from .utils import YDocExists, YDocNotFound # noqa diff --git a/jupyter_collaboration/stores/base_store.py b/jupyter_collaboration/stores/base_store.py index 97039ae3..0785b41d 100644 --- a/jupyter_collaboration/stores/base_store.py +++ b/jupyter_collaboration/stores/base_store.py @@ -143,7 +143,7 @@ async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None: path: The document name/path. ydoc: The YDoc from which to store the state. """ - update = Y.encode_state_as_update(ydoc) # type: ignore + update = Y.encode_state_as_update(ydoc) await self.write(path, update) async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None: @@ -154,4 +154,4 @@ async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None: ydoc: The YDoc on which to apply the updates. 
""" async for update, *rest in self.read(path): # type: ignore - Y.apply_update(ydoc, update) # type: ignore + Y.apply_update(ydoc, update) diff --git a/jupyter_collaboration/stores/file_store.py b/jupyter_collaboration/stores/file_store.py index 00289767..2ea2167a 100644 --- a/jupyter_collaboration/stores/file_store.py +++ b/jupyter_collaboration/stores/file_store.py @@ -4,15 +4,13 @@ from __future__ import annotations import struct -import tempfile import time from logging import Logger, getLogger from pathlib import Path -from typing import AsyncIterator, Awaitable, Callable +from typing import Any, AsyncIterator, Awaitable, Callable import anyio from anyio import Event, Lock -from deprecated import deprecated from ypy_websocket.yutils import Decoder, get_new_path, write_var_uint from .base_store import BaseYStore @@ -235,7 +233,7 @@ async def _get_data_offset(self, path: Path) -> int: except Exception: raise YDocNotFound(f"File {str(path)} not found.") - async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: + async def _decode_data(self, data: Any) -> AsyncIterator[tuple[bytes, bytes, float]]: i = 0 for d in Decoder(data).read_messages(): if i == 0: @@ -249,60 +247,3 @@ async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: def _get_document_path(self, path: str) -> Path: return Path(self._store_path, path + ".y") - - -@deprecated(reason="Use FileYStore instead") -class TempFileYStore(FileYStore): - """ - A YStore which uses the system's temporary directory. - Files are writen under a common directory. - To prefix the directory name (e.g. /tmp/my_prefix_b4whmm7y/): - - ```py - class PrefixTempFileYStore(TempFileYStore): - prefix_dir = "my_prefix_" - ``` - - ## Note: - This class is deprecated. Use FileYStore and pass the tmp folder - as path argument. 
For example: - - ```py - tmp_dir = tempfile.mkdtemp(prefix="prefix/directory/") - store = FileYStore(tmp_dir) - ``` - """ - - prefix_dir: str | None = None - base_dir: str | None = None - - def __init__( - self, - path: str, - metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, - log: Logger | None = None, - ): - """Initialize the object. - - Arguments: - path: The file path used to store the updates. - metadata_callback: An optional callback to call to get the metadata. - log: An optional logger. - """ - full_path = str(Path(self.get_base_dir()) / path) - super().__init__(full_path, metadata_callback=metadata_callback, log=log) - - def get_base_dir(self) -> str: - """Get the base directory where the update file is written. - - Returns: - The base directory path. - """ - if self.base_dir is None: - self.make_directory() - assert self.base_dir is not None - return self.base_dir - - def make_directory(self): - """Create the base directory where the update file is written.""" - type(self).base_dir = tempfile.mkdtemp(prefix=self.prefix_dir) diff --git a/jupyter_collaboration/stores/utils.py b/jupyter_collaboration/stores/utils.py index 79f6b7f3..6e8d71e4 100644 --- a/jupyter_collaboration/stores/utils.py +++ b/jupyter_collaboration/stores/utils.py @@ -1,6 +1,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. 
+ class YDocNotFound(Exception): pass diff --git a/pyproject.toml b/pyproject.toml index f501a189..ab6cdc17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,8 @@ dependencies = [ "ypy-websocket>=0.12.1,<0.13.0", "jupyter_events>=0.7.0", "jupyter_server_fileid>=0.7.0,<1", - "jsonschema>=4.18.0" + "jsonschema>=4.18.0", + "anyio >=3.6.2,<5" ] dynamic = ["version", "description", "authors", "urls", "keywords"] @@ -152,7 +153,7 @@ disallow_any_generics = false disallow_incomplete_defs = true disallow_untyped_decorators = true no_implicit_optional = true -no_implicit_reexport = true +no_implicit_reexport = false pretty = true show_error_context = true show_error_codes = true diff --git a/tests/conftest.py b/tests/conftest.py index dd323d42..3e9bf5ca 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ import json from asyncio import Event, sleep from datetime import datetime +from logging import getLogger from typing import Any import nbformat @@ -170,9 +171,7 @@ def rtc_create_SQLite_store(jp_serverapp): setattr(SQLiteYStore, k, v) async def _inner(type: str, path: str, content: str) -> DocumentRoom: - room_id = f"{type}:{path}" - db = SQLiteYStore() - await db.start() + db = SQLiteYStore(log=getLogger(__name__)) await db.initialize() if type == "notebook": @@ -182,8 +181,8 @@ async def _inner(type: str, path: str, content: str) -> DocumentRoom: doc.source = content - await db.create(room_id, 0) - await db.encode_state_as_update(room_id, doc.ydoc) + await db.create(path, 0) + await db.encode_state_as_update(path, doc.ydoc) return db @@ -193,14 +192,15 @@ async def _inner(type: str, path: str, content: str) -> DocumentRoom: @pytest.fixture def rtc_create_mock_document_room(): def _inner( - id: str, + room_id: str, + path_id: str, path: str, content: str, last_modified: datetime | None = None, save_delay: float | None = None, store: SQLiteYStore | None = None, ) -> tuple[FakeContentsManager, FileLoader, DocumentRoom]: - paths = {id: path} + 
paths = {path_id: path} if last_modified is None: cm = FakeContentsManager({"content": content}) @@ -208,7 +208,7 @@ def _inner( cm = FakeContentsManager({"last_modified": datetime.now(), "content": content}) loader = FileLoader( - id, + path_id, FakeFileIDManager(paths), cm, poll_interval=0.1, @@ -218,7 +218,7 @@ def _inner( cm, loader, DocumentRoom( - "test-room", "text", "file", loader, FakeEventLogger(), store, None, save_delay + room_id, "text", "file", loader, FakeEventLogger(), store, None, save_delay ), ) diff --git a/tests/test_rooms.py b/tests/test_rooms.py index 7ecc5ccd..9f4669bb 100644 --- a/tests/test_rooms.py +++ b/tests/test_rooms.py @@ -15,7 +15,7 @@ @pytest.mark.asyncio async def test_should_initialize_document_room_without_store(rtc_create_mock_document_room): content = "test" - _, _, room = rtc_create_mock_document_room("test-id", "test.txt", content) + _, _, room = rtc_create_mock_document_room("test-id", "test_path", "test.txt", content) await room.initialize() assert room._document.source == content @@ -29,10 +29,11 @@ async def test_should_initialize_document_room_from_store( # If the content from the store is different than the content from disk, # the room will initialize with the content from disk and overwrite the document - id = "test-id" + room_id = "test-id" + path_id = "test_path" content = "test" - store = await rtc_create_SQLite_store("file", id, content) - _, _, room = rtc_create_mock_document_room("test-id", "test.txt", content, store=store) + store = await rtc_create_SQLite_store("file", room_id, content) + _, _, room = rtc_create_mock_document_room(room_id, path_id, "test.txt", content, store=store) await room.initialize() assert room._document.source == content @@ -43,13 +44,13 @@ async def test_should_overwrite_the_store(rtc_create_SQLite_store, rtc_create_mo id = "test-id" content = "test" store = await rtc_create_SQLite_store("file", id, "whatever") - _, _, room = rtc_create_mock_document_room("test-id", "test.txt", 
content, store=store) + _, _, room = rtc_create_mock_document_room(id, "test_path", "test.txt", content, store=store) await room.initialize() assert room._document.source == content doc = YUnicode() - await store.apply_updates(doc.ydoc) + await store.apply_updates(id, doc.ydoc) assert doc.source == content @@ -59,7 +60,9 @@ async def test_defined_save_delay_should_save_content_after_document_change( rtc_create_mock_document_room, ): content = "test" - cm, _, room = rtc_create_mock_document_room("test-id", "test.txt", content, save_delay=0.01) + cm, _, room = rtc_create_mock_document_room( + "test-id", "test_path", "test.txt", content, save_delay=0.01 + ) await room.initialize() room._document.source = "Test 2" @@ -75,7 +78,9 @@ async def test_undefined_save_delay_should_not_save_content_after_document_chang rtc_create_mock_document_room, ): content = "test" - cm, _, room = rtc_create_mock_document_room("test-id", "test.txt", content, save_delay=None) + cm, _, room = rtc_create_mock_document_room( + "test-id", "test_path", "test.txt", content, save_delay=None + ) await room.initialize() room._document.source = "Test 2" @@ -92,7 +97,7 @@ async def test_should_reload_content_from_disk(rtc_create_mock_document_room): last_modified = datetime.now() cm, loader, room = rtc_create_mock_document_room( - "test-id", "test.txt", "whatever", last_modified + "test-id", "test_path", "test.txt", "whatever", last_modified ) await room.initialize() @@ -114,7 +119,9 @@ async def test_should_not_reload_content_from_disk(rtc_create_mock_document_room content = "test" last_modified = datetime.now() - cm, loader, room = rtc_create_mock_document_room("test-id", "test.txt", content, last_modified) + cm, loader, room = rtc_create_mock_document_room( + "test-id", "test_path", "test.txt", content, last_modified + ) await room.initialize()