From 882d32ea10edcba168ba6fc305225f2a03d543d6 Mon Sep 17 00:00:00 2001 From: krassowski <5832902+krassowski@users.noreply.github.com> Date: Sun, 18 Aug 2024 17:00:41 +0100 Subject: [PATCH] Save current hash on the document --- packages/docprovider/src/ydrive.ts | 5 +++ .../jupyter_server_ydoc/loaders.py | 45 +++++++++++++++---- .../jupyter_server_ydoc/rooms.py | 28 +++++++++--- 3 files changed, 62 insertions(+), 16 deletions(-) diff --git a/packages/docprovider/src/ydrive.ts b/packages/docprovider/src/ydrive.ts index 0f4ce8bf..99a9a2eb 100644 --- a/packages/docprovider/src/ydrive.ts +++ b/packages/docprovider/src/ydrive.ts @@ -161,6 +161,11 @@ export class YDrive extends Drive implements ICollaborativeDrive { const key = `${options.format}:${options.contentType}:${options.path}`; this._providers.set(key, provider); + sharedModel.changed.connect((_, change) => { + // TODO: make use of the hash + console.log(change.stateChange); + }); + sharedModel.disposed.connect(() => { const provider = this._providers.get(key); if (provider) { diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py index a2622cca..96cdbca3 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py @@ -40,7 +40,9 @@ def __init__( self._log = log or getLogger(__name__) self._subscriptions: dict[str, Callable[[], Coroutine[Any, Any, None]]] = {} - self._watcher = asyncio.create_task(self._watch_file()) if self._poll_interval else None + self._watcher = ( + asyncio.create_task(self._watch_file()) if self._poll_interval else None + ) self.last_modified = None @property @@ -79,7 +81,9 @@ async def clean(self) -> None: except asyncio.CancelledError: self._log.info(f"file watcher for '{self.file_id}' is cancelled now") - def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None: + def observe( + self, id: str, callback: Callable[[], Coroutine[Any, Any, None]] + ) -> None: """ Subscribe to the file to get notified about out-of-band file changes. @@ -112,12 +116,14 @@ async def load_content(self, format: str, file_type: str) -> dict[str, Any]: """ async with self._lock: model = await ensure_async( - self._contents_manager.get(self.path, format=format, type=file_type, content=True) + self._contents_manager.get( + self.path, format=format, type=file_type, content=True + ) ) self.last_modified = model["last_modified"] return model - async def maybe_save_content(self, model: dict[str, Any]) -> None: + async def maybe_save_content(self, model: dict[str, Any]) -> dict[str, Any] | None: """ Save the content of the file. @@ -149,20 +155,34 @@ async def maybe_save_content(self, model: dict[str, Any]) -> None: # otherwise it could corrupt the file done_saving = asyncio.Event() task = asyncio.create_task(self._save_content(model, done_saving)) + saved_model = None try: - await asyncio.shield(task) + saved_model = await asyncio.shield(task) except asyncio.CancelledError: pass await done_saving.wait() + return saved_model else: # file changed on disk, raise an error self.last_modified = m["last_modified"] raise OutOfBandChanges - async def _save_content(self, model: dict[str, Any], done_saving: asyncio.Event) -> None: + async def _save_content( + self, model: dict[str, Any], done_saving: asyncio.Event + ) -> dict[str, Any]: try: m = await ensure_async(self._contents_manager.save(model, self.path)) self.last_modified = m["last_modified"] + # TODO, get rid of the extra `get` here once upstream issue: + # https://github.com/jupyter-server/jupyter_server/issues/1453 is resolved + model_with_hash = await ensure_async( + self._contents_manager.get( + self.path, + content=False, + require_hash=True, # TODO require version supporting hash + ) + ) + return {**m, "hash": model_with_hash["hash"]} finally: done_saving.set() @@ -181,7 +201,9 @@ async def _watch_file(self) -> None: try: await self.maybe_notify() except Exception as e: - self._log.error(f"Error watching file: {self.path}\n{e!r}", exc_info=e) + self._log.error( + f"Error watching file: {self.path}\n{e!r}", exc_info=e + ) except asyncio.CancelledError: break @@ -192,9 +214,14 @@ async def maybe_notify(self) -> None: do_notify = False async with self._lock: # Get model metadata; format and type are not need - model = await ensure_async(self._contents_manager.get(self.path, content=False)) + model = await ensure_async( + self._contents_manager.get(self.path, content=False) + ) - if self.last_modified is not None and self.last_modified < model["last_modified"]: + if ( + self.last_modified is not None + and self.last_modified < model["last_modified"] + ): do_notify = True self.last_modified = model["last_modified"] diff --git a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py index 873fe113..fe06d3fa 100644 --- a/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py +++ b/projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py @@ -35,7 +35,9 @@ def __init__( save_delay: float | None = None, exception_handler: Callable[[Exception, Logger], bool] | None = None, ): - super().__init__(ready=False, ystore=ystore, exception_handler=exception_handler, log=log) + super().__init__( + ready=False, ystore=ystore, exception_handler=exception_handler, log=log + ) self._room_id: str = room_id self._file_format: str = file_format @@ -168,7 +170,9 @@ async def initialize(self) -> None: self.ready = True self._emit(LogLevel.INFO, "initialize", "Room initialized") - def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None: + def _emit( + self, level: LogLevel, action: str | None = None, msg: str | None = None + ) -> None: data = {"level": level.value, "room": self._room_id, "path": self._file.path} if action: data["action"] = action @@ -210,8 +214,12 @@ async def _on_outofband_change(self) -> None: """ Called when the file got out-of-band changes. """ - self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) - self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.") + self.log.info( + "Out-of-band changes. Overwriting the content in room %s", self._room_id + ) + self._emit( + LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room." + ) try: model = await self._file.load_content(self._file_format, self._file_type) @@ -271,7 +279,7 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No await asyncio.sleep(self._save_delay) self.log.info("Saving the content from room %s", self._room_id) - await self._file.maybe_save_content( + saved_model = await self._file.maybe_save_content( { "format": self._file_format, "type": self._file_type, @@ -280,6 +288,8 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No ) async with self._update_lock: self._document.dirty = False + if saved_model: + self._document.hash = saved_model["hash"] self._emit(LogLevel.INFO, "save", "Content saved.") @@ -287,9 +297,13 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No return except OutOfBandChanges: - self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id) + self.log.info( + "Out-of-band changes. Overwriting the content in room %s", self._room_id + ) try: - model = await self._file.load_content(self._file_format, self._file_type) + model = await self._file.load_content( + self._file_format, self._file_type + ) except Exception as e: msg = f"Error loading content from file: {self._file.path}\n{e!r}" self.log.error(msg, exc_info=e)