Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

pytest_plugins = ("pytest_jupyter.jupyter_server", )
pytest_plugins = ("pytest_jupyter.jupyter_server", "jupyter_server.pytest_plugin")


@pytest.fixture
Expand Down
3 changes: 3 additions & 0 deletions jupyter_rtc_core/rooms/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .yroom_file_api import YRoomFileAPI
from .yroom_manager import YRoomManager
from .yroom import YRoom
54 changes: 44 additions & 10 deletions jupyter_rtc_core/rooms/yroom.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
from __future__ import annotations # see PEP-563 for motivation behind this
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast
from logging import Logger
import asyncio
from ..websockets import YjsClientGroup

import pycrdt
from pycrdt import YMessageType, YSyncMessageType as YSyncMessageSubtype
from jupyter_ydoc import ydocs as jupyter_ydoc_classes
from jupyter_ydoc.ybasedoc import YBaseDoc
from tornado.websocket import WebSocketHandler
from .yroom_file_api import YRoomFileAPI

if TYPE_CHECKING:
from typing import Literal, Tuple
from jupyter_server_fileid.manager import BaseFileIdManager
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager

class YRoom:
"""A Room to manage all client connection to one notebook file"""
Expand All @@ -29,25 +34,49 @@ class YRoom:
"""A message queue per room to keep websocket messages in order"""


def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop):
def __init__(
self,
*,
room_id: str,
log: Logger,
loop: asyncio.AbstractEventLoop,
fileid_manager: BaseFileIdManager,
contents_manager: AsyncContentsManager | ContentsManager,
):
# Bind instance attributes
self.log = log
self.loop = loop
self.room_id = room_id

# Initialize YDoc, YAwareness, YjsClientGroup, and message queue
# Initialize YjsClientGroup, YDoc, YAwareness, JupyterYDoc
self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self.loop)
self.ydoc = pycrdt.Doc()
self.awareness = pycrdt.Awareness(ydoc=self.ydoc)
self.awareness.observe(self.send_server_awareness)
self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self.loop)
self._message_queue = asyncio.Queue()
JupyterYDocClass = cast(
type[YBaseDoc],
jupyter_ydoc_classes.get(self.file_type, jupyter_ydoc_classes["file"])
)
self.jupyter_ydoc = JupyterYDocClass(ydoc=self.ydoc, awareness=self.awareness)

# Initialize YRoomFileAPI and begin loading content
self.file_api = YRoomFileAPI(
room_id=self.room_id,
jupyter_ydoc=self.jupyter_ydoc,
log=self.log,
loop=self.loop,
fileid_manager=fileid_manager,
contents_manager=contents_manager
)
self.file_api.load_ydoc_content()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if load_ydoc_content() is called in this initialize method, it need to be moved to be after observer setup line 77 and 78. because creating task of ydoc means it could happen at any time for now on.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we discussed to use a different message queue to handle awareness messages and unblock awareness message handling no matter initial file loading finished or not.


# Start observer on the `ydoc` to ensure new updates are broadcast to
# all clients and saved to disk.
# Start observers on `self.ydoc` and `self.awareness` to ensure new
# updates are broadcast to all clients and saved to disk.
self.awareness.observe(self.send_server_awareness)
self.ydoc.observe(lambda event: self.write_sync_update(event.update))
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may only want to start these observers after the YDoc content is loaded. So we would want to do something like:

async def _start_observers(self):
    await self.file_api.ydoc_content_loaded
    self.ydoc.observe(...)
    self.awareness.observe(...)

