Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start ystore in a task #302

Merged
merged 5 commits into from
May 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion jupyter_collaboration/rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
class DocumentRoom(YRoom):
"""A Y room for a possibly stored document (e.g. a notebook)."""

_background_tasks: set[asyncio.Task]

def __init__(
self,
room_id: str,
Expand Down Expand Up @@ -48,6 +50,7 @@ def __init__(
self._cleaner: asyncio.Task | None = None
self._saving_document: asyncio.Task | None = None
self._messages: dict[str, asyncio.Lock] = {}
self._background_tasks = set()

# Listen for document changes
self._document.observe(self._on_document_change)
Expand Down Expand Up @@ -100,7 +103,9 @@ async def initialize(self) -> None:
# try to apply Y updates from the YStore for this document
read_from_source = True
if self.ystore is not None:
await self.ystore.started.wait()
if not self.ystore.started.is_set():
self.create_task(self.ystore.start())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ystore.start() can throw RuntimeException if ystore task group is not None. There is another ystore.start() in yroom broadcast method. Since both the start method is scheduled asynchronously and there could a race condition, either of those two ystore.start() tasks can failed with RuntimeException due to ystore is already started. In this task, it might be ok to fail but in yroom.broadcast method, that exception could cause yroom task group to fail and restarted unless we handle the exception in yroom.broadcast implementation as well.

await self.ystore.started.wait()
try:
await self.ystore.apply_updates(self.ydoc)
self._emit(
Expand Down Expand Up @@ -174,6 +179,11 @@ async def stop(self) -> None:
self._document.unobserve()
self._file.unobserve(self.room_id)

def create_task(self, aw):
task = asyncio.create_task(aw)
self._background_tasks.add(task)
task.add_done_callback(self._background_tasks.discard)

async def _broadcast_updates(self):
# FIXME should be upstreamed
try:
Expand Down
Loading