Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

pytest_plugins = ("pytest_jupyter.jupyter_server", )
pytest_plugins = ("pytest_jupyter.jupyter_server", "jupyter_server.pytest_plugin")


@pytest.fixture
Expand Down
3 changes: 3 additions & 0 deletions jupyter_rtc_core/rooms/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .yroom_file_api import YRoomFileAPI
from .yroom_manager import YRoomManager
from .yroom import YRoom
128 changes: 88 additions & 40 deletions jupyter_rtc_core/rooms/yroom.py
Original file line number Diff line number Diff line change
@@ -1,54 +1,86 @@
from __future__ import annotations # see PEP-563 for motivation behind this
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, cast
from logging import Logger
import asyncio
from ..websockets import YjsClientGroup

import pycrdt
from pycrdt import YMessageType, YSyncMessageType as YSyncMessageSubtype
from jupyter_ydoc import ydocs as jupyter_ydoc_classes
from jupyter_ydoc.ybasedoc import YBaseDoc
from tornado.websocket import WebSocketHandler
from .yroom_file_api import YRoomFileAPI

if TYPE_CHECKING:
from typing import Literal, Tuple
from jupyter_server_fileid.manager import BaseFileIdManager
from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager

class YRoom:
"""A Room to manage all client connection to one notebook file"""
room_id: str

log: Logger
"""Log object"""
_room_id: str
"""Room Id"""
ydoc: pycrdt.Doc
_jupyter_ydoc: YBaseDoc
"""JupyterYDoc"""
_ydoc: pycrdt.Doc
"""Ydoc"""
awareness: pycrdt.Awareness
_awareness: pycrdt.Awareness
"""Ydoc awareness object"""
loop: asyncio.AbstractEventLoop
_loop: asyncio.AbstractEventLoop
"""Event loop"""
log: Logger
"""Log object"""
_client_group: YjsClientGroup
"""Client group to manage synced and desynced clients"""
_message_queue: asyncio.Queue[Tuple[str, bytes]]
"""A message queue per room to keep websocket messages in order"""


def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop):
def __init__(
self,
*,
room_id: str,
log: Logger,
loop: asyncio.AbstractEventLoop,
fileid_manager: BaseFileIdManager,
contents_manager: AsyncContentsManager | ContentsManager,
):
# Bind instance attributes
self.log = log
self.loop = loop
self.room_id = room_id

# Initialize YDoc, YAwareness, YjsClientGroup, and message queue
self.ydoc = pycrdt.Doc()
self.awareness = pycrdt.Awareness(ydoc=self.ydoc)
self.awareness.observe(self.send_server_awareness)
self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self.loop)
self._message_queue = asyncio.Queue()
self._loop = loop
self._room_id = room_id

# Initialize YjsClientGroup, YDoc, YAwareness, JupyterYDoc
self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop)
self._ydoc = pycrdt.Doc()
self._awareness = pycrdt.Awareness(ydoc=self._ydoc)
JupyterYDocClass = cast(
type[YBaseDoc],
jupyter_ydoc_classes.get(self.file_type, jupyter_ydoc_classes["file"])
)
self.jupyter_ydoc = JupyterYDocClass(ydoc=self._ydoc, awareness=self._awareness)

# Initialize YRoomFileAPI and begin loading content
self.file_api = YRoomFileAPI(
room_id=self._room_id,
jupyter_ydoc=self.jupyter_ydoc,
log=self.log,
loop=self._loop,
fileid_manager=fileid_manager,
contents_manager=contents_manager
)
self.file_api.load_ydoc_content()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if load_ydoc_content() is called in this initialize method, it need to be moved to be after observer setup line 77 and 78. because creating task of ydoc means it could happen at any time for now on.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we discussed to use a different message queue to handle awareness messages and unblock awareness message handling no matter initial file loading finished or not.


# Start observer on the `ydoc` to ensure new updates are broadcast to
# all clients and saved to disk.
self.ydoc.observe(lambda event: self.write_sync_update(event.update))
# Start observers on `self.ydoc` and `self.awareness` to ensure new
# updates are broadcast to all clients and saved to disk.
self._awareness.observe(self.send_server_awareness)
self._ydoc.observe(lambda event: self.write_sync_update(event.update))

# Start background task that routes new messages in the message queue
# to the appropriate handler method.
self.loop.create_task(self._on_new_message())
# Initialize message queue and start background task that routes new
# messages in the message queue to the appropriate handler method.
self._message_queue = asyncio.Queue()
self._loop.create_task(self._on_new_message())


