redash/tasks/worker.py (55 changes: 36 additions & 19 deletions)
@@ -6,7 +6,7 @@
 from rq import Queue as BaseQueue
 from rq.job import Job as BaseJob
 from rq.job import JobStatus
-from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty
+from rq.timeouts import HorseMonitorTimeoutException
 from rq.utils import utcnow
 from rq.worker import (
     HerokuWorker,  # HerokuWorker implements graceful shutdown on SIGTERM
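The import change pairs with the switch below from the hard-coded UnixSignalDeathPenalty to the worker's death_penalty_class attribute, which lets a subclass substitute its own timeout mechanism (RQ's base Worker sets death_penalty_class = UnixSignalDeathPenalty). A minimal sketch of how that context manager behaves, assuming a Unix platform; the 5-second budget and the sleep standing in for the real blocking wait are illustrative only:

import time

from rq.timeouts import HorseMonitorTimeoutException, UnixSignalDeathPenalty

# The death penalty arms a SIGALRM timer around the block (Unix only); if the
# block outlives the timeout, the given exception is raised inside it.
try:
    with UnixSignalDeathPenalty(5, HorseMonitorTimeoutException):
        time.sleep(10)  # stands in for the blocking wait_for_horse() call
except HorseMonitorTimeoutException:
    print("horse still running; send a heartbeat and wait again")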
@@ -113,30 +113,44 @@ def enforce_hard_limit(self, job):
         )
         self.kill_horse()

-    def monitor_work_horse(self, job, queue):
+    def monitor_work_horse(self, job: "Job", queue: "Queue"):
         """The worker will monitor the work horse and make sure that it
         either executes successfully or the status of the job is set to
         failed
+
+        Args:
+            job (Job): _description_
+            queue (Queue): _description_
         """
         self.monitor_started = utcnow()
+        retpid = ret_val = rusage = None
         job.started_at = utcnow()
         while True:
             try:
-                with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
-                    retpid, ret_val = os.waitpid(self._horse_pid, 0)
+                with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
+                    retpid, ret_val, rusage = self.wait_for_horse()
                 break
             except HorseMonitorTimeoutException:
                 # Horse has not exited yet and is still running.
                 # Send a heartbeat to keep the worker alive.
                 self.heartbeat(self.job_monitoring_interval + 5)
+                self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
+
+                job.refresh()
+                # Kill the job from this side if something is really wrong (interpreter lock/etc).
+                if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):  # type: ignore
+                    self.heartbeat(self.job_monitoring_interval + 60)
+                    self.kill_horse()
+                    self.wait_for_horse()
+                    break
+
+                self.maintain_heartbeats(job)
+
+                if job.is_cancelled:
+                    self.stop_executing_job(job)

                 if self.soft_limit_exceeded(job):
                     self.enforce_hard_limit(job)
+
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
                 # caused by a SIGINT or SIGTERM signal during
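The loop above is a watchdog: wait for the work horse in bounded slices, and on each timeout slice heartbeat, refresh job state, and kill the horse from the parent side once it has run more than 60 seconds past its own timeout. A stripped-down sketch of the same control flow, using a hypothetical watch_child helper and plain os.waitpid polling in place of RQ's death penalty and wait_for_horse()/kill_horse():

import os
import signal
import time

GRACE = 60  # seconds past the job's own timeout, mirroring job.timeout + 60 above

def watch_child(pid: int, job_timeout: float, interval: float = 30.0) -> int:
    """Wait for `pid` in `interval`-second slices; hard-kill it once it has
    run more than GRACE seconds past `job_timeout`. Returns the wait status."""
    started = time.monotonic()
    while True:
        retpid, status = os.waitpid(pid, os.WNOHANG)  # non-blocking poll
        if retpid != 0:
            return status  # child exited on its own
        if time.monotonic() - started > job_timeout + GRACE:
            os.kill(pid, signal.SIGKILL)    # analogous to kill_horse()
            _, status = os.waitpid(pid, 0)  # reap, like the final wait_for_horse()
            return status
        time.sleep(interval)  # heartbeats and job maintenance would go here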
@@ -149,29 +163,32 @@ def monitor_work_horse(self, job, queue):
                 # Send a heartbeat to keep the worker alive.
                 self.heartbeat()

+        self.set_current_job_working_time(0)
+        self._horse_pid = 0  # Set horse PID to 0, horse has finished working
         if ret_val == os.EX_OK:  # The process exited normally.
             return

         job_status = job.get_status()

         if job_status is None:  # Job completed and its ttl has expired
             return
-        if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
+        elif self._stopped_job_id == job.id:
+            # Work-horse killed deliberately
+            self.log.warning("Job stopped by user, moving job to FailedJobRegistry")
+            if job.stopped_callback:
+                job.execute_stopped_callback(self.death_penalty_class)
+            self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
+        elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
             if not job.ended_at:
                 job.ended_at = utcnow()

             # Unhandled failure: move the job to the failed queue
-            self.log.warning(
-                (
-                    "Moving job to FailedJobRegistry "
-                    "(work-horse terminated unexpectedly; waitpid returned {})"  # fmt: skip
-                ).format(ret_val)
-            )
-
-            self.handle_job_failure(
-                job,
-                queue=queue,
-                exc_string="Work-horse process was terminated unexpectedly "
-                "(waitpid returned %s)" % ret_val,  # fmt: skip
-            )
+            signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ""
+            exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
+            self.log.warning("Moving job to FailedJobRegistry (%s)", exc_string)
+
+            self.handle_work_horse_killed(job, retpid, ret_val, rusage)
+            self.handle_job_failure(job, queue=queue, exc_string=exc_string)


 class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
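The rewritten failure path decodes the raw waitpid status instead of just echoing it: a normal exit compares equal to os.EX_OK, while a signal death is detected with os.WIFSIGNALED and the signal number recovered with os.WTERMSIG. A small self-contained demonstration of that decoding (Unix only; the fork-and-kill is purely illustrative):

import os
import signal

# Fork a child, kill it, then decode the wait status the same way the
# worker does for its work horse.
pid = os.fork()
if pid == 0:  # child: block until killed
    signal.pause()
    os._exit(0)

os.kill(pid, signal.SIGKILL)
_, ret_val = os.waitpid(pid, 0)

if ret_val == os.EX_OK:
    print("process exited normally")
elif os.WIFSIGNALED(ret_val):
    print(f"terminated by signal {os.WTERMSIG(ret_val)}")  # prints: signal 9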