From ed9105870ff698299f0abf47a25cbd6abf31b794 Mon Sep 17 00:00:00 2001 From: James Adam Date: Tue, 6 Sep 2016 13:37:14 +0100 Subject: [PATCH] Allow jobs to be queuable once the existing job starts performing This is useful when you know that the currently running job might not be taking into account the most recent updates to some of the data it is operating on, which can occur when the jobs take some time to run. For example: 1. update model 2. queue job to index in ElasticSearch 3. job starts performing 4. another update happens to the model 5. try to queue job to update index With the default queue locking mechanism, step 5 will only succeed in enqueuing a new job if the job that started in step 3 has already completed. However, if that job hasn't finished, but has loaded the model with only the changes from step 1, then the changes introduced in step 4 won't get indexed. With `unlock_while_performing` set to true, there can always be a single pending job in the queue, even when there's also a job running, and so we can be confident that any un-indexed changes to the model will get indexed once the final job finishes running. --- README.md | 18 ++++++++++++++++++ lib/resque/plugins/lock.rb | 4 ++++ test/lock_test.rb | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+) diff --git a/README.md b/README.md index 6d5e49b..2c206f2 100644 --- a/README.md +++ b/README.md @@ -43,4 +43,22 @@ UpdateNetworkGraph is queued at a time, regardless of the repo_id. Normally a job is locked using a combination of its class name and arguments. +If you don't want to have to wait until a job has completed +before being able to enqueue another job with the same +arguments, you can set the `unlock_while_performing` flag: + + class UpdateNetworkGraph + extend Resque::Plugins::Lock + @unlock_while_performing = true + + # etc ... + end + +With this option set, another job of the same type with the +same arguments can be queued even while the original one is +being performed. Otherwise, the queue will remain locked +until the job has completed. This option can be useful if you +know that some data has changed which the currently-performing +job will not have taken into account. + [rq]: http://github.com/defunkt/resque diff --git a/lib/resque/plugins/lock.rb b/lib/resque/plugins/lock.rb index dccfff2..716a0ce 100644 --- a/lib/resque/plugins/lock.rb +++ b/lib/resque/plugins/lock.rb @@ -75,6 +75,10 @@ def before_enqueue_lock(*args) now > Resque.redis.getset(key, timeout).to_i end + def before_perform_lock(*args) + Resque.redis.del(lock(*args)) if @unlock_while_performing + end + def around_perform_lock(*args) begin yield diff --git a/test/lock_test.rb b/test/lock_test.rb index 5e4a25c..ec480bc 100644 --- a/test/lock_test.rb +++ b/test/lock_test.rb @@ -21,6 +21,7 @@ def self.perform def setup Resque.redis.del('queue:lock_test') Resque.redis.del(Job.lock) + Resque.redis.del(JobWithOptionalQueueOnlyLocking.lock) end def test_lint @@ -57,4 +58,41 @@ def test_deadlock Resque.enqueue(Job) assert_equal 2, Resque.redis.llen('queue:lock_test') end + + class JobWithOptionalQueueOnlyLocking + extend Resque::Plugins::Lock + @queue = :lock_test + class << self + attr_accessor :unlock_while_performing + end + + def self.perform + Resque.enqueue(self) + if unlock_while_performing + raise 'this job should be queueable while it is running' unless + Resque.redis.llen('queue:lock_test') == 1 + else + raise 'this job should NOT be queueable while it is running' unless + Resque.redis.llen('queue:lock_test') == 0 + end + end + end + + def test_queue_is_normally_locked_when_job_running + JobWithOptionalQueueOnlyLocking.unlock_while_performing = nil + Resque.enqueue(JobWithOptionalQueueOnlyLocking) + job = Resque.reserve('lock_test') + job.perform + rescue => e + flunk e.message + end + + def test_queue_only_locking + JobWithOptionalQueueOnlyLocking.unlock_while_performing = true + Resque.enqueue(JobWithOptionalQueueOnlyLocking) + job = Resque.reserve('lock_test') + job.perform + rescue => e + flunk e.message + end end