Skip to content

Commit

Permalink
kv refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Feb 3, 2024
1 parent 049c6d9 commit b6233b6
Showing 1 changed file with 15 additions and 19 deletions.
34 changes: 15 additions & 19 deletions src/jetstream/jetdict/jetdict.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,25 @@ struct JetDict{T} <: AbstractDict{String, T}
encoding::KeyEncoding
end

function get_jetdict_stream_info(connection, bucket, encoding)
res = stream_info(connection, "$KV_STREAM_NAME_PREFIX$bucket"; no_throw = true)
if res isa ApiError
res.code != 404 && throw(res)
keyvalue_stream_create(connection, bucket, encoding)
else
stream_encoding = isnothing(res.config.metadata) ? :none : Symbol(get(res.config.metadata, "encoding", "none"))
if encoding != stream_encoding
error("Encoding do not match, cannot use :$encoding encoding on stream with :$stream_encoding encoding")
end
res
end
end

function JetDict{T}(connection::NATS.Connection, bucket::String, encoding::Symbol = :none) where T
check_encoding_implemented(encoding)
NATS.find_msg_conversion_or_throw(T)
NATS.find_data_conversion_or_throw(T)
stream = begin
res = stream_info(connection, "$KV_STREAM_NAME_PREFIX$bucket"; no_throw = true)
if res isa ApiError
res.code != 404 && throw(res)
keyvalue_stream_create(connection, bucket, encoding)
else
res
end
end
stream_encoding = begin
if isnothing(stream.config.metadata)
:none
else
Symbol(get(stream.config.metadata, "encoding", "none"))
end
end
if encoding != stream_encoding
error("Encoding do not match, cannot use :$encoding encoding on stream with :$stream_encoding encoding")
end
stream = get_jetdict_stream_info(connection, bucket, encoding)
JetDict{T}(connection, bucket, stream, T, ScopedValue{Dict{String, UInt64}}(), KeyEncoding{encoding}())
end

Expand Down

0 comments on commit b6233b6

Please sign in to comment.