Skip to content

Conversation

tom-vx51
Copy link
Contributor

@tom-vx51 tom-vx51 commented Oct 10, 2025

What changes are proposed in this pull request?

This PR adds support for subscribing to execution store updates, primarily for the purpose of real-time communication between the server and the application.

This implementation leverages server-sent events (SSE) to set up client-server subscriptions, and uses Mongo change streams if available (with a fallback to polling if change streams are not available).

es_subscription_test_demo.mov

@tom-vx51 tom-vx51 requested review from a team as code owners October 10, 2025 02:02
Copy link
Contributor

coderabbitai bot commented Oct 10, 2025

Walkthrough

Adds end-to-end SSE-based execution-store subscriptions: a frontend React hook, backend SSE operator and route, notification service (Mongo change streams with polling fallback), in-memory subscription registry, remote SSE notifier, repository/service wiring with caching and subscription APIs, server lifecycle management for the notifier, executor signaling for SSE operators, message data structures, and tests.

Changes

Cohort / File(s) Summary
Frontend SSE subscription hook
app/packages/core/src/subscription/useExecutionStoreSubscribe.ts
New React hook to open/monitor SSE subscriptions to the execution store, handle pings, parse messages, and expose health/unsubscribe/reset APIs.
Repos and Services (execution store)
fiftyone/factory/repo_factory.py, fiftyone/factory/repos/execution_store.py, fiftyone/operators/store/service.py, fiftyone/operators/store/store.py
Repo factory now caches per dataset/collection and accepts an optional notification service. Execution store repos refactored with a new abstract/base, notification integration (subscribe/unsubscribe, clear_cache), index updates, and consistent policy/TTL handling. Service and store classes accept and pass notification services, and expose subscribe/unsubscribe APIs.
SSE infrastructure (server + notifier + lifecycle + registry)
fiftyone/operators/remote_notifier.py, fiftyone/operators/store/notification_service.py, fiftyone/operators/store/subscription_registry.py, fiftyone/server/app.py, fiftyone/operators/server.py
Introduces an SSE notifier with per-store queues and dataset filtering, a Mongo change-stream notification service with polling fallback and lifecycle manager, an in-memory subscription registry, app startup/shutdown management for the service, and a new POST route to subscribe to execution-store via SSE.
Operators API and execution signaling
fiftyone/operators/sse.py, fiftyone/operators/__init__.py, fiftyone/operators/executor.py, fiftyone/operators/message.py
Adds SseOperator/SseOperatorConfig and exports, executor now flags SSE results via is_sse, and defines MessageData/MessageMetadata with JSON helpers.
Tests
tests/unittests/operators/notification_service_tests.py, tests/unittests/operators/subscription_registry_tests.py
Unit tests for notification service behavior (subscribe/notify/change-stream/polling) and for the in-memory subscription registry (concurrency and state transitions).

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor UI as Frontend (useExecutionStoreSubscribe)
  participant FE as App Server Route<br>/operators/subscribe-execution-store
  participant OP as SseOperator
  participant NTF as SseNotifier (default_sse_notifier)
  participant NS as Notification Service<br>(MongoChangeStream / Poll)
  participant DB as MongoDB

  UI->>FE: POST subscribe-execution-store<br/>operator_uri, dataset_id, operator_uri-resolved
  FE->>OP: Execute operator
  OP-->>FE: EventSourceResponse (SSE stream)
  FE-->>UI: SSE connection established

  rect rgb(230,245,255)
  note over NS,DB: Startup (app lifecycle)
  FE->>NS: (lifecycle) start in dedicated thread
  NS->>DB: Open change stream (fallback: poll)
  end

  UI-->>NTF: Client connected (store_name, dataset_id)
  NTF->>NS: Request initial state sync (optional)
  NS-->>NTF: Current state messages (async)
  NTF-->>UI: SSE events (initial)

  loop On changes
    DB-->>NS: Change event (insert/update/delete)
    NS->>NS: Build MessageData (metadata)
    NS-->>NTF: Broadcast message
    NTF-->>UI: SSE event (filtered by dataset_id)
  end

  UI-->>FE: Client disconnect / unsubscribe
  NTF->>NTF: Unregister queue