and start this as a separate task in __init__().

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need to wait for ydoc content loaded? We should add subscribers first. It is just no action or update in ydoc so no subscription is triggered. We need to make sure subscribers are there first before we make any update to ydoc. I think loading ydoc content will update ydoc and generate first few updates.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need a queue here self.ydoc.observe(lambda event: self.write_sync_update(event.update)) to capture those updates before the clients are added and websocket is established.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Second thought, It might make sense to call file content load method separately and later (not in init method) once connection is established and client is added. Because once ydoc loads content, it has server updates to broadcast to all clients but the client websocket might not yet be added when we initialize YRoom. Are we going to call YRoom initialization in prepare method in YWebsocketHandler before websocket connection happens, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had posted a comment earlier but realized I was wrong. Yeah you're right, we should start the observers immediately, otherwise the new loaded content doesn't get broadcast. 😂

I was trying to stop YRoom consumers from making updates to the YDoc before the content is loaded, but as you've called out, this isn't the right way to do it. I think I'll add an async API for getting the YDoc, Awareness, and JupyterYDoc that ensures the content is loaded. Will work on this now since you're busy with another meeting.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because once ydoc loads content, it has server updates to broadcast to all clients but the client websocket might not yet be added when we initialize YRoom. Are we going to call YRoom initialization in prepare method in YWebsocketHandler before websocket connection happens, right?

This should be fine!

  • If clients join before the doc is loaded: they get a SyncUpdate containing the new content.

  • If clients join after: they get the content after completing the first client SS1 + server SS2 handshake.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, clients will always join after, since we don't process the message queue until after the ydoc content is loaded.


# Start background task that routes new messages in the message queue
# to the appropriate handler method.
# Initialize message queue and start background task that routes new
# messages in the message queue to the appropriate handler method.
self._message_queue = asyncio.Queue()
self.loop.create_task(self._on_new_message())


Expand Down Expand Up @@ -91,6 +120,11 @@ async def _on_new_message(self) -> None:
message type & subtype, which are obtained from the first 2 bytes of the
message.
"""
# Wait for content to be loaded before processing any messages in the
# message queue
await self.file_api.ydoc_content_loaded

# Begin processing messages from the message queue
while True:
try:
client_id, message = await self._message_queue.get()
Expand Down
189 changes: 189 additions & 0 deletions jupyter_rtc_core/rooms/yroom_file_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""
WIP.
This file just contains interfaces to be filled out later.
"""

from __future__ import annotations
from typing import TYPE_CHECKING, Literal
import asyncio
import pycrdt
from jupyter_ydoc.ybasedoc import YBaseDoc
from jupyter_server.utils import ensure_async
import logging
import os

if TYPE_CHECKING:
from typing import Awaitable
from jupyter_server_fileid.manager import BaseFileIdManager
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager

class YRoomFileAPI:
"""
Provides an API to 1 file from Jupyter Server's ContentsManager for a YRoom,
given the the room's JupyterYDoc and ID in the constructor.
To load the content, consumers should call `file_api.load_ydoc_content()`,
then `await file_api.ydoc_content_loaded` before performing any operations
on the YDoc.
To save a JupyterYDoc to the file, call
`file_api.schedule_save(jupyter_ydoc)`.
"""

# See `filemanager.py` in `jupyter_server` for references on supported file
# formats & file types.
file_format: Literal["text", "base64"]
file_type: Literal["file", "notebook"]
file_id: str
log: logging.Logger
jupyter_ydoc: YBaseDoc

_fileid_manager: BaseFileIdManager
_contents_manager: AsyncContentsManager | ContentsManager
_loop: asyncio.AbstractEventLoop
_ydoc_content_loading: False
_ydoc_content_loaded: asyncio.Event
_scheduled_saves: asyncio.Queue[None]

def __init__(
self,
*,
room_id: str,
jupyter_ydoc: YBaseDoc,
log: logging.Logger,
fileid_manager: BaseFileIdManager,
contents_manager: AsyncContentsManager | ContentsManager,
loop: asyncio.AbstractEventLoop
):
# Bind instance attributes
self.file_format, self.file_type, self.file_id = room_id.split(":")
self.jupyter_ydoc = jupyter_ydoc
self.log = log
self._loop = loop
self._fileid_manager = fileid_manager
self._contents_manager = contents_manager

