Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
- stage: canary
rvm: 2.4.0
- stage: test
rvm: jruby-19mode
rvm: jruby-9.1.17.0
- stage: test
rvm: 1.9.3
- stage: test
Expand Down
18 changes: 8 additions & 10 deletions lib/rx/concurrency/virtual_time_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,33 +54,32 @@ def stop
# Schedules an action to be executed.
def schedule_with_state(state, action)
raise 'action cannot be nil' unless action
schedule_at_absolute_with_state(state, @clock, action)
schedule_absolute_with_state(state, @clock, action)
end

# Schedules an action to be executed at due_time.
def schedule_at_relative(due_time, action)
def schedule_relative(due_time, action)
raise 'action cannot be nil' unless action

schedule_at_relative_with_state(action, due_time, method(:invoke))
schedule_relative_with_state(action, due_time, method(:invoke))
end

# Schedules an action to be executed at due_time.
def schedule_at_relative_with_state(state, due_time, action)
def schedule_relative_with_state(state, due_time, action)
raise 'action cannot be nil' unless action

schedule_at_absolute_with_state(state, @clock + due_time, action)
schedule_absolute_with_state(state, @clock + due_time, action)
end
alias_method :schedule_relative_with_state, :schedule_at_relative_with_state

# Schedules an action to be executed at due_time.
def schedule_at_absolute(due_time, action)
def schedule_absolute(due_time, action)
raise 'action cannot be nil' unless action

schedule_at_absolute_with_state(action, due_time, method(:invoke))
schedule_absolute_with_state(action, due_time, method(:invoke))
end

# Schedules an action to be executed at due_time.
def schedule_at_absolute_with_state(state, due_time, action)
def schedule_absolute_with_state(state, due_time, action)
raise 'action cannot be nil' unless action

si = nil
Expand All @@ -94,7 +93,6 @@ def schedule_at_absolute_with_state(state, due_time, action)

Subscription.create { si.cancel }
end
alias_method :schedule_absolute_with_state, :schedule_at_absolute_with_state

# Advances the scheduler's clock to the specified time, running all work till that point.
def advance_to(time)
Expand Down
2 changes: 1 addition & 1 deletion lib/rx/testing/cold_observable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def _subscribe(observer)
messages.each do |message|
notification = message.value

