Skip to content

Commit

Permalink
Merge pull request #13 from jakubwro/jakubwro/publish-optimization
Browse files Browse the repository at this point in the history
Optimize publish
  • Loading branch information
jakubwro authored Jan 13, 2024
2 parents b1e5745 + 431ccf7 commit b4bac9e
Show file tree
Hide file tree
Showing 8 changed files with 56 additions and 22 deletions.
1 change: 1 addition & 0 deletions src/protocol/protocol.jl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
### Code:

include("structs.jl")
include("validate.jl")
include("parser.jl")
include("payload.jl")
include("headers.jl")
Expand Down
5 changes: 2 additions & 3 deletions src/protocol/show.jl
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ function show(io::IO, ::MIME_PROTOCOL, connect::Connect)
end

function show(io::IO, ::MIME_PROTOCOL, pub::Pub)
hbytes = length(pub.headers)
hbytes = pub.headers_length
nbytes = length(pub.payload)
if hbytes > 0
write(io, "H")
Expand All @@ -105,8 +105,7 @@ function show(io::IO, ::MIME_PROTOCOL, pub::Pub)
if hbytes > 0
write(io, " $hbytes")
end
write(io, " $(hbytes + nbytes)\r\n")
write(io, pub.headers)
write(io, " $(nbytes)\r\n")
write(io, pub.payload)
write(io, "\r\n")
end
Expand Down
10 changes: 5 additions & 5 deletions src/protocol/structs.jl
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ struct Pub <: ProtocolMessage
subject::String
"The reply subject that subscribers can use to send a response back to the publisher/requestor."
reply_to::Union{String, Nothing}
"Header version `NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`."
headers::Vector{UInt8}
"The message payload data."
"Length of headers data inside payload"
headers_length::Int64
"Optional headers (`NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`) followed by payload data."
payload::Vector{UInt8}
end

Expand Down Expand Up @@ -173,9 +173,9 @@ struct Msg <: ProtocolMessage
sid::String
"The subject on which the publisher is listening for responses."
reply_to::Union{String, Nothing}
"Header version `NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`."
"Length of headers data inside payload"
headers_length::Int64
"The message payload data."
"Optional headers (`NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`) followed by payload data."
payload::AbstractVector{UInt8}
end

Expand Down
28 changes: 28 additions & 0 deletions src/protocol/validate.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
### validate.jl
#
# Copyright (C) 2024 Jakub Wronowski.
#
# Maintainer: Jakub Wronowski <[email protected]>
# Keywords: nats, nats-client, julia
#
# This file is a part of NATS.jl.
#
# License is MIT.
#
### Commentary:
#
# This file contains implementation of functions for validation of protocol messages.
#
### Code:

function validate(pub::Pub)
isempty(pub.subject) && error("Publication subject is empty")
startswith(pub.subject, '.') && error("Publication subject '$(pub.subject)' cannot start with '.'")
endswith(pub.subject, '.') && error("Publication subject '$(pub.subject)' cannot end with '.'")
for c in pub.subject
if c == ' '
error("Publication subject contains invalid character '$c'.")
end
end
true
end
14 changes: 11 additions & 3 deletions src/pubsub/publish.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#
### Code:

const mime_payload = MIME_PAYLOAD()
const mime_headers = MIME_HEADERS()

