Skip to content

Commit

Permalink
Add retry mechanism when a medium uploads files to an object store (#3655)

Browse files Browse the repository at this point in the history

* add retry behaviour that wraps medium.put_file

provide the ability to retry uploading a medium resource to a remote object store

* rename method from s3 to object_store

avoid using specific provider nomenclature

* use retry behaviour to upload exports

use the retry behaviour to help avoid object store errors blocking the export creation

* use single quoted strings and avoid should

* add put_file_with_retry behaviour on DirectToS3 class

mirror put_file_with_retry on custom s3 medium uploader for use with s3.

We've never seen the need for this behaviour in the S3 adapter.

However, as the medium interface is used by the DirectToS3 uploader class, we need to support this method because it's used in the exporter code.
  • Loading branch information
camallen authored Sep 10, 2021
1 parent ab63cb6 commit 2794e5b
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 13 deletions.
13 changes: 13 additions & 0 deletions app/models/csv_dumps/direct_to_s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@ def put_file(gzip_file_path, compressed: true)
)
end

# Upload the gzipped dump, retrying transient object-store failures.
#
# gzip_file_path - path of the compressed file to upload
# opts           - options forwarded verbatim to #put_file
# num_retries    - maximum number of upload attempts (default 5)
#
# Raises UnencryptedBucket immediately (misconfiguration - retrying
# cannot help) and re-raises any other error once all attempts are
# exhausted, so unexpected failures still surface to the caller.
def put_file_with_retry(gzip_file_path, opts={}, num_retries=5)
  attempt = 0
  begin
    attempt += 1
    put_file(gzip_file_path, opts)
  rescue UnencryptedBucket
    # configuration error, not a transient fault - never retry
    raise
  rescue StandardError
    retry if attempt < num_retries
    raise
  end
end

def storage_adapter(adapter='aws')
return @storage_adapter if @storage_adapter

Expand Down
6 changes: 3 additions & 3 deletions app/models/csv_dumps/dump_processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def perform_dump

# Compress the finished CSV dump, push it to the remote object store
# (with retries - see Medium#put_file_with_retry), then mark the
# medium as ready for download.
#
# NOTE: the scraped diff left the removed legacy `write_to_s3` call
# interleaved with its replacement; only the renamed
# `write_to_object_store` call belongs in the final code.
def upload_dump
  gzip_file_path = csv_dump.gzip!
  write_to_object_store(gzip_file_path)
  set_ready_state
end

Expand All @@ -49,8 +49,8 @@ def set_ready_state
medium.save!
end

# Push the compressed dump file to the remote object store via the
# medium's retrying uploader, avoiding provider-specific naming
# (renamed from write_to_s3).
#
# NOTE: the scraped diff fused the removed `write_to_s3` definition
# with its replacement; only the renamed method belongs in the final
# code.
def write_to_object_store(gzip_file_path)
  medium.put_file_with_retry(gzip_file_path, compressed: true)
end
end
end
18 changes: 15 additions & 3 deletions app/models/medium.rb
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,24 @@ def get_url(opts = {})
end

# Store the file at +file_path+ in media storage under this medium's
# src key, merging any caller-supplied options into the medium's
# attributes.
#
# Raises MissingPutFilePath when no file path is given.
#
# NOTE: the scraped diff left both the removed multi-line guard and
# its single-line replacement in place (a duplicated guard); only the
# guard-clause form belongs in the final code.
def put_file(file_path, opts={})
  raise MissingPutFilePath, 'Must specify a file_path to store' if file_path.blank?

  MediaStorage.put_file(src, file_path, indifferent_attributes.merge(opts))
end

# Store the file via #put_file, retrying transient storage failures.
#
# file_path   - path of the file to store
# opts        - options forwarded verbatim to #put_file
# num_retries - maximum number of attempts (default 5)
#
# Raises MissingPutFilePath immediately (the arguments are invalid,
# so retrying cannot succeed) and re-raises any other error once all
# attempts are exhausted, so unexpected failures still surface.
def put_file_with_retry(file_path, opts={}, num_retries=5)
  attempt = 0
  begin
    attempt += 1
    put_file(file_path, opts)
  rescue MissingPutFilePath
    # invalid put_file args - never retry
    raise
  rescue StandardError
    retry if attempt < num_retries
    raise
  end
end

private

def queue_medium_removal
Expand Down
29 changes: 29 additions & 0 deletions spec/models/csv_dumps/direct_to_s3_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,35 @@
end
end

# Specs for CsvDumps::DirectToS3#put_file_with_retry: delegation to
# put_file, bounded retry on transient errors, and the non-retryable
# UnencryptedBucket case.
#
# NOTE(review): the `do ... rescue ... end` example style below puts the
# expectations inside the rescue clause. If put_file_with_retry ever stops
# raising, the rescue never runs and these examples pass vacuously -
# consider `expect { ... }.to raise_error` plus follow-up expectations.
describe '#put_file_with_retry' do
it 'calls put_file with the src and other attributes' do
allow(adapter).to receive(:put_file)
direct_to_s3.put_file_with_retry(file_path)
expect(adapter).to have_received(:put_file).with(s3_path, file_path, s3_opts)
end

