Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#9 Encoders for MQTT #104

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open

#9 Encoders for MQTT #104

wants to merge 20 commits into from

Conversation

sbcd90
Copy link

@sbcd90 sbcd90 commented Nov 8, 2020

No description provided.

cloudevents-sdk-mqtt/Cargo.toml Outdated Show resolved Hide resolved
Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: Subhobrata Dey <[email protected]>
@slinkydeveloper slinkydeveloper added the enhancement New feature or request label Nov 10, 2020
@slinkydeveloper slinkydeveloper added this to the 0.4 milestone Nov 10, 2020
@slinkydeveloper
Copy link
Member

I marked this as 0.4, hopefully paho.mqtt new version will be released before our new version and we can get this one in. I'll review this PR asap

@slinkydeveloper slinkydeveloper self-assigned this Nov 10, 2020
@slinkydeveloper
Copy link
Member

Seems like musl doesn't like paho.mqtt? Is there a way to fix it? I see this issue is open here: eclipse-paho/paho.mqtt.rust#57

One thing I wanted with the integration between sdk-rust and mqtt is a crate supporting no_std (given we're also investing a lot of time in supporting no std #94). Is there any particular reason why @sbcd90 you chose paho.mqtt? Did you considered other mqtt clients supporting no_std?

@sbcd90
Copy link
Author

sbcd90 commented Nov 10, 2020

I did evaluate https://github.com/bytebeamio/rumqtt . I can use any other library.
However, i went with paho mqtt because I was not sure of future maintenance guarantees for rumqtt library.

@sbcd90
Copy link
Author

sbcd90 commented Nov 10, 2020

Please let me know if i should try switching the library.

@fpagliughi
Copy link

A Paho release went out last week (v0.8).

Another (v0.9) is in active development and should be out by the end of the year. The initial purpose for the next release was to get HTTP/S proxy support for websocket connections, but a few other additions popped up, and now we're also going to push for better cross-compiling support and fully static linking. That includes (optional) static OpenSSL linking and getting that musl support stable.

@slinkydeveloper
Copy link
Member

slinkydeveloper commented Nov 30, 2020

@sbcd90 I think we can go ahead and work on this one, but let's be explicit we're supporting paho, so maybe rename it to cloudevents-sdk-paho-mqtt. Can you set the version to 0.8?

Signed-off-by: Subhobrata Dey <[email protected]>
@sbcd90
Copy link
Author

sbcd90 commented Nov 30, 2020

@sbcd90 I think we can go ahead and work on this one, but let's be explicit we're supporting paho, so maybe rename it to cloudevents-sdk-paho-mqtt. Can you set the version to 0.8?

@slinkydeveloper I have set version to 0.8

}

impl ConsumerMessageDeserializer {
fn get_mqtt_headers(message: &Message) -> Result<HashMap<String, Vec<u8>>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the result return here? I don't see any failing stmt here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated this

fn encoding(&self) -> Encoding {
match (
self.headers
.get("content-type")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you use the constant in headers?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated this

headers::MqttVersion::V3_1_1 => {
StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I may miss some knowledge about mqtt here, but why do we need the headers version? Also, why in V5 only Binary Deserializer is supported (aka only binary mode is supported), more than the general MessageDeserializer?

Also, can you infer the headers version from Message more than providing it manually as parameter?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now, V5 should allow both structured & binary mode. V3 only structured mode.

Copy link
Member

@slinkydeveloper slinkydeveloper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So i have some concerns about the whole v3/v5 difference, why did you defined an extension for it? do we need it?

Comment on lines 1 to 2
//! This library provides Mqtt protocol bindings for CloudEvents
//! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we have here a sample usage in the docs? (look at actix-web integration module for details on how)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up

use cloudevents::{EventBuilderV10, EventBuilder};
use cloudevents_sdk_mqtt::{MessageRecord, MessageBuilderExt, MqttVersion, MessageExt};

fn consume_v3(broker: &str, topic_name: &str) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make the version just an argument more than 2 separate functions?

process::exit(1);
});

if let Err(err) = block_on(async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder, can you make the consume_v3 function async in order to push the "async concern" to the outer scope (caller of consume)

process::exit(1);
});

if let Err(err) = block_on(async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

}
}

fn produce_v3(broker: &str, topic_name: &str) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

.long("mode")
.help("enter \"consmer\" or \"producer\"")
.takes_value(true)
.possible_values(&["consumerV3", "producerV3", "consumerV5", "producerV5"])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make the version a separate arg and choose some default?

Comment on lines 20 to 21
let mut hm = HashMap::new();
let prop_iterator = message.properties().iter(PropertyCode::UserProperty);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this copy? Can't you just access to the user properties in the deserialize_binary method?

Comment on lines 118 to 123
self.headers
.get(headers::MQTT_VERSION_HEADER)
.map(|s| String::from_utf8(s.to_vec()).ok())
.flatten()
.map(|s| s.eq(headers::MQTT_V5_BINARY))
.unwrap_or(false),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So do we expect incoming messages to have this header?