Loading
sequenceDiagram
  autonumber
  participant Prod as Producer (Repo/Service)
  participant NS as Notification Service
  participant NTF as SseNotifier
  participant UI as SSE Clients

  Prod->>NS: set_key/delete_key/etc.
  NS->>NS: Detect change (change stream or poll)
  NS->>NTF: notify(store_name, MessageData.json)
  NTF->>UI: Fan-out to subscribed queues (dataset filter)
Loading

Estimated code review effort

🎯 5 (Critical) | ⏱️ ~120 minutes

Poem

A rabbit perked its ears to streams,
Where keys and stores now whisper dreams.
Ping-ping—alive! the wires sing,
From change streams’ hum to client spring.
With queues that hop and TTLs neat,
We sip the data, warm and sweet.
SSE skies, our burrow’s beat. 🐇✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 61.16% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Title Check ✅ Passed The pull request title clearly and concisely summarizes the main feature addition of subscription support for execution store updates, aligning directly with the changeset and providing specific context for future reference.
Description Check ✅ Passed The pull request description fully follows the repository’s template by including clear sections for proposed changes, testing details, release notes with user-facing change indication, and affected areas, ensuring all required information is provided.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch feat/es-subscriptions

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1585ac6 and 51764e9.

📒 Files selected for processing (16)
  • app/packages/core/src/subscription/useExecutionStoreSubscribe.ts (1 hunks)
  • fiftyone/factory/repo_factory.py (2 hunks)
  • fiftyone/factory/repos/execution_store.py (13 hunks)
  • fiftyone/operators/__init__.py (1 hunks)
  • fiftyone/operators/executor.py (4 hunks)
  • fiftyone/operators/message.py (2 hunks)
  • fiftyone/operators/remote_notifier.py (1 hunks)
  • fiftyone/operators/server.py (2 hunks)
  • fiftyone/operators/sse.py (1 hunks)
  • fiftyone/operators/store/notification_service.py (1 hunks)
  • fiftyone/operators/store/service.py (6 hunks)
  • fiftyone/operators/store/store.py (3 hunks)
  • fiftyone/operators/store/subscription_registry.py (1 hunks)
  • fiftyone/server/app.py (3 hunks)
  • tests/unittests/operators/notification_service_tests.py (1 hunks)
  • tests/unittests/operators/subscription_registry_tests.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{ts,tsx}

⚙️ CodeRabbit configuration file

Review the Typescript and React code for conformity with best practices in React, Recoil, Graphql, and Typescript. Highlight any deviations.

Files:

  • app/packages/core/src/subscription/useExecutionStoreSubscribe.ts
🧬 Code graph analysis (14)
tests/unittests/operators/notification_service_tests.py (4)
fiftyone/operators/message.py (4)
  • MessageData (51-100)
  • MessageMetadata (42-47)
  • to_json (30-35)
  • to_json (94-100)
fiftyone/operators/store/notification_service.py (12)
  • MongoChangeStreamNotificationService (100-559)
  • subscribe (43-59)
  • subscribe (137-182)
  • unsubscribe (62-68)
  • unsubscribe (184-190)
  • unsubscribe_all (71-77)
  • unsubscribe_all (192-198)
  • notify (80-87)
  • notify (218-272)
  • _handle_change (402-466)
  • _run (274-287)
  • _poll (490-559)
fiftyone/operators/remote_notifier.py (2)
  • broadcast_to_store (27-35)
  • broadcast_to_store (50-113)
fiftyone/operators/store/subscription_registry.py (8)
  • subscribe (22-37)
  • subscribe (85-98)
  • get_subscribers (62-69)
  • get_subscribers (113-117)
  • unsubscribe (40-45)
  • unsubscribe (100-106)
  • unsubscribe_all (48-52)
  • unsubscribe_all (108-111)
