diff --git a/.travis.yml b/.travis.yml index 89f46b3..e5188a6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,5 +10,6 @@ rvm: - 2.3.0 - jruby-19mode # JRuby in 1.9 mode - rbx-2.6 + - rbx-3.82 before_install: - - gem update bundler + - ruby --version | grep -qF 'rubinius 2.6' || gem update bundler diff --git a/lib/rx/subjects/behavior_subject.rb b/lib/rx/subjects/behavior_subject.rb index 1091067..b8e4a1d 100644 --- a/lib/rx/subjects/behavior_subject.rb +++ b/lib/rx/subjects/behavior_subject.rb @@ -31,8 +31,8 @@ def has_observers? # Gets the current value or throws an exception. def value - gate.synchronize do - self.check_unsubscribed + gate.synchronize do + check_unsubscribed raise @error if @error @value end @@ -41,8 +41,8 @@ def value # Notifies all subscribed observers about the end of the sequence. def on_completed os = nil - @gate.synchronize do - self.check_unsubscribed + @gate.synchronize do + check_unsubscribed unless @stopped os = @observers.clone @@ -60,7 +60,7 @@ def on_error(error) os = nil @gate.synchronize do - self.check_unsubscribed + check_unsubscribed unless @stopped os = @observers.clone @@ -77,7 +77,7 @@ def on_error(error) def on_next(value) os = nil @gate.synchronize do - self.check_unsubscribed + check_unsubscribed @value = value os = @observers.clone unless @stopped end @@ -91,7 +91,7 @@ def subscribe(observer) err = nil gate.synchronize do - self.check_unsubscribed + check_unsubscribed unless @stopped observers.push(observer) diff --git a/test/rx/concurrency/test_default_scheduler.rb b/test/rx/concurrency/test_default_scheduler.rb index 9cc44b1..807dfbf 100644 --- a/test/rx/concurrency/test_default_scheduler.rb +++ b/test/rx/concurrency/test_default_scheduler.rb @@ -1,21 +1,22 @@ # Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. require 'test_helper' +require 'rx/subscriptions/helpers/await_helpers' -# DefaultScheduler creates new threads in which to run scheduled tasks; a short -# sleep is necessary to allow the thread scheduler to yield to the other -# threads. class TestDefaultScheduler < Minitest::Test + include AwaitHelpers def setup @scheduler = Rx::DefaultScheduler.instance end + INTERVAL = 0.05 + def test_schedule_with_state state = [] task = ->(_, s) { s << 1 } @scheduler.schedule_with_state(state, task) - sleep 0.001 + await_array_length(state, 1, INTERVAL) assert_equal([1], state) end @@ -24,15 +25,18 @@ def test_schedule_relative_with_state state = [] task = ->(_, s) { s << 1 } @scheduler.schedule_relative_with_state(state, 0.05, task) - sleep 0.1 + await_array_length(state, 1, INTERVAL) assert_equal([1], state) end def test_default_schedule_runs_in_its_own_thread + state = [] id = Thread.current.object_id - @scheduler.schedule -> { refute_equal(id, Thread.current.object_id) } - sleep 0.001 + @scheduler.schedule -> { state << Thread.current.object_id } + await_array_length(state, 1, INTERVAL) + + refute_equal([id], state) end def test_schedule_action_cancel diff --git a/test/rx/concurrency/test_periodic_scheduler.rb b/test/rx/concurrency/test_periodic_scheduler.rb index ff9c269..dc31fc2 100644 --- a/test/rx/concurrency/test_periodic_scheduler.rb +++ b/test/rx/concurrency/test_periodic_scheduler.rb @@ -1,21 +1,15 @@ # Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. require 'test_helper' +require 'rx/subscriptions/helpers/await_helpers' class PeriodicTestClass include Rx::PeriodicScheduler end -def await_array_length(array, expected, interval) - sleep (expected * interval) * 0.9 - deadline = Time.now + interval * (expected + 1) - while Time.now < deadline - break if array.length == expected - sleep interval / 10 - end -end - class TestPeriodicScheduler < Minitest::Test + include AwaitHelpers + def setup @scheduler = PeriodicTestClass.new end diff --git a/test/rx/subjects/test_behavior_subject.rb b/test/rx/subjects/test_behavior_subject.rb new file mode 100644 index 0000000..d306283 --- /dev/null +++ b/test/rx/subjects/test_behavior_subject.rb @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +require 'test_helper' + +class TestBehaviorSubject < Minitest::Test + def test_subscriber_notified_on_change + value = 0 + subject = Rx::BehaviorSubject.new 0 + subject.as_observable.subscribe { |update| value = update } + subject.on_next 1 + assert_equal 1, value + end + + def test_multiple_observers_notified_on_change + value1 = 0 + value2 = 0 + subject = Rx::BehaviorSubject.new 0 + subject.as_observable.subscribe { |update| value1 = update } + subject.as_observable.subscribe { |update| value2 = update } + subject.on_next 1 + assert_equal 1, value1 + assert_equal 1, value2 + end + + def test_errors_on_next_when_unsubscribed + subject = Rx::BehaviorSubject.new 0 + subject.as_observable.subscribe { } + subject.unsubscribe + assert_raises(RuntimeError) { subject.on_next 1 } + end +end diff --git a/test/rx/subscriptions/helpers/await_helpers.rb b/test/rx/subscriptions/helpers/await_helpers.rb new file mode 100644 index 0000000..87098c0 --- /dev/null +++ b/test/rx/subscriptions/helpers/await_helpers.rb @@ -0,0 +1,10 @@ +module AwaitHelpers + def await_array_length(array, expected, interval) + sleep (expected * interval) * 0.9 + deadline = Time.now + interval * (expected + 1) + while Time.now < deadline + break if array.length == expected + sleep interval / 10 + end + end +end