pub fn from_event(event: Event) -> Result<Self> {
match event
.extension(headers::MQTT_VERSION_HEADER)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part of the spec?

@sbcd90
Copy link
Author

sbcd90 commented Dec 9, 2020

So i have some concerns about the whole v3/v5 difference, why did you defined an extension for it? do we need it?

Hi @slinkydeveloper , currently, v3 supports only structured mode. v5 supports both structured as well as binary mode according to spec.
This is because mqtt v3 has no support for headers while mqtt v5 has.
Now, in the current approach the default mode is structured mode which is supported by both v3 & v5. If binary mode needs to be used, then we need a flag to tell us if the message has to be deserialized using BinaryDeserializer.

The spec doesnt define how to set this mqtt version info.

Earlier, I was passing the flag as a parameter, now I pass it as an extension field. paho-mqtt passes version info as a param.
See

Another approach, we can follow is, ignore the mqtt version, & just trust the Content-Type header. This leaves us with a risk that we're depending on library api behavior a little bit.

match (
            self.headers
                .get(headers::CONTENT_TYPE)
                .map(|s| String::from_utf8(s.to_vec()).ok())
                .flatten()
                .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER))
                .unwrap_or(false),
            self.headers.len() > 0,
        ) {
            // v5 & structured
            (true, true) => Encoding::STRUCTURED,
            // v3 & structured
            (false, false) => Encoding::STRUCTURED,
            // v5 & binary
            (false, true) => Encoding::BINARY,
            _ => Encoding::UNKNOWN
        }

Kindly let me know according to you which is the best approach.

@slinkydeveloper
Copy link
Member

Ok so one step at the time.

MQTT -> CloudEvent

First of all, i think you should remove the headers map you create and you should directly access to the Property borrow, in order to avoid allocating intermediate memory, which in this case it's not required.

For figuring out v3 vs v5 messages, I think we should simply do best effort: from the paho.mqtt rustdocs i see that you can always get the properties from a message https://docs.rs/paho-mqtt/0.8.0/paho_mqtt/message/struct.Message.html#method.properties and in case the UserProperty is empty, then we can assume the properties are not there so it's not mqtt v5 (hence we should just deserialize as structured).

CloudEvent -> MQTT

In this case, we should have an additional input in the serialization that explicits the mqtt version, like an enum MqttVersion created by us with MQTT_3 and MQTT_5 variants. I thought there was such enum in paho.mqtt, but there isn't. No extensions, the event must not be mutated.

Also note that i don't expect the user providing MessageRecord, this should be done under the hood:

pub trait MessageBuilderExt {
    fn event(self, Event, MqttVersion) -> MessageBuilder;
}

And the implementation would look like:

impl MessageBuilderExt for MessageBuilder {
    fn event(self, Event, MqttVersion) -> MessageBuilder {
        // mutate directly MessageBuilder here using MessageRecord
    }
}

Check out the actix-web, because it's a better one (I see that you probably looked at the MessageRecord thingy from Kafka, but that's kinda bad because of the rdkafka apis themselves)

Does that makes sense? Did I understood the problems here?

Signed-off-by: Subhobrata Dey <[email protected]>
@sbcd90
Copy link
Author

sbcd90 commented Dec 22, 2020

Hi @slinkydeveloper , Thanks a lot for your detailed review. Refactored the code accordingly.
Added a sample README.

Can you please have a look now?

Signed-off-by: Subhobrata Dey <[email protected]>
Copy link
Member

@slinkydeveloper slinkydeveloper left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me this PR sounds good, @sbcd90 can you fix MUSL tests? Is paho-mqtt compatible with musl?

@Lazzaretti can you review this PR?

Comment on lines 1 to 2
//! This library provides Mqtt protocol bindings for CloudEvents
//! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

up


match version {
MQTT_5 => {
self = self.properties(message_record.headers.clone());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can avoid clone destructuring message_record

Comment on lines 143 to 145
if let Some(s) = message_record.payload.as_ref() {
self = self.payload(s.to_vec());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to perform this copy, after destructing (as in the above comment) use a pattern matching here so you get ownership of s

Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: Subhobrata Dey <[email protected]>
@sbcd90
Copy link
Author

sbcd90 commented Jan 25, 2021

hi @slinkydeveloper , i fixed the remaining issues.
For musl, i get the error

Make sure you also have the development packages of openssl installed.
  For example, `libssl-dev` on Ubuntu or `openssl-devel` on Fedora.

@slinkydeveloper
Copy link
Member

@sbcd90 try to modify the github action here: https://github.com/cloudevents/sdk-rust/blob/master/.github/workflows/rust_tests.yml#L31 adding openssl dev package setup

serde_json = "^1.0"
futures = "^0.3"
tokio = { version = "^0.2", features = ["full"] }
clap = "2.33.1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: Subhobrata Dey <[email protected]>
Signed-off-by: Subhobrata Dey <[email protected]>
@sbcd90
Copy link
Author

sbcd90 commented Jan 27, 2021

hi @slinkydeveloper , the builds are fixed now.

@slinkydeveloper slinkydeveloper linked an issue Jan 28, 2021 that may be closed by this pull request
- Start the MQTT V3 Consumer

```
run --package <package-name> --bin <binary-name> -- --mode consumerV3 --broker tcp://localhost:1883 --topic test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is run? can we have some more specific examples?

Comment on lines +31 to +32
openssl-sys = "*"
openssl = { version = "*", features = ["vendored"] }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may we should add some version restrictions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Why in the main toml we need openssl?

Comment on lines +26 to +30
pub fn from_event(event: Event, version: &headers::MqttVersion) -> Result<Self> {
match version {
headers::MqttVersion::MQTT_5 => {
BinaryDeserializer::deserialize_binary(event, MessageRecord::new())
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understood MQTTv5 should support Binary and Structured Content Mode.
https://github.com/cloudevents/spec/blob/v1.0.1/mqtt-protocol-binding.md#32-structured-content-mode

So I would add a parameter to define the content mode (Binary or Structured) and not the MQTT version directly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More than version you would use the Encoding struct? sounds good

ctron added a commit to ctron/sdk-rust that referenced this pull request Mar 1, 2021
@slinkydeveloper slinkydeveloper removed this from the 0.4 milestone Jul 7, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Encoders for MQTT
4 participants