From 36ee5de088d405a2714c894e6c1e6b3fc8041629 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Wed, 11 Jun 2025 13:52:30 -0700 Subject: [PATCH 01/21] remove rooms when they start stopping, not after --- jupyter_server_documents/rooms/yroom.py | 30 +++++++++++-------- .../rooms/yroom_manager.py | 20 +++++++++---- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 4a07062..8890406 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -76,9 +76,10 @@ class YRoom: _ydoc_subscription: pycrdt.Subscription """Subscription to YDoc changes.""" - _on_stop: callable[[], Any] | None + _on_stopping: callable[[], Any] | None """ - Callback to run after stopping, provided in the constructor. + Callback to run as soon as `stop()` or `stop_immediately()` are called. Only + set in the constructor. """ _fileid_manager: BaseFileIdManager @@ -93,8 +94,8 @@ def __init__( loop: asyncio.AbstractEventLoop, fileid_manager: BaseFileIdManager, contents_manager: AsyncContentsManager | ContentsManager, - on_stop: callable[[], Any] | None = None, - event_logger: EventLogger + event_logger: EventLogger, + on_stopping: callable[[], Any] | None = None, ): # Bind instance attributes self.room_id = room_id @@ -102,7 +103,7 @@ def __init__( self._loop = loop self._fileid_manager = fileid_manager self._contents_manager = contents_manager - self._on_stop = on_stop + self._on_stopping = on_stopping # Initialize YjsClientGroup, YDoc, YAwareness, JupyterYDoc self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop) @@ -680,12 +681,18 @@ def stop_immediately(self, close_code: int) -> None: apply pending updates or save the file, e.g. when the file has been deleted from disk. """ + # First, run the 'stopping' callback to inform the consumer + if self._on_stopping: + self._on_stopping() + # Disconnect all clients with given `close_code` self.clients.stop(close_code=close_code) # Remove all observers self._ydoc.unobserve(self._ydoc_subscription) self._awareness.unobserve(self._awareness_subscription) + if self._jupyter_ydoc: + self._jupyter_ydoc.unobserve() # Purge the message queue immediately, dropping all queued messages while not self._message_queue.empty(): @@ -699,17 +706,17 @@ def stop_immediately(self, close_code: int) -> None: if self.file_api: self.file_api.stop() - # Finally, run the provided callback (if any) and return - if self._on_stop: - self._on_stop() - async def stop(self) -> None: """ Stops the YRoom gracefully by disconnecting all clients with close code 1001, applying all pending updates, and saving the YDoc before exiting. """ - # First, disconnect all clients by stopping the client group. + # First, run the 'stopping' callback to inform the consumer + if self._on_stopping: + self._on_stopping() + + # Disconnect all clients by stopping the client group. self.clients.stop() # Remove all observers, as updates no longer need to be broadcast @@ -727,9 +734,6 @@ async def stop(self) -> None: if self.file_api: await self.file_api.stop_then_save() - # Finally, run the provided callback (if any) and return - if self._on_stop: - self._on_stop() def should_ignore_state_update(event: pycrdt.MapEvent) -> bool: """ diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index e6d7576..7316518 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -12,6 +12,12 @@ class YRoomManager(): _rooms_by_id: dict[str, YRoom] + """ + Dictionary of active `YRoom` instances, keyed by room ID. + + It is guaranteed that if a room is present in the dictionary, the room is + not currently stopping. This is ensured by `_handle_yroom_stopping()`. + """ def __init__( self, @@ -44,7 +50,7 @@ def get_room(self, room_id: str) -> YRoom | None: method will initialize a new YRoom. """ - # If room exists, then return it immediately + # If room exists, return the room if room_id in self._rooms_by_id: return self._rooms_by_id[room_id] @@ -57,7 +63,7 @@ def get_room(self, room_id: str) -> YRoom | None: loop=self.loop, fileid_manager=self.fileid_manager, contents_manager=self.contents_manager, - on_stop=lambda: self._handle_yroom_stop(room_id), + on_stopping=lambda: self._handle_yroom_stopping(room_id), event_logger=self.event_logger, ) self._rooms_by_id[room_id] = yroom @@ -70,11 +76,15 @@ def get_room(self, room_id: str) -> YRoom | None: return None - def _handle_yroom_stop(self, room_id: str) -> None: + def _handle_yroom_stopping(self, room_id: str) -> None: """ - Callback that is run when the YRoom is stopped. This ensures the room is - removed from the dictionary for garbage collection, even if the room was + Callback that is run when the YRoom starts stopping. This callback: + + - Ensures the room is removed from the dictionary, even if the room was stopped directly without `YRoomManager.delete_room()`. + + - Prevents future connections to the stopping room and allows its memory + to be freed once complete. """ self._rooms_by_id.pop(room_id, None) From 45e290bacf2557e257f7ce7cf709539164d9328e Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Wed, 11 Jun 2025 14:17:02 -0700 Subject: [PATCH 02/21] automatically stop empty rooms in background task every 5s --- .../rooms/yroom_manager.py | 55 +++++++++++++++++-- .../websockets/clients.py | 7 ++- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index 7316518..82bc6cb 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -19,6 +19,13 @@ class YRoomManager(): not currently stopping. This is ensured by `_handle_yroom_stopping()`. """ + _get_fileid_manager: callable[[], BaseFileIdManager] + contents_manager: AsyncContentsManager | ContentsManager + event_logger: EventLogger + loop: asyncio.AbstractEventLoop + log: logging.Logger + _watch_rooms_task: asyncio.Task + def __init__( self, *, @@ -37,7 +44,11 @@ def __init__( # Initialize dictionary of YRooms, keyed by room ID self._rooms_by_id = {} - + + # Start `self._watch_rooms()` background task to automatically stop + # empty rooms + self._watch_rooms_task = self.loop.create_task(self._watch_rooms()) + @property def fileid_manager(self) -> BaseFileIdManager: @@ -111,24 +122,56 @@ async def delete_room(self, room_id: str) -> None: self.log.exception(e) return False + async def _watch_rooms(self) -> None: + """ + Background task that checks all `YRoom` instances every 5 seconds, and + deletes any rooms that are empty. + + TODO: Notebooks? + """ + while True: + # Check every 5 seconds + await asyncio.sleep(5) + + # Get room IDs from the room dictionary in advance, as the + # dictionary will mutate if rooms are deleted. + room_ids = set(self._rooms_by_id.keys()) + + # Iterate through all rooms. If any rooms are empty, stop the rooms. + for room_id in room_ids: + # If room_id is not in this dictionary, then the room was + # stopped by someone else while this `for` loop was still + # running. Continue, as the room is already being stopped. + if room_id not in self._rooms_by_id: + continue + + # Otherwise, stop the room if it's empty + room = self._rooms_by_id[room_id] + if room.clients.count == 0: + self.log.info(f"Found empty YRoom '{room_id}'.") + self.loop.create_task(self.delete_room(room_id)) + async def stop(self) -> None: """ Gracefully deletes each `YRoom`. See `delete_room()` for more info. """ + # First, stop all background tasks + self._watch_rooms_task.cancel() + + # Get all room IDs. If there are none, return early, as all rooms are + # already stopped. room_ids = list(self._rooms_by_id.keys()) room_count = len(room_ids) - if room_count == 0: return - self.log.info( - f"Stopping `YRoomManager` and deleting all {room_count} YRooms." - ) - # Delete rooms in parallel. # Note that we do not use `asyncio.TaskGroup` here because that cancels # all other tasks when any task raises an exception. + self.log.info( + f"Stopping `YRoomManager` and deleting all {room_count} YRooms." + ) deletion_tasks = [] for room_id in room_ids: dt = asyncio.create_task(self.delete_room(room_id)) diff --git a/jupyter_server_documents/websockets/clients.py b/jupyter_server_documents/websockets/clients.py index c0f662a..ac9244c 100644 --- a/jupyter_server_documents/websockets/clients.py +++ b/jupyter_server_documents/websockets/clients.py @@ -157,9 +157,10 @@ def get_all(self, synced_only: bool = True) -> list[YjsClient]: return all_clients - def is_empty(self) -> bool: - """Returns whether the client group is empty.""" - return len(self.synced) == 0 and len(self.desynced) == 0 + @property + def count(self) -> int: + """Returns the number of clients synced / syncing to the room.""" + return len(self.synced) + len(self.desynced) async def _clean_desynced(self) -> None: while True: From 0b4e494cf00f5377b4b462c90894371e40355acc Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Wed, 11 Jun 2025 14:50:11 -0700 Subject: [PATCH 03/21] bump to 10 second interval --- .../rooms/yroom_manager.py | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index 82bc6cb..a63c941 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -124,14 +124,12 @@ async def delete_room(self, room_id: str) -> None: async def _watch_rooms(self) -> None: """ - Background task that checks all `YRoom` instances every 5 seconds, and + Background task that checks all `YRoom` instances every 10 seconds, and deletes any rooms that are empty. - - TODO: Notebooks? """ while True: - # Check every 5 seconds - await asyncio.sleep(5) + # Check every 10 seconds + await asyncio.sleep(10) # Get room IDs from the room dictionary in advance, as the # dictionary will mutate if rooms are deleted. @@ -139,17 +137,20 @@ async def _watch_rooms(self) -> None: # Iterate through all rooms. If any rooms are empty, stop the rooms. for room_id in room_ids: - # If room_id is not in this dictionary, then the room was - # stopped by someone else while this `for` loop was still - # running. Continue, as the room is already being stopped. + # Continue if `room_id`` is not in the rooms dictionary. This + # happens if the room was stopped by something else while this + # `for` loop is still running, so we must check. if room_id not in self._rooms_by_id: continue - # Otherwise, stop the room if it's empty + # Continue if the room is not empty room = self._rooms_by_id[room_id] - if room.clients.count == 0: - self.log.info(f"Found empty YRoom '{room_id}'.") - self.loop.create_task(self.delete_room(room_id)) + if room.clients.count != 0: + continue + + # Otherwise, delete the room + self.log.info(f"Found empty YRoom '{room_id}'.") + self.loop.create_task(self.delete_room(room_id)) async def stop(self) -> None: From e7bf15b43a2b95b3463884f9dcc8cf6bfe554ba9 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 12 Jun 2025 09:29:51 -0700 Subject: [PATCH 04/21] only delete notebook rooms once kernel idle/dead --- jupyter_server_documents/rooms/yroom.py | 6 +-- .../rooms/yroom_manager.py | 47 +++++++++++++++++-- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 8890406..5309437 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -212,7 +212,7 @@ def clients(self) -> YjsClientGroup: return self._client_group - async def get_jupyter_ydoc(self): + async def get_jupyter_ydoc(self) -> YBaseDoc: """ Returns a reference to the room's JupyterYDoc (`jupyter_ydoc.ybasedoc.YBaseDoc`) after waiting for its content to be @@ -227,7 +227,7 @@ async def get_jupyter_ydoc(self): return self._jupyter_ydoc - async def get_ydoc(self): + async def get_ydoc(self) -> pycrdt.Doc: """ Returns a reference to the room's YDoc (`pycrdt.Doc`) after waiting for its content to be loaded from the ContentsManager. @@ -237,7 +237,7 @@ async def get_ydoc(self): return self._ydoc - def get_awareness(self): + def get_awareness(self) -> pycrdt.Awareness: """ Returns a reference to the room's awareness (`pycrdt.Awareness`). """ diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index a63c941..f98d291 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -11,6 +11,17 @@ from jupyter_events import EventLogger class YRoomManager(): + """ + A singleton that manages all `YRoom` instances in the server extension. This + automatically deletes empty `YRoom`s with no connected clients or active + kernel every 10 seconds. + + Because rooms may be deleted due to inactivity, consumers should only store + a reference to the room ID and call `get_room(room_id)` each time a + reference to the room is needed. This method is cheap as long as the room + still exists. + """ + _rooms_by_id: dict[str, YRoom] """ Dictionary of active `YRoom` instances, keyed by room ID. @@ -86,6 +97,12 @@ def get_room(self, room_id: str) -> YRoom | None: ) return None + def has_room(self, room_id: str) -> bool: + """ + Returns whether a `YRoom` instance with a matching `room_id` already + exists. + """ + return room_id in self._rooms_by_id def _handle_yroom_stopping(self, room_id: str) -> None: """ @@ -124,29 +141,49 @@ async def delete_room(self, room_id: str) -> None: async def _watch_rooms(self) -> None: """ - Background task that checks all `YRoom` instances every 10 seconds, and - deletes any rooms that are empty. + Background task that checks all `YRoom` instances every 10 seconds, + deleting any rooms that are totally inactive. + + - For rooms providing notebooks: This task deletes the room if it has no + connected clients and its kernel execution status is either 'idle' or + 'dead'. + + - For all other rooms: This task deletes the room if it has no connected + clients. """ while True: # Check every 10 seconds await asyncio.sleep(10) - # Get room IDs from the room dictionary in advance, as the + # Get all room IDs from the room dictionary in advance, as the # dictionary will mutate if rooms are deleted. room_ids = set(self._rooms_by_id.keys()) + # Remove the global awareness room ID from this set, as that room + # should not be stopped until the server extension is stopped. + room_ids.discard("JupyterLab:globalAwareness") + # Iterate through all rooms. If any rooms are empty, stop the rooms. for room_id in room_ids: - # Continue if `room_id`` is not in the rooms dictionary. This + # Continue if `room_id` is not in the rooms dictionary. This # happens if the room was stopped by something else while this # `for` loop is still running, so we must check. if room_id not in self._rooms_by_id: continue - # Continue if the room is not empty + # Continue if the room has any connected clients. room = self._rooms_by_id[room_id] if room.clients.count != 0: continue + + # Continue if the room contains a notebook with kernel execution + # state neither 'idle' nor 'dead'. + # In this case, the notebook kernel may still be running code + # cells, so the room should not be closed. + awareness = room.get_awareness().get_local_state() or {} + execution_state = awareness.get("kernel", {}).get("execution_state", None) + if execution_state not in { "idle", "dead" }: + continue # Otherwise, delete the room self.log.info(f"Found empty YRoom '{room_id}'.") From 8f2180a1a07c8323549ebb0f1ce651c2cf912fc9 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 12 Jun 2025 10:52:07 -0700 Subject: [PATCH 05/21] fix #111, implement update_session() --- jupyter_server_documents/session_manager.py | 156 +++++++++++++++----- 1 file changed, 118 insertions(+), 38 deletions(-) diff --git a/jupyter_server_documents/session_manager.py b/jupyter_server_documents/session_manager.py index e8e080d..0e255dc 100644 --- a/jupyter_server_documents/session_manager.py +++ b/jupyter_server_documents/session_manager.py @@ -30,20 +30,47 @@ def yroom_manager(self) -> YRoomManager: """The Jupyter Server's YRoom Manager.""" return self.serverapp.web_app.settings["yroom_manager"] - def get_kernel_client(self, kernel_id) -> DocumentAwareKernelClient: + _room_ids: dict[str, str] + """ + Dictionary of room IDs, keyed by session ID. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._room_ids = {} + + def get_kernel_client(self, kernel_id: str) -> DocumentAwareKernelClient: """Get the kernel client for a running kernel.""" kernel_manager = self.kernel_manager.get_kernel(kernel_id) kernel_client = kernel_manager.main_client return kernel_client - # The `type` argument here is for future proofing this API. - # Today, only notebooks have ydocs with kernels connected. - # In the future, we will include consoles here. - def get_yroom(self, path, type) -> YRoom: - """Get the yroom for a given path.""" + def get_yroom(self, session_id: str) -> YRoom: + """ + Get the `YRoom` for a session given its ID. The session must have + been created first via a call to `create_session()`. + """ + room_id = self._room_ids.get(session_id, None) + yroom = self.yroom_manager.get_room(room_id) if room_id else None + if not yroom: + raise LookupError(f"No room found for session '{session_id}'.") + return yroom + + + def _init_session_yroom(self, session_id: str, path: str) -> YRoom: + """ + Returns a `YRoom` for a session identified by the given `session_id` and + `path`. This should be called only in `create_session()`. + + This method stores the new room ID & session ID in `self._room_ids`. The + `YRoom` for a session can be retrieved via `self.get_yroom()` after this + method is called. + """ file_id = self.file_id_manager.index(path) room_id = f"json:notebook:{file_id}" yroom = self.yroom_manager.get_room(room_id) + self._room_ids[session_id] = room_id + return yroom async def create_session( @@ -57,51 +84,104 @@ async def create_session( """ After creating a session, connects the yroom to the kernel client. """ - output = await super().create_session( + session_model = await super().create_session( path, name, type, kernel_name, kernel_id ) + session_id = session_model["id"] if kernel_id is None: - kernel_id = output["kernel"]["id"] - - # Connect this session's yroom to the kernel. - if type == "notebook": - # If name or path is None, we cannot map to a yroom, - # so just move on. - if name is None or path is None: - self.log.debug("`name` or `path` was not given, so a yroom was not set up for this session.") - return output - # When JupyterLab creates a session, it uses a fake path - # which is the relative path + UUID, i.e. the notebook - # name is incorrect temporarily. It later makes multiple - # updates to the session to correct the path. - # - # Here, we create the true path to store in the fileID service - # by dropping the UUID and appending the file name. - real_path = os.path.join(os.path.split(path)[0], name) - yroom = self.get_yroom(real_path, type) - # TODO: we likely have a race condition here... need to - # think about it more. Currently, the kernel client gets - # created after the kernel starts fully. We need the - # kernel client instantiated _before_ trying to connect - # the yroom. - kernel_client = self.get_kernel_client(kernel_id) - await kernel_client.add_yroom(yroom) - self.log.info(f"Connected yroom {yroom.room_id} to kernel {kernel_id}. yroom: {yroom}") - else: - self.log.debug(f"Document type {type} is not supported by YRoom.") - return output + kernel_id = session_model["kernel"]["id"] + + # If the type is not 'notebook', return the session model immediately + if type != "notebook": + self.log.warning( + f"Document type '{type}' is not recognized by YDocSessionManager." + ) + return session_model + + # If name or path is None, we cannot map to a yroom, + # so just move on. + if name is None or path is None: + self.log.warning(f"`name` or `path` was not given for new session at '{path}'.") + return session_model + + # Otherwise, get a `YRoom` and add it to this session's kernel client. + + # When JupyterLab creates a session, it uses a fake path + # which is the relative path + UUID, i.e. the notebook + # name is incorrect temporarily. It later makes multiple + # updates to the session to correct the path. + # + # Here, we create the true path to store in the fileID service + # by dropping the UUID and appending the file name. + real_path = os.path.join(os.path.split(path)[0], name) + + # Get YRoom for this session and store its ID in `self._room_ids` + yroom = self._init_session_yroom(session_id, real_path) + + # Add YRoom to this session's kernel client + # TODO: we likely have a race condition here... need to + # think about it more. Currently, the kernel client gets + # created after the kernel starts fully. We need the + # kernel client instantiated _before_ trying to connect + # the yroom. + kernel_client = self.get_kernel_client(kernel_id) + await kernel_client.add_yroom(yroom) + self.log.info(f"Connected yroom {yroom.room_id} to kernel {kernel_id}. yroom: {yroom}") + return session_model + + + async def update_session(self, session_id, **update) -> None: + """ + Updates the session identified by `session_id` using the keyword + arguments passed to this method. Each keyword argument should correspond + to a column in the sessions table. + + This class calls the `update_session()` parent method, then updates the + kernel client if `update` contains `kernel_id`. + """ + # Apply update and return early if `kernel_id` was not updated + if "kernel_id" not in update: + return await super().update_session(session_id, **update) + + # Otherwise, first remove the YRoom from the old kernel client and add + # the YRoom to the new kernel client, before applying the update. + old_session_info = (await self.get_session(session_id=session_id) or {}) + old_kernel_id = old_session_info.get("kernel_id", None) + new_kernel_id = update.get("kernel_id", None) + self.log.info( + f"Updating session '{session_id}' from kernel '{old_kernel_id}' " + f"to kernel '{new_kernel_id}'." + ) + yroom = self.get_yroom(session_id) + if old_kernel_id: + old_kernel_client = self.get_kernel_client(old_kernel_id) + await old_kernel_client.remove_yroom(yroom=yroom) + if new_kernel_id: + new_kernel_client = self.get_kernel_client(new_kernel_id) + await new_kernel_client.add_yroom(yroom=yroom) + + # Apply update and return + return await super().update_session(session_id, **update) + async def delete_session(self, session_id): """ Deletes the session and disconnects the yroom from the kernel client. """ session = await self.get_session(session_id=session_id) - kernel_id, path, type = session["kernel"]["id"], session["path"], session["type"] - yroom = self.get_yroom(path, type) + kernel_id = session["kernel"]["id"] + + # Remove YRoom from session's kernel client + yroom = self.get_yroom(session_id) kernel_client = self.get_kernel_client(kernel_id) await kernel_client.remove_yroom(yroom) + + # Remove room ID stored for the session + self._room_ids.pop(session_id, None) + + # Delete the session via the parent method await super().delete_session(session_id) \ No newline at end of file From 9c3b680696b73b71b8df5599fbfc017aecc27f89 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 12 Jun 2025 13:29:02 -0700 Subject: [PATCH 06/21] update kernel client to always use latest rooms from YRoomManager --- .../kernels/kernel_client.py | 83 ++++++++++++++++--- jupyter_server_documents/session_manager.py | 5 +- 2 files changed, 76 insertions(+), 12 deletions(-) diff --git a/jupyter_server_documents/kernels/kernel_client.py b/jupyter_server_documents/kernels/kernel_client.py index 658087a..f61e7fe 100644 --- a/jupyter_server_documents/kernels/kernel_client.py +++ b/jupyter_server_documents/kernels/kernel_client.py @@ -1,9 +1,9 @@ """ A new Kernel client that is aware of ydocuments. """ +from __future__ import annotations import anyio import asyncio -import json import typing as t from traitlets import Set, Instance, Any, Type, default @@ -16,6 +16,9 @@ from .kernel_client_abc import AbstractDocumentAwareKernelClient +if t.TYPE_CHECKING: + from jupyter_server_documents.rooms.yroom_manager import YRoomManager + class DocumentAwareKernelClient(AsyncKernelClient): """ @@ -39,11 +42,6 @@ def _default_message_cache(self): # message is received. _listeners = Set(allow_none=True) - # A set of YRooms that will intercept output and kernel - # status messages. - _yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set()) - - output_processor = Instance( OutputProcessor, allow_none=True @@ -58,6 +56,24 @@ def _default_message_cache(self): def _default_output_processor(self) -> OutputProcessor: self.log.info("Creating output processor") return self.output_process_class(parent=self, config=self.config) + + _yroom_manager: YRoomManager | None + """ + The YRoomManager registered via `self.bind_yroom_manager()`, which must be + called before adding any `YRoom` via `self.add_yroom()`. + """ + + _yroom_ids: set[str] + """ + The set of room IDs that are registered with this kernel client. This class + stores room IDs instead of `YRoom` instances because `YRoom` instances may + be deleted once inactive. + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._yroom_manager = None + self._yroom_ids = set() async def start_listening(self): """Start listening to messages coming from the kernel. @@ -286,18 +302,63 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona # Default return if message is processed and does not need forwarding return msg + @property + def _yrooms(self) -> list[YRoom]: + """ + Returns the list of YRoom instances registered to this kernel client. + """ + if len(self._yroom_ids) == 0: + return [] + + assert self._yroom_manager + rooms: list[YRoom] = [] + + # Always call `get_room()` to get the latest reference to the room. We + # must do this since rooms may be deleted upon inactivity. The + # `get_room()` method returns a cached value as long as the room was not + # deleted, so this is very fast in most cases. + for room_id in self._yroom_ids: + room = self._yroom_manager.get_room(room_id) + rooms.append(room) + + return rooms + async def add_yroom(self, yroom: YRoom): """ - Register a YRoom with this kernel client. YRooms will - intercept display and kernel status messages. + Register a `YRoom` with this kernel client, given the room ID. + Registered `YRoom`s will intercept display and kernel status messages. + + `self.bind_yroom_manager()` must be called before using this method. """ - self._yrooms.add(yroom) + assert self._yroom_manager + self._yroom_ids.add(yroom.room_id) + self.log.info( + f"Added room '{yroom.room_id}' to kernel '{self.kernel_name}'. " + f"Total rooms: {len(self._yroom_ids)}" + ) async def remove_yroom(self, yroom: YRoom): """ - De-register a YRoom from handling kernel client messages. + De-register a `YRoom` from this kernel client, given the room ID. + + `self.bind_yroom_manager()` must be called before using this method. + """ + self._yrooms_ids.discard(yroom.room_id) + self.log.info( + f"Removed room '{yroom.room_id}' from kernel '{self.kernel_name}'. " + f"Total rooms: {len(self._yroom_ids)}" + ) + + @property + def yroom_manager(self) -> YRoomManager: + return self._yroom_manager + + def bind_yroom_manager(self, yroom_manager: YRoomManager): + """ + Binds a reference to the `YRoomManager` singleton to this instance. This + method must be called before adding a room. """ - self._yrooms.discard(yroom) + self._yroom_manager = yroom_manager AbstractDocumentAwareKernelClient.register(DocumentAwareKernelClient) diff --git a/jupyter_server_documents/session_manager.py b/jupyter_server_documents/session_manager.py index 0e255dc..b09ed01 100644 --- a/jupyter_server_documents/session_manager.py +++ b/jupyter_server_documents/session_manager.py @@ -122,13 +122,16 @@ async def create_session( # Get YRoom for this session and store its ID in `self._room_ids` yroom = self._init_session_yroom(session_id, real_path) + # Bind `YRoomManager` to the kernel client + kernel_client = self.get_kernel_client(kernel_id) + kernel_client.bind_yroom_manager(yroom_manager=self.yroom_manager) + # Add YRoom to this session's kernel client # TODO: we likely have a race condition here... need to # think about it more. Currently, the kernel client gets # created after the kernel starts fully. We need the # kernel client instantiated _before_ trying to connect # the yroom. - kernel_client = self.get_kernel_client(kernel_id) await kernel_client.add_yroom(yroom) self.log.info(f"Connected yroom {yroom.room_id} to kernel {kernel_id}. yroom: {yroom}") return session_model From 50b7cd8c59029206ac11baaeea4b47065fe3c31e Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 12 Jun 2025 13:33:41 -0700 Subject: [PATCH 07/21] fix room cleanup for non-notebooks --- jupyter_server_documents/rooms/yroom_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index f98d291..8feab86 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -182,7 +182,7 @@ async def _watch_rooms(self) -> None: # cells, so the room should not be closed. awareness = room.get_awareness().get_local_state() or {} execution_state = awareness.get("kernel", {}).get("execution_state", None) - if execution_state not in { "idle", "dead" }: + if execution_state not in { "idle", "dead", None }: continue # Otherwise, delete the room From 7da899c6fde9773a174ead39f4219c6daec514db Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 12 Jun 2025 14:32:08 -0700 Subject: [PATCH 08/21] ensure returned rooms are kept alive for 10 seconds --- .../rooms/yroom_manager.py | 37 +++++++++++++++++-- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index 8feab86..0b16e07 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -30,6 +30,14 @@ class YRoomManager(): not currently stopping. This is ensured by `_handle_yroom_stopping()`. """ + _inactive_rooms: set[str] + """ + Set of room IDs that were marked inactive on the last iteration of + `_watch_rooms()`. If a room is inactive and its ID is present in this set, + then the room has been inactive for >10 seconds, and the room should be + deleted in `_watch_rooms()`. + """ + _get_fileid_manager: callable[[], BaseFileIdManager] contents_manager: AsyncContentsManager | ContentsManager event_logger: EventLogger @@ -56,6 +64,9 @@ def __init__( # Initialize dictionary of YRooms, keyed by room ID self._rooms_by_id = {} + # Initialize set of inactive rooms tracked by `self._watch_rooms()` + self._inactive_rooms = set() + # Start `self._watch_rooms()` background task to automatically stop # empty rooms self._watch_rooms_task = self.loop.create_task(self._watch_rooms()) @@ -70,7 +81,14 @@ def get_room(self, room_id: str) -> YRoom | None: """ Retrieves a YRoom given a room ID. If the YRoom does not exist, this method will initialize a new YRoom. + + This method ensures that the returned room will be alive for >10 + seconds. This prevents the room from being deleted shortly after the + consumer receives it via this method, even if it is inactive. """ + # First, ensure this room stays open for >10 seconds by removing it from + # the inactive set of rooms if it is present. + self._inactive_rooms.discard(room_id) # If room exists, return the room if room_id in self._rooms_by_id: @@ -142,7 +160,7 @@ async def delete_room(self, room_id: str) -> None: async def _watch_rooms(self) -> None: """ Background task that checks all `YRoom` instances every 10 seconds, - deleting any rooms that are totally inactive. + deleting any rooms that have been inactive for >10 seconds. - For rooms providing notebooks: This task deletes the room if it has no connected clients and its kernel execution status is either 'idle' or @@ -169,11 +187,13 @@ async def _watch_rooms(self) -> None: # happens if the room was stopped by something else while this # `for` loop is still running, so we must check. if room_id not in self._rooms_by_id: + self._inactive_rooms.discard(room_id) continue # Continue if the room has any connected clients. room = self._rooms_by_id[room_id] if room.clients.count != 0: + self._inactive_rooms.discard(room_id) continue # Continue if the room contains a notebook with kernel execution @@ -183,11 +203,20 @@ async def _watch_rooms(self) -> None: awareness = room.get_awareness().get_local_state() or {} execution_state = awareness.get("kernel", {}).get("execution_state", None) if execution_state not in { "idle", "dead", None }: + self._inactive_rooms.discard(room_id) continue - # Otherwise, delete the room - self.log.info(f"Found empty YRoom '{room_id}'.") - self.loop.create_task(self.delete_room(room_id)) + # The room is inactive if this statement is reached + # Delete the room if was marked as inactive in the last + # iteration, otherwise mark it as inactive. + if room_id in self._inactive_rooms: + self.log.info( + f"YRoom '{room_id}' has been inactive for >10 seconds. " + ) + self.loop.create_task(self.delete_room(room_id)) + self._inactive_rooms.discard(room_id) + else: + self._inactive_rooms.add(room_id) async def stop(self) -> None: From 1d15cdea60cd19572886ed87c35dca707dce2328 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Thu, 12 Jun 2025 14:50:10 -0700 Subject: [PATCH 09/21] update docstrings --- .../rooms/yroom_manager.py | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index 0b16e07..444ee21 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -13,13 +13,12 @@ class YRoomManager(): """ A singleton that manages all `YRoom` instances in the server extension. This - automatically deletes empty `YRoom`s with no connected clients or active - kernel every 10 seconds. + automatically deletes `YRoom` instances if they have had no connected + clients or active kernel for >10 seconds. Because rooms may be deleted due to inactivity, consumers should only store a reference to the room ID and call `get_room(room_id)` each time a - reference to the room is needed. This method is cheap as long as the room - still exists. + reference to the room is needed. See `get_room()` for more details. """ _rooms_by_id: dict[str, YRoom] @@ -79,12 +78,17 @@ def fileid_manager(self) -> BaseFileIdManager: def get_room(self, room_id: str) -> YRoom | None: """ - Retrieves a YRoom given a room ID. If the YRoom does not exist, this - method will initialize a new YRoom. + Returns the `YRoom` instance for a given room ID. If the instance does + not exist, this method will initialize one and return it. Otherwise, + this method returns the instance from its cache, ensuring that this + method is fast in almost all cases. - This method ensures that the returned room will be alive for >10 + Consumers should always call this method each time a reference to the + `YRoom` is needed, since rooms may be deleted due to inactivity. + + This method also ensures that the returned room will be alive for >10 seconds. This prevents the room from being deleted shortly after the - consumer receives it via this method, even if it is inactive. + consumer receives it via this method, even if it was inactive. """ # First, ensure this room stays open for >10 seconds by removing it from # the inactive set of rooms if it is present. From ad34d9ecc9c078c66f5e8b1e22ae69a25da5ae23 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Mon, 16 Jun 2025 09:12:57 -0700 Subject: [PATCH 10/21] refactor FileAPI, allow it to be restarted --- jupyter_server_documents/rooms/yroom.py | 142 ++++++++-------- .../rooms/yroom_file_api.py | 158 ++++++++++-------- .../tests/test_yroom_file_api.py | 4 +- 3 files changed, 166 insertions(+), 138 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 5309437..9779840 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -37,10 +37,10 @@ class YRoom: The `YRoomFileAPI` instance for this room. This is set to `None` only if `self.room_id == "JupyterLab:globalAwareness"`. - The file API provides `load_ydoc_content()` for loading the YDoc content - from the `ContentsManager`, accepts & handles save requests via - `file_api.schedule_save()`, and automatically watches the file for - out-of-band changes. + The file API provides `load_content_into()` for loading the content + from the `ContentsManager` into the JupyterYDoc. It accepts & handles save + requests via `file_api.schedule_save()`, and automatically watches the file + for out-of-band changes. """ events_api: YRoomEventsAPI | None @@ -105,26 +105,28 @@ def __init__( self._contents_manager = contents_manager self._on_stopping = on_stopping - # Initialize YjsClientGroup, YDoc, YAwareness, JupyterYDoc + # Initialize YjsClientGroup, YDoc, and Awareness self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop) - self._ydoc = pycrdt.Doc() - self._awareness = pycrdt.Awareness(ydoc=self._ydoc) + self._ydoc = self._init_ydoc() + self._awareness = self._init_awareness(ydoc=self._ydoc) # If this room is providing global awareness, set unused optional # attributes to `None`. if self.room_id == "JupyterLab:globalAwareness": - self.file_api = None self._jupyter_ydoc = None + self.file_api = None self.events_api = None else: - # Otherwise, initialize optional attributes for document rooms + # Otherwise, initialize optional attributes for document rooms. # Initialize JupyterYDoc - self._jupyter_ydoc = self._init_jupyter_ydoc() + self._jupyter_ydoc = self._init_jupyter_ydoc( + ydoc=self._ydoc, + awareness=self._awareness + ) # Initialize YRoomFileAPI, start loading content self.file_api = YRoomFileAPI( room_id=self.room_id, - jupyter_ydoc=self._jupyter_ydoc, log=self.log, loop=self._loop, fileid_manager=self._fileid_manager, @@ -133,10 +135,7 @@ def __init__( on_outofband_move=self.handle_outofband_move, on_inband_deletion=self.handle_inband_deletion ) - self.file_api.load_ydoc_content() - - # Attach Jupyter YDoc observer to automatically save on change - self._jupyter_ydoc.observe(self._on_jupyter_ydoc_update) + self.file_api.load_content_into(self._jupyter_ydoc) # Initialize YRoomEventsAPI self.events_api = YRoomEventsAPI( @@ -146,15 +145,6 @@ def __init__( log=self.log, ) - # Start observers on `self.ydoc` and `self.awareness` to ensure new - # updates are always broadcast to all clients. - self._awareness_subscription = self._awareness.observe( - self._on_awareness_update - ) - self._ydoc_subscription = self._ydoc.observe( - self._on_ydoc_update - ) - # Initialize message queue and start background task that routes new # messages in the message queue to the appropriate handler method. self._message_queue = asyncio.Queue() @@ -171,23 +161,48 @@ def __init__( # Emit 'load' event once content is loaded assert self.file_api async def emit_load_event(): - await self.file_api.ydoc_content_loaded.wait() + await self.file_api.until_content_loaded self.events_api.emit_room_event("load") self._loop.create_task(emit_load_event()) - def _init_jupyter_ydoc(self) -> YBaseDoc: + def _init_ydoc(self) -> pycrdt.Doc: + """ + Initializes a YDoc, automatically binding its `_on_ydoc_update()` + observer to `self._ydoc_subscription`. The observer can removed via + `ydoc.unobserve(self._ydoc_subscription)`. """ - Initializes a Jupyter YDoc (instance of `pycrdt.YBaseDoc`). This - should not be called in global awareness rooms, and requires - `self._ydoc` and `self._awareness` to be set prior. + self._ydoc = pycrdt.Doc() + self._ydoc_subscription = self._ydoc.observe( + self._on_ydoc_update + ) + return self._ydoc + - Raises `AssertionError` if the room ID is "JupyterLab:globalAwareness" - or if either `self._ydoc` or `self._awareness` are not set. + def _init_awareness(self, ydoc: pycrdt.Doc) -> pycrdt.Awareness: + """ + Initializes an Awareness instance, automatically binding its + `_on_awareness_update()` observer to `self._awareness_subscription`. + The observer can be removed via + `awareness.unobserve(self._awareness_subscription)`. + """ + self._awareness = pycrdt.Awareness(ydoc=ydoc) + self._awareness_subscription = self._awareness.observe( + self._on_awareness_update + ) + return self._awareness + + + def _init_jupyter_ydoc(self, ydoc: pycrdt.Doc, awareness: pycrdt.Awareness) -> YBaseDoc: + """ + Initializes a Jupyter YDoc (instance of `pycrdt.YBaseDoc`), + automatically attaching its `_on_jupyter_ydoc_update()` observer. + The observer can be removed via `jupyter_ydoc.unobserve()`. + + Raises `AssertionError` if the room ID is "JupyterLab:globalAwareness", + as a JupyterYDoc is not needed for global awareness rooms. """ assert self.room_id != "JupyterLab:globalAwareness" - assert self._ydoc - assert self._awareness # Get Jupyter YDoc class, defaulting to `YFile` if the file type is # unrecognized @@ -198,8 +213,9 @@ def _init_jupyter_ydoc(self) -> YBaseDoc: ) # Initialize Jupyter YDoc and return it - jupyter_ydoc = JupyterYDocClass(ydoc=self._ydoc, awareness=self._awareness) - return jupyter_ydoc + self._jupyter_ydoc = JupyterYDocClass(ydoc=ydoc, awareness=awareness) + self._jupyter_ydoc.observe(self._on_jupyter_ydoc_update) + return self._jupyter_ydoc @property @@ -218,12 +234,12 @@ async def get_jupyter_ydoc(self) -> YBaseDoc: (`jupyter_ydoc.ybasedoc.YBaseDoc`) after waiting for its content to be loaded from the ContentsManager. """ - if self.file_api: - await self.file_api.ydoc_content_loaded.wait() if self.room_id == "JupyterLab:globalAwareness": message = "There is no Jupyter ydoc for global awareness scenario" self.log.error(message) raise Exception(message) + if self.file_api: + await self.file_api.until_content_loaded return self._jupyter_ydoc @@ -233,7 +249,7 @@ async def get_ydoc(self) -> pycrdt.Doc: waiting for its content to be loaded from the ContentsManager. """ if self.file_api: - await self.file_api.ydoc_content_loaded.wait() + await self.file_api.until_content_loaded return self._ydoc @@ -265,7 +281,7 @@ async def _process_message_queue(self) -> None: # Wait for content to be loaded before processing any messages in the # message queue if self.file_api: - await self.file_api.ydoc_content_loaded.wait() + await self.file_api.until_content_loaded # Begin processing messages from the message queue while True: @@ -330,7 +346,7 @@ async def _process_message_queue(self) -> None: "Stopped `self._process_message_queue()` background task " f"for YRoom '{self.room_id}'." ) - + def handle_sync_step1(self, client_id: str, message: bytes) -> None: """ @@ -473,8 +489,7 @@ def _on_jupyter_ydoc_update(self, updated_key: str, event: Any) -> None: # Do nothing if the content is still loading. Clients cannot make # updates until the content is loaded, so this safely prevents an extra # save upon loading/reloading the YDoc. - content_loading = not self.file_api.ydoc_content_loaded.is_set() - if content_loading: + if not self.file_api.content_loaded: return # Do nothing if the event updates the 'state' dictionary with no effect @@ -619,33 +634,17 @@ def reload_ydoc(self) -> None: self._awareness.unobserve(self._awareness_subscription) self._jupyter_ydoc.unobserve() - # Reset YDoc, YAwareness, JupyterYDoc to empty states - self._ydoc = pycrdt.Doc() - self._awareness = pycrdt.Awareness(ydoc=self._ydoc) - self._jupyter_ydoc = self._init_jupyter_ydoc() - - # Reset `YRoomFileAPI` & reload the document - self.file_api = YRoomFileAPI( - room_id=self.room_id, - jupyter_ydoc=self._jupyter_ydoc, - log=self.log, - loop=self._loop, - fileid_manager=self._fileid_manager, - contents_manager=self._contents_manager, - on_outofband_change=self.reload_ydoc, - on_outofband_move=self.handle_outofband_move, - on_inband_deletion=self.handle_inband_deletion + # Re-initialize YDoc, YAwareness, JupyterYDoc + self._ydoc = self._init_ydoc() + self._awareness = self._init_awareness(ydoc=self._ydoc) + self._jupyter_ydoc = self._init_jupyter_ydoc( + ydoc=self._ydoc, + awareness=self._awareness ) - self.file_api.load_ydoc_content() - # Add observers to new YDoc, YAwareness, and JupyterYDoc instances - self._awareness_subscription = self._awareness.observe( - self._on_awareness_update - ) - self._ydoc_subscription = self._ydoc.observe( - self._on_ydoc_update - ) - self._jupyter_ydoc.observe(self._on_jupyter_ydoc_update) + # Reset `YRoomFileAPI` & reload the document + self.file_api.restart() + self.file_api.load_content_into(self._jupyter_ydoc) # Emit 'overwrite' event as the YDoc content has been overwritten if self.events_api: @@ -705,7 +704,7 @@ def stop_immediately(self, close_code: int) -> None: # Stop FileAPI immediately (without saving) if self.file_api: self.file_api.stop() - + async def stop(self) -> None: """ @@ -730,9 +729,12 @@ async def stop(self) -> None: await self._message_queue.join() self._message_queue.put_nowait(None) - # Stop FileAPI, saving the content before doing so + # Stop FileAPI, saving the content after doing so if self.file_api: - await self.file_api.stop_then_save() + self.file_api.stop() + await self.file_api.save_immediately(self._jupyter_ydoc) + + def should_ignore_state_update(event: pycrdt.MapEvent) -> bool: diff --git a/jupyter_server_documents/rooms/yroom_file_api.py b/jupyter_server_documents/rooms/yroom_file_api.py index c257191..97ce5f7 100644 --- a/jupyter_server_documents/rooms/yroom_file_api.py +++ b/jupyter_server_documents/rooms/yroom_file_api.py @@ -1,9 +1,3 @@ -""" -WIP. - -This file just contains interfaces to be filled out later. -""" - from __future__ import annotations from typing import TYPE_CHECKING import asyncio @@ -14,7 +8,7 @@ from tornado.web import HTTPError if TYPE_CHECKING: - from typing import Any, Callable, Literal + from typing import Any, Callable, Coroutine, Literal from jupyter_server_fileid.manager import BaseFileIdManager from jupyter_server.services.contents.manager import AsyncContentsManager, ContentsManager @@ -38,14 +32,13 @@ class YRoomFileAPI: file_type: Literal["file", "notebook"] file_id: str log: logging.Logger - jupyter_ydoc: YBaseDoc _fileid_manager: BaseFileIdManager _contents_manager: AsyncContentsManager | ContentsManager _loop: asyncio.AbstractEventLoop _save_scheduled: bool - _ydoc_content_loading: bool - _ydoc_content_loaded: asyncio.Event + _content_loading: bool + _content_load_event: asyncio.Event _last_modified: datetime | None """ @@ -74,13 +67,22 @@ class YRoomFileAPI: The callback to run when an in-band move file deletion is detected. """ - _save_loop_task: asyncio.Task + _watch_file_task: asyncio.Task | None + """ + The task running the `_watch_file()` loop that processes scheduled saves and + watches for in-band & out-of-band changes. + """ + + _stopped: bool + """ + Whether the FileAPI has been stopped, i.e. when the `_watch_file()` task is + not running. + """ def __init__( self, *, room_id: str, - jupyter_ydoc: YBaseDoc, log: logging.Logger, fileid_manager: BaseFileIdManager, contents_manager: AsyncContentsManager | ContentsManager, @@ -92,7 +94,6 @@ def __init__( # Bind instance attributes self.room_id = room_id self.file_format, self.file_type, self.file_id = room_id.split(":") - self.jupyter_ydoc = jupyter_ydoc self.log = log self._loop = loop self._fileid_manager = fileid_manager @@ -103,13 +104,11 @@ def __init__( self._save_scheduled = False self._last_path = None self._last_modified = None + self._stopped = False # Initialize loading & loaded states - self._ydoc_content_loading = False - self._ydoc_content_loaded = asyncio.Event() - - # Start processing scheduled saves in a loop running concurrently - self._save_loop_task = self._loop.create_task(self._watch_file()) + self._content_loading = False + self._content_load_event = asyncio.Event() def get_path(self) -> str | None: @@ -124,42 +123,42 @@ def get_path(self) -> str | None: @property - def ydoc_content_loaded(self) -> asyncio.Event: + def content_loaded(self) -> bool: """ - Returns an `asyncio.Event` that is set when the YDoc content is loaded. + Immediately returns whether the YDoc content is loaded. - To suspend a coroutine until the content is loaded: + To have a coroutine wait until the content is loaded, call `await + file_api.until_content_loaded` instead. + """ + return self._content_load_event.is_set() - ``` - await file_api.ydoc_content_loaded.wait() - ``` - To synchronously (i.e. immediately) check if the content is loaded: - - ``` - file_api.ydoc_content_loaded.is_set() - ``` + @property + def until_content_loaded(self) -> Coroutine[Any, Any, Literal[True]]: + """ + Returns an awaitable that resolves when the content is loaded. """ + return self._content_load_event.wait() - return self._ydoc_content_loaded - - def load_ydoc_content(self) -> None: + def load_content_into(self, jupyter_ydoc: YBaseDoc) -> None: """ - Loads the file from disk asynchronously into `self.jupyter_ydoc`. + Loads the file content into the given JupyterYDoc. Consumers should `await file_api.ydoc_content_loaded` before performing any operations on the YDoc. + + This method starts the `_watch_file()` task after the content is loaded. """ # If already loaded/loading, return immediately. # Otherwise, set loading to `True` and start the loading task. - if self._ydoc_content_loaded.is_set() or self._ydoc_content_loading: + if self._content_load_event.is_set() or self._content_loading: return - self._ydoc_content_loading = True - self._loop.create_task(self._load_ydoc_content()) + self._content_loading = True + self._loop.create_task(self._load_content(jupyter_ydoc)) - async def _load_ydoc_content(self) -> None: + async def _load_content(self, jupyter_ydoc: YBaseDoc) -> None: # Get the path specified on the file ID path = self.get_path() if not path: @@ -176,18 +175,23 @@ async def _load_ydoc_content(self) -> None: # Set JupyterYDoc content and set `dirty = False` to hide the "unsaved # changes" icon in the UI - self.jupyter_ydoc.source = file_data['content'] - self.jupyter_ydoc.dirty = False + jupyter_ydoc.source = file_data['content'] + jupyter_ydoc.dirty = False # Set `_last_modified` timestamp self._last_modified = file_data['last_modified'] - # Finally, set loaded event to inform consumers that the YDoc is ready + # Set loaded event to inform consumers that the YDoc is ready # Also set loading to `False` for consistency and log success - self._ydoc_content_loaded.set() - self._ydoc_content_loading = False + self._content_load_event.set() + self._content_loading = False self.log.info(f"Loaded content for room ID '{self.room_id}'.") + # Start _watch_file() task + self._watch_file_task = self._loop.create_task( + self._watch_file(jupyter_ydoc) + ) + def schedule_save(self) -> None: """ @@ -197,18 +201,16 @@ def schedule_save(self) -> None: """ self._save_scheduled = True - async def _watch_file(self) -> None: + async def _watch_file(self, jupyter_ydoc: YBaseDoc) -> None: """ - Defines a background task that continuously saves the YDoc every 500ms, - checking for out-of-band changes before doing so. + Defines a background task that processes scheduled saves to the YDoc + every 500ms, checking for in-band & out-of-band changes before doing so. - Note that consumers must call `self.schedule_save()` for the next tick + This task is started by a call to `load_ydoc_content()`. + Consumers must call `self.schedule_save()` for the next tick of this task to save. """ - # Wait for content to be loaded before processing scheduled saves - await self._ydoc_content_loaded.wait() - while True: try: await asyncio.sleep(0.5) @@ -218,7 +220,7 @@ async def _watch_file(self) -> None: # cancelled halfway and corrupting the file. We need to # store a reference to the shielded task to prevent it from # being garbage collected (see `asyncio.shield()` docs). - save_task = self._save_jupyter_ydoc() + save_task = self.save_immediately(jupyter_ydoc) await asyncio.shield(save_task) except asyncio.CancelledError: break @@ -231,7 +233,7 @@ async def _watch_file(self) -> None: # occurs repeatedly. await asyncio.sleep(5) - self.log.info( + self.log.debug( "Stopped `self._watch_file()` background task " f"for YRoom '{self.room_id}'." ) @@ -317,18 +319,20 @@ async def _check_file(self): self._on_outofband_change() - async def _save_jupyter_ydoc(self): + async def save_immediately(self, jupyter_ydoc: YBaseDoc): """ - Saves the JupyterYDoc to disk immediately. + Saves the given JupyterYDoc to disk immediately. - This is a private method. Consumers should call - `file_api.schedule_save()` to save the YDoc on the next tick of - the `self._watch_file()` background task. + This method should only be called by consumers if the YDoc needs to be + saved while the FileAPI is stopped, e.g. when the parent room is + stopping. In all other cases, consumers should call `schedule_save()` + instead to save the YDoc on the next tick of the `self._watch_file()` + background task. """ try: # Build arguments to `CM.save()` path = self.get_path() - content = self.jupyter_ydoc.source + content = jupyter_ydoc.source file_format = self.file_format file_type = self.file_type if self.file_type in SAVEABLE_FILE_TYPES else "file" @@ -354,7 +358,7 @@ async def _save_jupyter_ydoc(self): # Set `dirty` to `False` to hide the "unsaved changes" icon in the # JupyterLab tab for this YDoc in the frontend. - self.jupyter_ydoc.dirty = False + jupyter_ydoc.dirty = False except Exception as e: self.log.error("An exception occurred when saving JupyterYDoc.") self.log.exception(e) @@ -365,19 +369,41 @@ def stop(self) -> None: Gracefully stops the `YRoomFileAPI`. This immediately halts the background task saving the YDoc to the `ContentsManager`. - To save the YDoc after stopping, call `await file_api.stop_then_save()` - instead. + To save the YDoc after stopping, call `await + file_api.save_immediately()` after calling this method. """ - self._save_loop_task.cancel() + self.log.info(f"Stopping FileAPI for room '{self.room_id}'.") + if self._watch_file_task: + self._watch_file_task.cancel() + self._stopped = True + @property + def stopped(self) -> bool: + """ + Returns whether the FileAPI has been stopped via the `stop()` method. + """ + return self._stopped - async def stop_then_save(self) -> None: + def restart(self) -> None: """ - Gracefully stops the YRoomFileAPI by calling `self.stop()`, then saves - the content of `self.jupyter_ydoc` before exiting. + Restarts the instance by stopping the `_watch_file()` background task + and clearing its internal state. This can be called before or after the + instance is stopped. + + Consumers should call `load_content_into()` again after this method. """ - self.stop() - await self._save_jupyter_ydoc() + # Stop if not stopped already + if not self.stopped: + self.stop() + + # Reset instance attributes + self._stopped = False + self._content_load_event = asyncio.Event() + self._content_loading = False + self._save_scheduled = False + self._last_modified = None + self._last_path = None + self.log.info(f"Restarted FileAPI for room '{self.room_id}'.") # see https://github.com/jupyterlab/jupyter-collaboration/blob/main/projects/jupyter-server-ydoc/jupyter_server_ydoc/loaders.py#L146-L149 diff --git a/jupyter_server_documents/tests/test_yroom_file_api.py b/jupyter_server_documents/tests/test_yroom_file_api.py index d2e29dd..545ad63 100644 --- a/jupyter_server_documents/tests/test_yroom_file_api.py +++ b/jupyter_server_documents/tests/test_yroom_file_api.py @@ -73,8 +73,8 @@ async def plaintext_file_api(mock_plaintext_file: str, jp_contents_manager: Asyn async def test_load_plaintext_file(plaintext_file_api: Awaitable[YRoomFileAPI], mock_plaintext_file: str): file_api = await plaintext_file_api jupyter_ydoc = file_api.jupyter_ydoc - file_api.load_ydoc_content() - await file_api.ydoc_content_loaded.wait() + file_api.load_content_into(jupyter_ydoc) + await file_api.until_content_loaded # Assert that `get_jupyter_ydoc()` returns a `jupyter_ydoc.YUnicode` object # for plaintext files From c96783610afe140d99b6aa74ef4651599e10e7c3 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 15:22:27 -0700 Subject: [PATCH 11/21] allow rooms to be restarted, unify stop() methods --- jupyter_server_documents/rooms/yroom.py | 318 ++++++++++-------- .../rooms/yroom_file_api.py | 32 +- .../websockets/clients.py | 24 +- 3 files changed, 213 insertions(+), 161 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 9779840..1a3db58 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -82,6 +82,12 @@ class YRoom: set in the constructor. """ + _stopped: bool = False + """ + Whether the YRoom is stopped. Set to `True` when `stop()` is called and set + to `False` when `restart()` is called. + """ + _fileid_manager: BaseFileIdManager _contents_manager: AsyncContentsManager | ContentsManager @@ -104,6 +110,7 @@ def __init__( self._fileid_manager = fileid_manager self._contents_manager = contents_manager self._on_stopping = on_stopping + self._stopped = False # Initialize YjsClientGroup, YDoc, and Awareness self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop) @@ -131,7 +138,7 @@ def __init__( loop=self._loop, fileid_manager=self._fileid_manager, contents_manager=self._contents_manager, - on_outofband_change=self.reload_ydoc, + on_outofband_change=self.handle_outofband_change, on_outofband_move=self.handle_outofband_move, on_inband_deletion=self.handle_inband_deletion ) @@ -292,50 +299,9 @@ async def _process_message_queue(self) -> None: if queue_item is None: break - # Otherwise, process the new message + # Otherwise, process & handle the new message client_id, message = queue_item - - # Determine message type & subtype from header - message_type = message[0] - sync_message_subtype = "*" - # message subtypes only exist on sync messages, hence this condition - if message_type == YMessageType.SYNC and len(message) >= 2: - sync_message_subtype = message[1] - - # Determine if message is invalid - # NOTE: In Python 3.12+, we can drop list(...) call - # according to https://docs.python.org/3/library/enum.html#enum.EnumType.__contains__ - invalid_message_type = message_type not in list(YMessageType) - invalid_sync_message_type = message_type == YMessageType.SYNC and sync_message_subtype not in list(YSyncMessageSubtype) - invalid_message = invalid_message_type or invalid_sync_message_type - - # Handle invalid messages by logging a warning and ignoring - if invalid_message: - self.log.warning( - "Ignoring an unrecognized message with header " - f"'{message_type},{sync_message_subtype}' from client " - f"'{client_id}'. Messages must have one of the following " - "headers: '0,0' (SyncStep1), '0,1' (SyncStep2), " - "'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)." - ) - # Handle Awareness messages - elif message_type == YMessageType.AWARENESS: - self.log.debug(f"Received AwarenessUpdate from '{client_id}'.") - self.handle_awareness_update(client_id, message) - self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.") - # Handle Sync messages - elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP1: - self.log.info(f"Received SS1 from '{client_id}'.") - self.handle_sync_step1(client_id, message) - self.log.info(f"Handled SS1 from '{client_id}'.") - elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP2: - self.log.info(f"Received SS2 from '{client_id}'.") - self.handle_sync_step2(client_id, message) - self.log.info(f"Handled SS2 from '{client_id}'.") - elif sync_message_subtype == YSyncMessageSubtype.SYNC_UPDATE: - self.log.info(f"Received SyncUpdate from '{client_id}'.") - self.handle_sync_update(client_id, message) - self.log.info(f"Handled SyncUpdate from '{client_id}'.") + self.handle_message(client_id, message) # Finally, inform the asyncio Queue that the task was complete # This is required for `self._message_queue.join()` to unblock once @@ -347,6 +313,60 @@ async def _process_message_queue(self) -> None: f"for YRoom '{self.room_id}'." ) + def handle_message(self, client_id: str, message: bytes) -> None: + """ + Handles all messages from every client received in the message queue by + calling the appropriate handler based on the message type. This method + routes the message to one of the following methods: + + - `handle_sync_step1()` + - `handle_sync_step2()` + - `handle_sync_update()` + - `handle_awareness_update()` + """ + + # Determine message type & subtype from header + message_type = message[0] + sync_message_subtype = "*" + # message subtypes only exist on sync messages, hence this condition + if message_type == YMessageType.SYNC and len(message) >= 2: + sync_message_subtype = message[1] + + # Determine if message is invalid + # NOTE: In Python 3.12+, we can drop list(...) call + # according to https://docs.python.org/3/library/enum.html#enum.EnumType.__contains__ + invalid_message_type = message_type not in list(YMessageType) + invalid_sync_message_type = message_type == YMessageType.SYNC and sync_message_subtype not in list(YSyncMessageSubtype) + invalid_message = invalid_message_type or invalid_sync_message_type + + # Handle invalid messages by logging a warning and ignoring + if invalid_message: + self.log.warning( + "Ignoring an unrecognized message with header " + f"'{message_type},{sync_message_subtype}' from client " + f"'{client_id}'. Messages must have one of the following " + "headers: '0,0' (SyncStep1), '0,1' (SyncStep2), " + "'0,2' (SyncUpdate), or '1,*' (AwarenessUpdate)." + ) + # Handle Awareness messages + elif message_type == YMessageType.AWARENESS: + self.log.debug(f"Received AwarenessUpdate from '{client_id}'.") + self.handle_awareness_update(client_id, message) + self.log.debug(f"Handled AwarenessUpdate from '{client_id}'.") + # Handle Sync messages + elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP1: + self.log.info(f"Received SS1 from '{client_id}'.") + self.handle_sync_step1(client_id, message) + self.log.info(f"Handled SS1 from '{client_id}'.") + elif sync_message_subtype == YSyncMessageSubtype.SYNC_STEP2: + self.log.info(f"Received SS2 from '{client_id}'.") + self.handle_sync_step2(client_id, message) + self.log.info(f"Handled SS2 from '{client_id}'.") + elif sync_message_subtype == YSyncMessageSubtype.SYNC_UPDATE: + self.log.info(f"Received SyncUpdate from '{client_id}'.") + self.handle_sync_update(client_id, message) + self.log.info(f"Handled SyncUpdate from '{client_id}'.") + def handle_sync_step1(self, client_id: str, message: bytes) -> None: """ @@ -595,96 +615,68 @@ def _on_awareness_update(self, type: str, changes: tuple[dict[str, Any], Any]) - def reload_ydoc(self) -> None: """ - Reloads the YDoc from the `ContentsManager`. This method: - - - Is called in response to out-of-band changes. - - - Disconnects all clients with close code 4000. - - - Empties the message queue, as the updates can no longer be applied. - - - Resets `self._ydoc`, `self._awareness`, and `self._jupyter_ydoc`. - - - Resets `self.file_api` to reload the YDoc from the `ContentsManager`. - - This method is deliberately synchronous so it cannot interrupted by - another coroutine. + Alias for `self.restart(close_code=4000, immediately=True)`. + + TODO: Use a designated close code to distinguish YDoc reloads from + out-of-band changes. """ - # Do nothing if this is a global awareness room, since the YDoc is never - # used anyways. - if self.room_id == "JupyterLab:globalAwareness": - return - - # Stop the existing `YRoomFileAPI` immediately - assert self.file_api - self.file_api.stop() + self.restart(close_code=4000, immediately=True) - # Disconnect all clients with close code 4000. - # This is a special code defined by our extension, informing each client - # to purge their existing YDoc & re-connect. - self.clients.close_all(4000) - - # Empty message queue - while not self._message_queue.empty(): - self._message_queue.get_nowait() - self._message_queue.task_done() - - # Remove existing observers - self._ydoc.unobserve(self._ydoc_subscription) - self._awareness.unobserve(self._awareness_subscription) - self._jupyter_ydoc.unobserve() - - # Re-initialize YDoc, YAwareness, JupyterYDoc - self._ydoc = self._init_ydoc() - self._awareness = self._init_awareness(ydoc=self._ydoc) - self._jupyter_ydoc = self._init_jupyter_ydoc( - ydoc=self._ydoc, - awareness=self._awareness - ) - - # Reset `YRoomFileAPI` & reload the document - self.file_api.restart() - self.file_api.load_content_into(self._jupyter_ydoc) + + def handle_outofband_change(self) -> None: + """ + Handles an out-of-band change by restarting the YRoom immediately, + closing all Websockets with close code 4000. - # Emit 'overwrite' event as the YDoc content has been overwritten - if self.events_api: - self.events_api.emit_room_event("overwrite") + See `restart()` for more info. + """ + self.restart(close_code=4000, immediately=True) + - def handle_outofband_move(self) -> None: """ - Handles an out-of-band move/deletion by stopping the YRoom immediately - with close code 4001. + Handles an out-of-band move/deletion by stopping the YRoom immediately, + closing all Websockets with close code 4001. + + See `stop()` for more info. """ - self.stop_immediately(close_code=4001) + self.stop(close_code=4001, immediately=True) def handle_inband_deletion(self) -> None: """ - Handles an in-band file deletion by stopping the YRoom immediately with - close code 4002. + Handles an in-band file deletion by stopping the YRoom immediately, + closing all Websockets with close code 4002. + + See `stop()` for more info. """ - self.stop_immediately(close_code=4002) + self.stop(close_code=4002, immediately=True) - def stop_immediately(self, close_code: int) -> None: + def stop(self, close_code: int = 1001, immediately: bool = False): """ - Stops the YRoom immediately, closing all Websockets with the given - `close_code`. This is similar to `self.stop()` with some key - differences: + Stops the YRoom. This method: + + - Disconnects all clients with the given `close_code`, + defaulting to `1001` (server shutting down) if not given. - - This does not apply any pending YDoc updates from other clients. - - This does not save the file before exiting. + - Removes all observers and stops the `_process_message_queue()` + background task. + + - If `immediately=False` (default), this method will finish applying all + pending updates in the message queue and save the YDoc before returning. + Otherwise, if `immediately=True`, this method will drop all pending + updates and not save the YDoc before returning. - This should be reserved for scenarios where it does not make sense to - apply pending updates or save the file, e.g. when the file has been - deleted from disk. + - Clears the YDoc, Awareness, and JupyterYDoc, freeing their memory to + the server. This deletes the YDoc history. """ - # First, run the 'stopping' callback to inform the consumer + + # TODO: delete dis? if self._on_stopping: self._on_stopping() - - # Disconnect all clients with given `close_code` + + # Disconnect all clients with the given close code self.clients.stop(close_code=close_code) # Remove all observers @@ -692,51 +684,87 @@ def stop_immediately(self, close_code: int) -> None: self._awareness.unobserve(self._awareness_subscription) if self._jupyter_ydoc: self._jupyter_ydoc.unobserve() - - # Purge the message queue immediately, dropping all queued messages + + # Empty the message queue based on `immediately` argument while not self._message_queue.empty(): - self._message_queue.get_nowait() - self._message_queue.task_done() + if immediately: + self._message_queue.get_nowait() + self._message_queue.task_done() + else: + client_id, message = self._message.queue.get_nowait() + self.handle_message(client_id, message) - # Enqueue `None` to stop the `_process_message_queue()` background task + # Stop the `_process_message_queue` task by enqueueing `None` self._message_queue.put_nowait(None) + + # Return early if the room is not a document room, as no more action is + # needed. + if not self.file_api or not self._jupyter_ydoc: + return - # Stop FileAPI immediately (without saving) - if self.file_api: - self.file_api.stop() + # Otherwise, stop the file API. + self.file_api.stop() + + # Clear the YDoc, saving beforehand unless `immediately=True` + if immediately: + self._clear_ydoc() + else: + # TODO: how to handle restarts after this safely? + self._loop.create_task( + self._save_then_clear_ydoc() + ) + + self._stopped = True + def _clear_ydoc(self): + """ + Clears the YDoc, awareness, and JupyterYDoc, freeing their memory to the + server. This deletes the YDoc history. + """ + self._ydoc = self._init_ydoc() + self._awareness = self._init_awareness(ydoc=self._ydoc) + self._jupyter_ydoc = self._init_jupyter_ydoc( + ydoc=self._ydoc, + awareness=self._awareness + ) - async def stop(self) -> None: + async def _save_then_clear_ydoc(self): + await self.file_api.save(self._jupyter_ydoc) + self._clear_ydoc() + + @property + def stopped(self) -> bool: """ - Stops the YRoom gracefully by disconnecting all clients with close code - 1001, applying all pending updates, and saving the YDoc before exiting. + Returns whether the room is stopped. """ - # First, run the 'stopping' callback to inform the consumer - if self._on_stopping: - self._on_stopping() + return self._stopped + + def restart(self, close_code: int = 1001, immediately: bool = False): + """ + Restarts the YRoom. This method re-initializes & reloads the YDoc, + Awareness, and the JupyterYDoc. After this method is called, this + instance behaves as if it were just initialized. - # Disconnect all clients by stopping the client group. - self.clients.stop() - - # Remove all observers, as updates no longer need to be broadcast - self._ydoc.unobserve(self._ydoc_subscription) - self._awareness.unobserve(self._awareness_subscription) - if self._jupyter_ydoc: - self._jupyter_ydoc.unobserve() + If the YRoom was not stopped beforehand, then `self.stop(close_code, + immediately)` with the given arguments. Otherwise, `close_code` and + `immediately` are ignored. + """ + # Stop if not stopped already, then reset `stopped` state + if not self._stopped: + self.stop(close_code=close_code, immediately=immediately) + self._stopped = False - # Finish processing all messages, then enqueue `None` to stop the - # `_process_message_queue()` background task. - await self._message_queue.join() - self._message_queue.put_nowait(None) + # Restart client group + self.clients.restart() - # Stop FileAPI, saving the content after doing so - if self.file_api: - self.file_api.stop() - await self.file_api.save_immediately(self._jupyter_ydoc) + # Restart `YRoomFileAPI` & reload the document + self.file_api.restart() + self.file_api.load_content_into(self._jupyter_ydoc) + # Restart `_process_message_queue()` task + self._loop.create_task(self._process_message_queue()) - def should_ignore_state_update(event: pycrdt.MapEvent) -> bool: """ Returns whether an update to the `state` dictionary should be ignored by diff --git a/jupyter_server_documents/rooms/yroom_file_api.py b/jupyter_server_documents/rooms/yroom_file_api.py index 97ce5f7..9e858c6 100644 --- a/jupyter_server_documents/rooms/yroom_file_api.py +++ b/jupyter_server_documents/rooms/yroom_file_api.py @@ -15,14 +15,16 @@ class YRoomFileAPI: """ Provides an API to 1 file from Jupyter Server's ContentsManager for a YRoom, - given the the room's JupyterYDoc and ID in the constructor. + given the the room ID in the constructor. - To load the content, consumers should call `file_api.load_ydoc_content()`, - then `await file_api.ydoc_content_loaded` before performing any operations - on the YDoc. + - To load the content, consumers call `load_content_into()` with a + JupyterYDoc. This also starts the `_watch_file()` loop. - To save a JupyterYDoc to the file, call - `file_api.schedule_save(jupyter_ydoc)`. + - Consumers should `await file_api.until_content_loaded` before performing + any operations on the YDoc. + + - To save a JupyterYDoc to the file, call + `file_api.schedule_save(jupyter_ydoc)` after calling `load_content_into()`. """ # See `filemanager.py` in `jupyter_server` for references on supported file @@ -220,7 +222,7 @@ async def _watch_file(self, jupyter_ydoc: YBaseDoc) -> None: # cancelled halfway and corrupting the file. We need to # store a reference to the shielded task to prevent it from # being garbage collected (see `asyncio.shield()` docs). - save_task = self.save_immediately(jupyter_ydoc) + save_task = self.save(jupyter_ydoc) await asyncio.shield(save_task) except asyncio.CancelledError: break @@ -319,15 +321,15 @@ async def _check_file(self): self._on_outofband_change() - async def save_immediately(self, jupyter_ydoc: YBaseDoc): + async def save(self, jupyter_ydoc: YBaseDoc): """ - Saves the given JupyterYDoc to disk immediately. + Saves the given JupyterYDoc to disk. This method works even if the + FileAPI is stopped. This method should only be called by consumers if the YDoc needs to be saved while the FileAPI is stopped, e.g. when the parent room is stopping. In all other cases, consumers should call `schedule_save()` - instead to save the YDoc on the next tick of the `self._watch_file()` - background task. + instead. """ try: # Build arguments to `CM.save()` @@ -386,11 +388,11 @@ def stopped(self) -> bool: def restart(self) -> None: """ - Restarts the instance by stopping the `_watch_file()` background task - and clearing its internal state. This can be called before or after the - instance is stopped. + Restarts the instance by stopping if the room is not stopped, then + clearing its internal state. - Consumers should call `load_content_into()` again after this method. + Consumers should call `load_content_into()` again after this method to + restart the `_watch_file()` task. """ # Stop if not stopped already if not self.stopped: diff --git a/jupyter_server_documents/websockets/clients.py b/jupyter_server_documents/websockets/clients.py index ac9244c..c9a01d0 100644 --- a/jupyter_server_documents/websockets/clients.py +++ b/jupyter_server_documents/websockets/clients.py @@ -201,7 +201,8 @@ def close_all(self, close_code: int): def stop(self, close_code: int = 1001): """ Closes all Websocket connections with the given close code, removes all - clients from this group, and ignores any future calls to `add()`. + clients from this group. Future calls to `add()` are ignored until the + client group is restarted via `restart()`. If a close code is not specified, it defaults to 1001 (indicates server shutting down). @@ -211,4 +212,25 @@ def stop(self, close_code: int = 1001): # Set `_stopped` to `True` to ignore future calls to `add()` self._stopped = True + + @property + def stopped(self): + """ + Returns whether the client group is stopped. + """ + + return self._stopped + + def restart(self, close_code: int = 1001): + """ + Restarts the client group by setting `stopped` to `False`. Future calls + to `add()` will *not* be ignored after this method is called. + + If the client group is not stopped, `self.stop(close_code)` will be + called with the given argument. Otherwise, `close_code` will be ignored. + """ + if not self.stopped: + self.stop(close_code=close_code) + + self._stopped = False \ No newline at end of file From df8007874b3a152928acdcb42ee25ff8d1cfb669 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 15:58:41 -0700 Subject: [PATCH 12/21] implement contents lock to prevent overlapping reads/writes --- jupyter_server_documents/rooms/yroom.py | 15 ++++- .../rooms/yroom_file_api.py | 56 +++++++++++++------ 2 files changed, 51 insertions(+), 20 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 1a3db58..2c8a4c8 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -308,7 +308,7 @@ async def _process_message_queue(self) -> None: # queue is empty in `self.stop()`. self._message_queue.task_done() - self.log.info( + self.log.debug( "Stopped `self._process_message_queue()` background task " f"for YRoom '{self.room_id}'." ) @@ -709,13 +709,13 @@ def stop(self, close_code: int = 1001, immediately: bool = False): if immediately: self._clear_ydoc() else: - # TODO: how to handle restarts after this safely? self._loop.create_task( self._save_then_clear_ydoc() ) self._stopped = True + def _clear_ydoc(self): """ Clears the YDoc, awareness, and JupyterYDoc, freeing their memory to the @@ -727,11 +727,19 @@ def _clear_ydoc(self): ydoc=self._ydoc, awareness=self._awareness ) + async def _save_then_clear_ydoc(self): + """ + Saves the JupyterYDoc, then calls `self._clear_ydoc()`. + + This can be run safely in the background because the FileAPI uses an + lock to prevent overlapping reads & writes to a single file. + """ await self.file_api.save(self._jupyter_ydoc) self._clear_ydoc() - + + @property def stopped(self) -> bool: """ @@ -739,6 +747,7 @@ def stopped(self) -> bool: """ return self._stopped + def restart(self, close_code: int = 1001, immediately: bool = False): """ Restarts the YRoom. This method re-initializes & reloads the YDoc, diff --git a/jupyter_server_documents/rooms/yroom_file_api.py b/jupyter_server_documents/rooms/yroom_file_api.py index 9e858c6..d87875c 100644 --- a/jupyter_server_documents/rooms/yroom_file_api.py +++ b/jupyter_server_documents/rooms/yroom_file_api.py @@ -36,7 +36,18 @@ class YRoomFileAPI: log: logging.Logger _fileid_manager: BaseFileIdManager + """ + Stores a reference to the Jupyter Server's File ID Manager. + """ + _contents_manager: AsyncContentsManager | ContentsManager + """ + Stores a reference to the Jupyter Server's ContentsManager. + + NOTE: any calls made on this attribute should acquire & release the + `_content_lock`. See `_content_lock` for more info. + """ + _loop: asyncio.AbstractEventLoop _save_scheduled: bool _content_loading: bool @@ -81,6 +92,13 @@ class YRoomFileAPI: not running. """ + _content_lock: asyncio.Lock + """ + An `asyncio.Lock` that ensures `ContentsManager` calls reading/writing for a + single file do not overlap. This prevents file corruption scenarios like + dual-writes or dirty-reads. + """ + def __init__( self, *, @@ -108,9 +126,10 @@ def __init__( self._last_modified = None self._stopped = False - # Initialize loading & loaded states + # Initialize content-related primitives self._content_loading = False self._content_load_event = asyncio.Event() + self._content_lock = asyncio.Lock() def get_path(self) -> str | None: @@ -169,11 +188,12 @@ async def _load_content(self, jupyter_ydoc: YBaseDoc) -> None: # Load the content of the file from the path self.log.info(f"Loading content for room ID '{self.room_id}', found at path: '{path}'.") - file_data = await ensure_async(self._contents_manager.get( - path, - type=self.file_type, - format=self.file_format - )) + async with self._content_lock: + file_data = await ensure_async(self._contents_manager.get( + path, + type=self.file_type, + format=self.file_format + )) # Set JupyterYDoc content and set `dirty = False` to hide the "unsaved # changes" icon in the UI @@ -289,9 +309,10 @@ async def _check_file(self): # If this raises `HTTPError(404)`, that indicates the file was # moved/deleted out-of-band. try: - file_data = await ensure_async(self._contents_manager.get( - path=path, format=file_format, type=file_type, content=False - )) + async with self._content_lock: + file_data = await ensure_async(self._contents_manager.get( + path=path, format=file_format, type=file_type, content=False + )) except HTTPError as e: # If not 404, re-raise the exception as it is unknown if (e.status_code != 404): @@ -344,14 +365,15 @@ async def save(self, jupyter_ydoc: YBaseDoc): self._save_scheduled = False # Save the YDoc via the ContentsManager - file_data = await ensure_async(self._contents_manager.save( - { - "format": file_format, - "type": file_type, - "content": content, - }, - path - )) + async with self._content_lock: + file_data = await ensure_async(self._contents_manager.save( + { + "format": file_format, + "type": file_type, + "content": content, + }, + path + )) # Set most recent `last_modified` timestamp if file_data['last_modified']: From 734a109741019c78a7ffca4d78211c8e8445770c Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 15:59:32 -0700 Subject: [PATCH 13/21] update YRoomManager to use new stop() API --- .../rooms/yroom_manager.py | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index 444ee21..2db7942 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -139,10 +139,11 @@ def _handle_yroom_stopping(self, room_id: str) -> None: self._rooms_by_id.pop(room_id, None) - async def delete_room(self, room_id: str) -> None: + def delete_room(self, room_id: str) -> None: """ - Gracefully deletes a YRoom given a room ID. This stops the YRoom first, - which finishes applying all updates & saves the content automatically. + Gracefully deletes a YRoom given a room ID. This stops the YRoom, + closing all Websockets, applying remaining updates, and saves the final + content of the YDoc in a background task. Returns `True` if the room was deleted successfully. Returns `False` if an exception was raised. @@ -153,12 +154,12 @@ async def delete_room(self, room_id: str) -> None: self.log.info(f"Stopping YRoom '{room_id}'.") try: - await yroom.stop() - self.log.info(f"Stopped YRoom '{room_id}'.") + yroom.stop() return True except Exception as e: - self.log.error(f"Exception raised when stopping YRoom '{room_id}:") - self.log.exception(e) + self.log.exception( + f"Exception raised when stopping YRoom '{room_id}: " + ) return False async def _watch_rooms(self) -> None: @@ -223,35 +224,30 @@ async def _watch_rooms(self) -> None: self._inactive_rooms.add(room_id) - async def stop(self) -> None: + def stop(self) -> None: """ Gracefully deletes each `YRoom`. See `delete_room()` for more info. """ # First, stop all background tasks self._watch_rooms_task.cancel() - # Get all room IDs. If there are none, return early, as all rooms are - # already stopped. + # Get all room IDs. If there are none, return early. room_ids = list(self._rooms_by_id.keys()) room_count = len(room_ids) if room_count == 0: return - # Delete rooms in parallel. - # Note that we do not use `asyncio.TaskGroup` here because that cancels - # all other tasks when any task raises an exception. + # Otherwise, delete all rooms. self.log.info( f"Stopping `YRoomManager` and deleting all {room_count} YRooms." ) - deletion_tasks = [] + failures = 0 for room_id in room_ids: - dt = asyncio.create_task(self.delete_room(room_id)) - deletion_tasks.append(dt) - - # Use returned values to log success/failure of room deletion - results: list[bool] = await asyncio.gather(*deletion_tasks) - failures = results.count(False) + result = self.delete_room(room_id) + if not result: + failures += 1 + # Log the aggregate status before returning. if failures: self.log.error( "An exception occurred when stopping `YRoomManager`. " From 8ffae7bdd91c03dafc2e3168bbc9c41108e016c8 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 16:26:36 -0700 Subject: [PATCH 14/21] stop inactive rooms instead of deleting --- jupyter_server_documents/rooms/yroom.py | 15 +- .../rooms/yroom_manager.py | 142 ++++++++---------- 2 files changed, 63 insertions(+), 94 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 2c8a4c8..1252703 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -76,12 +76,6 @@ class YRoom: _ydoc_subscription: pycrdt.Subscription """Subscription to YDoc changes.""" - _on_stopping: callable[[], Any] | None - """ - Callback to run as soon as `stop()` or `stop_immediately()` are called. Only - set in the constructor. - """ - _stopped: bool = False """ Whether the YRoom is stopped. Set to `True` when `stop()` is called and set @@ -101,7 +95,6 @@ def __init__( fileid_manager: BaseFileIdManager, contents_manager: AsyncContentsManager | ContentsManager, event_logger: EventLogger, - on_stopping: callable[[], Any] | None = None, ): # Bind instance attributes self.room_id = room_id @@ -109,7 +102,6 @@ def __init__( self._loop = loop self._fileid_manager = fileid_manager self._contents_manager = contents_manager - self._on_stopping = on_stopping self._stopped = False # Initialize YjsClientGroup, YDoc, and Awareness @@ -671,11 +663,6 @@ def stop(self, close_code: int = 1001, immediately: bool = False): - Clears the YDoc, Awareness, and JupyterYDoc, freeing their memory to the server. This deletes the YDoc history. """ - - # TODO: delete dis? - if self._on_stopping: - self._on_stopping() - # Disconnect all clients with the given close code self.clients.stop(close_code=close_code) @@ -733,7 +720,7 @@ async def _save_then_clear_ydoc(self): """ Saves the JupyterYDoc, then calls `self._clear_ydoc()`. - This can be run safely in the background because the FileAPI uses an + This can be run safely in the background because the FileAPI uses a lock to prevent overlapping reads & writes to a single file. """ await self.file_api.save(self._jupyter_ydoc) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index 2db7942..7dc9044 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -13,28 +13,25 @@ class YRoomManager(): """ A singleton that manages all `YRoom` instances in the server extension. This - automatically deletes `YRoom` instances if they have had no connected - clients or active kernel for >10 seconds. - - Because rooms may be deleted due to inactivity, consumers should only store - a reference to the room ID and call `get_room(room_id)` each time a - reference to the room is needed. See `get_room()` for more details. + automatically stops `YRoom` instances if they have had no connected clients + or active kernel for >10 seconds. """ _rooms_by_id: dict[str, YRoom] """ - Dictionary of active `YRoom` instances, keyed by room ID. + Dictionary of active `YRoom` instances, keyed by room ID. Rooms are never + deleted from this dictionary, even if stopped due to inactivity. - It is guaranteed that if a room is present in the dictionary, the room is - not currently stopping. This is ensured by `_handle_yroom_stopping()`. + TODO: Delete a room if its file was deleted in/out-of-band or moved + out-of-band. See #116. """ _inactive_rooms: set[str] """ Set of room IDs that were marked inactive on the last iteration of `_watch_rooms()`. If a room is inactive and its ID is present in this set, - then the room has been inactive for >10 seconds, and the room should be - deleted in `_watch_rooms()`. + then the room the room should be stopped as it has been inactive for >10 + seconds. """ _get_fileid_manager: callable[[], BaseFileIdManager] @@ -80,23 +77,16 @@ def get_room(self, room_id: str) -> YRoom | None: """ Returns the `YRoom` instance for a given room ID. If the instance does not exist, this method will initialize one and return it. Otherwise, - this method returns the instance from its cache, ensuring that this - method is fast in almost all cases. - - Consumers should always call this method each time a reference to the - `YRoom` is needed, since rooms may be deleted due to inactivity. - - This method also ensures that the returned room will be alive for >10 - seconds. This prevents the room from being deleted shortly after the - consumer receives it via this method, even if it was inactive. + this method returns the instance from its cache. """ # First, ensure this room stays open for >10 seconds by removing it from # the inactive set of rooms if it is present. self._inactive_rooms.discard(room_id) # If room exists, return the room - if room_id in self._rooms_by_id: - return self._rooms_by_id[room_id] + yroom = self._rooms_by_id.get(room_id, None) + if yroom: + return yroom # Otherwise, create a new room try: @@ -107,7 +97,6 @@ def get_room(self, room_id: str) -> YRoom | None: loop=self.loop, fileid_manager=self.fileid_manager, contents_manager=self.contents_manager, - on_stopping=lambda: self._handle_yroom_stopping(room_id), event_logger=self.event_logger, ) self._rooms_by_id[room_id] = yroom @@ -119,6 +108,7 @@ def get_room(self, room_id: str) -> YRoom | None: ) return None + def has_room(self, room_id: str) -> bool: """ Returns whether a `YRoom` instance with a matching `room_id` already @@ -126,24 +116,13 @@ def has_room(self, room_id: str) -> bool: """ return room_id in self._rooms_by_id - def _handle_yroom_stopping(self, room_id: str) -> None: - """ - Callback that is run when the YRoom starts stopping. This callback: - - - Ensures the room is removed from the dictionary, even if the room was - stopped directly without `YRoomManager.delete_room()`. - - Prevents future connections to the stopping room and allows its memory - to be freed once complete. - """ - self._rooms_by_id.pop(room_id, None) - - def delete_room(self, room_id: str) -> None: """ Gracefully deletes a YRoom given a room ID. This stops the YRoom, - closing all Websockets, applying remaining updates, and saves the final - content of the YDoc in a background task. + closing all Websockets with close code 1001 (server shutting down), + applying remaining updates, and saving the final content of the YDoc in + a background task. Returns `True` if the room was deleted successfully. Returns `False` if an exception was raised. @@ -165,65 +144,68 @@ def delete_room(self, room_id: str) -> None: async def _watch_rooms(self) -> None: """ Background task that checks all `YRoom` instances every 10 seconds, - deleting any rooms that have been inactive for >10 seconds. + stopping any rooms that have been inactive for >10 seconds. - - For rooms providing notebooks: This task deletes the room if it has no + - For rooms providing notebooks: This task stops the room if it has no connected clients and its kernel execution status is either 'idle' or 'dead'. - - For all other rooms: This task deletes the room if it has no connected + - For all other rooms: This task stops the room if it has no connected clients. """ while True: # Check every 10 seconds await asyncio.sleep(10) - # Get all room IDs from the room dictionary in advance, as the - # dictionary will mutate if rooms are deleted. + # Get all room IDs, except for the global awareness room room_ids = set(self._rooms_by_id.keys()) - - # Remove the global awareness room ID from this set, as that room - # should not be stopped until the server extension is stopped. room_ids.discard("JupyterLab:globalAwareness") - # Iterate through all rooms. If any rooms are empty, stop the rooms. + # Iterate through all rooms. If any rooms are empty, stop the room. for room_id in room_ids: - # Continue if `room_id` is not in the rooms dictionary. This - # happens if the room was stopped by something else while this - # `for` loop is still running, so we must check. - if room_id not in self._rooms_by_id: - self._inactive_rooms.discard(room_id) - continue - - # Continue if the room has any connected clients. - room = self._rooms_by_id[room_id] - if room.clients.count != 0: - self._inactive_rooms.discard(room_id) - continue - - # Continue if the room contains a notebook with kernel execution - # state neither 'idle' nor 'dead'. - # In this case, the notebook kernel may still be running code - # cells, so the room should not be closed. - awareness = room.get_awareness().get_local_state() or {} - execution_state = awareness.get("kernel", {}).get("execution_state", None) - if execution_state not in { "idle", "dead", None }: - self._inactive_rooms.discard(room_id) - continue - - # The room is inactive if this statement is reached - # Delete the room if was marked as inactive in the last - # iteration, otherwise mark it as inactive. - if room_id in self._inactive_rooms: - self.log.info( - f"YRoom '{room_id}' has been inactive for >10 seconds. " - ) - self.loop.create_task(self.delete_room(room_id)) - self._inactive_rooms.discard(room_id) - else: - self._inactive_rooms.add(room_id) + self._check_room(room_id) + def _check_room(self, room_id: str) -> None: + """ + Checks a room for inactivity. + + - If a room is inactive and not in `_inactive_rooms`, this method adds + the room to `_inactive_rooms`. + + - If a room is inactive and is listed in `_inactive_rooms`, this method + stops the room, as it has been inactive for 2 consecutive iterations of + `_watch_rooms()`. + """ + # Do nothing if the room has any connected clients. + room = self._rooms_by_id[room_id] + if room.clients.count != 0: + self._inactive_rooms.discard(room_id) + return + + # Do nothing if the room contains a notebook with kernel execution state + # neither 'idle' nor 'dead'. + # In this case, the notebook kernel may still be running code cells, so + # the room should not be closed. + awareness = room.get_awareness().get_local_state() or {} + execution_state = awareness.get("kernel", {}).get("execution_state", None) + if execution_state not in { "idle", "dead", None }: + self._inactive_rooms.discard(room_id) + return + + # The room is inactive if this line is reached. + # Stop the room if was marked as inactive in the last iteration, + # otherwise mark it as inactive. + if room_id in self._inactive_rooms: + self.log.info( + f"YRoom '{room_id}' has been inactive for >10 seconds. " + ) + room.stop() + self._inactive_rooms.discard(room_id) + else: + self._inactive_rooms.add(room_id) + + def stop(self) -> None: """ Gracefully deletes each `YRoom`. See `delete_room()` for more info. From 37b8df0a44589699b9ef30be5ababa3c322bb0d0 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 16:46:14 -0700 Subject: [PATCH 15/21] fix extension stopping --- jupyter_server_documents/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jupyter_server_documents/app.py b/jupyter_server_documents/app.py index 6b2b631..1d53aa1 100644 --- a/jupyter_server_documents/app.py +++ b/jupyter_server_documents/app.py @@ -101,5 +101,5 @@ def _link_jupyter_server_extension(self, server_app): async def stop_extension(self): self.log.info("Stopping `jupyter_server_documents` server extension.") if self.yroom_manager: - await self.yroom_manager.stop() + self.yroom_manager.stop() self.log.info("`jupyter_server_documents` server extension is shut down. Goodbye!") From 70aceb943d4db591e246448a86a8617ddddff804 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 17:19:07 -0700 Subject: [PATCH 16/21] restart inactive rooms instead of deleting --- jupyter_server_documents/rooms/yroom.py | 33 ++++++++++-- .../rooms/yroom_file_api.py | 1 - .../rooms/yroom_manager.py | 54 +++++++++++-------- 3 files changed, 62 insertions(+), 26 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 1252703..3a2a024 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -76,12 +76,17 @@ class YRoom: _ydoc_subscription: pycrdt.Subscription """Subscription to YDoc changes.""" - _stopped: bool = False + _stopped: bool """ Whether the YRoom is stopped. Set to `True` when `stop()` is called and set to `False` when `restart()` is called. """ + _updated: bool + """ + See `self.updated` for more info. + """ + _fileid_manager: BaseFileIdManager _contents_manager: AsyncContentsManager | ContentsManager @@ -103,6 +108,7 @@ def __init__( self._fileid_manager = fileid_manager self._contents_manager = contents_manager self._stopped = False + self._updated = False # Initialize YjsClientGroup, YDoc, and Awareness self._client_group = YjsClientGroup(room_id=room_id, log=self.log, loop=self._loop) @@ -511,8 +517,9 @@ def _on_jupyter_ydoc_update(self, updated_key: str, event: Any) -> None: map_event = cast(pycrdt.MapEvent, event) if should_ignore_state_update(map_event): return - - # Otherwise, save the file + + # Otherwise, a change was made. Set `updated=True` and save the file + self._updated = True self.file_api.schedule_save() @@ -663,6 +670,8 @@ def stop(self, close_code: int = 1001, immediately: bool = False): - Clears the YDoc, Awareness, and JupyterYDoc, freeing their memory to the server. This deletes the YDoc history. """ + self.log.info(f"Stopping YRoom '{self.room_id}'.") + # Disconnect all clients with the given close code self.clients.stop(close_code=close_code) @@ -734,6 +743,17 @@ def stopped(self) -> bool: """ return self._stopped + @property + def updated(self) -> bool: + """ + Returns whether the room has been updated since the last restart, or + since initialization if the room was not restarted. + + This initializes to `False` and is set to `True` whenever a meaningful + update that needs to be saved occurs. This is reset to `False` when + `restart()` is called. + """ + return self._updated def restart(self, close_code: int = 1001, immediately: bool = False): """ @@ -745,10 +765,13 @@ def restart(self, close_code: int = 1001, immediately: bool = False): immediately)` with the given arguments. Otherwise, `close_code` and `immediately` are ignored. """ - # Stop if not stopped already, then reset `stopped` state + # Stop if not stopped already if not self._stopped: self.stop(close_code=close_code, immediately=immediately) + + # Reset internal state self._stopped = False + self._updated = False # Restart client group self.clients.restart() @@ -759,6 +782,8 @@ def restart(self, close_code: int = 1001, immediately: bool = False): # Restart `_process_message_queue()` task self._loop.create_task(self._process_message_queue()) + + self.log.info(f"Restarted YRoom '{self.room_id}'.") def should_ignore_state_update(event: pycrdt.MapEvent) -> bool: diff --git a/jupyter_server_documents/rooms/yroom_file_api.py b/jupyter_server_documents/rooms/yroom_file_api.py index d87875c..d93d584 100644 --- a/jupyter_server_documents/rooms/yroom_file_api.py +++ b/jupyter_server_documents/rooms/yroom_file_api.py @@ -396,7 +396,6 @@ def stop(self) -> None: To save the YDoc after stopping, call `await file_api.save_immediately()` after calling this method. """ - self.log.info(f"Stopping FileAPI for room '{self.room_id}'.") if self._watch_file_task: self._watch_file_task.cancel() self._stopped = True diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index 7dc9044..f93193e 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -12,15 +12,17 @@ class YRoomManager(): """ - A singleton that manages all `YRoom` instances in the server extension. This - automatically stops `YRoom` instances if they have had no connected clients - or active kernel for >10 seconds. + A singleton that manages all `YRoom` instances in the server extension. + + This manager automatically restarts updated `YRoom` instances if they have + had no connected clients or active kernel for >10 seconds. This deletes the + YDoc history to free its memory to the server. """ _rooms_by_id: dict[str, YRoom] """ Dictionary of active `YRoom` instances, keyed by room ID. Rooms are never - deleted from this dictionary, even if stopped due to inactivity. + deleted from this dictionary. TODO: Delete a room if its file was deleted in/out-of-band or moved out-of-band. See #116. @@ -30,8 +32,7 @@ class YRoomManager(): """ Set of room IDs that were marked inactive on the last iteration of `_watch_rooms()`. If a room is inactive and its ID is present in this set, - then the room the room should be stopped as it has been inactive for >10 - seconds. + then the room should be restarted as it has been inactive for >10 seconds. """ _get_fileid_manager: callable[[], BaseFileIdManager] @@ -79,8 +80,7 @@ def get_room(self, room_id: str) -> YRoom | None: not exist, this method will initialize one and return it. Otherwise, this method returns the instance from its cache. """ - # First, ensure this room stays open for >10 seconds by removing it from - # the inactive set of rooms if it is present. + # First, ensure the room is not considered inactive. self._inactive_rooms.discard(room_id) # If room exists, return the room @@ -144,14 +144,16 @@ def delete_room(self, room_id: str) -> None: async def _watch_rooms(self) -> None: """ Background task that checks all `YRoom` instances every 10 seconds, - stopping any rooms that have been inactive for >10 seconds. + restarting any updated rooms that have been inactive for >10 seconds. + This frees the memory occupied by the room's YDoc history, discarding it + in the process. - - For rooms providing notebooks: This task stops the room if it has no - connected clients and its kernel execution status is either 'idle' or - 'dead'. + - For rooms providing notebooks: This task restarts the room if it has + been updated, has no connected clients, and its kernel execution status + is either 'idle' or 'dead'. - - For all other rooms: This task stops the room if it has no connected - clients. + - For all other rooms: This task restarts the room if it has been + updated and has no connected clients. """ while True: # Check every 10 seconds @@ -161,7 +163,7 @@ async def _watch_rooms(self) -> None: room_ids = set(self._rooms_by_id.keys()) room_ids.discard("JupyterLab:globalAwareness") - # Iterate through all rooms. If any rooms are empty, stop the room. + # Check all rooms and restart it if inactive for >10 seconds. for room_id in room_ids: self._check_room(room_id) @@ -170,12 +172,15 @@ def _check_room(self, room_id: str) -> None: """ Checks a room for inactivity. + - Rooms that have not been updated are not restarted, as there is no + YDoc history to free. + - If a room is inactive and not in `_inactive_rooms`, this method adds the room to `_inactive_rooms`. - If a room is inactive and is listed in `_inactive_rooms`, this method - stops the room, as it has been inactive for 2 consecutive iterations of - `_watch_rooms()`. + restarts the room, as it has been inactive for 2 consecutive iterations + of `_watch_rooms()`. """ # Do nothing if the room has any connected clients. room = self._rooms_by_id[room_id] @@ -192,15 +197,22 @@ def _check_room(self, room_id: str) -> None: if execution_state not in { "idle", "dead", None }: self._inactive_rooms.discard(room_id) return + + # Do nothing if the room has not been updated. This prevents empty rooms + # from being restarted every 10 seconds. + if not room.updated: + self._inactive_rooms.discard(room_id) + return - # The room is inactive if this line is reached. - # Stop the room if was marked as inactive in the last iteration, + # The room is updated (with history) & inactive if this line is reached. + # Restart the room if was marked as inactive in the last iteration, # otherwise mark it as inactive. if room_id in self._inactive_rooms: self.log.info( - f"YRoom '{room_id}' has been inactive for >10 seconds. " + f"Room '{room_id}' has been inactive for >10 seconds. " + "Restarting the room to free memory occupied by its history." ) - room.stop() + room.restart() self._inactive_rooms.discard(room_id) else: self._inactive_rooms.add(room_id) From 77e48521f2f3db919d5998beb3e8040a3c4af32f Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 17:49:43 -0700 Subject: [PATCH 17/21] revert changes to kernel client --- .../kernels/kernel_client.py | 83 +++---------------- jupyter_server_documents/session_manager.py | 5 +- 2 files changed, 12 insertions(+), 76 deletions(-) diff --git a/jupyter_server_documents/kernels/kernel_client.py b/jupyter_server_documents/kernels/kernel_client.py index f61e7fe..658087a 100644 --- a/jupyter_server_documents/kernels/kernel_client.py +++ b/jupyter_server_documents/kernels/kernel_client.py @@ -1,9 +1,9 @@ """ A new Kernel client that is aware of ydocuments. """ -from __future__ import annotations import anyio import asyncio +import json import typing as t from traitlets import Set, Instance, Any, Type, default @@ -16,9 +16,6 @@ from .kernel_client_abc import AbstractDocumentAwareKernelClient -if t.TYPE_CHECKING: - from jupyter_server_documents.rooms.yroom_manager import YRoomManager - class DocumentAwareKernelClient(AsyncKernelClient): """ @@ -42,6 +39,11 @@ def _default_message_cache(self): # message is received. _listeners = Set(allow_none=True) + # A set of YRooms that will intercept output and kernel + # status messages. + _yrooms: t.Set[YRoom] = Set(trait=Instance(YRoom), default_value=set()) + + output_processor = Instance( OutputProcessor, allow_none=True @@ -56,24 +58,6 @@ def _default_message_cache(self): def _default_output_processor(self) -> OutputProcessor: self.log.info("Creating output processor") return self.output_process_class(parent=self, config=self.config) - - _yroom_manager: YRoomManager | None - """ - The YRoomManager registered via `self.bind_yroom_manager()`, which must be - called before adding any `YRoom` via `self.add_yroom()`. - """ - - _yroom_ids: set[str] - """ - The set of room IDs that are registered with this kernel client. This class - stores room IDs instead of `YRoom` instances because `YRoom` instances may - be deleted once inactive. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._yroom_manager = None - self._yroom_ids = set() async def start_listening(self): """Start listening to messages coming from the kernel. @@ -302,63 +286,18 @@ async def handle_document_related_message(self, msg: t.List[bytes]) -> t.Optiona # Default return if message is processed and does not need forwarding return msg - @property - def _yrooms(self) -> list[YRoom]: - """ - Returns the list of YRoom instances registered to this kernel client. - """ - if len(self._yroom_ids) == 0: - return [] - - assert self._yroom_manager - rooms: list[YRoom] = [] - - # Always call `get_room()` to get the latest reference to the room. We - # must do this since rooms may be deleted upon inactivity. The - # `get_room()` method returns a cached value as long as the room was not - # deleted, so this is very fast in most cases. - for room_id in self._yroom_ids: - room = self._yroom_manager.get_room(room_id) - rooms.append(room) - - return rooms - async def add_yroom(self, yroom: YRoom): """ - Register a `YRoom` with this kernel client, given the room ID. - Registered `YRoom`s will intercept display and kernel status messages. - - `self.bind_yroom_manager()` must be called before using this method. + Register a YRoom with this kernel client. YRooms will + intercept display and kernel status messages. """ - assert self._yroom_manager - self._yroom_ids.add(yroom.room_id) - self.log.info( - f"Added room '{yroom.room_id}' to kernel '{self.kernel_name}'. " - f"Total rooms: {len(self._yroom_ids)}" - ) + self._yrooms.add(yroom) async def remove_yroom(self, yroom: YRoom): """ - De-register a `YRoom` from this kernel client, given the room ID. - - `self.bind_yroom_manager()` must be called before using this method. - """ - self._yrooms_ids.discard(yroom.room_id) - self.log.info( - f"Removed room '{yroom.room_id}' from kernel '{self.kernel_name}'. " - f"Total rooms: {len(self._yroom_ids)}" - ) - - @property - def yroom_manager(self) -> YRoomManager: - return self._yroom_manager - - def bind_yroom_manager(self, yroom_manager: YRoomManager): - """ - Binds a reference to the `YRoomManager` singleton to this instance. This - method must be called before adding a room. + De-register a YRoom from handling kernel client messages. """ - self._yroom_manager = yroom_manager + self._yrooms.discard(yroom) AbstractDocumentAwareKernelClient.register(DocumentAwareKernelClient) diff --git a/jupyter_server_documents/session_manager.py b/jupyter_server_documents/session_manager.py index b09ed01..0e255dc 100644 --- a/jupyter_server_documents/session_manager.py +++ b/jupyter_server_documents/session_manager.py @@ -122,16 +122,13 @@ async def create_session( # Get YRoom for this session and store its ID in `self._room_ids` yroom = self._init_session_yroom(session_id, real_path) - # Bind `YRoomManager` to the kernel client - kernel_client = self.get_kernel_client(kernel_id) - kernel_client.bind_yroom_manager(yroom_manager=self.yroom_manager) - # Add YRoom to this session's kernel client # TODO: we likely have a race condition here... need to # think about it more. Currently, the kernel client gets # created after the kernel starts fully. We need the # kernel client instantiated _before_ trying to connect # the yroom. + kernel_client = self.get_kernel_client(kernel_id) await kernel_client.add_yroom(yroom) self.log.info(f"Connected yroom {yroom.room_id} to kernel {kernel_id}. yroom: {yroom}") return session_model From aff427f9fdc438f0fcbd6c471da9a13482a8d9b6 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Fri, 13 Jun 2025 18:00:25 -0700 Subject: [PATCH 18/21] fix empty notebooks after room restart --- jupyter_server_documents/rooms/yroom.py | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index 3a2a024..d5bac86 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -701,14 +701,13 @@ def stop(self, close_code: int = 1001, immediately: bool = False): # Otherwise, stop the file API. self.file_api.stop() - # Clear the YDoc, saving beforehand unless `immediately=True` - if immediately: - self._clear_ydoc() - else: + # Clear the YDoc, saving the previous content unless `immediately=True` + if not immediately: + prev_jupyter_ydoc = self._jupyter_ydoc self._loop.create_task( - self._save_then_clear_ydoc() + self.file_api.save(prev_jupyter_ydoc) ) - + self._clear_ydoc() self._stopped = True @@ -725,17 +724,6 @@ def _clear_ydoc(self): ) - async def _save_then_clear_ydoc(self): - """ - Saves the JupyterYDoc, then calls `self._clear_ydoc()`. - - This can be run safely in the background because the FileAPI uses a - lock to prevent overlapping reads & writes to a single file. - """ - await self.file_api.save(self._jupyter_ydoc) - self._clear_ydoc() - - @property def stopped(self) -> bool: """ @@ -743,6 +731,7 @@ def stopped(self) -> bool: """ return self._stopped + @property def updated(self) -> bool: """ @@ -755,6 +744,7 @@ def updated(self) -> bool: """ return self._updated + def restart(self, close_code: int = 1001, immediately: bool = False): """ Restarts the YRoom. This method re-initializes & reloads the YDoc, From a3dac7a9dfd4b5597d72f8b5beb4babc10c23572 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Mon, 16 Jun 2025 13:23:18 -0700 Subject: [PATCH 19/21] add observe_jupyter_ydoc() method for consumers --- jupyter_server_documents/rooms/yroom.py | 59 ++++++++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index d5bac86..cd029dc 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -5,6 +5,7 @@ from ..websockets import YjsClientGroup import pycrdt +import uuid from pycrdt import YMessageType, YSyncMessageType as YSyncMessageSubtype from jupyter_server_documents.ydocs import ydocs as jupyter_ydoc_classes from jupyter_ydoc.ybasedoc import YBaseDoc @@ -53,6 +54,14 @@ class YRoom: _jupyter_ydoc: YBaseDoc | None """JupyterYDoc""" + _jupyter_ydoc_observers: dict[str, callable[[str, Any], Any]] + """ + Dictionary of JupyterYDoc observers added by consumers of this room. + + Added to via `observe_jupyter_ydoc()`. Removed from via + `unobserve_jupyter_ydoc()`. + """ + _ydoc: pycrdt.Doc """Ydoc""" _awareness: pycrdt.Awareness @@ -107,6 +116,7 @@ def __init__( self._loop = loop self._fileid_manager = fileid_manager self._contents_manager = contents_manager + self._jupyter_ydoc_observers = {} self._stopped = False self._updated = False @@ -482,6 +492,37 @@ def _on_ydoc_update(self, event: TransactionEvent) -> None: self._broadcast_message(message, message_type="SyncUpdate") + def observe_jupyter_ydoc(self, observer: callable[[str, Any], Any]) -> str: + """ + Adds an observer callback to the JupyterYDoc that fires on change. + The callback should accept 2 arguments: + + 1. `updated_key: str`: the key of the shared type that was updated, e.g. + "cells", "state", or "metadata". + + 2. `event: Any`: The `pycrdt` event corresponding to the shared type. + For example, if "state" refers to a `pycrdt.Map`, `event` will take the + type `pycrdt.MapEvent`. + + Consumers should use this method instead of calling `observe()` directly + on the `jupyter_ydoc.YBaseDoc` instance, because JupyterYDocs generally + only allow for a single observer. + + Returns an `observer_id: str` that can be passed to + `unobserve_jupyter_ydoc()` to remove the observer. + """ + observer_id = uuid.uuid4() + self._jupyter_ydoc_observers[observer_id] = observer + + + def unobserve_jupyter_ydoc(self, observer_id: str): + """ + Removes an observer from the JupyterYDoc previously added by + `observe_jupyter_ydoc()`, given the returned `observer_id`. + """ + self._jupyter_ydoc_observers.pop(observer_id, None) + + def _on_jupyter_ydoc_update(self, updated_key: str, event: Any) -> None: """ This method is an observer on `self._jupyter_ydoc` which saves the file @@ -518,10 +559,15 @@ def _on_jupyter_ydoc_update(self, updated_key: str, event: Any) -> None: if should_ignore_state_update(map_event): return - # Otherwise, a change was made. Set `updated=True` and save the file + # Otherwise, a change was made. + # Call all observers added by consumers first. + for observer in self._jupyter_ydoc_observers.values(): + observer(updated_key, event) + + # Then set `updated=True` and save the file. self._updated = True self.file_api.schedule_save() - + def handle_awareness_update(self, client_id: str, message: bytes) -> None: # Apply the AwarenessUpdate message @@ -707,14 +753,14 @@ def stop(self, close_code: int = 1001, immediately: bool = False): self._loop.create_task( self.file_api.save(prev_jupyter_ydoc) ) - self._clear_ydoc() + self._reset_ydoc() self._stopped = True - def _clear_ydoc(self): + def _reset_ydoc(self): """ - Clears the YDoc, awareness, and JupyterYDoc, freeing their memory to the - server. This deletes the YDoc history. + Deletes and re-initializes the YDoc, awareness, and JupyterYDoc. This + frees the memory occupied by their histories. """ self._ydoc = self._init_ydoc() self._awareness = self._init_awareness(ydoc=self._ydoc) @@ -723,7 +769,6 @@ def _clear_ydoc(self): awareness=self._awareness ) - @property def stopped(self) -> bool: """ From ef31982383a1fda10c1babaa760bb06fd15d6755 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Mon, 16 Jun 2025 14:00:05 -0700 Subject: [PATCH 20/21] fix typos and missing type annotations (thanks @JGuinegagne) --- jupyter_server_documents/rooms/yroom.py | 8 ++++---- jupyter_server_documents/session_manager.py | 2 +- jupyter_server_documents/websockets/clients.py | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom.py b/jupyter_server_documents/rooms/yroom.py index cd029dc..f80fbf8 100644 --- a/jupyter_server_documents/rooms/yroom.py +++ b/jupyter_server_documents/rooms/yroom.py @@ -184,7 +184,7 @@ async def emit_load_event(): def _init_ydoc(self) -> pycrdt.Doc: """ Initializes a YDoc, automatically binding its `_on_ydoc_update()` - observer to `self._ydoc_subscription`. The observer can removed via + observer to `self._ydoc_subscription`. The observer can be removed via `ydoc.unobserve(self._ydoc_subscription)`. """ self._ydoc = pycrdt.Doc() @@ -698,7 +698,7 @@ def handle_inband_deletion(self) -> None: self.stop(close_code=4002, immediately=True) - def stop(self, close_code: int = 1001, immediately: bool = False): + def stop(self, close_code: int = 1001, immediately: bool = False) -> None: """ Stops the YRoom. This method: @@ -757,7 +757,7 @@ def stop(self, close_code: int = 1001, immediately: bool = False): self._stopped = True - def _reset_ydoc(self): + def _reset_ydoc(self) -> None: """ Deletes and re-initializes the YDoc, awareness, and JupyterYDoc. This frees the memory occupied by their histories. @@ -790,7 +790,7 @@ def updated(self) -> bool: return self._updated - def restart(self, close_code: int = 1001, immediately: bool = False): + def restart(self, close_code: int = 1001, immediately: bool = False) -> None: """ Restarts the YRoom. This method re-initializes & reloads the YDoc, Awareness, and the JupyterYDoc. After this method is called, this diff --git a/jupyter_server_documents/session_manager.py b/jupyter_server_documents/session_manager.py index 0e255dc..473a0cd 100644 --- a/jupyter_server_documents/session_manager.py +++ b/jupyter_server_documents/session_manager.py @@ -134,7 +134,7 @@ async def create_session( return session_model - async def update_session(self, session_id, **update) -> None: + async def update_session(self, session_id: str, **update) -> None: """ Updates the session identified by `session_id` using the keyword arguments passed to this method. Each keyword argument should correspond diff --git a/jupyter_server_documents/websockets/clients.py b/jupyter_server_documents/websockets/clients.py index c9a01d0..0a18144 100644 --- a/jupyter_server_documents/websockets/clients.py +++ b/jupyter_server_documents/websockets/clients.py @@ -198,7 +198,7 @@ def close_all(self, close_code: int): for client in clients: client.websocket.close(code=close_code) - def stop(self, close_code: int = 1001): + def stop(self, close_code: int = 1001) -> None: """ Closes all Websocket connections with the given close code, removes all clients from this group. Future calls to `add()` are ignored until the @@ -214,14 +214,14 @@ def stop(self, close_code: int = 1001): self._stopped = True @property - def stopped(self): + def stopped(self) -> bool: """ Returns whether the client group is stopped. """ return self._stopped - def restart(self, close_code: int = 1001): + def restart(self, close_code: int = 1001) -> None: """ Restarts the client group by setting `stopped` to `False`. Future calls to `add()` will *not* be ignored after this method is called. From a7c90a5d10b93c162f090c9613d842210093c2d9 Mon Sep 17 00:00:00 2001 From: "David L. Qiu" Date: Wed, 18 Jun 2025 10:38:48 -0700 Subject: [PATCH 21/21] disable _watch_rooms() task --- jupyter_server_documents/rooms/yroom_manager.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/jupyter_server_documents/rooms/yroom_manager.py b/jupyter_server_documents/rooms/yroom_manager.py index f93193e..d0e4926 100644 --- a/jupyter_server_documents/rooms/yroom_manager.py +++ b/jupyter_server_documents/rooms/yroom_manager.py @@ -40,7 +40,7 @@ class YRoomManager(): event_logger: EventLogger loop: asyncio.AbstractEventLoop log: logging.Logger - _watch_rooms_task: asyncio.Task + _watch_rooms_task: asyncio.Task | None def __init__( self, @@ -66,7 +66,8 @@ def __init__( # Start `self._watch_rooms()` background task to automatically stop # empty rooms - self._watch_rooms_task = self.loop.create_task(self._watch_rooms()) + # TODO: Do not enable this until #120 is addressed. + # self._watch_rooms_task = self.loop.create_task(self._watch_rooms()) @property @@ -223,7 +224,8 @@ def stop(self) -> None: Gracefully deletes each `YRoom`. See `delete_room()` for more info. """ # First, stop all background tasks - self._watch_rooms_task.cancel() + if self._watch_rooms_task: + self._watch_rooms_task.cancel() # Get all room IDs. If there are none, return early. room_ids = list(self._rooms_by_id.keys())