Skip to content

Commit

Permalink
stream mirroring test
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Feb 3, 2024
1 parent 4a25118 commit 049c6d9
Showing 1 changed file with 33 additions and 20 deletions.
53 changes: 33 additions & 20 deletions test/jetstream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ end
destroy!(ch)
end

@testset "Stream with complicated configuration" begin
@testset "Stream republish" begin
connection = NATS.connect()
stream_config = StreamConfiguration(
name = "SOME_STREAM",
Expand All @@ -359,16 +359,6 @@ end
)
stream_info = JetStream.stream_create(connection, stream_config)

# mirror_stream_config = StreamConfiguration(
# name = "MIRROR_STREAM",
# description = "MIRROR_STREAM stream",
# storage = :memory,
# mirror = StreamSource(
# name = "SOME_STREAM"
# )
# )
# mirror_stream_info = JetStream.stream_create(connection, mirror_stream_config)

republished = []
sub = subscribe(connection, "REPUBLISHED.foo") do msg
push!(republished, msg)
Expand All @@ -378,15 +368,38 @@ end
stream_publish(connection, "SOME_STREAM.foo", "test message")
drain(connection, sub)

# mirror_consumer_config = ConsumerConfiguration(
# name ="mirror_consumer_test",
# ack_policy = :explicit
# )
# consumer_info = consumer_create(connection, mirror_consumer_config, mirror_stream_info)
# msg = consumer_next(connection, consumer_info)
# @test msg isa NATS.Msg
# @test msg.subject == "SOME_STREAM.foo"
@test length(republished) == 1
stream_delete(connection, stream_info)
# stream_delete(connection, mirror_stream_info)
end

@testset "Stream mirroring" begin
connection = NATS.connect()
stream_config = StreamConfiguration(
name = "SOME_STREAM",
description = "SOME_STREAM stream",
subjects = ["SOME_STREAM.*"],
retention = :workqueue,
storage = :memory,
)
stream_info = JetStream.stream_create(connection, stream_config)

mirror_stream_config = StreamConfiguration(
name = "MIRROR_STREAM",
description = "MIRROR_STREAM stream",
storage = :memory,
sources = [ StreamSource(name = "SOME_STREAM") ]
)
mirror_stream_info = JetStream.stream_create(connection, mirror_stream_config)

mirror_consumer_config = ConsumerConfiguration(
name ="mirror_consumer_test",
ack_policy = :explicit
)
stream_publish(connection, "SOME_STREAM.foo", "test message")
consumer_info = consumer_create(connection, mirror_consumer_config, mirror_stream_info)
msg = consumer_next(connection, consumer_info)
@test msg isa NATS.Msg
@test msg.subject == "SOME_STREAM.foo"
stream_delete(connection, stream_info)
stream_delete(connection, mirror_stream_info)
end

0 comments on commit 049c6d9

Please sign in to comment.