tests/unittests/operators/subscription_registry_tests.py (3)
fiftyone/operators/store/subscription_registry.py (11)
  • InLocalMemorySubscriptionRegistry (72-121)
  • subscribe (22-37)
  • subscribe (85-98)
  • get_subscribers (62-69)
  • get_subscribers (113-117)
  • unsubscribe (40-45)
  • unsubscribe (100-106)
  • unsubscribe_all (48-52)
  • unsubscribe_all (108-111)
  • empty_subscribers (55-59)
  • empty_subscribers (119-121)
fiftyone/factory/repos/execution_store.py (2)
  • subscribe (324-348)
  • unsubscribe (350-367)
fiftyone/operators/store/notification_service.py (6)
  • subscribe (43-59)
  • subscribe (137-182)
  • unsubscribe (62-68)
  • unsubscribe (184-190)
  • unsubscribe_all (71-77)
  • unsubscribe_all (192-198)
fiftyone/operators/store/store.py (5)
fiftyone/operators/message.py (1)
  • MessageData (51-100)
fiftyone/factory/repos/execution_store.py (3)
  • MongoExecutionStoreRepo (370-681)
  • subscribe (324-348)
  • unsubscribe (350-367)
fiftyone/operators/store/service.py (3)
  • ExecutionStoreService (22-330)
  • subscribe (307-319)
  • unsubscribe (321-330)
fiftyone/operators/store/notification_service.py (4)
  • subscribe (43-59)
  • subscribe (137-182)
  • unsubscribe (62-68)
  • unsubscribe (184-190)
fiftyone/operators/store/subscription_registry.py (4)
  • subscribe (22-37)
  • subscribe (85-98)
  • unsubscribe (40-45)
  • unsubscribe (100-106)
fiftyone/operators/sse.py (2)
fiftyone/operators/executor.py (2)
  • ExecutionContext (522-1018)
  • dataset_id (614-618)
fiftyone/operators/remote_notifier.py (1)
  • get_event_source_response (115-166)
fiftyone/operators/__init__.py (1)
fiftyone/operators/sse.py (2)
  • SseOperator (40-62)
  • SseOperatorConfig (18-37)
fiftyone/operators/remote_notifier.py (3)
tests/unittests/operators/notification_service_tests.py (1)
  • notification_service (153-169)
fiftyone/operators/store/notification_service.py (1)
  • _broadcast_current_state_for_store (324-377)
fiftyone/operators/message.py (2)
  • to_json (30-35)
  • to_json (94-100)
fiftyone/server/app.py (1)
fiftyone/operators/store/notification_service.py (6)
  • MongoChangeStreamNotificationServiceLifecycleManager (562-635)
  • is_notification_service_disabled (638-645)
  • start_in_dedicated_thread (572-598)
  • stop (95-97)
  • stop (379-400)
  • stop (600-635)
fiftyone/operators/store/subscription_registry.py (2)
fiftyone/factory/repos/execution_store.py (2)
  • subscribe (324-348)
  • unsubscribe (350-367)
fiftyone/operators/store/notification_service.py (6)
  • subscribe (43-59)
  • subscribe (137-182)
  • unsubscribe (62-68)
  • unsubscribe (184-190)
  • unsubscribe_all (71-77)
  • unsubscribe_all (192-198)
fiftyone/operators/store/notification_service.py (6)
fiftyone/operators/message.py (4)
  • MessageData (51-100)
  • MessageMetadata (42-47)
  • to_json (30-35)
  • to_json (94-100)
fiftyone/operators/remote_notifier.py (3)
  • RemoteNotifier (25-35)
  • broadcast_to_store (27-35)
  • broadcast_to_store (50-113)
fiftyone/operators/store/subscription_registry.py (11)
  • LocalSubscriptionRegistry (18-69)
  • subscribe (22-37)
  • subscribe (85-98)
  • unsubscribe (40-45)
  • unsubscribe (100-106)
  • unsubscribe_all (48-52)
  • unsubscribe_all (108-111)
  • get_subscribers (62-69)
  • get_subscribers (113-117)
  • empty_subscribers (55-59)
  • empty_subscribers (119-121)