# Default of 5 attempts when every call raises a transient error.
it 'retries the correct number of times' do
allow(adapter).to receive(:put_file).and_raise(Faraday::ConnectionFailed, 'some error in aws lib')
direct_to_s3.put_file_with_retry(file_path)
rescue Faraday::ConnectionFailed
expect(adapter).to have_received(:put_file).with(s3_path, file_path, s3_opts).exactly(5).times
end

# The third positional arg overrides the retry budget per call.
it 'allows the retry number to be modified at runtime' do
allow(adapter).to receive(:put_file).and_raise(Faraday::ConnectionFailed, 'Connection reset by peer')
direct_to_s3.put_file_with_retry(file_path, {}, 2)
rescue Faraday::ConnectionFailed
expect(adapter).to have_received(:put_file).with(s3_path, file_path, s3_opts).twice
end

# Bucket misconfiguration is not transient, so it must not be retried.
it 'does not retry if put_file raises UnencryptedBucket' do
allow(direct_to_s3).to receive(:put_file).and_raise(CsvDumps::DirectToS3::UnencryptedBucket)
direct_to_s3.put_file_with_retry('')
rescue CsvDumps::DirectToS3::UnencryptedBucket
expect(direct_to_s3).to have_received(:put_file).once
end
end

describe 'bucket encryption' do
it "raises an error if it's not encrypted" do
allow(adapter).to receive(:encrypted_bucket?).and_return(false)
Expand Down
9 changes: 5 additions & 4 deletions spec/models/csv_dumps/dump_processor_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
RSpec.describe CsvDumps::DumpProcessor do
let(:formatter) { double("Formatter", headers: false).tap { |f| allow(f).to receive(:to_rows) { |model| [model] } } }
let(:scope) { [] }
let(:medium) { double("Medium", put_file: true, metadata: {}, save!: true) }
let(:medium) { instance_double('Medium', put_file_with_retry: true, metadata: {}, save!: true) }
let(:csv_dump) { double(CsvDump, cleanup!: true, gzip!: true) }
let(:processor) { described_class.new(formatter, scope, medium, csv_dump) }

Expand All @@ -29,14 +29,15 @@
processor.execute
end

it "push the file to s3" do
it 'push the file to an object store' do
path = double
allow(csv_dump).to receive(:gzip!).and_return(path)
expect(medium).to receive(:put_file).with(path, compressed: true).once
allow(medium).to receive(:put_file_with_retry)
processor.execute
expect(medium).to have_received(:put_file_with_retry).with(path, compressed: true).once
end

it "should clean up the file after sending to s3" do
it 'cleans up the file after sending to object store' do
expect(csv_dump).to receive(:cleanup!).once
processor.execute
end
Expand Down
34 changes: 34 additions & 0 deletions spec/models/medium_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,40 @@
end
end

# Specs for Medium#put_file_with_retry: delegation to MediaStorage,
# bounded retry on transient errors, and the non-retryable
# MissingPutFilePath case.
#
# NOTE(review): the `do ... rescue ... end` example style below puts the
# expectations inside the rescue clause. If put_file_with_retry ever stops
# raising, the rescue never runs and these examples pass vacuously -
# consider `expect { ... }.to raise_error` plus follow-up expectations.
describe '#put_file_with_retry' do
let(:file_path) { Rails.root.join('/tmp/project_x_dump.csv') }
let(:media_storage_put_file_params) do
[medium.src, file_path, medium.attributes]
end

it 'calls MediaStorage put_file with the src and other attributes' do
allow(MediaStorage).to receive(:put_file)
medium.put_file_with_retry(file_path)
expect(MediaStorage).to have_received(:put_file).with(*media_storage_put_file_params)
end

# Default of 5 attempts when every call raises a transient error.
it 'retries the correct number of times' do
allow(MediaStorage).to receive(:put_file).and_raise(Faraday::ConnectionFailed, 'Connection reset by peer')
medium.put_file_with_retry(file_path)
rescue Faraday::ConnectionFailed
expect(MediaStorage).to have_received(:put_file).with(*media_storage_put_file_params).exactly(5).times
end

# The third positional arg overrides the retry budget per call.
it 'allows the retry number to be modified at runtime' do
allow(MediaStorage).to receive(:put_file).and_raise(Faraday::ConnectionFailed, 'Connection reset by peer')
medium.put_file_with_retry(file_path, {}, 2)
rescue Faraday::ConnectionFailed
expect(MediaStorage).to have_received(:put_file).with(*media_storage_put_file_params).twice
end

# Invalid arguments are not transient, so they must not be retried.
it 'does not retry if put_file raises MissingPutFilePath' do
allow(medium).to receive(:put_file).and_call_original
medium.put_file_with_retry('')
rescue Medium::MissingPutFilePath
expect(medium).to have_received(:put_file).once
end
end

describe "#locations" do
let(:project) { create(:project) }
context "when type is one of project_avatar, user_avatar, or project_background" do
Expand Down
7 changes: 4 additions & 3 deletions spec/support/dump_worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
worker.perform(another_project.id, "project")
end

it "should not push a file to s3" do
expect(worker).to_not receive(:write_to_s3)
worker.perform(another_project.id, "project")
it 'does not write the file to a remote object store' do
allow(worker).to receive(:write_to_object_store)
worker.perform(another_project.id, 'project')
expect(worker).not_to have_received(:write_to_object_store)
end

it "should not queue a worker to send an email" do
Expand Down

0 comments on commit 2794e5b

Please sign in to comment.