diff --git a/src/protocol/protocol.jl b/src/protocol/protocol.jl index f7721d90..cbb27bd1 100644 --- a/src/protocol/protocol.jl +++ b/src/protocol/protocol.jl @@ -16,6 +16,7 @@ ### Code: include("structs.jl") +include("validate.jl") include("parser.jl") include("payload.jl") include("headers.jl") diff --git a/src/protocol/show.jl b/src/protocol/show.jl index 6eb9cd1b..287ae221 100644 --- a/src/protocol/show.jl +++ b/src/protocol/show.jl @@ -91,7 +91,7 @@ function show(io::IO, ::MIME_PROTOCOL, connect::Connect) end function show(io::IO, ::MIME_PROTOCOL, pub::Pub) - hbytes = length(pub.headers) + hbytes = pub.headers_length nbytes = length(pub.payload) if hbytes > 0 write(io, "H") @@ -105,8 +105,7 @@ function show(io::IO, ::MIME_PROTOCOL, pub::Pub) if hbytes > 0 write(io, " $hbytes") end - write(io, " $(hbytes + nbytes)\r\n") - write(io, pub.headers) + write(io, " $(nbytes)\r\n") write(io, pub.payload) write(io, "\r\n") end diff --git a/src/protocol/structs.jl b/src/protocol/structs.jl index 5a2024d8..d8ca4229 100644 --- a/src/protocol/structs.jl +++ b/src/protocol/structs.jl @@ -127,9 +127,9 @@ struct Pub <: ProtocolMessage subject::String "The reply subject that subscribers can use to send a response back to the publisher/requestor." reply_to::Union{String, Nothing} - "Header version `NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`." - headers::Vector{UInt8} - "The message payload data." + "Length of headers data inside payload" + headers_length::Int64 + "Optional headers (`NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`) followed by payload data." payload::Vector{UInt8} end @@ -173,9 +173,9 @@ struct Msg <: ProtocolMessage sid::String "The subject on which the publisher is listening for responses." reply_to::Union{String, Nothing} - "Header version `NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`." + "Length of headers data inside payload" headers_length::Int64 - "The message payload data." + "Optional headers (`NATS/1.0␍␊` followed by one or more `name: value` pairs, each separated by `␍␊`) followed by payload data." payload::AbstractVector{UInt8} end diff --git a/src/protocol/validate.jl b/src/protocol/validate.jl new file mode 100644 index 00000000..4ad36d76 --- /dev/null +++ b/src/protocol/validate.jl @@ -0,0 +1,28 @@ +### validate.jl +# +# Copyright (C) 2024 Jakub Wronowski. +# +# Maintainer: Jakub Wronowski +# Keywords: nats, nats-client, julia +# +# This file is a part of NATS.jl. +# +# License is MIT. +# +### Commentary: +# +# This file contains implementation of functions for validation of protocol messages. +# +### Code: + +function validate(pub::Pub) + isempty(pub.subject) && error("Publication subject is empty") + startswith(pub.subject, '.') && error("Publication subject '$(pub.subject)' cannot start with '.'") + endswith(pub.subject, '.') && error("Publication subject '$(pub.subject)' cannot end with '.'") + for c in pub.subject + if c == ' ' + error("Publication subject contains invalid character '$c'.") + end + end + true +end diff --git a/src/pubsub/publish.jl b/src/pubsub/publish.jl index 29ceb2a0..6fc66e7b 100644 --- a/src/pubsub/publish.jl +++ b/src/pubsub/publish.jl @@ -15,6 +15,9 @@ # ### Code: +const mime_payload = MIME_PAYLOAD() +const mime_headers = MIME_HEADERS() + """ $(SIGNATURES) @@ -38,9 +41,14 @@ function publish( data = nothing; reply_to::Union{String, Nothing} = nothing ) - payload_bytes = repr(MIME_PAYLOAD(), data) - headers_bytes = repr(MIME_HEADERS(), data) - send(connection, Pub(subject, reply_to, headers_bytes, payload_bytes)) + payload_io = IOBuffer() + show(payload_io, mime_headers, data) + headers_length = payload_io.size + show(payload_io, mime_payload, data) + payload_bytes = take!(payload_io) + pub = Pub(subject, reply_to, headers_length, payload_bytes) + validate(pub) + send(connection, pub) inc_stats(:msgs_published, 1, connection.stats, state.stats) sub_stats = ScopedValues.get(scoped_subscription_stats) if !isnothing(sub_stats) diff --git a/test/benchmarks.jl b/test/benchmarks.jl index 594c2a16..02198529 100644 --- a/test/benchmarks.jl +++ b/test/benchmarks.jl @@ -45,7 +45,7 @@ function msgs_per_second(connection::NATS.Connection, connection2::NATS.Connecti Threads.atomic_add!(msgs_after_timeout, 1) end end - pub = NATS.Pub(subject, nothing, UInt8[], uint8_vec("Hi!")) + pub = NATS.Pub(subject, nothing, 0, uint8_vec("Hi!")) t = Threads.@spawn :default begin @time while isopen(tm) if time_first_pub == 0.0 @@ -128,8 +128,6 @@ end @testset "Publisher benchmark." begin connection = NATS.connect() - # pub = NATS.Pub("zxc", nothing, UInt8[], uint8_vec("Hello world!!!!!")) - tm = Timer(1.0) counter = 0 c = 0 diff --git a/test/connection.jl b/test/connection.jl index 0c54f15b..bc30b72b 100644 --- a/test/connection.jl +++ b/test/connection.jl @@ -241,7 +241,7 @@ NATS.status() sleep(0.1) @test_throws ErrorException publish(connection, "test_publish_on_drained") - pub = NATS.Pub("test_publish_on_drained", nothing, UInt8[], UInt8[]) + pub = NATS.Pub("test_publish_on_drained", nothing, 0, UInt8[]) @test_throws ErrorException NATS.send(connection, repeat([pub], 10)) end diff --git a/test/protocol.jl b/test/protocol.jl index a9e0eb3d..a03b487d 100644 --- a/test/protocol.jl +++ b/test/protocol.jl @@ -87,15 +87,15 @@ end json = """{"verbose":false,"pedantic":false,"tls_required":false,"lang":"julia","version":"0.0.1"}""" @test serialize(JSON3.read(json, NATS.Connect)) == """CONNECT $json\r\n""" - @test serialize(Pub("FOO", nothing, UInt8[], uint8_vec("Hello NATS!"))) == "PUB FOO 11\r\nHello NATS!\r\n" - @test serialize(Pub("FRONT.DOOR", "JOKE.22", UInt8[], uint8_vec("Knock Knock"))) == "PUB FRONT.DOOR JOKE.22 11\r\nKnock Knock\r\n" - @test serialize(Pub("NOTIFY", nothing, UInt8[], UInt8[])) == "PUB NOTIFY 0\r\n\r\n" + @test serialize(Pub("FOO", nothing, 0, uint8_vec("Hello NATS!"))) == "PUB FOO 11\r\nHello NATS!\r\n" + @test serialize(Pub("FRONT.DOOR", "JOKE.22", 0, uint8_vec("Knock Knock"))) == "PUB FRONT.DOOR JOKE.22 11\r\nKnock Knock\r\n" + @test serialize(Pub("NOTIFY", nothing, 0, UInt8[])) == "PUB NOTIFY 0\r\n\r\n" - @test serialize(Pub("FOO", nothing, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\n"), uint8_vec("Hello NATS!"))) == "HPUB FOO 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" - @test serialize(Pub("FRONT.DOOR", "JOKE.22", uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\n"), uint8_vec("Knock Knock"))) == "HPUB FRONT.DOOR JOKE.22 45 56\r\nNATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\nKnock Knock\r\n" + @test serialize(Pub("FOO", nothing, 22, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!"))) == "HPUB FOO 22 33\r\nNATS/1.0\r\nBar: Baz\r\n\r\nHello NATS!\r\n" + @test serialize(Pub("FRONT.DOOR", "JOKE.22", 45, uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\nKnock Knock"))) == "HPUB FRONT.DOOR JOKE.22 45 56\r\nNATS/1.0\r\nBREAKFAST: donut\r\nLUNCH: burger\r\n\r\nKnock Knock\r\n" - @test serialize(Pub("NOTIFY", nothing, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\n"), UInt8[])) == "HPUB NOTIFY 22 22\r\nNATS/1.0\r\nBar: Baz\r\n\r\n\r\n" - @test serialize(Pub("MORNING.MENU", nothing, uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\n"), uint8_vec("Yum!"))) == "HPUB MORNING.MENU 47 51\r\nNATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\nYum!\r\n" + @test serialize(Pub("NOTIFY", nothing, 22, uint8_vec("NATS/1.0\r\nBar: Baz\r\n\r\n"))) == "HPUB NOTIFY 22 22\r\nNATS/1.0\r\nBar: Baz\r\n\r\n\r\n" + @test serialize(Pub("MORNING.MENU", nothing, 47, uint8_vec("NATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\nYum!"))) == "HPUB MORNING.MENU 47 51\r\nNATS/1.0\r\nBREAKFAST: donut\r\nBREAKFAST: eggs\r\n\r\nYum!\r\n" @test serialize(Sub("FOO", nothing, "1")) == "SUB FOO 1\r\n" @test serialize(Sub("BAR", "G1", "44")) == "SUB BAR G1 44\r\n"