fiftyone/factory/repos/execution_store.py (2)
  • subscribe (324-348)
  • unsubscribe (350-367)
fiftyone/core/odm/database.py (1)
  • get_async_db_conn (526-533)
tests/unittests/operators/notification_service_tests.py (1)
  • notification_service (153-169)
fiftyone/operators/store/service.py (5)
fiftyone/operators/executor.py (2)
  • store (984-996)
  • dataset_id (614-618)
fiftyone/operators/store/notification_service.py (5)
  • ChangeStreamNotificationService (39-97)
  • subscribe (43-59)
  • subscribe (137-182)
  • unsubscribe (62-68)
  • unsubscribe (184-190)
fiftyone/factory/repos/execution_store.py (3)
  • ExecutionStoreRepo (287-367)
  • subscribe (324-348)
  • unsubscribe (350-367)
fiftyone/operators/store/store.py (2)
  • subscribe (208-217)
  • unsubscribe (219-228)
fiftyone/operators/store/subscription_registry.py (4)
  • subscribe (22-37)
  • subscribe (85-98)
  • unsubscribe (40-45)
  • unsubscribe (100-106)
fiftyone/operators/server.py (3)
fiftyone/server/decorators.py (1)
  • route (45-77)
fiftyone/operators/executor.py (7)
  • operator_uri (833-835)
  • execute_or_delegate_operator (225-372)
  • to_json (60-64)
  • to_json (114-118)
  • to_json (1090-1106)
  • to_json (1128-1139)
  • to_json (1166-1175)
fiftyone/operators/registry.py (4)
  • can_execute (145-154)
  • operator_exists (59-71)
  • operator_exists (130-143)
  • list_errors (118-128)
fiftyone/factory/repo_factory.py (2)
fiftyone/operators/store/notification_service.py (1)
  • ChangeStreamNotificationService (39-97)
fiftyone/factory/repos/execution_store.py (2)
  • ExecutionStoreRepo (287-367)
  • MongoExecutionStoreRepo (370-681)
fiftyone/factory/repos/execution_store.py (3)
fiftyone/operators/store/notification_service.py (8)
  • ChangeStreamNotificationService (39-97)
  • is_notification_service_disabled (638-645)
  • subscribe (43-59)
  • subscribe (137-182)
  • unsubscribe (62-68)
  • unsubscribe (184-190)
  • unsubscribe_all (71-77)
  • unsubscribe_all (192-198)
fiftyone/operators/store/service.py (3)
  • clear_cache (88-90)
  • subscribe (307-319)
  • unsubscribe (321-330)
fiftyone/operators/store/subscription_registry.py (6)
  • subscribe (22-37)
  • subscribe (85-98)
  • unsubscribe (40-45)
  • unsubscribe (100-106)
  • unsubscribe_all (48-52)
  • unsubscribe_all (108-111)
app/packages/core/src/subscription/useExecutionStoreSubscribe.ts (2)
app/packages/operators/src/operators.ts (1)
  • resolveOperatorURI (637-643)
app/packages/utilities/src/fetch.ts (1)
  • getEventSource (254-341)
🪛 Pylint (3.3.9)
tests/unittests/operators/notification_service_tests.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

tests/unittests/operators/subscription_registry_tests.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

fiftyone/operators/sse.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)


[refactor] 19-19: Too many positional arguments (8/5)

(R0917)

fiftyone/operators/remote_notifier.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

fiftyone/operators/store/subscription_registry.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

fiftyone/operators/store/notification_service.py

[error] 1-1: Unrecognized option found: optimize-ast, files-output, function-name-hint, variable-name-hint, const-name-hint, attr-name-hint, argument-name-hint, class-attribute-name-hint, inlinevar-name-hint, class-name-hint, module-name-hint, method-name-hint, no-space-check

(E0015)


[refactor] 1-1: Useless option value for '--disable', 'bad-continuation' was removed from pylint, see pylint-dev/pylint#3571.

(R0022)

