Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor plugin for the new shared concurrency model #20

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 36 additions & 33 deletions lib/logstash/outputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
class LogStash::Outputs::Tcp < LogStash::Outputs::Base

config_name "tcp"
concurrency :shared

default :codec, "json"

Expand Down Expand Up @@ -115,7 +116,7 @@ def register
end # @ssl_enable

if server?
workers_not_supported
@server_mutex = Mutex.new

@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
begin
Expand Down Expand Up @@ -144,35 +145,6 @@ def register
end
end
end

@codec.on_event do |event, payload|
@client_threads.each do |client_thread|
client_thread[:client].write(payload)
end
@client_threads.reject! {|t| !t.alive? }
end
else
client_socket = nil
@codec.on_event do |event, payload|
begin
client_socket = connect unless client_socket
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
# don't expect any reads, but a readable socket might
# mean the remote end closed, so read it and throw it away.
# we'll get an EOFError if it happens.
client_socket.sysread(16384) if r.any?

# Now send the payload
client_socket.syswrite(payload) if w.any?
rescue => e
@logger.warn("tcp output exception", :host => @host, :port => @port,
:exception => e, :backtrace => e.backtrace)
client_socket.close rescue nil
client_socket = nil
sleep @reconnect_interval
retry
end
end
end
end # def register

Expand Down Expand Up @@ -204,7 +176,38 @@ def server?
end # def server?

public
def receive(event)
@codec.encode(event)
end # def receive
def multi_receive_encoded(encoded)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've been naming this variable events_and_encoded. I'm not saying it should be named the exact same thing here, but encoded to me sounds like a bytes/strings only.

if server?
@server_mutex.synchronize do
@client_threads.each do |client_thread|
encoded.each do |event,data|
client_thread[:client].write(data)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize this was in the original, but maybe this would be a good time to rename :client to something more unique, like :logstash_tcp_output_client. These are global variables, so more namespacing = good.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It occurs to me that if you start up two TCP servers they would step on each other's use of client_thread[:client] no? Shouldn't the variable name include self.id to ensure uniqueness?

end
end
@client_threads.reject! {|t| !t.alive? }

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this? Why would some client threads be alive?

I know this is original code but it's confusing to me. I don't understand it.

end
else
client_socket = nil
begin
client_socket = connect unless client_socket
Copy link

@andrewvc andrewvc Aug 29, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you already planned this or not, but just copying the code from the original here is problematic. This will always reconnect on each batch!

We should have a client socket per TCP output per worker thread. Code like this might be good.

client_socket = Thread.current["logstash_tcp_out_#{self.id}_client_socket"] ||= connect

r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
# don't expect any reads, but a readable socket might
# mean the remote end closed, so read it and throw it away.
# we'll get an EOFError if it happens.
client_socket.sysread(16384) if r.any?

# Now send the payload
encoded.each do |event,data|
client_socket.syswrite(data) if w.any?
end
rescue => e
@logger.warn("tcp output exception", :host => @host, :port => @port,
:exception => e, :backtrace => e.backtrace)
client_socket.close rescue nil
client_socket = nil
sleep @reconnect_interval
retry
end
end
end
end # class LogStash::Outputs::Tcp