diff --git a/lib/bunny_mock/channel.rb b/lib/bunny_mock/channel.rb index 55323e0..45147aa 100644 --- a/lib/bunny_mock/channel.rb +++ b/lib/bunny_mock/channel.rb @@ -1,4 +1,5 @@ # frozen_string_literal: true + module BunnyMock class Channel # @@ -278,11 +279,11 @@ def wait_for_confirms(*) # def ack(delivery_tag, multiple = false) if multiple - @acknowledged_state[:pending].keys.each do |key| + @acknowledged_state[:pending].each_key do |key| ack(key, false) if key <= delivery_tag end elsif @acknowledged_state[:pending].key?(delivery_tag) - update_acknowledgement_state(delivery_tag, :acked) + handle_acknowledgement(delivery_tag, :acked) end nil end @@ -300,12 +301,11 @@ def ack(delivery_tag, multiple = false) # def nack(delivery_tag, multiple = false, requeue = false) if multiple - @acknowledged_state[:pending].keys.each do |key| + @acknowledged_state[:pending].each_key do |key| nack(key, false, requeue) if key <= delivery_tag end elsif @acknowledged_state[:pending].key?(delivery_tag) - delivery, header, body = update_acknowledgement_state(delivery_tag, :nacked) - delivery.queue.publish(body, header.to_hash) if requeue + handle_acknowledgement(delivery_tag, :nacked, requeue) end nil end @@ -322,8 +322,7 @@ def nack(delivery_tag, multiple = false, requeue = false) # def reject(delivery_tag, requeue = false) if @acknowledged_state[:pending].key?(delivery_tag) - delivery, header, body = update_acknowledgement_state(delivery_tag, :rejected) - delivery.queue.publish(body, header.to_hash) if requeue + handle_acknowledgement(delivery_tag, :rejected, requeue) end nil end @@ -399,6 +398,41 @@ def xchg_find_or_create(name, opts = {}) @connection.find_exchange(name) || Exchange.declare(self, name, opts) end + # @private + def handle_acknowledgement(delivery_tag, new_state, requeue = false) + delivery, header, body = update_acknowledgement_state(delivery_tag, new_state) + return unless %i[nacked rejected].include?(new_state) + + options = header.to_hash + return delivery.queue.publish(body, options) if requeue + + dlx, routing_key = dead_letter_arguments(delivery.queue) + return unless dlx + options = set_dead_letter_options(options, delivery, routing_key) + exchange(dlx).publish(body, options) + end + + # @private + def set_dead_letter_options(options, delivery, routing_key) + (options[:headers] ||= {})['x-death'] = [{ + 'count' => 1, + 'reason' => 'rejected', + 'queue' => delivery.queue.name, + 'time' => Time.now, + 'exchange' => delivery.exchange, + 'routing_keys' => [delivery.routing_key] + }] + options[:routing_key] = routing_key if routing_key + options + end + + # @private + def dead_letter_arguments(queue) + arguments = queue.opts[:arguments] + return [nil, nil] unless arguments + [arguments['x-dead-letter-exchange'], arguments['x-dead-letter-routing-key']] + end + # @private def update_acknowledgement_state(delivery_tag, new_state) @acknowledged_state[new_state][delivery_tag] = @acknowledged_state[:pending].delete(delivery_tag) diff --git a/spec/integration/message_acknowledgement_spec.rb b/spec/integration/message_acknowledgement_spec.rb index 1911555..c34f72c 100644 --- a/spec/integration/message_acknowledgement_spec.rb +++ b/spec/integration/message_acknowledgement_spec.rb @@ -110,5 +110,63 @@ expect(@channel.acknowledged_state[:pending]).to include(new_delivery_tag) end end + + context 'when having a dead letter exchange defined' do + let(:dlx) { @channel.fanout('test.dlx') } + let(:dlq) { @channel.temporary_queue.bind(dlx) } + before do + dlq + queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name }) + end + + it 'should send nacked message to dead letter exchange if specified' do + queue.publish 'Message to nack' + @channel.nack delivery_tags['Message to nack'] + + expect(dlq.message_count).to be 1 + expect(dlq.pop.last).to eq 'Message to nack' + end + + it 'should not send nacked message to dead letter exchange if it is requeued' do + queue.publish 'Message to nack' + @channel.nack delivery_tags['Message to nack'], false, true + + expect(dlq.message_count).to be 0 + end + + it 'should send rejected message to dead letter exchange if specified' do + queue.publish 'Message to reject' + @channel.reject delivery_tags['Message to reject'] + + expect(dlq.message_count).to be 1 + _, properties, message = dlq.pop + expect(message).to eq 'Message to reject' + xdeath_headers = properties[:headers]['x-death'].first + expect(xdeath_headers['count']).to eq 1 + expect(xdeath_headers['queue']).to eq queue.name + expect(xdeath_headers['reason']).to eq 'rejected' + expect(xdeath_headers['routing_keys']).to eq [queue.name] + end + + it 'should not send rejected message to dead letter exchange if it is requeued' do + queue.publish 'Message to reject' + @channel.nack delivery_tags['Message to reject'], false, true + + expect(dlq.message_count).to be 0 + end + + it 'should be possible to overwrite the dead letter routing key' do + queue.opts.merge!(arguments: { 'x-dead-letter-exchange' => dlx.name, 'x-dead-letter-routing-key' => 'dl_key' }) + queue.publish 'Message to reject' + @channel.reject delivery_tags['Message to reject'] + + expect(dlq.message_count).to be 1 + delivery_info, headers, payload = dlq.pop + expect(payload).to eq 'Message to reject' + expect(headers[:routing_key]).to eq 'dl_key' + expect(delivery_info[:routing_key]).to eq 'dl_key' + end + + end end end