# Initialize loading & loaded states
self._ydoc_content_loading = False
self._ydoc_content_loaded = asyncio.Event()

# Initialize save request queue
# Setting maxsize=1 allows 1 save in-progress with another save pending.
self._scheduled_saves = asyncio.Queue(maxsize=1)

# Start processing scheduled saves in a loop running concurrently
self._loop.create_task(self._process_scheduled_saves())


def get_path(self) -> str:
"""
Returns the path to the file by querying the FileIdManager. This is a
relative path to the `root_dir` in `ContentsManager`.
Raises a `RuntimeError` if the file ID does not refer to a valid file
path.
"""
abs_path = self._fileid_manager.get_path(self.file_id)
if not abs_path:
raise RuntimeError(
f"Unable to locate file with ID: '{self.file_id}'."
)

rel_path = os.path.relpath(abs_path, self._contents_manager.root_dir)
return rel_path


@property
def ydoc_content_loaded(self) -> Awaitable[None]:
"""
Returns an Awaitable that only resolves when the content of the YDoc is
loaded.
"""
return self._ydoc_content_loaded.wait()


def load_ydoc_content(self) -> None:
"""
Loads the file from disk asynchronously into `self.jupyter_ydoc`.
Consumers should `await file_api.ydoc_content_loaded` before performing
any operations on the YDoc.
"""
# If already loaded/loading, return immediately.
# Otherwise, set loading to `True` and start the loading task.
if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading:
return
self._ydoc_content_loading = True
self._loop.create_task(self._load_ydoc_content())
Comment on lines +106 to +117
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW, we can technically call this in YRoomFileAPI.__init__(), but I think as a design principle, instantiating classes shouldn't produce side-effects. Operations should be made explicit for readability.



async def _load_ydoc_content(self) -> None:
# Load the content of the file from the given file ID.
path = self.get_path()
m = await ensure_async(self._contents_manager.get(
path,
type=self.file_type,
format=self.file_format
))
content = m['content']

# Set JupyterYDoc content
self.jupyter_ydoc.source = content

# Finally, set loaded event to inform consumers that the YDoc is ready
# Also set loading to `False` for consistency
self._ydoc_content_loaded.set()
self._ydoc_content_loading = False


def schedule_save(self) -> None:
"""
Schedules a request to save the JupyterYDoc to disk. This method
requires `self.get_jupyter_ydoc()` to have been awaited prior; otherwise
this will raise a `RuntimeError`.
If there are no pending requests, then this will immediately save the
YDoc to disk in a separate background thread.
If there is any pending request, then this method does nothing, as the
YDoc will be saved when the pending request is fulfilled.
TODO: handle out-of-band changes to the file when writing.
"""
assert self.jupyter_ydoc
if not self._scheduled_saves.full():
self._scheduled_saves.put_nowait(None)


async def _process_scheduled_saves(self) -> None:
# Wait for content to be loaded before processing scheduled saves
await self._ydoc_content_loaded.wait()

while True:
try:
await self._scheduled_saves.get()
except asyncio.QueueShutDown:
return

try:
assert self.jupyter_ydoc
path = self.get_path()
content = self.jupyter_ydoc.source
file_format = self.file_format
file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file"

await ensure_async(self._contents_manager.save(
{
"format": file_format,
"type": file_type,
"content": content,
},
path
))
except Exception as e:
self.log.error("An exception occurred when saving JupyterYDoc.")
self.log.exception(e)


# see https://github.com/jupyterlab/jupyter-collaboration/blob/main/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py#L146-L149
SAVEABLE_FILE_TYPES = { "directory", "file", "notebook" }
56 changes: 0 additions & 56 deletions jupyter_rtc_core/rooms/yroom_loader.py

This file was deleted.

4 changes: 4 additions & 0 deletions jupyter_rtc_core/tests/mocks/mock_plaintext.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
i am a teapot
short and stout
here's my handle
here's my spout
Loading