Skip to content

Commit

Permalink
producer: Automatically reconnect if we detect a fork
Browse files Browse the repository at this point in the history
  • Loading branch information
chen-anders committed Jun 2, 2024
1 parent 4bda9e1 commit 7563ee4
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 5 deletions.
1 change: 1 addition & 0 deletions lib/nsq/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def initialize(opts = {})


def connected?
return @write_loop_thread.alive? && @connected if @write_loop_thread # for producers
@connected
end

Expand Down
46 changes: 41 additions & 5 deletions lib/nsq/producer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,24 @@ def initialize(opts = {})
@ssl_context = opts[:ssl_context]
@tls_options = opts[:tls_options]
@tls_v1 = opts[:tls_v1]
@mx = Mutex.new

nsqlookupds = []
@nsqlookupds = []
if opts[:nsqlookupd]
nsqlookupds = [opts[:nsqlookupd]].flatten
@nsqlookupds = [opts[:nsqlookupd]].flatten
discover_repeatedly(
nsqlookupds: nsqlookupds,
nsqlookupds: @nsqlookupds,
interval: @discovery_interval
)

elsif opts[:nsqd]
nsqds = [opts[:nsqd]].flatten
nsqds.each{|d| add_connection(d)}
@nsqds = [opts[:nsqd]].flatten
@nsqds.each{|d| add_connection(d)}

else
add_connection('127.0.0.1:4150')
end
update_fork_pid
end

def write(*raw_messages)
Expand Down Expand Up @@ -75,8 +77,35 @@ def deferred_write_to_topic(topic, delay, *raw_messages)
end
end

def reconnect_on_fork
if @nsqlookupds.any?
if @discovery_thread && !@discovery_thread.alive?
@mx.synchronize do
break if @discovery_thread.alive?
debug "Discovery thread is dead; terminating connections and restarting discovery loop"
terminate
discover_repeatedly(
nsqlookupds: @nsqlookupds,
interval: @discovery_interval
)
update_fork_pid
end
end
elsif forked?
@mx.synchronize do
break unless forked?
debug "Fork detected - recreating connections"
terminate
@nsqds.each{|d| add_connection(d)}
update_fork_pid
end
end
end

private
def connection_for_write
reconnect_on_fork

# Choose a random Connection that's currently connected
# Or, if there's nothing connected, just take any random one
connections_currently_connected = connections.select{|_,c| c.connected?}
Expand All @@ -90,5 +119,12 @@ def connection_for_write
connection
end

def forked?
Process.pid != @fork_pid
end

def update_fork_pid
@fork_pid = Process.pid
end
end
end

0 comments on commit 7563ee4

Please sign in to comment.