Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Resque Scheduler authors
- Nickolas Means
- Olek Janiszewski
- Olivier Brisse
- Paul Wille
- Petteri Räty
- Phil Cohen
- Rob Olson
Expand Down
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -502,12 +502,13 @@ A future version of resque-scheduler may do this for you.
Similar to the `before_enqueue`- and `after_enqueue`-hooks provided in Resque
(>= 1.19.1), your jobs can specify one or more of the following hooks:

* `before_schedule`: Called with the job args before a job is placed on
the delayed queue. If the hook returns `false`, the job will not be placed on
the queue.
* `after_schedule`: Called with the job args after a job is placed on the
delayed queue. Any exception raised propagates up to the code with queued the
job.
* `before_schedule`: Called with the job args before a job is scheduled. This
includes delayed jobs (when placed on the delayed queue via `enqueue_at`/`enqueue_in`)
and recurring cron jobs (each time the job is enqueued).
If the hook returns `false`, the job will not be enqueued.
* `after_schedule`: Called with the job args after a job is scheduled (same
scenarios as `before_schedule`). Any exception raised propagates up to the
calling code.
* `before_delayed_enqueue`: Called with the job args after the job has been
removed from the delayed queue, but not yet put on a normal queue. It is
called before `before_enqueue`-hooks, and on the same job instance as the
Expand Down
60 changes: 37 additions & 23 deletions lib/resque/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -296,16 +296,7 @@ def handle_shutdown

# Enqueues a job based on a config hash
def enqueue_from_config(job_config)
args = job_config['args'] || job_config[:args]

klass_name = job_config['class'] || job_config[:class]
begin
klass = Resque::Scheduler::Util.constantize(klass_name)
rescue NameError
klass = klass_name
end

params = args.is_a?(Hash) ? [args] : Array(args)
klass, klass_name, params = extract_klass_and_params(job_config)
queue = job_config['queue'] ||
job_config[:queue] ||
Resque.queue_from_class(klass)
Expand All @@ -331,16 +322,14 @@ def enqueue_from_config(job_config)
# for non-existent classes (for example: running scheduler in
# one app that schedules for another.
if Class === klass
Resque::Scheduler::Plugin.process_schedule_hooks(klass, *params) do
Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks(klass, *params)
# If the class is a custom job class, call self#scheduled on it.
# This allows you to do things like Resque.enqueue_at(timestamp,
# CustomJobClass). Otherwise, pass off to Resque.
if klass.respond_to?(:scheduled)
klass.scheduled(queue, klass_name, *params)
else
Resque.enqueue_to(queue, klass, *params)
end
Resque::Scheduler::Plugin.run_before_delayed_enqueue_hooks(klass, *params)
# If the class is a custom job class, call self#scheduled on it.
# This allows you to do things like Resque.enqueue_at(timestamp,
# CustomJobClass). Otherwise, pass off to Resque.
if klass.respond_to?(:scheduled)
klass.scheduled(queue, klass_name, *params)
else
Resque.enqueue_to(queue, klass, *params)
end
else
# This will not run the before_hooks in rescue, but will at least
Expand Down Expand Up @@ -490,11 +479,36 @@ def logger
private

def enqueue_recurring(name, config)
if am_master
log! "queueing #{config['class']} (#{name})"
return unless am_master

log! "queueing #{config['class']} (#{name})"

klass, _klass_name, params = extract_klass_and_params(config)

# Run schedule hooks for cron/recurring jobs
if Class === klass
Resque::Scheduler::Plugin.process_schedule_hooks(klass, *params) do
enqueue(config)
end
else
enqueue(config)
Resque.last_enqueued_at(name, Time.now.to_s)
end

Resque.last_enqueued_at(name, Time.now.to_s)
end

def extract_klass_and_params(config)
klass_name = config['class'] || config[:class]
klass = begin
Resque::Scheduler::Util.constantize(klass_name)
rescue NameError
klass_name
end

args = config['args'] || config[:args]
params = args.is_a?(Hash) ? [args] : Array(args)

[klass, klass_name, params]
end

def app_str
Expand Down
90 changes: 83 additions & 7 deletions test/scheduler_hooks_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@ def enqueued
Resque.redis.lrange("queue:#{SomeRealClass.queue}", 0, -1).map(&JSON.method(:parse))
end

test 'before_schedule and after_scheduler hooks are called when enqueued from config' do
SomeRealClass.expects(:before_schedule_example).with('/tmp')
SomeRealClass.expects(:after_schedule_example).with('/tmp')
test 'direct enqueue does not trigger schedule hooks' do
SomeRealClass.expects(:before_schedule_example).never
SomeRealClass.expects(:after_schedule_example).never
Resque::Scheduler.enqueue(config)

