Skip to content

Commit

Permalink
stream sub structure
Browse files Browse the repository at this point in the history
  • Loading branch information
jakubwro committed Jan 27, 2024
1 parent 978c24b commit dd95bcd
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
6 changes: 6 additions & 0 deletions src/jetstream/stream/stream.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@

struct StreamSub
sub::NATS.Sub
consumer::ConsumerInfo
end


include("manage.jl")
include("info.jl")
include("message.jl")
Expand Down
2 changes: 1 addition & 1 deletion src/jetstream/stream/subscribe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@ function stream_subscribe(f, connection, subject)
f_typed(msg)
end
end
sub, consumer
StreamSub(sub, consumer)
end
7 changes: 3 additions & 4 deletions src/jetstream/stream/unsubscribe.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ $(SIGNATURES)
Unsubscribe stream subscription.
"""
function stream_unsubscribe(connection, subscription::Tuple{NATS.Sub, JetStream.ConsumerInfo})
sub, cons = subscription
NATS.unsubscribe(connection, sub)
consumer_delete(connection, cons)
function stream_unsubscribe(connection, stream_sub::StreamSub)
NATS.unsubscribe(connection, stream_sub.sub)
consumer_delete(connection, stream_sub.consumer)
end

0 comments on commit dd95bcd

Please sign in to comment.