Skip to content

Commit

Permalink
fix sync subs
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Jan 21, 2024
1 parent 95c5329 commit 102137c
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/NATS.jl
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ using URIs

import Base: show, convert

export NATSError
export connect, reconnect, ping, drain
export payload, header, headers
export publish, subscribe, unsubscribe
Expand Down
12 changes: 9 additions & 3 deletions src/connection/state.jl
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,15 @@ end
function _delete_sub_data(nc::Connection, sid::String)
@lock nc.lock begin
sub_data = get(nc.sub_data, sid, nothing)
!isnothing(sub_data) && close(sub_data.channel)
delete!(nc.sub_data, sid)
delete!(nc.unsubs, sid)
if sub_data.is_async == true
!isnothing(sub_data) && close(sub_data.channel)
delete!(nc.sub_data, sid)
delete!(nc.unsubs, sid)
else
# `next` rely on lookup of sub data, in this case let sub data stay and do cleanup
# when `next` gets the last message of a closed channel.
!isnothing(sub_data) && close(sub_data.channel)
end
end
end

Expand Down
1 change: 1 addition & 0 deletions src/pubsub/drain.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ function drain(connection::Connection, sid::String; timer = Timer(connection.dra
while !is_every_message_handled(sub_stats)
if !isopen(timer)
@error "Timeout for drain exceeded, not all msgs might be processed."
break
end
sleep(connection.drain_poll)
end
Expand Down
17 changes: 16 additions & 1 deletion src/pubsub/subscribe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,15 @@ function next(connection, sub; no_wait = false, no_throw = false)
end
sub_data.is_async && error("`next` is available only for synchronous subscriptions")
ch = sub_data.channel
no_wait && Base.n_avail(ch) == 0 && return nothing
if no_wait && Base.n_avail(ch) == 0
if !isopen(ch)
@lock connection.lock begin
delete!(connection.sub_data, sub.sid)
delete!(connection.unsubs, sub.sid)
end
end
return nothing
end
msg =
try
@lock sub_data.lock begin
Expand All @@ -91,6 +99,13 @@ function next(connection, sub; no_wait = false, no_throw = false)
throw(NATSError(499, "Client unsubscribed."))
end
rethrow()
finally
if !isopen(ch) && Base.n_avail(ch) == 0
@lock connection.lock begin
delete!(connection.sub_data, sub.sid)
delete!(connection.unsubs, sub.sid)
end
end
end
if !no_throw
status = statuscode(msg)
Expand Down
2 changes: 1 addition & 1 deletion src/reqreply/request.jl
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ function request(
errormonitor(timeout_task)
result = Msg[]
for _ in 1:nreplies
msg = next(connection, sub; no_throw = true)
msg = next(connection, sub)
isnothing(msg) && break
push!(result, msg)
end
Expand Down
2 changes: 1 addition & 1 deletion test/reqreply.jl
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ end
NATS.status()

@testset "No responders." begin
@test_throws ErrorException request(nc, "SOME.NULL")
@test_throws NATSError request(nc, "SOME.NULL")
end

NATS.status()
Expand Down

0 comments on commit 102137c

Please sign in to comment.