Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/beicon/v2/core.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,16 @@
[f ob]
(ops/pipe (ops/mapcat f) ob))

(defn exhaust-map
"Maps each value from the source Observable to an Observable, but
ignores subsequent values until the inner Observable completes.
Args:
f: a function that takes a value from the source observable and
returns an Observable
ob: the source Observable"
[f ob]
(ops/pipe (ops/exhaust-map f) ob))

(defn concat-all
[ob]
(ops/pipe (ops/merge-all 1) ob))
Expand Down
5 changes: 5 additions & 0 deletions src/beicon/v2/operators.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@
[f]
(rx/concatMap #(f %2 %1)))

(def ^function exhaust-map
"Maps each value from the source Observable to an Observable, but ignores
subsequent values until the inner Observable completes."
rx/exhaustMap)

(def ^function skip
"Bypasses a specified number of elements in an
observable sequence and then returns the remaining
Expand Down
41 changes: 41 additions & 0 deletions test/beicon/tests/v2_test.cljs
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,47 @@
(drain! fs #(t/is (= % [[0 :a] [1 :b] [2 :c]])))
(rx/on-end fs done))))

(t/deftest observable-exhaust-map-basic
(t/async done
(let [s (rx/from [1 2 3])
ms (rx/exhaust-map #(rx/of (* % 10)) s)]
(t/is (rx/observable? ms))
(drain! ms #(t/is (= % [10 20 30])))
(rx/on-end ms done))))

(t/deftest observable-exhaust-map-ignoring
;; Create inner observables with different timing.
;; First one takes longer, subsequent ones should be ignored
;; until the first one completes
(t/async done
(let [s (rx/from [1 2 3])

ms (rx/exhaust-map
(fn [x]
(if (= x 1)
(rx/delay 50 (rx/of (* x 100)))
(rx/of (* x 100))))
s)]
(drain! ms #(t/is (= % [100])))
(rx/on-end ms done))))

(t/deftest observable-exhaust-map-with-timer-sequence
;; Only values 0 and 3 should be emitted
;; 0: starts immediately, takes 30ms
;; 1, 2: ignored (arrive at 10ms, 20ms - while 0 is still running)
;; 3: starts at 30ms (after 0 completes), takes 30ms
(t/async done
(let [s (->> (rx/timer 0 10)
(rx/take 4))
ms (rx/exhaust-map
(fn [x]
(->> (rx/timer 30)
(rx/map (constantly (* x 100)))))
s)]
(drain! ms #(t/is (= % [0 300])))
(rx/on-end ms done))))


(t/deftest observable-retry
(t/async done
(let [errored? (volatile! false)
Expand Down