Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Feb 3, 2024
1 parent d2711fe commit 15ffcf2
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/jetstream/JetStream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export stream_message_get, stream_message_delete

export ConsumerConfiguration
export consumer_create, consumer_update, consumer_delete
export consumer_next, consumer_ack, consumer_nak
export consumer_next, consumer_ack

export keyvalue_stream_create, keyvalue_stream_purge, keyvalue_stream_delete
export keyvalue_get, keyvalue_put, keyvalue_delete, keyvalue_watch
Expand Down
19 changes: 7 additions & 12 deletions src/jetstream/consumer/ack.jl
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@

# https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#acknowledgement-models

const CONSUMER_ACK_OPTIONS = [ "+ACK", "-NAK", "+WPI", "+NXT", "+TERM" ]

"""
$(SIGNATURES)
Confirms message delivery to server.
"""
function consumer_ack(connection::NATS.Connection, msg::NATS.Msg; delays = DEFAULT_API_CALL_DELAYS)
function consumer_ack(connection::NATS.Connection, msg::NATS.Msg, ack::String = "+ACK"; delays = DEFAULT_API_CALL_DELAYS)
ack in CONSUMER_ACK_OPTIONS || error("Unknown ack type \"$ack\", allowed values: $(join(CONSUMER_ACK_OPTIONS, ", "))")
isnothing(msg.reply_to) && error("No reply subject for msg $msg.")
!startswith(msg.reply_to, "\$JS.ACK") && @warn "`ack` sent for message that doesn't need acknowledgement."
jetstream_api_call(NATS.Msg, connection, msg.reply_to; delays)
end

"""
$(SIGNATURES)
Mark message as undelivered, what avoid waiting for timeout before redelivery.
"""
function consumer_nak(connection::NATS.Connection, msg::NATS.Msg; delays = DEFAULT_API_CALL_DELAYS)
isnothing(msg.reply_to) && error("No reply subject for msg $msg.")
!startswith(msg.reply_to, "\$JS.ACK") && @warn "`nak` sent for message that doesn't need acknowledgement."
jetstream_api_call(NATS.Msg, connection, msg.reply_to, "-NAK"; delays)
end
8 changes: 2 additions & 6 deletions src/jetstream/stream/subscribe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,8 @@ Subscribe to a stream.
"""
function stream_subscribe(f, connection::NATS.Connection, subject::String)
subject_streams = stream_infos(connection, subject)
if isempty(subject_streams)
error("No stream found for subject `$subject`")
end
if length(subject_streams) > 1
error("Multiple streams found")
end
isempty(subject_streams) && error("No stream found for subject \"$subject\"")
length(subject_streams) > 1 && error("Multiple streams found for subject \"$subject\"")
stream = only(subject_streams)
name = randstring(20)
deliver_subject = randstring(8)
Expand Down
8 changes: 7 additions & 1 deletion test/jetstream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ end
stream_unsubscribe(connection, stream_sub)
@test length(results) == n
stream_delete(connection, stream_info)

@test_throws "No stream found for subject" stream_subscribe(connection, "NOT_EXISTS.foo") do msg
@info msg
end
end

@testset "Stream message access" begin
Expand Down Expand Up @@ -156,11 +160,13 @@ end
filter_subjects=["$subject_prefix.*"],
ack_policy = :explicit,
name ="c1",
durable_name = "c1" #TODO: make it not durable
durable_name = "c1", #TODO: make it not durable
ack_wait = 5 * 10^9
)
consumer = JetStream.consumer_create(connection, consumer_config, stream_info)
for i in 1:3
msg = JetStream.consumer_next(connection, consumer, no_wait = true)
consumer_ack(connection, msg)
@test msg isa NATS.Msg
end

Expand Down

0 comments on commit 15ffcf2

Please sign in to comment.