Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow multiple schedulers to run #104

Merged
merged 1 commit into from
Feb 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,28 +27,27 @@ def __init__(self, queue_name='default', interval=60, connection=None):
self.queue_name = queue_name
self._interval = interval
self.log = logger
self._lock_acquired = False

def register_birth(self):
if self.connection.exists(self.scheduler_key) and \
not self.connection.hexists(self.scheduler_key, 'death'):
raise ValueError("There's already an active RQ scheduler")
def acquire_lock(self):
"""
Acquire lock before scheduling jobs to prevent another scheduler
from scheduling jobs at the same time.

This function returns True if a lock is acquired. False otherwise.
"""
key = self.scheduler_key
now = time.time()
with self.connection._pipeline() as p:
p.delete(key)
p.hset(key, 'birth', now)
# Set scheduler key to expire a few seconds after polling interval
# This way, the key will automatically expire if scheduler
# quits unexpectedly
p.expire(key, int(self._interval) + 10)
p.execute()

def register_death(self):
"""Registers its own death."""
with self.connection._pipeline() as p:
p.hset(self.scheduler_key, 'death', time.time())
p.expire(self.scheduler_key, 60)
p.execute()
expires = int(self._interval) + 10
self._lock_acquired = self.connection.set(key, now, ex=expires, nx=True)
return self._lock_acquired

def remove_lock(self):
"""
Remove acquired lock.
"""
if self._lock_acquired:
self.connection.delete(self.scheduler_key)

def _install_signal_handlers(self):
"""
Expand All @@ -58,10 +57,10 @@ def _install_signal_handlers(self):

def stop(signum, frame):
"""
Register scheduler's death and exit.
Remove previously acquired lock and exit.
"""
self.log.info('Shutting down RQ scheduler...')
self.register_death()
self.remove_lock()
raise SystemExit()

signal.signal(signal.SIGINT, stop)
Expand Down Expand Up @@ -302,11 +301,11 @@ def run(self):
lower than current time).
"""
self.log.info('Running RQ scheduler...')
self.register_birth()
self._install_signal_handlers()
try:
while True:
self.enqueue_jobs()
if self.acquire_lock():
self.enqueue_jobs()
time.sleep(self._interval)
finally:
self.register_death()
self.remove_lock()
39 changes: 27 additions & 12 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,39 @@ def setUp(self):
super(TestScheduler, self).setUp()
self.scheduler = Scheduler(connection=self.testconn)

def test_birth_and_death_registration(self):
def test_acquire_lock(self):
"""
When scheduler registers it's birth, besides creating a key, it should
When scheduler acquires a lock, besides creating a key, it should
also set an expiry that's a few seconds longer than it's polling
interval so it automatically expires if scheduler is unexpectedly
terminated.
"""
key = Scheduler.scheduler_key
self.assertNotIn(key, tl(self.testconn.keys('*')))
scheduler = Scheduler(connection=self.testconn, interval=20)
scheduler.register_birth()
self.assertTrue(scheduler.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
self.assertEqual(self.testconn.ttl(key), 30)
self.assertFalse(self.testconn.hexists(key, 'death'))
self.assertRaises(ValueError, scheduler.register_birth)
scheduler.register_death()
self.assertTrue(self.testconn.hexists(key, 'death'))
scheduler.remove_lock()
self.assertNotIn(key, tl(self.testconn.keys('*')))

def test_no_two_schedulers_acquire_lock(self):
"""
Ensure that no two schedulers can acquire the lock at the
same time. When removing the lock, only the scheduler which
originally acquired the lock can remove the lock.
"""
key = Scheduler.scheduler_key
self.assertNotIn(key, tl(self.testconn.keys('*')))
scheduler1 = Scheduler(connection=self.testconn, interval=20)
scheduler2 = Scheduler(connection=self.testconn, interval=20)
self.assertTrue(scheduler1.acquire_lock())
self.assertFalse(scheduler2.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
scheduler2.remove_lock()
self.assertIn(key, tl(self.testconn.keys('*')))
scheduler1.remove_lock()
self.assertNotIn(key, tl(self.testconn.keys('*')))

def test_create_job(self):
"""
Expand Down Expand Up @@ -391,20 +407,19 @@ def test_small_float_interval(self):
scheduler = Scheduler(connection=self.testconn, interval=0.1) # testing interval = 0.1 second
self.assertEqual(scheduler._interval, 0.1)

#register birth
scheduler.register_birth()
#acquire lock
self.assertTrue(scheduler.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
self.assertEqual(self.testconn.ttl(key), 10) # int(0.1) + 10 = 10
self.assertFalse(self.testconn.hexists(key, 'death'))

#enqueue a job
now = datetime.utcnow()
job = scheduler.enqueue_at(now, say_hello)
self.assertIn(job, self.scheduler.get_jobs_to_queue())
self.assertEqual(len(self.scheduler.get_jobs()), 1)

#register death
scheduler.register_death()
#remove the lock
scheduler.remove_lock()

#test that run works with the small floating-point interval
def send_stop_signal():
Expand Down