Skip to content

Commit

Permalink
Update overwriten code with newest changes
Browse files Browse the repository at this point in the history
Something changed in python-rq and the old code was behaving in a way that if a job ran for longer than 2 min it would be automatically set as failed, but it would continue running.

This causes a problem in the UI because it is as if the job stopped, but it actually didn't
  • Loading branch information
thiagogds committed Oct 17, 2024
1 parent 7c22756 commit 3003649
Showing 1 changed file with 36 additions and 19 deletions.
55 changes: 36 additions & 19 deletions redash/tasks/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from rq import Queue as BaseQueue
from rq.job import Job as BaseJob
from rq.job import JobStatus
from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty
from rq.timeouts import HorseMonitorTimeoutException
from rq.utils import utcnow
from rq.worker import (
HerokuWorker, # HerokuWorker implements graceful shutdown on SIGTERM
Expand Down Expand Up @@ -113,30 +113,44 @@ def enforce_hard_limit(self, job):
)
self.kill_horse()

def monitor_work_horse(self, job, queue):
def monitor_work_horse(self, job: "Job", queue: "Queue"):
"""The worker will monitor the work horse and make sure that it
either executes successfully or the status of the job is set to
failed
Args:
job (Job): _description_
queue (Queue): _description_
"""
self.monitor_started = utcnow()
retpid = ret_val = rusage = None
job.started_at = utcnow()
while True:
try:
with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
retpid, ret_val = os.waitpid(self._horse_pid, 0)
with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
retpid, ret_val, rusage = self.wait_for_horse()
break
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5)
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())

job.refresh()
# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60): # type: ignore
self.heartbeat(self.job_monitoring_interval + 60)
self.kill_horse()
self.wait_for_horse()
break

self.maintain_heartbeats(job)

if job.is_cancelled:
self.stop_executing_job(job)

if self.soft_limit_exceeded(job):
self.enforce_hard_limit(job)

except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
Expand All @@ -149,29 +163,32 @@ def monitor_work_horse(self, job, queue):
# Send a heartbeat to keep the worker alive.
self.heartbeat()

self.set_current_job_working_time(0)
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
if ret_val == os.EX_OK: # The process exited normally.
return

job_status = job.get_status()

if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
elif self._stopped_job_id == job.id:
# Work-horse killed deliberately
self.log.warning("Job stopped by user, moving job to FailedJobRegistry")
if job.stopped_callback:
job.execute_stopped_callback(self.death_penalty_class)
self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
if not job.ended_at:
job.ended_at = utcnow()

# Unhandled failure: move the job to the failed queue
self.log.warning(
(
"Moving job to FailedJobRegistry "
"(work-horse terminated unexpectedly; waitpid returned {})" # fmt: skip
).format(ret_val)
)

self.handle_job_failure(
job,
queue=queue,
exc_string="Work-horse process was terminated unexpectedly "
"(waitpid returned %s)" % ret_val, # fmt: skip
)
signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ""
exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
self.log.warning("Moving job to FailedJobRegistry (%s)", exc_string)

self.handle_work_horse_killed(job, retpid, ret_val, rusage)
self.handle_job_failure(job, queue=queue, exc_string=exc_string)


class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
Expand Down

0 comments on commit 3003649

Please sign in to comment.