fiftyone/operators/server.py

[refactor] 259-262: Unnecessary "else" after "return", remove the "else" and de-indent the code inside it

(R1705)

🪛 Ruff (0.13.3)
tests/unittests/operators/notification_service_tests.py

153-153: Unused method argument: event_loop

(ARG002)


173-173: Unpacked variable collection is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


173-173: Unpacked variable remote_notifier is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


201-201: Unpacked variable collection is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


201-201: Unpacked variable remote_notifier is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


231-231: Unpacked variable collection is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


231-231: Unpacked variable remote_notifier is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


253-253: Unpacked variable collection is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


253-253: Unpacked variable remote_notifier is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)


287-287: Unpacked variable remote_notifier is never used

Prefix it with an underscore or any other dummy variable pattern

(RUF059)

fiftyone/operators/sse.py

55-55: Avoid specifying long messages outside the exception class

(TRY003)

fiftyone/operators/remote_notifier.py

65-67: try-except-pass detected, consider logging the exception

(S110)


65-65: Do not catch blind exception: Exception

(BLE001)


97-97: Do not catch blind exception: Exception

(BLE001)


186-188: Avoid specifying long messages outside the exception class

(TRY003)

fiftyone/server/app.py

187-187: Redundant exception object included in logging.exception call

(TRY401)

fiftyone/operators/store/subscription_registry.py

102-102: Loop control variable store not used within loop body

Rename unused store to _store

(B007)

fiftyone/operators/store/notification_service.py

230-230: Do not catch blind exception: Exception

(BLE001)


253-253: Do not catch blind exception: Exception

(BLE001)


260-262: try-except-pass detected, consider logging the exception

(S110)


260-260: Do not catch blind exception: Exception

(BLE001)


284-284: f-string without any placeholders

Remove extraneous f prefix

(F541)


318-318: Redundant exception object included in logging.exception call

(TRY401)


373-373: Do not catch blind exception: Exception

(BLE001)


423-423: Unnecessary key check before dictionary access

Replace with dict.get

(RUF019)


466-466: Redundant exception object included in logging.exception call

(TRY401)


477-477: Redundant exception object included in logging.exception call

(TRY401)


627-627: Do not catch blind exception: Exception

(BLE001)

fiftyone/operators/store/service.py

52-52: PEP 484 prohibits implicit Optional

Convert to Optional[T]

(RUF013)

fiftyone/operators/server.py

240-240: Avoid specifying long messages outside the exception class

(TRY003)

fiftyone/factory/repos/execution_store.py

311-313: Avoid specifying long messages outside the exception class

(TRY003)


342-344: Avoid specifying long messages outside the exception class

(TRY003)


363-365: Avoid specifying long messages outside the exception class

(TRY003)


382-384: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.9)
  • GitHub Check: test-windows / test-python (windows-latest, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.10)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.11)
  • GitHub Check: test-windows / test-python (windows-latest, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.9)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.12)
  • GitHub Check: test / test-python (ubuntu-latest-m, 3.10)
  • GitHub Check: build / build
  • GitHub Check: e2e / test-e2e
  • GitHub Check: build

Comment on lines +316 to +323
if not is_notification_service_disabled():
if notification_service is None:
self._notification_service = default_notification_service
else:
self._notification_service = notification_service
else:
logger.warning("Execution store notification service is disabled")

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Initialize _notification_service when disabled.

If the notification service is disabled via FIFTYONE_EXECUTION_STORE_NOTIFICATION_SERVICE_DISABLED, we log but never set _notification_service, so the first call to subscribe()/unsubscribe() raises AttributeError instead of the intended ValueError. Please always define the attribute, e.g. by setting it to None in the disabled branch.

         if not is_notification_service_disabled():
             if notification_service is None:
                 self._notification_service = default_notification_service
             else:
                 self._notification_service = notification_service
         else:
             logger.warning("Execution store notification service is disabled")
