From 5d446f486aafd0f1fa2899ff15003892e1f06a02 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Sun, 8 Nov 2020 10:09:50 -0800 Subject: [PATCH 01/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- Cargo.toml | 4 +- cloudevents-sdk-mqtt/Cargo.toml | 18 ++ cloudevents-sdk-mqtt/src/headers.rs | 35 +++ cloudevents-sdk-mqtt/src/lib.rs | 14 + .../src/mqtt_consumer_record.rs | 225 +++++++++++++++ .../src/mqtt_producer_record.rs | 137 +++++++++ example-projects/mqtt-example/Cargo.toml | 18 ++ example-projects/mqtt-example/src/main.rs | 264 ++++++++++++++++++ 8 files changed, 714 insertions(+), 1 deletion(-) create mode 100644 cloudevents-sdk-mqtt/Cargo.toml create mode 100644 cloudevents-sdk-mqtt/src/headers.rs create mode 100644 cloudevents-sdk-mqtt/src/lib.rs create mode 100644 cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs create mode 100644 cloudevents-sdk-mqtt/src/mqtt_producer_record.rs create mode 100644 example-projects/mqtt-example/Cargo.toml create mode 100644 example-projects/mqtt-example/src/main.rs diff --git a/Cargo.toml b/Cargo.toml index 01cbe5a9..7dd41ac9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,11 +45,13 @@ members = [ "cloudevents-sdk-actix-web", "cloudevents-sdk-reqwest", "cloudevents-sdk-rdkafka", - "cloudevents-sdk-warp" + "cloudevents-sdk-warp", + "cloudevents-sdk-mqtt" ] exclude = [ "example-projects/actix-web-example", "example-projects/reqwest-wasm-example", "example-projects/rdkafka-example", "example-projects/warp-example", + "example-projects/mqtt-example" ] \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/Cargo.toml b/cloudevents-sdk-mqtt/Cargo.toml new file mode 100644 index 00000000..f01e197d --- /dev/null +++ b/cloudevents-sdk-mqtt/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "cloudevents-sdk-mqtt" +version = "0.2.0" +authors = ["Francesco Guardiani "] +license-file = "../LICENSE" +edition = "2018" +description = "CloudEvents official Rust SDK - Mqtt integration" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cloudevents-sdk = { version = "0.2.0", path = ".." } +lazy_static = "1.4.0" +paho-mqtt = { path = "../../paho.mqtt.rust" } +chrono = { version = "^0.4", features = ["serde"] } + +[dev-dependencies] +serde_json = "^1.0" \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/headers.rs b/cloudevents-sdk-mqtt/src/headers.rs new file mode 100644 index 00000000..f9f57a6c --- /dev/null +++ b/cloudevents-sdk-mqtt/src/headers.rs @@ -0,0 +1,35 @@ +use cloudevents::event::SpecVersion; +use lazy_static::lazy_static; +use std::collections::HashMap; + +macro_rules! attribute_name_to_header { + ($attribute:expr) => { + format!("ce_{}", $attribute) + }; +} + +fn attributes_to_headers(it: impl Iterator) -> HashMap<&'static str, String> { + it.map(|s| { + if s == "datacontenttype" { + (s, String::from("content-type")) + } else { + (s, attribute_name_to_header!(s)) + } + }) + .collect() +} + +lazy_static! { + pub(crate) static ref ATTRIBUTES_TO_MQTT_HEADERS: HashMap<&'static str, String> = + attributes_to_headers(SpecVersion::all_attribute_names()); +} + +pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion"; +pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json"; +pub(crate) static CONTENT_TYPE: &'static str = "content-type"; + +pub enum MqttVersion { + V3_1, + V3_1_1, + V5, +} \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/lib.rs b/cloudevents-sdk-mqtt/src/lib.rs new file mode 100644 index 00000000..2499b500 --- /dev/null +++ b/cloudevents-sdk-mqtt/src/lib.rs @@ -0,0 +1,14 @@ +//! This library provides Mqtt protocol bindings for CloudEvents +//! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ +#[macro_use] +mod headers; +mod mqtt_producer_record; +mod mqtt_consumer_record; + +pub use mqtt_consumer_record::record_to_event; +pub use mqtt_consumer_record::ConsumerMessageDeserializer; +pub use mqtt_consumer_record::MessageExt; + +pub use mqtt_producer_record::MessageBuilderExt; +pub use mqtt_producer_record::MessageRecord; +pub use headers::MqttVersion; \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs new file mode 100644 index 00000000..2ad83b3e --- /dev/null +++ b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs @@ -0,0 +1,225 @@ +use crate::headers; +use cloudevents::event::SpecVersion; +use cloudevents::message::{Result, BinarySerializer, BinaryDeserializer, MessageAttributeValue, + MessageDeserializer, Encoding, StructuredSerializer, StructuredDeserializer}; +use cloudevents::{message, Event}; +use paho_mqtt::{Message, PropertyCode}; +use std::collections::HashMap; +use std::convert::TryFrom; +use std::str; + +pub struct ConsumerMessageDeserializer { + pub(crate) headers: HashMap>, + pub(crate) payload: Option>, +} + +impl ConsumerMessageDeserializer { + fn get_mqtt_headers(message: &Message) -> Result>> { + let mut hm = HashMap::new(); + let prop_iterator = message.properties().iter(PropertyCode::UserProperty); + + for property in prop_iterator { + let header = property.get_string_pair().unwrap(); + hm.insert(header.0.to_string(), Vec::from(header.1)); + } + + Ok(hm) + } + + pub fn new(message: &Message) -> Result { + Ok(ConsumerMessageDeserializer { + headers: Self::get_mqtt_headers(message)?, + payload: Some(message.payload()).map(|s| Vec::from(s)), + }) + } +} + +impl BinaryDeserializer for ConsumerMessageDeserializer { + fn deserialize_binary>(mut self, mut visitor: V) -> Result { + if self.encoding() != Encoding::BINARY { + return Err(message::Error::WrongEncoding {}) + } + + let spec_version = SpecVersion::try_from( + str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..]) + .map_err(|e| cloudevents::message::Error::Other { + source: Box::new(e), + })?, + )?; + + visitor = visitor.set_spec_version(spec_version.clone())?; + + let attributes = spec_version.attribute_names(); + + if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) { + visitor = visitor.set_attribute( + "datacontenttype", + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + cloudevents::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } + + for (hn, hv) in self + .headers + .into_iter() + .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) + { + let name = &hn["ce_".len()..]; + + if attributes.contains(&name) { + visitor = visitor.set_attribute( + name, + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + cloudevents::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } else { + visitor = visitor.set_extension( + name, + MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { + cloudevents::message::Error::Other { + source: Box::new(e), + } + })?), + )? + } + } + + if self.payload != None { + visitor.end_with_data(self.payload.unwrap()) + } else { + visitor.end() + } + } +} + +impl StructuredDeserializer for ConsumerMessageDeserializer { + fn deserialize_structured>(self, visitor: V) -> Result { + visitor.set_structured_event(self.payload.unwrap()) + } +} + +impl MessageDeserializer for ConsumerMessageDeserializer { + fn encoding(&self) -> Encoding { + match ( + self.headers + .get("content-type") + .map(|s| String::from_utf8(s.to_vec()).ok()) + .flatten() + .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER)) + .unwrap_or(false), + self.headers.get(headers::SPEC_VERSION_HEADER), + ) { + (true, _) => Encoding::STRUCTURED, + (_, Some(_)) => Encoding::BINARY, + _ => Encoding::UNKNOWN, + } + } +} + +pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result { + match version { + headers::MqttVersion::V5 => BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + headers::MqttVersion::V3_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + headers::MqttVersion::V3_1_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + } +} + +pub trait MessageExt { + fn to_event(&self, version: headers::MqttVersion) -> Result; +} + +impl MessageExt for Message { + fn to_event(&self, version: headers::MqttVersion) -> Result { + record_to_event(self, version) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::mqtt_producer_record::MessageRecord; + + use chrono::Utc; + use cloudevents::{EventBuilder, EventBuilderV10}; + use crate::MessageBuilderExt; + use serde_json::json; + use cloudevents::event::Data; + + #[test] + fn test_binary_record() { + let time = Utc::now(); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .data("application/json", + Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes())) + .extension("someint", "10") + .build() + .unwrap(); + + let message_record = MessageRecord::from_event( + EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .extension("someint", "10") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(), + headers::MqttVersion::V5, + ) + .unwrap(); + + let msg = MessageBuilder::new() + .topic("test") + .message_record(&message_record) + .qos(1) + .finalize(); + + assert_eq!(msg.to_event(headers::MqttVersion::V5).unwrap(), expected) + } + + #[test] + fn test_structured_record() { + let j = json!({"hello": "world"}); + + let expected = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/cloudevents+json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let input = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .source("http://localhost") + .data("application/cloudevents+json", j.clone()) + .extension("someint", "10") + .build() + .unwrap(); + + let serialized_event = + StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap(); + + let msg = MessageBuilder::new() + .topic("test") + .message_record(&serialized_event) + .qos(1) + .finalize(); + + assert_eq!(msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), expected) + } +} \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs new file mode 100644 index 00000000..1b894dc0 --- /dev/null +++ b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs @@ -0,0 +1,137 @@ +use super::headers; +use paho_mqtt::{Properties, Property, PropertyCode, MessageBuilder}; +use cloudevents::message::{BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, + StructuredDeserializer, StructuredSerializer, Error}; +use cloudevents::Event; +use cloudevents::event::SpecVersion; +use std::option::Option::Some; + +pub struct MessageRecord { + pub(crate) headers: Properties, + pub(crate) payload: Option>, +} + +impl MessageRecord { + /// Create a new empty [`MessageRecord`] + pub fn new() -> Self { + MessageRecord { + headers: Properties::new(), + payload: None, + } + } + + pub fn from_event(event: Event, version: headers::MqttVersion) -> Result { + match version { + headers::MqttVersion::V5 => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()), + headers::MqttVersion::V3_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), + headers::MqttVersion::V3_1_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), + } + } +} + +impl BinarySerializer for MessageRecord { + fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, headers::SPEC_VERSION_HEADER, + spec_version.as_str()) { + Ok(property) => { + match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e) + }), + _ => Ok(self) + } + }, + _ => Err(Error::UnrecognizedAttributeName { + name: headers::SPEC_VERSION_HEADER.to_string() + }) + } + } + + fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, &headers::ATTRIBUTES_TO_MQTT_HEADERS + .get(name) + .ok_or(cloudevents::message::Error::UnrecognizedAttributeName { + name: String::from(name), + })? + .clone()[..], + &value.to_string()[..]) { + Ok(property) => { + match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e) + }), + _ => Ok(self) + } + }, + _ => Err(Error::UnrecognizedAttributeName { + name: headers::SPEC_VERSION_HEADER.to_string() + }) + } + } + + fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, + &attribute_name_to_header!(name)[..], + &value.to_string()[..]) { + Ok(property) => { + match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e) + }), + _ => Ok(self) + } + }, + _ => Err(Error::UnrecognizedAttributeName { + name: headers::SPEC_VERSION_HEADER.to_string() + }) + } + } + + fn end_with_data(mut self, bytes: Vec) -> Result { + self.payload = Some(bytes); + + Ok(self) + } + + fn end(self) -> Result { + Ok(self) + } +} + +impl StructuredSerializer for MessageRecord { + fn set_structured_event(mut self, bytes: Vec) -> Result { + match Property::new_string_pair(PropertyCode::UserProperty, + headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER) { + Ok(property) => { + match self.headers.push(property) { + _ => () + } + }, + _ => () + } + self.payload = Some(bytes); + + Ok(self) + } +} + +pub trait MessageBuilderExt { + fn message_record( + self, + message_record: & MessageRecord, + ) -> MessageBuilder; +} + +impl MessageBuilderExt for MessageBuilder { + fn message_record(mut self, + message_record: & MessageRecord + ) -> MessageBuilder { + self = self.properties(message_record.headers.clone()); + + if let Some(s) = message_record.payload.as_ref() { + self = self.payload(s.to_vec()); + } + + self + } +} \ No newline at end of file diff --git a/example-projects/mqtt-example/Cargo.toml b/example-projects/mqtt-example/Cargo.toml new file mode 100644 index 00000000..dce76b2e --- /dev/null +++ b/example-projects/mqtt-example/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "mqtt-example" +version = "0.2.0" +authors = ["Subhobrata Dey "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-trait = "^0.1.33" +cloudevents-sdk = { path = "../sdk-rust" } +cloudevents-sdk-mqtt = { path = "../sdk-rust/cloudevents-sdk-mqtt"} +env_logger = "0.7.1" +paho-mqtt = { path = "../paho.mqtt.rust" } +serde_json = "^1.0" +futures = "^0.3" +tokio = { version = "^0.2", features = ["full"] } +clap = "2.33.1" \ No newline at end of file diff --git a/example-projects/mqtt-example/src/main.rs b/example-projects/mqtt-example/src/main.rs new file mode 100644 index 00000000..42456f2f --- /dev/null +++ b/example-projects/mqtt-example/src/main.rs @@ -0,0 +1,264 @@ +use clap::{App, Arg}; +use std::process; +use futures::executor::block_on; +use paho_mqtt as mqtt; +use tokio::time::Duration; +use serde_json::json; +use std::option::Option::Some; +use tokio::stream::StreamExt; +use cloudevents::{EventBuilderV10, EventBuilder}; +use cloudevents_sdk_mqtt::{MessageRecord, MessageBuilderExt, MqttVersion, MessageExt}; + +fn consume_v3(broker: &str, topic_name: &str) { + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .client_id("rust_async_consumer") + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(async { + let mut strm = cli.get_stream(25); + + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) + .finalize(); + + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; + + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; + + println!("Waiting for messages..."); + + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event(MqttVersion::V3_1_1).unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; + } + } + } + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn consume_v5(broker: &str, topic_name: &str) { + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .client_id("rust_async_consumer") + .mqtt_version(5) + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(async { + let mut strm = cli.get_stream(25); + + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) + .mqtt_version(5) + .finalize(); + + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; + + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; + + println!("Waiting for messages..."); + + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event(MqttVersion::V5).unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; + } + } + } + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn produce_v3(broker: &str, topic_name: &str) { + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(async { + let conn_opts = mqtt::ConnectOptions::new(); + + cli.connect(conn_opts).await?; + + println!("Publishing a message on the topic"); + + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + let message_record = + MessageRecord::from_event(event, MqttVersion::V3_1_1).expect("error while serializing the event"); + + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .message_record(&message_record) + .qos(1) + .finalize(); + + cli.publish(msg).await?; + + cli.disconnect(None).await?; + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn produce_v5(broker: &str, topic_name: &str) { + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(broker) + .mqtt_version(5) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(async { + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .mqtt_version(5) + .finalize(); + + cli.connect(conn_opts).await?; + + println!("Publishing a message on the topic"); + + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); + + let message_record = + MessageRecord::from_event(event, MqttVersion::V5).expect("error while serializing the event"); + + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .message_record(&message_record) + .qos(1) + .finalize(); + + cli.publish(msg).await?; + + cli.disconnect(None).await?; + + Ok::<(), mqtt::Error>(()) + }) { + eprintln!("{}", err); + } +} + +fn main() { + let selector = App::new("CloudEvents Mqtt Example") + .version(option_env!("CARGO_PKG_VERSION").unwrap_or("")) + .about("select consumer or producer") + .arg( + Arg::with_name("mode") + .long("mode") + .help("enter \"consmer\" or \"producer\"") + .takes_value(true) + .possible_values(&["consumerV3", "producerV3", "consumerV5", "producerV5"]) + .required(true), + ) + .arg( + Arg::with_name("topic") + .long("topic") + .help("Mqtt topic") + .takes_value(true) + .required(true), + ) + .arg( + Arg::with_name("broker") + .short("b") + .long("broker") + .help("Broker list in mqtt format") + .takes_value(true) + .default_value("tcp://localhost:1883"), + ) + .get_matches(); + + + match selector.value_of("mode").unwrap() { + "producerV3" => { + produce_v3( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + "consumerV3" => { + consume_v3( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + "producerV5" => { + produce_v5( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + "consumerV5" => { + consume_v5( + selector.value_of("broker").unwrap(), + selector.value_of("topic").unwrap(), + ) + } + _ => (), + } +} \ No newline at end of file From f5302ed46ce7d14d1d62dbccff7b398cb7e39f61 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Sun, 8 Nov 2020 16:01:17 -0800 Subject: [PATCH 02/19] 9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- cloudevents-sdk-mqtt/Cargo.toml | 6 +- cloudevents-sdk-mqtt/src/headers.rs | 4 +- cloudevents-sdk-mqtt/src/lib.rs | 4 +- .../src/mqtt_consumer_record.rs | 40 ++++-- .../src/mqtt_producer_record.rs | 134 +++++++++--------- 5 files changed, 104 insertions(+), 84 deletions(-) diff --git a/cloudevents-sdk-mqtt/Cargo.toml b/cloudevents-sdk-mqtt/Cargo.toml index f01e197d..adaaab45 100644 --- a/cloudevents-sdk-mqtt/Cargo.toml +++ b/cloudevents-sdk-mqtt/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "cloudevents-sdk-mqtt" -version = "0.2.0" +version = "0.3.0" authors = ["Francesco Guardiani "] license-file = "../LICENSE" edition = "2018" @@ -9,9 +9,9 @@ description = "CloudEvents official Rust SDK - Mqtt integration" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -cloudevents-sdk = { version = "0.2.0", path = ".." } +cloudevents-sdk = { version = "0.3.0", path = ".." } lazy_static = "1.4.0" -paho-mqtt = { path = "../../paho.mqtt.rust" } +paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git" } chrono = { version = "^0.4", features = ["serde"] } [dev-dependencies] diff --git a/cloudevents-sdk-mqtt/src/headers.rs b/cloudevents-sdk-mqtt/src/headers.rs index f9f57a6c..4a7a9bdb 100644 --- a/cloudevents-sdk-mqtt/src/headers.rs +++ b/cloudevents-sdk-mqtt/src/headers.rs @@ -16,7 +16,7 @@ fn attributes_to_headers(it: impl Iterator) -> HashMap<&'st (s, attribute_name_to_header!(s)) } }) - .collect() + .collect() } lazy_static! { @@ -32,4 +32,4 @@ pub enum MqttVersion { V3_1, V3_1_1, V5, -} \ No newline at end of file +} diff --git a/cloudevents-sdk-mqtt/src/lib.rs b/cloudevents-sdk-mqtt/src/lib.rs index 2499b500..ff58243a 100644 --- a/cloudevents-sdk-mqtt/src/lib.rs +++ b/cloudevents-sdk-mqtt/src/lib.rs @@ -2,13 +2,13 @@ //! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ #[macro_use] mod headers; -mod mqtt_producer_record; mod mqtt_consumer_record; +mod mqtt_producer_record; pub use mqtt_consumer_record::record_to_event; pub use mqtt_consumer_record::ConsumerMessageDeserializer; pub use mqtt_consumer_record::MessageExt; +pub use headers::MqttVersion; pub use mqtt_producer_record::MessageBuilderExt; pub use mqtt_producer_record::MessageRecord; -pub use headers::MqttVersion; \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs index 2ad83b3e..4c18a94c 100644 --- a/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs +++ b/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs @@ -1,7 +1,9 @@ use crate::headers; use cloudevents::event::SpecVersion; -use cloudevents::message::{Result, BinarySerializer, BinaryDeserializer, MessageAttributeValue, - MessageDeserializer, Encoding, StructuredSerializer, StructuredDeserializer}; +use cloudevents::message::{ + BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, + Result, StructuredDeserializer, StructuredSerializer, +}; use cloudevents::{message, Event}; use paho_mqtt::{Message, PropertyCode}; use std::collections::HashMap; @@ -37,7 +39,7 @@ impl ConsumerMessageDeserializer { impl BinaryDeserializer for ConsumerMessageDeserializer { fn deserialize_binary>(mut self, mut visitor: V) -> Result { if self.encoding() != Encoding::BINARY { - return Err(message::Error::WrongEncoding {}) + return Err(message::Error::WrongEncoding {}); } let spec_version = SpecVersion::try_from( @@ -124,9 +126,15 @@ impl MessageDeserializer for ConsumerMessageDeserializer { pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result { match version { - headers::MqttVersion::V5 => BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), - headers::MqttVersion::V3_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), - headers::MqttVersion::V3_1_1 => StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?), + headers::MqttVersion::V5 => { + BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) + } + headers::MqttVersion::V3_1 => { + StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) + } + headers::MqttVersion::V3_1_1 => { + StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) + } } } @@ -145,11 +153,12 @@ mod tests { use super::*; use crate::mqtt_producer_record::MessageRecord; + use crate::MessageBuilderExt; use chrono::Utc; + use cloudevents::event::Data; use cloudevents::{EventBuilder, EventBuilderV10}; - use crate::MessageBuilderExt; + use paho_mqtt::MessageBuilder; use serde_json::json; - use cloudevents::event::Data; #[test] fn test_binary_record() { @@ -160,8 +169,10 @@ mod tests { .ty("example.test") .time(time) .source("http://localhost") - .data("application/json", - Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes())) + .data( + "application/json", + Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()), + ) .extension("someint", "10") .build() .unwrap(); @@ -178,7 +189,7 @@ mod tests { .unwrap(), headers::MqttVersion::V5, ) - .unwrap(); + .unwrap(); let msg = MessageBuilder::new() .topic("test") @@ -220,6 +231,9 @@ mod tests { .qos(1) .finalize(); - assert_eq!(msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), expected) + assert_eq!( + msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), + expected + ) } -} \ No newline at end of file +} diff --git a/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs index 1b894dc0..387b813f 100644 --- a/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs +++ b/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs @@ -1,9 +1,11 @@ use super::headers; -use paho_mqtt::{Properties, Property, PropertyCode, MessageBuilder}; -use cloudevents::message::{BinaryDeserializer, BinarySerializer, MessageAttributeValue, Result, - StructuredDeserializer, StructuredSerializer, Error}; -use cloudevents::Event; use cloudevents::event::SpecVersion; +use cloudevents::message::{ + BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, + StructuredDeserializer, StructuredSerializer, +}; +use cloudevents::Event; +use paho_mqtt::{MessageBuilder, Properties, Property, PropertyCode}; use std::option::Option::Some; pub struct MessageRecord { @@ -22,68 +24,76 @@ impl MessageRecord { pub fn from_event(event: Event, version: headers::MqttVersion) -> Result { match version { - headers::MqttVersion::V5 => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()), - headers::MqttVersion::V3_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), - headers::MqttVersion::V3_1_1 => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()), + headers::MqttVersion::V5 => { + BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) + } + headers::MqttVersion::V3_1 => { + StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) + } + headers::MqttVersion::V3_1_1 => { + StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) + } } } } impl BinarySerializer for MessageRecord { fn set_spec_version(mut self, spec_version: SpecVersion) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, headers::SPEC_VERSION_HEADER, - spec_version.as_str()) { - Ok(property) => { - match self.headers.push(property) { - Err(e) => Err(Error::Other { - source: Box::new(e) - }), - _ => Ok(self) - } + match Property::new_string_pair( + PropertyCode::UserProperty, + headers::SPEC_VERSION_HEADER, + spec_version.as_str(), + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), }, - _ => Err(Error::UnrecognizedAttributeName { - name: headers::SPEC_VERSION_HEADER.to_string() - }) + _ => Err(Error::UnknownAttribute { + name: headers::SPEC_VERSION_HEADER.to_string(), + }), } } fn set_attribute(mut self, name: &str, value: MessageAttributeValue) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, &headers::ATTRIBUTES_TO_MQTT_HEADERS - .get(name) - .ok_or(cloudevents::message::Error::UnrecognizedAttributeName { - name: String::from(name), - })? - .clone()[..], - &value.to_string()[..]) { - Ok(property) => { - match self.headers.push(property) { - Err(e) => Err(Error::Other { - source: Box::new(e) - }), - _ => Ok(self) - } + match Property::new_string_pair( + PropertyCode::UserProperty, + &headers::ATTRIBUTES_TO_MQTT_HEADERS + .get(name) + .ok_or(cloudevents::message::Error::UnknownAttribute { + name: String::from(name), + })? + .clone()[..], + &value.to_string()[..], + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), }, - _ => Err(Error::UnrecognizedAttributeName { - name: headers::SPEC_VERSION_HEADER.to_string() - }) + _ => Err(Error::UnknownAttribute { + name: headers::SPEC_VERSION_HEADER.to_string(), + }), } } fn set_extension(mut self, name: &str, value: MessageAttributeValue) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, - &attribute_name_to_header!(name)[..], - &value.to_string()[..]) { - Ok(property) => { - match self.headers.push(property) { - Err(e) => Err(Error::Other { - source: Box::new(e) - }), - _ => Ok(self) - } + match Property::new_string_pair( + PropertyCode::UserProperty, + &attribute_name_to_header!(name)[..], + &value.to_string()[..], + ) { + Ok(property) => match self.headers.push(property) { + Err(e) => Err(Error::Other { + source: Box::new(e), + }), + _ => Ok(self), }, - _ => Err(Error::UnrecognizedAttributeName { - name: headers::SPEC_VERSION_HEADER.to_string() - }) + _ => Err(Error::UnknownAttribute { + name: headers::SPEC_VERSION_HEADER.to_string(), + }), } } @@ -100,14 +110,15 @@ impl BinarySerializer for MessageRecord { impl StructuredSerializer for MessageRecord { fn set_structured_event(mut self, bytes: Vec) -> Result { - match Property::new_string_pair(PropertyCode::UserProperty, - headers::CONTENT_TYPE, headers::CLOUDEVENTS_JSON_HEADER) { - Ok(property) => { - match self.headers.push(property) { - _ => () - } + match Property::new_string_pair( + PropertyCode::UserProperty, + headers::CONTENT_TYPE, + headers::CLOUDEVENTS_JSON_HEADER, + ) { + Ok(property) => match self.headers.push(property) { + _ => (), }, - _ => () + _ => (), } self.payload = Some(bytes); @@ -116,16 +127,11 @@ impl StructuredSerializer for MessageRecord { } pub trait MessageBuilderExt { - fn message_record( - self, - message_record: & MessageRecord, - ) -> MessageBuilder; + fn message_record(self, message_record: &MessageRecord) -> MessageBuilder; } impl MessageBuilderExt for MessageBuilder { - fn message_record(mut self, - message_record: & MessageRecord - ) -> MessageBuilder { + fn message_record(mut self, message_record: &MessageRecord) -> MessageBuilder { self = self.properties(message_record.headers.clone()); if let Some(s) = message_record.payload.as_ref() { @@ -134,4 +140,4 @@ impl MessageBuilderExt for MessageBuilder { self } -} \ No newline at end of file +} From a3ad7a9977cd7c0799ccd265a77f1b41c6855412 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Mon, 30 Nov 2020 12:14:32 -0800 Subject: [PATCH 03/19] 9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- Cargo.toml | 4 ++-- .../Cargo.toml | 4 ++-- .../src/headers.rs | 0 .../src/lib.rs | 0 .../src/mqtt_consumer_record.rs | 0 .../src/mqtt_producer_record.rs | 0 .../{mqtt-example => paho-mqtt-example}/Cargo.toml | 4 ++-- .../{mqtt-example => paho-mqtt-example}/src/main.rs | 0 8 files changed, 6 insertions(+), 6 deletions(-) rename {cloudevents-sdk-mqtt => cloudevents-sdk-paho-mqtt}/Cargo.toml (82%) rename {cloudevents-sdk-mqtt => cloudevents-sdk-paho-mqtt}/src/headers.rs (100%) rename {cloudevents-sdk-mqtt => cloudevents-sdk-paho-mqtt}/src/lib.rs (100%) rename {cloudevents-sdk-mqtt => cloudevents-sdk-paho-mqtt}/src/mqtt_consumer_record.rs (100%) rename {cloudevents-sdk-mqtt => cloudevents-sdk-paho-mqtt}/src/mqtt_producer_record.rs (100%) rename example-projects/{mqtt-example => paho-mqtt-example}/Cargo.toml (87%) rename example-projects/{mqtt-example => paho-mqtt-example}/src/main.rs (100%) diff --git a/Cargo.toml b/Cargo.toml index 7dd41ac9..f65cbec1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,12 +46,12 @@ members = [ "cloudevents-sdk-reqwest", "cloudevents-sdk-rdkafka", "cloudevents-sdk-warp", - "cloudevents-sdk-mqtt" + "cloudevents-sdk-paho-mqtt" ] exclude = [ "example-projects/actix-web-example", "example-projects/reqwest-wasm-example", "example-projects/rdkafka-example", "example-projects/warp-example", - "example-projects/mqtt-example" + "example-projects/paho-mqtt-example" ] \ No newline at end of file diff --git a/cloudevents-sdk-mqtt/Cargo.toml b/cloudevents-sdk-paho-mqtt/Cargo.toml similarity index 82% rename from cloudevents-sdk-mqtt/Cargo.toml rename to cloudevents-sdk-paho-mqtt/Cargo.toml index adaaab45..160eb58d 100644 --- a/cloudevents-sdk-mqtt/Cargo.toml +++ b/cloudevents-sdk-paho-mqtt/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "cloudevents-sdk-mqtt" +name = "cloudevents-sdk-paho-mqtt" version = "0.3.0" authors = ["Francesco Guardiani "] license-file = "../LICENSE" @@ -11,7 +11,7 @@ description = "CloudEvents official Rust SDK - Mqtt integration" [dependencies] cloudevents-sdk = { version = "0.3.0", path = ".." } lazy_static = "1.4.0" -paho-mqtt = { git = "https://github.com/eclipse/paho.mqtt.rust.git" } +paho-mqtt = "0.8" chrono = { version = "^0.4", features = ["serde"] } [dev-dependencies] diff --git a/cloudevents-sdk-mqtt/src/headers.rs b/cloudevents-sdk-paho-mqtt/src/headers.rs similarity index 100% rename from cloudevents-sdk-mqtt/src/headers.rs rename to cloudevents-sdk-paho-mqtt/src/headers.rs diff --git a/cloudevents-sdk-mqtt/src/lib.rs b/cloudevents-sdk-paho-mqtt/src/lib.rs similarity index 100% rename from cloudevents-sdk-mqtt/src/lib.rs rename to cloudevents-sdk-paho-mqtt/src/lib.rs diff --git a/cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs similarity index 100% rename from cloudevents-sdk-mqtt/src/mqtt_consumer_record.rs rename to cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs diff --git a/cloudevents-sdk-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs similarity index 100% rename from cloudevents-sdk-mqtt/src/mqtt_producer_record.rs rename to cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs diff --git a/example-projects/mqtt-example/Cargo.toml b/example-projects/paho-mqtt-example/Cargo.toml similarity index 87% rename from example-projects/mqtt-example/Cargo.toml rename to example-projects/paho-mqtt-example/Cargo.toml index dce76b2e..1f5ff62e 100644 --- a/example-projects/mqtt-example/Cargo.toml +++ b/example-projects/paho-mqtt-example/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "mqtt-example" +name = "paho-mqtt-example" version = "0.2.0" authors = ["Subhobrata Dey "] edition = "2018" @@ -11,7 +11,7 @@ async-trait = "^0.1.33" cloudevents-sdk = { path = "../sdk-rust" } cloudevents-sdk-mqtt = { path = "../sdk-rust/cloudevents-sdk-mqtt"} env_logger = "0.7.1" -paho-mqtt = { path = "../paho.mqtt.rust" } +paho-mqtt = "0.8" serde_json = "^1.0" futures = "^0.3" tokio = { version = "^0.2", features = ["full"] } diff --git a/example-projects/mqtt-example/src/main.rs b/example-projects/paho-mqtt-example/src/main.rs similarity index 100% rename from example-projects/mqtt-example/src/main.rs rename to example-projects/paho-mqtt-example/src/main.rs From dc040206433718e2700cefac572ec65940348acd Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 22 Dec 2020 11:32:32 -0800 Subject: [PATCH 04/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- cloudevents-sdk-paho-mqtt/README.md | 42 +++ cloudevents-sdk-paho-mqtt/src/headers.rs | 5 +- .../src/mqtt_consumer_record.rs | 106 ++---- .../src/mqtt_producer_record.rs | 14 +- .../paho-mqtt-example/src/main.rs | 322 +++++++++--------- 5 files changed, 231 insertions(+), 258 deletions(-) create mode 100644 cloudevents-sdk-paho-mqtt/README.md diff --git a/cloudevents-sdk-paho-mqtt/README.md b/cloudevents-sdk-paho-mqtt/README.md new file mode 100644 index 00000000..78a81ce8 --- /dev/null +++ b/cloudevents-sdk-paho-mqtt/README.md @@ -0,0 +1,42 @@ +# CloudEvents SDK Rust - paho-mqtt [![Crates badge]][crates.io] [![Docs badge]][docs.rs] + +Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [paho-mqtt](https://www.eclipse.org/paho/). + +Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info. + +## Development & Contributing + +If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md) + +## Community + +## Sample usage + +- Check the example [paho-mqtt-example](../example-projects/paho-mqtt-example) + +### MQTT V3 +- Start the MQTT V3 Consumer + +``` +run --package --bin -- --mode consumerV3 --broker tcp://localhost:1883 --topic test +``` + +- Start the MQTT V3 Producer + +``` +run --package --bin -- --broker tcp://localhost:1883 --topic test --mode producerV3 +``` + +### MQTT V5 +- Start the MQTT V5 Consumer + +``` +run --package --bin -- --mode consumerV5 --broker tcp://localhost:1883 --topic test +``` + +- Start the MQTT V5 Producer + +``` +run --package --bin -- --broker tcp://localhost:1883 --topic test --mode producerV5 +``` + diff --git a/cloudevents-sdk-paho-mqtt/src/headers.rs b/cloudevents-sdk-paho-mqtt/src/headers.rs index 4a7a9bdb..733fcbdd 100644 --- a/cloudevents-sdk-paho-mqtt/src/headers.rs +++ b/cloudevents-sdk-paho-mqtt/src/headers.rs @@ -29,7 +29,6 @@ pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudeven pub(crate) static CONTENT_TYPE: &'static str = "content-type"; pub enum MqttVersion { - V3_1, - V3_1_1, - V5, + MQTT_3, + MQTT_5, } diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs index 4c18a94c..208400a8 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs @@ -5,90 +5,59 @@ use cloudevents::message::{ Result, StructuredDeserializer, StructuredSerializer, }; use cloudevents::{message, Event}; -use paho_mqtt::{Message, PropertyCode}; -use std::collections::HashMap; +use paho_mqtt::{Message, Properties, PropertyCode}; use std::convert::TryFrom; -use std::str; -pub struct ConsumerMessageDeserializer { - pub(crate) headers: HashMap>, +pub struct ConsumerMessageDeserializer<'a> { + pub(crate) headers: &'a Properties, pub(crate) payload: Option>, } -impl ConsumerMessageDeserializer { - fn get_mqtt_headers(message: &Message) -> Result>> { - let mut hm = HashMap::new(); - let prop_iterator = message.properties().iter(PropertyCode::UserProperty); - - for property in prop_iterator { - let header = property.get_string_pair().unwrap(); - hm.insert(header.0.to_string(), Vec::from(header.1)); - } - - Ok(hm) +impl<'a> ConsumerMessageDeserializer<'a> { + fn get_mqtt_headers(message: &Message) -> &Properties { + message.properties() } pub fn new(message: &Message) -> Result { Ok(ConsumerMessageDeserializer { - headers: Self::get_mqtt_headers(message)?, + headers: Self::get_mqtt_headers(message), payload: Some(message.payload()).map(|s| Vec::from(s)), }) } } -impl BinaryDeserializer for ConsumerMessageDeserializer { - fn deserialize_binary>(mut self, mut visitor: V) -> Result { +impl<'a> BinaryDeserializer for ConsumerMessageDeserializer<'a> { + fn deserialize_binary>(self, mut visitor: V) -> Result { if self.encoding() != Encoding::BINARY { return Err(message::Error::WrongEncoding {}); } let spec_version = SpecVersion::try_from( - str::from_utf8(&self.headers.remove(headers::SPEC_VERSION_HEADER).unwrap()[..]) - .map_err(|e| cloudevents::message::Error::Other { - source: Box::new(e), - })?, + self.headers + .find_user_property(headers::SPEC_VERSION_HEADER) + .unwrap() + .as_str(), )?; visitor = visitor.set_spec_version(spec_version.clone())?; let attributes = spec_version.attribute_names(); - if let Some(hv) = self.headers.remove(headers::CONTENT_TYPE) { - visitor = visitor.set_attribute( - "datacontenttype", - MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?), - )? + if let Some(hv) = self.headers.find_user_property(headers::CONTENT_TYPE) { + visitor = visitor.set_attribute("datacontenttype", MessageAttributeValue::String(hv))? } for (hn, hv) in self .headers - .into_iter() + .user_iter() .filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_")) { let name = &hn["ce_".len()..]; if attributes.contains(&name) { - visitor = visitor.set_attribute( - name, - MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?), - )? + visitor = visitor.set_attribute(name, MessageAttributeValue::String(hv))? } else { - visitor = visitor.set_extension( - name, - MessageAttributeValue::String(String::from_utf8(hv).map_err(|e| { - cloudevents::message::Error::Other { - source: Box::new(e), - } - })?), - )? + visitor = visitor.set_extension(name, MessageAttributeValue::String(hv))? } } @@ -100,51 +69,32 @@ impl BinaryDeserializer for ConsumerMessageDeserializer { } } -impl StructuredDeserializer for ConsumerMessageDeserializer { +impl<'a> StructuredDeserializer for ConsumerMessageDeserializer<'a> { fn deserialize_structured>(self, visitor: V) -> Result { visitor.set_structured_event(self.payload.unwrap()) } } -impl MessageDeserializer for ConsumerMessageDeserializer { +impl<'a> MessageDeserializer for ConsumerMessageDeserializer<'a> { fn encoding(&self) -> Encoding { - match ( - self.headers - .get("content-type") - .map(|s| String::from_utf8(s.to_vec()).ok()) - .flatten() - .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER)) - .unwrap_or(false), - self.headers.get(headers::SPEC_VERSION_HEADER), - ) { - (true, _) => Encoding::STRUCTURED, - (_, Some(_)) => Encoding::BINARY, - _ => Encoding::UNKNOWN, + match self.headers.iter(PropertyCode::UserProperty).count() == 0 { + true => Encoding::STRUCTURED, + false => Encoding::BINARY, } } } -pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result { - match version { - headers::MqttVersion::V5 => { - BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) - } - headers::MqttVersion::V3_1 => { - StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) - } - headers::MqttVersion::V3_1_1 => { - StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) - } - } +pub fn record_to_event(msg: &Message) -> Result { + MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) } pub trait MessageExt { - fn to_event(&self, version: headers::MqttVersion) -> Result; + fn to_event(&self) -> Result; } impl MessageExt for Message { - fn to_event(&self, version: headers::MqttVersion) -> Result { - record_to_event(self, version) + fn to_event(&self) -> Result { + record_to_event(self) } } diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs index 387b813f..04618a1c 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs @@ -24,13 +24,10 @@ impl MessageRecord { pub fn from_event(event: Event, version: headers::MqttVersion) -> Result { match version { - headers::MqttVersion::V5 => { + headers::MqttVersion::MQTT_5 => { BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) } - headers::MqttVersion::V3_1 => { - StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) - } - headers::MqttVersion::V3_1_1 => { + headers::MqttVersion::MQTT_3 => { StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) } } @@ -127,11 +124,14 @@ impl StructuredSerializer for MessageRecord { } pub trait MessageBuilderExt { - fn message_record(self, message_record: &MessageRecord) -> MessageBuilder; + fn event(self, event: Event, version: headers::MqttVersion) -> MessageBuilder; } impl MessageBuilderExt for MessageBuilder { - fn message_record(mut self, message_record: &MessageRecord) -> MessageBuilder { + fn event(mut self, event: Event, version: headers::MqttVersion) -> MessageBuilder { + let message_record = + MessageRecord::from_event(event, version).expect("error while serializing the event"); + self = self.properties(message_record.headers.clone()); if let Some(s) = message_record.payload.as_ref() { diff --git a/example-projects/paho-mqtt-example/src/main.rs b/example-projects/paho-mqtt-example/src/main.rs index 42456f2f..0f5a03f1 100644 --- a/example-projects/paho-mqtt-example/src/main.rs +++ b/example-projects/paho-mqtt-example/src/main.rs @@ -7,201 +7,136 @@ use serde_json::json; use std::option::Option::Some; use tokio::stream::StreamExt; use cloudevents::{EventBuilderV10, EventBuilder}; -use cloudevents_sdk_mqtt::{MessageRecord, MessageBuilderExt, MqttVersion, MessageExt}; +use cloudevents_sdk_paho_mqtt::{MessageBuilderExt, MessageExt, MqttVersion}; +use paho_mqtt::AsyncClient; -fn consume_v3(broker: &str, topic_name: &str) { +async fn consume_v3(cli: &mut AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let mut strm = cli.get_stream(25); - let create_opts = mqtt::CreateOptionsBuilder::new() - .server_uri(broker) - .client_id("rust_async_consumer") + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) .finalize(); - let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { - println!("Error creating the client: {:?}", e); - process::exit(1); - }); + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; - if let Err(err) = block_on(async { - let mut strm = cli.get_stream(25); + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; - let conn_opts = mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(20)) - .clean_session(false) - .finalize(); + println!("Waiting for messages..."); - println!("Connecting to the MQTT server..."); - cli.connect(conn_opts).await?; - - println!("Subscribing to topics: {:?}", topic_name); - cli.subscribe(topic_name, 1).await?; - - println!("Waiting for messages..."); - - while let Some(msg_opt) = strm.next().await { - if let Some(msg) = msg_opt { - let event = msg.to_event(MqttVersion::V3_1_1).unwrap(); - println!("Received Event: {:#?}", event); - } - else { - // A "None" means we were disconnected. Try to reconnect... - println!("Lost connection. Attempting reconnect."); - while let Err(_err) = cli.reconnect().await { - // For tokio use: tokio::time::delay_for() - tokio::time::delay_for(Duration::from_millis(1000)).await; - } + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event().unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; } } - - Ok::<(), mqtt::Error>(()) - }) { - eprintln!("{}", err); } + + Ok::<(), mqtt::Error>(()) } -fn consume_v5(broker: &str, topic_name: &str) { +async fn consume_v5(cli: &mut AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let mut strm = cli.get_stream(25); - let create_opts = mqtt::CreateOptionsBuilder::new() - .server_uri(broker) - .client_id("rust_async_consumer") + let conn_opts = mqtt::ConnectOptionsBuilder::new() + .keep_alive_interval(Duration::from_secs(20)) + .clean_session(false) .mqtt_version(5) .finalize(); - let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { - println!("Error creating the client: {:?}", e); - process::exit(1); - }); - - if let Err(err) = block_on(async { - let mut strm = cli.get_stream(25); - - let conn_opts = mqtt::ConnectOptionsBuilder::new() - .keep_alive_interval(Duration::from_secs(20)) - .clean_session(false) - .mqtt_version(5) - .finalize(); - - println!("Connecting to the MQTT server..."); - cli.connect(conn_opts).await?; + println!("Connecting to the MQTT server..."); + cli.connect(conn_opts).await?; - println!("Subscribing to topics: {:?}", topic_name); - cli.subscribe(topic_name, 1).await?; + println!("Subscribing to topics: {:?}", topic_name); + cli.subscribe(topic_name, 1).await?; - println!("Waiting for messages..."); + println!("Waiting for messages..."); - while let Some(msg_opt) = strm.next().await { - if let Some(msg) = msg_opt { - let event = msg.to_event(MqttVersion::V5).unwrap(); - println!("Received Event: {:#?}", event); - } - else { - // A "None" means we were disconnected. Try to reconnect... - println!("Lost connection. Attempting reconnect."); - while let Err(_err) = cli.reconnect().await { - // For tokio use: tokio::time::delay_for() - tokio::time::delay_for(Duration::from_millis(1000)).await; - } + while let Some(msg_opt) = strm.next().await { + if let Some(msg) = msg_opt { + let event = msg.to_event().unwrap(); + println!("Received Event: {:#?}", event); + } + else { + // A "None" means we were disconnected. Try to reconnect... + println!("Lost connection. Attempting reconnect."); + while let Err(_err) = cli.reconnect().await { + // For tokio use: tokio::time::delay_for() + tokio::time::delay_for(Duration::from_millis(1000)).await; } } - - Ok::<(), mqtt::Error>(()) - }) { - eprintln!("{}", err); } -} - -fn produce_v3(broker: &str, topic_name: &str) { - env_logger::init(); - let create_opts = mqtt::CreateOptionsBuilder::new() - .server_uri(broker) - .finalize(); - - let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { - process::exit(1); - }); - - if let Err(err) = block_on(async { - let conn_opts = mqtt::ConnectOptions::new(); + Ok::<(), mqtt::Error>(()) +} - cli.connect(conn_opts).await?; +async fn produce_v3(cli: &AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let conn_opts = mqtt::ConnectOptions::new(); - println!("Publishing a message on the topic"); + cli.connect(conn_opts).await?; - let event = EventBuilderV10::new() - .id("1".to_string()) - .ty("example.test") - .source("http://localhost/") - .data("application/json", json!({"hello": "world"})) - .build() - .unwrap(); + println!("Publishing a message on the topic"); - let message_record = - MessageRecord::from_event(event, MqttVersion::V3_1_1).expect("error while serializing the event"); + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); - // Create a message and publish it - let msg = mqtt::MessageBuilder::new() - .topic(topic_name) - .message_record(&message_record) - .qos(1) - .finalize(); + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .event(event, MqttVersion::MQTT_3) + .qos(1) + .finalize(); - cli.publish(msg).await?; + cli.publish(msg).await?; - cli.disconnect(None).await?; + cli.disconnect(None).await?; - Ok::<(), mqtt::Error>(()) - }) { - eprintln!("{}", err); - } + Ok::<(), mqtt::Error>(()) } -fn produce_v5(broker: &str, topic_name: &str) { - env_logger::init(); - - let create_opts = mqtt::CreateOptionsBuilder::new() - .server_uri(broker) +async fn produce_v5(cli: &AsyncClient, topic_name: &str) -> Result<(), mqtt::Error> { + let conn_opts = mqtt::ConnectOptionsBuilder::new() .mqtt_version(5) .finalize(); - let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { - process::exit(1); - }); - - if let Err(err) = block_on(async { - let conn_opts = mqtt::ConnectOptionsBuilder::new() - .mqtt_version(5) - .finalize(); - - cli.connect(conn_opts).await?; - - println!("Publishing a message on the topic"); + cli.connect(conn_opts).await?; - let event = EventBuilderV10::new() - .id("1".to_string()) - .ty("example.test") - .source("http://localhost/") - .data("application/json", json!({"hello": "world"})) - .build() - .unwrap(); + println!("Publishing a message on the topic"); - let message_record = - MessageRecord::from_event(event, MqttVersion::V5).expect("error while serializing the event"); + let event = EventBuilderV10::new() + .id("1".to_string()) + .ty("example.test") + .source("http://localhost/") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); - // Create a message and publish it - let msg = mqtt::MessageBuilder::new() - .topic(topic_name) - .message_record(&message_record) - .qos(1) - .finalize(); + // Create a message and publish it + let msg = mqtt::MessageBuilder::new() + .topic(topic_name) + .event(event, MqttVersion::MQTT_5) + .qos(1) + .finalize(); - cli.publish(msg).await?; + cli.publish(msg).await?; - cli.disconnect(None).await?; + cli.disconnect(None).await?; - Ok::<(), mqtt::Error>(()) - }) { - eprintln!("{}", err); - } + Ok::<(), mqtt::Error>(()) } fn main() { @@ -211,9 +146,9 @@ fn main() { .arg( Arg::with_name("mode") .long("mode") - .help("enter \"consmer\" or \"producer\"") + .help("enter \"producerV3\" or \"producerV5\" or \"consumerV3\" or \"consumerV5\"") .takes_value(true) - .possible_values(&["consumerV3", "producerV3", "consumerV5", "producerV5"]) + .possible_values(&["producerV3", "producerV5", "consumerV3", "consumerV5"]) .required(true), ) .arg( @@ -236,28 +171,75 @@ fn main() { match selector.value_of("mode").unwrap() { "producerV3" => { - produce_v3( - selector.value_of("broker").unwrap(), - selector.value_of("topic").unwrap(), - ) + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(produce_v3( + &cli, selector.value_of("topic").unwrap(), + )) { + eprintln!("{}", err); + } } "consumerV3" => { - consume_v3( - selector.value_of("broker").unwrap(), - selector.value_of("topic").unwrap(), - ) + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .client_id("rust_async_consumer") + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(consume_v3( + &mut cli, selector.value_of("topic").unwrap(), + )) { + eprintln!("{}", err); + } } "producerV5" => { - produce_v5( - selector.value_of("broker").unwrap(), - selector.value_of("topic").unwrap(), - ) + env_logger::init(); + + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .mqtt_version(5) + .finalize(); + + let cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|_err| { + process::exit(1); + }); + + if let Err(err) = block_on(produce_v5( + &cli, selector.value_of("topic").unwrap(), + )) { + eprintln!("{}", err); + } } "consumerV5" => { - consume_v5( - selector.value_of("broker").unwrap(), + let create_opts = mqtt::CreateOptionsBuilder::new() + .server_uri(selector.value_of("broker").unwrap()) + .client_id("rust_async_consumer") + .mqtt_version(5) + .finalize(); + + let mut cli = mqtt::AsyncClient::new(create_opts).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }); + + if let Err(err) = block_on(consume_v5( + &mut cli, selector.value_of("topic").unwrap(), - ) + )) { + eprintln!("{}", err); + } } _ => (), } From 4f39c7985d4d391910afb684860309bf4c08c23b Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 22 Dec 2020 12:01:50 -0800 Subject: [PATCH 05/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- .../src/mqtt_consumer_record.rs | 38 +++++++------------ .../src/mqtt_producer_record.rs | 12 ++++-- 2 files changed, 23 insertions(+), 27 deletions(-) diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs index 208400a8..1b6dfea3 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs @@ -101,8 +101,8 @@ impl MessageExt for Message { #[cfg(test)] mod tests { use super::*; - use crate::mqtt_producer_record::MessageRecord; + use crate::headers::MqttVersion::{MQTT_3, MQTT_5}; use crate::MessageBuilderExt; use chrono::Utc; use cloudevents::event::Data; @@ -127,27 +127,23 @@ mod tests { .build() .unwrap(); - let message_record = MessageRecord::from_event( - EventBuilderV10::new() - .id("0001") - .ty("example.test") - .time(time) - .source("http://localhost") - .extension("someint", "10") - .data("application/json", json!({"hello": "world"})) - .build() - .unwrap(), - headers::MqttVersion::V5, - ) - .unwrap(); + let event = EventBuilderV10::new() + .id("0001") + .ty("example.test") + .time(time) + .source("http://localhost") + .extension("someint", "10") + .data("application/json", json!({"hello": "world"})) + .build() + .unwrap(); let msg = MessageBuilder::new() .topic("test") - .message_record(&message_record) + .event(event, MQTT_5) .qos(1) .finalize(); - assert_eq!(msg.to_event(headers::MqttVersion::V5).unwrap(), expected) + assert_eq!(msg.to_event().unwrap(), expected) } #[test] @@ -172,18 +168,12 @@ mod tests { .build() .unwrap(); - let serialized_event = - StructuredDeserializer::deserialize_structured(input, MessageRecord::new()).unwrap(); - let msg = MessageBuilder::new() .topic("test") - .message_record(&serialized_event) + .event(input, MQTT_3) .qos(1) .finalize(); - assert_eq!( - msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), - expected - ) + assert_eq!(msg.to_event().unwrap(), expected) } } diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs index 04618a1c..ecf1184b 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs @@ -1,4 +1,5 @@ use super::headers; +use crate::headers::MqttVersion::MQTT_5; use cloudevents::event::SpecVersion; use cloudevents::message::{ BinaryDeserializer, BinarySerializer, Error, MessageAttributeValue, Result, @@ -22,7 +23,7 @@ impl MessageRecord { } } - pub fn from_event(event: Event, version: headers::MqttVersion) -> Result { + pub fn from_event(event: Event, version: &headers::MqttVersion) -> Result { match version { headers::MqttVersion::MQTT_5 => { BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) @@ -130,9 +131,14 @@ pub trait MessageBuilderExt { impl MessageBuilderExt for MessageBuilder { fn event(mut self, event: Event, version: headers::MqttVersion) -> MessageBuilder { let message_record = - MessageRecord::from_event(event, version).expect("error while serializing the event"); + MessageRecord::from_event(event, &version).expect("error while serializing the event"); - self = self.properties(message_record.headers.clone()); + match version { + MQTT_5 => { + self = self.properties(message_record.headers.clone()); + } + _ => (), + } if let Some(s) = message_record.payload.as_ref() { self = self.payload(s.to_vec()); From f5c3e8a3bf626f954b419c5fa6b59847f9be578a Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Mon, 25 Jan 2021 08:19:45 -0800 Subject: [PATCH 06/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- cloudevents-sdk-paho-mqtt/Cargo.toml | 2 +- cloudevents-sdk-paho-mqtt/src/lib.rs | 3 +-- cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs | 8 +++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/cloudevents-sdk-paho-mqtt/Cargo.toml b/cloudevents-sdk-paho-mqtt/Cargo.toml index 160eb58d..f6076876 100644 --- a/cloudevents-sdk-paho-mqtt/Cargo.toml +++ b/cloudevents-sdk-paho-mqtt/Cargo.toml @@ -11,7 +11,7 @@ description = "CloudEvents official Rust SDK - Mqtt integration" [dependencies] cloudevents-sdk = { version = "0.3.0", path = ".." } lazy_static = "1.4.0" -paho-mqtt = "0.8" +paho-mqtt = "0.9.1" chrono = { version = "^0.4", features = ["serde"] } [dev-dependencies] diff --git a/cloudevents-sdk-paho-mqtt/src/lib.rs b/cloudevents-sdk-paho-mqtt/src/lib.rs index ff58243a..e1c76030 100644 --- a/cloudevents-sdk-paho-mqtt/src/lib.rs +++ b/cloudevents-sdk-paho-mqtt/src/lib.rs @@ -1,5 +1,4 @@ -//! This library provides Mqtt protocol bindings for CloudEvents -//! using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ +//! This library provides Mqtt protocol bindings for CloudEvents using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\ #[macro_use] mod headers; mod mqtt_consumer_record; diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs index ecf1184b..8e6d4d92 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs @@ -135,13 +135,15 @@ impl MessageBuilderExt for MessageBuilder { match version { MQTT_5 => { - self = self.properties(message_record.headers.clone()); + self = self.properties(message_record.headers); } _ => (), } - if let Some(s) = message_record.payload.as_ref() { - self = self.payload(s.to_vec()); + match message_record.payload { + Some(s) => + self = self.payload(s), + None => () } self From 445ccd48f153dd7d095a74bcb2120f98c3244542 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Mon, 25 Jan 2021 08:31:21 -0800 Subject: [PATCH 07/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs index 8e6d4d92..dbd5c3ff 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs @@ -141,9 +141,8 @@ impl MessageBuilderExt for MessageBuilder { } match message_record.payload { - Some(s) => - self = self.payload(s), - None => () + Some(s) => self = self.payload(s), + None => (), } self From 7b1bdd73fb98a4329d28acf8bc17ab4c2e1b541c Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Jan 2021 17:26:42 -0800 Subject: [PATCH 08/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- .github/workflows/rust_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index d0ffe986..2157b27e 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -28,7 +28,7 @@ jobs: # Setup musl if needed - run: sudo apt-get update if: matrix.target == 'x86_64-unknown-linux-musl' - - run: sudo apt-get install -y musl musl-dev musl-tools cmake + - run: sudo apt-get install -y musl musl-dev musl-tools cmake libssl-dev if: matrix.target == 'x86_64-unknown-linux-musl' # Caching stuff From 8015532cff493d20de233f0f86edea82d7fd14ab Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Jan 2021 17:35:19 -0800 Subject: [PATCH 09/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- .github/workflows/rust_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index 2157b27e..a27e53cb 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -28,7 +28,7 @@ jobs: # Setup musl if needed - run: sudo apt-get update if: matrix.target == 'x86_64-unknown-linux-musl' - - run: sudo apt-get install -y musl musl-dev musl-tools cmake libssl-dev + - run: sudo apt-get install -y musl musl-dev musl-tools cmake libssl-dev pkg-config if: matrix.target == 'x86_64-unknown-linux-musl' # Caching stuff From 9763b2e33237e81493c7f89238449b1122d2aaad Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Jan 2021 17:57:10 -0800 Subject: [PATCH 10/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- .github/workflows/rust_tests.yml | 2 +- cloudevents-sdk-paho-mqtt/Cargo.toml | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index a27e53cb..d297b8c4 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -76,7 +76,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace + args: --target ${{ matrix.target }} --features vendored --workspace env: CC: musl-gcc CXX: g++ diff --git a/cloudevents-sdk-paho-mqtt/Cargo.toml b/cloudevents-sdk-paho-mqtt/Cargo.toml index f6076876..6e75ba40 100644 --- a/cloudevents-sdk-paho-mqtt/Cargo.toml +++ b/cloudevents-sdk-paho-mqtt/Cargo.toml @@ -13,6 +13,10 @@ cloudevents-sdk = { version = "0.3.0", path = ".." } lazy_static = "1.4.0" paho-mqtt = "0.9.1" chrono = { version = "^0.4", features = ["serde"] } +openssl-sys = "*" [dev-dependencies] -serde_json = "^1.0" \ No newline at end of file +serde_json = "^1.0" + +[features] +vendored = ["openssl-sys/vendored"] \ No newline at end of file From d736c1169c809e826a44f1ff03ab042c224edb88 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Jan 2021 18:08:31 -0800 Subject: [PATCH 11/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- Cargo.toml | 4 ++++ cloudevents-sdk-paho-mqtt/Cargo.toml | 6 +----- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f65cbec1..8bcd6516 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ delegate-attr = "^0.2" base64 = "^0.12" url = { version = "^2.1", features = ["serde"] } snafu = "^0.6" +openssl-sys = "*" [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" @@ -39,6 +40,9 @@ rstest = "0.6" claim = "0.3.1" version-sync = "^0.9" +[features] +vendored = ["openssl-sys/vendored"] + [workspace] members = [ ".", diff --git a/cloudevents-sdk-paho-mqtt/Cargo.toml b/cloudevents-sdk-paho-mqtt/Cargo.toml index 6e75ba40..f6076876 100644 --- a/cloudevents-sdk-paho-mqtt/Cargo.toml +++ b/cloudevents-sdk-paho-mqtt/Cargo.toml @@ -13,10 +13,6 @@ cloudevents-sdk = { version = "0.3.0", path = ".." } lazy_static = "1.4.0" paho-mqtt = "0.9.1" chrono = { version = "^0.4", features = ["serde"] } -openssl-sys = "*" [dev-dependencies] -serde_json = "^1.0" - -[features] -vendored = ["openssl-sys/vendored"] \ No newline at end of file +serde_json = "^1.0" \ No newline at end of file From 2a681db63449d485c2a6d435f0105420c8c67307 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Jan 2021 18:24:49 -0800 Subject: [PATCH 12/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- .github/workflows/rust_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index d297b8c4..1c77f3a3 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -86,7 +86,7 @@ jobs: with: command: test toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --workspace + args: --target ${{ matrix.target }} --features vendored --workspace env: CC: musl-gcc CXX: g++ From fa7fde9913ebc1437d85b6f464b9be41ca470e1d Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Jan 2021 18:34:46 -0800 Subject: [PATCH 13/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- .github/workflows/rust_tests.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index 1c77f3a3..5622d80f 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -98,7 +98,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target wasm32-unknown-unknown --package cloudevents-sdk --package cloudevents-sdk-reqwest + args: --target wasm32-unknown-unknown --features vendored --package cloudevents-sdk --package cloudevents-sdk-reqwest # Build examples - uses: actions-rs/cargo@v1 @@ -107,7 +107,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --manifest-path ./example-projects/reqwest-wasm-example/Cargo.toml + args: --target ${{ matrix.target }} --features vendored --manifest-path ./example-projects/reqwest-wasm-example/Cargo.toml - uses: actions-rs/cargo@v1 name: "Build rdkafka-example" From 9de2c819c0927e8922d13f2939bddcb3d868909a Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Jan 2021 21:43:26 -0800 Subject: [PATCH 14/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- .github/workflows/rust_tests.yml | 16 ++++++++++++---- Cargo.toml | 6 ++---- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index 5622d80f..221eb9bf 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -76,7 +76,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --features vendored --workspace + args: --target ${{ matrix.target }} --workspace env: CC: musl-gcc CXX: g++ @@ -86,7 +86,7 @@ jobs: with: command: test toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --features vendored --workspace + args: --target ${{ matrix.target }} --workspace env: CC: musl-gcc CXX: g++ @@ -98,7 +98,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target wasm32-unknown-unknown --features vendored --package cloudevents-sdk --package cloudevents-sdk-reqwest + args: --target wasm32-unknown-unknown --package cloudevents-sdk --package cloudevents-sdk-reqwest # Build examples - uses: actions-rs/cargo@v1 @@ -107,7 +107,7 @@ jobs: with: command: build toolchain: ${{ matrix.toolchain }} - args: --target ${{ matrix.target }} --features vendored --manifest-path ./example-projects/reqwest-wasm-example/Cargo.toml + args: --target ${{ matrix.target }} --manifest-path ./example-projects/reqwest-wasm-example/Cargo.toml - uses: actions-rs/cargo@v1 name: "Build rdkafka-example" @@ -132,3 +132,11 @@ jobs: command: build toolchain: ${{ matrix.toolchain }} args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml + + - uses: actions-rs/cargo@v1 + name: "Build paho-mqtt-example" + if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable' + with: + command: build + toolchain: ${{ matrix.toolchain }} + args: --target ${{ matrix.target }} --manifest-path ./example-projects/paho-mqtt-example/Cargo.toml diff --git a/Cargo.toml b/Cargo.toml index 8793bd5a..fcc8c5fd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,11 +24,12 @@ delegate-attr = "^0.2" base64 = "^0.12" url = { version = "^2.1", features = ["serde"] } snafu = "^0.6" -openssl-sys = "*" bitflags = "^1.2" [target."cfg(not(target_arch = \"wasm32\"))".dependencies] hostname = "^0.3" +openssl-sys = "*" +openssl = { version = "*", features = ["vendored"] } uuid = { version = "^0.8", features = ["v4"] } [target.'cfg(target_arch = "wasm32")'.dependencies] @@ -41,9 +42,6 @@ claim = "0.3.1" version-sync = "^0.9" serde_yaml = "0.8" -[features] -vendored = ["openssl-sys/vendored"] - [workspace] members = [ ".", From f6f3e83bb862568c8793a91a6809bacb94fb4156 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 27 Jan 2021 06:55:42 -0800 Subject: [PATCH 15/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- example-projects/paho-mqtt-example/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/example-projects/paho-mqtt-example/Cargo.toml b/example-projects/paho-mqtt-example/Cargo.toml index 1f5ff62e..8b85e39f 100644 --- a/example-projects/paho-mqtt-example/Cargo.toml +++ b/example-projects/paho-mqtt-example/Cargo.toml @@ -15,4 +15,6 @@ paho-mqtt = "0.8" serde_json = "^1.0" futures = "^0.3" tokio = { version = "^0.2", features = ["full"] } -clap = "2.33.1" \ No newline at end of file +clap = "2.33.1" + +[workspace] \ No newline at end of file From 995d221baa8c702650a8e93069ed8ce4b537d0f3 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 27 Jan 2021 08:24:25 -0800 Subject: [PATCH 16/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- example-projects/paho-mqtt-example/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/example-projects/paho-mqtt-example/Cargo.toml b/example-projects/paho-mqtt-example/Cargo.toml index 8b85e39f..5268b0db 100644 --- a/example-projects/paho-mqtt-example/Cargo.toml +++ b/example-projects/paho-mqtt-example/Cargo.toml @@ -8,8 +8,8 @@ edition = "2018" [dependencies] async-trait = "^0.1.33" -cloudevents-sdk = { path = "../sdk-rust" } -cloudevents-sdk-mqtt = { path = "../sdk-rust/cloudevents-sdk-mqtt"} +cloudevents-sdk = { path = "../.." } +cloudevents-sdk-mqtt = { path = "../../cloudevents-sdk-mqtt"} env_logger = "0.7.1" paho-mqtt = "0.8" serde_json = "^1.0" From 374288ba7334de8a7c720661c32392836864f54d Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 27 Jan 2021 08:47:11 -0800 Subject: [PATCH 17/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- example-projects/paho-mqtt-example/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example-projects/paho-mqtt-example/Cargo.toml b/example-projects/paho-mqtt-example/Cargo.toml index 5268b0db..7620c350 100644 --- a/example-projects/paho-mqtt-example/Cargo.toml +++ b/example-projects/paho-mqtt-example/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" [dependencies] async-trait = "^0.1.33" cloudevents-sdk = { path = "../.." } -cloudevents-sdk-mqtt = { path = "../../cloudevents-sdk-mqtt"} +cloudevents-sdk-mqtt = { path = "../../cloudevents-sdk-paho-mqtt"} env_logger = "0.7.1" paho-mqtt = "0.8" serde_json = "^1.0" From b09a66180d33b2a8c57db57adb9df8472d74a029 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 27 Jan 2021 09:08:08 -0800 Subject: [PATCH 18/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- example-projects/paho-mqtt-example/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example-projects/paho-mqtt-example/Cargo.toml b/example-projects/paho-mqtt-example/Cargo.toml index 7620c350..96bd13f4 100644 --- a/example-projects/paho-mqtt-example/Cargo.toml +++ b/example-projects/paho-mqtt-example/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" [dependencies] async-trait = "^0.1.33" cloudevents-sdk = { path = "../.." } -cloudevents-sdk-mqtt = { path = "../../cloudevents-sdk-paho-mqtt"} +cloudevents-sdk-paho-mqtt = { path = "../../cloudevents-sdk-paho-mqtt"} env_logger = "0.7.1" paho-mqtt = "0.8" serde_json = "^1.0" From c52975f3d74696de35a78b0c93951a2adfcc7bd2 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Wed, 27 Jan 2021 13:51:48 -0800 Subject: [PATCH 19/19] #9 Encoders for MQTT Signed-off-by: Subhobrata Dey --- example-projects/paho-mqtt-example/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/example-projects/paho-mqtt-example/Cargo.toml b/example-projects/paho-mqtt-example/Cargo.toml index 96bd13f4..96c5ab21 100644 --- a/example-projects/paho-mqtt-example/Cargo.toml +++ b/example-projects/paho-mqtt-example/Cargo.toml @@ -11,7 +11,7 @@ async-trait = "^0.1.33" cloudevents-sdk = { path = "../.." } cloudevents-sdk-paho-mqtt = { path = "../../cloudevents-sdk-paho-mqtt"} env_logger = "0.7.1" -paho-mqtt = "0.8" +paho-mqtt = "0.9.1" serde_json = "^1.0" futures = "^0.3" tokio = { version = "^0.2", features = ["full"] }