Skip to content

Commit

Permalink
restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Oct 2, 2023
1 parent 32b3aeb commit 3029f0c
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 194 deletions.
4 changes: 2 additions & 2 deletions src/NATS.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ include("init.jl")
include("consts.jl")
include("protocol/protocol.jl")
include("connection/connection.jl")
include("pubsub.jl")
include("reqreply.jl")
include("pubsub/pubsub.jl")
include("reqreply/reqreply.jl")

export connect, ping, publish, subscribe, unsubscribe, payload, request, reply, header, headers, drain, isdrained

Expand Down
152 changes: 0 additions & 152 deletions src/pubsub.jl

This file was deleted.

34 changes: 34 additions & 0 deletions src/pubsub/publish.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""
$(SIGNATURES)
Publish message to a subject.
"""
function publish(
subject::String;
connection::Connection = default_connection(),
reply_to::Union{String, Nothing} = nothing,
payload = nothing,
headers::Union{Nothing, Headers} = nothing
)
publish(subject, (payload, headers); connection, reply_to)
end

function publish(
subject::String,
data;
connection::Connection = default_connection(),
reply_to::Union{String, Nothing} = nothing
)
payload_bytes = repr(MIME_PAYLOAD(), data)
payload = isempty(payload_bytes) ? nothing : String(payload_bytes)
headers_bytes = repr(MIME_HEADERS(), data)
headers = isempty(headers_bytes) ? nothing : String(headers_bytes)

if isnothing(headers)
send(connection, Pub(subject, reply_to, sizeof(payload), payload))
else
headers_size = sizeof(headers)
total_size = headers_size + sizeof(payload)
send(connection, HPub(subject, reply_to, headers_size, total_size, headers, payload))
end
end
3 changes: 3 additions & 0 deletions src/pubsub/pubsub.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
include("publish.jl")
include("subscribe.jl")
include("unsubscribe.jl")
30 changes: 30 additions & 0 deletions src/pubsub/sub_async.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@

function _start_async_handler(f::Function, subject::String)
error_ch = Channel(100000)

ch = Channel(10000000, spawn = true) do ch # TODO: move to const
try
while true
msg = take!(ch)
Threads.@spawn :default try
f(msg)
catch err
put!(error_ch, err)
end
end
catch
close(error_ch)
end
end
Threads.@spawn :default begin
while true
sleep(5)
avail = Base.n_avail(error_ch)
errors = [ take!(error_ch) for _ in 1:avail ]
if !isempty(errors)
@error "$(length(errors)) handler errors on \"$subject\" in last 5 s. Last one:" last(errors)
end
end
end
ch
end
31 changes: 31 additions & 0 deletions src/pubsub/sub_sync.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
function _start_handler(f::Function, subject::String)
ch = Channel(10000000) # TODO: move to const
Threads.@spawn begin
last_error_time = time()
errors_since_last_log = 0
last_error = nothing
while true
try
msg = take!(ch)
f(msg)
catch err
if err isa InvalidStateException
break
end
last_error = err
errors_since_last_log = errors_since_last_log + 1
now = time()
time_diff = now - last_error_time
if last_error_time < now - 5.0
last_error_time = now
@error "$errors_since_last_log handler errors on \"$subject\" in last $(round(time_diff, digits = 2)) s. Last one:" err
errors_since_last_log = 0
end
end
end
if errors_since_last_log > 0
@error "$errors_since_last_log handler errors on \"$subject\"in last $(round(time() - last_error_time, digits = 2)) s. Last one:" last_error
end
end
ch
end
31 changes: 31 additions & 0 deletions src/pubsub/subscribe.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
include("sub_sync.jl")
include("sub_async.jl")

"""
$(SIGNATURES)
Subscribe to a subject.
"""
function subscribe(
f,
subject::String;
connection::Connection = default_connection(),
queue_group::Union{String, Nothing} = nothing,
async_handlers = false
)
arg_t = argtype(f)
find_msg_conversion_or_throw(arg_t)
sid = @lock NATS.state.lock randstring(connection.rng, 20)
sub = Sub(subject, queue_group, sid)
c = if async_handlers
_start_async_handler(_fast_call(f, arg_t), subject)
else
_start_handler(_fast_call(f, arg_t), subject)
end
@lock NATS.state.lock begin
state.handlers[sid] = c
connection.subs[sid] = sub
end
send(connection, sub)
sub
end
26 changes: 26 additions & 0 deletions src/pubsub/unsubscribe.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
function unsubscribe(
sid::String;
connection::Connection,
max_msgs::Union{Int, Nothing} = nothing
)
# TODO: do not send unsub if sub alredy removed by Msg handler.
usnub = Unsub(sid, max_msgs)
send(connection, usnub)
if isnothing(max_msgs) || max_msgs == 0
_cleanup_sub(connection, sid)
end
usnub
end

"""
$(SIGNATURES)
Unsubscrible from a subject.
"""
function unsubscribe(
sub::Sub;
connection::Connection = default_connection(),
max_msgs::Union{Int, Nothing} = nothing
)
unsubscribe(sub.sid; connection, max_msgs)
end
37 changes: 37 additions & 0 deletions src/reqreply/reply.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@


"""
Reply for messages for a subject. Works like `subscribe` with automatic `publish` to the subject from `reply_to` field.
$(SIGNATURES)
# Examples
```julia-repl
julia> sub = reply("FOO.REQUESTS") do msg
"This is a reply payload."
end
NATS.Sub("FOO.REQUESTS", nothing, "jdnMEcJN")
julia> sub = reply("FOO.REQUESTS") do msg
"This is a reply payload.", ["example_header" => "This is a header value"]
end
NATS.Sub("FOO.REQUESTS", nothing, "jdnMEcJN")
julia> unsubscribe(sub)
```
"""
function reply(
f,
subject::String;
connection::Connection = default_connection(),
queue_group::Union{Nothing, String} = nothing,
async_handlers = false
)
T = argtype(f)
find_msg_conversion_or_throw(T)
fast_f = _fast_call(f, T)
subscribe(subject; connection, queue_group, async_handlers) do msg
data = fast_f(msg)
publish(msg.reply_to, data; connection)
end
end
2 changes: 2 additions & 0 deletions src/reqreply/reqreply.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
include("request.jl")
include("reply.jl")
Loading

0 comments on commit 3029f0c

Please sign in to comment.