Fix spin-loop/cleanup failure mode within run loop (#42)
This ensures that exceptions raised in thread callback hooks are rescued
and that the affected jobs are properly marked as failed.
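
As a rough illustration of why running the hook inside the rescued method matters, here is a minimal sketch. `run_sketch`, `hook`, and `failed` are hypothetical stand-ins rather than the gem's API, and the sketch rescues `StandardError` where the real method rescues `Exception`:

```ruby
# Hypothetical stand-in for a `run`-style method. The hook wraps the job
# *inside* the method body, so the method-level rescue covers both of them.
def run_sketch(job, hook, failed)
  hook.call { job.call }
  true # did work
rescue StandardError => e
  failed << e.message # stand-in for marking the job as failed
  false # work failed
end

failed = []
passthrough_hook = ->(&blk) { blk.call }
broken_hook = ->(&_blk) { raise 'hook exploded' }

run_sketch(-> { :done }, passthrough_hook, failed) # => true
run_sketch(-> { :done }, broken_hook, failed)      # => false
failed                                             # => ["hook exploded"]
```

If the hook wrapped the call from outside the method instead, its exception would bypass this rescue and nothing would record the failure.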

This is also a good opportunity to change the `num` argument of
`work_off(num)` to mean the number of jobs (give or take a few, due to
`max_claims`) rather than the number of iterations. Before threading
was introduced, I think it did mean the number of jobs, since jobs and
iterations were 1:1. I would not have made this change before the refactor,
because there was no guarantee that either `success` or `failure` would
be incremented (the thread might crash for many reasons). Now we only
increment `success`, and report `total - success` as the "failure" count
when we return from the method.
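
The counting scheme described above can be sketched with plain threads. This is only an approximation under stated assumptions: `work_off_sketch` and `batches` are hypothetical, a `Mutex` stands in for `Concurrent::AtomicFixnum`, and each inner array plays the role of one `reserve_jobs` result:

```ruby
# Hypothetical sketch: `batches` is an array of arrays of callables, where each
# inner array stands in for one batch of reserved jobs.
def work_off_sketch(batches, num = 100)
  success = 0
  lock = Mutex.new
  total = 0

  while total < num
    jobs = batches.shift || []
    break if jobs.empty?

    total += jobs.length
    threads = jobs.map do |job|
      Thread.new do
        Thread.current.report_on_exception = false
        lock.synchronize { success += 1 } if job.call
      end
    end

    threads.each do |thread|
      begin
        thread.join
      rescue StandardError
        # A crashed thread never increments `success`, so it is still
        # counted as a failure by the subtraction below.
      end
    end
  end

  [success, total - success]
end

p work_off_sketch([[-> { true }, -> { false }], [-> { raise 'boom' }]]) # => [1, 2]
p work_off_sketch([[-> { true }] * 3, [-> { true }]], 2)                # => [3, 0]
```

The second call shows the "give or take a few" behavior: `num` is 2, but the first batch already holds three jobs, so three jobs run before the loop condition is checked again.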

Fixes #23 and #41

This is also a prereq for a resolution I'm cooking up for #36
smudge authored Jun 27, 2024
1 parent 24b75f5 commit fd97a38
Showing 10 changed files with 33 additions and 35 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
@@ -8,7 +8,7 @@ jobs:
strategy:
fail-fast: false
matrix:
ruby: ['2.6', '2.7', '3.0', '3.1', '3.2']
ruby: ['2.7', '3.0', '3.1', '3.2']
gemfile:
- gemfiles/rails_5_2.gemfile
- gemfiles/rails_6_0.gemfile
2 changes: 1 addition & 1 deletion Gemfile
@@ -11,6 +11,6 @@ gem 'mysql2'
gem 'pg'
gem 'rake'
gem 'rspec'
gem 'sqlite3'
gem 'sqlite3', '~> 1.7.3'
gem 'timecop'
gem 'zeitwerk'
2 changes: 1 addition & 1 deletion Gemfile.lock
@@ -216,7 +216,7 @@ DEPENDENCIES
pg
rake
rspec
sqlite3
sqlite3 (~> 1.7.3)
timecop
zeitwerk

2 changes: 1 addition & 1 deletion gemfiles/rails_5_2.gemfile
@@ -11,7 +11,7 @@ gem "mysql2"
gem "pg"
gem "rake"
gem "rspec"
gem "sqlite3"
gem "sqlite3", "~> 1.7.3"
gem "timecop"
gem "zeitwerk"

2 changes: 1 addition & 1 deletion gemfiles/rails_6_0.gemfile
@@ -11,7 +11,7 @@ gem "mysql2"
gem "pg"
gem "rake"
gem "rspec"
gem "sqlite3"
gem "sqlite3", "~> 1.7.3"
gem "timecop"
gem "zeitwerk"

2 changes: 1 addition & 1 deletion gemfiles/rails_6_1.gemfile
@@ -11,7 +11,7 @@ gem "mysql2"
gem "pg"
gem "rake"
gem "rspec"
gem "sqlite3"
gem "sqlite3", "~> 1.7.3"
gem "timecop"
gem "zeitwerk"

2 changes: 1 addition & 1 deletion gemfiles/rails_7_0.gemfile
@@ -11,7 +11,7 @@ gem "mysql2"
gem "pg"
gem "rake"
gem "rspec"
gem "sqlite3"
gem "sqlite3", "~> 1.7.3"
gem "timecop"
gem "zeitwerk"

2 changes: 1 addition & 1 deletion gemfiles/rails_7_1.gemfile
@@ -11,7 +11,7 @@ gem "mysql2"
gem "pg"
gem "rake"
gem "rspec"
gem "sqlite3"
gem "sqlite3", "~> 1.7.3"
gem "timecop"
gem "zeitwerk"

2 changes: 1 addition & 1 deletion gemfiles/rails_main.gemfile
@@ -11,7 +11,7 @@ gem "mysql2"
gem "pg"
gem "rake"
gem "rspec"
gem "sqlite3"
gem "sqlite3", "~> 1.7.3"
gem "timecop"
gem "zeitwerk"
gem "actionview", github: "rails/rails", glob: "actionview/*.gemspec"
50 changes: 24 additions & 26 deletions lib/delayed/worker.rb
@@ -89,22 +89,17 @@ def on_exit!
# Exit early if interrupted.
def work_off(num = 100)
success = Concurrent::AtomicFixnum.new(0)
failure = Concurrent::AtomicFixnum.new(0)
total = 0

num.times do
while total < num
jobs = reserve_jobs
break if jobs.empty?

total += jobs.length
pool = Concurrent::FixedThreadPool.new(jobs.length)
jobs.each do |job|
pool.post do
run_thread_callbacks(job) do
if run_job(job)
success.increment
else
failure.increment
end
end
success.increment if run_job(job)
end
end

@@ -114,38 +109,41 @@ def work_off(num = 100)
break if stop? # leave if we're exiting
end

[success, failure].map(&:value)
[success.value, total - success.value]
end

def run_thread_callbacks(job, &block)
self.class.lifecycle.run_callbacks(:thread, self, job, &block)
end

def run(job)
metadata = {
status: 'RUNNING',
name: job.name,
run_at: job.run_at,
created_at: job.created_at,
priority: job.priority,
queue: job.queue,
attempts: job.attempts,
enqueued_for: (Time.current - job.created_at).round,
}
job_say job, metadata.to_json
run_time = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
job.invoke_job
run_thread_callbacks(job) do
metadata = {
status: 'RUNNING',
name: job.name,
run_at: job.run_at,
created_at: job.created_at,
priority: job.priority,
queue: job.queue,
attempts: job.attempts,
enqueued_for: (Time.current - job.created_at).round,
}
job_say job, metadata.to_json
run_time = Benchmark.realtime do
Timeout.timeout(max_run_time(job).to_i, WorkerTimeout) do
job.invoke_job
end
job.destroy
end
job.destroy
job_say job, format('COMPLETED after %.4f seconds', run_time)
end
job_say job, format('COMPLETED after %.4f seconds', run_time)
true # did work
rescue DeserializationError => e
job_say job, "FAILED permanently with #{e.class.name}: #{e.message}", 'error'

job.error = e
failed(job)
false # work failed
rescue Exception => e # rubocop:disable Lint/RescueException
self.class.lifecycle.run_callbacks(:error, self, job) { handle_failed_job(job, e) }
false # work failed