Skip to content

Commit

Permalink
stream test
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Feb 3, 2024
1 parent 4007986 commit 4a25118
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/jetstream/JetStream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import Base: IteratorSize
import Base: put!, take!


export StreamConfiguration
export StreamConfiguration, Republish, StreamConsumerLimit, StreamSource
export stream_create, stream_update, stream_update_or_create, stream_purge, stream_delete
export stream_publish, stream_subscribe, stream_unsubscribe
export stream_message_get, stream_message_delete
Expand Down
10 changes: 0 additions & 10 deletions src/jetstream/api/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,6 @@ end
external::Union{ExternalStreamSource, Nothing} = nothing
end

import Base: convert

function convert(::Type{StreamSource}, name::String)
StreamSource(; name)
end

function convert(::Type{StreamSource}, t::NamedTuple)
StreamSource(; t...)
end

@kwdef struct Republish
"The source subject to republish"
src::String
Expand Down
69 changes: 60 additions & 9 deletions test/jetstream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ end
name = "SOME_STREAM",
description = "SOME_STREAM stream",
subjects = ["SOME_STREAM.*"],
retention = :workqueue,
retention = :workqueue,
storage = :memory,
)
stream_info = JetStream.stream_create(connection, stream_config)
Expand Down Expand Up @@ -82,7 +82,7 @@ end
name = "SOME_STREAM",
description = "SOME_STREAM stream",
subjects = ["SOME_STREAM.*"],
retention = :workqueue,
retention = :workqueue,
storage = :memory,
)
stream_info = stream_create(connection, stream_config)
Expand All @@ -105,7 +105,7 @@ end
name = "SOME_STREAM",
description = "SOME_STREAM stream",
subjects = ["SOME_STREAM.*"],
retention = :workqueue,
retention = :workqueue,
storage = :memory,
allow_direct = true
)
Expand Down Expand Up @@ -134,7 +134,7 @@ end

@testset "Create stream, publish and subscribe." begin
connection = NATS.connect()

stream_name = randstring(10)
subject_prefix = randstring(4)

Expand Down Expand Up @@ -214,7 +214,7 @@ uint8_vec(s::String) = convert.(UInt8, collect(s))
for i in 1:100
@test kv["key_$i"] == "value_$i"
end

@test length(kv) == 100
@test length(collect(kv)) == 100
@test length(keys(kv)) == 100
Expand All @@ -236,7 +236,7 @@ end
@test kv["!@#%^&"] == "5"
@test collect(kv) == ["!@#%^&" => "5"]
keyvalue_stream_delete(connection, "test_kv")

@test_throws "No `encodekey` implemented for wrongencoding encoding" JetStream.JetDict{String}(connection, "test_kv", :wrongencoding)
end

Expand Down Expand Up @@ -287,14 +287,14 @@ end
connection = NATS.connect()
kv = JetStream.JetDict{String}(connection, "test_kv")

with_optimistic_concurrency(kv) do
with_optimistic_concurrency(kv) do
kv["a"] = "4"
kv["a"] = "5"
kv["a"] = "5"
end

@async (sleep(2); kv["a"] = "6")

@test_throws "wrong last sequence" with_optimistic_concurrency(kv) do
@test_throws "wrong last sequence" with_optimistic_concurrency(kv) do
old = kv["a"]
sleep(3)
kv["a"] = "$(old)_updated"
Expand Down Expand Up @@ -339,3 +339,54 @@ end

destroy!(ch)
end

@testset "Stream with complicated configuration" begin
connection = NATS.connect()
stream_config = StreamConfiguration(
name = "SOME_STREAM",
description = "SOME_STREAM stream",
subjects = ["SOME_STREAM.*"],
retention = :workqueue,
storage = :memory,
republish = Republish(
src = "SOME_STREAM.foo",
dest = "REPUBLISHED.foo"
),
consumer_limits = StreamConsumerLimit(
max_ack_pending = 100,
),

)
stream_info = JetStream.stream_create(connection, stream_config)

# mirror_stream_config = StreamConfiguration(
# name = "MIRROR_STREAM",
# description = "MIRROR_STREAM stream",
# storage = :memory,
# mirror = StreamSource(
# name = "SOME_STREAM"
# )
# )
# mirror_stream_info = JetStream.stream_create(connection, mirror_stream_config)

republished = []
sub = subscribe(connection, "REPUBLISHED.foo") do msg
push!(republished, msg)
end

sleep(0.1)
stream_publish(connection, "SOME_STREAM.foo", "test message")
drain(connection, sub)

# mirror_consumer_config = ConsumerConfiguration(
# name ="mirror_consumer_test",
# ack_policy = :explicit
# )
# consumer_info = consumer_create(connection, mirror_consumer_config, mirror_stream_info)
# msg = consumer_next(connection, consumer_info)
# @test msg isa NATS.Msg
# @test msg.subject == "SOME_STREAM.foo"
@test length(republished) == 1
stream_delete(connection, stream_info)
# stream_delete(connection, mirror_stream_info)
end

0 comments on commit 4a25118

Please sign in to comment.