diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index ff337460..62561201 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -6,7 +6,7 @@ from jupyter_server.extension.application import ExtensionApp from traitlets import Bool, Float, Type -from ypy_websocket.ystore import BaseYStore +from ypy_websocket.stores import BaseYStore from .handlers import DocSessionHandler, YDocWebSocketHandler from .loaders import FileLoaderMapping @@ -22,6 +22,8 @@ class YDocExtension(ExtensionApp): Enables Real Time Collaboration in JupyterLab """ + _store: BaseYStore = None + disable_rtc = Bool(False, config=True, help="Whether to disable real time collaboration.") file_poll_interval = Float( @@ -80,10 +82,12 @@ def initialize_handlers(self): for k, v in self.config.get(self.ystore_class.__name__, {}).items(): setattr(self.ystore_class, k, v) + # Instantiate the store + self._store = self.ystore_class(log=self.log) + self.ywebsocket_server = JupyterWebsocketServer( rooms_ready=False, auto_clean_rooms=False, - ystore_class=self.ystore_class, log=self.log, ) @@ -103,7 +107,7 @@ def initialize_handlers(self): "document_cleanup_delay": self.document_cleanup_delay, "document_save_delay": self.document_save_delay, "file_loaders": self.file_loaders, - "ystore_class": self.ystore_class, + "store": self._store, "ywebsocket_server": self.ywebsocket_server, }, ), @@ -120,3 +124,6 @@ async def stop_extension(self): ], timeout=3, ) + + if self._store is not None and self._store.started.is_set(): + self._store.stop() diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index 4f000041..afd5466c 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -14,8 +14,8 @@ from jupyter_ydoc import ydocs as YDOCS from tornado import web from tornado.websocket import WebSocketHandler +from ypy_websocket.stores import BaseYStore from ypy_websocket.websocket_server import YRoom -from ypy_websocket.ystore import BaseYStore from ypy_websocket.yutils import YMessageType, write_var_uint from .loaders import FileLoaderMapping @@ -62,6 +62,15 @@ def create_task(self, aw): task.add_done_callback(self._background_tasks.discard) async def prepare(self): + # NOTE: Initialize in the ExtensionApp.start_extension once + # https://github.com/jupyter-server/jupyter_server/issues/1329 + # is done. + # We are temporarily initializing the store here because `start`` + # is an async function + if self._store is not None and not self._store.started.is_set(): + await self._store.start() + await self._store.initialize() + if not self._websocket_server.started.is_set(): self.create_task(self._websocket_server.start()) await self._websocket_server.started.wait() @@ -84,15 +93,13 @@ async def prepare(self): ) file = self._file_loaders[file_id] - updates_file_path = f".{file_type}:{file_id}.y" - ystore = self._ystore_class(path=updates_file_path, log=self.log) self.room = DocumentRoom( self._room_id, file_format, file_type, file, self.event_logger, - ystore, + self._store, self.log, self._document_save_delay, ) @@ -111,7 +118,7 @@ def initialize( self, ywebsocket_server: JupyterWebsocketServer, file_loaders: FileLoaderMapping, - ystore_class: type[BaseYStore], + store: BaseYStore, document_cleanup_delay: float | None = 60.0, document_save_delay: float | None = 1.0, ) -> None: @@ -119,7 +126,7 @@ def initialize( # File ID manager cannot be passed as argument as the extension may load after this one self._file_id_manager = self.settings["file_id_manager"] self._file_loaders = file_loaders - self._ystore_class = ystore_class + self._store = store self._cleanup_delay = document_cleanup_delay self._document_save_delay = document_save_delay self._websocket_server = ywebsocket_server diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 75e2a20e..2e755f9c 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -10,8 +10,8 @@ from jupyter_events import EventLogger from jupyter_ydoc import ydocs as YDOCS +from ypy_websocket.stores import BaseYStore from ypy_websocket.websocket_server import YRoom -from ypy_websocket.ystore import BaseYStore, YDocNotFound from ypy_websocket.yutils import write_var_uint from .loaders import FileLoader @@ -104,36 +104,28 @@ async def initialize(self) -> None: return self.log.info("Initializing room %s", self._room_id) - model = await self._file.load_content(self._file_format, self._file_type, True) async with self._update_lock: # try to apply Y updates from the YStore for this document - read_from_source = True - if self.ystore is not None: - try: - await self.ystore.apply_updates(self.ydoc) - self._emit( - LogLevel.INFO, - "load", - "Content loaded from the store {}".format( - self.ystore.__class__.__qualname__ - ), - ) - self.log.info( - "Content in room %s loaded from the ystore %s", - self._room_id, - self.ystore.__class__.__name__, - ) - read_from_source = False - except YDocNotFound: - # YDoc not found in the YStore, create the document from the source file (no change history) - pass + if self.ystore is not None and await self.ystore.exists(self._room_id): + # Load the content from the store + await self.ystore.apply_updates(self._room_id, self.ydoc) + self._emit( + LogLevel.INFO, + "load", + "Content loaded from the store {}".format( + self.ystore.__class__.__qualname__ + ), + ) + self.log.info( + "Content in room %s loaded from the ystore %s", + self._room_id, + self.ystore.__class__.__name__, + ) - if not read_from_source: # if YStore updates and source file are out-of-sync, resync updates with source if self._document.source != model["content"]: - # TODO: Delete document from the store. self._emit( LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." ) @@ -142,17 +134,26 @@ async def initialize(self) -> None: self._file.path, self.ystore.__class__.__name__, ) - read_from_source = True - if read_from_source: + doc = await self.ystore.get(self._room_id) + await self.ystore.remove(self._room_id) + version = 0 + if "version" in doc: + version = doc["version"] + 1 + + await self.ystore.create(self._room_id, version) + await self.ystore.encode_state_as_update(self._room_id, self.ydoc) + + else: self._emit(LogLevel.INFO, "load", "Content loaded from disk.") self.log.info( "Content in room %s loaded from file %s", self._room_id, self._file.path ) self._document.source = model["content"] - if self.ystore: - await self.ystore.encode_state_as_update(self.ydoc) + if self.ystore is not None: + await self.ystore.create(self._room_id, 0) + await self.ystore.encode_state_as_update(self._room_id, self.ydoc) self._last_modified = model["last_modified"] self._document.dirty = False diff --git a/jupyter_collaboration/stores.py b/jupyter_collaboration/stores.py index 323e06ed..aa2de45f 100644 --- a/jupyter_collaboration/stores.py +++ b/jupyter_collaboration/stores.py @@ -1,14 +1,19 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +from __future__ import annotations + +from logging import Logger + from traitlets import Int, Unicode from traitlets.config import LoggingConfigurable -from ypy_websocket.ystore import SQLiteYStore as _SQLiteYStore -from ypy_websocket.ystore import TempFileYStore as _TempFileYStore +from ypy_websocket.stores import FileYStore +from ypy_websocket.stores import SQLiteYStore as _SQLiteYStore -class TempFileYStore(_TempFileYStore): - prefix_dir = "jupyter_ystore_" +class TempFileYStore(FileYStore): + def __init__(self, log: Logger | None = None): + super().__init__(path=".jupyter_store", log=log) class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # type: ignore @@ -17,7 +22,7 @@ class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMetaclass): db_path = Unicode( - ".jupyter_ystore.db", + ".jupyter_store.db", config=True, help="""The path to the YStore database. Defaults to '.jupyter_ystore.db' in the current directory.""", @@ -30,3 +35,6 @@ class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMet help="""The document time-to-live in seconds. Defaults to None (document history is never cleared).""", ) + + def __init__(self, log: Logger | None = None): + super().__init__(path=self.db_path, log=log) diff --git a/jupyter_collaboration/websocketserver.py b/jupyter_collaboration/websocketserver.py index f35c1148..ec989db0 100644 --- a/jupyter_collaboration/websocketserver.py +++ b/jupyter_collaboration/websocketserver.py @@ -9,7 +9,6 @@ from tornado.websocket import WebSocketHandler from ypy_websocket.websocket_server import WebsocketServer, YRoom -from ypy_websocket.ystore import BaseYStore class RoomNotFound(LookupError): @@ -27,13 +26,11 @@ class JupyterWebsocketServer(WebsocketServer): def __init__( self, - ystore_class: BaseYStore, rooms_ready: bool = True, auto_clean_rooms: bool = True, log: Logger | None = None, ): super().__init__(rooms_ready, auto_clean_rooms, log) - self.ystore_class = ystore_class self.ypatch_nb = 0 self.connected_users: dict[Any, Any] = {} # Async loop is not yet ready at the object instantiation diff --git a/tests/conftest.py b/tests/conftest.py index 234dbb3c..dd323d42 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -170,8 +170,10 @@ def rtc_create_SQLite_store(jp_serverapp): setattr(SQLiteYStore, k, v) async def _inner(type: str, path: str, content: str) -> DocumentRoom: - db = SQLiteYStore(path=f"{type}:{path}") + room_id = f"{type}:{path}" + db = SQLiteYStore() await db.start() + await db.initialize() if type == "notebook": doc = YNotebook() @@ -179,7 +181,9 @@ async def _inner(type: str, path: str, content: str) -> DocumentRoom: doc = YUnicode() doc.source = content - await db.encode_state_as_update(doc.ydoc) + + await db.create(room_id, 0) + await db.encode_state_as_update(room_id, doc.ydoc) return db