diff --git a/src/jetstream/api/call.jl b/src/jetstream/api/call.jl index 7e483ef7..a91a12c5 100644 --- a/src/jetstream/api/call.jl +++ b/src/jetstream/api/call.jl @@ -1,10 +1,10 @@ +const DEFAULT_API_CALL_DELAYS = ExponentialBackOff(n = 7, first_delay = 0.1, max_delay = 0.5) + function check_api_call_error(s, e) e isa Union{NATS.NATSError, ApiError} && e.code == 503 end -const DEFAULT_API_CALL_DELAYS = ExponentialBackOff(n = typemax(Int64), first_delay = 0.2, max_delay = 0.5) - function jetstream_api_call(T, connection::NATS.Connection, subject, data = nothing; delays = DEFAULT_API_CALL_DELAYS) call_retry = retry(NATS.request; delays, check = check_api_call_error) call_retry(T, connection, subject, data) diff --git a/src/jetstream/consumer/ack.jl b/src/jetstream/consumer/ack.jl index d12e22a4..2ca777ac 100644 --- a/src/jetstream/consumer/ack.jl +++ b/src/jetstream/consumer/ack.jl @@ -3,10 +3,10 @@ $(SIGNATURES) Confirms message delivery to server. """ -function consumer_ack(connection::NATS.Connection, msg::NATS.Msg) +function consumer_ack(connection::NATS.Connection, msg::NATS.Msg; delays = DEFAULT_API_CALL_DELAYS) isnothing(msg.reply_to) && error("No reply subject for msg $msg.") !startswith(msg.reply_to, "\$JS.ACK") && @warn "`ack` sent for message that doesn't need acknowledgement." - jetstream_api_call(NATS.Msg, connection, msg.reply_to) + jetstream_api_call(NATS.Msg, connection, msg.reply_to; delays) end """ @@ -14,8 +14,8 @@ $(SIGNATURES) Mark message as undelivered, what avoid waiting for timeout before redelivery. """ -function consumer_nak(connection::NATS.Connection, msg::NATS.Msg) +function consumer_nak(connection::NATS.Connection, msg::NATS.Msg; delays = DEFAULT_API_CALL_DELAYS) isnothing(msg.reply_to) && error("No reply subject for msg $msg.") !startswith(msg.reply_to, "\$JS.ACK") && @warn "`nak` sent for message that doesn't need acknowledgement." - jetstream_api_call(NATS.Msg, connection, msg.reply_to, "-NAK") + jetstream_api_call(NATS.Msg, connection, msg.reply_to, "-NAK"; delays) end diff --git a/src/jetstream/jetchannel/jetchannel.jl b/src/jetstream/jetchannel/jetchannel.jl index 6b91644e..eb7895ea 100644 --- a/src/jetstream/jetchannel/jetchannel.jl +++ b/src/jetstream/jetchannel/jetchannel.jl @@ -1,6 +1,8 @@ include("manage.jl") +const DEFAULT_JETCHANNEL_DELAYS = ExponentialBackOff(n = typemax(Int64), first_delay = 0.2, max_delay = 0.5) + struct JetChannel{T} <: AbstractChannel{T} connection::NATS.Connection name::String @@ -25,7 +27,7 @@ end function Base.take!(jetchannel::JetChannel{T}) where T msg = consumer_next(jetchannel.connection, jetchannel.consumer) - ack = consumer_ack(jetchannel.connection, msg) + ack = consumer_ack(jetchannel.connection, msg; delays = DEFAULT_JETCHANNEL_DELAYS) @assert ack isa NATS.Msg convert(T, msg) end @@ -33,7 +35,7 @@ end function Base.put!(jetchannel::JetChannel{T}, v::T) where T subject = channel_subject(jetchannel.name) - ack = stream_publish(jetchannel.connection, subject, v) + ack = stream_publish(jetchannel.connection, subject, v; delays = DEFAULT_JETCHANNEL_DELAYS) @assert ack.stream == jetchannel.stream.config.name v end \ No newline at end of file diff --git a/src/jetstream/stream/publish.jl b/src/jetstream/stream/publish.jl index 0f990c80..e07bd60d 100644 --- a/src/jetstream/stream/publish.jl +++ b/src/jetstream/stream/publish.jl @@ -1,9 +1,10 @@ +const DEFAULT_STREAM_PUBLISH_DELAYS = ExponentialBackOff(n = 7, first_delay = 0.1, max_delay = 0.5) """ $(SIGNATURES) Publish a message to stream. """ -function stream_publish(connection::NATS.Connection, subject, data) - jetstream_api_call(PubAck, connection, subject, data) +function stream_publish(connection::NATS.Connection, subject, data; delays = DEFAULT_STREAM_PUBLISH_DELAYS) + jetstream_api_call(PubAck, connection, subject, data; delays) end