diff --git a/.typos.toml b/.typos.toml index b19d0a08c541d..052a051bd1e00 100644 --- a/.typos.toml +++ b/.typos.toml @@ -9,6 +9,7 @@ steam = "stream" # You played with Steam games too much. # Some weird short variable names ot = "ot" bui = "bui" +mosquitto = "mosquitto" # This is a MQTT broker. [default.extend-identifiers] diff --git a/integration_tests/mqtt/create_sink.sql b/integration_tests/mqtt/create_sink.sql new file mode 100644 index 0000000000000..69b6886943944 --- /dev/null +++ b/integration_tests/mqtt/create_sink.sql @@ -0,0 +1,28 @@ +CREATE SINK mqtt_sink +FROM + personnel +WITH +( + connector='mqtt', + url='tcp://mqtt-server', + topic= 'test', + type = 'append-only', + retain = 'true', + qos = 'at_least_once', +) FORMAT PLAIN ENCODE JSON ( + force_append_only='true', +); + +INSERT INTO + personnel +VALUES + (1, 'Alice'), + (2, 'Bob'), + (3, 'Tom'), + (4, 'Jerry'), + (5, 'Araminta'), + (6, 'Clover'), + (7, 'Posey'), + (8, 'Waverly'); + +FLUSH; \ No newline at end of file diff --git a/integration_tests/mqtt/create_source.sql b/integration_tests/mqtt/create_source.sql index 8c63216125280..068d7e0a6cb46 100644 --- a/integration_tests/mqtt/create_source.sql +++ b/integration_tests/mqtt/create_source.sql @@ -12,33 +12,3 @@ WITH ( topic= 'test', qos = 'at_least_once', ) FORMAT PLAIN ENCODE JSON; - - -CREATE SINK mqtt_sink -FROM - personnel -WITH - ( - connector='mqtt', - url='tcp://mqtt-server', - topic= 'test', - type = 'append-only', - retain = 'false', - qos = 'at_least_once', - ) FORMAT PLAIN ENCODE JSON ( - force_append_only='true', - ); - -INSERT INTO - personnel -VALUES - (1, 'Alice'), - (2, 'Bob'), - (3, 'Tom'), - (4, 'Jerry'), - (5, 'Araminta'), - (6, 'Clover'), - (7, 'Posey'), - (8, 'Waverly'); - -FLUSH; diff --git a/integration_tests/mqtt/docker-compose.yml b/integration_tests/mqtt/docker-compose.yml index 87969f8ad9044..9db7e7c04f8fc 100644 --- a/integration_tests/mqtt/docker-compose.yml +++ b/integration_tests/mqtt/docker-compose.yml @@ -6,7 +6,11 @@ services: file: ../../docker/docker-compose.yml service: risingwave-standalone mqtt-server: - image: emqx/emqx:5.2.1 + image: eclipse-mosquitto + command: + - sh + - -c + - echo "running command"; printf 'allow_anonymous true\nlistener 1883 0.0.0.0' > /mosquitto/config/mosquitto.conf; echo "starting service..."; cat /mosquitto/config/mosquitto.conf;/docker-entrypoint.sh;/usr/sbin/mosquitto -c /mosquitto/config/mosquitto.conf ports: - 1883:1883 etcd-0: diff --git a/integration_tests/mqtt/sink_check.py b/integration_tests/mqtt/sink_check.py new file mode 100644 index 0000000000000..cb74a12e9fe29 --- /dev/null +++ b/integration_tests/mqtt/sink_check.py @@ -0,0 +1,14 @@ +import sys +import subprocess + + +output = subprocess.Popen(["docker", "compose", "exec", "mqtt-server", "mosquitto_sub", "-h", "localhost", "-t", "test", "-p", "1883", "-C", "1", "-W", "120"], + stdout=subprocess.PIPE) +rows = subprocess.check_output(["wc", "-l"], stdin=output.stdout) +output.stdout.close() +output.wait() +rows = int(rows.decode('utf8').strip()) +print(f"{rows} rows in 'test'") +if rows < 1: + print(f"Data check failed for case 'test'") + sys.exit(1) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 433905a186791..e18833a9d10cc 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -28,17 +28,16 @@ use risingwave_common::bail; use serde_derive::Deserialize; use serde_with::json::JsonString; use serde_with::{serde_as, DisplayFromStr}; -use strum_macros::{Display, EnumString}; use tempfile::NamedTempFile; use time::OffsetDateTime; use url::Url; use with_options::WithOptions; use crate::aws_utils::load_file_descriptor_from_s3; +use crate::deserialize_duration_from_string; use crate::error::ConnectorResult; use crate::sink::SinkError; use crate::source::nats::source::NatsOffset; -use crate::{deserialize_bool_from_string, deserialize_duration_from_string}; // The file describes the common abstractions for each connector and can be used in both source and // sink. @@ -686,161 +685,9 @@ impl NatsCommon { } } -#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)] -#[strum(serialize_all = "snake_case")] -#[allow(clippy::enum_variant_names)] -pub enum QualityOfService { - AtLeastOnce, - AtMostOnce, - ExactlyOnce, -} - -#[serde_as] -#[derive(Deserialize, Debug, Clone, WithOptions)] -pub struct MqttCommon { - /// The url of the broker to connect to. e.g. tcp://localhost. - /// Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, - /// to denote the protocol for establishing a connection with the broker. - /// `mqtts://`, `ssl://` will use the native certificates if no ca is specified - pub url: String, - - /// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# - pub topic: String, - - /// The quality of service to use when publishing messages. Defaults to at_most_once. - /// Could be at_most_once, at_least_once or exactly_once - #[serde_as(as = "Option")] - pub qos: Option, - - /// Username for the mqtt broker - #[serde(rename = "username")] - pub user: Option, - - /// Password for the mqtt broker - pub password: Option, - - /// Prefix for the mqtt client id. - /// The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave - pub client_prefix: Option, - - /// `clean_start = true` removes all the state from queues & instructs the broker - /// to clean all the client state when client disconnects. - /// - /// When set `false`, broker will hold the client state and performs pending - /// operations on the client when reconnection with same `client_id` - /// happens. Local queue state is also held to retransmit packets after reconnection. - #[serde(default, deserialize_with = "deserialize_bool_from_string")] - pub clean_start: bool, - - /// The maximum number of inflight messages. Defaults to 100 - #[serde_as(as = "Option")] - pub inflight_messages: Option, - - /// Path to CA certificate file for verifying the broker's key. - #[serde(rename = "tls.client_key")] - pub ca: Option, - /// Path to client's certificate file (PEM). Required for client authentication. - /// Can be a file path under fs:// or a string with the certificate content. - #[serde(rename = "tls.client_cert")] - pub client_cert: Option, - - /// Path to client's private key file (PEM). Required for client authentication. - /// Can be a file path under fs:// or a string with the private key content. - #[serde(rename = "tls.client_key")] - pub client_key: Option, -} - -impl MqttCommon { - pub(crate) fn build_client( - &self, - actor_id: u32, - id: u64, - ) -> ConnectorResult<(rumqttc::v5::AsyncClient, rumqttc::v5::EventLoop)> { - let client_id = format!( - "{}_{}_{}", - self.client_prefix.as_deref().unwrap_or("risingwave"), - actor_id, - id - ); - - let mut url = url::Url::parse(&self.url)?; - - let ssl = matches!(url.scheme(), "mqtts" | "ssl"); - - url.query_pairs_mut().append_pair("client_id", &client_id); - - tracing::debug!("connecting mqtt using url: {}", url.as_str()); - - let mut options = rumqttc::v5::MqttOptions::try_from(url)?; - options.set_keep_alive(std::time::Duration::from_secs(10)); - - options.set_clean_start(self.clean_start); - - if ssl { - let tls_config = self.get_tls_config()?; - options.set_transport(rumqttc::Transport::tls_with_config( - rumqttc::TlsConfiguration::Rustls(std::sync::Arc::new(tls_config)), - )); - } - - if let Some(user) = &self.user { - options.set_credentials(user, self.password.as_deref().unwrap_or_default()); - } - - Ok(rumqttc::v5::AsyncClient::new( - options, - self.inflight_messages.unwrap_or(100), - )) - } - - pub(crate) fn qos(&self) -> rumqttc::v5::mqttbytes::QoS { - self.qos - .as_ref() - .map(|qos| match qos { - QualityOfService::AtMostOnce => rumqttc::v5::mqttbytes::QoS::AtMostOnce, - QualityOfService::AtLeastOnce => rumqttc::v5::mqttbytes::QoS::AtLeastOnce, - QualityOfService::ExactlyOnce => rumqttc::v5::mqttbytes::QoS::ExactlyOnce, - }) - .unwrap_or(rumqttc::v5::mqttbytes::QoS::AtMostOnce) - } - - fn get_tls_config(&self) -> ConnectorResult { - let mut root_cert_store = tokio_rustls::rustls::RootCertStore::empty(); - if let Some(ca) = &self.ca { - let certificates = load_certs(ca)?; - for cert in certificates { - root_cert_store.add(&cert).unwrap(); - } - } else { - for cert in - rustls_native_certs::load_native_certs().expect("could not load platform certs") - { - root_cert_store - .add(&tokio_rustls::rustls::Certificate(cert.0)) - .unwrap(); - } - } - - let builder = tokio_rustls::rustls::ClientConfig::builder() - .with_safe_defaults() - .with_root_certificates(root_cert_store); - - let tls_config = if let (Some(client_cert), Some(client_key)) = - (self.client_cert.as_ref(), self.client_key.as_ref()) - { - let certs = load_certs(client_cert)?; - let key = load_private_key(client_key)?; - - builder.with_client_auth_cert(certs, key)? - } else { - builder.with_no_client_auth() - }; - - Ok(tls_config) - } -} - -fn load_certs(certificates: &str) -> ConnectorResult> { +pub(crate) fn load_certs( + certificates: &str, +) -> ConnectorResult> { let cert_bytes = if let Some(path) = certificates.strip_prefix("fs://") { std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())? } else { @@ -855,7 +702,9 @@ fn load_certs(certificates: &str) -> ConnectorResult ConnectorResult { +pub(crate) fn load_private_key( + certificate: &str, +) -> ConnectorResult { let cert_bytes = if let Some(path) = certificate.strip_prefix("fs://") { std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())? } else { diff --git a/src/connector/src/lib.rs b/src/connector/src/lib.rs index 821c91e166b1e..12bda11f65d5c 100644 --- a/src/connector/src/lib.rs +++ b/src/connector/src/lib.rs @@ -51,6 +51,7 @@ pub mod sink; pub mod source; pub mod common; +pub mod mqtt_common; pub use paste::paste; diff --git a/src/connector/src/mqtt_common.rs b/src/connector/src/mqtt_common.rs new file mode 100644 index 0000000000000..d66f534a2e16f --- /dev/null +++ b/src/connector/src/mqtt_common.rs @@ -0,0 +1,164 @@ +use rumqttc::v5::mqttbytes::QoS; +use rumqttc::v5::{AsyncClient, EventLoop, MqttOptions}; +use serde_derive::Deserialize; +use serde_with::{serde_as, DisplayFromStr}; +use strum_macros::{Display, EnumString}; +use with_options::WithOptions; + +use crate::common::{load_certs, load_private_key}; +use crate::deserialize_bool_from_string; +use crate::error::ConnectorResult; + +#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)] +#[strum(serialize_all = "snake_case")] +#[allow(clippy::enum_variant_names)] +pub enum QualityOfService { + AtLeastOnce, + AtMostOnce, + ExactlyOnce, +} + +#[serde_as] +#[derive(Deserialize, Debug, Clone, WithOptions)] +pub struct MqttCommon { + /// The url of the broker to connect to. e.g. tcp://localhost. + /// Must be prefixed with one of either `tcp://`, `mqtt://`, `ssl://`,`mqtts://`, + /// to denote the protocol for establishing a connection with the broker. + /// `mqtts://`, `ssl://` will use the native certificates if no ca is specified + pub url: String, + + /// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# + pub topic: String, + + /// The quality of service to use when publishing messages. Defaults to at_most_once. + /// Could be at_most_once, at_least_once or exactly_once + #[serde_as(as = "Option")] + pub qos: Option, + + /// Username for the mqtt broker + #[serde(rename = "username")] + pub user: Option, + + /// Password for the mqtt broker + pub password: Option, + + /// Prefix for the mqtt client id. + /// The client id will be generated as `client_prefix`_`actor_id`_`(executor_id|source_id)`. Defaults to risingwave + pub client_prefix: Option, + + /// `clean_start = true` removes all the state from queues & instructs the broker + /// to clean all the client state when client disconnects. + /// + /// When set `false`, broker will hold the client state and performs pending + /// operations on the client when reconnection with same `client_id` + /// happens. Local queue state is also held to retransmit packets after reconnection. + #[serde(default, deserialize_with = "deserialize_bool_from_string")] + pub clean_start: bool, + + /// The maximum number of inflight messages. Defaults to 100 + #[serde_as(as = "Option")] + pub inflight_messages: Option, + + /// Path to CA certificate file for verifying the broker's key. + #[serde(rename = "tls.client_key")] + pub ca: Option, + /// Path to client's certificate file (PEM). Required for client authentication. + /// Can be a file path under fs:// or a string with the certificate content. + #[serde(rename = "tls.client_cert")] + pub client_cert: Option, + + /// Path to client's private key file (PEM). Required for client authentication. + /// Can be a file path under fs:// or a string with the private key content. + #[serde(rename = "tls.client_key")] + pub client_key: Option, +} + +impl MqttCommon { + pub(crate) fn build_client( + &self, + actor_id: u32, + id: u64, + ) -> ConnectorResult<(AsyncClient, EventLoop)> { + let client_id = format!( + "{}_{}_{}", + self.client_prefix.as_deref().unwrap_or("risingwave"), + actor_id, + id + ); + + let mut url = url::Url::parse(&self.url)?; + + let ssl = matches!(url.scheme(), "mqtts" | "ssl"); + + url.query_pairs_mut().append_pair("client_id", &client_id); + + tracing::debug!("connecting mqtt using url: {}", url.as_str()); + + let mut options = MqttOptions::try_from(url)?; + options.set_keep_alive(std::time::Duration::from_secs(10)); + + options.set_clean_start(self.clean_start); + + if ssl { + let tls_config = self.get_tls_config()?; + options.set_transport(rumqttc::Transport::tls_with_config( + rumqttc::TlsConfiguration::Rustls(std::sync::Arc::new(tls_config)), + )); + } + + if let Some(user) = &self.user { + options.set_credentials(user, self.password.as_deref().unwrap_or_default()); + } + + Ok(rumqttc::v5::AsyncClient::new( + options, + self.inflight_messages.unwrap_or(100), + )) + } + + pub(crate) fn qos(&self) -> QoS { + self.qos + .as_ref() + .map(|qos| match qos { + QualityOfService::AtMostOnce => QoS::AtMostOnce, + QualityOfService::AtLeastOnce => QoS::AtLeastOnce, + QualityOfService::ExactlyOnce => QoS::ExactlyOnce, + }) + .unwrap_or(QoS::AtMostOnce) + } + + fn get_tls_config(&self) -> ConnectorResult { + let mut root_cert_store = tokio_rustls::rustls::RootCertStore::empty(); + if let Some(ca) = &self.ca { + let certificates = load_certs(ca)?; + for cert in certificates { + root_cert_store.add(&cert).unwrap(); + } + } else { + for cert in + rustls_native_certs::load_native_certs().expect("could not load platform certs") + { + root_cert_store + .add(&tokio_rustls::rustls::Certificate(cert.0)) + .unwrap(); + } + } + + let builder = tokio_rustls::rustls::ClientConfig::builder() + .with_safe_defaults() + .with_root_certificates(root_cert_store); + + let tls_config = if let (Some(client_cert), Some(client_key)) = + (self.client_cert.as_ref(), self.client_key.as_ref()) + { + let certs = load_certs(client_cert)?; + let key = load_private_key(client_key)?; + + builder.with_client_auth_cert(certs, key)? + } else { + builder.with_no_client_auth() + }; + + Ok(tls_config) + } +} diff --git a/src/connector/src/sink/mqtt.rs b/src/connector/src/sink/mqtt.rs index 54c483c7076a0..1aebdf4f70062 100644 --- a/src/connector/src/sink/mqtt.rs +++ b/src/connector/src/sink/mqtt.rs @@ -30,7 +30,7 @@ use super::catalog::SinkFormatDesc; use super::formatter::SinkFormatterImpl; use super::writer::FormattedSink; use super::{DummySinkCommitCoordinator, SinkWriterParam}; -use crate::common::MqttCommon; +use crate::mqtt_common::MqttCommon; use crate::sink::catalog::desc::SinkDesc; use crate::sink::log_store::DeliveryFutureManagerAddFuture; use crate::sink::writer::{ @@ -123,7 +123,7 @@ impl Sink for MqttSink { async fn validate(&self) -> Result<()> { if !self.is_append_only { return Err(SinkError::Mqtt(anyhow!( - "Nats sink only support append-only mode" + "Mqtt sink only support append-only mode" ))); } diff --git a/src/connector/src/source/mqtt/mod.rs b/src/connector/src/source/mqtt/mod.rs index 033e8585f57f8..aec17f0454f18 100644 --- a/src/connector/src/source/mqtt/mod.rs +++ b/src/connector/src/source/mqtt/mod.rs @@ -22,7 +22,7 @@ use serde_derive::Deserialize; use serde_with::{serde_as, DisplayFromStr}; use with_options::WithOptions; -use crate::common::{MqttCommon, QualityOfService}; +use crate::mqtt_common::{MqttCommon, QualityOfService}; use crate::source::mqtt::enumerator::MqttSplitEnumerator; use crate::source::mqtt::source::{MqttSplit, MqttSplitReader}; use crate::source::SourceProperties; diff --git a/src/connector/src/with_options.rs b/src/connector/src/with_options.rs index 1bdda0b484ce4..a5c810834727a 100644 --- a/src/connector/src/with_options.rs +++ b/src/connector/src/with_options.rs @@ -52,7 +52,7 @@ impl WithOptions for i32 {} impl WithOptions for i64 {} impl WithOptions for f64 {} impl WithOptions for std::time::Duration {} -impl WithOptions for crate::common::QualityOfService {} +impl WithOptions for crate::mqtt_common::QualityOfService {} impl WithOptions for crate::sink::kafka::CompressionCodec {} impl WithOptions for nexmark::config::RateShape {} impl WithOptions for nexmark::event::EventType {} diff --git a/src/frontend/src/handler/create_sink.rs b/src/frontend/src/handler/create_sink.rs index 33fd407f4e987..a8a00181d745a 100644 --- a/src/frontend/src/handler/create_sink.rs +++ b/src/frontend/src/handler/create_sink.rs @@ -748,9 +748,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock vec![Encode::Json], ), MqttSink::SINK_NAME => hashmap!( - Format::Plain => vec![Encode::Json,Encode::Bytes], - Format::Upsert => vec![Encode::Json,Encode::Bytes], - Format::Debezium => vec![Encode::Json,Encode::Bytes], + Format::Plain => vec![Encode::Json], + Format::Upsert => vec![Encode::Json], + Format::Debezium => vec![Encode::Json], ), PulsarSink::SINK_NAME => hashmap!( Format::Plain => vec![Encode::Json],