Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add request id fix for bulk publishes #154

Merged
merged 9 commits into from
May 1, 2018
2 changes: 1 addition & 1 deletion lib/ably/auth.rb
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ def token_request_from_auth_url(auth_url, auth_options, token_params)
method = auth_options[:auth_method] || options[:auth_method] || :get
params = (auth_options[:auth_params] || options[:auth_method] || {}).merge(token_params)

response = connection.send(method) do |request|
response = connection.public_send(method) do |request|
request.url uri.path
request.headers = auth_options[:auth_headers] || {}
if method.to_s.downcase == 'post'
Expand Down
2 changes: 1 addition & 1 deletion lib/ably/modules/encodeable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def apply_encoders(method, encoders, channel_options, &error_callback)

previous_encoding = message_attributes[:encoding]
encoders.each do |encoder|
encoder.send method, message_attributes, channel_options
encoder.public_send method, message_attributes, channel_options
end
end until previous_encoding == message_attributes[:encoding]

Expand Down
30 changes: 19 additions & 11 deletions lib/ably/rest/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -439,22 +439,15 @@ def send_request(method, path, params, options)
max_retry_duration = http_defaults.fetch(:max_retry_duration)
requested_at = Time.now
retry_count = 0
request_id = nil
if add_request_ids
params = if params.nil?
{}
else
params.dup
end
request_id = SecureRandom.urlsafe_base64(10)
params[:request_id] = request_id
end
retry_sequence_id = nil
request_id = SecureRandom.urlsafe_base64(10) if add_request_ids

begin
use_fallback = can_fallback_to_alternate_ably_host? && retry_count > 0

connection(use_fallback: use_fallback).send(method, path, params) do |request|
if add_request_ids
request.params[:request_id] = request_id
request.options.context = {} if request.options.context.nil?
request.options.context[:request_id] = request_id
end
Expand All @@ -466,15 +459,30 @@ def send_request(method, path, params, options)
end
end
end
end.tap do
if retry_count > 0
logger.warn do
"Ably::Rest::Client - Request SUCCEEDED after #{retry_count} #{retry_count > 1 ? 'retries' : 'retry' } for" \
" #{method} #{path} #{params} (seq ##{retry_sequence_id}, time elapsed #{(Time.now.to_f - requested_at.to_f).round(2)}s)"
end
end
end

rescue Faraday::TimeoutError, Faraday::ClientError, Ably::Exceptions::ServerError => error
retry_sequence_id ||= SecureRandom.urlsafe_base64(4)
time_passed = Time.now - requested_at

if can_fallback_to_alternate_ably_host? && retry_count < max_retry_count && time_passed <= max_retry_duration
retry_count += 1
logger.warn { "Ably::Rest::Client - Retry #{retry_count} for #{method} #{path} #{params} as initial attempt failed: #{error}" }
logger.warn { "Ably::Rest::Client - Retry #{retry_count} for #{method} #{path} #{params} as initial attempt failed (seq ##{retry_sequence_id}): #{error}" }
retry
end

logger.error do
"Ably::Rest::Client - Request FAILED after #{retry_count} #{retry_count > 1 ? 'retries' : 'retry' } for" \
" #{method} #{path} #{params} (seq ##{retry_sequence_id}, time elapsed #{(Time.now.to_f - requested_at.to_f).round(2)}s)"
end

case error
when Faraday::TimeoutError
raise Ably::Exceptions::ConnectionTimeout.new(error.message, nil, 80014, error, { request_id: request_id })
Expand Down
28 changes: 2 additions & 26 deletions spec/acceptance/realtime/auth_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1014,33 +1014,9 @@ def disconnect_transport(connection)
end
end

context 'deprecated #authorise' do
context 'deprecated #authorise', :prevent_log_stubbing do
let(:client_options) { default_options.merge(key: api_key, logger: custom_logger_object, use_token_auth: true) }
let(:custom_logger) do
Class.new do
def initialize
@messages = []
end

[:fatal, :error, :warn, :info, :debug].each do |severity|
define_method severity do |message|
@messages << [severity, message]
end
end

def logs
@messages
end

def level
1
end

