Skip to content
This repository was archived by the owner on Nov 26, 2024. It is now read-only.

Commit a3fda7b

Browse files
committed
try to fix rq/rq#1507
The idea is to port the changes from the current version of rq's monitor_work_horse into the overridden version introduced by Redash. This is a classic example of why overriding a method that you don't own is usually a bad idea: you miss out on upstream fixes. Another point is that I'm not sure this custom method is even necessary with the version of rq we're using; maybe we should investigate it further.
1 parent 2ab57bc commit a3fda7b

File tree

1 file changed

+14
-5
lines changed

1 file changed

+14
-5
lines changed

redash/tasks/worker.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -113,20 +113,20 @@ def monitor_work_horse(self, job, queue):
113113
with UnixSignalDeathPenalty(
114114
self.job_monitoring_interval, HorseMonitorTimeoutException
115115
):
116-
retpid, ret_val = os.waitpid(self._horse_pid, 0)
116+
retpid, ret_val = self.wait_for_horse()
117117
break
118118
except HorseMonitorTimeoutException:
119119
# Horse has not exited yet and is still running.
120120
# Send a heartbeat to keep the worker alive.
121-
self.heartbeat(self.job_monitoring_interval + 5)
122-
123-
job.refresh()
121+
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())
124122

125123
if job.is_cancelled:
126124
self.stop_executing_job(job)
127125

128126
if self.soft_limit_exceeded(job):
129127
self.enforce_hard_limit(job)
128+
129+
self.maintain_heartbeats(job)
130130
except OSError as e:
131131
# In case we encountered an OSError due to EINTR (which is
132132
# caused by a SIGINT or SIGTERM signal during
@@ -139,12 +139,21 @@ def monitor_work_horse(self, job, queue):
139139
# Send a heartbeat to keep the worker alive.
140140
self.heartbeat()
141141

142+
self.set_current_job_working_time(0)
143+
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
142144
if ret_val == os.EX_OK: # The process exited normally.
143145
return
144146
job_status = job.get_status()
145147
if job_status is None: # Job completed and its ttl has expired
146148
return
147-
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
149+
elif job_status == JobStatus.STOPPED:
150+
# Work-horse killed deliberately
151+
self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
152+
self.handle_job_failure(
153+
job, queue=queue,
154+
exc_string="Job stopped by user, work-horse terminated."
155+
)
156+
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
148157

149158
if not job.ended_at:
150159
job.ended_at = utcnow()

0 commit comments

Comments
 (0)