diff --git a/jupyter_collaboration/app.py b/jupyter_collaboration/app.py index ff337460..37dd927e 100644 --- a/jupyter_collaboration/app.py +++ b/jupyter_collaboration/app.py @@ -6,11 +6,10 @@ from jupyter_server.extension.application import ExtensionApp from traitlets import Bool, Float, Type -from ypy_websocket.ystore import BaseYStore from .handlers import DocSessionHandler, YDocWebSocketHandler from .loaders import FileLoaderMapping -from .stores import SQLiteYStore +from .stores import BaseYStore, SQLiteYStore from .utils import EVENTS_SCHEMA_PATH from .websocketserver import JupyterWebsocketServer @@ -22,6 +21,8 @@ class YDocExtension(ExtensionApp): Enables Real Time Collaboration in JupyterLab """ + _store: BaseYStore = None + disable_rtc = Bool(False, config=True, help="Whether to disable real time collaboration.") file_poll_interval = Float( @@ -80,10 +81,12 @@ def initialize_handlers(self): for k, v in self.config.get(self.ystore_class.__name__, {}).items(): setattr(self.ystore_class, k, v) + # Instantiate the store + self._store = self.ystore_class(log=self.log) + self.ywebsocket_server = JupyterWebsocketServer( rooms_ready=False, auto_clean_rooms=False, - ystore_class=self.ystore_class, log=self.log, ) @@ -103,7 +106,7 @@ def initialize_handlers(self): "document_cleanup_delay": self.document_cleanup_delay, "document_save_delay": self.document_save_delay, "file_loaders": self.file_loaders, - "ystore_class": self.ystore_class, + "store": self._store, "ywebsocket_server": self.ywebsocket_server, }, ), diff --git a/jupyter_collaboration/handlers.py b/jupyter_collaboration/handlers.py index 4f000041..0bbb985a 100644 --- a/jupyter_collaboration/handlers.py +++ b/jupyter_collaboration/handlers.py @@ -15,11 +15,11 @@ from tornado import web from tornado.websocket import WebSocketHandler from ypy_websocket.websocket_server import YRoom -from ypy_websocket.ystore import BaseYStore from ypy_websocket.yutils import YMessageType, write_var_uint from .loaders import FileLoaderMapping from .rooms import DocumentRoom, TransientRoom +from .stores import BaseYStore from .utils import ( JUPYTER_COLLABORATION_EVENTS_URI, LogLevel, @@ -62,6 +62,14 @@ def create_task(self, aw): task.add_done_callback(self._background_tasks.discard) async def prepare(self): + # NOTE: Initialize in the ExtensionApp.start_extension once + # https://github.com/jupyter-server/jupyter_server/issues/1329 + # is done. + # We are temporarily initializing the store here because `start`` + # is an async function + if self._store is not None and not self._store.initialized: + await self._store.initialize() + if not self._websocket_server.started.is_set(): self.create_task(self._websocket_server.start()) await self._websocket_server.started.wait() @@ -84,15 +92,13 @@ async def prepare(self): ) file = self._file_loaders[file_id] - updates_file_path = f".{file_type}:{file_id}.y" - ystore = self._ystore_class(path=updates_file_path, log=self.log) self.room = DocumentRoom( self._room_id, file_format, file_type, file, self.event_logger, - ystore, + self._store, self.log, self._document_save_delay, ) @@ -111,7 +117,7 @@ def initialize( self, ywebsocket_server: JupyterWebsocketServer, file_loaders: FileLoaderMapping, - ystore_class: type[BaseYStore], + store: BaseYStore, document_cleanup_delay: float | None = 60.0, document_save_delay: float | None = 1.0, ) -> None: @@ -119,7 +125,7 @@ def initialize( # File ID manager cannot be passed as argument as the extension may load after this one self._file_id_manager = self.settings["file_id_manager"] self._file_loaders = file_loaders - self._ystore_class = ystore_class + self._store = store self._cleanup_delay = document_cleanup_delay self._document_save_delay = document_save_delay self._websocket_server = ywebsocket_server diff --git a/jupyter_collaboration/rooms.py b/jupyter_collaboration/rooms.py index 75e2a20e..2e755f9c 100644 --- a/jupyter_collaboration/rooms.py +++ b/jupyter_collaboration/rooms.py @@ -10,8 +10,8 @@ from jupyter_events import EventLogger from jupyter_ydoc import ydocs as YDOCS +from ypy_websocket.stores import BaseYStore from ypy_websocket.websocket_server import YRoom -from ypy_websocket.ystore import BaseYStore, YDocNotFound from ypy_websocket.yutils import write_var_uint from .loaders import FileLoader @@ -104,36 +104,28 @@ async def initialize(self) -> None: return self.log.info("Initializing room %s", self._room_id) - model = await self._file.load_content(self._file_format, self._file_type, True) async with self._update_lock: # try to apply Y updates from the YStore for this document - read_from_source = True - if self.ystore is not None: - try: - await self.ystore.apply_updates(self.ydoc) - self._emit( - LogLevel.INFO, - "load", - "Content loaded from the store {}".format( - self.ystore.__class__.__qualname__ - ), - ) - self.log.info( - "Content in room %s loaded from the ystore %s", - self._room_id, - self.ystore.__class__.__name__, - ) - read_from_source = False - except YDocNotFound: - # YDoc not found in the YStore, create the document from the source file (no change history) - pass + if self.ystore is not None and await self.ystore.exists(self._room_id): + # Load the content from the store + await self.ystore.apply_updates(self._room_id, self.ydoc) + self._emit( + LogLevel.INFO, + "load", + "Content loaded from the store {}".format( + self.ystore.__class__.__qualname__ + ), + ) + self.log.info( + "Content in room %s loaded from the ystore %s", + self._room_id, + self.ystore.__class__.__name__, + ) - if not read_from_source: # if YStore updates and source file are out-of-sync, resync updates with source if self._document.source != model["content"]: - # TODO: Delete document from the store. self._emit( LogLevel.INFO, "initialize", "The file is out-of-sync with the ystore." ) @@ -142,17 +134,26 @@ async def initialize(self) -> None: self._file.path, self.ystore.__class__.__name__, ) - read_from_source = True - if read_from_source: + doc = await self.ystore.get(self._room_id) + await self.ystore.remove(self._room_id) + version = 0 + if "version" in doc: + version = doc["version"] + 1 + + await self.ystore.create(self._room_id, version) + await self.ystore.encode_state_as_update(self._room_id, self.ydoc) + + else: self._emit(LogLevel.INFO, "load", "Content loaded from disk.") self.log.info( "Content in room %s loaded from file %s", self._room_id, self._file.path ) self._document.source = model["content"] - if self.ystore: - await self.ystore.encode_state_as_update(self.ydoc) + if self.ystore is not None: + await self.ystore.create(self._room_id, 0) + await self.ystore.encode_state_as_update(self._room_id, self.ydoc) self._last_modified = model["last_modified"] self._document.dirty = False diff --git a/jupyter_collaboration/stores/__init__.py b/jupyter_collaboration/stores/__init__.py new file mode 100644 index 00000000..fb4893c0 --- /dev/null +++ b/jupyter_collaboration/stores/__init__.py @@ -0,0 +1,6 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from .base_store import BaseYStore # noqa +from .stores import SQLiteYStore, TempFileYStore # noqa +from .utils import YDocExists, YDocNotFound # noqa diff --git a/jupyter_collaboration/stores/base_store.py b/jupyter_collaboration/stores/base_store.py new file mode 100644 index 00000000..97039ae3 --- /dev/null +++ b/jupyter_collaboration/stores/base_store.py @@ -0,0 +1,157 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import annotations + +from abc import ABC, abstractmethod +from inspect import isawaitable +from typing import AsyncIterator, Awaitable, Callable, cast + +import y_py as Y +from anyio import Event + + +class BaseYStore(ABC): + """ + Base class for the stores. + """ + + version = 3 + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None + + _store_path: str + _initialized: Event | None = None + + @abstractmethod + def __init__( + self, path: str, metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None + ): + """ + Initialize the object. + + Arguments: + path: The path where the store will be located. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + ... + + @abstractmethod + async def initialize(self) -> None: + """ + Initializes the store. + """ + ... + + @abstractmethod + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + ... + + @abstractmethod + async def list(self) -> AsyncIterator[str]: + """ + Returns a list with the name/path of the documents stored. + """ + ... + + @abstractmethod + async def get(self, path: str, updates: bool = False) -> dict | None: + """ + Returns the document's metadata or None if the document does't exist. + + Arguments: + path: The document name/path. + updates: Whether to return document's content or only the metadata. + """ + ... + + @abstractmethod + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + ... + + @abstractmethod + async def remove(self, path: str) -> dict | None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + ... + + @abstractmethod + async def write(self, path: str, data: bytes) -> None: + """ + Store a document update. + + Arguments: + path: The document name/path. + data: The update to store. + """ + ... + + @abstractmethod + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes]]: + """ + Async iterator for reading document's updates. + + Arguments: + path: The document name/path. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + ... + + @property + def initialized(self) -> bool: + if self._initialized is not None: + return self._initialized.is_set() + return False + + async def get_metadata(self) -> bytes: + """ + Returns: + The metadata. + """ + if self.metadata_callback is None: + return b"" + + metadata = self.metadata_callback() + if isawaitable(metadata): + metadata = await metadata + metadata = cast(bytes, metadata) + return metadata + + async def encode_state_as_update(self, path: str, ydoc: Y.YDoc) -> None: + """Store a YDoc state. + + Arguments: + path: The document name/path. + ydoc: The YDoc from which to store the state. + """ + update = Y.encode_state_as_update(ydoc) # type: ignore + await self.write(path, update) + + async def apply_updates(self, path: str, ydoc: Y.YDoc) -> None: + """Apply all stored updates to the YDoc. + + Arguments: + path: The document name/path. + ydoc: The YDoc on which to apply the updates. + """ + async for update, *rest in self.read(path): # type: ignore + Y.apply_update(ydoc, update) # type: ignore diff --git a/jupyter_collaboration/stores/file_store.py b/jupyter_collaboration/stores/file_store.py new file mode 100644 index 00000000..00289767 --- /dev/null +++ b/jupyter_collaboration/stores/file_store.py @@ -0,0 +1,308 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import annotations + +import struct +import tempfile +import time +from logging import Logger, getLogger +from pathlib import Path +from typing import AsyncIterator, Awaitable, Callable + +import anyio +from anyio import Event, Lock +from deprecated import deprecated +from ypy_websocket.yutils import Decoder, get_new_path, write_var_uint + +from .base_store import BaseYStore +from .utils import YDocExists, YDocNotFound + + +class FileYStore(BaseYStore): + """A YStore which uses one file per document.""" + + _lock: Lock + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None + + def __init__( + self, + path: str = "./ystore", + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ) -> None: + """Initialize the object. + + Arguments: + path: The file path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + self._lock = Lock() + self._store_path = path + self.metadata_callback = metadata_callback + self.log = log or getLogger(__name__) + + async def initialize(self) -> None: + """ + Initializes the store. + """ + if self.initialized or self._initialized is not None: + return + self._initialized = Event() + + version_path = Path(self._store_path, "__version__") + if not await anyio.Path(self._store_path).exists(): + await anyio.Path(self._store_path).mkdir(parents=True, exist_ok=True) + + version = -1 + create_version = False + if await anyio.Path(version_path).exists(): + async with await anyio.open_file(version_path, "rb") as f: + version = int(await f.readline()) + + # Store version mismatch. Move store and create a new one. + if self.version != version: + create_version = True + + if create_version: + new_path = await get_new_path(self._store_path) + self.log.warning( + f"YStore version mismatch, moving {self._store_path} to {new_path}" + ) + await anyio.Path(self._store_path).rename(new_path) + await anyio.Path(self._store_path).mkdir(parents=True, exist_ok=True) + + else: + create_version = True + + if create_version: + async with await anyio.open_file(version_path, "wb") as f: + version_bytes = str(self.version).encode() + await f.write(version_bytes) + + self._initialized.set() + + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + return await anyio.Path(self._get_document_path(path)).exists() + + async def list(self) -> AsyncIterator[str]: # type: ignore[override] + """ + Returns a list with the name/path of the documents stored. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async for child in anyio.Path(self._store_path).glob("**/*.y"): + yield child.relative_to(self._store_path).with_suffix("").as_posix() + + async def get(self, path: str, updates: bool = False) -> dict | None: + """ + Returns the document's metadata and updates or None if the document does't exist. + + Arguments: + path: The document name/path. + updates: Whether to return document's content or only the metadata. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + return None + else: + version = None + async with await anyio.open_file(file_path, "rb") as f: + header = await f.read(8) + if header == b"VERSION:": + version = int(await f.readline()) + + list_updates: list[tuple[bytes, bytes, float]] = [] + if updates: + data = await f.read() + async for update, metadata, timestamp in self._decode_data(data): + list_updates.append((update, metadata, timestamp)) + + return dict(path=path, version=version, updates=list_updates) + + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if await anyio.Path(file_path).exists(): + raise YDocExists(f"The document {path} already exists.") + + else: + await anyio.Path(file_path.parent).mkdir(parents=True, exist_ok=True) + async with await anyio.open_file(file_path, "wb") as f: + version_bytes = f"VERSION:{version}\n".encode() + await f.write(version_bytes) + + async def remove(self, path: str) -> None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + file_path = self._get_document_path(path) + if await anyio.Path(file_path).exists(): + await anyio.Path(file_path).unlink(missing_ok=False) + else: + raise YDocNotFound(f"The document {path} doesn't exists.") + + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + """Async iterator for reading the store content. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + raise YDocNotFound + + offset = await self._get_data_offset(file_path) + async with await anyio.open_file(file_path, "rb") as f: + await f.seek(offset) + data = await f.read() + if not data: + raise YDocNotFound + + async for res in self._decode_data(data): + yield res + + async def write(self, path: str, data: bytes) -> None: + """Store an update. + + Arguments: + data: The update to store. + """ + async with self._lock: + file_path = self._get_document_path(path) + if not await anyio.Path(file_path).exists(): + raise YDocNotFound + + async with await anyio.open_file(file_path, "ab") as f: + data_len = write_var_uint(len(data)) + await f.write(data_len + data) + metadata = await self.get_metadata() + metadata_len = write_var_uint(len(metadata)) + await f.write(metadata_len + metadata) + timestamp = struct.pack(" int: + try: + async with await anyio.open_file(path, "rb") as f: + header = await f.read(8) + if header == b"VERSION:": + await f.readline() + return await f.tell() + else: + raise Exception + + except Exception: + raise YDocNotFound(f"File {str(path)} not found.") + + async def _decode_data(self, data) -> AsyncIterator[tuple[bytes, bytes, float]]: + i = 0 + for d in Decoder(data).read_messages(): + if i == 0: + update = d + elif i == 1: + metadata = d + else: + timestamp = struct.unpack(" Path: + return Path(self._store_path, path + ".y") + + +@deprecated(reason="Use FileYStore instead") +class TempFileYStore(FileYStore): + """ + A YStore which uses the system's temporary directory. + Files are writen under a common directory. + To prefix the directory name (e.g. /tmp/my_prefix_b4whmm7y/): + + ```py + class PrefixTempFileYStore(TempFileYStore): + prefix_dir = "my_prefix_" + ``` + + ## Note: + This class is deprecated. Use FileYStore and pass the tmp folder + as path argument. For example: + + ```py + tmp_dir = tempfile.mkdtemp(prefix="prefix/directory/") + store = FileYStore(tmp_dir) + ``` + """ + + prefix_dir: str | None = None + base_dir: str | None = None + + def __init__( + self, + path: str, + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ): + """Initialize the object. + + Arguments: + path: The file path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + full_path = str(Path(self.get_base_dir()) / path) + super().__init__(full_path, metadata_callback=metadata_callback, log=log) + + def get_base_dir(self) -> str: + """Get the base directory where the update file is written. + + Returns: + The base directory path. + """ + if self.base_dir is None: + self.make_directory() + assert self.base_dir is not None + return self.base_dir + + def make_directory(self): + """Create the base directory where the update file is written.""" + type(self).base_dir = tempfile.mkdtemp(prefix=self.prefix_dir) diff --git a/jupyter_collaboration/stores/sqlite_store.py b/jupyter_collaboration/stores/sqlite_store.py new file mode 100644 index 00000000..793d31d8 --- /dev/null +++ b/jupyter_collaboration/stores/sqlite_store.py @@ -0,0 +1,291 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +from __future__ import annotations + +import time +from logging import Logger, getLogger +from typing import Any, AsyncIterator, Awaitable, Callable, Iterable + +import aiosqlite +import anyio +import y_py as Y +from anyio import Event, Lock +from ypy_websocket.yutils import get_new_path + +from .base_store import BaseYStore +from .utils import YDocExists, YDocNotFound + + +class SQLiteYStore(BaseYStore): + """A YStore which uses an SQLite database. + Unlike file-based YStores, the Y updates of all documents are stored in the same database. + + Subclass to point to your database file: + + ```py + class MySQLiteYStore(SQLiteYStore): + _store_path = "path/to/my_ystore.db" + ``` + """ + + _lock: Lock + # Determines the "time to live" for all documents, i.e. how recent the + # latest update of a document must be before purging document history. + # Defaults to never purging document history (None). + document_ttl: int | None = None + + def __init__( + self, + path: str = "./ystore.db", + metadata_callback: Callable[[], Awaitable[bytes] | bytes] | None = None, + log: Logger | None = None, + ) -> None: + """Initialize the object. + + Arguments: + path: The database path used to store the updates. + metadata_callback: An optional callback to call to get the metadata. + log: An optional logger. + """ + self._lock = Lock() + self._store_path = path + self.metadata_callback = metadata_callback + self.log = log or getLogger(__name__) + + async def initialize(self) -> None: + """ + Initializes the store. + """ + if self.initialized or self._initialized is not None: + return + self._initialized = Event() + + async with self._lock: + if await anyio.Path(self._store_path).exists(): + version = -1 + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute("pragma user_version") + row = await cursor.fetchone() + if row is not None: + version = row[0] + + # The DB has an old version. Move the database. + if self.version != version: + new_path = await get_new_path(self._store_path) + self.log.warning( + f"YStore version mismatch, moving {self._store_path} to {new_path}" + ) + await anyio.Path(self._store_path).rename(new_path) + + # Make sure every table exists. + async with aiosqlite.connect(self._store_path) as db: + await db.execute( + "CREATE TABLE IF NOT EXISTS documents (path TEXT PRIMARY KEY, version INTEGER NOT NULL)" + ) + await db.execute( + "CREATE TABLE IF NOT EXISTS yupdates (path TEXT NOT NULL, yupdate BLOB, metadata BLOB, timestamp REAL NOT NULL)" + ) + await db.execute( + "CREATE INDEX IF NOT EXISTS idx_yupdates_path_timestamp ON yupdates (path, timestamp)" + ) + await db.execute(f"PRAGMA user_version = {self.version}") + await db.commit() + + self._initialized.set() + + async def exists(self, path: str) -> bool: + """ + Returns True if the document exists, else returns False. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) + return (await cursor.fetchone()) is not None + + async def list(self) -> AsyncIterator[str]: # type: ignore[override] + """ + Returns a list with the name/path of the documents stored. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + async with db.execute("SELECT path FROM documents") as cursor: + async for path in cursor: + yield path[0] + + async def get(self, path: str, updates: bool = False) -> dict | None: + """ + Returns the document's metadata and updates or None if the document does't exist. + + Arguments: + path: The document name/path. + updates: Whether to return document's content or only the metadata. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) + doc = await cursor.fetchone() + + if doc is None: + return None + + list_updates: Iterable[Any] = [] + if updates: + cursor = await db.execute( + "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", + (path,), + ) + list_updates = await cursor.fetchall() + + return dict(path=doc[0], version=doc[1], updates=list_updates) + + async def create(self, path: str, version: int) -> None: + """ + Creates a new document. + + Arguments: + path: The document name/path. + version: Document version. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + try: + async with aiosqlite.connect(self._store_path) as db: + await db.execute( + "INSERT INTO documents VALUES (?, ?)", + (path, version), + ) + await db.commit() + except aiosqlite.IntegrityError: + raise YDocExists(f"The document {path} already exists.") + + async def remove(self, path: str) -> None: + """ + Removes a document. + + Arguments: + path: The document name/path. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + cursor = await db.execute( + "SELECT path, version FROM documents WHERE path = ?", + (path,), + ) + if (await cursor.fetchone()) is None: + raise YDocNotFound(f"The document {path} doesn't exists.") + + await db.execute( + "DELETE FROM documents WHERE path = ?", + (path,), + ) + await db.execute( + "DELETE FROM yupdates WHERE path = ?", + (path,), + ) + await db.commit() + + async def read(self, path: str) -> AsyncIterator[tuple[bytes, bytes, float]]: # type: ignore + """Async iterator for reading the store content. + + Arguments: + path: The document name/path. + + Returns: + A tuple of (update, metadata, timestamp) for each update. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + try: + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + async with db.execute( + "SELECT yupdate, metadata, timestamp FROM yupdates WHERE path = ?", + (path,), + ) as cursor: + found = False + async for update, metadata, timestamp in cursor: + found = True + yield update, metadata, timestamp + if not found: + raise YDocNotFound + except Exception: + raise YDocNotFound + + async def write(self, path: str, data: bytes) -> None: + """ + Store an update. + + Arguments: + path: The document name/path. + data: The update to store. + """ + if self._initialized is None: + raise Exception("The store was not initialized.") + await self._initialized.wait() + + async with self._lock: + async with aiosqlite.connect(self._store_path) as db: + # first, determine time elapsed since last update + cursor = await db.execute( + "SELECT timestamp FROM yupdates WHERE path = ? ORDER BY timestamp DESC LIMIT 1", + (path,), + ) + row = await cursor.fetchone() + diff = (time.time() - row[0]) if row else 0 + + if self.document_ttl is not None and diff > self.document_ttl: + # squash updates + ydoc = Y.YDoc() + async with db.execute( + "SELECT yupdate FROM yupdates WHERE path = ?", (path,) + ) as cursor: + async for update, in cursor: + Y.apply_update(ydoc, update) + # delete history + await db.execute("DELETE FROM yupdates WHERE path = ?", (path,)) + # insert squashed updates + squashed_update = Y.encode_state_as_update(ydoc) + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (path, squashed_update, metadata, time.time()), + ) + + # finally, write this update to the DB + metadata = await self.get_metadata() + await db.execute( + "INSERT INTO yupdates VALUES (?, ?, ?, ?)", + (path, data, metadata, time.time()), + ) + await db.commit() diff --git a/jupyter_collaboration/stores.py b/jupyter_collaboration/stores/stores.py similarity index 63% rename from jupyter_collaboration/stores.py rename to jupyter_collaboration/stores/stores.py index 323e06ed..66ad5927 100644 --- a/jupyter_collaboration/stores.py +++ b/jupyter_collaboration/stores/stores.py @@ -1,14 +1,20 @@ # Copyright (c) Jupyter Development Team. # Distributed under the terms of the Modified BSD License. +from __future__ import annotations + +from logging import Logger + from traitlets import Int, Unicode from traitlets.config import LoggingConfigurable -from ypy_websocket.ystore import SQLiteYStore as _SQLiteYStore -from ypy_websocket.ystore import TempFileYStore as _TempFileYStore +from .file_store import FileYStore +from .sqlite_store import SQLiteYStore as _SQLiteYStore -class TempFileYStore(_TempFileYStore): - prefix_dir = "jupyter_ystore_" + +class TempFileYStore(FileYStore): + def __init__(self, log: Logger | None = None): + super().__init__(path=".jupyter_store", log=log) class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # type: ignore @@ -17,7 +23,7 @@ class SQLiteYStoreMetaclass(type(LoggingConfigurable), type(_SQLiteYStore)): # class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMetaclass): db_path = Unicode( - ".jupyter_ystore.db", + ".jupyter_store.db", config=True, help="""The path to the YStore database. Defaults to '.jupyter_ystore.db' in the current directory.""", @@ -30,3 +36,6 @@ class SQLiteYStore(LoggingConfigurable, _SQLiteYStore, metaclass=SQLiteYStoreMet help="""The document time-to-live in seconds. Defaults to None (document history is never cleared).""", ) + + def __init__(self, log: Logger | None = None): + super().__init__(path=self.db_path, log=log) diff --git a/jupyter_collaboration/stores/utils.py b/jupyter_collaboration/stores/utils.py new file mode 100644 index 00000000..79f6b7f3 --- /dev/null +++ b/jupyter_collaboration/stores/utils.py @@ -0,0 +1,9 @@ +# Copyright (c) Jupyter Development Team. +# Distributed under the terms of the Modified BSD License. + +class YDocNotFound(Exception): + pass + + +class YDocExists(Exception): + pass diff --git a/jupyter_collaboration/websocketserver.py b/jupyter_collaboration/websocketserver.py index f35c1148..ec989db0 100644 --- a/jupyter_collaboration/websocketserver.py +++ b/jupyter_collaboration/websocketserver.py @@ -9,7 +9,6 @@ from tornado.websocket import WebSocketHandler from ypy_websocket.websocket_server import WebsocketServer, YRoom -from ypy_websocket.ystore import BaseYStore class RoomNotFound(LookupError): @@ -27,13 +26,11 @@ class JupyterWebsocketServer(WebsocketServer): def __init__( self, - ystore_class: BaseYStore, rooms_ready: bool = True, auto_clean_rooms: bool = True, log: Logger | None = None, ): super().__init__(rooms_ready, auto_clean_rooms, log) - self.ystore_class = ystore_class self.ypatch_nb = 0 self.connected_users: dict[Any, Any] = {} # Async loop is not yet ready at the object instantiation diff --git a/tests/conftest.py b/tests/conftest.py index 234dbb3c..dd323d42 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -170,8 +170,10 @@ def rtc_create_SQLite_store(jp_serverapp): setattr(SQLiteYStore, k, v) async def _inner(type: str, path: str, content: str) -> DocumentRoom: - db = SQLiteYStore(path=f"{type}:{path}") + room_id = f"{type}:{path}" + db = SQLiteYStore() await db.start() + await db.initialize() if type == "notebook": doc = YNotebook() @@ -179,7 +181,9 @@ async def _inner(type: str, path: str, content: str) -> DocumentRoom: doc = YUnicode() doc.source = content - await db.encode_state_as_update(doc.ydoc) + + await db.create(room_id, 0) + await db.encode_state_as_update(room_id, doc.ydoc) return db