diff --git a/src/connection/connect.jl b/src/connection/connect.jl new file mode 100644 index 00000000..bf94a5e6 --- /dev/null +++ b/src/connection/connect.jl @@ -0,0 +1,34 @@ + +#TODO: restore link #NATS.Connect +""" + connect([host, port; kw...]) +Initialize and return `Connection`. +See `Connect protocol message`. +""" +function connect(host::String = NATS_HOST, port::Int = NATS_PORT; default = true, cert_file = nothing, key_file = nothing, kw...) + if default && !isnothing(state.default_connection) + return default_connection() + end + nc = Connection() + connect_msg = from_kwargs(Connect, DEFAULT_CONNECT_ARGS, kw) + reconnect_task = Threads.Task(() -> while true reconnect(nc, host, port, connect_msg) end) + # Setting sticky flag to false makes processing 10x slower when running with multiple threads. + # reconnect_task.sticky = false + Base.Threads._spawn_set_thrpool(reconnect_task, :default) + Base.Threads.schedule(reconnect_task) + errormonitor(reconnect_task) + + # TODO: refactor + # 1. init socket + # 2. run parser + # 3. reconnect + + # connection_info = fetch(nc.info) + # @info "Info: $connection_info." + if default + lock(state.lock) do; state.default_connection = nc end + else + lock(state.lock) do; push!(state.connections, nc) end + end + nc +end diff --git a/src/connection/connection.jl b/src/connection/connection.jl index d75c0c34..ef81f987 100644 --- a/src/connection/connection.jl +++ b/src/connection/connection.jl @@ -44,6 +44,7 @@ include("tls.jl") include("send.jl") include("handlers.jl") include("reconnect.jl") +include("connect.jl") const state = State(nothing, Connection[], Dict{String, Function}(), Function[default_fallback_handler], ReentrantLock(), Stats(0, 0)) @@ -85,40 +86,6 @@ outbox(nc::Connection) = @lock state.lock nc.outbox # Enqueue protocol message in `outbox` to be written to socket. # """ -#TODO: restore link #NATS.Connect -""" - connect([host, port; kw...]) -Initialize and return `Connection`. -See `Connect protocol message`. -""" -function connect(host::String = NATS_HOST, port::Int = NATS_PORT; default = true, kw...) - if default && !isnothing(state.default_connection) - return default_connection() - end - nc = Connection() - connect_msg = from_kwargs(Connect, DEFAULT_CONNECT_ARGS, kw) - reconnect_task = Threads.Task(() -> while true reconnect(nc, host, port, connect_msg) end) - # Setting sticky flag to false makes processing 10x slower when running with multiple threads. - # reconnect_task.sticky = false - Base.Threads._spawn_set_thrpool(reconnect_task, :default) - Base.Threads.schedule(reconnect_task) - errormonitor(reconnect_task) - - # TODO: refactor - # 1. init socket - # 2. run parser - # 3. reconnect - - # connection_info = fetch(nc.info) - # @info "Info: $connection_info." - if default - lock(state.lock) do; state.default_connection = nc end - else - lock(state.lock) do; push!(state.connections, nc) end - end - nc -end - function ping(nc) send(nc, Ping()) end @@ -137,7 +104,7 @@ function _cleanup_sub(nc::Connection, sid::String) end # """ -# Cleanup subscription data when no more messages are expected. +# Update state on message received. # """ function _cleanup_unsub_msg(nc::Connection, sid::String) lock(state.lock) do diff --git a/src/connection/tls.jl b/src/connection/tls.jl index b7b38235..f9e0089c 100644 --- a/src/connection/tls.jl +++ b/src/connection/tls.jl @@ -10,11 +10,14 @@ end function get_tls_input_buffered(ssl::SSLStream) io = Base.BufferStream() t = Threads.@spawn :default begin # TODO: make it sticky. - while !eof(ssl) - av = readavailable(ssl) - write(io, av) + try + while !eof(ssl) + av = readavailable(ssl) + write(io, av) + end + finally + close(io) end - @info "TLS connection EOF." end errormonitor(t) BufferedInputStream(io, 1) diff --git a/test/mock.jl b/test/mock.jl deleted file mode 100644 index 534f3f11..00000000 --- a/test/mock.jl +++ /dev/null @@ -1,64 +0,0 @@ - -import Base: read, write, close, eof, readuntil - -# https://github.com/JuliaLang/julia/issues/24526 - -struct TCPSocketMock <: IO - input::IO - output::IO - traffic_generator::Function - function TCPSocketMock(traffic_generator) - @warn "Using mocked connection." - new(Base.BufferStream(), Base.BufferStream(), traffic_generator) - end -end - -read(mock::TCPSocketMock, n::Int) = read(mock.output, n) -write(mock::TCPSocketMock, s::String) = write(mock.input, s) -close(mock::TCPSocketMock) = nothing -eof(mock::TCPSocketMock) = false -readuntil(mock::TCPSocketMock, s::String) = readuntil(mock.output, s) - -function init_connection(io::IO) - # write(io, info) - op = readline(io.input) - @assert startswith(op, "CONNECT") - -end - -function wait_on_sub_and_publish(io::IO) - init_connection(io) - op = next_protocol_message(io.input) - @assert op isa Sub - - for i in 1:10 - show(io, MIME_PROTOCOL(), Pub(op.reply_to, nothing, 0, nothing)) - end -end - -function reply_on_request(io::IO) - init_connection(io) - - op = NATS.next_protocol_message(io) - while true - @assert op isa Msg - show(io, MIME_PROTOCOL(), NATS.Pub(op.reply_to, nothing, 0, nothing)) - end -end - -using Pretend - -@testset "Request reply." begin - Pretend.activate() - - mock = TCPSocketMock(identity) - - t = @async reply_on_request(mock) - errormonitor(t) - - Pretend.apply(NATS.mockable_socket_connect => (port::Integer) -> mock) do - nc = NATS.connect() - sleep(1) - nc - end -end