Skip to content

Commit

Permalink
implement basic dead letter exchange behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
aiomaster committed Oct 26, 2017
1 parent 468fe86 commit 6729e0d
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 5 deletions.
29 changes: 24 additions & 5 deletions lib/bunny_mock/channel.rb
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ def ack(delivery_tag, multiple = false)
ack(key, false) if key <= delivery_tag
end
elsif @acknowledged_state[:pending].key?(delivery_tag)
update_acknowledgement_state(delivery_tag, :acked)
handle_acknowledgement(delivery_tag, :acked)
end
nil
end
Expand All @@ -304,8 +304,7 @@ def nack(delivery_tag, multiple = false, requeue = false)
nack(key, false, requeue) if key <= delivery_tag
end
elsif @acknowledged_state[:pending].key?(delivery_tag)
delivery, header, body = update_acknowledgement_state(delivery_tag, :nacked)
delivery.queue.publish(body, header.to_hash) if requeue
handle_acknowledgement(delivery_tag, :nacked, requeue)
end
nil
end
Expand All @@ -322,8 +321,7 @@ def nack(delivery_tag, multiple = false, requeue = false)
#
def reject(delivery_tag, requeue = false)
if @acknowledged_state[:pending].key?(delivery_tag)
delivery, header, body = update_acknowledgement_state(delivery_tag, :rejected)
delivery.queue.publish(body, header.to_hash) if requeue
handle_acknowledgement(delivery_tag, :rejected, requeue)
end
nil
end
Expand Down Expand Up @@ -399,6 +397,27 @@ def xchg_find_or_create(name, opts = {})
@connection.find_exchange(name) || Exchange.declare(self, name, opts)
end

# @private
def handle_acknowledgement(delivery_tag, new_state, requeue = false)
delivery, header, body = update_acknowledgement_state(delivery_tag, new_state)
return unless %i(nacked rejected).include?(new_state)

options = header.to_hash
return delivery.queue.publish(body, options) if requeue

dlx, routing_key = dead_letter_arguments(delivery.queue)
return unless dlx
options[:routing_key] = routing_key if routing_key
exchange(dlx).publish(body, options)
end

# @private
def dead_letter_arguments(queue)
arguments = queue.opts[:arguments]
return [nil, nil] unless arguments
[arguments['x-dead-letter-exchange'], arguments['x-dead-letter-routing-key']]
end

# @private
def update_acknowledgement_state(delivery_tag, new_state)
@acknowledged_state[new_state][delivery_tag] = @acknowledged_state[:pending].delete(delivery_tag)
Expand Down
52 changes: 52 additions & 0 deletions spec/integration/message_acknowledgement_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -110,5 +110,57 @@
expect(@channel.acknowledged_state[:pending]).to include(new_delivery_tag)
end
end

context 'when having a dead letter exchange defined' do
let(:dlx) { @channel.fanout('test.dlx') }
let(:dlq) { @channel.temporary_queue.bind(dlx) }
before do
dlq
queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name })
end

it 'should send nacked message to dead letter exchange if specified' do
queue.publish 'Message to nack'
@channel.nack delivery_tags['Message to nack']

expect(dlq.message_count).to be 1
expect(dlq.pop.last).to eq 'Message to nack'
end

it 'should not send nacked message to dead letter exchange if it is requeued' do
queue.publish 'Message to nack'
@channel.nack delivery_tags['Message to nack'], false, true

expect(dlq.message_count).to be 0
end

it 'should send rejected message to dead letter exchange if specified' do
queue.publish 'Message to reject'
@channel.reject delivery_tags['Message to reject']

expect(dlq.message_count).to be 1
expect(dlq.pop.last).to eq 'Message to reject'
end

it 'should not send rejected message to dead letter exchange if it is requeued' do
queue.publish 'Message to reject'
@channel.nack delivery_tags['Message to reject'], false, true

expect(dlq.message_count).to be 0
end

it 'should be possible to overwrite the dead letter routing key' do
queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name, 'x-dead-letter-routing-key' => 'dl_key' })
queue.publish 'Message to reject'
@channel.reject delivery_tags['Message to reject']

expect(dlq.message_count).to be 1
delivery_info, headers, payload = dlq.pop
expect(payload).to eq 'Message to reject'
expect(headers[:routing_key]).to eq 'dl_key'
expect(delivery_info[:routing_key]).to eq 'dl_key'
end

end
end
end

0 comments on commit 6729e0d

Please sign in to comment.