"""
$(SIGNATURES)
Expand All @@ -38,9 +41,14 @@ function publish(
data = nothing;
reply_to::Union{String, Nothing} = nothing
)
payload_bytes = repr(MIME_PAYLOAD(), data)
headers_bytes = repr(MIME_HEADERS(), data)
send(connection, Pub(subject, reply_to, headers_bytes, payload_bytes))
payload_io = IOBuffer()
show(payload_io, mime_headers, data)
headers_length = payload_io.size
show(payload_io, mime_payload, data)
payload_bytes = take!(payload_io)
pub = Pub(subject, reply_to, headers_length, payload_bytes)
validate(pub)
send(connection, pub)
inc_stats(:msgs_published, 1, connection.stats, state.stats)
sub_stats = ScopedValues.get(scoped_subscription_stats)
if !isnothing(sub_stats)
Expand Down
4 changes: 1 addition & 3 deletions test/benchmarks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ function msgs_per_second(connection::NATS.Connection, connection2::NATS.Connecti
Threads.atomic_add!(msgs_after_timeout, 1)
end
end
pub = NATS.Pub(subject, nothing, UInt8[], uint8_vec("Hi!"))
pub = NATS.Pub(subject, nothing, 0, uint8_vec("Hi!"))
t = Threads.@spawn :default begin
@time while isopen(tm)
if time_first_pub == 0.0
Expand Down Expand Up @@ -128,8 +128,6 @@ end
@testset "Publisher benchmark." begin
connection = NATS.connect()

# pub = NATS.Pub("zxc", nothing, UInt8[], uint8_vec("Hello world!!!!!"))

tm = Timer(1.0)
counter = 0
c = 0
Expand Down
2 changes: 1 addition & 1 deletion test/connection.jl
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ NATS.status()
sleep(0.1)
@test_throws ErrorException publish(connection, "test_publish_on_drained")

pub = NATS.Pub("test_publish_on_drained", nothing, UInt8[], UInt8[])
pub = NATS.Pub("test_publish_on_drained", nothing, 0, UInt8[])
@test_throws ErrorException NATS.send(connection, repeat([pub], 10))
end

Expand Down
14 changes: 7 additions & 7 deletions test/protocol.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,15 @@ end
json = """{"verbose":false,"pedantic":false,"tls_required":false,"lang":"julia","version":"0.0.1"}"""
@test serialize(JSON3.read(json, NATS.Connect)) == """CONNECT $json\r\n"""

@test serialize(Pub("FOO", nothing, UInt8[], uint8_vec("Hello NATS!"))) == "PUB FOO 11\r\nHello NATS!\r\n"
@test serialize(Pub("FRONT.DOOR", "JOKE.22", UInt8[], uint8_vec("Knock Knock"))) == "PUB FRONT.DOOR JOKE.22 11\r\nKnock Knock\r\n"
@test serialize(Pub("NOTIFY", nothing, UInt8[], UInt8[])) == "PUB NOTIFY 0\r\n\r\n"
@test serialize(Pub("FOO", nothing, 0, uint8_vec("Hello NATS!"))) == "PUB FOO 11\r\nHello NATS!\r\n"
@test serialize(Pub("FRONT.DOOR", "JOKE.22", 0, uint8_vec("Knock Knock"))) == "PUB FRONT.DOOR JOKE.22 11\r\nKnock Knock\r\n"
@test serialize(Pub("NOTIFY", nothing, 0, UInt8[])) == "PUB NOTIFY 0\r\n\r\n"

@test serialize(Pub("FOO", nothing, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\n"), uint8_vec("Hello NATS!"))) == "HPUB FOO 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"
@test serialize(Pub("FRONT.DOOR", "JOKE.22", uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\n"), uint8_vec("Knock Knock"))) == "HPUB FRONT.DOOR JOKE.22 45 56\r\nNATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\nKnock Knock\r\n"
@test serialize(Pub("FOO", nothing, 22, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"))) == "HPUB FOO 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n"
@test serialize(Pub("FRONT.DOOR", "JOKE.22", 45, uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\nKnock Knock"))) == "HPUB FRONT.DOOR JOKE.22 45 56\r\nNATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\nKnock Knock\r\n"

@test serialize(Pub("NOTIFY", nothing, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\n"), UInt8[])) == "HPUB NOTIFY 22 22\r\nNATS/1.0\r\nBar: Baz\r\n\r\n\r\n"
@test serialize(Pub("MORNING.MENU", nothing, uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\n"), uint8_vec("Yum!"))) == "HPUB MORNING.MENU 47 51\r\nNATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\nYum!\r\n"
@test serialize(Pub("NOTIFY", nothing, 22, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\n"))) == "HPUB NOTIFY 22 22\r\nNATS/1.0\r\nBar: Baz\r\n\r\n\r\n"
@test serialize(Pub("MORNING.MENU", nothing, 47, uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\nYum!"))) == "HPUB MORNING.MENU 47 51\r\nNATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\nYum!\r\n"

@test serialize(Sub("FOO", nothing, "1")) == "SUB FOO 1\r\n"
@test serialize(Sub("BAR", "G1", "44")) == "SUB BAR G1 44\r\n"
Expand Down

0 comments on commit b4bac9e

Please sign in to comment.