Skip to content

Commit ee0e6d6

Browse files
jpcamaramhenrixon
andcommitted
Refactor internals and api namespace of batches
* Thanks to Mikael Henriksson for his work in rails#590. His work decentralizes management of batch status by moving it to the BatchUpdateJob, and tracking status using counts rather than querying specific job statuses after the fact. This is a much simpler approach to tracking the jobs, and allows us to avoid a constantly polling set of queries in the dispatcher. Also add in arbitrary metadata to allow tracking data from start to end of execution. This also means enqueueing a BatchUpdateJob based on callbacks in two different kinds of Batchable, which are included when a job is updated and finished, or when a FailedExecution is created (since failed jobs never "finish"). * This batch feature already took some inspiration from the GoodJob batch implementation (https://github.com/bensheldon/good_job). But now we also increase that by adopting some of the buffering and abstractions in a similar form as GoodJob. To discourage heavy reliance on the JobBatch model, it has been renamed to BatchRecord, and a separate Batch interface is how you interact with batches, with some delegation to the core model. * A new Buffer class (also modeled after GoodJob) was added specifically for batches. This was primarily added to support enqueue_after_transaction_commit. We now override the ActiveJob #enqueue method so we can keep track of which jobs are attempting to enqueue. When enqueue_after_transaction_commit is on, those jobs do not enqueue until all transactions commit. By tracking them at the high level enqueue and keeping a buffer of jobs, we can ensure that the jobs get tracked even when their creation is deferred until the transaction is committed. The side benefit is that we get to enqueue all the jobs together, probably offering some performance advantage. This buffer also keeps track of child batches for the same reason. * To support triggering a callback/BatchUpdateJob when a job finishes, the update to finished_at needed to become an update! call * As a simplification, on_failure is now only fired after all jobs finish, rather than at the first time a job fails * The adapter logic itself also needed to be updated to support the buffer and enqueue_after_transaction_commit. If a job is coming from a batch enqueue, we ignore it here and allow the batching process to enqueue_all at the end of the enqueue block. If the job is originally from a batch, but is retrying, we make sure the job counts in the batch stay updated. I don't love this addition, since it adds alot of complication to the adapter code, all solely oriented around batches * Batches benefit from keeping jobs until the batch has finished. As such, we ignore the preserve jobs setting, but if it is set to false, we enqueue a cleanup job once the batch has finished and clear out finished jobs * Implement preserved jobs test and remove todo * Idempotent updates with pessismistic locks * Check if it finished before we acquired the lock * Use enqueue_all directly rather than passing through activejob for completion jobs Co-authored-by: Mikael Henriksson <[email protected]>
1 parent 9c47969 commit ee0e6d6

File tree

27 files changed

+904
-325
lines changed

27 files changed

+904
-325
lines changed

README.md

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -595,9 +595,10 @@ and optionally trigger callbacks based on their status. It supports the followin
595595
- Three available callbacks to fire:
596596
- `on_finish`: Fired when all jobs have finished, including retries. Fires even when some jobs have failed.
597597
- `on_success`: Fired when all jobs have succeeded, including retries. Will not fire if any jobs have failed, but will fire if jobs have been discarded using `discard_on`
598-
- `on_failure`: Fired the _first_ time a job fails, after all retries are exhausted.
598+
- `on_failure`: Fired when all jobs have finished, including retries. Will only fire if one or more jobs have failed.
599599
- If a job is part of a batch, it can enqueue more jobs for that batch using `batch#enqueue`
600-
- Batches can be nested within other batches, creating a hierarchy. Outer batches will not finish until all nested batches have finished.
600+
- Batches can be nested within other batches, creating a hierarchy. Outer batches will not fire callbacks until all nested jobs have finished.
601+
- Attaching arbitrary metadata to a batch
601602

