diff --git a/.ruby-version b/.ruby-version index bb576db..cc6c9a4 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -2.3 +2.3.5 diff --git a/README.md b/README.md index 50c32ea..1d36142 100644 --- a/README.md +++ b/README.md @@ -65,13 +65,28 @@ consumer.terminate The Nsq::Producer constructor takes the following options: -| Option | Description | Default | -|---------------|----------------------------------------|--------------------| -| `topic` | Topic to which to publish messages | | -| `nsqd` | Host and port of the nsqd instance | '127.0.0.1:4150' | -| `nsqlookupd` | Use lookupd to auto discover nsqds | | -| `tls_v1` | Flag for tls v1 connections | false | -| `tls_options` | Optional keys+certs for TLS connections| | +| Option | Description | Default | +|----------------|----------------------------------------|--------------------| +| `topic` | Topic to which to publish messages | | +| `nsqd` | Host and port of the nsqd instance | '127.0.0.1:4150' | +| `tls_v1` | Flag for tls v1 connections | false | +| `tls_options` | Optional keys+certs for TLS connections| | +| `synchronous` | Wait for acknowledgement on publish | false | + +Following options are only taken into account if the producer is configured as +synchronous: + +| Option | Description | Default | +|------------------|----------------------------------------|--------------------| +| `ok_timeout` | Time to wait acknowledgement (secs) | 3 | +| `retry_attempts` | Number of attempts to retry publishing | 3 | +| | before throwing an exception | | + + +**Note:** By default, producers are asynchronous, we don't wait for nsqd to +acknowledge our writes. As a result, if the connection to nsqd fails, you can +lose messages. This is acceptable for our use cases, mostly because we are +sending messages to a local nsqd instance and failure is very rare. For example, if you'd like to publish messages to a single nsqd. 
@@ -82,23 +97,15 @@ producer = Nsq::Producer.new( ) ``` -Alternatively, you can use nsqlookupd to find all nsqd nodes in the cluster. -When you instantiate Nsq::Producer in this way, it will automatically maintain -connections to all nsqd instances. When you publish a message, it will be sent -to a random nsqd instance. - -```Ruby -producer = Nsq::Producer.new( - nsqlookupd: ['1.2.3.4:4161', '6.7.8.9:4161'], - topic: 'topic-of-great-esteem' -) -``` +> A producer connects to a single nsqd instance and cannot discover +> topics through nsqlookupd. This is the behavior expected by the NSQ maintainers: +> [https://github.com/nsqio/nsq/issues/159](https://github.com/nsqio/nsq/issues/159) If you need to connect using SSL/TLS Authentication via `tls_options` ```Ruby producer = Nsq::Producer.new( - nsqlookupd: ['1.2.3.4:4161', '6.7.8.9:4161'], + nsqd: '6.7.8.9:4150', topic: 'topic-of-great-esteem', tls_v1: true, tls_options: { @@ -114,7 +121,7 @@ If you need to connect using simple `tls_v1` ```Ruby producer = Nsq::Producer.new( - nsqlookupd: ['1.2.3.4:4161', '6.7.8.9:4161'], + nsqd: '6.7.8.9:4150', topic: 'topic-of-great-esteem', tls_v1: true ) @@ -144,12 +151,6 @@ producing messages faster than we're able to send them to nsqd or nsqd is offline for an extended period and you accumulate 10,000 messages in the queue, calls to `#write` will block until there's room in the queue. -**Note:** We don't wait for nsqd to acknowledge our writes. As a result, if the -connection to nsqd fails, you can lose messages. This is acceptable for our use -cases, mostly because we are sending messages to a local nsqd instance and -failure is very rare. - - ### `#write_to_topic` Publishes one or more messages to nsqd. Like `#write`, but allows you to specify @@ -184,7 +185,23 @@ these messages to be lost. After you write your last message, consider sleeping for a second before you call `#terminate`. +### NsqdsProducer + +This producer aims at producing to multiple nsqd instances. 
Either to distribute +the messages or to behave as a failover, when an nsqd instance is down. + +Its attributes are based on the `Producer` class except that it doesn't have `:nsqd`, +and it has the additional parameters: + +**Note:** if the producer is not synchronous, the failover `:strategy` won't behave +correctly, because `write` never fails. + +| Option | Description | Default | +|----------------|----------------------------------------|----------------------| +| `strategy` | Can be `:failover` or `:round_robin` | `:failover` | +| `nsqds` | Array of host and port of the nsqds | `['127.0.0.1:4150']` | +The methods are similar to the `Producer` class. ## Consumer diff --git a/lib/nsq.rb b/lib/nsq.rb index c94b2a9..50523dd 100644 --- a/lib/nsq.rb +++ b/lib/nsq.rb @@ -11,3 +11,4 @@ require_relative 'nsq/consumer' require_relative 'nsq/producer' +require_relative 'nsq/nsqds_producer' diff --git a/lib/nsq/connection.rb b/lib/nsq/connection.rb index e142f6c..941559f 100644 --- a/lib/nsq/connection.rb +++ b/lib/nsq/connection.rb @@ -3,9 +3,12 @@ require 'openssl' require 'timeout' +require_relative 'retry' +require_relative 'exceptions' require_relative 'frames/error' require_relative 'frames/message' require_relative 'frames/response' +require_relative 'selectable_queue' require_relative 'logger' module Nsq @@ -26,6 +29,7 @@ class Connection def initialize(opts = {}) @host = opts[:host] || (raise ArgumentError, 'host is required') @port = opts[:port] || (raise ArgumentError, 'port is required') + @response_queue = opts[:response_queue] @queue = opts[:queue] @topic = opts[:topic] @channel = opts[:channel] @@ -56,11 +60,11 @@ def initialize(opts = {}) end # for outgoing communication - @write_queue = SizedQueue.new(10000) + @write_queue = SelectableQueue.new(10000) # For indicating that the connection has died. # We use a Queue so we don't have to poll. Used to communicate across - # threads (from write_loop and read_loop to connect_and_monitor). 
+ # threads (from read_write_loop to connect_and_monitor). @death_queue = Queue.new @connected = false @@ -83,11 +87,6 @@ def close end - def sub(topic, channel) - write "SUB #{topic} #{channel}\n" - end - - def rdy(count) write "RDY #{count}\n" end @@ -139,6 +138,10 @@ def re_up_ready private + def sub(topic, channel) + write_to_socket "SUB #{topic} #{channel}\n" + end + def cls write "CLS\n" end @@ -150,16 +153,14 @@ def nop def write(raw) - @write_queue.push(raw) + @write_queue.push(message: raw) end - def write_to_socket(raw) debug ">>> #{raw.inspect}" @socket.write(raw) end - def identify hostname = Socket.gethostname metadata = { @@ -195,12 +196,20 @@ def handle_response(frame) debug 'Received heartbeat' nop elsif frame.data == RESPONSE_OK + @response_queue.push(frame) if @response_queue debug 'Received OK' else die "Received response we don't know how to handle: #{frame.data}" end end + def handle_error(frame) + if @response_queue + @response_queue.push(frame) + else + error "Error received: #{frame.data}" + end + end def receive_frame if buffer = @socket.read(8) @@ -212,7 +221,6 @@ def receive_frame end end - FRAME_CLASSES = [Response, Error, Message] def frame_class_for_type(type) raise "Bad frame type specified: #{type}" if type > FRAME_CLASSES.length - 1 @@ -231,71 +239,59 @@ def decrement_in_flight end - def start_read_loop - @read_loop_thread ||= Thread.new{read_loop} + def start_read_write_loop + @read_write_loop_thread ||= Thread.new{read_write_loop} end - def stop_read_loop - @read_loop_thread.kill if @read_loop_thread - @read_loop_thread = nil + def stop_read_write_loop + # if the loop has died because of a connection error, the thread is + # already stopped, otherwise we want to terminate the producer connection + # and a custom-made message is sent signaling to the loop to stop + # gracefully + if @read_write_loop_thread + @write_queue.push(message: :stop_loop) if @read_write_loop_thread.alive? 
+ @read_write_loop_thread.join + @read_write_loop_thread = nil + end end - - def read_loop + def read_write_loop loop do - frame = receive_frame - if frame.is_a?(Response) - handle_response(frame) - elsif frame.is_a?(Error) - error "Error received: #{frame.data}" - elsif frame.is_a?(Message) - debug "<<< #{frame.body}" - if @max_attempts && frame.attempts > @max_attempts - fin(frame.id) - else - @queue.push(frame) if @queue + begin + ready, _, _ = IO.select([@socket, @write_queue]) + + if ready.include?(@socket) + frame = receive_frame + if frame.is_a?(Response) + handle_response(frame) + elsif frame.is_a?(Error) + handle_error(frame) + elsif frame.is_a?(Message) + debug "<<< #{frame.body}" + if @max_attempts && frame.attempts > @max_attempts + fin(frame.id) + else + @queue.push(frame) if @queue + end + else + die(UnexpectedFrameError.new(frame)) + end end - else - raise 'No data from socket' - end - end - rescue Exception => ex - die(ex) - end - - - def start_write_loop - @write_loop_thread ||= Thread.new{write_loop} - end - - - def stop_write_loop - if @write_loop_thread - @write_queue.push(:stop_write_loop) - @write_loop_thread.join - end - @write_loop_thread = nil - end - - def write_loop - data = nil - loop do - data = @write_queue.pop - break if data == :stop_write_loop - write_to_socket(data) + if ready.include?(@write_queue) + data = @write_queue.pop + return if data[:message] == :stop_loop + write_to_socket(data[:message]) + end + rescue IO::WaitReadable + retry + end end rescue Exception => ex - # requeue PUB and MPUB commands - if data =~ /^M?PUB/ - debug "Requeueing to write_queue: #{data.inspect}" - @write_queue.push(data) - end die(ex) end - # Waits for death of connection def start_monitoring_connection @connection_monitor_thread ||= Thread.new{monitor_connection} @@ -308,7 +304,6 @@ def stop_monitoring_connection @connection_monitor = nil end - def monitor_connection loop do # wait for death, hopefully it never comes @@ -316,7 +311,7 @@ def 
monitor_connection warn "Died from: #{cause_of_death}" debug 'Reconnecting...' - reconnect + reconnect(cause_of_death) debug 'Reconnected!' # clear all death messages, since we're now reconnected. @@ -328,13 +323,25 @@ def monitor_connection # close the connection if it's not already closed and try to reconnect # over and over until we succeed! - def reconnect + def reconnect(cause_of_death) close_connection - with_retries do + Nsq::with_retries do + # If a synchronous producer received messages during the reconnection + # period those messages must fail if they expect an acknowledgement + # Between each reconnection attempt, we ensure the `connection.write` + # are not blocked by returning the exception which led to the initial + # disconnection. + push_error_pending_writes cause_of_death if @response_queue open_connection end end + def push_error_pending_writes cause_of_death + while !@write_queue.empty? + data = @write_queue.pop + @response_queue.push(cause_of_death) if @response_queue + end + end def open_connection @socket = TCPSocket.new(@host, @port) @@ -344,24 +351,25 @@ def open_connection identify upgrade_to_ssl_socket if @tls_v1 - start_read_loop - start_write_loop @connected = true # we need to re-subscribe if there's a topic specified if @topic debug "Subscribing to #{@topic}" sub(@topic, @channel) + frame = receive_frame + raise ErrorFrameException, frame.data if frame.is_a?(Error) re_up_ready end + + start_read_write_loop end # closes the connection and stops listening for messages def close_connection cls if connected? - stop_read_loop - stop_write_loop + stop_read_write_loop @socket.close if @socket @socket = nil @connected = false @@ -397,50 +405,6 @@ def openssl_context context end - - # Retry the supplied block with exponential backoff. 
- # - # Borrowed liberally from: - # https://github.com/ooyala/retries/blob/master/lib/retries.rb - def with_retries(&block) - base_sleep_seconds = 0.5 - max_sleep_seconds = 300 # 5 minutes - - # Let's do this thing - attempts = 0 - - begin - attempts += 1 - return block.call(attempts) - - rescue Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, - Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error => ex - - raise ex if attempts >= 100 - - # The sleep time is an exponentially-increasing function of base_sleep_seconds. - # But, it never exceeds max_sleep_seconds. - sleep_seconds = [base_sleep_seconds * (2 ** (attempts - 1)), max_sleep_seconds].min - # Randomize to a random value in the range sleep_seconds/2 .. sleep_seconds - sleep_seconds = sleep_seconds * (0.5 * (1 + rand())) - # But never sleep less than base_sleep_seconds - sleep_seconds = [base_sleep_seconds, sleep_seconds].max - - warn "Failed to connect: #{ex}. Retrying in #{sleep_seconds.round(1)} seconds." - - snooze(sleep_seconds) - - retry - end - end - - - # Se we can stub for testing and reconnect in a tight loop - def snooze(t) - sleep(t) - end - - def server_needs_rdy_re_ups? 
# versions less than 0.3.0 need RDY re-ups # see: https://github.com/bitly/nsq/blob/master/ChangeLog.md#030---2014-11-18 diff --git a/lib/nsq/exceptions.rb b/lib/nsq/exceptions.rb index e641d1c..1bd5166 100644 --- a/lib/nsq/exceptions.rb +++ b/lib/nsq/exceptions.rb @@ -1,5 +1,19 @@ module Nsq # Raised when nsqlookupd discovery fails - class DiscoveryException < Exception; end -end + class DiscoveryException < StandardError; end + + class ErrorFrameException < StandardError; end + class UnexpectedFrameError < StandardError + def initialize(frame) + @frame = frame + end + + def message + if @frame + return "unexpected frame value #{@frame.data}" + end + return 'empty frame from socket' + end + end +end diff --git a/lib/nsq/nsqds_producer.rb b/lib/nsq/nsqds_producer.rb new file mode 100644 index 0000000..2d88ea9 --- /dev/null +++ b/lib/nsq/nsqds_producer.rb @@ -0,0 +1,75 @@ +module Nsq + class NsqdsProducer + include Nsq::AttributeLogger + + STRATEGY_FAILOVER = :failover + STRATEGY_ROUNDROBIN = :round_robin + STRATEGIES = [STRATEGY_FAILOVER, STRATEGY_ROUNDROBIN].freeze + + attr_reader :topic + + def initialize(opts = {}) + @nsqds = opts.delete(:nsqds) + @strategy = opts.delete(:strategy) || STRATEGY_FAILOVER + @attempts = opts.delete(:strategy_attempts) + @topic = opts[:topic] + + if !STRATEGIES.include?(@strategy) + raise ArgumentError, "strategy should be one of #{STRATEGIES.join(", ")}" + end + + if @nsqds && !@nsqds.is_a?(Array) + raise ArgumentError, "nsqds should be an array of hosts 'host:port'" + elsif !@nsqds + @nsqds = ['127.0.0.1:4150'] + end + + @index = 0 + @producers = @nsqds.map do |nsqd| + Producer.new(opts.merge(nsqd: nsqd)) + end + end + + def terminate + @producers.each do |producer| + producer.terminate + end + end + + def write(*raw_messages) + each_provider(:write, *raw_messages) + end + + def deferred_write(delay, *raw_messages) + each_provider(:deferred_write, delay, *raw_messages) + end + + def deferred_write_to_topic(topic, delay, 
*raw_messages) + each_provider(:deferred_write_to_topic, topic, delay, *raw_messages) + end + + def write_to_topic(topic, *raw_messages) + each_provider(:write_to_topic, topic, *raw_messages) + end + + protected + + def each_provider(action, *args) + attempt = 0 + begin + @producers[@index].send(action, *args) + inc_index if @strategy == STRATEGY_ROUNDROBIN + rescue => ex + error producer: @producers[@index].nsqd, msg: "fail to #{action} message: #{ex.message}", exception: ex.class + inc_index + attempt += 1 + raise ex if attempt == @attempts + retry + end + end + + def inc_index + @index = (@index + 1 ) % @producers.length + end + end +end diff --git a/lib/nsq/producer.rb b/lib/nsq/producer.rb index fa3e471..0619397 100644 --- a/lib/nsq/producer.rb +++ b/lib/nsq/producer.rb @@ -1,32 +1,42 @@ +require_relative 'exceptions' +require_relative 'selectable_queue' +require_relative 'retry' require_relative 'client_base' module Nsq class Producer < ClientBase - attr_reader :topic + attr_reader :topic, :nsqd def initialize(opts = {}) @connections = {} + @nsqd = opts[:nsqd] @topic = opts[:topic] + @synchronous = opts[:synchronous] || false @discovery_interval = opts[:discovery_interval] || 60 @ssl_context = opts[:ssl_context] @tls_options = opts[:tls_options] @tls_v1 = opts[:tls_v1] + @retry_attempts = opts[:retry_attempts] || 3 - nsqlookupds = [] - if opts[:nsqlookupd] - nsqlookupds = [opts[:nsqlookupd]].flatten - discover_repeatedly( - nsqlookupds: nsqlookupds, - interval: @discovery_interval - ) + @ok_timeout = opts[:ok_timeout] || 3 + @write_queue = SelectableQueue.new(10000) - elsif opts[:nsqd] - nsqds = [opts[:nsqd]].flatten - nsqds.each{|d| add_connection(d)} + @response_queue = SelectableQueue.new(10000) if @synchronous + if @nsqd + raise ArgumentError, "should be a string 'host:port'" if !@nsqd.is_a?(String) + @connection = add_connection(@nsqd, response_queue: @response_queue) else - add_connection('127.0.0.1:4150') + @connection = 
add_connection('127.0.0.1:4150', response_queue: @response_queue) end + + @router_thread = Thread.new { start_router() } + end + + def terminate + stop_router + @router_thread.join + super end def write(*raw_messages) @@ -49,6 +59,15 @@ def deferred_write(delay, *raw_messages) deferred_write_to_topic(@topic, delay, *raw_messages) end + def deferred_write_to_topic(topic, delay, *raw_messages) + raise ArgumentError, 'message not provided' if raw_messages.empty? + messages = raw_messages.map(&:to_s) + messages.each do |msg| + msg = {op: :dpub, topic: topic, at: (delay * 1000).to_i, payload: msg} + queue(msg) + end + end + def write_to_topic(topic, *raw_messages) # return error if message(s) not provided raise ArgumentError, 'message not provided' if raw_messages.empty? @@ -56,39 +75,64 @@ def write_to_topic(topic, *raw_messages) # stringify the messages messages = raw_messages.map(&:to_s) - # get a suitable connection to write to - connection = connection_for_write - if messages.length > 1 - connection.mpub(topic, messages) + msg = { op: :mpub, topic: topic, payload: messages } else - connection.pub(topic, messages.first) + msg = { op: :pub, topic: topic, payload: messages.first } end - end - def deferred_write_to_topic(topic, delay, *raw_messages) - raise ArgumentError, 'message not provided' if raw_messages.empty? 
- messages = raw_messages.map(&:to_s) - connection = connection_for_write - messages.each do |msg| - connection.dpub(topic, (delay * 1000).to_i, msg) - end + queue(msg) end private - def connection_for_write - # Choose a random Connection that's currently connected - # Or, if there's nothing connected, just take any random one - connections_currently_connected = connections.select{|_,c| c.connected?} - connection = connections_currently_connected.values.sample || connections.values.sample - - # Raise an exception if there's no connection available - unless connection - raise 'No connections available' + + def queue(msg) + Nsq::with_retries max_attempts: @retry_attempts do + msg[:result] = SizedQueue.new(1) if @synchronous + @write_queue.push(msg) + if msg[:result] + Timeout::timeout(@ok_timeout) do + value = msg[:result].pop + raise value if value.is_a?(Exception) + end + end end + end - connection + def start_router + transactions = [] + queues = [@write_queue] + queues << @response_queue if @response_queue + loop do + ready, _, _ = IO::select(queues) + if ready.include?(@response_queue) + frame = @response_queue.pop + result = transactions.pop + next if result.nil? 
+ if frame.is_a?(Exception) + result.push(frame) + elsif frame.is_a?(Response) + result.push(nil) + elsif frame.is_a?(Error) + result.push(ErrorFrameException.new(frame.data)) + else + result.push(ErrorFrameException.new("unexpected frame: #{frame.inspect}")) + end + else + data = @write_queue.pop + return if data[:op] == :stop_router + if data[:op] == :dpub + @connection.send(data[:op], data[:topic], data[:at], data[:payload]) + else + @connection.send(data[:op], data[:topic], data[:payload]) + end + transactions.push(data[:result]) if data[:result] + end + end end + def stop_router + @write_queue.push(op: :stop_router) + end end end diff --git a/lib/nsq/retry.rb b/lib/nsq/retry.rb new file mode 100644 index 0000000..f3b16b8 --- /dev/null +++ b/lib/nsq/retry.rb @@ -0,0 +1,42 @@ +require_relative 'exceptions' + +require 'timeout' + +module Nsq + # Retry the supplied block with exponential backoff. + # + # Borrowed liberally from: + # https://github.com/ooyala/retries/blob/master/lib/retries.rb + def self.with_retries(opts = {}, &block) + base_sleep_seconds = 0.5 + max_sleep_seconds = 300 # 5 minutes + + # Let's do this thing + attempts = 0 + max_attempts = opts[:max_attempts] || 100 + + begin + attempts += 1 + return block.call(attempts) + + rescue UnexpectedFrameError, ErrorFrameException, Errno::ECONNREFUSED, Errno::ECONNRESET, Errno::EHOSTUNREACH, + Errno::ENETDOWN, Errno::ENETUNREACH, Errno::ETIMEDOUT, Timeout::Error, SocketError => ex + + raise ex if attempts >= max_attempts + + # The sleep time is an exponentially-increasing function of base_sleep_seconds. + # But, it never exceeds max_sleep_seconds. + sleep_seconds = [base_sleep_seconds * (2 ** (attempts - 1)), max_sleep_seconds].min + # Randomize to a random value in the range sleep_seconds/2 .. sleep_seconds + sleep_seconds = sleep_seconds * (0.5 * (1 + rand())) + # But never sleep less than base_sleep_seconds + sleep_seconds = [base_sleep_seconds, sleep_seconds].max + + warn "Failed to connect: #{ex}. 
Retrying in #{sleep_seconds.round(1)} seconds." + + sleep(sleep_seconds) + + retry + end + end +end diff --git a/lib/nsq/selectable_queue.rb b/lib/nsq/selectable_queue.rb new file mode 100644 index 0000000..a63ac18 --- /dev/null +++ b/lib/nsq/selectable_queue.rb @@ -0,0 +1,51 @@ +# A queue that you can pass to IO.select. +# +# NOT THREAD SAFE: Only one thread should write; only one thread should read. +# +# Purpose: +# Allow easy integration of data-producing threads into event loops. The +# queue will be readable from select's perspective as long as there are +# objects in the queue. +# +# Implementation: +# The queue maintains a pipe. The pipe contains a number of bytes equal to +# the queue size. +# +# Example use: +# queue = SelectableQueue.new +# readable, _, _ = IO.select([queue, $stdin]) +# print "got #{queue.pop}" if readable.include?(queue) +# +class SelectableQueue + def initialize(size = 0) + if size == 0 + @queue = Queue.new + else + @queue = SizedQueue.new(size) + end + @read_io, @write_io = IO.pipe + end + + def empty? + @queue.empty? + end + + def push(o) + @queue.push(o) + # It doesn't matter what we write into the pipe, as long as it's one byte. + # It's not blocking until full, and has a default limit of 65536 (64KB) + @write_io << '.' 
+ self + end + + def pop(nonblock=false) + o = @queue.pop(nonblock) + @read_io.read(1) + o + end + + def to_io + @read_io + end +end + diff --git a/spec/lib/nsq/nsqds_producer_spec.rb b/spec/lib/nsq/nsqds_producer_spec.rb new file mode 100644 index 0000000..24be5d9 --- /dev/null +++ b/spec/lib/nsq/nsqds_producer_spec.rb @@ -0,0 +1,142 @@ +require_relative '../../spec_helper' +require 'json' + +describe Nsq::Producer do + def message_count(nsqd, topic = @producer.topic) + parsed_body = JSON.parse(nsqd.stats.body) + topics_info = (parsed_body['data'] || parsed_body)['topics'] + topic_info = topics_info.select{|t| t['topic_name'] == topic }.first + if topic_info + topic_info['message_count'] + else + 0 + end + end + + context 'connecting directly to a single nsqd' do + + def new_consumer(topic = TOPIC) + Nsq::Consumer.new( + topic: topic, + channel: CHANNEL, + nsqd: "#{@nsqd.host}:#{@nsqd.tcp_port}", + max_in_flight: 1 + ) + end + + before do + @cluster = NsqCluster.new(nsqd_count: 2) + end + + after do + @cluster.destroy + end + + describe '::new' do + it 'should throw an exception if one of the nsqds is down' do + @cluster.nsqd.first.stop + + expect{ + new_nsqds_producer(@cluster.nsqd) + }.to raise_error(Errno::ECONNREFUSED) + end + + it 'should throw an exception if the strategy is unkown' do + expect{ + new_nsqds_producer(@cluster.nsqd, strategy: :none) + }.to raise_error(ArgumentError, "strategy should be one of failover, round_robin") + end + end + + context 'failover strategy' do + before do + @producer = new_nsqds_producer(@cluster.nsqd, synchronous: true, retry_attempts: 1, ok_timeout: 1) + end + after do + @producer.terminate if @producer + end + + describe '#write' do + it 'should send a message to the first nsqd' do + @producer.write 'first' + wait_for{message_count(@cluster.nsqd.first)==1} + expect(message_count(@cluster.nsqd.first)).to eq(1) + end + + it 'should send a message to the second nsqd if the first is down' do + @cluster.nsqd[0].stop + 
@producer.write 'first' + wait_for{message_count(@cluster.nsqd[1])==1} + expect(message_count(@cluster.nsqd[1])).to eq(1) + end + + it 'should send a message to the first nsqd again if the second is down' do + @cluster.nsqd[0].stop + + @producer.write 'first' + wait_for{message_count(@cluster.nsqd[1])==1} + expect(message_count(@cluster.nsqd[1])).to eq(1) + @producer.write 'second' + wait_for{message_count(@cluster.nsqd[1])==2} + expect(message_count(@cluster.nsqd[1])).to eq(2) + + @cluster.nsqd[0].start + @cluster.nsqd[1].stop + + @producer.write 'third' + wait_for{message_count(@cluster.nsqd[0])==1} + expect(message_count(@cluster.nsqd[0])).to eq(1) + end + end + end + + context 'round robin strategy' do + before do + @producer = new_nsqds_producer(@cluster.nsqd, synchronous: true, retry_attempts: 1, ok_timeout: 1, strategy: Nsq::NsqdsProducer::STRATEGY_ROUNDROBIN) + end + after do + @producer.terminate if @producer + end + + describe '#write' do + it 'should distributes messages among nsqs' do + @producer.write 'first' + @producer.write 'second' + + wait_for{message_count(@cluster.nsqd.first)==1} + expect(message_count(@cluster.nsqd.first)).to eq(1) + + wait_for{message_count(@cluster.nsqd.last)==1} + expect(message_count(@cluster.nsqd.last)).to eq(1) + end + + it 'should send twice to the same if one node is down' do + @cluster.nsqd.first.stop + + @producer.write 'first' + @producer.write 'second' + + wait_for{message_count(@cluster.nsqd.last)==2} + expect(message_count(@cluster.nsqd.last)).to eq(2) + end + + it 'should start the round robin back once the node is back up' do + @cluster.nsqd.first.stop + @producer.write 'first' + @producer.write 'second' + + @cluster.nsqd.first.start + sleep 0.5 + + @producer.write 'three' + @producer.write 'four' + wait_for{message_count(@cluster.nsqd.last)==3} + expect(message_count(@cluster.nsqd.last)).to eq(3) + wait_for{message_count(@cluster.nsqd.first)==1} + expect(message_count(@cluster.nsqd.first)).to eq(1) + + end + 
end + end + end +end diff --git a/spec/lib/nsq/producer_spec.rb b/spec/lib/nsq/producer_spec.rb index 8924f4c..6ac1102 100644 --- a/spec/lib/nsq/producer_spec.rb +++ b/spec/lib/nsq/producer_spec.rb @@ -220,66 +220,5 @@ def new_consumer(topic = TOPIC) expect(message_count('hello')).to eq(10) end end - end - - context 'connecting via nsqlookupd' do - - before do - @cluster = NsqCluster.new(nsqd_count: 2, nsqlookupd_count: 1) - @producer = new_lookupd_producer - - # wait for it to connect to all nsqds - wait_for{ @producer.connections.length == @cluster.nsqd.length } - end - - after do - @producer.terminate if @producer - @cluster.destroy - end - - - describe '#connections' do - it 'should be connected to all nsqds' do - expect(@producer.connections.length).to eq(@cluster.nsqd.length) - end - - it 'should drop a connection when an nsqd goes offline' do - @cluster.nsqd.first.stop - wait_for{ @producer.connections.length == @cluster.nsqd.length - 1 } - expect(@producer.connections.length).to eq(@cluster.nsqd.length - 1) - end - end - - - describe '#connected?' do - it 'should return true if it\'s connected to at least one nsqd' do - expect(@producer.connected?).to eq(true) - end - - it 'should return false when it\'s not connected to any nsqds' do - @cluster.nsqd.each{|nsqd| nsqd.stop} - wait_for{ !@producer.connected? 
} - expect(@producer.connected?).to eq(false) - end - end - - - describe '#write' do - it 'writes to a random connection' do - expect_any_instance_of(Nsq::Connection).to receive(:pub) - @producer.write('howdy!') - end - - it 'raises an error if there are no connections to write to' do - @cluster.nsqd.each{|nsqd| nsqd.stop} - wait_for{ @producer.connections.length == 0 } - expect { - @producer.write('die') - }.to raise_error(RuntimeError, /No connections available/) - end - end - - end - end diff --git a/spec/lib/nsq/producer_synchronous_spec.rb b/spec/lib/nsq/producer_synchronous_spec.rb new file mode 100644 index 0000000..e06667a --- /dev/null +++ b/spec/lib/nsq/producer_synchronous_spec.rb @@ -0,0 +1,49 @@ +require_relative '../../spec_helper' + +describe Nsq::Producer do + context 'with a synchronous producer without retry' do + before do + @cluster = NsqCluster.new(nsqd_count: 1) + @nsqd = @cluster.nsqd.first + @producer = new_producer(@nsqd, synchronous: true, retry_attempts: 0) + end + + after do + @producer.terminate if @producer + @cluster.destroy + end + + describe '#write' do + it 'should raise an error when nsqd is down' do + @nsqd.stop + + expect{ + @producer.write('fail') + }.to raise_error(Nsq::UnexpectedFrameError) + end + end + end + + context 'with a synchronous producer with retries (default behavior)' do + before do + @cluster = NsqCluster.new(nsqd_count: 1) + @nsqd = @cluster.nsqd.first + @producer = new_producer(@nsqd, synchronous: true) + end + + after do + @producer.terminate if @producer + @cluster.destroy + end + + describe '#write' do + it 'shouldn\'t raise an error when nsqd is down' do + @nsqd.stop + + Thread.new { sleep 1 ; @nsqd.start } + + expect{ @producer.write('fail') }.not_to raise_error + end + end + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 50a17ae..4c76b88 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -71,6 +71,14 @@ def new_producer(nsqd, opts = {}) }.merge(opts)) end +def 
new_nsqds_producer(nsqds, opts = {}) + Nsq::NsqdsProducer.new({ + topic: TOPIC, + nsqds: nsqds.map{ |n| "#{n.host}:#{n.tcp_port}" }, + discovery_interval: 1 + }.merge(opts)) +end + def new_lookupd_producer(opts = {}) lookupd = @cluster.nsqlookupd.map{|l| "#{l.host}:#{l.http_port}"} Nsq::Producer.new({