-
Notifications
You must be signed in to change notification settings - Fork 9
Introduce YRoomFileAPI
#24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 8 commits
1cec327
bb90f33
dbdde01
a06a655
32d6d8a
fbce965
72d1b5e
a99614f
58a7543
cbc4069
3a90131
734cdb4
3716dbd
b1ff093
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| from .yroom_file_api import YRoomFileAPI | ||
| from .yroom_manager import YRoomManager | ||
| from .yroom import YRoom |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,15 +1,20 @@ | ||
| from __future__ import annotations # see PEP-563 for motivation behind this | ||
| from typing import TYPE_CHECKING | ||
| from typing import TYPE_CHECKING, cast | ||
| from logging import Logger | ||
| import asyncio | ||
| from ..websockets import YjsClientGroup | ||
|
|
||
| import pycrdt | ||
| from pycrdt import YMessageType, YSyncMessageType as YSyncMessageSubtype | ||
| from jupyter_ydoc import ydocs as jupyter_ydoc_classes | ||
| from jupyter_ydoc.ybasedoc import YBaseDoc | ||
| from tornado.websocket import WebSocketHandler | ||
| from .yroom_file_api import YRoomFileAPI | ||
|
|
||
| if TYPE_CHECKING: | ||
| from typing import Literal, Tuple | ||
| from jupyter_server_fileid.manager import BaseFileIdManager | ||
| from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager | ||
|
|
||
| class YRoom: | ||
| """A Room to manage all client connection to one notebook file""" | ||
|
|
@@ -29,25 +34,49 @@ class YRoom: | |
| """A message queue per room to keep websocket messages in order""" | ||
|
|
||
|
|
||
| def __init__(self, *, room_id: str, log: Logger, loop: asyncio.AbstractEventLoop): | ||
| def __init__( | ||
| self, | ||
| *, | ||
| room_id: str, | ||
| log: Logger, | ||
| loop: asyncio.AbstractEventLoop, | ||
| fileid_manager: BaseFileIdManager, | ||
| contents_manager: AsyncContentsManager | ContentsManager, | ||
| ): | ||
| # Bind instance attributes | ||
| self.log = log | ||
| self.loop = loop | ||
| self.room_id = room_id | ||
|
|
||
| # Initialize YDoc, YAwareness, YjsClientGroup, and message queue | ||
| # Initialize YjsClientGroup, YDoc, YAwareness, JupyterYDoc | ||
| self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self.loop) | ||
| self.ydoc = pycrdt.Doc() | ||
| self.awareness = pycrdt.Awareness(ydoc=self.ydoc) | ||
| self.awareness.observe(self.send_server_awareness) | ||
| self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self.loop) | ||
| self._message_queue = asyncio.Queue() | ||
| JupyterYDocClass = cast( | ||
| type[YBaseDoc], | ||
| jupyter_ydoc_classes.get(self.file_type, jupyter_ydoc_classes["file"]) | ||
| ) | ||
| self.jupyter_ydoc = JupyterYDocClass(ydoc=self.ydoc, awareness=self.awareness) | ||
|
|
||
| # Initialize YRoomFileAPI and begin loading content | ||
| self.file_api = YRoomFileAPI( | ||
| room_id=self.room_id, | ||
| jupyter_ydoc=self.jupyter_ydoc, | ||
| log=self.log, | ||
| loop=self.loop, | ||
| fileid_manager=fileid_manager, | ||
| contents_manager=contents_manager | ||
| ) | ||
| self.file_api.load_ydoc_content() | ||
|
|
||
| # Start observer on the `ydoc` to ensure new updates are broadcast to | ||
| # all clients and saved to disk. | ||
| # Start observers on `self.ydoc` and `self.awareness` to ensure new | ||
| # updates are broadcast to all clients and saved to disk. | ||
| self.awareness.observe(self.send_server_awareness) | ||
| self.ydoc.observe(lambda event: self.write_sync_update(event.update)) | ||
|
||
|
|
||
| # Start background task that routes new messages in the message queue | ||
| # to the appropriate handler method. | ||
| # Initialize message queue and start background task that routes new | ||
| # messages in the message queue to the appropriate handler method. | ||
| self._message_queue = asyncio.Queue() | ||
| self.loop.create_task(self._on_new_message()) | ||
|
|
||
|
|
||
|
|
@@ -91,6 +120,11 @@ async def _on_new_message(self) -> None: | |
| message type & subtype, which are obtained from the first 2 bytes of the | ||
| message. | ||
| """ | ||
| # Wait for content to be loaded before processing any messages in the | ||
| # message queue | ||
| await self.file_api.ydoc_content_loaded | ||
|
|
||
| # Begin processing messages from the message queue | ||
jzhang20133 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| while True: | ||
| try: | ||
| client_id, message = await self._message_queue.get() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,189 @@ | ||
| """ | ||
| WIP. | ||
| This file just contains interfaces to be filled out later. | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
| from typing import TYPE_CHECKING, Literal | ||
| import asyncio | ||
| import pycrdt | ||
| from jupyter_ydoc.ybasedoc import YBaseDoc | ||
| from jupyter_server.utils import ensure_async | ||
| import logging | ||
| import os | ||
|
|
||
| if TYPE_CHECKING: | ||
| from typing import Awaitable | ||
| from jupyter_server_fileid.manager import BaseFileIdManager | ||
| from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager | ||
|
|
||
| class YRoomFileAPI: | ||
dlqqq marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """ | ||
| Provides an API to 1 file from Jupyter Server's ContentsManager for a YRoom, | ||
| given the room's JupyterYDoc and ID in the constructor. | ||
| To load the content, consumers should call `file_api.load_ydoc_content()`, | ||
| then `await file_api.ydoc_content_loaded` before performing any operations | ||
| on the YDoc. | ||
| To save a JupyterYDoc to the file, call | ||
| `file_api.schedule_save(jupyter_ydoc)`. | ||
| """ | ||
|
|
||
| # See `filemanager.py` in `jupyter_server` for references on supported file | ||
| # formats & file types. | ||
| file_format: Literal["text", "base64"] | ||
| file_type: Literal["file", "notebook"] | ||
| file_id: str | ||
| log: logging.Logger | ||
| jupyter_ydoc: YBaseDoc | ||
|
|
||
| _fileid_manager: BaseFileIdManager | ||
| _contents_manager: AsyncContentsManager | ContentsManager | ||
| _loop: asyncio.AbstractEventLoop | ||
| _ydoc_content_loading: False | ||
| _ydoc_content_loaded: asyncio.Event | ||
| _scheduled_saves: asyncio.Queue[None] | ||
|
|
||
| def __init__( | ||
| self, | ||
| *, | ||
| room_id: str, | ||
| jupyter_ydoc: YBaseDoc, | ||
| log: logging.Logger, | ||
| fileid_manager: BaseFileIdManager, | ||
| contents_manager: AsyncContentsManager | ContentsManager, | ||
| loop: asyncio.AbstractEventLoop | ||
| ): | ||
| # Bind instance attributes | ||
| self.file_format, self.file_type, self.file_id = room_id.split(":") | ||
| self.jupyter_ydoc = jupyter_ydoc | ||
| self.log = log | ||
| self._loop = loop | ||
| self._fileid_manager = fileid_manager | ||
| self._contents_manager = contents_manager | ||
|
|
||
| # Initialize loading & loaded states | ||
| self._ydoc_content_loading = False | ||
| self._ydoc_content_loaded = asyncio.Event() | ||
|
|
||
| # Initialize save request queue | ||
| # Setting maxsize=1 allows 1 save in-progress with another save pending. | ||
| self._scheduled_saves = asyncio.Queue(maxsize=1) | ||
|
|
||
| # Start processing scheduled saves in a loop running concurrently | ||
| self._loop.create_task(self._process_scheduled_saves()) | ||
|
|
||
|
|
||
| def get_path(self) -> str: | ||
| """ | ||
| Returns the path to the file by querying the FileIdManager. This is a | ||
| relative path to the `root_dir` in `ContentsManager`. | ||
| Raises a `RuntimeError` if the file ID does not refer to a valid file | ||
| path. | ||
| """ | ||
| abs_path = self._fileid_manager.get_path(self.file_id) | ||
| if not abs_path: | ||
| raise RuntimeError( | ||
| f"Unable to locate file with ID: '{self.file_id}'." | ||
| ) | ||
|
|
||
| rel_path = os.path.relpath(abs_path, self._contents_manager.root_dir) | ||
| return rel_path | ||
|
|
||
|
|
||
| @property | ||
| def ydoc_content_loaded(self) -> Awaitable[None]: | ||
| """ | ||
| Returns an Awaitable that only resolves when the content of the YDoc is | ||
| loaded. | ||
| """ | ||
| return self._ydoc_content_loaded.wait() | ||
|
|
||
|
|
||
| def load_ydoc_content(self) -> None: | ||
| """ | ||
| Loads the file from disk asynchronously into `self.jupyter_ydoc`. | ||
| Consumers should `await file_api.ydoc_content_loaded` before performing | ||
| any operations on the YDoc. | ||
| """ | ||
| # If already loaded/loading, return immediately. | ||
| # Otherwise, set loading to `True` and start the loading task. | ||
| if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading: | ||
| return | ||
| self._ydoc_content_loading = True | ||
| self._loop.create_task(self._load_ydoc_content()) | ||
|
Comment on lines
+106
to
+117
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. BTW, we can technically call this in |
||
|
|
||
|
|
||
| async def _load_ydoc_content(self) -> None: | ||
| # Load the content of the file from the given file ID. | ||
| path = self.get_path() | ||
| m = await ensure_async(self._contents_manager.get( | ||
| path, | ||
| type=self.file_type, | ||
| format=self.file_format | ||
| )) | ||
| content = m['content'] | ||
|
|
||
| # Set JupyterYDoc content | ||
| self.jupyter_ydoc.source = content | ||
|
|
||
| # Finally, set loaded event to inform consumers that the YDoc is ready | ||
| # Also set loading to `False` for consistency | ||
| self._ydoc_content_loaded.set() | ||
| self._ydoc_content_loading = False | ||
|
|
||
|
|
||
| def schedule_save(self) -> None: | ||
dlqqq marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """ | ||
| Schedules a request to save the JupyterYDoc to disk. Scheduled saves are | ||
| only processed once the YDoc content has finished loading (see | ||
| `load_ydoc_content()`). | ||
| If there are no pending requests, then this will immediately save the | ||
| YDoc to disk in a separate background thread. | ||
| If there is any pending request, then this method does nothing, as the | ||
| YDoc will be saved when the pending request is fulfilled. | ||
| TODO: handle out-of-band changes to the file when writing. | ||
| """ | ||
| assert self.jupyter_ydoc | ||
| if not self._scheduled_saves.full(): | ||
| self._scheduled_saves.put_nowait(None) | ||
|
|
||
|
|
||
| async def _process_scheduled_saves(self) -> None: | ||
dlqqq marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| # Wait for content to be loaded before processing scheduled saves | ||
| await self._ydoc_content_loaded.wait() | ||
|
|
||
| while True: | ||
| try: | ||
| await self._scheduled_saves.get() | ||
| except asyncio.QueueShutDown: | ||
| return | ||
|
|
||
| try: | ||
| assert self.jupyter_ydoc | ||
| path = self.get_path() | ||
| content = self.jupyter_ydoc.source | ||
| file_format = self.file_format | ||
| file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file" | ||
|
|
||
| await ensure_async(self._contents_manager.save( | ||
| { | ||
| "format": file_format, | ||
| "type": file_type, | ||
| "content": content, | ||
| }, | ||
| path | ||
| )) | ||
| except Exception as e: | ||
| self.log.error("An exception occurred when saving JupyterYDoc.") | ||
| self.log.exception(e) | ||
|
|
||
|
|
||
| # see https://github.com/jupyterlab/jupyter-collaboration/blob/main/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py#L146-L149 | ||
| SAVEABLE_FILE_TYPES = { "directory", "file", "notebook" } | ||
This file was deleted.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| i am a teapot | ||
| short and stout | ||
| here's my handle | ||
| here's my spout |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If `load_ydoc_content()` is called in this initializer, it needs to be moved after the observer setup on lines 77 and 78, because creating the loading task means the load could happen at any time from now on.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We discussed using a separate message queue for awareness messages, so that awareness message handling is not blocked regardless of whether the initial file load has finished.