+            self._notification_service = None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if not is_notification_service_disabled():
if notification_service is None:
self._notification_service = default_notification_service
else:
self._notification_service = notification_service
else:
logger.warning("Execution store notification service is disabled")
if not is_notification_service_disabled():
if notification_service is None:
self._notification_service = default_notification_service
else:
self._notification_service = notification_service
else:
logger.warning("Execution store notification service is disabled")
self._notification_service = None
🤖 Prompt for AI Agents
In fiftyone/factory/repos/execution_store.py around lines 316 to 323, when the
notification service is disabled the code only logs a warning and does not set
self._notification_service, causing AttributeError later; modify the disabled
branch to explicitly set self._notification_service = None (so
subscribe/unsubscribe raise the intended ValueError) while keeping the existing
log message.

Comment on lines +346 to +348
return self._notification_service.subscribe(
store_name, callback, str(self._dataset_id)
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Don’t stringify a missing dataset id when subscribing.

str(self._dataset_id) turns a None scope into the literal "None", so global subscribers never match notifications (which still carry None). Pass None through unchanged.

-        return self._notification_service.subscribe(
-            store_name, callback, str(self._dataset_id)
-        )
+        dataset_scope = (
+            str(self._dataset_id) if self._dataset_id is not None else None
+        )
+        return self._notification_service.subscribe(
+            store_name, callback, dataset_scope
+        )
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
return self._notification_service.subscribe(
store_name, callback, str(self._dataset_id)
)
dataset_scope = (
str(self._dataset_id) if self._dataset_id is not None else None
)
return self._notification_service.subscribe(
store_name, callback, dataset_scope
)
🤖 Prompt for AI Agents
In fiftyone/factory/repos/execution_store.py around lines 346 to 348, the call
to subscribe is passing str(self._dataset_id) which converts None to the literal
"None" and breaks global subscriptions; change the argument to pass
self._dataset_id directly (so None is preserved) when calling
self._notification_service.subscribe(store_name, callback, self._dataset_id).

Comment on lines 490 to +493
def delete_store(self, store_name: str) -> int:
if self._notification_service:
self._notification_service.unsubscribe_all(store_name)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Dataset-scoped deletions shouldn’t drop other datasets’ subscribers.

unsubscribe_all(store_name) erases every subscription registered under that store name, regardless of dataset. Because the subscription registry (see fiftyone/operators/store/subscription_registry.py) keys by store_name only, deleting a store in one dataset silently strips subscriptions in every other dataset that happens to share the same store name. Please make the unsubscribe path dataset-aware (e.g., teach the notification service/registry to filter by both store and dataset id, and call that here) so other datasets remain subscribed.

Comment on lines +45 to +108
# Maps store names to a set of tuples (queue, dataset_id)
self.store_queues: Dict[
str, Set[Tuple[asyncio.Queue, Optional[str]]]
] = {}

async def broadcast_to_store(self, store_name: str, message: str) -> None:
"""
Broadcast a message to all connected SSE clients subscribed to the specified store.
Handles disconnected clients gracefully without raising exceptions.
Args:
store_name: The name of the store to broadcast to.
message: The message to broadcast.
"""
if store_name in self.store_queues:
# Try to extract dataset_id from message for filtering
dataset_id = None
try:
msg_data = json.loads(message)
dataset_id = msg_data.get("metadata", {}).get("dataset_id")
except Exception:
# If we can't parse the message, continue without dataset filtering
pass

logger.debug(
"Broadcasting message to store '%s'%s: %s",
store_name,
f" for dataset {dataset_id}" if dataset_id else "",
message,
)

# Create a copy of the queues to avoid modification during iteration
queue_items = list(self.store_queues[store_name])
queues_to_remove = set()

for queue, client_dataset_id in queue_items:
# Filter by dataset_id if both are specified
if (
client_dataset_id is not None
and dataset_id is not None
and dataset_id != client_dataset_id
):
continue

try:
# Use put_nowait to avoid blocking on full queues
# This prevents one slow client from blocking others
queue.put_nowait(message)
except asyncio.QueueFull:
logger.debug(
f"Queue full for client in store '{store_name}', dropping message"
)
except Exception as e:
# If we encounter an error with this queue, mark it for removal
logger.debug(
f"Error sending to client in store '{store_name}': {e}"
)
queues_to_remove.add((queue, client_dataset_id))

# Clean up any problematic queues
for queue_item in queues_to_remove:
self._unregister_queue(
store_name, queue_item[0], queue_item[1]
)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix concurrent access to store_queues

store_queues is mutated from both the server event loop (via get_event_source_response/_unregister_queue during request handling) and the notification-service thread (via broadcast_to_store, triggered from MongoChangeStreamNotificationService.notify, see fiftyone/operators/store/notification_service.py, Line 267). Because these code paths run in different threads without synchronization, we can crash with RuntimeError: dictionary changed size during iteration and corrupt the subscription registry. Guard all reads/writes to store_queues (and the per-store sets) with a threading lock, or funnel every mutation through a single event loop. Right now this is a critical race condition before release.

🧰 Tools
🪛 Ruff (0.13.3)

65-67: try-except-pass detected, consider logging the exception

(S110)


65-65: Do not catch blind exception: Exception

(BLE001)


97-97: Do not catch blind exception: Exception

(BLE001)

Comment on lines +184 to +191
def unsubscribe(self, subscription_id: str):
"""Unsubscribe from a specific store.
Args:
subscription_id: The subscription id to unsubscribe from.
"""
self._subscription_registry.unsubscribe(subscription_id)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Return the unsubscribe result

LocalSubscriptionRegistry.unsubscribe() returns True/False, and the repo/service APIs (ExecutionStoreRepo.unsubscribeExecutionStoreService.unsubscribeExecutionStore.unsubscribe) all advertise that boolean return to callers. Dropping the return value here turns every successful unsubscribe into None, breaking the contract and confusing callers that rely on the boolean. Please return the registry result.

-        self._subscription_registry.unsubscribe(subscription_id)
+        return self._subscription_registry.unsubscribe(subscription_id)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def unsubscribe(self, subscription_id: str):
"""Unsubscribe from a specific store.
Args:
subscription_id: The subscription id to unsubscribe from.
"""
self._subscription_registry.unsubscribe(subscription_id)
def unsubscribe(self, subscription_id: str):
"""Unsubscribe from a specific store.
Args:
subscription_id: The subscription id to unsubscribe from.
"""
return self._subscription_registry.unsubscribe(subscription_id)
🤖 Prompt for AI Agents
In fiftyone/operators/store/notification_service.py around lines 184 to 191, the
unsubscribe method calls
self._subscription_registry.unsubscribe(subscription_id) but ignores its boolean
return value; update the method to return that result (i.e., return the value
from self._subscription_registry.unsubscribe(subscription_id)) so callers
receive the True/False outcome as advertised by the repo/service APIs.

Comment on lines +356 to +357
docs = await self._collection_async.find(query).to_list()
for doc in docs:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Pass length to Motor to_list()

Motor requires a length argument when calling AsyncIOMotorCursor.to_list(). Calling it without length raises TypeError: to_list() missing 1 required positional argument: 'length', so this code will blow up as soon as we try to sync the current state for a subscriber. Please pass an explicit length (e.g. length=None to fetch all documents) both here and in _poll() where the same pattern appears.

-        docs = await self._collection_async.find(query).to_list()
+        docs = await self._collection_async.find(query).to_list(length=None)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
docs = await self._collection_async.find(query).to_list()
for doc in docs:
docs = await self._collection_async.find(query).to_list(length=None)
for doc in docs:
🤖 Prompt for AI Agents
In fiftyone/operators/store/notification_service.py around lines 356-357 the
call docs = await self._collection_async.find(query).to_list() omits the
required length argument for Motor's AsyncIOMotorCursor.to_list(), causing a
TypeError; update this call to pass an explicit length (e.g. to_list(None) to
fetch all documents) and do the same in the _poll() method where the same
pattern appears so both calls supply length=None (or another explicit integer)
to avoid the missing-argument error.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant