Skip to content

Commit

Permalink
fix tls connection
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Oct 2, 2023
1 parent d08e5d8 commit e596fd2
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 103 deletions.
34 changes: 34 additions & 0 deletions src/connection/connect.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

#TODO: restore link #NATS.Connect
"""
connect([host, port; kw...])
Initialize and return `Connection`.
See `Connect protocol message`.
"""
function connect(host::String = NATS_HOST, port::Int = NATS_PORT; default = true, cert_file = nothing, key_file = nothing, kw...)
if default && !isnothing(state.default_connection)
return default_connection()
end
nc = Connection()
connect_msg = from_kwargs(Connect, DEFAULT_CONNECT_ARGS, kw)
reconnect_task = Threads.Task(() -> while true reconnect(nc, host, port, connect_msg) end)
# Setting sticky flag to false makes processing 10x slower when running with multiple threads.
# reconnect_task.sticky = false
Base.Threads._spawn_set_thrpool(reconnect_task, :default)
Base.Threads.schedule(reconnect_task)
errormonitor(reconnect_task)

# TODO: refactor
# 1. init socket
# 2. run parser
# 3. reconnect

# connection_info = fetch(nc.info)
# @info "Info: $connection_info."
if default
lock(state.lock) do; state.default_connection = nc end
else
lock(state.lock) do; push!(state.connections, nc) end
end
nc
end
37 changes: 2 additions & 35 deletions src/connection/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ include("tls.jl")
include("send.jl")
include("handlers.jl")
include("reconnect.jl")
include("connect.jl")

const state = State(nothing, Connection[], Dict{String, Function}(), Function[default_fallback_handler], ReentrantLock(), Stats(0, 0))

Expand Down Expand Up @@ -85,40 +86,6 @@ outbox(nc::Connection) = @lock state.lock nc.outbox
# Enqueue protocol message in `outbox` to be written to socket.
# """

#TODO: restore link #NATS.Connect
"""
connect([host, port; kw...])
Initialize and return `Connection`.
See `Connect protocol message`.
"""
function connect(host::String = NATS_HOST, port::Int = NATS_PORT; default = true, kw...)
if default && !isnothing(state.default_connection)
return default_connection()
end
nc = Connection()
connect_msg = from_kwargs(Connect, DEFAULT_CONNECT_ARGS, kw)
reconnect_task = Threads.Task(() -> while true reconnect(nc, host, port, connect_msg) end)
# Setting sticky flag to false makes processing 10x slower when running with multiple threads.
# reconnect_task.sticky = false
Base.Threads._spawn_set_thrpool(reconnect_task, :default)
Base.Threads.schedule(reconnect_task)
errormonitor(reconnect_task)

# TODO: refactor
# 1. init socket
# 2. run parser
# 3. reconnect

# connection_info = fetch(nc.info)
# @info "Info: $connection_info."
if default
lock(state.lock) do; state.default_connection = nc end
else
lock(state.lock) do; push!(state.connections, nc) end
end
nc
end

function ping(nc)
send(nc, Ping())
end
Expand All @@ -137,7 +104,7 @@ function _cleanup_sub(nc::Connection, sid::String)
end

# """
# Cleanup subscription data when no more messages are expected.
# Update state on message received.
# """
function _cleanup_unsub_msg(nc::Connection, sid::String)
lock(state.lock) do
Expand Down
11 changes: 7 additions & 4 deletions src/connection/tls.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ end
function get_tls_input_buffered(ssl::SSLStream)
io = Base.BufferStream()
t = Threads.@spawn :default begin # TODO: make it sticky.
while !eof(ssl)
av = readavailable(ssl)
write(io, av)
try
while !eof(ssl)
av = readavailable(ssl)
write(io, av)
end
finally
close(io)
end
@info "TLS connection EOF."
end
errormonitor(t)
BufferedInputStream(io, 1)
Expand Down
64 changes: 0 additions & 64 deletions test/mock.jl

This file was deleted.

0 comments on commit e596fd2

Please sign in to comment.