def level=(new_level)
end
end
end
let(:custom_logger_object) { custom_logger.new }
let(:custom_logger_object) { TestLogger.new }

it 'logs a deprecation warning (#RSA10l)' do
client.auth.authorise
Expand Down
31 changes: 2 additions & 29 deletions spec/acceptance/rest/auth_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1324,36 +1324,9 @@ def coerce_if_time_value(field_name, value, params = {})
end
end

context 'deprecated #authorise' do
context 'deprecated #authorise', :prevent_log_stubbing do
let(:client_options) { default_options.merge(key: api_key, logger: custom_logger_object, use_token_auth: true) }
let(:custom_logger) do
Class.new do
def initialize
@messages = []
end

[:fatal, :error, :warn, :info, :debug].each do |severity|
define_method severity do |message, &block|
message_val = [message]
message_val << block.call if block

@messages << [severity, message_val.compact.join(' ')]
end
end

def logs
@messages
end

def level
1
end

def level=(new_level)
end
end
end
let(:custom_logger_object) { custom_logger.new }
let(:custom_logger_object) { TestLogger.new }

it 'logs a deprecation warning (#RSA10l)' do
client.auth.authorise
Expand Down
136 changes: 103 additions & 33 deletions spec/acceptance/rest/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1017,36 +1017,10 @@ def encode64(text)

context 'request_id generation' do
context 'Timeout error' do
context 'with request_id', :webmock do
let(:custom_logger) do
Class.new do
def initialize
@messages = []
end

[:fatal, :error, :warn, :info, :debug].each do |severity|
define_method severity do |message, &block|
message_val = [message]
message_val << block.call if block

@messages << [severity, message_val.compact.join(' ')]
end
end

def logs
@messages
end

def level
1
end

def level=(new_level)
end
end
end
let(:custom_logger_object) { custom_logger.new }
context 'with option add_request_ids: true', :webmock, :prevent_log_stubbing do
let(:custom_logger_object) { TestLogger.new }
let(:client_options) { default_options.merge(key: api_key, logger: custom_logger_object, add_request_ids: true) }

before do
@request_id = nil
stub_request(:get, Addressable::Template.new("#{client.endpoint}/time{?request_id}")).with do |request|
Expand All @@ -1055,15 +1029,64 @@ def level=(new_level)
raise Faraday::TimeoutError.new('timeout error message')
end
end

