Skip to content

Commit

Permalink
feat(stream): Add mqtt connector
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Mar 2, 2024
1 parent 672ab0e commit d202e0f
Show file tree
Hide file tree
Showing 20 changed files with 697 additions and 4 deletions.
46 changes: 44 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions ci/scripts/gen-integration-test-yaml.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
'mindsdb': ['json'],
'vector': ['json'],
'nats': ['json'],
'mqtt-source': ['json'],
'doris-sink': ['json'],
'starrocks-sink': ['json'],
'deltalake-sink': ['json'],
Expand Down
11 changes: 11 additions & 0 deletions integration_tests/mqtt-source/create_source.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

CREATE TABLE mqtt_source_table
(
id integer,
name varchar,
)
WITH (
connector='mqtt',
host='mqtt-server',
topic= 'test'
) FORMAT PLAIN ENCODE JSON;
1 change: 1 addition & 0 deletions integration_tests/mqtt-source/data_check
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
mqtt_source_table
45 changes: 45 additions & 0 deletions integration_tests/mqtt-source/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
---
version: "3"
services:
risingwave-standalone:
extends:
file: ../../docker/docker-compose.yml
service: risingwave-standalone
mqtt-server:
image: emqx/emqx:5.2.1
ports:
- 1883:1883
etcd-0:
extends:
file: ../../docker/docker-compose.yml
service: etcd-0
grafana-0:
extends:
file: ../../docker/docker-compose.yml
service: grafana-0
minio-0:
extends:
file: ../../docker/docker-compose.yml
service: minio-0
prometheus-0:
extends:
file: ../../docker/docker-compose.yml
service: prometheus-0
message_queue:
extends:
file: ../../docker/docker-compose.yml
service: message_queue
volumes:
compute-node-0:
external: false
etcd-0:
external: false
grafana-0:
external: false
minio-0:
external: false
prometheus-0:
external: false
message_queue:
external: false
name: risingwave-compose
8 changes: 8 additions & 0 deletions integration_tests/mqtt-source/query.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
select
*
from
mqtt_source_table
order by
id
LIMIT
10;
4 changes: 4 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,10 @@ risingwave_common = { workspace = true }
risingwave_jni_core = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
rumqttc = "0.22.0"
rust_decimal = "1"
rustls-native-certs = "0.6"
rustls-pemfile = "1"
rw_futures_util = { workspace = true }
serde = { version = "1", features = ["derive", "rc"] }
serde_derive = "1"
Expand All @@ -137,6 +140,7 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [
] }
tokio-postgres = { version = "0.7", features = ["with-uuid-1"] }
tokio-retry = "0.3"
tokio-rustls = "0.24"
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["codec", "io"] }
tonic = { workspace = true }
Expand Down
126 changes: 126 additions & 0 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -684,3 +684,129 @@ impl NatsCommon {
Ok(creds)
}
}

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct MqttCommon {
/// Protocol used for RisingWave to communicate with Kafka brokers. Could be tcp or ssl
#[serde(rename = "protocol")]
pub protocol: Option<String>,
#[serde(rename = "host")]
pub host: String,
#[serde(rename = "port")]
pub port: Option<i32>,
#[serde(rename = "topic")]
pub topic: String,
#[serde(rename = "username")]
pub user: Option<String>,
#[serde(rename = "password")]
pub password: Option<String>,
#[serde(rename = "client_prefix")]
pub client_prefix: Option<String>,
#[serde(rename = "tls.ca")]
pub ca: Option<String>,
#[serde(rename = "tls.client_cert")]
pub client_cert: Option<String>,
#[serde(rename = "tls.client_key")]
pub client_key: Option<String>,
}

