From 6729e0d6dce8cf42f28a2faa5a28d8b2145695f4 Mon Sep 17 00:00:00 2001 From: Christoph Wagner Date: Wed, 25 Oct 2017 22:54:26 +0200 Subject: [PATCH] implement basic dead letter exchange behaviour --- lib/bunny_mock/channel.rb | 29 +++++++++-- .../message_acknowledgement_spec.rb | 52 +++++++++++++++++++ 2 files changed, 76 insertions(+), 5 deletions(-) diff --git a/lib/bunny_mock/channel.rb b/lib/bunny_mock/channel.rb index 55323e0..dcd17c9 100644 --- a/lib/bunny_mock/channel.rb +++ b/lib/bunny_mock/channel.rb @@ -282,7 +282,7 @@ def ack(delivery_tag, multiple = false) ack(key, false) if key <= delivery_tag end elsif @acknowledged_state[:pending].key?(delivery_tag) - update_acknowledgement_state(delivery_tag, :acked) + handle_acknowledgement(delivery_tag, :acked) end nil end @@ -304,8 +304,7 @@ def nack(delivery_tag, multiple = false, requeue = false) nack(key, false, requeue) if key <= delivery_tag end elsif @acknowledged_state[:pending].key?(delivery_tag) - delivery, header, body = update_acknowledgement_state(delivery_tag, :nacked) - delivery.queue.publish(body, header.to_hash) if requeue + handle_acknowledgement(delivery_tag, :nacked, requeue) end nil end @@ -322,8 +321,7 @@ def nack(delivery_tag, multiple = false, requeue = false) # def reject(delivery_tag, requeue = false) if @acknowledged_state[:pending].key?(delivery_tag) - delivery, header, body = update_acknowledgement_state(delivery_tag, :rejected) - delivery.queue.publish(body, header.to_hash) if requeue + handle_acknowledgement(delivery_tag, :rejected, requeue) end nil end @@ -399,6 +397,27 @@ def xchg_find_or_create(name, opts = {}) @connection.find_exchange(name) || Exchange.declare(self, name, opts) end + # @private + def handle_acknowledgement(delivery_tag, new_state, requeue = false) + delivery, header, body = update_acknowledgement_state(delivery_tag, new_state) + return unless %i(nacked rejected).include?(new_state) + + options = header.to_hash + return delivery.queue.publish(body, options) if requeue + + dlx, routing_key = dead_letter_arguments(delivery.queue) + return unless dlx + options[:routing_key] = routing_key if routing_key + exchange(dlx).publish(body, options) + end + + # @private + def dead_letter_arguments(queue) + arguments = queue.opts[:arguments] + return [nil, nil] unless arguments + [arguments['x-dead-letter-exchange'], arguments['x-dead-letter-routing-key']] + end + # @private def update_acknowledgement_state(delivery_tag, new_state) @acknowledged_state[new_state][delivery_tag] = @acknowledged_state[:pending].delete(delivery_tag) diff --git a/spec/integration/message_acknowledgement_spec.rb b/spec/integration/message_acknowledgement_spec.rb index 1911555..44eb6c0 100644 --- a/spec/integration/message_acknowledgement_spec.rb +++ b/spec/integration/message_acknowledgement_spec.rb @@ -110,5 +110,57 @@ expect(@channel.acknowledged_state[:pending]).to include(new_delivery_tag) end end + + context 'when having a dead letter exchange defined' do + let(:dlx) { @channel.fanout('test.dlx') } + let(:dlq) { @channel.temporary_queue.bind(dlx) } + before do + dlq + queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name }) + end + + it 'should send nacked message to dead letter exchange if specified' do + queue.publish 'Message to nack' + @channel.nack delivery_tags['Message to nack'] + + expect(dlq.message_count).to be 1 + expect(dlq.pop.last).to eq 'Message to nack' + end + + it 'should not send nacked message to dead letter exchange if it is requeued' do + queue.publish 'Message to nack' + @channel.nack delivery_tags['Message to nack'], false, true + + expect(dlq.message_count).to be 0 + end + + it 'should send rejected message to dead letter exchange if specified' do + queue.publish 'Message to reject' + @channel.reject delivery_tags['Message to reject'] + + expect(dlq.message_count).to be 1 + expect(dlq.pop.last).to eq 'Message to reject' + end + + it 'should not send rejected message to dead letter exchange if it is requeued' do + queue.publish 'Message to reject' + @channel.nack delivery_tags['Message to reject'], false, true + + expect(dlq.message_count).to be 0 + end + + it 'should be possible to overwrite the dead letter routing key' do + queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name, 'x-dead-letter-routing-key' => 'dl_key' }) + queue.publish 'Message to reject' + @channel.reject delivery_tags['Message to reject'] + + expect(dlq.message_count).to be 1 + delivery_info, headers, payload = dlq.pop + expect(payload).to eq 'Message to reject' + expect(headers[:routing_key]).to eq 'dl_key' + expect(delivery_info[:routing_key]).to eq 'dl_key' + end + + end end end