Skip to content

Commit

Permalink
Allow multiple schedulers to run
Browse files Browse the repository at this point in the history
Schedulers have to acquire a lock before one can schedule jobs. The
lock automatically expires in case a scheduler is terminated unexpectedly.
Schedulers that cannot acquire a lock will sleep for the duration of
polling interval before retrying.

refs rq#70
  • Loading branch information
cheungpat committed Dec 12, 2015
1 parent 4e6837b commit 7e13092
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 36 deletions.
47 changes: 23 additions & 24 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,27 @@ def __init__(self, queue_name='default', interval=60, connection=None):
self.queue_name = queue_name
self._interval = interval
self.log = logger
self._lock_acquired = False

def register_birth(self):
if self.connection.exists(self.scheduler_key) and \
not self.connection.hexists(self.scheduler_key, 'death'):
raise ValueError("There's already an active RQ scheduler")
def acquire_lock(self):
"""
Acquire lock before scheduling jobs to prevent another scheduler
from scheduling jobs at the same time.
This function returns True if a lock is acquired. False otherwise.
"""
key = self.scheduler_key
now = time.time()
with self.connection._pipeline() as p:
p.delete(key)
p.hset(key, 'birth', now)
# Set scheduler key to expire a few seconds after polling interval
# This way, the key will automatically expire if scheduler
# quits unexpectedly
p.expire(key, int(self._interval) + 10)
p.execute()

def register_death(self):
"""Registers its own death."""
with self.connection._pipeline() as p:
p.hset(self.scheduler_key, 'death', time.time())
p.expire(self.scheduler_key, 60)
p.execute()
expires = int(self._interval) + 10
self._lock_acquired = self.connection.set(key, now, ex=expires, nx=True)
return self._lock_acquired

def remove_lock(self):
"""
Remove acquired lock.
"""
if self._lock_acquired:
self.connection.delete(self.scheduler_key)

def _install_signal_handlers(self):
"""
Expand All @@ -58,10 +57,10 @@ def _install_signal_handlers(self):

def stop(signum, frame):
"""
Register scheduler's death and exit.
Remove previously acquired lock and exit.
"""
self.log.info('Shutting down RQ scheduler...')
self.register_death()
self.remove_lock()
raise SystemExit()

signal.signal(signal.SIGINT, stop)
Expand Down Expand Up @@ -302,11 +301,11 @@ def run(self):
lower than current time).
"""
self.log.info('Running RQ scheduler...')
self.register_birth()
self._install_signal_handlers()
try:
while True:
self.enqueue_jobs()
if self.acquire_lock():
self.enqueue_jobs()
time.sleep(self._interval)
finally:
self.register_death()
self.remove_lock()
39 changes: 27 additions & 12 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,39 @@ def setUp(self):
super(TestScheduler, self).setUp()
self.scheduler = Scheduler(connection=self.testconn)

def test_birth_and_death_registration(self):
def test_acquire_lock(self):
"""
When scheduler registers it's birth, besides creating a key, it should
When scheduler acquires a lock, besides creating a key, it should
also set an expiry that's a few seconds longer than it's polling
interval so it automatically expires if scheduler is unexpectedly
terminated.
"""
key = Scheduler.scheduler_key
self.assertNotIn(key, tl(self.testconn.keys('*')))
scheduler = Scheduler(connection=self.testconn, interval=20)
scheduler.register_birth()
self.assertTrue(scheduler.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
self.assertEqual(self.testconn.ttl(key), 30)
self.assertFalse(self.testconn.hexists(key, 'death'))
self.assertRaises(ValueError, scheduler.register_birth)
scheduler.register_death()
self.assertTrue(self.testconn.hexists(key, 'death'))
scheduler.remove_lock()
self.assertNotIn(key, tl(self.testconn.keys('*')))

def test_no_two_schedulers_acquire_lock(self):
"""
Ensure that no two schedulers can acquire the lock at the
same time. When removing the lock, only the scheduler which
originally acquired the lock can remove the lock.
"""
key = Scheduler.scheduler_key
self.assertNotIn(key, tl(self.testconn.keys('*')))
scheduler1 = Scheduler(connection=self.testconn, interval=20)
scheduler2 = Scheduler(connection=self.testconn, interval=20)
self.assertTrue(scheduler1.acquire_lock())
self.assertFalse(scheduler2.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
scheduler2.remove_lock()
self.assertIn(key, tl(self.testconn.keys('*')))
scheduler1.remove_lock()
self.assertNotIn(key, tl(self.testconn.keys('*')))

def test_create_job(self):
"""
Expand Down Expand Up @@ -391,20 +407,19 @@ def test_small_float_interval(self):
scheduler = Scheduler(connection=self.testconn, interval=0.1) # testing interval = 0.1 second
self.assertEqual(scheduler._interval, 0.1)

#register birth
scheduler.register_birth()
#acquire lock
self.assertTrue(scheduler.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
self.assertEqual(self.testconn.ttl(key), 10) # int(0.1) + 10 = 10
self.assertFalse(self.testconn.hexists(key, 'death'))

#enqueue a job
now = datetime.utcnow()
job = scheduler.enqueue_at(now, say_hello)
self.assertIn(job, self.scheduler.get_jobs_to_queue())
self.assertEqual(len(self.scheduler.get_jobs()), 1)

#register death
scheduler.register_death()
#remove the lock
scheduler.remove_lock()

#test that run works with the small floating-point interval
def send_stop_signal():
Expand Down

0 comments on commit 7e13092

Please sign in to comment.