@property
Expand All @@ -59,21 +91,32 @@ def clients(self) -> YjsClientGroup:
"""

return self._client_group


def add_client(self, websocket: WebSocketHandler) -> str:

async def get_jupyter_ydoc(self):
"""
Creates a new client from the given Tornado WebSocketHandler and
adds it to the room. Returns the ID of the new client.
Returns a reference to the room's JupyterYDoc
(`jupyter_ydoc.ybasedoc.YBaseDoc`) after waiting for its content to be
loaded from the ContentsManager.
"""
await self.file_api.ydoc_content_loaded
return self.jupyter_ydoc


return self.clients.add(websocket)


def remove_client(self, client_id: str) -> None:
"""Removes a client from the room, given the client ID."""
async def get_ydoc(self):
"""
Returns a reference to the room's YDoc (`pycrdt.Doc`) after
waiting for its content to be loaded from the ContentsManager.
"""
await self.file_api.ydoc_content_loaded
return self._ydoc

self.clients.remove(client_id)

def get_awareness(self):
"""
Returns a reference to the room's awareness (`pycrdt.Awareness`).
"""
return self._awareness


def add_message(self, client_id: str, message: bytes) -> None:
Expand All @@ -91,6 +134,11 @@ async def _on_new_message(self) -> None:
message type & subtype, which are obtained from the first 2 bytes of the
message.
"""
# Wait for content to be loaded before processing any messages in the
# message queue
await self.file_api.ydoc_content_loaded

# Begin processing messages from the message queue
while True:
try:
client_id, message = await self._message_queue.get()
Expand Down Expand Up @@ -141,7 +189,7 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
# Compute SyncStep2 reply
try:
message_payload = message[1:]
sync_step2_message = pycrdt.handle_sync_message(message_payload, self.ydoc)
sync_step2_message = pycrdt.handle_sync_message(message_payload, self._ydoc)
assert isinstance(sync_step2_message, bytes)
except Exception as e:
self.log.error(
Expand Down Expand Up @@ -170,7 +218,7 @@ def handle_sync_step1(self, client_id: str, message: bytes) -> None:
# Send SyncStep1 message
try:
assert isinstance(new_client.websocket, WebSocketHandler)
sync_step1_message = pycrdt.create_sync_message(self.ydoc)
sync_step1_message = pycrdt.create_sync_message(self._ydoc)
new_client.websocket.write_message(sync_step1_message)
except Exception as e:
self.log.error(
Expand All @@ -191,7 +239,7 @@ def handle_sync_step2(self, client_id: str, message: bytes) -> None:
"""
try:
message_payload = message[1:]
pycrdt.handle_sync_message(message_payload, self.ydoc)
pycrdt.handle_sync_message(message_payload, self._ydoc)
except Exception as e:
self.log.error(
"An exception occurred when applying a SyncStep2 message "
Expand All @@ -211,13 +259,13 @@ def handle_sync_update(self, client_id: str, message: bytes) -> None:
"""
# Remove client and kill websocket if received SyncUpdate when client is desynced
if self._should_ignore_update(client_id, "SyncUpdate"):
self.log.error(f"Should not receive SyncUpdate message when double handshake is not completed for client '{client_id}' and room '{self.room_id}'")
self.log.error(f"Should not receive SyncUpdate message when double handshake is not completed for client '{client_id}' and room '{self._room_id}'")
self._client_group.remove(client_id)

# Apply the SyncUpdate to the YDoc
try:
message_payload = message[1:]
pycrdt.handle_sync_message(message_payload, self.ydoc)
pycrdt.handle_sync_message(message_payload, self._ydoc)
except Exception as e:
self.log.error(
"An exception occurred when applying a SyncUpdate message "
Expand Down Expand Up @@ -251,7 +299,7 @@ def handle_awareness_update(self, client_id: str, message: bytes) -> None:
# Apply the AwarenessUpdate message
try:
message_payload = message[1:]
self.awareness.apply_awareness_update(message_payload, origin=self)
self._awareness.apply_awareness_update(message_payload, origin=self)
except Exception as e:
self.log.error(
"An exception occurred when applying an AwarenessUpdate"
Expand Down Expand Up @@ -315,7 +363,7 @@ def send_server_awareness(self, type: str, changes: tuple[dict[str, Any], Any])
return

updated_clients = [v for value in changes[0].values() for v in value]
state = self.awareness.encode_awareness_update(updated_clients)
state = self._awareness.encode_awareness_update(updated_clients)
message = pycrdt.create_awareness_message(state)
self._broadcast_message(message, "AwarenessUpdate")

Expand Down
Loading