602603
```rb
603604
class SleepyJob < ApplicationJob
@@ -614,7 +615,7 @@ class MultiStepJob < ApplicationJob
614615
# Because of this nested batch, the top-level batch won't finish until the inner,
615616
# 10 second job finishes
616617
# Both jobs will still run simultaneously
617-
SolidQueue::JobBatch.enqueue do
618+
SolidQueue::Batch.enqueue do
618619
SleepyJob.perform_later(10)
619620
end
620621
end
@@ -639,10 +640,11 @@ class BatchFailureJob < ApplicationJob
639640
end
640641
end
641642
642-
SolidQueue::JobBatch.enqueue(
643+
SolidQueue::Batch.enqueue(
643644
on_finish: BatchFinishJob,
644645
on_success: BatchSuccessJob,
645-
on_failure: BatchFailureJob
646+
on_failure: BatchFailureJob,
647+
metadata: { user_id: 123 }
646648
) do
647649
5.times.map { |i| SleepyJob.perform_later(i) }
648650
end
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class BatchUpdateJob < ActiveJob::Base
5+
class UpdateFailure < RuntimeError; end
6+
7+
queue_as :background
8+
9+
discard_on ActiveRecord::RecordNotFound
10+
11+
def perform(batch_id, job)
12+
batch = SolidQueue::BatchRecord.find_by!(batch_id: batch_id)
13+
14+
return if job.batch_id != batch_id
15+
16+
status = job.status
17+
return unless status.in?([ :finished, :failed ])
18+
19+
batch.job_finished!(job)
20+
rescue => e
21+
Rails.logger.error "[SolidQueue] BatchUpdateJob failed for batch #{batch_id}, job #{job.id}: #{e.message}"
22+
raise
23+
end
24+
end
25+
end
Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class BatchRecord < Record
5+
self.table_name = "solid_queue_job_batches"
6+
7+
STATUSES = %w[pending processing completed failed]
8+
9+
belongs_to :parent_job_batch, foreign_key: :parent_job_batch_id, class_name: "SolidQueue::BatchRecord", optional: true
10+
has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id
11+
has_many :children, foreign_key: :parent_job_batch_id, primary_key: :batch_id, class_name: "SolidQueue::BatchRecord"
12+
13+
serialize :on_finish, coder: JSON
14+
serialize :on_success, coder: JSON
15+
serialize :on_failure, coder: JSON
16+
serialize :metadata, coder: JSON
17+
18+
validates :status, inclusion: { in: STATUSES }
19+
20+
scope :pending, -> { where(status: "pending") }
21+
scope :processing, -> { where(status: "processing") }
22+
scope :completed, -> { where(status: "completed") }
23+
scope :failed, -> { where(status: "failed") }
24+
scope :finished, -> { where(status: %w[completed failed]) }
25+
scope :unfinished, -> { where(status: %w[pending processing]) }
26+
27+
after_initialize :set_batch_id
28+
before_create :set_parent_job_batch_id
29+
30+
def on_success=(value)
31+
super(serialize_callback(value))
32+
end
33+
34+
def on_failure=(value)
35+
super(serialize_callback(value))
36+
end
37+
38+
def on_finish=(value)
39+
super(serialize_callback(value))
40+
end
41+
42+
def job_finished!(job)
43+
return if finished?
44+
return if job.batch_processed_at?
45+
46+
job.with_lock do
47+
if job.batch_processed_at.blank?
48+
job.update!(batch_processed_at: Time.current)
49+
50+
if job.failed_execution.present?
51+
self.class.where(id: id).update_all(
52+
"failed_jobs = failed_jobs + 1, pending_jobs = pending_jobs - 1"
53+
)
54+
else
55+
self.class.where(id: id).update_all(
56+
"completed_jobs = completed_jobs + 1, pending_jobs = pending_jobs - 1"
57+
)
58+
end
59+
end
60+
end
61+
62+
reload
63+
check_completion!
64+
end
65+
66+
def check_completion!
67+
return if finished?
68+
69+
actual_children = children.count
70+
return if actual_children < expected_children
71+
72+
children.find_each do |child|
73+
return unless child.finished?
74+
end
75+
76+
with_lock do
77+
if finished?
78+
# do nothing
79+
elsif pending_jobs <= 0
80+
if failed_jobs > 0
81+
mark_as_failed!
82+
else
83+
mark_as_completed!
84+
end
85+
clear_unpreserved_jobs
86+
elsif status == "pending"
87+
update!(status: "processing")
88+
end
89+
end
90+
end
91+
92+
def finished?
93+
status.in?(%w[completed failed])
94+
end
95+
96+
def processing?
97+
status == "processing"
98+
end
99+
100+
def pending?
101+
status == "pending"
102+
end
103+
104+
def progress_percentage
105+
return 0 if total_jobs == 0
106+
((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2)
107+
end
108+
109+
private
110+
111+
def set_parent_job_batch_id
112+
self.parent_job_batch_id ||= Batch.current_batch_id if Batch.current_batch_id.present?
113+
end
114+
115+
def set_batch_id
116+
self.batch_id ||= SecureRandom.uuid
117+
end
118+
119+
def as_active_job(active_job_klass)
120+
active_job_klass.is_a?(ActiveJob::Base) ? active_job_klass : active_job_klass.new
121+
end
122+
123+
def serialize_callback(value)
124+
return value if value.blank?
125+
active_job = as_active_job(value)
126+
# We can pick up batch ids from context, but callbacks should never be considered a part of the batch
127+
active_job.batch_id = nil
128+
active_job.serialize
129+
end
130+
131+
def perform_completion_job(job_field, attrs)
132+
active_job = ActiveJob::Base.deserialize(send(job_field))
133+
active_job.send(:deserialize_arguments_if_needed)
134+
active_job.arguments = [ Batch.new(_batch_record: self) ] + Array.wrap(active_job.arguments)
135+
SolidQueue::Job.enqueue_all([ active_job ])
136+
137+
active_job.provider_job_id = Job.find_by(active_job_id: active_job.job_id).id
138+
attrs[job_field] = active_job.serialize
139+
end
140+
141+
def mark_as_completed!
142+
# SolidQueue does treats `discard_on` differently than failures. The job will report as being :finished,
143+
# and there is no record of the failure.
144+
# GoodJob would report a discard as an error. It's possible we should do that in the future?
145+
update!(status: "completed", finished_at: Time.current)
146+
147+
perform_completion_job(:on_success, {}) if on_success.present?
148+
perform_completion_job(:on_finish, {}) if on_finish.present?
149+
150+
if parent_job_batch_id.present?
151+
parent = BatchRecord.find_by(batch_id: parent_job_batch_id)
152+
parent&.reload&.check_completion!
153+
end
154+
end
155+
156+
def mark_as_failed!
157+
update!(status: "failed", finished_at: Time.current)
158+
perform_completion_job(:on_failure, {}) if on_failure.present?
159+
perform_completion_job(:on_finish, {}) if on_finish.present?
160+
161+
# Check if parent batch can now complete
162+
if parent_job_batch_id.present?
163+
parent = BatchRecord.find_by(batch_id: parent_job_batch_id)
164+
parent&.check_completion!
165+
end
166+
end
167+
168+
def clear_unpreserved_jobs
169+
SolidQueue::Batch::CleanupJob.perform_later(self) unless SolidQueue.preserve_finished_jobs?
170+
end
171+
end
172+
end
173+
174+
require_relative "batch_record/buffer"
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class BatchRecord
5+
class Buffer
6+
attr_reader :jobs, :child_batches
7+
8+
def initialize
9+
@jobs = {}
10+
@child_batches = []
11+
end
12+
13+
def add(job)
14+
@jobs[job.job_id] = job
15+
job
16+
end
17+
18+
def add_child_batch(batch)
19+
@child_batches << batch
20+
batch
21+
end
22+
23+
def capture
24+
previous_buffer = ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer]
25+
ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer] = self
26+
27+
yield
28+
29+
@jobs
30+
ensure
31+
ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer] = previous_buffer
32+
end
33+
34+
def self.current
35+
ActiveSupport::IsolatedExecutionState[:solid_queue_batch_buffer]
36+
end
37+
38+
def self.capture_job(job)
39+
current&.add(job)
40+
end
41+
42+
def self.capture_child_batch(batch)
43+
current&.add_child_batch(batch)
44+
end
45+
end
46+
end
47+
end

app/models/solid_queue/claimed_execution.rb

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,6 @@ def perform
6969
failed_with(result.error)
7070
raise result.error
7171
end
72-
73-
job.job_batch.touch(:changed_at, :last_changed_at) if job.batch_id.present?
7472
ensure
7573
unblock_next_job
7674
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Execution
5+
module Batchable
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
after_create :update_batch_progress, if: -> { job.batch_id? }
10+
end
11+
12+
private
13+
def update_batch_progress
14+
BatchUpdateJob.perform_later(job.batch_id, job)
15+
rescue => e
16+
Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}"
17+
end
18+
end
19+
end
20+
end

app/models/solid_queue/failed_execution.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
module SolidQueue
44
class FailedExecution < Execution
5-
include Dispatching
5+
include Dispatching, Batchable
66

77
serialize :error, coder: JSON
88

app/models/solid_queue/job.rb

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,10 @@ module SolidQueue
44
class Job < Record
55
class EnqueueError < StandardError; end
66

7-
include Executable, Clearable, Recurrable
7+
include Executable, Clearable, Recurrable, Batchable
88

99
serialize :arguments, coder: JSON
1010

11-
belongs_to :job_batch, foreign_key: :batch_id, optional: true
12-
1311
class << self
1412
def enqueue_all(active_jobs)
1513
active_jobs_by_job_id = active_jobs.index_by(&:job_id)
@@ -56,7 +54,6 @@ def create_all_from_active_jobs(active_jobs)
5654
end
5755

5856
def attributes_from_active_job(active_job)
59-
active_job.batch_id = JobBatch.current_batch_id || active_job.batch_id
6057
{
6158
queue_name: active_job.queue_name || DEFAULT_QUEUE_NAME,
6259
active_job_id: active_job.job_id,
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# frozen_string_literal: true
2+
3+
module SolidQueue
4+
class Job
5+
module Batchable
6+
extend ActiveSupport::Concern
7+
8+
included do
9+
belongs_to :job_batch, foreign_key: :batch_id, optional: true
10+
11+
after_update :update_batch_progress, if: :batch_id?
12+
end
13+
14+
private
15+
def update_batch_progress
16+
return unless saved_change_to_finished_at? && finished_at.present?
17+
return unless batch_id.present?
18+
19+
BatchUpdateJob.perform_later(batch_id, self)
20+
rescue => e
21+
Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}"
22+
end
23+
end
24+
end
25+
end

app/models/solid_queue/job/executable.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ def dispatch_bypassing_concurrency_limits
7676
end
7777

7878
def finished!
79-
if SolidQueue.preserve_finished_jobs? || batch_id.present?
80-
touch(:finished_at)
79+
if SolidQueue.preserve_finished_jobs? || batch_id.present? # We clear jobs after the batch finishes
80+
update!(finished_at: Time.current)
8181
else
8282
destroy!
8383
end

0 commit comments

Comments
 (0)