Skip to content

Commit

Permalink
implement basic dead letter exchange behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
aiomaster committed Nov 7, 2017
1 parent 468fe86 commit 1ca670d
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 7 deletions.
48 changes: 41 additions & 7 deletions lib/bunny_mock/channel.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# frozen_string_literal: true

module BunnyMock
class Channel
#
Expand Down Expand Up @@ -278,11 +279,11 @@ def wait_for_confirms(*)
#
def ack(delivery_tag, multiple = false)
if multiple
@acknowledged_state[:pending].keys.each do |key|
@acknowledged_state[:pending].each_key do |key|
ack(key, false) if key <= delivery_tag
end
elsif @acknowledged_state[:pending].key?(delivery_tag)
update_acknowledgement_state(delivery_tag, :acked)
handle_acknowledgement(delivery_tag, :acked)
end
nil
end
Expand All @@ -300,12 +301,11 @@ def ack(delivery_tag, multiple = false)
#
def nack(delivery_tag, multiple = false, requeue = false)
if multiple
@acknowledged_state[:pending].keys.each do |key|
@acknowledged_state[:pending].each_key do |key|
nack(key, false, requeue) if key <= delivery_tag
end
elsif @acknowledged_state[:pending].key?(delivery_tag)
delivery, header, body = update_acknowledgement_state(delivery_tag, :nacked)
delivery.queue.publish(body, header.to_hash) if requeue
handle_acknowledgement(delivery_tag, :nacked, requeue)
end
nil
end
Expand All @@ -322,8 +322,7 @@ def nack(delivery_tag, multiple = false, requeue = false)
#
def reject(delivery_tag, requeue = false)
if @acknowledged_state[:pending].key?(delivery_tag)
delivery, header, body = update_acknowledgement_state(delivery_tag, :rejected)
delivery.queue.publish(body, header.to_hash) if requeue
handle_acknowledgement(delivery_tag, :rejected, requeue)
end
nil
end
Expand Down Expand Up @@ -399,6 +398,41 @@ def xchg_find_or_create(name, opts = {})
@connection.find_exchange(name) || Exchange.declare(self, name, opts)
end

# @private
def handle_acknowledgement(delivery_tag, new_state, requeue = false)
delivery, header, body = update_acknowledgement_state(delivery_tag, new_state)
return unless %i[nacked rejected].include?(new_state)

options = header.to_hash
return delivery.queue.publish(body, options) if requeue

dlx, routing_key = dead_letter_arguments(delivery.queue)
return unless dlx
options = set_dead_letter_options(options, delivery, routing_key)
exchange(dlx).publish(body, options)
end

# @private
def set_dead_letter_options(options, delivery, routing_key)
(options[:headers] ||= {})['x-death'] = [{
'count' => 1,
'reason' => 'rejected',
'queue' => delivery.queue.name,
'time' => Time.now,
'exchange' => delivery.exchange,
'routing_keys' => [delivery.routing_key]
}]
options[:routing_key] = routing_key if routing_key
options
end

# @private
def dead_letter_arguments(queue)
arguments = queue.opts[:arguments]
return [nil, nil] unless arguments
[arguments['x-dead-letter-exchange'], arguments['x-dead-letter-routing-key']]
end

# @private
def update_acknowledgement_state(delivery_tag, new_state)
@acknowledged_state[new_state][delivery_tag] = @acknowledged_state[:pending].delete(delivery_tag)
Expand Down
58 changes: 58 additions & 0 deletions spec/integration/message_acknowledgement_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,63 @@
expect(@channel.acknowledged_state[:pending]).to include(new_delivery_tag)
end
end

context 'when having a dead letter exchange defined' do
let(:dlx) { @channel.fanout('test.dlx') }
let(:dlq) { @channel.temporary_queue.bind(dlx) }
before do
dlq
queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name })
end

it 'should send nacked message to dead letter exchange if specified' do
queue.publish 'Message to nack'
@channel.nack delivery_tags['Message to nack']

expect(dlq.message_count).to be 1
expect(dlq.pop.last).to eq 'Message to nack'
end

it 'should not send nacked message to dead letter exchange if it is requeued' do
queue.publish 'Message to nack'
@channel.nack delivery_tags['Message to nack'], false, true

expect(dlq.message_count).to be 0
end

it 'should send rejected message to dead letter exchange if specified' do
queue.publish 'Message to reject'
@channel.reject delivery_tags['Message to reject']

expect(dlq.message_count).to be 1
_, properties, message = dlq.pop
expect(message).to eq 'Message to reject'
xdeath_headers = properties[:headers]['x-death'].first
expect(xdeath_headers['count']).to eq 1
expect(xdeath_headers['queue']).to eq queue.name
expect(xdeath_headers['reason']).to eq 'rejected'
expect(xdeath_headers['routing_keys']).to eq [queue.name]
end

it 'should not send rejected message to dead letter exchange if it is requeued' do
queue.publish 'Message to reject'
@channel.nack delivery_tags['Message to reject'], false, true

expect(dlq.message_count).to be 0
end

it 'should be possible to overwrite the dead letter routing key' do
queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name, 'x-dead-letter-routing-key' => 'dl_key' })
queue.publish 'Message to reject'
@channel.reject delivery_tags['Message to reject']

expect(dlq.message_count).to be 1
delivery_info, headers, payload = dlq.pop
expect(payload).to eq 'Message to reject'
expect(headers[:routing_key]).to eq 'dl_key'
expect(delivery_info[:routing_key]).to eq 'dl_key'
end

end
end
end

0 comments on commit 1ca670d

Please sign in to comment.