impl MqttCommon {
pub(crate) fn build_client(
&self,
id: u32,
) -> ConnectorResult<(rumqttc::v5::AsyncClient, rumqttc::v5::EventLoop)> {
let ssl = self
.protocol
.as_ref()
.map(|p| p == "ssl")
.unwrap_or_default();

let client_id = format!(
"{}_{}{}",
self.client_prefix.as_deref().unwrap_or("risingwave"),
id,
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis()
% 100000,
);

let port = self.port.unwrap_or(if ssl { 8883 } else { 1883 }) as u16;

let mut options = rumqttc::v5::MqttOptions::new(client_id, &self.host, port);
if ssl {
let mut root_cert_store = tokio_rustls::rustls::RootCertStore::empty();
if let Some(ca) = &self.ca {
let certificates = load_certs(ca)?;
for cert in certificates {
root_cert_store.add(&cert).unwrap();
}
} else {
for cert in
rustls_native_certs::load_native_certs().expect("could not load platform certs")
{
root_cert_store
.add(&tokio_rustls::rustls::Certificate(cert.0))
.unwrap();
}
}

let builder = tokio_rustls::rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_cert_store);

let tls_config = if let (Some(client_cert), Some(client_key)) =
(self.client_cert.as_ref(), self.client_key.as_ref())
{
let certs = load_certs(client_cert)?;
let key = load_private_key(client_key)?;

builder.with_client_auth_cert(certs, key)?
} else {
builder.with_no_client_auth()
};

options.set_transport(rumqttc::Transport::tls_with_config(
rumqttc::TlsConfiguration::Rustls(std::sync::Arc::new(tls_config)),
));
}

if let Some(user) = &self.user {
options.set_credentials(user, self.password.as_deref().unwrap_or_default());
}

Ok(rumqttc::v5::AsyncClient::new(options, 10))
}
}

fn load_certs(certificates: &str) -> ConnectorResult<Vec<tokio_rustls::rustls::Certificate>> {
let cert_bytes = if let Some(path) = certificates.strip_prefix("fs://") {
std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
} else {
certificates.as_bytes().to_owned()
};

let certs = rustls_pemfile::certs(&mut cert_bytes.as_slice())?;

Ok(certs
.into_iter()
.map(tokio_rustls::rustls::Certificate)
.collect())
}

fn load_private_key(certificate: &str) -> ConnectorResult<tokio_rustls::rustls::PrivateKey> {
let cert_bytes = if let Some(path) = certificate.strip_prefix("fs://") {
std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
} else {
certificate.as_bytes().to_owned()
};

let certs = rustls_pemfile::pkcs8_private_keys(&mut cert_bytes.as_slice())?;
let cert = certs
.into_iter()
.next()
.ok_or_else(|| anyhow!("No private key found"))?;
Ok(tokio_rustls::rustls::PrivateKey(cert))
}
2 changes: 2 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ def_anyhow_newtype! {
redis::RedisError => "Redis error",
arrow_schema::ArrowError => "Arrow error",
google_cloud_pubsub::client::google_cloud_auth::error::Error => "Google Cloud error",
tokio_rustls::rustls::Error => "TLS error",
rumqttc::v5::ClientError => "MQTT error",
}

pub type ConnectorResult<T, E = ConnectorError> = std::result::Result<T, E>;
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ macro_rules! for_all_classified_sources {
{ Nexmark, $crate::source::nexmark::NexmarkProperties, $crate::source::nexmark::NexmarkSplit },
{ Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Mqtt, $crate::source::mqtt::MqttProperties, $crate::source::mqtt::split::MqttSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Gcs, $crate::source::filesystem::opendal_source::GcsProperties , $crate::source::filesystem::OpendalFsSplit<$crate::source::filesystem::opendal_source::OpendalGcs> },
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub mod google_pubsub;
pub mod kafka;
pub mod kinesis;
pub mod monitor;
pub mod mqtt;
pub mod nats;
pub mod nexmark;
pub mod pulsar;
Expand All @@ -29,6 +30,7 @@ pub(crate) use common::*;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
pub use kinesis::KINESIS_CONNECTOR;
pub use mqtt::MQTT_CONNECTOR;
pub use nats::NATS_CONNECTOR;
mod common;
pub mod iceberg;
Expand Down
Loading

0 comments on commit d202e0f

Please sign in to comment.