Skip to content

Commit

Permalink
jetstream docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Jan 27, 2024
1 parent ba630e6 commit d3ad528
Show file tree
Hide file tree
Showing 15 changed files with 151 additions and 11 deletions.
4 changes: 4 additions & 0 deletions docs/make.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Documenter
using NATS
using NATS.JetStream

makedocs(
sitename = "NATS",
Expand All @@ -17,6 +18,9 @@ makedocs(
"debugging.md",
],
"JetStream" => [
"jetstream/stream.md"
"jetstream/consumer.md"
"jetstream/keyvalue.md"
"jetstream/jetdict.md"
"jetstream/jetchannel.md"
],
Expand Down
20 changes: 19 additions & 1 deletion docs/src/jetstream/jetdict.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,22 @@ end
sleep(1) # Wait for changes
stream_unsubscribe(nc, sub)
keyvalue_stream_delete(nc, "example_kv_watch")
```
```

## Optimistic concurrency

```@repl
using NATS
using NATS.JetStream
connection = NATS.connect()
kv = JetStream.JetDict{String}(connection, "test_kv_concurrency")
kv["a"] = "1"
@async (sleep(2); kv["a"] = "2")
with_optimistic_concurrency(kv) do
old = kv["a"]
sleep(3)
kv["a"] = "$(old)_updated"
end
kv["a"]
keyvalue_stream_delete(connection, "test_kv_concurrency")
```
7 changes: 4 additions & 3 deletions src/jetstream/JetStream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ export stream_publish, stream_subscribe, stream_unsubscribe
export stream_message_get, stream_message_delete

export ConsumerConfiguration
export consumer_create, consumer_delete, consumer_ack, consumer_nak
export consumer_create, consumer_update, consumer_delete
export consumer_next, consumer_ack, consumer_nak

export keyvalue_stream_create, keyvalue_stream_delete
export keyvalue_get, keyvalue_put, keyvalue_delete
export keyvalue_stream_create, keyvalue_stream_purge, keyvalue_stream_delete
export keyvalue_get, keyvalue_put, keyvalue_delete, keyvalue_watch
export JetDict, watch, with_optimistic_concurrency

export channel_stream_create, channel_consumer_delete
Expand Down
5 changes: 5 additions & 0 deletions src/jetstream/api/consumer.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@

"""
Configuration options for a consumer.
$(TYPEDFIELDS)
"""
@kwdef struct ConsumerConfiguration
"A unique name for a durable consumer"
durable_name::Union{String, Nothing} = nothing
Expand Down
5 changes: 5 additions & 0 deletions src/jetstream/api/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ end
max_ack_pending::Union{Int64, Nothing} = nothing
end

"""
Configuration options for a stream.
$(TYPEDFIELDS)
"""
@kwdef struct StreamConfiguration
"A unique name for the Stream."
name::String
Expand Down
16 changes: 10 additions & 6 deletions src/jetstream/consumer/ack.jl
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
# """
# Confirms message delivery to server.
# """
"""
$(SIGNATURES)
Confirms message delivery to server.
"""
function consumer_ack(connection::NATS.Connection, msg::NATS.Msg)
isnothing(msg.reply_to) && error("No reply subject for msg $msg.")
!startswith(msg.reply_to, "\$JS.ACK") && @warn "`ack` sent for message that doesn't need acknowledgement."
NATS.request(connection, msg.reply_to) # TODO: add retry in case of 503
end

# """
# Mark message as undelivered, what avoid waiting for timeout before redelivery.
# """
"""
$(SIGNATURES)
Mark message as undelivered, what avoid waiting for timeout before redelivery.
"""
function consumer_nak(connection::NATS.Connection, msg::NATS.Msg)
isnothing(msg.reply_to) && error("No reply subject for msg $msg.")
!startswith(msg.reply_to, "\$JS.ACK") && @warn "`nak` sent for message that doesn't need acknowledgement."
Expand Down
15 changes: 15 additions & 0 deletions src/jetstream/consumer/manage.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,29 @@ function consumer_create_or_update(connection::NATS.Connection, config::Consumer
consumer_create_or_update(connection, config, stream.config.name)
end

"""
$(SIGNATURES)
Create a stream consumer.
"""
function consumer_create(connection::NATS.Connection, config::ConsumerConfiguration, stream::Union{String, StreamInfo})
consumer_create_or_update(connection, config, stream)
end

"""
$(SIGNATURES)
Update stream consumer configuration.
"""
function consumer_update(connection::NATS.Connection, consumer::ConsumerConfiguration, stream::Union{StreamInfo, String})
consumer_create_or_update(connection, consumer, stream)
end

"""
$(SIGNATURES)
Delete a consumer.
"""
function consumer_delete(connection::NATS.Connection, stream_name::String, consumer_name::String)
subject = "\$JS.API.CONSUMER.DELETE.$stream_name.$consumer_name"
res = NATS.request(Union{ApiResult, ApiError}, connection, subject)
Expand Down
5 changes: 5 additions & 0 deletions src/jetstream/consumer/next.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
const NEXT_EXPIRES = 500 * 1000 * 1000 # 500 ms, TODO: add to env

"""
$(SIGNATURES)
Get next message for a consumer.
"""
function consumer_next(connection::NATS.Connection, consumer::ConsumerInfo, batch::Int64; no_wait = false)
req = Dict()
req[:no_wait] = no_wait
Expand Down
31 changes: 30 additions & 1 deletion src/jetstream/keyvalue/manage.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@



function keyvalue_stream_name(bucket::String)
"KV_$bucket"
end
Expand All @@ -11,6 +10,11 @@ end

const MAX_HISTORY = 64

"""
$(SIGNATURES)
Create a stream for KV bucket.
"""
function keyvalue_stream_create(connection::NATS.Connection, bucket::String, encoding::Symbol, history = 1)
history in 1:MAX_HISTORY || error("History must be greater than 0 and cannot be greater than $MAX_HISTORY")
stream_config = StreamConfiguration(
Expand All @@ -30,21 +34,41 @@ function keyvalue_stream_info(connection::NATS.Connection, bucket::String)
stream_info(connection, keyvalue_stream_name(bucket))
end

"""
$(SIGNATURES)
Delete a KV stream by bucket name.
"""
function keyvalue_stream_delete(connection::NATS.Connection, bucket::String)
stream_delete(connection, keyvalue_stream_name(bucket))
end

"""
$(SIGNATURES)
Purge a KV stream.
"""
function keyvalue_stream_purge(connection::NATS.Connection, bucket::String)
stream_purge(connection, keyvalue_stream_name(bucket))
end

"""
$(SIGNATURES)
Get a value from KV stream.
"""
function keyvalue_get(connection::NATS.Connection, bucket::String, key::String)::NATS.Msg
validate_key(key)
stream = keyvalue_stream_name(bucket)
subject = "$(keyvalue_subject_prefix(bucket)).$key"
stream_message_get(connection, stream, subject; allow_direct = true)
end

"""
$(SIGNATURES)
Put a value to KV stream.
"""
function keyvalue_put(connection::NATS.Connection, bucket::String, key::String, value, revision = 0)::PubAck
validate_key(key)
hdrs = NATS.Headers() #TODO: can preserve original headers?
Expand All @@ -55,6 +79,11 @@ function keyvalue_put(connection::NATS.Connection, bucket::String, key::String,
stream_publish(connection, subject, (value, hdrs))
end

"""
$(SIGNATURES)
Delete a value from KV stream.
"""
function keyvalue_delete(connection::NATS.Connection, bucket::String, key)::PubAck
validate_key(key)
hdrs = [ "KV-Operation" => "DEL" ]
Expand Down
5 changes: 5 additions & 0 deletions src/jetstream/keyvalue/watch.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@

const ALL_KEYS = ">"

"""
$(SIGNATURES)
Watch for changes in KV stream.
"""
function keyvalue_watch(f, connection::NATS.Connection, bucket::String, key = ALL_KEYS)
prefix = keyvalue_subject_prefix(bucket)
subject = "$prefix.$key"
Expand Down
20 changes: 20 additions & 0 deletions src/jetstream/stream/manage.jl
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@

"""
$(SIGNATURES)
Create a stream.
"""
function stream_create(connection::NATS.Connection, config::StreamConfiguration; no_throw = false)
validate(config)
response = NATS.request(Union{StreamInfo, ApiError}, connection, "\$JS.API.STREAM.CREATE.$(config.name)", JSON3.write(config))
no_throw || response isa ApiError && throw(response)
response
end

"""
$(SIGNATURES)
Update a stream.
"""
function stream_update(connection::NATS.Connection, config::StreamConfiguration; no_throw = false)
validate(config)
response = NATS.request(Union{StreamInfo, ApiError}, connection, "\$JS.API.STREAM.UPDATE.$(config.name)", JSON3.write(config))
Expand All @@ -26,6 +36,11 @@ function stream_update_or_create(connection::NATS.Connection, config::StreamConf
end
end

"""
$(SIGNATURES)
Delete a stream.
"""
function stream_delete(connection::NATS.Connection, stream::String; no_throw = false)
res = NATS.request(Union{ApiResult, ApiError}, connection, "\$JS.API.STREAM.DELETE.$(stream)")
no_throw || res isa ApiError && throw(res)
Expand All @@ -36,6 +51,11 @@ function stream_delete(connection::NATS.Connection, stream::StreamInfo; no_throw
stream_delete(connection, stream.config.name; no_throw)
end

"""
$(SIGNATURES)
Purge a stream. It is equivalent of deleting all messages.
"""
function stream_purge(connection::NATS.Connection, stream::String; no_throw = false)
res = NATS.request(Union{ApiResult, ApiError}, connection, "\$JS.API.STREAM.PURGE.$stream")
no_throw || res isa ApiError && throw(res)
Expand Down
15 changes: 15 additions & 0 deletions src/jetstream/stream/message.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ function check_allow_direct(connection::NATS.Connection, stream_name::String)
end
end

"""
$(SIGNATURES)
Get a message from stream.
"""
function stream_message_get(connection::NATS.Connection, stream_name::String, subject::String; allow_direct = nothing)
allow_direct = @something allow_direct check_allow_direct(connection, stream_name)
if allow_direct
Expand All @@ -38,10 +43,20 @@ function stream_message_get(connection::NATS.Connection, stream_name::String, su
end
end

"""
$(SIGNATURES)
Get a message from stream.
"""
function stream_message_get(connection::NATS.Connection, stream::StreamInfo, subject::String)
stream_message_get(connection, stream.config.name, subject; stream.config.allow_direct)
end

"""
$(SIGNATURES)
Delete a message from stream.
"""
function stream_message_delete(connection::NATS.Connection, stream::StreamInfo, msg::NATS.Msg)
seq = NATS.header(msg, "Nats-Sequence")
if isnothing(seq)
Expand Down
5 changes: 5 additions & 0 deletions src/jetstream/stream/publish.jl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ function check_stream_publish_error(s, e)
e isa NATS.NATSError && e.code == 503
end

"""
$(SIGNATURES)
Publish a message to stream.
"""
function stream_publish(connection::NATS.Connection, subject, data; delays = ExponentialBackOff(n = 3, first_delay = 0.2, max_delay = 0.5))
# ack_msg = _try_request(subject, data; connection)
publish_retry = retry(NATS.request; delays, check = check_stream_publish_error)
Expand Down
5 changes: 5 additions & 0 deletions src/jetstream/stream/subscribe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

# Subscribe to a stream by creating a consumer.
# Might be more performant to configure republish subject on steram.
"""
$(SIGNATURES)
Subscribe to a stream.
"""
function stream_subscribe(f, connection, subject)
subject_streams = stream_infos(connection, subject)
if isempty(subject_streams)
Expand Down
4 changes: 4 additions & 0 deletions src/jetstream/stream/unsubscribe.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
"""
$(SIGNATURES)
Unsubscribe stream subscription.
"""
function stream_unsubscribe(connection, subscription::Tuple{NATS.Sub, JetStream.ConsumerInfo})
sub, cons = subscription
NATS.unsubscribe(connection, sub)
Expand Down

0 comments on commit d3ad528

Please sign in to comment.