it 'has an error with the same request_id of the request' do
expect{ client.time }.to raise_error(Ably::Exceptions::ConnectionTimeout, /#{@request_id}/)
expect { client.time }.to raise_error(Ably::Exceptions::ConnectionTimeout, /#{@request_id}/)
expect(@request_id).to be_a(String)
expect(@request_id).to_not be_empty
expect(custom_logger_object.logs.find { |severity, message| message.match(/#{@request_id}/i)} ).to_not be_nil
end
end

context 'when specifying fallback hosts', :webmock do
let(:client_options) { { key: api_key, fallback_hosts_use_default: true, add_request_ids: true } }
context 'with option add_request_ids: true and REST operations with a message body' do
let(:client_options) { default_options.merge({ key: api_key, add_request_ids: true }) }
let(:channel_name) { random_str }
let(:channel) { client.channels.get(channel_name) }

context 'with mocks to inspect the params', :webmock do
before do
stub_request(:post, Addressable::Template.new("#{client.endpoint}/channels/#{channel_name}/publish{?request_id}")).
with do |request|
@request_id = request.uri.query_values['request_id']
end.to_return(:status => 200, :body => [], :headers => { 'Content-Type' => 'application/json' })
end

context 'with a single publish' do
it 'succeeds and sends the request_id as a param' do
channel.publish('name', { body: random_str })
expect(@request_id.to_s).to_not be_empty
end
end

context 'with an array publish' do
it 'succeeds and sends the request_id as a param' do
channel.publish([{ body: random_str }, { body: random_str }])
expect(@request_id.to_s).to_not be_empty
end
end
end

context 'without mocks to ensure the requests are accepted' do
context 'with a single publish' do
it 'succeeds and sends the request_id as a param' do
channel.publish('name', { body: random_str })
expect(channel.history.items.length).to eql(1)
end
end

context 'with an array publish' do
it 'succeeds and sends the request_id as a param' do
channel.publish([{ body: random_str }, { body: random_str }])
expect(channel.history.items.length).to eql(2)
end
end
end
end

context 'option add_request_ids: true and specified fallback hosts', :webmock do
let(:client_options) { { key: api_key, fallback_hosts_use_default: true, add_request_ids: true, log_level: :error } }
let(:requests) { [] }

before do
@request_id = nil
hosts = Ably::FALLBACK_HOSTS + ['rest.ably.io']
Expand All @@ -1076,15 +1099,19 @@ def level=(new_level)
end
end
end
it 'request_id is the same across retries' do

specify 'request_id is the same across retries' do
expect{ client.time }.to raise_error(Ably::Exceptions::ConnectionTimeout, /#{@request_id}/)
expect(@request_id).to be_a(String)
expect(@request_id).to_not be_empty
expect(requests.uniq.count).to eql(1)
expect(requests.uniq.first).to eql(@request_id)
end
end

context 'without request_id' do
let(:client_options) { default_options.merge(key: api_key, http_request_timeout: 0) }

it 'does not include request_id in ConnectionTimeout error' do
begin
client.stats
Expand All @@ -1097,6 +1124,7 @@ def level=(new_level)

context 'UnauthorizedRequest nonce error' do
let(:token_params) { { nonce: "samenonce_#{protocol}", timestamp: Time.now.to_i } }

it 'includes request_id in UnauthorizedRequest error due to replayed nonce' do
client1 = Ably::Rest::Client.new(default_options.merge(key: api_key))
client2 = Ably::Rest::Client.new(default_options.merge(key: api_key, add_request_ids: true))
Expand All @@ -1109,5 +1137,47 @@ def level=(new_level)
end
end
end

context 'failed request logging', :prevent_log_stubbing do
let(:custom_logger) { TestLogger.new }
let(:client_options) { default_options.merge(key: api_key, logger: custom_logger) }

it 'is absent when requests do not fail' do
client.time
expect(custom_logger.logs(min_severity: :warn)).to be_empty
end

context 'with the first request failing' do
let(:client_options) do
default_options.merge(
rest_host: 'non.existent.domain.local',
fallback_hosts: [[environment, Ably::Rest::Client::DOMAIN].join('-')],
key: api_key,
logger: custom_logger)
end

it 'is present with success message when requests do not actually fail' do
client.time
expect(custom_logger.logs(min_severity: :warn).select { |severity, msg| msg.match(/Retry/) }.length).to eql(1)
expect(custom_logger.logs(min_severity: :warn).select { |severity, msg| msg.match(/SUCCEEDED/) }.length).to eql(1)
end
end

context 'with all requests failing' do
let(:client_options) do
default_options.merge(
rest_host: 'non.existent.domain.local',
fallback_hosts: ['non2.existent.domain.local'],
key: api_key,
logger: custom_logger)
end

it 'is present when all requests fail' do
expect { client.time }.to raise_error(Ably::Exceptions::ConnectionError)
expect(custom_logger.logs(min_severity: :warn).select { |severity, msg| msg.match(/Retry/) }.length).to be >= 2
expect(custom_logger.logs(min_severity: :error).select { |severity, msg| msg.match(/FAILED/) }.length).to eql(1)
end
end
end
end
end
10 changes: 1 addition & 9 deletions spec/shared/client_initializer_behaviour.rb
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,7 @@ def rest?
end

context 'with custom logger and log_level' do
let(:custom_logger) do
Class.new do
extend Forwardable
def initialize
@logger = Logger.new(STDOUT)
end
def_delegators :@logger, :fatal, :error, :warn, :info, :debug, :level, :level=
end
end
let(:custom_logger) { TestLogger }
let(:client_options) { default_options.merge(logger: custom_logger.new, log_level: Logger::DEBUG, auto_connect: false) }

it 'uses the custom logger' do
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ def console(message)
require 'support/private_api_formatter'
require 'support/protocol_helper'
require 'support/random_helper'
require 'support/test_logger_helper'

require 'rspec_config'

Expand Down
Loading