Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi schedulers #143

Merged
merged 3 commits into from
Feb 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,16 @@ def __init__(self, queue_name='default', interval=60, connection=None):
self.queue_name = queue_name
self._interval = interval
self.log = logger
self._lock_acquired = False

def register_birth(self):
if self.connection.exists(self.scheduler_key) and \
not self.connection.hexists(self.scheduler_key, 'death'):
raise ValueError("There's already an active RQ scheduler")

key = self.scheduler_key
now = time.time()

with self.connection._pipeline() as p:
p.delete(key)
p.hset(key, 'birth', now)
Expand All @@ -49,6 +52,29 @@ def register_death(self):
p.expire(self.scheduler_key, 60)
p.execute()

def acquire_lock(self):
"""
Acquire lock before scheduling jobs to prevent another scheduler
from scheduling jobs at the same time.

This function returns True if a lock is acquired. False otherwise.
"""
key = '%s_lock' % self.scheduler_key
now = time.time()
expires = int(self._interval) + 10
self._lock_acquired = self.connection.set(
key, now, ex=expires, nx=True)
return self._lock_acquired

def remove_lock(self):
"""
Remove acquired lock.
"""
key = '%s_lock' % self.scheduler_key

if self._lock_acquired:
self.connection.delete(key)

def _install_signal_handlers(self):
"""
Installs signal handlers for handling SIGINT and SIGTERM
Expand All @@ -57,10 +83,12 @@ def _install_signal_handlers(self):

def stop(signum, frame):
"""
Register scheduler's death and exit.
Register scheduler's death and exit
and remove previously acquired lock and exit.
"""
self.log.info('Shutting down RQ scheduler...')
self.register_death()
self.remove_lock()
raise SystemExit()

signal.signal(signal.SIGINT, stop)
Expand Down Expand Up @@ -333,14 +361,22 @@ def run(self, burst=False):
lower than current time).
"""
self.log.info('Running RQ scheduler...')

self.register_birth()
self._install_signal_handlers()

try:
while True:
self.enqueue_jobs()
if burst:
self.log.info('RQ scheduler done, quitting')
break
if self.acquire_lock():
self.enqueue_jobs()

if burst:
self.log.info('RQ scheduler done, quitting')
break
else:
self.log.info('Waiting for lock...')

time.sleep(self._interval)
finally:
self.remove_lock()
self.register_death()
46 changes: 31 additions & 15 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,39 @@ def setUp(self):
super(TestScheduler, self).setUp()
self.scheduler = Scheduler(connection=self.testconn)

def test_birth_and_death_registration(self):
def test_acquire_lock(self):
"""
When scheduler registers it's birth, besides creating a key, it should
When scheduler acquires a lock, besides creating a key, it should
also set an expiry that's a few seconds longer than it's polling
interval so it automatically expires if scheduler is unexpectedly
terminated.
"""
key = Scheduler.scheduler_key
key = '%s_lock' % Scheduler.scheduler_key
self.assertNotIn(key, tl(self.testconn.keys('*')))
scheduler = Scheduler(connection=self.testconn, interval=20)
scheduler.register_birth()
self.assertTrue(scheduler.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
self.assertEqual(self.testconn.ttl(key), 30)
self.assertFalse(self.testconn.hexists(key, 'death'))
self.assertRaises(ValueError, scheduler.register_birth)
scheduler.register_death()
self.assertTrue(self.testconn.hexists(key, 'death'))
scheduler.remove_lock()
self.assertNotIn(key, tl(self.testconn.keys('*')))

def test_no_two_schedulers_acquire_lock(self):
"""
Ensure that no two schedulers can acquire the lock at the
same time. When removing the lock, only the scheduler which
originally acquired the lock can remove the lock.
"""
key = '%s_lock' % Scheduler.scheduler_key
self.assertNotIn(key, tl(self.testconn.keys('*')))
scheduler1 = Scheduler(connection=self.testconn, interval=20)
scheduler2 = Scheduler(connection=self.testconn, interval=20)
self.assertTrue(scheduler1.acquire_lock())
self.assertFalse(scheduler2.acquire_lock())
self.assertIn(key, tl(self.testconn.keys('*')))
scheduler2.remove_lock()
self.assertIn(key, tl(self.testconn.keys('*')))
scheduler1.remove_lock()
self.assertNotIn(key, tl(self.testconn.keys('*')))

def test_create_job(self):
"""
Expand Down Expand Up @@ -508,24 +524,24 @@ def test_small_float_interval(self):
Test that scheduler accepts 'interval' of type float, less than 1 second.
"""
key = Scheduler.scheduler_key
lock_key = '%s_lock' % Scheduler.scheduler_key
self.assertNotIn(key, tl(self.testconn.keys('*')))
scheduler = Scheduler(connection=self.testconn, interval=0.1) # testing interval = 0.1 second
self.assertEqual(scheduler._interval, 0.1)

#register birth
scheduler.register_birth()
self.assertIn(key, tl(self.testconn.keys('*')))
self.assertEqual(self.testconn.ttl(key), 10) # int(0.1) + 10 = 10
self.assertFalse(self.testconn.hexists(key, 'death'))
#acquire lock
self.assertTrue(scheduler.acquire_lock())
self.assertIn(lock_key, tl(self.testconn.keys('*')))
self.assertEqual(self.testconn.ttl(lock_key), 10) # int(0.1) + 10 = 10

#enqueue a job
now = datetime.utcnow()
job = scheduler.enqueue_at(now, say_hello)
self.assertIn(job, self.scheduler.get_jobs_to_queue())
self.assertEqual(len(self.scheduler.get_jobs()), 1)

#register death
scheduler.register_death()
#remove the lock
scheduler.remove_lock()

#test that run works with the small floating-point interval
def send_stop_signal():
Expand Down