diff --git a/docs/source/configuration.md b/docs/source/configuration.md index bfe0b144..8a1f6784 100644 --- a/docs/source/configuration.md +++ b/docs/source/configuration.md @@ -26,6 +26,6 @@ jupyter lab --YDocExtension.file_poll_interval=2 # If None, the document will be kept in memory forever. jupyter lab --YDocExtension.document_cleanup_delay=100 -# The YStore class to use for storing Y updates (default: JupyterSQLiteYStore). -jupyter lab --YDocExtension.ystore_class=ypy_websocket.ystore.TempFileYStore +# The Store class used for storing Y updates (default: SQLiteYStore). +jupyter lab --YDocExtension.ystore_class=jupyter_collaboration.stores.FileYStore ``` diff --git a/docs/source/developer/architecture.md b/docs/source/developer/architecture.md index c0506086..2a45304e 100644 --- a/docs/source/developer/architecture.md +++ b/docs/source/developer/architecture.md @@ -4,6 +4,15 @@ COMING... +### Opening a document +![initialization](../images/initialization_diagram.png) + +### Autosave +![autosave](../images/autosave_diagram.png) + +### Conflict +![autosave](../images/conflict_diagram.png) + ## Early attempts Prior to the current implementation based on [Yjs](https://docs.yjs.dev/), other attempts using diff --git a/docs/source/images/autosave_diagram.png b/docs/source/images/autosave_diagram.png new file mode 100644 index 00000000..365fe104 Binary files /dev/null and b/docs/source/images/autosave_diagram.png differ diff --git a/docs/source/images/conflict_diagram.png b/docs/source/images/conflict_diagram.png new file mode 100644 index 00000000..bec7ebfa Binary files /dev/null and b/docs/source/images/conflict_diagram.png differ diff --git a/docs/source/images/initialization_diagram.png b/docs/source/images/initialization_diagram.png new file mode 100644 index 00000000..b0e98a2a Binary files /dev/null and b/docs/source/images/initialization_diagram.png differ diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index 37dd927e..7bce58fe 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -9,9 +9,9 @@ from .handlers import DocSessionHandler, YDocWebSocketHandler from .loaders import FileLoaderMapping +from .rooms import RoomManager from .stores import BaseYStore, SQLiteYStore from .utils import EVENTS_SCHEMA_PATH -from .websocketserver import JupyterWebsocketServer class YDocExtension(ExtensionApp): @@ -21,8 +21,6 @@ class YDocExtension(ExtensionApp): Enables Real Time Collaboration in JupyterLab """ - _store: BaseYStore = None - disable_rtc = Bool(False, config=True, help="Whether to disable real time collaboration.") file_poll_interval = Float( @@ -81,14 +79,7 @@ def initialize_handlers(self): for k, v in self.config.get(self.ystore_class.__name__, {}).items(): setattr(self.ystore_class, k, v) - # Instantiate the store - self._store = self.ystore_class(log=self.log) - - self.ywebsocket_server = JupyterWebsocketServer( - rooms_ready=False, - auto_clean_rooms=False, - log=self.log, - ) + self.store = self.ystore_class(log=self.log) # self.settings is local to the ExtensionApp but here we need # the global app settings in which the file id manager will register @@ -97,6 +88,14 @@ def initialize_handlers(self): self.serverapp.web_app.settings, self.log, self.file_poll_interval ) + self.room_manager = RoomManager( + self.store, + self.file_loaders, + self.serverapp.event_logger, + self.document_save_delay, + self.log, + ) + self.handlers.extend( [ ( @@ -104,10 +103,8 @@ def initialize_handlers(self): YDocWebSocketHandler, { "document_cleanup_delay": self.document_cleanup_delay, - "document_save_delay": self.document_save_delay, - "file_loaders": self.file_loaders, - "store": self._store, - "ywebsocket_server": self.ywebsocket_server, + "store": self.store, + "room_manager": self.room_manager, }, ), (r"/api/collaboration/session/(.*)", DocSessionHandler), @@ -118,7 +115,7 @@ async def stop_extension(self): # Cancel tasks and clean up await asyncio.wait( [ - asyncio.create_task(self.ywebsocket_server.clean()), + asyncio.create_task(self.room_manager.clear()), asyncio.create_task(self.file_loaders.clear()), ], timeout=3, diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index 0bbb985a..76f14a42 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -13,12 +13,11 @@ from jupyter_server.base.handlers import APIHandler, JupyterHandler from jupyter_ydoc import ydocs as YDOCS from tornado import web +from tornado.ioloop import IOLoop from tornado.websocket import WebSocketHandler -from ypy_websocket.websocket_server import YRoom -from ypy_websocket.yutils import YMessageType, write_var_uint +from ypy_websocket.yutils import write_var_uint -from .loaders import FileLoaderMapping -from .rooms import DocumentRoom, TransientRoom +from .rooms import BaseRoom, RoomManager from .stores import BaseYStore from .utils import ( JUPYTER_COLLABORATION_EVENTS_URI, @@ -26,7 +25,6 @@ MessageType, decode_file_path, ) -from .websocketserver import JupyterWebsocketServer YFILE = YDOCS["file"] @@ -53,84 +51,38 @@ class YDocWebSocketHandler(WebSocketHandler, JupyterHandler): receiving a message. """ + _room_id: str + room: BaseRoom + _serve_task: asyncio.Task | None _message_queue: asyncio.Queue[Any] - _background_tasks: set[asyncio.Task] - - def create_task(self, aw): - task = asyncio.create_task(aw) - self._background_tasks.add(task) - task.add_done_callback(self._background_tasks.discard) - - async def prepare(self): - # NOTE: Initialize in the ExtensionApp.start_extension once - # https://github.com/jupyter-server/jupyter_server/issues/1329 - # is done. - # We are temporarily initializing the store here because `start`` - # is an async function - if self._store is not None and not self._store.initialized: - await self._store.initialize() - - if not self._websocket_server.started.is_set(): - self.create_task(self._websocket_server.start()) - await self._websocket_server.started.wait() - - # Get room - self._room_id: str = self.request.path.split("/")[-1] - - if self._websocket_server.room_exists(self._room_id): - self.room: YRoom = await self._websocket_server.get_room(self._room_id) - - else: - if self._room_id.count(":") >= 2: - # DocumentRoom - file_format, file_type, file_id = decode_file_path(self._room_id) - if file_id in self._file_loaders: - self._emit( - LogLevel.WARNING, - None, - "There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.", - ) - - file = self._file_loaders[file_id] - self.room = DocumentRoom( - self._room_id, - file_format, - file_type, - file, - self.event_logger, - self._store, - self.log, - self._document_save_delay, - ) - - else: - # TransientRoom - # it is a transient document (e.g. awareness) - self.room = TransientRoom(self._room_id, self.log) - - await self._websocket_server.start_room(self.room) - self._websocket_server.add_room(self._room_id, self.room) - - return await super().prepare() def initialize( self, - ywebsocket_server: JupyterWebsocketServer, - file_loaders: FileLoaderMapping, store: BaseYStore, + room_manager: RoomManager, document_cleanup_delay: float | None = 60.0, - document_save_delay: float | None = 1.0, ) -> None: - self._background_tasks = set() # File ID manager cannot be passed as argument as the extension may load after this one self._file_id_manager = self.settings["file_id_manager"] - self._file_loaders = file_loaders + self._store = store + self._room_manager = room_manager self._cleanup_delay = document_cleanup_delay - self._document_save_delay = document_save_delay - self._websocket_server = ywebsocket_server + + self._serve_task: asyncio.Task | None = None self._message_queue = asyncio.Queue() + async def prepare(self): + # NOTE: Initialize in the ExtensionApp.start_extension once + # https://github.com/jupyter-server/jupyter_server/issues/1329 + # is done. + # We are temporarily initializing the store here because the + # initialization is async + if not self._store.initialized: + await self._store.initialize() + + return await super().prepare() + @property def path(self): """ @@ -170,46 +122,43 @@ async def open(self, room_id): """ On connection open. """ - self.create_task(self._websocket_server.serve(self)) - - if isinstance(self.room, DocumentRoom): - # Close the connection if the document session expired - session_id = self.get_query_argument("sessionId", "") - if SERVER_SESSION != session_id: - self.close( - 1003, - f"Document session {session_id} expired. You need to reload this browser tab.", - ) - - # cancel the deletion of the room if it was scheduled - if self.room.cleaner is not None: - self.room.cleaner.cancel() - - try: - # Initialize the room - await self.room.initialize() - except Exception as e: - _, _, file_id = decode_file_path(self._room_id) - file = self._file_loaders[file_id] - - # Close websocket and propagate error. - if isinstance(e, web.HTTPError): - self.log.error(f"File {file.path} not found.\n{e!r}", exc_info=e) - self.close(1004, f"File {file.path} not found.") - else: - self.log.error(f"Error initializing: {file.path}\n{e!r}", exc_info=e) - self.close( - 1003, f"Error initializing: {file.path}. You need to close the document." - ) - - # Clean up the room and delete the file loader - if len(self.room.clients) == 0 or self.room.clients == [self]: - self._message_queue.put_nowait(b"") - self._cleanup_delay = 0 - await self._clean_room() + self._room_id = self.request.path.split("/")[-1] + + # Close the connection if the document session expired + session_id = self.get_query_argument("sessionId", None) + if session_id and SERVER_SESSION != session_id: + self.close( + 1003, + f"Document session {session_id} expired. You need to reload this browser tab.", + ) + try: + # Get room + self.room = await self._room_manager.get_room(self._room_id) + + except Exception as e: + _, _, file_id = decode_file_path(self._room_id) + path = self._file_id_manager.get_path(file_id) + + # Close websocket and propagate error. + if isinstance(e, web.HTTPError): + self.log.error(f"File {path} not found.\n{e!r}", exc_info=e) + self.close(1004, f"File {path} not found.") + else: + self.log.error(f"Error initializing: {path}\n{e!r}", exc_info=e) + self.close(1003, f"Error initializing: {path}. You need to close the document.") + + # Clean up the room and delete the file loader + if self.room is not None and len(self.room.clients) == 0 or self.room.clients == [self]: + self._message_queue.put_nowait(b"") + if self._serve_task: + self._serve_task.cancel() + await self._room_manager.remove_room(self._room_id) self._emit(LogLevel.INFO, "initialize", "New client connected.") + # Start processing messages in the room + self._serve_task = asyncio.create_task(self.room.serve(self)) + async def send(self, message): """ Send a message to the client. @@ -233,31 +182,6 @@ async def on_message(self, message): """ message_type = message[0] - if message_type == YMessageType.AWARENESS: - # awareness - skip = False - changes = self.room.awareness.get_changes(message[1:]) - added_users = changes["added"] - removed_users = changes["removed"] - for i, user in enumerate(added_users): - u = changes["states"][i] - if "user" in u: - name = u["user"]["name"] - self._websocket_server.connected_users[user] = name - self.log.debug("Y user joined: %s", name) - for user in removed_users: - if user in self._websocket_server.connected_users: - name = self._websocket_server.connected_users[user] - del self._websocket_server.connected_users[user] - self.log.debug("Y user left: %s", name) - # filter out message depending on changes - if skip: - self.log.debug( - "Filtered out Y message of type: %s", - YMessageType(message_type).name, - ) - return skip - if message_type == MessageType.ROOM: await self.room.handle_msg(message[1:]) @@ -269,16 +193,9 @@ async def on_message(self, message): {"sender": user.username, "timestamp": time.time(), "content": json.loads(msg)} ).encode("utf8") - for client in self.room.clients: - if client != self: - task = asyncio.create_task( - client.send(bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data) - ) - self._websocket_server.background_tasks.add(task) - task.add_done_callback(self._websocket_server.background_tasks.discard) + self.room.broadcast_msg(bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data) self._message_queue.put_nowait(message) - self._websocket_server.ypatch_nb += 1 def on_close(self) -> None: """ @@ -286,11 +203,16 @@ def on_close(self) -> None: """ # stop serving this client self._message_queue.put_nowait(b"") - if isinstance(self.room, DocumentRoom) and self.room.clients == [self]: + + if self._serve_task is not None and not self._serve_task.cancelled(): + self._serve_task.cancel() + + if self.room is not None and self.room.clients == [self]: # no client in this room after we disconnect - # keep the document for a while in case someone reconnects - self.log.info("Cleaning room: %s", self._room_id) - self.room.cleaner = asyncio.create_task(self._clean_room()) + # Remove the room with a delay in case someone reconnects + IOLoop.current().add_callback( + self._room_manager.remove_room, self._room_id, self._cleanup_delay + ) def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: _, _, file_id = decode_file_path(self._room_id) @@ -304,42 +226,6 @@ def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = No self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data) - async def _clean_room(self) -> None: - """ - Async task for cleaning up the resources. - - When all the clients of a room leave, we setup a task to clean up the resources - after a certain amount of time. We need to wait a few seconds to clean up the room - because sometimes websockets unintentionally disconnect. - - During the clean up, we need to delete the room to free resources since the room - contains a copy of the document. In addition, we remove the file if there is no rooms - subscribed to it. - """ - assert isinstance(self.room, DocumentRoom) - - if self._cleanup_delay is None: - return - - await asyncio.sleep(self._cleanup_delay) - - # Remove the room from the websocket server - self.log.info("Deleting Y document from memory: %s", self.room.room_id) - self._websocket_server.delete_room(room=self.room) - - # Clean room - del self.room - self.log.info("Room %s deleted", self._room_id) - self._emit(LogLevel.INFO, "clean", "Room deleted.") - - # Clean the file loader if there are not rooms using it - _, _, file_id = decode_file_path(self._room_id) - file = self._file_loaders[file_id] - if file.number_of_subscriptions == 0: - self.log.info("Deleting file %s", file.path) - await self._file_loaders.remove(file_id) - self._emit(LogLevel.INFO, "clean", "Loader deleted.") - def check_origin(self, origin): """ Check origin diff --git a/jupyter_collaboration/rooms/__init__.py b/jupyter_collaboration/rooms/__init__.py new file mode 100644 index 00000000..acd93d02 --- /dev/null +++ b/jupyter_collaboration/rooms/__init__.py @@ -0,0 +1,7 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from .base import BaseRoom # noqa +from .document import DocumentRoom # noqa +from .manager import RoomManager # noqa +from .transient import TransientRoom # noqa diff --git a/jupyter_collaboration/rooms/base.py b/jupyter_collaboration/rooms/base.py new file mode 100644 index 00000000..15a16d1f --- /dev/null +++ b/jupyter_collaboration/rooms/base.py @@ -0,0 +1,41 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import annotations + +import asyncio +from logging import Logger + +from ypy_websocket.websocket_server import YRoom + +from ..stores import BaseYStore + + +class BaseRoom(YRoom): + def __init__(self, room_id: str, store: BaseYStore | None = None, log: Logger | None = None): + super().__init__(ready=False, ystore=store, log=log) + self._room_id = room_id + + @property + def room_id(self) -> str: + """ + The room ID. + """ + return self._room_id + + async def initialize(self) -> None: + return + + async def handle_msg(self, data: bytes) -> None: + return + + def broadcast_msg(self, msg: bytes) -> None: + for client in self.clients: + self._task_group.start_soon(client.send, msg) + + async def _broadcast_updates(self): + # FIXME should be upstreamed + try: + await super()._broadcast_updates() + except asyncio.CancelledError: + pass diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms/document.py similarity index 87% rename from jupyter_collaboration/rooms.py rename to jupyter_collaboration/rooms/document.py index 2e755f9c..fd9340d9 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms/document.py @@ -10,23 +10,23 @@ from jupyter_events import EventLogger from jupyter_ydoc import ydocs as YDOCS -from ypy_websocket.stores import BaseYStore -from ypy_websocket.websocket_server import YRoom from ypy_websocket.yutils import write_var_uint -from .loaders import FileLoader -from .utils import ( +from ..loaders import FileLoader +from ..stores import BaseYStore +from ..utils import ( JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, MessageType, OutOfBandChanges, RoomMessages, ) +from .base import BaseRoom YFILE = YDOCS["file"] -class DocumentRoom(YRoom): +class DocumentRoom(BaseRoom): """A Y room for a possibly stored document (e.g. a notebook).""" def __init__( @@ -36,13 +36,12 @@ def __init__( file_type: str, file: FileLoader, logger: EventLogger, - ystore: BaseYStore | None, + store: BaseYStore | None, log: Logger | None, save_delay: float | None = None, ): - super().__init__(ready=False, ystore=ystore, log=log) + super().__init__(room_id=room_id, store=store, log=log) - self._room_id: str = room_id self._file_format: str = file_format self._file_type: str = file_type self._last_modified: Any = None @@ -55,7 +54,6 @@ def __init__( self._update_lock = asyncio.Lock() self._outofband_lock = asyncio.Lock() self._initialization_lock = asyncio.Lock() - self._cleaner: asyncio.Task | None = None self._saving_document: asyncio.Task | None = None self._messages: dict[str, asyncio.Lock] = {} @@ -63,27 +61,6 @@ def __init__( self._document.observe(self._on_document_change) self._file.observe(self.room_id, self._on_content_change) - @property - def room_id(self) -> str: - """ - The room ID. - """ - return self._room_id - - @property - def cleaner(self) -> asyncio.Task | None: - """ - The task for cleaning up the resources. - """ - return self._cleaner - - @cleaner.setter - def cleaner(self, value: asyncio.Task) -> None: - """ - Setter for the clean up task. - """ - self._cleaner = value - async def initialize(self) -> None: """ Initializes the room. @@ -135,6 +112,9 @@ async def initialize(self) -> None: self.ystore.__class__.__name__, ) + # Update the content + self._document.source = model["content"] + doc = await self.ystore.get(self._room_id) await self.ystore.remove(self._room_id) version = 0 @@ -188,7 +168,7 @@ async def handle_msg(self, data: bytes) -> None: self._messages.pop(msg_id) data = msg_id.encode() self._outofband_lock.release() - await self._broadcast_msg( + self.broadcast_msg( bytes([MessageType.ROOM, ans]) + write_var_uint(len(data)) + data ) @@ -210,20 +190,14 @@ def stop(self) -> None: Cancels the save task and unsubscribes from the file. """ - super().stop() + self._document.unobserve() + self._file.unobserve(self.room_id) + # TODO: Should we cancel or wait ? if self._saving_document: self._saving_document.cancel() - self._document.unobserve() - self._file.unobserve(self.room_id) - - async def _broadcast_updates(self): - # FIXME should be upstreamed - try: - await super()._broadcast_updates() - except asyncio.CancelledError: - pass + return super().stop() async def _on_content_change(self, event: str, args: dict[str, Any]) -> None: """ @@ -246,7 +220,7 @@ async def _on_content_change(self, event: str, args: dict[str, Any]) -> None: self._messages[msg_id] = asyncio.Lock() await self._outofband_lock.acquire() data = msg_id.encode() - await self._broadcast_msg( + self.broadcast_msg( bytes([MessageType.ROOM, RoomMessages.FILE_CHANGED]) + write_var_uint(len(data)) + data @@ -368,30 +342,3 @@ async def _maybe_save_document(self) -> None: msg = f"Error saving file: {self._file.path}\n{e!r}" self.log.error(msg, exc_info=e) self._emit(LogLevel.ERROR, None, msg) - - async def _broadcast_msg(self, msg: bytes) -> None: - for client in self.clients: - await client.send(msg) - - -class TransientRoom(YRoom): - """A Y room for sharing state (e.g. awareness).""" - - def __init__(self, room_id: str, log: Logger | None): - super().__init__(log=log) - - self._room_id = room_id - - @property - def room_id(self) -> str: - """ - The room ID. - """ - return self._room_id - - async def _broadcast_updates(self): - # FIXME should be upstreamed - try: - await super()._broadcast_updates() - except asyncio.CancelledError: - pass diff --git a/jupyter_collaboration/rooms/manager.py b/jupyter_collaboration/rooms/manager.py new file mode 100644 index 00000000..c38e1498 --- /dev/null +++ b/jupyter_collaboration/rooms/manager.py @@ -0,0 +1,174 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import annotations + +import asyncio +from logging import Logger, getLogger +from typing import Any + +from ..loaders import FileLoaderMapping +from ..stores import BaseYStore +from ..utils import JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, decode_file_path +from .base import BaseRoom +from .document import DocumentRoom +from .transient import TransientRoom + + +class RoomManager: + """Map IDs to rooms.""" + + def __init__( + self, + store: BaseYStore, + file_loaders: FileLoaderMapping, + event_logger: Any, + document_save_delay: float | None = 1.0, + log: Logger | None = None, + ) -> None: + self._store = store + self._file_loaders = file_loaders + self._event_logger = event_logger + + self._document_save_delay = document_save_delay + self.log = log or getLogger(__name__) + + self._rooms: dict[str, BaseRoom] = {} + self._room_tasks: dict[str, asyncio.Task] = {} + self._clean_up_tasks: dict[str, asyncio.Task] = {} + + self._lock = asyncio.Lock() + + def has_room(self, room_id: str) -> bool: + """Test if an id has a room.""" + return room_id in self._rooms + + async def get_room(self, room_id: str) -> BaseRoom: + """ + Get the room for a given id. + + NOTE: If the room doesn't exits, it will create and return + a new one. + """ + # Use a lock to make sure two clients don't create + # the same room. + async with self._lock: + # Cancel the clean up task if exists + if room_id in self._clean_up_tasks: + task = self._clean_up_tasks.pop(room_id) + task.cancel() + + room = self._rooms.get(room_id, None) + if room is not None: + return room + + if room_id.count(":") >= 2: + # DocumentRoom + room = self._create_document_room(room_id) + + else: + # TransientRoom + # it is a transient document (e.g. awareness) + room = TransientRoom(room_id, self.log) + + self._rooms[room_id] = room + if not room.started.is_set(): + self._room_tasks[room_id] = asyncio.create_task(room.start()) + + if not room.ready: + await room.initialize() + + return room + + async def remove_room(self, room_id: str, delay: float = 0) -> None: + """Remove the room for a given id.""" + # Use lock to make sure while a client is creating the + # clean up task, no one else is accessing the room or trying to + # deleted as well + async with self._lock: + if room_id in self._clean_up_tasks: + return + + # NOTE: Should we check if there is only one client? + # if len(self._rooms[room_id].clients) <= 1: + self._clean_up_tasks[room_id] = asyncio.create_task(self._clean_up_room(room_id, delay)) + + async def clear(self) -> None: + """Clear all rooms.""" + tasks = [] + for id in list(self._rooms): + tasks.append(asyncio.create_task(self._clean_up_room(id, 0))) + + await asyncio.gather(*tasks) + + def _create_document_room(self, room_id: str) -> DocumentRoom: + file_format, file_type, file_id = decode_file_path(room_id) + if file_id in self._file_loaders: + self._emit( + room_id, + LogLevel.WARNING, + None, + "There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.", + ) + + file = self._file_loaders[file_id] + return DocumentRoom( + room_id, + file_format, + file_type, + file, + self._event_logger, + self._store, + self.log, + self._document_save_delay, + ) + + async def _clean_up_room(self, room_id: str, delay: float) -> None: + """ + Async task for cleaning up the resources. + + When all the clients of a room leave, we setup a task to clean up the resources + after a certain amount of time. We need to wait a few seconds to clean up the room + because sometimes websockets unintentionally disconnect. + + During the clean up, we need to delete the room to free resources since the room + contains a copy of the document. In addition, we remove the file if there is no rooms + subscribed to it. + """ + self.log.info("Cleaning room: %s", room_id) + + await asyncio.sleep(delay) + + # Remove the room + room = self._rooms.pop(room_id) + room.stop() + + task = self._room_tasks.pop(room_id) + await task + + self.log.info("Room %s deleted", room_id) + self._emit(room_id, LogLevel.INFO, "clean", "Room deleted.") + + # Clean the file loader if there are not rooms using it + _, _, file_id = decode_file_path(room_id) + file = self._file_loaders[file_id] + if file.number_of_subscriptions == 0: + await self._file_loaders.remove(file_id) + self.log.info("Loader %s deleted", file.path) + self._emit(room_id, LogLevel.INFO, "clean", "Loader deleted.") + + del self._clean_up_tasks[room_id] + + def _emit( + self, room_id: str, level: LogLevel, action: str | None = None, msg: str | None = None + ) -> None: + _, _, file_id = decode_file_path(room_id) + path = self._file_loaders.file_id_manager.get_path(file_id) + + data = {"level": level.value, "room": room_id, "path": path} + if action: + data["action"] = action + if msg: + data["msg"] = msg + + self._event_logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data) diff --git a/jupyter_collaboration/rooms/transient.py b/jupyter_collaboration/rooms/transient.py new file mode 100644 index 00000000..9772dc48 --- /dev/null +++ b/jupyter_collaboration/rooms/transient.py @@ -0,0 +1,15 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import annotations + +from logging import Logger + +from .base import BaseRoom + + +class TransientRoom(BaseRoom): + """A Y room for sharing state (e.g. awareness).""" + + def __init__(self, room_id: str, log: Logger | None): + super().__init__(room_id=room_id, log=log) diff --git a/jupyter_collaboration/stores/__init__.py b/jupyter_collaboration/stores/__init__.py index fb4893c0..d70e1605 100644 --- a/jupyter_collaboration/stores/__init__.py +++ b/jupyter_collaboration/stores/__init__.py @@ -2,5 +2,7 @@ # Distributed under the terms of the Modified BSD License. from .base_store import BaseYStore # noqa +from .file_store import FileYStore # noqa +from .sqlite_store import SQLiteYStore as _SQLiteYStore # noqa from .stores import SQLiteYStore, TempFileYStore # noqa from .utils import YDocExists, YDocNotFound # noqa diff --git a/jupyter_collaboration/stores/base_store.py b/jupyter_collaboration/stores/base_store.py index 97039ae3..0785b41d 100644 --- a/jupyter_collaboration/stores/base_store.py +++ b/jupyter_collaboration/stores/base_store.py @@ -143,7 +143,7 @@ async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None: path: The document name/path. ydoc: The YDoc from which to store the state. """ - update = Y.encode_state_as_update(ydoc) # type: ignore + update = Y.encode_state_as_update(ydoc) await self.write(path, update) async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None: @@ -154,4 +154,4 @@ async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None: ydoc: The YDoc on which to apply the updates. """ async for update, *rest in self.read(path): # type: ignore - Y.apply_update(ydoc, update) # type: ignore + Y.apply_update(ydoc, update) diff --git a/jupyter_collaboration/stores/file_store.py b/jupyter_collaboration/stores/file_store.py index 00289767..2ea2167a 100644 --- a/jupyter_collaboration/stores/file_store.py +++ b/jupyter_collaboration/stores/file_store.py @@ -4,15 +4,13 @@ from __future__ import annotations import struct -import tempfile import time from logging import Logger, getLogger from pathlib import Path -from typing import AsyncIterator, Awaitable, Callable +from typing import Any, AsyncIterator, Awaitable, Callable import anyio from anyio import Event, Lock -from deprecated import deprecated from ypy_websocket.yutils import Decoder, get_new_path, write_var_uint from .base_store import BaseYStore @@ -235,7 +233,7 @@ async def _get_data_offset(self, path: Path) -> int: except Exception: raise YDocNotFound(f"File {str(path)} not found.") - async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: + async def _decode_data(self, data: Any) -> AsyncIterator[tuple[bytes, bytes, float]]: i = 0 for d in Decoder(data).read_messages(): if i == 0: @@ -249,60 +247,3 @@ async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: def _get_document_path(self, path: str) -> Path: return Path(self._store_path, path + ".y") - - -@deprecated(reason="Use FileYStore instead") -class TempFileYStore(FileYStore): - """ - A YStore which uses the system's temporary directory. - Files are writen under a common directory. - To prefix the directory name (e.g. /tmp/my_prefix_b4whmm7y/): - - ```py - class PrefixTempFileYStore(TempFileYStore): - prefix_dir = "my_prefix_" - ``` - - ## Note: - This class is deprecated. Use FileYStore and pass the tmp folder - as path argument. For example: - - ```py - tmp_dir = tempfile.mkdtemp(prefix="prefix/directory/") - store = FileYStore(tmp_dir) - ``` - """ - - prefix_dir: str | None = None - base_dir: str | None = None - - def __init__( - self, - path: str, - metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, - log: Logger | None = None, - ): - """Initialize the object. - - Arguments: - path: The file path used to store the updates. - metadata_callback: An optional callback to call to get the metadata. - log: An optional logger. - """ - full_path = str(Path(self.get_base_dir()) / path) - super().__init__(full_path, metadata_callback=metadata_callback, log=log) - - def get_base_dir(self) -> str: - """Get the base directory where the update file is written. - - Returns: - The base directory path. - """ - if self.base_dir is None: - self.make_directory() - assert self.base_dir is not None - return self.base_dir - - def make_directory(self): - """Create the base directory where the update file is written.""" - type(self).base_dir = tempfile.mkdtemp(prefix=self.prefix_dir) diff --git a/jupyter_collaboration/stores/utils.py b/jupyter_collaboration/stores/utils.py index 79f6b7f3..6e8d71e4 100644 --- a/jupyter_collaboration/stores/utils.py +++ b/jupyter_collaboration/stores/utils.py @@ -1,6 +1,7 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. + class YDocNotFound(Exception): pass diff --git a/pyproject.toml b/pyproject.toml index f501a189..ab6cdc17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,7 +32,8 @@ dependencies = [ "ypy-websocket>=0.12.1,<0.13.0", "jupyter_events>=0.7.0", "jupyter_server_fileid>=0.7.0,<1", - "jsonschema>=4.18.0" + "jsonschema>=4.18.0", + "anyio >=3.6.2,<5" ] dynamic = ["version", "description", "authors", "urls", "keywords"] @@ -152,7 +153,7 @@ disallow_any_generics = false disallow_incomplete_defs = true disallow_untyped_decorators = true no_implicit_optional = true -no_implicit_reexport = true +no_implicit_reexport = false pretty = true show_error_context = true show_error_codes = true diff --git a/tests/conftest.py b/tests/conftest.py index dd323d42..3e9bf5ca 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -5,6 +5,7 @@ import json from asyncio import Event, sleep from datetime import datetime +from logging import getLogger from typing import Any import nbformat @@ -170,9 +171,7 @@ def rtc_create_SQLite_store(jp_serverapp): setattr(SQLiteYStore, k, v) async def _inner(type: str, path: str, content: str) -> DocumentRoom: - room_id = f"{type}:{path}" - db = SQLiteYStore() - await db.start() + db = SQLiteYStore(log=getLogger(__name__)) await db.initialize() if type == "notebook": @@ -182,8 +181,8 @@ async def _inner(type: str, path: str, content: str) -> DocumentRoom: doc.source = content - await db.create(room_id, 0) - await db.encode_state_as_update(room_id, doc.ydoc) + await db.create(path, 0) + await db.encode_state_as_update(path, doc.ydoc) return db @@ -193,14 +192,15 @@ async def _inner(type: str, path: str, content: str) -> DocumentRoom: @pytest.fixture def rtc_create_mock_document_room(): def _inner( - id: str, + room_id: str, + path_id: str, path: str, content: str, last_modified: datetime | None = None, save_delay: float | None = None, store: SQLiteYStore | None = None, ) -> tuple[FakeContentsManager, FileLoader, DocumentRoom]: - paths = {id: path} + paths = {path_id: path} if last_modified is None: cm = FakeContentsManager({"content": content}) @@ -208,7 +208,7 @@ def _inner( cm = FakeContentsManager({"last_modified": datetime.now(), "content": content}) loader = FileLoader( - id, + path_id, FakeFileIDManager(paths), cm, poll_interval=0.1, @@ -218,7 +218,7 @@ def _inner( cm, loader, DocumentRoom( - "test-room", "text", "file", loader, FakeEventLogger(), store, None, save_delay + room_id, "text", "file", loader, FakeEventLogger(), store, None, save_delay ), ) diff --git a/tests/test_rooms.py b/tests/test_rooms.py index 7ecc5ccd..9f4669bb 100644 --- a/tests/test_rooms.py +++ b/tests/test_rooms.py @@ -15,7 +15,7 @@ @pytest.mark.asyncio async def test_should_initialize_document_room_without_store(rtc_create_mock_document_room): content = "test" - _, _, room = rtc_create_mock_document_room("test-id", "test.txt", content) + _, _, room = rtc_create_mock_document_room("test-id", "test_path", "test.txt", content) await room.initialize() assert room._document.source == content @@ -29,10 +29,11 @@ async def test_should_initialize_document_room_from_store( # If the content from the store is different than the content from disk, # the room will initialize with the content from disk and overwrite the document - id = "test-id" + room_id = "test-id" + path_id = "test_path" content = "test" - store = await rtc_create_SQLite_store("file", id, content) - _, _, room = rtc_create_mock_document_room("test-id", "test.txt", content, store=store) + store = await rtc_create_SQLite_store("file", room_id, content) + _, _, room = rtc_create_mock_document_room(room_id, path_id, "test.txt", content, store=store) await room.initialize() assert room._document.source == content @@ -43,13 +44,13 @@ async def test_should_overwrite_the_store(rtc_create_SQLite_store, rtc_create_mo id = "test-id" content = "test" store = await rtc_create_SQLite_store("file", id, "whatever") - _, _, room = rtc_create_mock_document_room("test-id", "test.txt", content, store=store) + _, _, room = rtc_create_mock_document_room(id, "test_path", "test.txt", content, store=store) await room.initialize() assert room._document.source == content doc = YUnicode() - await store.apply_updates(doc.ydoc) + await store.apply_updates(id, doc.ydoc) assert doc.source == content @@ -59,7 +60,9 @@ async def test_defined_save_delay_should_save_content_after_document_change( rtc_create_mock_document_room, ): content = "test" - cm, _, room = rtc_create_mock_document_room("test-id", "test.txt", content, save_delay=0.01) + cm, _, room = rtc_create_mock_document_room( + "test-id", "test_path", "test.txt", content, save_delay=0.01 + ) await room.initialize() room._document.source = "Test 2" @@ -75,7 +78,9 @@ async def test_undefined_save_delay_should_not_save_content_after_document_chang rtc_create_mock_document_room, ): content = "test" - cm, _, room = rtc_create_mock_document_room("test-id", "test.txt", content, save_delay=None) + cm, _, room = rtc_create_mock_document_room( + "test-id", "test_path", "test.txt", content, save_delay=None + ) await room.initialize() room._document.source = "Test 2" @@ -92,7 +97,7 @@ async def test_should_reload_content_from_disk(rtc_create_mock_document_room): last_modified = datetime.now() cm, loader, room = rtc_create_mock_document_room( - "test-id", "test.txt", "whatever", last_modified + "test-id", "test_path", "test.txt", "whatever", last_modified ) await room.initialize() @@ -114,7 +119,9 @@ async def test_should_not_reload_content_from_disk(rtc_create_mock_document_room content = "test" last_modified = datetime.now() - cm, loader, room = rtc_create_mock_document_room("test-id", "test.txt", content, last_modified) + cm, loader, room = rtc_create_mock_document_room( + "test-id", "test_path", "test.txt", content, last_modified + ) await room.initialize()