Skip to content

Commit

Permalink
refactor reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Oct 2, 2023
1 parent 345372a commit 161bf09
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 78 deletions.
79 changes: 2 additions & 77 deletions src/connection/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ include("utils.jl")
include("tls.jl")
include("send.jl")
include("handlers.jl")
include("reconnect.jl")

const state = State(nothing, Connection[], Dict{String, Function}(), Function[default_fallback_handler], ReentrantLock(), Stats(0, 0))

Expand Down Expand Up @@ -44,82 +45,6 @@ outbox(nc::Connection) = @lock state.lock nc.outbox
# """
# Enqueue protocol message in `outbox` to be written to socket.
# """
function socket_connect(port::Integer)
Sockets.connect(port)
end

function reconnect(nc::Connection, host, port, con_msg)
sock = retry(socket_connect, delays=SOCKET_CONNECT_DELAYS)(port)

read_stream = sock
write_stream = sock

process(nc, next_protocol_message(read_stream))
info = fetch(nc.info)
send(nc, con_msg)

# @show fetch(nc.info)
if !isnothing(info.tls_required) && info.tls_required
(read_stream, write_stream) = upgrade_to_tls(sock)
@info "Socket upgraded"
end

lock(state.lock) do; nc.status = CONNECTED end

receiver_task = Threads.Task(() -> begin
while true
process(nc, next_protocol_message(read_stream))
end
end)
Base.Threads._spawn_set_thrpool(receiver_task, :default)
Base.Threads.schedule(receiver_task)
errormonitor(receiver_task)

sender_task = Threads.@spawn :default disable_sigint() do; sendloop(nc, write_stream) end
# TODO: better monitoring of sender with `bind`.
errormonitor(sender_task)
try
wait(receiver_task)
catch err
@error err
close(nc.outbox)
close(sock)
# TODO: distinguish recoverable and unrecoverable exceptions.
# throw(err)
end
try
wait(sender_task)
catch err
@debug "Sender task finished." err
end
sleep(3) # TODO: remove this sleep, use `retry` on `Sockets.connect`.
# if nc.status in [CLOSING, CLOSED, FAILURE]
# @info "Connection is closing."
# error("Connection closed.")
# end
@info "Disconnected. Trying to reconnect."
new_outbox = Channel{ProtocolMessage}(OUTBOX_SIZE)
# TODO: restore old subs.

migrated = []
for (sid, sub) in pairs(nc.subs)
push!(migrated, sub)
if haskey(nc.unsubs, sid)
push!(migrated, Unsub(sid, nc.unsubs[sid]))
end
end
@info "Migrating $(length(migrated)) subs to a new connection."
for msg in migrated
put!(new_outbox, msg)
end
for msg in collect(nc.outbox)
if msg isa Msg || msg isa HMsg || msg isa Pub || msg isa HPub || msg isa Unsub
put!(new_outbox, msg)
end
end
lock(state.lock) do; nc.status = RECONNECTING end
lock(state.lock) do; nc.outbox = new_outbox end
end

#TODO: restore link #NATS.Connect
"""
Expand All @@ -133,7 +58,7 @@ function connect(host::String = NATS_HOST, port::Int = NATS_PORT; default = true
end
nc = Connection()
connect_msg = from_kwargs(Connect, DEFAULT_CONNECT_ARGS, kw)
reconnect_task = Base.Threads.Task(() -> disable_sigint() do; while true reconnect(nc, host, port, connect_msg) end end)
reconnect_task = Threads.Task(() -> while true reconnect(nc, host, port, connect_msg) end)
# Setting sticky flag to false makes processing 10x slower when running with multiple threads.
# reconnect_task.sticky = false
Base.Threads._spawn_set_thrpool(reconnect_task, :default)
Expand Down
71 changes: 70 additions & 1 deletion src/connection/reconnect.jl
Original file line number Diff line number Diff line change
@@ -1 +1,70 @@
# Stuff for gracefuly handling reconnects, especially restoring subscriptions.
# Stuff for gracefuly handling reconnects, especially restoring subscriptions.

function reconnect(nc::Connection, host, port, con_msg)
sock = retry(Sockets.connect, delays=SOCKET_CONNECT_DELAYS)(port)

read_stream = sock
write_stream = sock

process(nc, next_protocol_message(read_stream))
info = fetch(nc.info)
send(nc, con_msg)

# @show fetch(nc.info)
if !isnothing(info.tls_required) && info.tls_required
(read_stream, write_stream) = upgrade_to_tls(sock)
@info "Socket upgraded"
end

lock(state.lock) do; nc.status = CONNECTED end

receiver_task = Threads.Task(() -> begin
while !eof(read_stream)
process(nc, next_protocol_message(read_stream))
end
end)
Base.Threads._spawn_set_thrpool(receiver_task, :default)
Base.Threads.schedule(receiver_task)
sender_task = Threads.@spawn :default disable_sigint() do; sendloop(nc, write_stream) end

c = Channel()
bind(c, receiver_task)
bind(c, sender_task)
try
wait(c)
catch err
if istaskfailed(receiver_task)
@error "Receiver task failed:" receiver_task.result
end
if istaskfailed(sender_task)
@error "Sender task failed:" sender_task.result
end

close(nc.outbox)
close(sock)
end
try wait(sender_task) catch end
try wait(receiver_task) catch end
@info "Disconnected. Trying to reconnect."
new_outbox = Channel{ProtocolMessage}(OUTBOX_SIZE)
# TODO: restore old subs.

migrated = []
for (sid, sub) in pairs(nc.subs)
push!(migrated, sub)
if haskey(nc.unsubs, sid)
push!(migrated, Unsub(sid, nc.unsubs[sid]))
end
end
@info "Migrating $(length(migrated)) subs to a new connection."
for msg in migrated
put!(new_outbox, msg)
end
for msg in collect(nc.outbox)
if msg isa Msg || msg isa HMsg || msg isa Pub || msg isa HPub || msg isa Unsub
put!(new_outbox, msg)
end
end
lock(state.lock) do; nc.status = RECONNECTING end
lock(state.lock) do; nc.outbox = new_outbox end
end

0 comments on commit 161bf09

Please sign in to comment.