Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#9 Encoders for MQTT #104

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/rust_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
# Setup musl if needed
- run: sudo apt-get update
if: matrix.target == 'x86_64-unknown-linux-musl'
- run: sudo apt-get install -y musl musl-dev musl-tools cmake
- run: sudo apt-get install -y musl musl-dev musl-tools cmake libssl-dev pkg-config
if: matrix.target == 'x86_64-unknown-linux-musl'

# Caching stuff
Expand Down Expand Up @@ -132,3 +132,11 @@ jobs:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/warp-example/Cargo.toml

- uses: actions-rs/cargo@v1
name: "Build paho-mqtt-example"
if: matrix.target == 'x86_64-unknown-linux-gnu' && matrix.toolchain == 'stable'
with:
command: build
toolchain: ${{ matrix.toolchain }}
args: --target ${{ matrix.target }} --manifest-path ./example-projects/paho-mqtt-example/Cargo.toml
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ bitflags = "^1.2"

[target."cfg(not(target_arch = \"wasm32\"))".dependencies]
hostname = "^0.3"
openssl-sys = "*"
openssl = { version = "*", features = ["vendored"] }
Comment on lines +31 to +32
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may we should add some version restrictions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Why in the main toml we need openssl?

uuid = { version = "^0.8", features = ["v4"] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
Expand All @@ -46,11 +48,13 @@ members = [
"cloudevents-sdk-actix-web",
"cloudevents-sdk-reqwest",
"cloudevents-sdk-rdkafka",
"cloudevents-sdk-warp"
"cloudevents-sdk-warp",
"cloudevents-sdk-paho-mqtt"
]
exclude = [
"example-projects/actix-web-example",
"example-projects/reqwest-wasm-example",
"example-projects/rdkafka-example",
"example-projects/warp-example",
"example-projects/paho-mqtt-example"
]
18 changes: 18 additions & 0 deletions cloudevents-sdk-paho-mqtt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "cloudevents-sdk-paho-mqtt"
version = "0.3.0"
authors = ["Francesco Guardiani <[email protected]>"]
license-file = "../LICENSE"
edition = "2018"
description = "CloudEvents official Rust SDK - Mqtt integration"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
cloudevents-sdk = { version = "0.3.0", path = ".." }
lazy_static = "1.4.0"
paho-mqtt = "0.9.1"
chrono = { version = "^0.4", features = ["serde"] }

[dev-dependencies]
serde_json = "^1.0"
42 changes: 42 additions & 0 deletions cloudevents-sdk-paho-mqtt/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# CloudEvents SDK Rust - paho-mqtt [![Crates badge]][crates.io] [![Docs badge]][docs.rs]

Integration of [CloudEvents SDK](https://github.com/cloudevents/sdk-rust/) with [paho-mqtt](https://www.eclipse.org/paho/).

Look at [CloudEvents SDK README](https://github.com/cloudevents/sdk-rust/) for more info.

## Development & Contributing

If you're interested in contributing to sdk-rust, look at [Contributing documentation](../CONTRIBUTING.md)

## Community

## Sample usage

- Check the example [paho-mqtt-example](../example-projects/paho-mqtt-example)

### MQTT V3
- Start the MQTT V3 Consumer

```
run --package <package-name> --bin <binary-name> -- --mode consumerV3 --broker tcp://localhost:1883 --topic test
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is run? can we have some more specific examples?

```

- Start the MQTT V3 Producer

```
run --package <package-name> --bin <binary-name> -- --broker tcp://localhost:1883 --topic test --mode producerV3
```

### MQTT V5
- Start the MQTT V5 Consumer

```
run --package <package-name> --bin <binary-name> -- --mode consumerV5 --broker tcp://localhost:1883 --topic test
```

- Start the MQTT V5 Producer

```
run --package <package-name> --bin <binary-name> -- --broker tcp://localhost:1883 --topic test --mode producerV5
```

34 changes: 34 additions & 0 deletions cloudevents-sdk-paho-mqtt/src/headers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use cloudevents::event::SpecVersion;
use lazy_static::lazy_static;
use std::collections::HashMap;

macro_rules! attribute_name_to_header {
($attribute:expr) => {
format!("ce_{}", $attribute)
};
}

fn attributes_to_headers(it: impl Iterator<Item = &'static str>) -> HashMap<&'static str, String> {
it.map(|s| {
if s == "datacontenttype" {
(s, String::from("content-type"))
} else {
(s, attribute_name_to_header!(s))
}
})
.collect()
}

lazy_static! {
pub(crate) static ref ATTRIBUTES_TO_MQTT_HEADERS: HashMap<&'static str, String> =
attributes_to_headers(SpecVersion::all_attribute_names());
}

pub(crate) static SPEC_VERSION_HEADER: &'static str = "ce_specversion";
pub(crate) static CLOUDEVENTS_JSON_HEADER: &'static str = "application/cloudevents+json";
pub(crate) static CONTENT_TYPE: &'static str = "content-type";

pub enum MqttVersion {
MQTT_3,
MQTT_5,
}
13 changes: 13 additions & 0 deletions cloudevents-sdk-paho-mqtt/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! This library provides Mqtt protocol bindings for CloudEvents using the [paho.mqtt.rust](https://github.com/eclipse/paho.mqtt.rust) library.\\
#[macro_use]
mod headers;
mod mqtt_consumer_record;
mod mqtt_producer_record;

pub use mqtt_consumer_record::record_to_event;
pub use mqtt_consumer_record::ConsumerMessageDeserializer;
pub use mqtt_consumer_record::MessageExt;

pub use headers::MqttVersion;
pub use mqtt_producer_record::MessageBuilderExt;
pub use mqtt_producer_record::MessageRecord;
179 changes: 179 additions & 0 deletions cloudevents-sdk-paho-mqtt/src/mqtt_consumer_record.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
use crate::headers;
use cloudevents::event::SpecVersion;
use cloudevents::message::{
BinaryDeserializer, BinarySerializer, Encoding, MessageAttributeValue, MessageDeserializer,
Result, StructuredDeserializer, StructuredSerializer,
};
use cloudevents::{message, Event};
use paho_mqtt::{Message, Properties, PropertyCode};
use std::convert::TryFrom;

pub struct ConsumerMessageDeserializer<'a> {
pub(crate) headers: &'a Properties,
pub(crate) payload: Option<Vec<u8>>,
}

impl<'a> ConsumerMessageDeserializer<'a> {
fn get_mqtt_headers(message: &Message) -> &Properties {
message.properties()
}

pub fn new(message: &Message) -> Result<ConsumerMessageDeserializer> {
Ok(ConsumerMessageDeserializer {
headers: Self::get_mqtt_headers(message),
payload: Some(message.payload()).map(|s| Vec::from(s)),
})
}
}

impl<'a> BinaryDeserializer for ConsumerMessageDeserializer<'a> {
fn deserialize_binary<R: Sized, V: BinarySerializer<R>>(self, mut visitor: V) -> Result<R> {
if self.encoding() != Encoding::BINARY {
return Err(message::Error::WrongEncoding {});
}

let spec_version = SpecVersion::try_from(
self.headers
.find_user_property(headers::SPEC_VERSION_HEADER)
.unwrap()
.as_str(),
)?;

visitor = visitor.set_spec_version(spec_version.clone())?;

let attributes = spec_version.attribute_names();

if let Some(hv) = self.headers.find_user_property(headers::CONTENT_TYPE) {
visitor = visitor.set_attribute("datacontenttype", MessageAttributeValue::String(hv))?
}

for (hn, hv) in self
.headers
.user_iter()
.filter(|(hn, _)| headers::SPEC_VERSION_HEADER != *hn && hn.starts_with("ce_"))
{
let name = &hn["ce_".len()..];

if attributes.contains(&name) {
visitor = visitor.set_attribute(name, MessageAttributeValue::String(hv))?
} else {
visitor = visitor.set_extension(name, MessageAttributeValue::String(hv))?
}
}

if self.payload != None {
visitor.end_with_data(self.payload.unwrap())
} else {
visitor.end()
}
}
}

impl<'a> StructuredDeserializer for ConsumerMessageDeserializer<'a> {
fn deserialize_structured<R: Sized, V: StructuredSerializer<R>>(self, visitor: V) -> Result<R> {
visitor.set_structured_event(self.payload.unwrap())
}
}

impl<'a> MessageDeserializer for ConsumerMessageDeserializer<'a> {
fn encoding(&self) -> Encoding {
match self.headers.iter(PropertyCode::UserProperty).count() == 0 {
true => Encoding::STRUCTURED,
false => Encoding::BINARY,
}
}
}

pub fn record_to_event(msg: &Message) -> Result<Event> {
MessageDeserializer::into_event(ConsumerMessageDeserializer::new(msg)?)
}

pub trait MessageExt {
fn to_event(&self) -> Result<Event>;
}

impl MessageExt for Message {
fn to_event(&self) -> Result<Event> {
record_to_event(self)
}
}

#[cfg(test)]
mod tests {
use super::*;

use crate::headers::MqttVersion::{MQTT_3, MQTT_5};
use crate::MessageBuilderExt;
use chrono::Utc;
use cloudevents::event::Data;
use cloudevents::{EventBuilder, EventBuilderV10};
use paho_mqtt::MessageBuilder;
use serde_json::json;

#[test]
fn test_binary_record() {
let time = Utc::now();

let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.data(
"application/json",
Data::Binary(String::from("{\"hello\":\"world\"}").into_bytes()),
)
.extension("someint", "10")
.build()
.unwrap();

let event = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.time(time)
.source("http://localhost")
.extension("someint", "10")
.data("application/json", json!({"hello": "world"}))
.build()
.unwrap();

let msg = MessageBuilder::new()
.topic("test")
.event(event, MQTT_5)
.qos(1)
.finalize();

assert_eq!(msg.to_event().unwrap(), expected)
}

#[test]
fn test_structured_record() {
let j = json!({"hello": "world"});

let expected = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/cloudevents+json", j.clone())
.extension("someint", "10")
.build()
.unwrap();

let input = EventBuilderV10::new()
.id("0001")
.ty("example.test")
.source("http://localhost")
.data("application/cloudevents+json", j.clone())
.extension("someint", "10")
.build()
.unwrap();

let msg = MessageBuilder::new()
.topic("test")
.event(input, MQTT_3)
.qos(1)
.finalize();

assert_eq!(msg.to_event().unwrap(), expected)
}
}
Loading