assert_equal [{ 'class' => 'SomeRealClass', 'args' => ['/tmp'] }], enqueued
end

test 'any before_schedule returning false will halt the job from being enqueued' do
SomeRealClass.expects(:before_schedule_a).with('/tmp').returns(false)
SomeRealClass.expects(:before_schedule_b).with('/tmp')
test 'direct enqueue bypasses before_schedule hooks so job cannot be halted' do
SomeRealClass.expects(:before_schedule_a).never
SomeRealClass.expects(:before_schedule_b).never
SomeRealClass.expects(:after_schedule_example).never
Resque::Scheduler.enqueue(config)

assert_equal [], enqueued
# Job is enqueued because schedule hooks are not run for direct enqueue
assert_equal [{ 'class' => 'SomeRealClass', 'args' => ['/tmp'] }], enqueued
end

test 'before_schedule hook that does not return false should be enqueued' do
Expand Down Expand Up @@ -71,3 +72,78 @@ def enqueued
end
end
end

context 'delayed job hooks' do
setup do
Resque::Scheduler.quiet = true
Resque.data_store.redis.flushall
end

test 'schedule hooks are not called again when delayed job is transferred to queue' do
future_time = Time.now + 600

# Hooks should be called when the job is initially scheduled
SomeRealClass.expects(:before_schedule_example).with('foo')
SomeRealClass.expects(:after_schedule_example).with('foo')
Resque.enqueue_at(future_time, SomeRealClass, 'foo')

# When the job is transferred from delayed queue to actual queue,
# schedule hooks should NOT be called again
SomeRealClass.expects(:before_schedule_example).never
SomeRealClass.expects(:after_schedule_example).never
Resque::Scheduler.handle_delayed_items(future_time)
end

test 'before_delayed_enqueue is called when delayed job is transferred to queue' do
future_time = Time.now + 600

# Schedule the job (schedule hooks called here)
SomeRealClass.stubs(:before_schedule_example)
SomeRealClass.stubs(:after_schedule_example)
Resque.enqueue_at(future_time, SomeRealClass, 'foo')

# before_delayed_enqueue should be called when transferring to actual queue
# Note: args come back as strings after JSON serialization
SomeRealClass.expects(:before_delayed_enqueue_example).with('foo').once

Resque::Scheduler.handle_delayed_items(future_time)
end
end

context 'cron job hooks' do
def config
{
'cron' => '* * * * *',
'class' => 'SomeRealClass',
'args' => '/tmp'
}
end

def enqueued
Resque.redis.lrange("queue:#{SomeRealClass.queue}", 0, -1).map(&JSON.method(:parse))
end

setup do
Resque::Scheduler.quiet = true
Resque.data_store.redis.flushall
Resque::Scheduler.stubs(:am_master).returns(true)
end

test 'cron job triggers schedule hooks via enqueue_recurring' do
SomeRealClass.expects(:before_schedule_example).with('/tmp')
SomeRealClass.expects(:after_schedule_example).with('/tmp')

Resque::Scheduler.send(:enqueue_recurring, 'some_job', config)

assert_equal [{ 'class' => 'SomeRealClass', 'args' => ['/tmp'] }], enqueued
end

test 'cron job before_schedule returning false halts the job' do
SomeRealClass.expects(:before_schedule_example).with('/tmp').returns(false)
SomeRealClass.expects(:after_schedule_example).never

Resque::Scheduler.send(:enqueue_recurring, 'some_job', config)

assert_equal [], enqueued
end
end
9 changes: 6 additions & 3 deletions test/scheduler_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
Resque::Scheduler.enqueue_from_config(config)
end

test 'enqueue runs hooks' do
test 'enqueue_from_config runs before_delayed_enqueue and resque hooks but not schedule hooks' do
Resque::Scheduler.env = 'production'
config = {
'cron' => '* * * * *',
Expand All @@ -40,11 +40,14 @@
Resque::Job.expects(:create).with(
SomeJobWithResqueHooks.queue, SomeJobWithResqueHooks, '/tmp'
)
SomeJobWithResqueHooks.expects(:before_schedule).with('/tmp')
# Schedule hooks are NOT called from enqueue_from_config (only from enqueue_recurring)
SomeJobWithResqueHooks.expects(:before_schedule).never
SomeJobWithResqueHooks.expects(:after_schedule).never
# before_delayed_enqueue IS called
SomeJobWithResqueHooks.expects(:before_delayed_enqueue_example).with('/tmp')
# Resque hooks are still called
SomeJobWithResqueHooks.expects(:before_enqueue_example).with('/tmp')
SomeJobWithResqueHooks.expects(:after_enqueue_example).with('/tmp')
SomeJobWithResqueHooks.expects(:after_schedule).with('/tmp')

Resque::Scheduler.enqueue_from_config(config)
end
Expand Down
Loading