From 12f915a9b1b661f6d6fe384b0f9d8c6d031bbfbd Mon Sep 17 00:00:00 2001 From: Lewis Marshall Date: Fri, 21 Nov 2014 17:01:02 +0000 Subject: [PATCH] Synchronize access to @client_threads in LogStash::Outputs::Tcp Signed-off-by: Lewis Marshall --- lib/logstash/outputs/tcp.rb | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index a9061f8..1c329f0 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -75,23 +75,32 @@ def register @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") @server_socket = TCPServer.new(@host, @port) @client_threads = [] + @client_threads_lock = Mutex.new @accept_thread = Thread.new(@server_socket) do |server_socket| loop do - client_thread = Thread.start(server_socket.accept) do |client_socket| - client = Client.new(client_socket, @logger) - Thread.current[:client] = client - client.run + @client_threads_lock.synchronize do + @client_threads << Thread.start(server_socket.accept) do |client_socket| + begin + client = Client.new(client_socket, @logger) + Thread.current[:client] = client + client.run + ensure + @client_threads_lock.synchronize do + @client_threads.delete(Thread.current) + end + end + end end - @client_threads << client_thread end end @codec.on_event do |payload| - @client_threads.each do |client_thread| - client_thread[:client].write(payload) + # dup @client_threads to avoid holding the lock while writing to clients + threads = @client_threads_lock.synchronize { @client_threads.dup } + threads.each do |thread| + thread[:client].write(payload) end - @client_threads.reject! {|t| !t.alive? } end else client_socket = nil