Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions Sources/StreamVideo/Utils/Store/Store.swift
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {

/// Executor that processes actions through the pipeline.
private let executor: StoreExecutor<Namespace>


/// Coordinator that can skip redundant actions before execution.
private let coordinator: StoreCoordinator<Namespace>

/// Publisher that holds and emits the current state.
private let stateSubject: CurrentValueSubject<Namespace.State, Never>

Expand All @@ -81,20 +84,23 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
/// - middleware: Array of middleware for side effects.
/// - logger: Logger for recording store operations.
/// - executor: Executor for processing the action pipeline.
/// - coordinator: Coordinator that validates actions before execution.
init(
identifier: String,
initialState: Namespace.State,
reducers: [Reducer<Namespace>],
middleware: [Middleware<Namespace>],
logger: StoreLogger<Namespace>,
executor: StoreExecutor<Namespace>
executor: StoreExecutor<Namespace>,
coordinator: StoreCoordinator<Namespace>
) {
self.identifier = identifier
stateSubject = .init(initialState)
self.reducers = reducers
self.middleware = []
self.logger = logger
self.executor = executor
self.coordinator = coordinator

middleware.forEach { add($0) }
}
Expand Down Expand Up @@ -241,17 +247,17 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
/// logger.error("Action failed: \(error)")
/// }
/// ```

///
/// - Returns: A ``StoreTask`` that can be awaited or ignored for
/// fire-and-forget semantics.
@discardableResult
/// - Returns: A ``StoreTask`` that can be awaited for completion
/// or ignored for fire-and-forget semantics.
func dispatch(
_ actions: [StoreActionBox<Namespace.Action>],
file: StaticString = #file,
function: StaticString = #function,
line: UInt = #line
) -> StoreTask<Namespace> {
let task = StoreTask(executor: executor)
let task = StoreTask(executor: executor, coordinator: coordinator)
processingQueue.addTaskOperation { [weak self] in
guard let self else {
return
Expand All @@ -272,9 +278,13 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
return task
}

/// Dispatches a single boxed action asynchronously.
///
/// Wraps the action in an array and forwards to
/// ``dispatch(_:file:function:line:)``.
///
/// - Returns: A ``StoreTask`` that can be awaited or ignored.
@discardableResult
/// - Returns: A ``StoreTask`` that can be awaited for completion
/// or ignored for fire-and-forget semantics.
func dispatch(
_ action: StoreActionBox<Namespace.Action>,
file: StaticString = #file,
Expand All @@ -289,9 +299,13 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
)
}

/// Dispatches multiple unboxed actions asynchronously.
///
/// Actions are boxed automatically before being forwarded to
/// ``dispatch(_:file:function:line:)``.
///
/// - Returns: A ``StoreTask`` that can be awaited or ignored.
@discardableResult
/// - Returns: A ``StoreTask`` that can be awaited for completion
/// or ignored for fire-and-forget semantics.
func dispatch(
_ actions: [Namespace.Action],
file: StaticString = #file,
Expand All @@ -306,9 +320,13 @@ final class Store<Namespace: StoreNamespace>: @unchecked Sendable {
)
}

/// Dispatches a single unboxed action asynchronously.
///
/// The action is boxed automatically and forwarded to
/// ``dispatch(_:file:function:line:)``.
///
/// - Returns: A ``StoreTask`` that can be awaited or ignored.
@discardableResult
/// - Returns: A ``StoreTask`` that can be awaited for completion
/// or ignored for fire-and-forget semantics.
func dispatch(
_ action: Namespace.Action,
file: StaticString = #file,
Expand Down
33 changes: 33 additions & 0 deletions Sources/StreamVideo/Utils/Store/StoreCoordinator.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
//
// Copyright © 2025 Stream.io Inc. All rights reserved.
//

import Foundation

/// Coordinates store actions to prevent redundant state transitions.
///
/// The coordinator evaluates an action against the current state before the
/// store processes it.
/// Implementations can override ``shouldExecute(action:state:)``
/// to skip actions that would not yield a different state,
/// reducing unnecessary work along the pipeline.
class StoreCoordinator<Namespace: StoreNamespace>: @unchecked Sendable {

/// Determines whether an action should run for the provided state snapshot.
///
/// This default implementation always executes the action.
/// Subclasses can override the method to run diffing logic or other
/// heuristics that detect state changes and return `false` when the action
/// can be safely skipped.
///
/// - Parameters:
/// - action: The action that is about to be dispatched.
/// - state: The current state before the action runs.
/// - Returns: `true` to process the action; `false` to skip it.
func shouldExecute(
action: Namespace.Action,
state: Namespace.State
) -> Bool {
true
}
}
4 changes: 3 additions & 1 deletion Sources/StreamVideo/Utils/Store/StoreExecutor.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class StoreExecutor<Namespace: StoreNamespace>: @unchecked Sendable {
file: StaticString,
function: StaticString,
line: UInt
) async throws {
) async throws -> Namespace.State {
// Apply optional delay before processing action
await action.applyDelayBeforeIfRequired()

Expand Down Expand Up @@ -106,6 +106,8 @@ class StoreExecutor<Namespace: StoreNamespace>: @unchecked Sendable {

// Apply optional delay after successful processing
await action.applyDelayAfterIfRequired()

return updatedState
} catch {
// Log failure and rethrow
logger.didFail(
Expand Down
53 changes: 50 additions & 3 deletions Sources/StreamVideo/Utils/Store/StoreLogger.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class StoreLogger<Namespace: StoreNamespace> {
/// aggregation tools.
let logSubsystem: LogSubsystem

/// Aggregated metrics recorded for dispatched actions.
///
/// Statistics are enabled in DEBUG builds to help monitor action
/// throughput.
let statistics: StoreStatistics<Namespace> = .init()

/// Initializes a new store logger.
Expand All @@ -56,7 +60,10 @@ class StoreLogger<Namespace: StoreNamespace> {
self.logSubsystem = logSubsystem

#if DEBUG
statistics.enable(interval: 60) { [weak self] in self?.report($0, interval: $1) }
statistics.enable(interval: 60) {
[weak self] numberOfActions, interval in
self?.report(numberOfActions, interval: interval)
}
#endif
}

Expand All @@ -82,7 +89,38 @@ class StoreLogger<Namespace: StoreNamespace> {
) {
defer { statistics.record(action) }
log.debug(
"Store identifier:\(identifier) completed action:\(action) state:\(state).",
"Store identifier:\(identifier) completed action:\(action) "
+ "state:\(state).",
subsystems: logSubsystem,
functionName: function,
fileName: file,
lineNumber: line
)
}

/// Called when an action is skipped by the coordinator.
///
/// Override to customize logging or metrics for redundant actions
/// that do not require processing.
///
/// - Parameters:
/// - identifier: The store's unique identifier.
/// - action: The action that was skipped.
/// - state: The snapshot used when making the decision.
/// - file: Source file where the action was dispatched.
/// - function: Function where the action was dispatched.
/// - line: Line number where the action was dispatched.
func didSkip(
identifier: String,
action: Namespace.Action,
state: Namespace.State,
file: StaticString,
function: StaticString,
line: UInt
) {
defer { statistics.record(action) }
log.debug(
"Store identifier:\(identifier) skipped action:\(action).",
subsystems: logSubsystem,
functionName: function,
fileName: file,
Expand Down Expand Up @@ -121,12 +159,21 @@ class StoreLogger<Namespace: StoreNamespace> {
)
}

/// Reports aggregated statistics for the store.
///
/// This hook is invoked on a timer when statistics tracking is
/// enabled. Override to forward metrics or customize formatting.
///
/// - Parameters:
/// - numberOfActions: Count of actions recorded in the interval.
/// - interval: The time window for the reported statistics.
func report(
_ numberOfActions: Int,
interval: TimeInterval
) {
log.debug(
"Store identifier:\(Namespace.identifier) performs \(numberOfActions) per \(interval) seconds.",
"Store identifier:\(Namespace.identifier) performs "
+ "\(numberOfActions) per \(interval) seconds.",
subsystems: logSubsystem
)
}
Expand Down
29 changes: 24 additions & 5 deletions Sources/StreamVideo/Utils/Store/StoreNamespace.swift
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,33 @@ protocol StoreNamespace: Sendable {
/// - Returns: An executor instance for this store.
static func executor() -> StoreExecutor<Self>

/// Creates the coordinator for evaluating actions before execution.
///
/// Override to provide custom logic that skips redundant actions.
///
/// - Returns: A coordinator instance for this store.
static func coordinator() -> StoreCoordinator<Self>

/// Creates a configured store instance.
///
/// This method assembles all components into a functioning store.
/// The default implementation should work for most cases.
///
/// - Parameter initialState: The initial state for the store.
///
/// - Parameters:
/// - initialState: The initial state for the store.
/// - reducers: Reducers used to transform state.
/// - middleware: Middleware that handle side effects.
/// - logger: Logger responsible for diagnostics.
/// - executor: Executor that runs the action pipeline.
/// - coordinator: Coordinator that can skip redundant actions.
/// - Returns: A fully configured store instance.
static func store(
initialState: State,
reducers: [Reducer<Self>],
middleware: [Middleware<Self>],
logger: StoreLogger<Self>,
executor: StoreExecutor<Self>
executor: StoreExecutor<Self>,
coordinator: StoreCoordinator<Self>
) -> Store<Self>
}

Expand All @@ -122,6 +135,9 @@ extension StoreNamespace {
/// Default implementation returns basic executor.
static func executor() -> StoreExecutor<Self> { .init() }

/// Default implementation returns a coordinator with no skip logic.
static func coordinator() -> StoreCoordinator<Self> { .init() }

/// Default implementation creates a store with all components.
///
/// This implementation:
Expand All @@ -131,20 +147,23 @@ extension StoreNamespace {
/// 4. Adds middleware from `middleware()`
/// 5. Uses logger from `logger()`
/// 6. Uses executor from `executor()`
/// 7. Uses coordinator from `coordinator()`
static func store(
initialState: State,
reducers: [Reducer<Self>] = Self.reducers(),
middleware: [Middleware<Self>] = Self.middleware(),
logger: StoreLogger<Self> = Self.logger(),
executor: StoreExecutor<Self> = Self.executor()
executor: StoreExecutor<Self> = Self.executor(),
coordinator: StoreCoordinator<Self> = Self.coordinator()
) -> Store<Self> {
.init(
identifier: Self.identifier,
initialState: initialState,
reducers: reducers,
middleware: middleware,
logger: logger,
executor: executor
executor: executor,
coordinator: coordinator
)
}
}
43 changes: 32 additions & 11 deletions Sources/StreamVideo/Utils/Store/StoreTask.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import Combine
import Foundation

/// A lightweight handle for a single dispatched store action.
/// A lightweight handle for dispatched store actions.
///
/// `StoreTask` coordinates the execution of one action via
/// ``StoreExecutor`` and exposes a way to await the result. Callers can
/// `StoreTask` coordinates the execution of one or more actions via
/// ``StoreExecutor`` and ``StoreCoordinator``. Callers can
/// dispatch-and-forget using `run(...)` and optionally await completion
/// or failure later with ``result()``.
///
Expand All @@ -22,27 +22,30 @@ final class StoreTask<Namespace: StoreNamespace>: Sendable {
private enum State { case idle, running, completed, failed(Error) }

private let executor: StoreExecutor<Namespace>
private let coordinator: StoreCoordinator<Namespace>
private let resultSubject: CurrentValueSubject<State, Never> = .init(.idle)

init(
executor: StoreExecutor<Namespace>
executor: StoreExecutor<Namespace>,
coordinator: StoreCoordinator<Namespace>
) {
self.executor = executor
self.coordinator = coordinator
}

// MARK: - Execution

/// Executes the given action through the store pipeline.
/// Executes the given actions through the store pipeline.
///
/// The task transitions to `.running`, delegates to the
/// ``StoreExecutor`` and records completion or failure. Errors are
/// captured and can be retrieved by awaiting ``result()``.
/// ``StoreExecutor`` and ``StoreCoordinator``, and records completion
/// or failure. Errors are captured and can be retrieved by awaiting
/// ``result()``.
///
/// - Parameters:
/// - identifier: Store identifier for logging context.
/// - state: Current state snapshot before processing.
/// - action: Action to execute.
/// - delay: Optional before/after delays.
/// - actions: Actions to execute, each optionally delayed.
/// - reducers: Reducers to apply in order.
/// - middleware: Middleware for side effects.
/// - logger: Logger used for diagnostics.
Expand All @@ -64,10 +67,28 @@ final class StoreTask<Namespace: StoreNamespace>: Sendable {
) async {
resultSubject.send(.running)
do {
var updatedState = state
for action in actions {
try await executor.run(
guard
coordinator.shouldExecute(
action: action.wrappedValue,
state: updatedState
)
else {
logger.didSkip(
identifier: identifier,
action: action.wrappedValue,
state: updatedState,
file: file,
function: function,
line: line
)
continue
}

updatedState = try await executor.run(
identifier: identifier,
state: state,
state: updatedState,
action: action,
reducers: reducers,
middleware: middleware,
Expand Down
Loading