diff --git a/src/beicon/v2/core.cljs b/src/beicon/v2/core.cljs index 2c84f4f..ee89b96 100644 --- a/src/beicon/v2/core.cljs +++ b/src/beicon/v2/core.cljs @@ -417,6 +417,16 @@ [f ob] (ops/pipe (ops/mapcat f) ob)) +(defn exhaust-map + "Maps each value from the source Observable to an Observable, but + ignores subsequent values until the inner Observable completes. + Args: + f: a function that takes a value from the source observable and + returns an Observable + ob: the source Observable" + [f ob] + (ops/pipe (ops/exhaust-map f) ob)) + (defn concat-all [ob] (ops/pipe (ops/merge-all 1) ob)) diff --git a/src/beicon/v2/operators.cljs b/src/beicon/v2/operators.cljs index 7d3b6d5..d8ed17f 100644 --- a/src/beicon/v2/operators.cljs +++ b/src/beicon/v2/operators.cljs @@ -78,6 +78,11 @@ [f] (rx/concatMap #(f %2 %1))) +(def ^function exhaust-map + "Maps each value from the source Observable to an Observable, but ignores + subsequent values until the inner Observable completes." + rx/exhaustMap) + (def ^function skip "Bypasses a specified number of elements in an observable sequence and then returns the remaining diff --git a/test/beicon/tests/v2_test.cljs b/test/beicon/tests/v2_test.cljs index d639f1f..30aa8aa 100644 --- a/test/beicon/tests/v2_test.cljs +++ b/test/beicon/tests/v2_test.cljs @@ -299,6 +299,47 @@ (drain! fs #(t/is (= % [[0 :a] [1 :b] [2 :c]]))) (rx/on-end fs done)))) +(t/deftest observable-exhaust-map-basic + (t/async done + (let [s (rx/from [1 2 3]) + ms (rx/exhaust-map #(rx/of (* % 10)) s)] + (t/is (rx/observable? ms)) + (drain! ms #(t/is (= % [10 20 30]))) + (rx/on-end ms done)))) + +(t/deftest observable-exhaust-map-ignoring + ;; Create inner observables with different timing. + ;; First one takes longer, subsequent ones should be ignored + ;; until the first one completes + (t/async done + (let [s (rx/from [1 2 3]) + + ms (rx/exhaust-map + (fn [x] + (if (= x 1) + (rx/delay 50 (rx/of (* x 100))) + (rx/of (* x 100)))) + s)] + (drain! ms #(t/is (= % [100]))) + (rx/on-end ms done)))) + +(t/deftest observable-exhaust-map-with-timer-sequence + ;; Only values 0 and 3 should be emitted + ;; 0: starts immediately, takes 30ms + ;; 1, 2: ignored (arrive at 10ms, 20ms - while 0 is still running) + ;; 3: starts at 30ms (after 0 completes), takes 30ms + (t/async done + (let [s (->> (rx/timer 0 10) + (rx/take 4)) + ms (rx/exhaust-map + (fn [x] + (->> (rx/timer 30) + (rx/map (constantly (* x 100))))) + s)] + (drain! ms #(t/is (= % [0 300]))) + (rx/on-end ms done)))) + + (t/deftest observable-retry (t/async done (let [errored? (volatile! false)