d.push(@scheduler.schedule_at_relative_with_state(nil, message.time, lambda {|scheduler1, state1|
d.push(@scheduler.schedule_relative_with_state(nil, message.time, lambda {|scheduler1, state1|
notification.accept observer
Subscription.empty
}))
Expand Down
2 changes: 1 addition & 1 deletion lib/rx/testing/hot_observable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def initialize(scheduler, *args)

@messages.each do |message|
notification = message.value
@scheduler.schedule_at_relative_with_state(nil, message.time, lambda {|scheduler1, state1|
@scheduler.schedule_relative_with_state(nil, message.time, lambda {|scheduler1, state1|

@observers.clone.each {|observer| notification.accept observer }

Expand Down
13 changes: 7 additions & 6 deletions lib/rx/testing/test_scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ def initialize(increment_on_simultaneous = true)
end

# Schedules an action to be executed at due_time.
def schedule_at_absolute_with_state(state, due_time, action)
def schedule_absolute_with_state(state, due_time, action)
raise 'action cannot be nil' unless action

due_time = due_time.ts if TestTime === due_time
due_time = now + 1 if due_time <= now && @increment_on_simultaneous

super(state, due_time, action)
Expand All @@ -43,24 +44,24 @@ def configure(options = {})
subscription = nil
observer = create_observer

schedule_at_absolute_with_state(nil, o[:created], lambda {|scheduler, state|
schedule_absolute_with_state(nil, o[:created], lambda {|scheduler, state|
source = yield
Subscription.empty
})

schedule_at_absolute_with_state(nil, o[:subscribed], lambda {|scheduler, state|
schedule_absolute_with_state(nil, o[:subscribed], lambda {|scheduler, state|
subscription = source.subscribe observer
Subscription.empty
})

schedule_at_absolute_with_state(nil, o[:disposed], lambda {|scheduler, state|
schedule_absolute_with_state(nil, o[:disposed], lambda {|scheduler, state|
subscription.unsubscribe
Subscription.empty
})

start
observer

observer
end

# Creates a hot observable using the specified timestamped notification messages.
Expand Down
36 changes: 36 additions & 0 deletions lib/rx/testing/test_time.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
module Rx
class TestTime < Time
include Comparable

attr_reader :ts

def initialize(ts)
super()
@ts = ts
end

def <=>(other)
if other.is_a? TestTime
@ts <=> other.ts
else
@ts <=> other
end
end

def +(other)
TestTime.new(other + @ts)
end

def -(other)
TestTime.new(other - @ts)
end

def to_s
"TestTime @ #{@ts.to_s}"
end

def inspect
"TestTime @ #{@ts.to_s}"
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def test_now
def test_relative_with_state
state = []
task = ->(_, s) { s.push 1 }
@scheduler.schedule_at_relative_with_state(state, 2, task)
@scheduler.schedule_relative_with_state(state, 2, task)
@scheduler.start

assert_equal([1], state)
Expand All @@ -45,7 +45,7 @@ def test_relative_with_state
def test_relative
ran = false
task = ->() { ran = true }
@scheduler.schedule_at_relative(2, task)
@scheduler.schedule_relative(2, task)
@scheduler.start

assert_equal(true, ran)
Expand All @@ -58,7 +58,7 @@ def test_absolute_with_state
state = []
time = @start + 2
task = ->(_, s) { s.push 1 }
@scheduler.schedule_at_absolute_with_state(state, time, task)
@scheduler.schedule_absolute_with_state(state, time, task)
@scheduler.start

assert_equal([1], state)
Expand All @@ -69,7 +69,7 @@ def test_absolute
ran = false
time = @start + 2
task = ->() { ran = true }
@scheduler.schedule_at_absolute(time, task)
@scheduler.schedule_absolute(time, task)
@scheduler.start

assert_equal(true, ran)
Expand All @@ -83,8 +83,8 @@ def test_advance
task = ->() { ran = true }
failure = ->() { flunk "Should never reach." }

@scheduler.schedule_at_absolute(@start + 10, task)
@scheduler.schedule_at_absolute(@start + 11, failure)
@scheduler.schedule_absolute(@start + 10, task)
@scheduler.schedule_absolute(@start + 11, failure)
@scheduler.advance_to(@start + 10)

assert_equal(true, ran)
Expand All @@ -105,8 +105,8 @@ def test_advance_by
task = ->() { ran = true }
failure = ->() { flunk "Should never reach." }

@scheduler.schedule_at_relative(10, task)
@scheduler.schedule_at_relative(11, failure)
@scheduler.schedule_relative(10, task)
@scheduler.schedule_relative(11, failure)
@scheduler.advance_by(10)

assert_equal(true, ran)
Expand All @@ -119,7 +119,7 @@ def test_advance_raises_if_out_of_range

def test_sleep
failure = ->() { flunk "Should not run." }
@scheduler.schedule_at_relative(10, failure)
@scheduler.schedule_relative(10, failure)
@scheduler.sleep(20)

assert_equal(@start + 20, @scheduler.now)
Expand Down
29 changes: 10 additions & 19 deletions test/rx/linq/observable/test_timer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,20 @@ def test_emit_periodic_values_after_delay

assert_msgs msgs('-----0--1--'), actual
end
end

class TestOperatorAsyncTimer < Minitest::Test
include Rx::AsyncTesting
include Rx::ReactiveTest
def test_emit_value_at_point_in_time
actual = scheduler.configure do
Rx::Observable.timer(Rx::TestTime.new(300), scheduler)
end

def setup
@scheduler = Rx::TestScheduler.new
@observer = @scheduler.create_observer
assert_msgs msgs('---(0|)'), actual
end

def test_emit_value_at_point_in_time
Rx::Observable.timer(Time.now).subscribe(@observer)
await_array_length(@observer.messages, 2)
expected = [on_next(0, 0), on_completed(0)]
assert_equal expected, @observer.messages
end
def test_emit_periodic_values_after_point_in_time
actual = scheduler.configure do
Rx::Observable.timer(Rx::TestTime.new(500), 300, scheduler)
end

def test_emit_value_at_repeated_point_in_time
Rx::Observable.timer(Time.now, 0.01).subscribe(@observer)
await_array_length(@observer.messages, 3)
events = @observer.messages.map {|m| m.value }
assert events.all? {|v| Rx::OnNextNotification === v }
assert_equal [0, 1, 2], events.map {|v| v.value }.slice(0, 3)
assert_msgs msgs('-----0--1--'), actual
end
end