Skip to content

Commit

Permalink
better control on api call delays
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Jan 28, 2024
1 parent 1797ba8 commit 9cb46b5
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/jetstream/api/call.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@

const DEFAULT_API_CALL_DELAYS = ExponentialBackOff(n = 7, first_delay = 0.1, max_delay = 0.5)

function check_api_call_error(s, e)
e isa Union{NATS.NATSError, ApiError} && e.code == 503
end

const DEFAULT_API_CALL_DELAYS = ExponentialBackOff(n = typemax(Int64), first_delay = 0.2, max_delay = 0.5)

function jetstream_api_call(T, connection::NATS.Connection, subject, data = nothing; delays = DEFAULT_API_CALL_DELAYS)
call_retry = retry(NATS.request; delays, check = check_api_call_error)
call_retry(T, connection, subject, data)
Expand Down
8 changes: 4 additions & 4 deletions src/jetstream/consumer/ack.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ $(SIGNATURES)
Confirms message delivery to server.
"""
function consumer_ack(connection::NATS.Connection, msg::NATS.Msg)
function consumer_ack(connection::NATS.Connection, msg::NATS.Msg; delays = DEFAULT_API_CALL_DELAYS)
isnothing(msg.reply_to) && error("No reply subject for msg $msg.")
!startswith(msg.reply_to, "\$JS.ACK") && @warn "`ack` sent for message that doesn't need acknowledgement."
jetstream_api_call(NATS.Msg, connection, msg.reply_to)
jetstream_api_call(NATS.Msg, connection, msg.reply_to; delays)
end

"""
$(SIGNATURES)
Mark message as undelivered, what avoid waiting for timeout before redelivery.
"""
function consumer_nak(connection::NATS.Connection, msg::NATS.Msg)
function consumer_nak(connection::NATS.Connection, msg::NATS.Msg; delays = DEFAULT_API_CALL_DELAYS)
isnothing(msg.reply_to) && error("No reply subject for msg $msg.")
!startswith(msg.reply_to, "\$JS.ACK") && @warn "`nak` sent for message that doesn't need acknowledgement."
jetstream_api_call(NATS.Msg, connection, msg.reply_to, "-NAK")
jetstream_api_call(NATS.Msg, connection, msg.reply_to, "-NAK"; delays)
end
6 changes: 4 additions & 2 deletions src/jetstream/jetchannel/jetchannel.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

include("manage.jl")

const DEFAULT_JETCHANNEL_DELAYS = ExponentialBackOff(n = typemax(Int64), first_delay = 0.2, max_delay = 0.5)

struct JetChannel{T} <: AbstractChannel{T}
connection::NATS.Connection
name::String
Expand All @@ -25,15 +27,15 @@ end

function Base.take!(jetchannel::JetChannel{T}) where T
msg = consumer_next(jetchannel.connection, jetchannel.consumer)
ack = consumer_ack(jetchannel.connection, msg)
ack = consumer_ack(jetchannel.connection, msg; delays = DEFAULT_JETCHANNEL_DELAYS)
@assert ack isa NATS.Msg
convert(T, msg)
end


function Base.put!(jetchannel::JetChannel{T}, v::T) where T
subject = channel_subject(jetchannel.name)
ack = stream_publish(jetchannel.connection, subject, v)
ack = stream_publish(jetchannel.connection, subject, v; delays = DEFAULT_JETCHANNEL_DELAYS)
@assert ack.stream == jetchannel.stream.config.name
v
end
5 changes: 3 additions & 2 deletions src/jetstream/stream/publish.jl
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
const DEFAULT_STREAM_PUBLISH_DELAYS = ExponentialBackOff(n = 7, first_delay = 0.1, max_delay = 0.5)

"""
$(SIGNATURES)
Publish a message to stream.
"""
function stream_publish(connection::NATS.Connection, subject, data)
jetstream_api_call(PubAck, connection, subject, data)
function stream_publish(connection::NATS.Connection, subject, data; delays = DEFAULT_STREAM_PUBLISH_DELAYS)
jetstream_api_call(PubAck, connection, subject, data; delays)
end

0 comments on commit 9cb46b5

Please sign in to comment.