-
Notifications
You must be signed in to change notification settings - Fork 675
add support for subscribing to execution store updates #6402
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
WalkthroughAdds end-to-end SSE-based execution-store subscriptions: a frontend React hook, backend SSE operator and route, notification service (Mongo change streams with polling fallback), in-memory subscription registry, remote SSE notifier, repository/service wiring with caching and subscription APIs, server lifecycle management for the notifier, executor signaling for SSE operators, message data structures, and tests. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor UI as Frontend (useExecutionStoreSubscribe)
participant FE as App Server Route<br>/operators/subscribe-execution-store
participant OP as SseOperator
participant NTF as SseNotifier (default_sse_notifier)
participant NS as Notification Service<br>(MongoChangeStream / Poll)
participant DB as MongoDB
UI->>FE: POST subscribe-execution-store<br/>operator_uri, dataset_id, operator_uri-resolved
FE->>OP: Execute operator
OP-->>FE: EventSourceResponse (SSE stream)
FE-->>UI: SSE connection established
rect rgb(230,245,255)
note over NS,DB: Startup (app lifecycle)
FE->>NS: (lifecycle) start in dedicated thread
NS->>DB: Open change stream (fallback: poll)
end
UI-->>NTF: Client connected (store_name, dataset_id)
NTF->>NS: Request initial state sync (optional)
NS-->>NTF: Current state messages (async)
NTF-->>UI: SSE events (initial)
loop On changes
DB-->>NS: Change event (insert/update/delete)
NS->>NS: Build MessageData (metadata)
NS-->>NTF: Broadcast message
NTF-->>UI: SSE event (filtered by dataset_id)
end
UI-->>FE: Client disconnect / unsubscribe
NTF->>NTF: Unregister queue
sequenceDiagram
autonumber
participant Prod as Producer (Repo/Service)
participant NS as Notification Service
participant NTF as SseNotifier
participant UI as SSE Clients
Prod->>NS: set_key/delete_key/etc.
NS->>NS: Detect change (change stream or poll)
NS->>NTF: notify(store_name, MessageData.json)
NTF->>UI: Fan-out to subscribed queues (dataset filter)
Estimated code review effort🎯 5 (Critical) | ⏱️ ~120 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 7
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (16)
app/packages/core/src/subscription/useExecutionStoreSubscribe.ts
(1 hunks)fiftyone/factory/repo_factory.py
(2 hunks)fiftyone/factory/repos/execution_store.py
(13 hunks)fiftyone/operators/__init__.py
(1 hunks)fiftyone/operators/executor.py
(4 hunks)fiftyone/operators/message.py
(2 hunks)fiftyone/operators/remote_notifier.py
(1 hunks)fiftyone/operators/server.py
(2 hunks)fiftyone/operators/sse.py
(1 hunks)fiftyone/operators/store/notification_service.py
(1 hunks)fiftyone/operators/store/service.py
(6 hunks)fiftyone/operators/store/store.py
(3 hunks)fiftyone/operators/store/subscription_registry.py
(1 hunks)fiftyone/server/app.py
(3 hunks)tests/unittests/operators/notification_service_tests.py
(1 hunks)tests/unittests/operators/subscription_registry_tests.py
(1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx}
⚙️ CodeRabbit configuration file
Review the Typescript and React code for conformity with best practices in React, Recoil, Graphql, and Typescript. Highlight any deviations.
Files:
app/packages/core/src/subscription/useExecutionStoreSubscribe.ts
🧬 Code graph analysis (14)
tests/unittests/operators/notification_service_tests.py (4)
fiftyone/operators/message.py (4)
MessageData
(51-100)MessageMetadata
(42-47)to_json
(30-35)to_json
(94-100)fiftyone/operators/store/notification_service.py (12)
MongoChangeStreamNotificationService
(100-559)subscribe
(43-59)subscribe
(137-182)unsubscribe
(62-68)unsubscribe
(184-190)unsubscribe_all
(71-77)unsubscribe_all
(192-198)notify
(80-87)notify
(218-272)_handle_change
(402-466)_run
(274-287)_poll
(490-559)fiftyone/operators/remote_notifier.py (2)
broadcast_to_store
(27-35)broadcast_to_store
(50-113)fiftyone/operators/store/subscription_registry.py (8)
subscribe
(22-37)subscribe
(85-98)get_subscribers
(62-69)get_subscribers
(113-117)unsubscribe
(40-45)unsubscribe
(100-106)unsubscribe_all
(48-52)unsubscribe_all
(108-111)
tests/unittests/operators/subscription_registry_tests.py (3)
fiftyone/operators/store/subscription_registry.py (11)
InLocalMemorySubscriptionRegistry
(72-121)subscribe
(22-37)subscribe
(85-98)get_subscribers
(62-69)get_subscribers
(113-117)unsubscribe
(40-45)unsubscribe
(100-106)unsubscribe_all
(48-52)unsubscribe_all
(108-111)empty_subscribers
(55-59)empty_subscribers
(119-121)fiftyone/factory/repos/execution_store.py (2)
subscribe
(324-348)unsubscribe
(350-367)fiftyone/operators/store/notification_service.py (6)
subscribe
(43-59)subscribe
(137-182)unsubscribe
(62-68)unsubscribe
(184-190)unsubscribe_all
(71-77)unsubscribe_all
(192-198)
fiftyone/operators/store/store.py (5)
fiftyone/operators/message.py (1)
MessageData
(51-100)fiftyone/factory/repos/execution_store.py (3)
MongoExecutionStoreRepo
(370-681)subscribe
(324-348)unsubscribe
(350-367)fiftyone/operators/store/service.py (3)
ExecutionStoreService
(22-330)subscribe
(307-319)unsubscribe
(321-330)fiftyone/operators/store/notification_service.py (4)
subscribe
(43-59)subscribe
(137-182)unsubscribe
(62-68)unsubscribe
(184-190)fiftyone/operators/store/subscription_registry.py (4)
subscribe
(22-37)subscribe
(85-98)unsubscribe
(40-45)unsubscribe
(100-106)
fiftyone/operators/sse.py (2)
fiftyone/operators/executor.py (2)
ExecutionContext
(522-1018)dataset_id
(614-618)fiftyone/operators/remote_notifier.py (1)
get_event_source_response
(115-166)
fiftyone/operators/__init__.py (1)
fiftyone/operators/sse.py (2)
SseOperator
(40-62)SseOperatorConfig
(18-37)
fiftyone/operators/remote_notifier.py (3)
tests/unittests/operators/notification_service_tests.py (1)
notification_service
(153-169)fiftyone/operators/store/notification_service.py (1)
_broadcast_current_state_for_store
(324-377)fiftyone/operators/message.py (2)
to_json
(30-35)to_json
(94-100)
fiftyone/server/app.py (1)
fiftyone/operators/store/notification_service.py (6)
MongoChangeStreamNotificationServiceLifecycleManager
(562-635)is_notification_service_disabled
(638-645)start_in_dedicated_thread
(572-598)stop
(95-97)stop
(379-400)stop
(600-635)
fiftyone/operators/store/subscription_registry.py (2)
fiftyone/factory/repos/execution_store.py (2)
subscribe
(324-348)unsubscribe
(350-367)fiftyone/operators/store/notification_service.py (6)
subscribe
(43-59)subscribe
(137-182)unsubscribe
(62-68)unsubscribe
(184-190)unsubscribe_all
(71-77)unsubscribe_all
(192-198)
fiftyone/operators/store/notification_service.py (6)
fiftyone/operators/message.py (4)
MessageData
(51-100)MessageMetadata
(42-47)to_json
(30-35)to_json
(94-100)fiftyone/operators/remote_notifier.py (3)
RemoteNotifier
(25-35)broadcast_to_store
(27-35)broadcast_to_store
(50-113)fiftyone/operators/store/subscription_registry.py (11)
LocalSubscriptionRegistry
(18-69)subscribe
(22-37)subscribe
(85-98)unsubscribe
(40-45)unsubscribe
(100-106)unsubscribe_all
(48-52)unsubscribe_all
(108-111)get_subscribers
(62-69)get_subscribers
(113-117)empty_subscribers
(55-59)empty_subscribers
(119-121)fiftyone/factory/repos/execution_store.py (2)
subscribe
(324-348)unsubscribe
(350-367)fiftyone/core/odm/database.py (1)
get_async_db_conn
(526-533)tests/unittests/operators/notification_service_tests.py (1)
notification_service
(153-169)
fiftyone/operators/store/service.py (5)
fiftyone/operators/executor.py (2)
store
(984-996)dataset_id
(614-618)fiftyone/operators/store/notification_service.py (5)
ChangeStreamNotificationService
(39-97)subscribe
(43-59)subscribe
(137-182)unsubscribe
(62-68)unsubscribe
(184-190)fiftyone/factory/repos/execution_store.py (3)
ExecutionStoreRepo
(287-367)subscribe
(324-348)unsubscribe
(350-367)fiftyone/operators/store/store.py (2)
subscribe
(208-217)unsubscribe
(219-228)fiftyone/operators/store/subscription_registry.py (4)
subscribe
(22-37)subscribe
(85-98)unsubscribe
(40-45)unsubscribe
(100-106)
fiftyone/operators/server.py (3)
fiftyone/server/decorators.py (1)
route
(45-77)fiftyone/operators/executor.py (7)
operator_uri
(833-835)execute_or_delegate_operator
(225-372)to_json
(60-64)to_json
(114-118)to_json
(1090-1106)to_json
(1128-1139)to_json
(1166-1175)fiftyone/operators/registry.py (4)
can_execute
(145-154)operator_exists
(59-71)operator_exists
(130-143)list_errors
(118-128)
fiftyone/factory/repo_factory.py (2)
fiftyone/operators/store/notification_service.py (1)
ChangeStreamNotificationService
(39-97)fiftyone/factory/repos/execution_store.py (2)
ExecutionStoreRepo
(287-367)MongoExecutionStoreRepo
(370-681)
fiftyone/factory/repos/execution_store.py (3)
fiftyone/operators/store/notification_service.py (8)
ChangeStreamNotificationService
(39-97)is_notification_service_disabled
(638-645)subscribe
(43-59)subscribe
(137-182)unsubscribe
(62-68)unsubscribe
(184-190)unsubscribe_all
(71-77)unsubscribe_all
(192-198)fiftyone/operators/store/service.py (3)
clear_cache
(88-90)subscribe
(307-319)unsubscribe
(321-330)fiftyone/operators/store/subscription_registry.py (6)
subscribe
(22-37)subscribe
(85-98)unsubscribe
(40-45)unsubscribe
(100-106)unsubscribe_all
(48-52)unsubscribe_all
(108-111)
app/packages/core/src/subscription/useExecutionStoreSubscribe.ts (2)
app/packages/operators/src/operators.ts (1)
resolveOperatorURI
(637-643)app/packages/utilities/src/fetch.ts (1)
getEventSource
(254-341)
🪛 Pylint (3.3.9)
tests/unittests/operators/notification_service_tests.py
[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check
(E0015)
[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.
(R0022)
tests/unittests/operators/subscription_registry_tests.py
[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check
(E0015)
[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.
(R0022)
fiftyone/operators/sse.py
[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check
(E0015)
[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.
(R0022)
[refactor] 19-19: Too many positional arguments (8/5)
(R0917)
fiftyone/operators/remote_notifier.py
[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check
(E0015)
[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.
(R0022)
fiftyone/operators/store/subscription_registry.py
[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check
(E0015)
[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.
(R0022)
fiftyone/operators/store/notification_service.py
[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check
(E0015)
[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.
(R0022)
fiftyone/operators/server.py
[refactor] 259-262: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it
(R1705)
🪛 Ruff (0.13.3)
tests/unittests/operators/notification_service_tests.py
153-153: Unused method argument: event_loop
(ARG002)
173-173: Unpacked variable collection
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
173-173: Unpacked variable remote_notifier
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
201-201: Unpacked variable collection
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
201-201: Unpacked variable remote_notifier
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
231-231: Unpacked variable collection
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
231-231: Unpacked variable remote_notifier
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
253-253: Unpacked variable collection
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
253-253: Unpacked variable remote_notifier
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
287-287: Unpacked variable remote_notifier
is never used
Prefix it with an underscore or any other dummy variable pattern
(RUF059)
fiftyone/operators/sse.py
55-55: Avoid specifying long messages outside the exception class
(TRY003)
fiftyone/operators/remote_notifier.py
65-67: try
-except
-pass
detected, consider logging the exception
(S110)
65-65: Do not catch blind exception: Exception
(BLE001)
97-97: Do not catch blind exception: Exception
(BLE001)
186-188: Avoid specifying long messages outside the exception class
(TRY003)
fiftyone/server/app.py
187-187: Redundant exception object included in logging.exception
call
(TRY401)
fiftyone/operators/store/subscription_registry.py
102-102: Loop control variable store
not used within loop body
Rename unused store
to _store
(B007)
fiftyone/operators/store/notification_service.py
230-230: Do not catch blind exception: Exception
(BLE001)
253-253: Do not catch blind exception: Exception
(BLE001)
260-262: try
-except
-pass
detected, consider logging the exception
(S110)
260-260: Do not catch blind exception: Exception
(BLE001)
284-284: f-string without any placeholders
Remove extraneous f
prefix
(F541)
318-318: Redundant exception object included in logging.exception
call
(TRY401)
373-373: Do not catch blind exception: Exception
(BLE001)
423-423: Unnecessary key check before dictionary access
Replace with dict.get
(RUF019)
466-466: Redundant exception object included in logging.exception
call
(TRY401)
477-477: Redundant exception object included in logging.exception
call
(TRY401)
627-627: Do not catch blind exception: Exception
(BLE001)
fiftyone/operators/store/service.py
52-52: PEP 484 prohibits implicit Optional
Convert to Optional[T]
(RUF013)
fiftyone/operators/server.py
240-240: Avoid specifying long messages outside the exception class
(TRY003)
fiftyone/factory/repos/execution_store.py
311-313: Avoid specifying long messages outside the exception class
(TRY003)
342-344: Avoid specifying long messages outside the exception class
(TRY003)
363-365: Avoid specifying long messages outside the exception class
(TRY003)
382-384: Avoid specifying long messages outside the exception class
(TRY003)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
- GitHub Check: test-windows / test-python (windows-latest, 3.9)
- GitHub Check: test-windows / test-python (windows-latest, 3.11)
- GitHub Check: test-windows / test-python (windows-latest, 3.10)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
- GitHub Check: test-windows / test-python (windows-latest, 3.12)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
- GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
- GitHub Check: build / build
- GitHub Check: e2e / test-e2e
- GitHub Check: build
if not is_notification_service_disabled(): | ||
if notification_service is None: | ||
self._notification_service = default_notification_service | ||
else: | ||
self._notification_service = notification_service | ||
else: | ||
logger.warning("Execution store notification service is disabled") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initialize _notification_service
when disabled.
If the notification service is disabled via FIFTYONE_EXECUTION_STORE_NOTIFICATION_SERVICE_DISABLED
, we log but never set _notification_service
, so the first call to subscribe()
/unsubscribe()
raises AttributeError
instead of the intended ValueError
. Please always define the attribute, e.g. by setting it to None
in the disabled branch.
if not is_notification_service_disabled():
if notification_service is None:
self._notification_service = default_notification_service
else:
self._notification_service = notification_service
else:
logger.warning("Execution store notification service is disabled")
+ self._notification_service = None
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if not is_notification_service_disabled(): | |
if notification_service is None: | |
self._notification_service = default_notification_service | |
else: | |
self._notification_service = notification_service | |
else: | |
logger.warning("Execution store notification service is disabled") | |
if not is_notification_service_disabled(): | |
if notification_service is None: | |
self._notification_service = default_notification_service | |
else: | |
self._notification_service = notification_service | |
else: | |
logger.warning("Execution store notification service is disabled") | |
self._notification_service = None |
🤖 Prompt for AI Agents
In fiftyone/factory/repos/execution_store.py around lines 316 to 323, when the
notification service is disabled the code only logs a warning and does not set
self._notification_service, causing AttributeError later; modify the disabled
branch to explicitly set self._notification_service = None (so
subscribe/unsubscribe raise the intended ValueError) while keeping the existing
log message.
return self._notification_service.subscribe( | ||
store_name, callback, str(self._dataset_id) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don’t stringify a missing dataset id when subscribing.
str(self._dataset_id)
turns a None
scope into the literal "None"
, so global subscribers never match notifications (which still carry None
). Pass None
through unchanged.
- return self._notification_service.subscribe(
- store_name, callback, str(self._dataset_id)
- )
+ dataset_scope = (
+ str(self._dataset_id) if self._dataset_id is not None else None
+ )
+ return self._notification_service.subscribe(
+ store_name, callback, dataset_scope
+ )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
return self._notification_service.subscribe( | |
store_name, callback, str(self._dataset_id) | |
) | |
dataset_scope = ( | |
str(self._dataset_id) if self._dataset_id is not None else None | |
) | |
return self._notification_service.subscribe( | |
store_name, callback, dataset_scope | |
) |
🤖 Prompt for AI Agents
In fiftyone/factory/repos/execution_store.py around lines 346 to 348, the call
to subscribe is passing str(self._dataset_id) which converts None to the literal
"None" and breaks global subscriptions; change the argument to pass
self._dataset_id directly (so None is preserved) when calling
self._notification_service.subscribe(store_name, callback, self._dataset_id).
def delete_store(self, store_name: str) -> int: | ||
if self._notification_service: | ||
self._notification_service.unsubscribe_all(store_name) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Dataset-scoped deletions shouldn’t drop other datasets’ subscribers.
unsubscribe_all(store_name)
erases every subscription registered under that store name, regardless of dataset. Because the subscription registry (see fiftyone/operators/store/subscription_registry.py
) keys by store_name
only, deleting a store in one dataset silently strips subscriptions in every other dataset that happens to share the same store name. Please make the unsubscribe path dataset-aware (e.g., teach the notification service/registry to filter by both store and dataset id, and call that here) so other datasets remain subscribed.
# Maps store names to a set of tuples (queue, dataset_id) | ||
self.store_queues: Dict[ | ||
str, Set[Tuple[asyncio.Queue, Optional[str]]] | ||
] = {} | ||
|
||
async def broadcast_to_store(self, store_name: str, message: str) -> None: | ||
""" | ||
Broadcast a message to all connected SSE clients subscribed to the specified store. | ||
Handles disconnected clients gracefully without raising exceptions. | ||
Args: | ||
store_name: The name of the store to broadcast to. | ||
message: The message to broadcast. | ||
""" | ||
if store_name in self.store_queues: | ||
# Try to extract dataset_id from message for filtering | ||
dataset_id = None | ||
try: | ||
msg_data = json.loads(message) | ||
dataset_id = msg_data.get("metadata", {}).get("dataset_id") | ||
except Exception: | ||
# If we can't parse the message, continue without dataset filtering | ||
pass | ||
|
||
logger.debug( | ||
"Broadcasting message to store '%s'%s: %s", | ||
store_name, | ||
f" for dataset {dataset_id}" if dataset_id else "", | ||
message, | ||
) | ||
|
||
# Create a copy of the queues to avoid modification during iteration | ||
queue_items = list(self.store_queues[store_name]) | ||
queues_to_remove = set() | ||
|
||
for queue, client_dataset_id in queue_items: | ||
# Filter by dataset_id if both are specified | ||
if ( | ||
client_dataset_id is not None | ||
and dataset_id is not None | ||
and dataset_id != client_dataset_id | ||
): | ||
continue | ||
|
||
try: | ||
# Use put_nowait to avoid blocking on full queues | ||
# This prevents one slow client from blocking others | ||
queue.put_nowait(message) | ||
except asyncio.QueueFull: | ||
logger.debug( | ||
f"Queue full for client in store '{store_name}', dropping message" | ||
) | ||
except Exception as e: | ||
# If we encounter an error with this queue, mark it for removal | ||
logger.debug( | ||
f"Error sending to client in store '{store_name}': {e}" | ||
) | ||
queues_to_remove.add((queue, client_dataset_id)) | ||
|
||
# Clean up any problematic queues | ||
for queue_item in queues_to_remove: | ||
self._unregister_queue( | ||
store_name, queue_item[0], queue_item[1] | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix concurrent access to store_queues
store_queues
is mutated from both the server event loop (via get_event_source_response
/_unregister_queue
during request handling) and the notification-service thread (via broadcast_to_store
, triggered from MongoChangeStreamNotificationService.notify
, see fiftyone/operators/store/notification_service.py
, Line 267). Because these code paths run in different threads without synchronization, we can crash with RuntimeError: dictionary changed size during iteration
and corrupt the subscription registry. Guard all reads/writes to store_queues
(and the per-store sets) with a threading lock, or funnel every mutation through a single event loop. Right now this is a critical race condition before release.
🧰 Tools
🪛 Ruff (0.13.3)
65-67: try
-except
-pass
detected, consider logging the exception
(S110)
65-65: Do not catch blind exception: Exception
(BLE001)
97-97: Do not catch blind exception: Exception
(BLE001)
def unsubscribe(self, subscription_id: str): | ||
"""Unsubscribe from a specific store. | ||
Args: | ||
subscription_id: The subscription id to unsubscribe from. | ||
""" | ||
self._subscription_registry.unsubscribe(subscription_id) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Return the unsubscribe result
LocalSubscriptionRegistry.unsubscribe()
returns True
/False
, and the repo/service APIs (ExecutionStoreRepo.unsubscribe
→ ExecutionStoreService.unsubscribe
→ ExecutionStore.unsubscribe
) all advertise that boolean return to callers. Dropping the return value here turns every successful unsubscribe into None
, breaking the contract and confusing callers that rely on the boolean. Please return the registry result.
- self._subscription_registry.unsubscribe(subscription_id)
+ return self._subscription_registry.unsubscribe(subscription_id)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def unsubscribe(self, subscription_id: str): | |
"""Unsubscribe from a specific store. | |
Args: | |
subscription_id: The subscription id to unsubscribe from. | |
""" | |
self._subscription_registry.unsubscribe(subscription_id) | |
def unsubscribe(self, subscription_id: str): | |
"""Unsubscribe from a specific store. | |
Args: | |
subscription_id: The subscription id to unsubscribe from. | |
""" | |
return self._subscription_registry.unsubscribe(subscription_id) |
🤖 Prompt for AI Agents
In fiftyone/operators/store/notification_service.py around lines 184 to 191, the
unsubscribe method calls
self._subscription_registry.unsubscribe(subscription_id) but ignores its boolean
return value; update the method to return that result (i.e., return the value
from self._subscription_registry.unsubscribe(subscription_id)) so callers
receive the True/False outcome as advertised by the repo/service APIs.
docs = await self._collection_async.find(query).to_list() | ||
for doc in docs: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pass length
to Motor to_list()
Motor requires a length
argument when calling AsyncIOMotorCursor.to_list()
. Calling it without length
raises TypeError: to_list() missing 1 required positional argument: 'length'
, so this code will blow up as soon as we try to sync the current state for a subscriber. Please pass an explicit length (e.g. length=None
to fetch all documents) both here and in _poll()
where the same pattern appears.
- docs = await self._collection_async.find(query).to_list()
+ docs = await self._collection_async.find(query).to_list(length=None)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
docs = await self._collection_async.find(query).to_list() | |
for doc in docs: | |
docs = await self._collection_async.find(query).to_list(length=None) | |
for doc in docs: |
🤖 Prompt for AI Agents
In fiftyone/operators/store/notification_service.py around lines 356-357 the
call docs = await self._collection_async.find(query).to_list() omits the
required length argument for Motor's AsyncIOMotorCursor.to_list(), causing a
TypeError; update this call to pass an explicit length (e.g. to_list(None) to
fetch all documents) and do the same in the _poll() method where the same
pattern appears so both calls supply length=None (or another explicit integer)
to avoid the missing-argument error.
What changes are proposed in this pull request?
This PR adds support for subscribing to execution store updates, primarily for the purpose of real-time communication between the server and the application.
This implementation leverages server-sent events (SSE) to set up client-server subscriptions, and uses Mongo change streams if available (with a fallback to polling if change streams are not available).
es_subscription_test_demo.mov