diff --git a/cloudevents-sdk-paho-mqtt/src/headers.rs b/cloudevents-sdk-paho-mqtt/src/headers.rs index 4a7a9bdb..26924652 100644 --- a/cloudevents-sdk-paho-mqtt/src/headers.rs +++ b/cloudevents-sdk-paho-mqtt/src/headers.rs @@ -25,11 +25,8 @@ lazy_static! { } pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion"; +pub(crate) static MQTT_VERSION_HEADER: &'static str = "ce_mqttversion"; pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json"; pub(crate) static CONTENT_TYPE: &'static str = "content-type"; -pub enum MqttVersion { - V3_1, - V3_1_1, - V5, -} +pub(crate) static MQTT_V5_BINARY: &'static str = "V5_BINARY"; diff --git a/cloudevents-sdk-paho-mqtt/src/lib.rs b/cloudevents-sdk-paho-mqtt/src/lib.rs index ff58243a..64ca356e 100644 --- a/cloudevents-sdk-paho-mqtt/src/lib.rs +++ b/cloudevents-sdk-paho-mqtt/src/lib.rs @@ -9,6 +9,5 @@ pub use mqtt_consumer_record::record_to_event; pub use mqtt_consumer_record::ConsumerMessageDeserializer; pub use mqtt_consumer_record::MessageExt; -pub use headers::MqttVersion; pub use mqtt_producer_record::MessageBuilderExt; pub use mqtt_producer_record::MessageRecord; diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs index 4c18a94c..b7885614 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs @@ -4,7 +4,7 @@ use cloudevents::message::{ BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer, Result, StructuredDeserializer, StructuredSerializer, }; -use cloudevents::{message, Event}; +use cloudevents::{message, Data, Event}; use paho_mqtt::{Message, PropertyCode}; use std::collections::HashMap; use std::convert::TryFrom; @@ -16,7 +16,7 @@ pub struct ConsumerMessageDeserializer { } impl ConsumerMessageDeserializer { - fn get_mqtt_headers(message: &Message) -> Result>> { + fn get_mqtt_headers(message: &Message) -> HashMap> { let mut hm = HashMap::new(); let prop_iterator = message.properties().iter(PropertyCode::UserProperty); @@ -25,12 +25,12 @@ impl ConsumerMessageDeserializer { hm.insert(header.0.to_string(), Vec::from(header.1)); } - Ok(hm) + hm } pub fn new(message: &Message) -> Result { Ok(ConsumerMessageDeserializer { - headers: Self::get_mqtt_headers(message)?, + headers: Self::get_mqtt_headers(message), payload: Some(message.payload()).map(|s| Vec::from(s)), }) } @@ -110,22 +110,27 @@ impl MessageDeserializer for ConsumerMessageDeserializer { fn encoding(&self) -> Encoding { match ( self.headers - .get("content-type") + .get(headers::CONTENT_TYPE) .map(|s| String::from_utf8(s.to_vec()).ok()) .flatten() .map(|s| s.starts_with(headers::CLOUDEVENTS_JSON_HEADER)) .unwrap_or(false), - self.headers.get(headers::SPEC_VERSION_HEADER), + self.headers.get(headers::MQTT_VERSION_HEADER) + .map(|s| String::from_utf8(s.to_vec()).ok()) + .flatten() + .map(|s| s.eq(headers::MQTT_V5_BINARY)) + .unwrap_or(false), ) { - (true, _) => Encoding::STRUCTURED, - (_, Some(_)) => Encoding::BINARY, - _ => Encoding::UNKNOWN, + (true, true) => Encoding::STRUCTURED, + (_, true) => Encoding::BINARY, + _ => Encoding::STRUCTURED, } } } -pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result { - match version { +pub fn record_to_event(msg: &Message) -> Result { + MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) +/* match version { headers::MqttVersion::V5 => { BinaryDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) } @@ -135,16 +140,16 @@ pub fn record_to_event(msg: &Message, version: headers::MqttVersion) -> Result { StructuredDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?) } - } + }*/ } pub trait MessageExt { - fn to_event(&self, version: headers::MqttVersion) -> Result; + fn to_event(&self) -> Result; } impl MessageExt for Message { - fn to_event(&self, version: headers::MqttVersion) -> Result { - record_to_event(self, version) + fn to_event(&self) -> Result { + record_to_event(self) } } @@ -155,7 +160,6 @@ mod tests { use crate::MessageBuilderExt; use chrono::Utc; - use cloudevents::event::Data; use cloudevents::{EventBuilder, EventBuilderV10}; use paho_mqtt::MessageBuilder; use serde_json::json; @@ -170,10 +174,10 @@ mod tests { .time(time) .source("http://localhost") .data( - "application/json", - Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()), + "application/octet-stream", + Data::Binary(String::from("hello rust").into_bytes()), ) - .extension("someint", "10") + .extension("mqttversion", headers::MQTT_V5_BINARY) .build() .unwrap(); @@ -183,11 +187,12 @@ mod tests { .ty("example.test") .time(time) .source("http://localhost") - .extension("someint", "10") - .data("application/json", json!({"hello": "world"})) + .extension("mqttversion", headers::MQTT_V5_BINARY) + .data( + "application/octet-stream", + Data::Binary(String::from("hello rust").into_bytes())) .build() .unwrap(), - headers::MqttVersion::V5, ) .unwrap(); @@ -197,7 +202,7 @@ mod tests { .qos(1) .finalize(); - assert_eq!(msg.to_event(headers::MqttVersion::V5).unwrap(), expected) + assert_eq!(msg.to_event().unwrap(), expected) } #[test] @@ -209,7 +214,6 @@ mod tests { .ty("example.test") .source("http://localhost") .data("application/cloudevents+json", j.clone()) - .extension("someint", "10") .build() .unwrap(); @@ -218,7 +222,6 @@ mod tests { .ty("example.test") .source("http://localhost") .data("application/cloudevents+json", j.clone()) - .extension("someint", "10") .build() .unwrap(); @@ -232,7 +235,7 @@ mod tests { .finalize(); assert_eq!( - msg.to_event(headers::MqttVersion::V3_1_1).unwrap(), + msg.to_event().unwrap(), expected ) } diff --git a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs index 387b813f..94f79bca 100644 --- a/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs +++ b/cloudevents-sdk-paho-mqtt/src/mqtt_producer_record.rs @@ -22,17 +22,12 @@ impl MessageRecord { } } - pub fn from_event(event: Event, version: headers::MqttVersion) -> Result { - match version { - headers::MqttVersion::V5 => { - BinaryDeserializer::deserialize_binary(event, MessageRecord::new()) - } - headers::MqttVersion::V3_1 => { - StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) - } - headers::MqttVersion::V3_1_1 => { - StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) - } + pub fn from_event(event: Event) -> Result { + match event.extension(headers::MQTT_VERSION_HEADER) + .map(|e| e.to_string().eq(headers::MQTT_V5_BINARY)) + .unwrap_or(false) { + true => BinaryDeserializer::deserialize_binary(event, MessageRecord::new()), + _ => StructuredDeserializer::deserialize_structured(event, MessageRecord::new()) } } } diff --git a/example-projects/paho-mqtt-example/src/main.rs b/example-projects/paho-mqtt-example/src/main.rs index 42456f2f..ddcf1896 100644 --- a/example-projects/paho-mqtt-example/src/main.rs +++ b/example-projects/paho-mqtt-example/src/main.rs @@ -39,7 +39,7 @@ fn consume_v3(broker: &str, topic_name: &str) { while let Some(msg_opt) = strm.next().await { if let Some(msg) = msg_opt { - let event = msg.to_event(MqttVersion::V3_1_1).unwrap(); + let event = msg.to_event().unwrap(); println!("Received Event: {:#?}", event); } else { @@ -90,7 +90,7 @@ fn consume_v5(broker: &str, topic_name: &str) { while let Some(msg_opt) = strm.next().await { if let Some(msg) = msg_opt { - let event = msg.to_event(MqttVersion::V5).unwrap(); + let event = msg.to_event().unwrap(); println!("Received Event: {:#?}", event); } else { @@ -131,12 +131,12 @@ fn produce_v3(broker: &str, topic_name: &str) { .id("1".to_string()) .ty("example.test") .source("http://localhost/") - .data("application/json", json!({"hello": "world"})) + .data("application/cloudevents+json", json!({"hello": "world"})) .build() .unwrap(); let message_record = - MessageRecord::from_event(event, MqttVersion::V3_1_1).expect("error while serializing the event"); + MessageRecord::from_event(event).expect("error while serializing the event"); // Create a message and publish it let msg = mqtt::MessageBuilder::new() @@ -185,7 +185,7 @@ fn produce_v5(broker: &str, topic_name: &str) { .unwrap(); let message_record = - MessageRecord::from_event(event, MqttVersion::V5).expect("error while serializing the event"); + MessageRecord::from_event(event).expect("error while serializing the event"); // Create a message and publish it let msg = mqtt::MessageBuilder::new()