From 3bb38b6c64e64534b06e376c6be449d20dba68b8 Mon Sep 17 00:00:00 2001 From: Henry Shi Date: Thu, 1 Sep 2016 02:42:21 -0400 Subject: [PATCH] Allow multiple schedulers to run, based on https://github.com/ui/rq-scheduler/pull/104 --- rq_scheduler/scheduler.py | 46 +++++++++++++++++++-------------------- tests/test_scheduler.py | 39 +++++++++++++++++++++++---------- 2 files changed, 49 insertions(+), 36 deletions(-) diff --git a/rq_scheduler/scheduler.py b/rq_scheduler/scheduler.py index 2231d12..56fc17a 100644 --- a/rq_scheduler/scheduler.py +++ b/rq_scheduler/scheduler.py @@ -26,28 +26,26 @@ def __init__(self, queue_name='default', interval=60, connection=None): self.queue_name = queue_name self._interval = interval self.log = logger + self._lock_acquired = False - def register_birth(self): - if self.connection.exists(self.scheduler_key) and \ - not self.connection.hexists(self.scheduler_key, 'death'): - raise ValueError("There's already an active RQ scheduler") + def acquire_lock(self): + """ + Acquire lock before scheduling jobs to prevent another scheduler + from scheduling jobs at the same time. + This function returns True if a lock is acquired. False otherwise. + """ key = self.scheduler_key now = time.time() - with self.connection._pipeline() as p: - p.delete(key) - p.hset(key, 'birth', now) - # Set scheduler key to expire a few seconds after polling interval - # This way, the key will automatically expire if scheduler - # quits unexpectedly - p.expire(key, int(self._interval) + 10) - p.execute() - - def register_death(self): - """Registers its own death.""" - with self.connection._pipeline() as p: - p.hset(self.scheduler_key, 'death', time.time()) - p.expire(self.scheduler_key, 60) - p.execute() + expires = int(self._interval) + 10 + self._lock_acquired = self.connection.set(key, now, ex=expires, nx=True) + return self._lock_acquired + + def remove_lock(self): + """ + Remove acquired lock. + """ + if self._lock_acquired: + self.connection.delete(self.scheduler_key) def _install_signal_handlers(self): """ @@ -57,10 +55,10 @@ def _install_signal_handlers(self): def stop(signum, frame): """ - Register scheduler's death and exit. + Remove previously acquired lock and exit. """ self.log.info('Shutting down RQ scheduler...') - self.register_death() + self.remove_lock() raise SystemExit() signal.signal(signal.SIGINT, stop) @@ -329,11 +327,11 @@ def run(self): lower than current time). """ self.log.info('Running RQ scheduler...') - self.register_birth() self._install_signal_handlers() try: while True: - self.enqueue_jobs() + if self.acquire_lock(): + self.enqueue_jobs() time.sleep(self._interval) finally: - self.register_death() + self.remove_lock() diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 0e8c082..32ac3fe 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -34,9 +34,9 @@ def setUp(self): super(TestScheduler, self).setUp() self.scheduler = Scheduler(connection=self.testconn) - def test_birth_and_death_registration(self): + def test_acquire_lock(self): """ - When scheduler registers it's birth, besides creating a key, it should + When scheduler acquires a lock, besides creating a key, it should also set an expiry that's a few seconds longer than it's polling interval so it automatically expires if scheduler is unexpectedly terminated. @@ -44,13 +44,29 @@ def test_birth_and_death_registration(self): key = Scheduler.scheduler_key self.assertNotIn(key, tl(self.testconn.keys('*'))) scheduler = Scheduler(connection=self.testconn, interval=20) - scheduler.register_birth() + self.assertTrue(scheduler.acquire_lock()) self.assertIn(key, tl(self.testconn.keys('*'))) self.assertEqual(self.testconn.ttl(key), 30) - self.assertFalse(self.testconn.hexists(key, 'death')) - self.assertRaises(ValueError, scheduler.register_birth) - scheduler.register_death() - self.assertTrue(self.testconn.hexists(key, 'death')) + scheduler.remove_lock() + self.assertNotIn(key, tl(self.testconn.keys('*'))) + + def test_no_two_schedulers_acquire_lock(self): + """ + Ensure that no two schedulers can acquire the lock at the + same time. When removing the lock, only the scheduler which + originally acquired the lock can remove the lock. + """ + key = Scheduler.scheduler_key + self.assertNotIn(key, tl(self.testconn.keys('*'))) + scheduler1 = Scheduler(connection=self.testconn, interval=20) + scheduler2 = Scheduler(connection=self.testconn, interval=20) + self.assertTrue(scheduler1.acquire_lock()) + self.assertFalse(scheduler2.acquire_lock()) + self.assertIn(key, tl(self.testconn.keys('*'))) + scheduler2.remove_lock() + self.assertIn(key, tl(self.testconn.keys('*'))) + scheduler1.remove_lock() + self.assertNotIn(key, tl(self.testconn.keys('*'))) def test_create_job(self): """ @@ -468,11 +484,10 @@ def test_small_float_interval(self): scheduler = Scheduler(connection=self.testconn, interval=0.1) # testing interval = 0.1 second self.assertEqual(scheduler._interval, 0.1) - #register birth - scheduler.register_birth() + #acquire lock + self.assertTrue(scheduler.acquire_lock()) self.assertIn(key, tl(self.testconn.keys('*'))) self.assertEqual(self.testconn.ttl(key), 10) # int(0.1) + 10 = 10 - self.assertFalse(self.testconn.hexists(key, 'death')) #enqueue a job now = datetime.utcnow() @@ -480,8 +495,8 @@ def test_small_float_interval(self): self.assertIn(job, self.scheduler.get_jobs_to_queue()) self.assertEqual(len(self.scheduler.get_jobs()), 1) - #register death - scheduler.register_death() + #remove the lock + scheduler.remove_lock() #test that run works with the small floating-point interval def send_stop_signal():