Skip to content

Commit

Permalink
Add retry lock mechanism for multiple schedulers
Browse files Browse the repository at this point in the history
This is based on #104.
  • Loading branch information
Mauro Doglio committed Sep 19, 2016
1 parent 396efad commit 6e1a103
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 7 deletions.
40 changes: 34 additions & 6 deletions rq_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
class Scheduler(object):
scheduler_key = 'rq:scheduler'
scheduled_jobs_key = 'rq:scheduler:scheduled_jobs'
scheduler_lock_key = 'rq:scheduler:lock'

def __init__(self, queue_name='default', interval=60, connection=None):
from rq.connections import resolve_connection
Expand All @@ -27,9 +28,11 @@ def __init__(self, queue_name='default', interval=60, connection=None):
self._interval = interval
self.log = logger

def register_birth(self):
if self.connection.exists(self.scheduler_key) and \
not self.connection.hexists(self.scheduler_key, 'death'):
def register_birth(self, retry=False):
self._lock_acquired = self._acquire_lock()
if retry:
self._retry_lock()
elif not self._lock_acquired:
raise ValueError("There's already an active RQ scheduler")
key = self.scheduler_key
now = time.time()
Expand All @@ -47,6 +50,7 @@ def register_death(self):
with self.connection._pipeline() as p:
p.hset(self.scheduler_key, 'death', time.time())
p.expire(self.scheduler_key, 60)
p.delete(self.scheduler_lock_key)
p.execute()

def _install_signal_handlers(self):
Expand Down Expand Up @@ -323,17 +327,41 @@ def enqueue_jobs(self):
for job in jobs:
self.enqueue_job(job)

# Refresh scheduler key's expiry
# Refresh scheduler keys' expiry
self.connection.expire(self.scheduler_key, int(self._interval) + 10)
self.connection.expire(self.scheduler_lock_key, int(self._interval) + 10)
return jobs

def run(self, burst=False):
def _acquire_lock(self):
    """
    Try to take the scheduler lock so that only one scheduler enqueues
    jobs at a time.

    A single ``SET`` with ``nx=True`` is issued on ``scheduler_lock_key``,
    so the key is only written when it does not exist yet. The value
    stored is the current timestamp and the key is given a TTL of
    ``int(self._interval) + 10`` seconds.

    Returns True if the lock was acquired, False otherwise.
    """
    expires = int(self._interval) + 10
    # With nx=True the client's set() can come back as None instead of
    # False when the key already exists (redis-py behaves this way), so
    # normalize the result to honor the documented True/False contract.
    acquired = self.connection.set(
        self.scheduler_lock_key, time.time(), ex=expires, nx=True)
    return bool(acquired)

def _retry_lock(self):
    """
    Block until the scheduler lock is held by this instance.

    While ``self._lock_acquired`` is falsy: log a notice, sleep for 10
    seconds, then call ``_acquire_lock()`` again and store its result.
    A KeyboardInterrupt raised during the sleep is turned into SystemExit.
    """
    while True:
        if self._lock_acquired:
            # Lock is ours; nothing more to wait for.
            return
        self.log.info("There's already an active RQ scheduler, retrying in 10 seconds...")
        try:
            time.sleep(10)
        except KeyboardInterrupt:
            raise SystemExit()
        self._lock_acquired = self._acquire_lock()

def run(self, burst=False, retry=False):
"""
Periodically check whether there's any job that should be put in the queue (score
lower than current time).
"""
self.log.info('Running RQ scheduler...')
self.register_birth()
self.register_birth(retry=retry)
self._install_signal_handlers()
try:
while True:
Expand Down
4 changes: 3 additions & 1 deletion rq_scheduler/scripts/rqscheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ def main():
queue (in seconds, can be floating-point for more precision).")
parser.add_argument('--path', default='.', help='Specify the import path.')
parser.add_argument('--pid', help='A filename to use for the PID file.', metavar='FILE')
parser.add_argument('--retry', action='store_true', default=False,
help='Tell the scheduler to retry the registration process.')

args = parser.parse_args()

Expand All @@ -50,7 +52,7 @@ def main():
setup_loghandlers(level)

scheduler = Scheduler(connection=connection, interval=args.interval)
scheduler.run(burst=args.burst)
scheduler.run(burst=args.burst, retry=args.retry)

if __name__ == '__main__':
main()
12 changes: 12 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,3 +542,15 @@ def send_stop_signal():

#all jobs must have been scheduled during 1 second
self.assertEqual(len(scheduler.get_jobs()), 0)

def test_no_two_schedulers_acquire_lock(self):
    """
    Ensure that no two schedulers can acquire the lock at the
    same time.
    """
    # Precondition: start from a clean state. The lock key is the one this
    # test actually exercises, so it must be absent as well; otherwise a
    # stale lock would make the first acquisition fail for the wrong reason.
    existing_keys = tl(self.testconn.keys('*'))
    self.assertNotIn(Scheduler.scheduler_key, existing_keys)
    self.assertNotIn(Scheduler.scheduler_lock_key, existing_keys)

    scheduler1 = Scheduler(connection=self.testconn, interval=20)
    scheduler2 = Scheduler(connection=self.testconn, interval=20)

    # The first scheduler takes the lock; the second must be refused.
    self.assertTrue(scheduler1._acquire_lock())
    self.assertFalse(scheduler2._acquire_lock())

0 comments on commit 6e1a103

Please sign in to comment.