Prevent on_cancellation_job & on_completion_job deserialization failure blocking cleanup #24

Open
wants to merge 9 commits into
base: master
Changes from 6 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,4 +1,7 @@
 # Changelog
+## 0.8.0
+* Failure to deserialize on_cancellation_job or on_completion_job will not prevent clean up of the job group.
+
 ### 0.7.0
 * Add support for ruby 3
 * Drop support for ruby < 2.6
16 changes: 15 additions & 1 deletion README.md
@@ -101,10 +101,24 @@ job_group.cancel
 Configuration to allow failed jobs not to cancel the group
 ```ruby
 # We can optionally pass options that will allow jobs to fail without cancelling the group.
-# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
+# This also allows the on_completion job to fire once all jobs have either succeeded or failed.
 job_group = Delayed::JobGroups::JobGroup.create!(failure_cancels_group: false)
 ```

+### Job Group Plugin Options
+
+The job group plugin can be configured in an initializer (e.g. `config/initializers/delayed_job_groups_plugin.rb`) as follows:
+
+```ruby
+Delayed::JobGroups.configure do |configuration|
+  configuration.error_reporter = Proc.new { |error| Bugsnag.notify(error) }
+end
+```
+
+The plugin supports the following options (all of which are optional):
+
+* `error_reporter` - a callback proc that accepts an `Exception` if the plugin encounters an unexpected error. This can be useful for reporting to an error monitoring system.
+
 ## Supported Platforms

 * Only the Delayed Job Active Record backend is supported.
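
As a rough illustration of the `error_reporter` option introduced in the README change above, the sketch below mirrors the `Configuration` class and `configure` method from this PR in a stand-in module, so it runs without the gem. `JobGroupsSketch` and the `reported` array are hypothetical names, and the lambda stands in for a call such as `Bugsnag.notify(error)`.

```ruby
# Stand-in for Delayed::JobGroups so the sketch runs without the gem
# (JobGroupsSketch is a hypothetical name, not part of the plugin).
module JobGroupsSketch
  class Configuration
    attr_accessor :error_reporter
  end

  @configuration = Configuration.new

  class << self
    attr_reader :configuration

    def configure
      yield(configuration) if block_given?
    end
  end
end

# Collect errors in memory instead of sending them to a monitoring service.
reported = []
JobGroupsSketch.configure do |configuration|
  configuration.error_reporter = ->(error) { reported << error }
end

# The option is optional, so callers guard against an unset reporter.
reporter = JobGroupsSketch.configuration.error_reporter
reporter.call(ArgumentError.new('undefined class/module Foo')) if reporter
```

Any object that responds to `call` with one argument would fit the same slot, which is why a lambda works here in place of `Proc.new`.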
9 changes: 9 additions & 0 deletions lib/delayed/job_groups/configuration.rb
@@ -0,0 +1,9 @@
+# frozen_string_literal: true
+
+module Delayed
+  module JobGroups
+    class Configuration
+      attr_accessor :error_reporter
+    end
+  end
+end
35 changes: 33 additions & 2 deletions lib/delayed/job_groups/job_group.rb
@@ -48,7 +48,21 @@ def unblock
       end

       def cancel
-        Delayed::Job.enqueue(on_cancellation_job, on_cancellation_job_options || {}) if on_cancellation_job
+        job = nil
+        job_options = nil
+
+        # Deserialization of the job or its options can fail
+        begin
+          job = on_cancellation_job
+          job_options = on_cancellation_job_options
+        rescue StandardError => e
Member

Should we only catch Delayed::DeserializationErrors so we'll crash and retry for other types of errors?

Contributor Author

In my scenario, I got an ArgumentError. Checking out https://github.com/collectiveidea/delayed_job/blob/master/lib/delayed/backend/base.rb#L73, it looks like quite a long list of errors that can be generated by YAML.load_dj(handler). I'm happy to copy paste all of those errors in here if that's preferred.

Member

Yuck. I was hoping it was just a single exception to catch. It's not ideal but I think copying that list of errors will avoid mistakenly rescuing some classes of errors.
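
One way to narrow the rescue along the lines discussed above is to collect the expected error classes in a constant and splat it into the `rescue` clause. This is only a sketch: the list below is an assumption and should be checked against the `rescue` clause in the linked `base.rb`, and `DESERIALIZATION_ERRORS`, `load_or_skip`, and `MissingJobClass` are hypothetical names. Note that `LoadError` and `SyntaxError` descend from `ScriptError`, so `rescue StandardError` would not catch them.

```ruby
require 'psych'

# Assumed list of errors that loading a serialized job can raise.
# Verify it against delayed_job's lib/delayed/backend/base.rb before relying on it.
DESERIALIZATION_ERRORS = [
  TypeError, LoadError, NameError, ArgumentError, SyntaxError, Psych::SyntaxError
].freeze

# Hypothetical helper: returns the block's value, or nil (after reporting)
# when the block raises one of the listed errors. Other errors propagate,
# so the worker would still crash and retry on them.
def load_or_skip(error_reporter = nil)
  yield
rescue *DESERIALIZATION_ERRORS => e
  error_reporter&.call(e)
  nil
end

reported = []
reporter = ->(error) { reported << error }

# YAML that references a class which is not defined in this process.
handler = "--- !ruby/object:MissingJobClass {}\n"
skipped = load_or_skip(reporter) do
  # unsafe_load exists in newer Psych versions; fall back to load otherwise.
  Psych.respond_to?(:unsafe_load) ? Psych.unsafe_load(handler) : Psych.load(handler)
end

# An unrelated failure is not swallowed.
propagated = begin
  load_or_skip(reporter) { raise IOError, 'connection lost' }
  false
rescue IOError
  true
end
```

Keeping the list in one constant would let `cancel` and `complete` share it rather than repeating the classes in both methods.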

+          Delayed::Worker.logger.info('Failed to deserialize the on_cancellation_job or on_cancellation_job_options ' \
+                                      "for job_group_id=#{id}. Skipping on_cancellation_job to clean up job group.")
+          error_reporter.call(e) if error_reporter
+        end
+
+        Delayed::Job.enqueue(job, job_options || {}) if job
+
         destroy
       end

@@ -80,9 +94,26 @@ def ready_for_completion?
       end

       def complete
-        Delayed::Job.enqueue(on_completion_job, on_completion_job_options || {}) if on_completion_job
+        job = nil
+        job_options = nil
+
+        # Deserialization of the job or its options can fail
+        begin
+          job = on_completion_job
+          job_options = on_completion_job_options
+        rescue StandardError => e
Member

Same comment about only catching Delayed::DeserializationError

+          Delayed::Worker.logger.info('Failed to deserialize the on_completion_job or on_completion_job_options for ' \
+                                      "job_group_id=#{id}. Skipping on_completion_job to clean up job group.")
+          error_reporter.call(e) if error_reporter
Member

Swallowing these errors when a job group is trying to complete seems dangerous since the job group really hasn't completed. Do you think we need to introduce the notion of a job group being in a failed state, so we don't crash the worker by continually retrying but still capture the fact that the job group didn't complete? We might need something similar for cancels too, since the job group really hasn't been completely canceled.

Contributor Author

I suppose an alternative here is that the job group could mark itself as blocked instead of destroying itself.

Member

Wouldn't we still need to manage some state to indicate why a job group was blocked e.g. so you could query the DB for the list of job groups that are blocked due to failure and unblock them after a fix has been deployed?

Contributor Author

It would certainly be more convenient if we did that. I think we could get away with using a single failed_at timestamp on the on_cancellation_job or the on_completion_job that would be set if deserialization failed. Blocking the job group still makes sense right?
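
To make the idea in this thread concrete, here is a small in-memory sketch of a job group that blocks itself and records a `failed_at` timestamp when its completion job cannot be loaded, instead of destroying itself. Nothing here is part of the PR or the plugin's schema: `JobGroupSketch`, the `failed_at` attribute, and `retry_completion` are hypothetical, and a plain block stands in for deserializing `on_completion_job`.

```ruby
# In-memory stand-in for the ActiveRecord-backed job group (hypothetical).
class JobGroupSketch
  attr_reader :blocked, :failed_at, :destroyed, :enqueued

  # job_loader stands in for deserializing on_completion_job.
  def initialize(&job_loader)
    @job_loader = job_loader
    @blocked = false
    @failed_at = nil
    @destroyed = false
    @enqueued = []
  end

  def complete
    job = @job_loader.call
    @enqueued << job if job
    @destroyed = true
  rescue ArgumentError, NameError
    # Keep the group around, blocked, so it can be found and retried later.
    @blocked = true
    @failed_at = Time.now
  end

  # Would be run after a fix has been deployed.
  def retry_completion
    @blocked = false
    @failed_at = nil
    complete
  end
end

class_available = false
group = JobGroupSketch.new do
  raise NameError, 'uninitialized constant OnCompletionJob' unless class_available

  :on_completion_job
end

# First attempt fails to load the job, so the group is blocked, not destroyed.
group.complete
state_after_failure = [group.blocked, group.failed_at.nil?, group.destroyed]

# Once the class is available again, the retry enqueues the job and cleans up.
class_available = true
group.retry_completion
```

With a timestamp like this, blocked groups could in principle be queried by `failed_at` and retried once the missing class is available again, which is the workflow the thread describes.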

+        end
+
+        Delayed::Job.enqueue(job, job_options || {}) if job
         destroy
       end
+
+      def error_reporter
+        Delayed::JobGroups.configuration.error_reporter
+      end
     end
   end
 end
2 changes: 1 addition & 1 deletion lib/delayed/job_groups/version.rb
@@ -2,6 +2,6 @@

 module Delayed
   module JobGroups
-    VERSION = '0.7.0'
+    VERSION = '0.8.0'
   end
 end
15 changes: 15 additions & 0 deletions lib/delayed_job_groups_plugin.rb
@@ -4,6 +4,7 @@
 require 'active_record'
 require 'delayed_job'
 require 'delayed_job_active_record'
+require 'delayed/job_groups/configuration'
 require 'delayed/job_groups/compatibility'
 require 'delayed/job_groups/job_extensions'
 require 'delayed/job_groups/job_group'
@@ -20,3 +21,17 @@
 end

 Delayed::Worker.plugins << Delayed::JobGroups::Plugin
+
+module Delayed
+  module JobGroups
+    @configuration = Delayed::JobGroups::Configuration.new
+
+    class << self
+      attr_reader :configuration
+
+      def configure
+        yield(configuration) if block_given?
+      end
+    end
+  end
+end
101 changes: 91 additions & 10 deletions spec/delayed/job_groups/job_group_spec.rb
@@ -140,6 +140,47 @@
         expect(job_group).to have_been_destroyed
       end
     end
+
+    context "on_completion_job refers to missing class" do
+      # The on_completion_job needs the class to be defined this way in order to serialize it
+      # rubocop:disable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock
+      module Delayed::JobGroups::JobGroupTestHelper
Member

Should we define this constant in a before so we're sure it will be present when the example starts running? The current approach won't work if we ever added multiple examples to this example group.
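
A minimal sketch of the suggestion above, in plain Ruby so it runs without RSpec: the constant is (re)defined by a setup step that would live in a `before` hook, so every example starts with it present even if an earlier example removed it. `JobGroupTestHelperSketch` and the two helper methods are hypothetical names; with rspec-mocks, `stub_const` may be an alternative worth considering.

```ruby
# Hypothetical stand-in for Delayed::JobGroups::JobGroupTestHelper.
module JobGroupTestHelperSketch; end

# Setup step: would be called from a `before` hook.
def define_on_completion_job
  return if JobGroupTestHelperSketch.const_defined?(:OnCompletionJob, false)

  JobGroupTestHelperSketch.const_set(:OnCompletionJob, Class.new)
end

# Step performed inside the example to simulate the missing class.
def remove_on_completion_job
  JobGroupTestHelperSketch.send(:remove_const, :OnCompletionJob)
end

# Simulate two examples running one after the other.
results = Array.new(2) do
  define_on_completion_job
  present_at_start = JobGroupTestHelperSketch.const_defined?(:OnCompletionJob, false)
  remove_on_completion_job
  present_at_end = JobGroupTestHelperSketch.const_defined?(:OnCompletionJob, false)
  [present_at_start, present_at_end]
end
```

Because the class is assigned through `const_set`, it still receives a real constant name, which is what serialization of the job relies on.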

+        class OnCompletionJob
+
+        end
+      end
+      # rubocop:enable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock
+
+      let(:error_reporter) { Proc.new { |_error| } }
+
+      around do |example|
+        original_error_reporter = Delayed::JobGroups.configuration.error_reporter
+        Delayed::JobGroups.configuration.error_reporter = error_reporter
+        example.run
+        Delayed::JobGroups.configuration.error_reporter = original_error_reporter
+      end
+
+      before { allow(error_reporter).to receive(:call) }
+
+      it "handles missing on_completion_job" do
+        on_completion_job = Delayed::JobGroups::JobGroupTestHelper::OnCompletionJob.new
+        job_group = Delayed::JobGroups::JobGroup.create!(on_completion_job: on_completion_job,
+                                                         on_completion_job_options: {})
+        job = Delayed::Job.create!(job_group_id: job_group.id)
+        job_group.mark_queueing_complete
+        job.destroy
+
+        # Remove the class for on_completion_job
+        Delayed::JobGroups::JobGroupTestHelper.module_eval do
+          remove_const 'OnCompletionJob'
+        end
+
+        # Deserialization fails
+        expect { Delayed::JobGroups::JobGroup.check_for_completion(job_group.id) }.not_to raise_error
+        expect(error_reporter).to have_received(:call)
+        expect(job_group).to have_been_destroyed
+      end
+    end
   end

   describe "#enqueue" do
@@ -212,20 +253,60 @@
     let!(:queued_job) { Delayed::Job.create!(job_group_id: job_group.id) }
     let!(:running_job) { Delayed::Job.create!(job_group_id: job_group.id, locked_at: Time.now, locked_by: 'test') }

-    before do
-      job_group.cancel
-    end
+    context "with no on_cancellation_job" do
+      before do
+        job_group.cancel
+      end

-    it "destroys the job group" do
-      expect(job_group).to have_been_destroyed
-    end
+      it "destroys the job group" do
+        expect(job_group).to have_been_destroyed
+      end

-    it "destroys queued jobs" do
-      expect(queued_job).to have_been_destroyed
+      it "destroys queued jobs" do
+        expect(queued_job).to have_been_destroyed
+      end
+
+      it "does not destroy running jobs" do
+        expect(running_job).not_to have_been_destroyed
+      end
     end

-    it "does not destroy running jobs" do
-      expect(running_job).not_to have_been_destroyed
+    context "on_cancellation_job refers to missing class" do
+      # The on_cancellation_job needs the class to be defined this way in order to serialize it
+      # rubocop:disable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock
+      module Delayed::JobGroups::JobGroupTestHelper
+        class OnCancellationJob
+
+        end
+      end
+      # rubocop:enable RSpec/LeakyConstantDeclaration,Style/ClassAndModuleChildren,Lint/ConstantDefinitionInBlock
+
+      let(:error_reporter) { Proc.new { |_error| } }
+
+      around do |example|
+        original_error_reporter = Delayed::JobGroups.configuration.error_reporter
+        Delayed::JobGroups.configuration.error_reporter = error_reporter
+        example.run
+        Delayed::JobGroups.configuration.error_reporter = original_error_reporter
+      end
+
+      before { allow(error_reporter).to receive(:call) }
+
+      it "handles missing on_cancellation_job" do
+        on_cancellation_job = Delayed::JobGroups::JobGroupTestHelper::OnCancellationJob.new
+        job_group = Delayed::JobGroups::JobGroup.create!(on_cancellation_job: on_cancellation_job,
+                                                         on_cancellation_job_options: {})
+
+        # Remove the class for on_cancellation_job
+        Delayed::JobGroups::JobGroupTestHelper.module_eval do
+          remove_const 'OnCancellationJob'
+        end
+
+        # Deserialization fails
+        expect { job_group.cancel }.not_to raise_error
+        expect(error_reporter).to have_received(:call)
+        expect(job_group).to have_been_destroyed
+      end
     end
   end
