Skip to content

Commit

Permalink
rename the docprovider destination name to avoid conficts (jupyterlab…
Browse files Browse the repository at this point in the history
  • Loading branch information
brichet authored and Jialin Zhang committed Apr 24, 2024
1 parent adcde32 commit 746536c
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 78 deletions.
185 changes: 123 additions & 62 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import json
import time
import uuid
from typing import Any
from typing import Any, Awaitable

from jupyter_server.auth import authorized
from jupyter_server.base.handlers import APIHandler, JupyterHandler
Expand Down Expand Up @@ -70,52 +70,84 @@ def create_task(self, aw):
task.add_done_callback(self._background_tasks.discard)

async def prepare(self):
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()

# Get room
self._room_id: str = room_id_from_encoded_path(self.request.path)

async with self._room_lock(self._room_id):
if self._websocket_server.room_exists(self._room_id):
self.room: YRoom = await self._websocket_server.get_room(self._room_id)
else:
if self._room_id.count(":") >= 2:
# DocumentRoom
file_format, file_type, file_id = decode_file_path(self._room_id)
if file_id in self._file_loaders:
self._emit(
LogLevel.WARNING,
None,
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
)

file = self._file_loaders[file_id]
updates_file_path = f".{file_type}:{file_id}.y"
ystore = self._ystore_class(path=updates_file_path, log=self.log)
self.room = DocumentRoom(
self._room_id,
file_format,
file_type,
file,
self.event_logger,
ystore,
self.log,
self._document_save_delay,
res = super().prepare()
if isinstance(res, Awaitable):
await res
if not self._websocket_server.started.is_set():
self.create_task(self._websocket_server.start())
await self._websocket_server.started.wait()

# Get room
self._room_id: str = self.request.path.split("/")[-1]

async with self._room_lock(self._room_id):
if self._websocket_server.room_exists(self._room_id):
self.room: YRoom = await self._websocket_server.get_room(
self._room_id
)

self.log.info("Get an room from websocket server")
else:
# TransientRoom
# it is a transient document (e.g. awareness)
self.room = TransientRoom(self._room_id, self.log)

await self._websocket_server.start_room(self.room)
self._websocket_server.add_room(self._room_id, self.room)
if self._room_id.count(":") >= 2:
# DocumentRoom
file_format, file_type, file_id = decode_file_path(
self._room_id
)
if file_id in self._file_loaders:
self.log.info(
"There is another collaborative session accessing the same file"
)
self._emit(
LogLevel.WARNING,
None,
"There is another collaborative session accessing the same file.\nThe synchronization between rooms is not supported and you might lose some of your changes.",
)

file = self._file_loaders[file_id]
updates_file_path = f".{file_type}:{file_id}.y"
ystore = self._ystore_class(
path=updates_file_path, log=self.log
)
self.room = DocumentRoom(
self._room_id,
file_format,
file_type,
file,
self.event_logger,
ystore,
self.log,
self._document_save_delay,
)

res = super().prepare()
if res is not None:
return await res
else:
# TransientRoom
# it is a transient document (e.g. awareness)
self.room = TransientRoom(self._room_id, self.log)

self.log.info("About to start a room")
try:
await self._websocket_server.start_room(self.room)
except Exception as e:
self.log.error(
"Room %s failed to start on websocket server", self._room_id
)
# Clean room
self.room.stop()
self.log.info("Room %s deleted", self._room_id)
self._emit(LogLevel.INFO, "clean", "Room deleted.")

# if websocket server failed, then room and file will be garbage collected and file
# Clean the file loader in file loader mapping if there are not rooms using it
_, _, file_id = decode_file_path(self._room_id)
file = self._file_loaders[file_id]
if file.number_of_subscriptions == 0 or (
file.number_of_subscriptions == 1
and self._room_id in file._subscriptions
):
self.log.info("Deleting file %s", file.path)
await self._file_loaders.remove(file_id)
self._emit(LogLevel.INFO, "clean", "file loader removed.")
raise e
self._websocket_server.add_room(self._room_id, self.room)

def initialize(
self,
Expand Down Expand Up @@ -203,9 +235,12 @@ async def open(self, room_id):
self.log.error(f"File {file.path} not found.\n{e!r}", exc_info=e)
self.close(1004, f"File {file.path} not found.")
else:
self.log.error(f"Error initializing: {file.path}\n{e!r}", exc_info=e)
self.log.error(
f"Error initializing: {file.path}\n{e!r}", exc_info=e
)
self.close(
1003, f"Error initializing: {file.path}. You need to close the document."
1003,
f"Error initializing: {file.path}. You need to close the document.",
)

# Clean up the room and delete the file loader
Expand Down Expand Up @@ -272,16 +307,24 @@ async def on_message(self, message):

user = self.current_user
data = json.dumps(
{"sender": user.username, "timestamp": time.time(), "content": json.loads(msg)}
{
"sender": user.username,
"timestamp": time.time(),
"content": json.loads(msg),
}
).encode("utf8")

for client in self.room.clients:
if client != self:
task = asyncio.create_task(
client.send(bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data)
client.send(
bytes([MessageType.CHAT]) + write_var_uint(len(data)) + data
)
)
self._websocket_server.background_tasks.add(task)
task.add_done_callback(self._websocket_server.background_tasks.discard)
task.add_done_callback(
self._websocket_server.background_tasks.discard
)

self._message_queue.put_nowait(message)
self._websocket_server.ypatch_nb += 1
Expand All @@ -292,15 +335,19 @@ def on_close(self) -> None:
"""
# stop serving this client
self._message_queue.put_nowait(b"")
if isinstance(self.room, DocumentRoom) and self.room.clients == [self]:
# no client in this room after we disconnect
# keep the document for a while in case someone reconnects
self.log.info("Cleaning room: %s", self._room_id)
self.room.cleaner = asyncio.create_task(self._clean_room())
if self._room_id != "JupyterLab:globalAwareness":
self._emit_awareness_event(self.current_user.username, "leave")

def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = None) -> None:
if hasattr(self, "room"):
if isinstance(self.room, DocumentRoom) and self.room.clients == [self]:
# no client in this room after we disconnect
# keep the document for a while in case someone reconnects
self.log.info("Cleaning room: %s", self._room_id)
self.room.cleaner = asyncio.create_task(self._clean_room())
if hasattr(self, "_room_id"):
if self._room_id != "JupyterLab:globalAwareness":
self._emit_awareness_event(self.current_user.username, "leave")

def _emit(
self, level: LogLevel, action: str | None = None, msg: str | None = None
) -> None:
_, _, file_id = decode_file_path(self._room_id)
path = self._file_id_manager.get_path(file_id)

Expand All @@ -312,12 +359,16 @@ def _emit(self, level: LogLevel, action: str | None = None, msg: str | None = No

self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_EVENTS_URI, data=data)

def _emit_awareness_event(self, username: str, action: str, msg: str | None = None) -> None:
def _emit_awareness_event(
self, username: str, action: str, msg: str | None = None
) -> None:
data = {"roomid": self._room_id, "username": username, "action": action}
if msg:
data["msg"] = msg

self.event_logger.emit(schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data)
self.event_logger.emit(
schema_id=JUPYTER_COLLABORATION_AWARENESS_EVENTS_URI, data=data
)

async def _clean_room(self) -> None:
"""
Expand Down Expand Up @@ -387,7 +438,12 @@ async def put(self, path):
# index already exists
self.log.info("Request for Y document '%s' with room ID: %s", path, idx)
data = json.dumps(
{"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION}
{
"format": format,
"type": content_type,
"fileId": idx,
"sessionId": SERVER_SESSION,
}
)
self.set_status(200)
return self.finish(data)
Expand All @@ -401,7 +457,12 @@ async def put(self, path):
# index successfully created
self.log.info("Request for Y document '%s' with room ID: %s", path, idx)
data = json.dumps(
{"format": format, "type": content_type, "fileId": idx, "sessionId": SERVER_SESSION}
{
"format": format,
"type": content_type,
"fileId": idx,
"sessionId": SERVER_SESSION,
}
)
self.set_status(201)
return self.finish(data)
34 changes: 26 additions & 8 deletions projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ def __init__(
self._log = log or getLogger(__name__)
self._subscriptions: dict[str, Callable[[], Coroutine[Any, Any, None]]] = {}

self._watcher = asyncio.create_task(self._watch_file()) if self._poll_interval else None
self._watcher = (
asyncio.create_task(self._watch_file()) if self._poll_interval else None
)
self.last_modified = None

@property
Expand Down Expand Up @@ -74,9 +76,14 @@ async def clean(self) -> None:
if self._watcher is not None:
if not self._watcher.cancelled():
self._watcher.cancel()
await self._watcher
try:
await self._watcher
except asyncio.CancelledError:
self._log.info(f"file watcher for '{self.file_id}' is cancelled now")

def observe(self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]) -> None:
def observe(
self, id: str, callback: Callable[[], Coroutine[Any, Any, None]]
) -> None:
"""
Subscribe to the file to get notified about out-of-band file changes.
Expand Down Expand Up @@ -109,7 +116,9 @@ async def load_content(self, format: str, file_type: str) -> dict[str, Any]:
"""
async with self._lock:
model = await ensure_async(
self._contents_manager.get(self.path, format=format, type=file_type, content=True)
self._contents_manager.get(
self.path, format=format, type=file_type, content=True
)
)
self.last_modified = model["last_modified"]
return model
Expand Down Expand Up @@ -156,7 +165,9 @@ async def maybe_save_content(self, model: dict[str, Any]) -> None:
self.last_modified = m["last_modified"]
raise OutOfBandChanges

async def _save_content(self, model: dict[str, Any], done_saving: asyncio.Event) -> None:
async def _save_content(
self, model: dict[str, Any], done_saving: asyncio.Event
) -> None:
try:
m = await ensure_async(self._contents_manager.save(model, self.path))
self.last_modified = m["last_modified"]
Expand All @@ -178,7 +189,9 @@ async def _watch_file(self) -> None:
try:
await self.maybe_notify()
except Exception as e:
self._log.error(f"Error watching file: {self.path}\n{e!r}", exc_info=e)
self._log.error(
f"Error watching file: {self.path}\n{e!r}", exc_info=e
)
except asyncio.CancelledError:
break

Expand All @@ -189,9 +202,14 @@ async def maybe_notify(self) -> None:
do_notify = False
async with self._lock:
# Get model metadata; format and type are not need
model = await ensure_async(self._contents_manager.get(self.path, content=False))
model = await ensure_async(
self._contents_manager.get(self.path, content=False)
)

if self.last_modified is not None and self.last_modified < model["last_modified"]:
if (
self.last_modified is not None
and self.last_modified < model["last_modified"]
):
do_notify = True

self.last_modified = model["last_modified"]
Expand Down
Loading

0 comments on commit 746536c

Please sign in to comment.