Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RQ: implement reliable timeout #4305

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions redash/cli/rq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@

from click import argument
from flask.cli import AppGroup
from rq import Connection, Worker
from rq import Connection
from sqlalchemy.orm import configure_mappers

from redash import rq_redis_connection
from redash.schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
from redash.tasks import HardLimitingWorker as Worker, rq_scheduler, schedule_periodic_jobs, periodic_job_definitions

manager = AppGroup(help="RQ management commands.")

Expand Down
2 changes: 2 additions & 0 deletions redash/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@
refresh_schemas, cleanup_query_results, empty_schedules)
from .alerts import check_alerts_for_query
from .failure_report import send_aggregated_errors
from .hard_limiting_worker import HardLimitingWorker
from .schedule import rq_scheduler, schedule_periodic_jobs, periodic_job_definitions
84 changes: 84 additions & 0 deletions redash/tasks/hard_limiting_worker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
import errno
import os
from rq import Worker, get_current_job
from rq.utils import utcnow
from rq.timeouts import UnixSignalDeathPenalty, HorseMonitorTimeoutException
from rq.job import JobStatus


class HardLimitingWorker(Worker):
"""
RQ's work horses enforce time limits by setting a timed alarm and stopping jobs
when they reach their time limits. However, the work horse may be entirely blocked
and may not respond to the alarm interrupt. Since respecting timeouts is critical
in Redash (if we don't respect them, workers may be infinitely stuck and as a result,
service may be denied for other queries), we enforce two time limits:
1. A soft time limit, enforced by the work horse
2. A hard time limit, enforced by the parent worker

The HardLimitingWorker class changes the default monitoring behavior of the default
RQ Worker by checking if the work horse is still busy with the job, even after
it should have timed out (+ a grace period of 15s). If it does, it kills the work horse.
"""
grace_period = 15

def soft_limit_exceeded(self, job):
seconds_under_monitor = (utcnow() - self.monitor_started).seconds

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small consideration. The timedelta docs say that .seconds is:
0 <= seconds < 3600*24 (the number of seconds in one day)
If jobs are supported that run longer than a day, probably should use .total_seconds() instead.

Suggested change
seconds_under_monitor = (utcnow() - self.monitor_started).seconds
seconds_under_monitor = (utcnow() - self.monitor_started).total_seconds()

return seconds_under_monitor > job.timeout + self.grace_period

def enforce_hard_limit(self, job):
self.log.warning('Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. '
'Killing the work horse.', job.id, job.timeout, self.grace_period)
self.kill_horse()

def monitor_work_horse(self, job):
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
failed
"""
self.monitor_started = utcnow()
while True:
try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
retpid, ret_val = os.waitpid(self._horse_pid, 0)
break
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5)

if self.soft_limit_exceeded(job):
self.enforce_hard_limit(job)
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
# os.waitpid()), we simply ignore it and enter the next
# iteration of the loop, waiting for the child to end. In
# any other case, this is some other unexpected OS error,
# which we don't want to catch, so we re-raise those ones.
if e.errno != errno.EINTR:
raise
# Send a heartbeat to keep the worker alive.
self.heartbeat()

if ret_val == os.EX_OK: # The process exited normally.
return
job_status = job.get_status()
if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:

if not job.ended_at:
job.ended_at = utcnow()

# Unhandled failure: move the job to the failed queue
self.log.warning((
'Moving job to FailedJobRegistry '
'(work-horse terminated unexpectedly; waitpid returned {})'
).format(ret_val))

self.handle_job_failure(
job,
exc_string="Work-horse process was terminated unexpectedly "
"(waitpid returned %s)" % ret_val
)
1 change: 1 addition & 0 deletions redash/schedule.py → redash/tasks/schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
queue_name="periodic",
interval=5)


def job_id(kwargs):
metadata = kwargs.copy()
metadata['func'] = metadata['func'].__name__
Expand Down
4 changes: 2 additions & 2 deletions tests/test_schedule.py → tests/tasks/test_schedule.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from unittest import TestCase
from mock import patch, ANY

from redash.schedule import rq_scheduler, schedule_periodic_jobs
from redash.tasks.schedule import rq_scheduler, schedule_periodic_jobs

class TestSchedule(TestCase):
def setUp(self):
Expand All @@ -26,7 +26,7 @@ def foo():
pass

schedule_periodic_jobs([{"func": foo, "interval": 60}])
with patch('redash.schedule.rq_scheduler.schedule') as schedule:
with patch('redash.tasks.rq_scheduler.schedule') as schedule:
schedule_periodic_jobs([{"func": foo, "interval": 60}])
schedule.assert_not_called()

Expand Down