Skip to content

Commit

Permalink
Save current hash on the document
Browse files Browse the repository at this point in the history
  • Loading branch information
krassowski committed Aug 18, 2024
1 parent 7ea2cd5 commit 882d32e
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 16 deletions.
5 changes: 5 additions & 0 deletions packages/docprovider/src/ydrive.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ export class YDrive extends Drive implements ICollaborativeDrive {
const key = `${options.format}:${options.contentType}:${options.path}`;
this._providers.set(key, provider);

sharedModel.changed.connect((_, change) => {
// TODO: make use of the hash
console.log(change.stateChange);
});

sharedModel.disposed.connect(() => {
const provider = this._providers.get(key);
if (provider) {
Expand Down
45 changes: 36 additions & 9 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def __init__(
self._log = log or getLogger(__name__)
self._subscriptions: dict[str, Callable[[], Coroutine[Any, Any, None]]] = {}

self._watcher = asyncio.create_task(self._watch_file()) if self._poll_interval else None
self._watcher = (
asyncio.create_task(self._watch_file()) if self._poll_interval else None
)
self.last_modified = None

@property
Expand Down Expand Up @@ -79,7 +81,9 @@ async def clean(self) -> None:
except asyncio.CancelledError:
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")

def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
def observe(
self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]
) -> None:
"""
Subscribe to the file to get notified about out-of-band file changes.
Expand Down Expand Up @@ -112,12 +116,14 @@ async def load_content(self, format: str, file_type: str) -> dict[str, Any]:
"""
async with self._lock:
model = await ensure_async(
self._contents_manager.get(self.path, format=format, type=file_type, content=True)
self._contents_manager.get(
self.path, format=format, type=file_type, content=True
)
)
self.last_modified = model["last_modified"]
return model

async def maybe_save_content(self, model: dict[str, Any]) -> None:
async def maybe_save_content(self, model: dict[str, Any]) -> dict[str, Any] | None:
"""
Save the content of the file.
Expand Down Expand Up @@ -149,20 +155,34 @@ async def maybe_save_content(self, model: dict[str, Any]) -> None:
# otherwise it could corrupt the file
done_saving = asyncio.Event()
task = asyncio.create_task(self._save_content(model, done_saving))
saved_model = None
try:
await asyncio.shield(task)
saved_model = await asyncio.shield(task)
except asyncio.CancelledError:
pass
await done_saving.wait()
return saved_model
else:
# file changed on disk, raise an error
self.last_modified = m["last_modified"]
raise OutOfBandChanges

async def _save_content(self, model: dict[str, Any], done_saving: asyncio.Event) -> None:
async def _save_content(
self, model: dict[str, Any], done_saving: asyncio.Event
) -> dict[str, Any]:
try:
m = await ensure_async(self._contents_manager.save(model, self.path))
self.last_modified = m["last_modified"]
# TODO, get rid of the extra `get` here once upstream issue:
# https://github.com/jupyter-server/jupyter_server/issues/1453 is resolved
model_with_hash = await ensure_async(
self._contents_manager.get(
self.path,
content=False,
require_hash=True, # TODO require version supporting hash
)
)
return {**m, "hash": model_with_hash["hash"]}
finally:
done_saving.set()

Expand All @@ -181,7 +201,9 @@ async def _watch_file(self) -> None:
try:
await self.maybe_notify()
except Exception as e:
self._log.error(f"Error watching file: {self.path}\n{e!r}", exc_info=e)
self._log.error(
f"Error watching file: {self.path}\n{e!r}", exc_info=e
)
except asyncio.CancelledError:
break

Expand All @@ -192,9 +214,14 @@ async def maybe_notify(self) -> None:
do_notify = False
async with self._lock:
# Get model metadata; format and type are not need
model = await ensure_async(self._contents_manager.get(self.path, content=False))
model = await ensure_async(
self._contents_manager.get(self.path, content=False)
)

if self.last_modified is not None and self.last_modified < model["last_modified"]:
if (
self.last_modified is not None
and self.last_modified < model["last_modified"]
):
do_notify = True

self.last_modified = model["last_modified"]
Expand Down
28 changes: 21 additions & 7 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ def __init__(
save_delay: float | None = None,
exception_handler: Callable[[Exception, Logger], bool] | None = None,
):
super().__init__(ready=False, ystore=ystore, exception_handler=exception_handler, log=log)
super().__init__(
ready=False, ystore=ystore, exception_handler=exception_handler, log=log
)

self._room_id: str = room_id
self._file_format: str = file_format
Expand Down Expand Up @@ -168,7 +170,9 @@ async def initialize(self) -> None:
self.ready = True
self._emit(LogLevel.INFO, "initialize", "Room initialized")

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
def _emit(
self, level: LogLevel, action: str | None = None, msg: str | None = None
) -> None:
data = {"level": level.value, "room": self._room_id, "path": self._file.path}
if action:
data["action"] = action
Expand Down Expand Up @@ -210,8 +214,12 @@ async def _on_outofband_change(self) -> None:
"""
Called when the file got out-of-band changes.
"""
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
self._emit(LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room.")
self.log.info(
"Out-of-band changes. Overwriting the content in room %s", self._room_id
)
self._emit(
LogLevel.INFO, "overwrite", "Out-of-band changes. Overwriting the room."
)

try:
model = await self._file.load_content(self._file_format, self._file_type)
Expand Down Expand Up @@ -271,7 +279,7 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No
await asyncio.sleep(self._save_delay)

self.log.info("Saving the content from room %s", self._room_id)
await self._file.maybe_save_content(
saved_model = await self._file.maybe_save_content(
{
"format": self._file_format,
"type": self._file_type,
Expand All @@ -280,16 +288,22 @@ async def _maybe_save_document(self, saving_document: asyncio.Task | None) -> No
)
async with self._update_lock:
self._document.dirty = False
if saved_model:
self._document.hash = saved_model["hash"]

self._emit(LogLevel.INFO, "save", "Content saved.")

except asyncio.CancelledError:
return

except OutOfBandChanges:
self.log.info("Out-of-band changes. Overwriting the content in room %s", self._room_id)
self.log.info(
"Out-of-band changes. Overwriting the content in room %s", self._room_id
)
try:
model = await self._file.load_content(self._file_format, self._file_type)
model = await self._file.load_content(
self._file_format, self._file_type
)
except Exception as e:
msg = f"Error loading content from file: {self._file.path}\n{e!r}"
self.log.error(msg, exc_info=e)
Expand Down

0 comments on commit 882d32e

Please sign in to comment.