From 35836a1fb75576c95a415f16c0ddda07c17733d2 Mon Sep 17 00:00:00 2001 From: Gio Gutierrez Date: Tue, 5 Mar 2024 12:25:20 -0500 Subject: [PATCH] feat: Address PR comments --- src/connector/src/common.rs | 22 +++++++--------------- src/connector/with_options_sink.yaml | 9 +++++---- src/connector/with_options_source.yaml | 9 +++++---- 3 files changed, 17 insertions(+), 23 deletions(-) diff --git a/src/connector/src/common.rs b/src/connector/src/common.rs index 6944652fb1e8f..b64d9e281eaff 100644 --- a/src/connector/src/common.rs +++ b/src/connector/src/common.rs @@ -698,28 +698,24 @@ pub enum QualityOfService { #[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)] #[strum(serialize_all = "snake_case")] pub enum Protocol { - Tls, + Tcp, Ssl, } #[serde_as] #[derive(Deserialize, Debug, Clone, WithOptions)] pub struct MqttCommon { - /// Protocol used for RisingWave to communicate with the mqtt brokers. Could be tcp or ssl + /// Protocol used for RisingWave to communicate with the mqtt brokers. Could be `tcp` or `ssl`, defaults to `tcp` #[serde_as(as = "Option")] - #[serde(rename = "protocol")] pub protocol: Option, /// Hostname of the mqtt broker - #[serde(rename = "host")] pub host: String, /// Port of the mqtt broker, defaults to 1883 for tcp and 8883 for ssl - #[serde(rename = "port")] pub port: Option, /// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/# - #[serde(rename = "topic")] pub topic: String, /// Username for the mqtt broker @@ -727,11 +723,10 @@ pub struct MqttCommon { pub user: Option, /// Password for the mqtt broker - #[serde(rename = "password")] pub password: Option, - #[serde(rename = "client_prefix")] - /// Prefix for the mqtt client id + /// Prefix for the mqtt client id. + /// The client id will be generated as `client_prefix`_`id`_`timestamp`. Defaults to risingwave pub client_prefix: Option, /// `clean_start = true` removes all the state from queues & instructs the broker @@ -740,27 +735,24 @@ pub struct MqttCommon { /// When set `false`, broker will hold the client state and performs pending /// operations on the client when reconnection with same `client_id` /// happens. Local queue state is also held to retransmit packets after reconnection. - #[serde(rename = "clean_start")] #[serde(default, deserialize_with = "deserialize_bool_from_string")] pub clean_start: bool, /// The maximum number of inflight messages. Defaults to 100 - #[serde(rename = "inflight_messages")] #[serde_as(as = "Option")] pub inflight_messages: Option, - #[serde(rename = "tls.ca")] /// Path to CA certificate file for verifying the broker's key. + #[serde(rename = "tls.client_key")] pub ca: Option, - #[serde(rename = "tls.client_cert")] - /// Path to client's certificate file (PEM). Required for client authentication. /// Can be a file path under fs:// or a string with the certificate content. + #[serde(rename = "tls.client_cert")] pub client_cert: Option, - #[serde(rename = "tls.client_key")] /// Path to client's private key file (PEM). Required for client authentication. /// Can be a file path under fs:// or a string with the private key content. + #[serde(rename = "tls.client_key")] pub client_key: Option, } diff --git a/src/connector/with_options_sink.yaml b/src/connector/with_options_sink.yaml index 36ece730c0156..70f1240297cd6 100644 --- a/src/connector/with_options_sink.yaml +++ b/src/connector/with_options_sink.yaml @@ -350,7 +350,7 @@ MqttConfig: fields: - name: protocol field_type: Protocol - comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be tcp or ssl + comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be `tcp` or `ssl`, defaults to `tcp` required: false - name: host field_type: String @@ -374,17 +374,18 @@ MqttConfig: required: false - name: client_prefix field_type: String - comments: Prefix for the mqtt client id + comments: Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`id`_`timestamp`. Defaults to risingwave required: false - name: clean_start field_type: bool comments: '`clean_start = true` removes all the state from queues & instructs the broker to clean all the client state when client disconnects. When set `false`, broker will hold the client state and performs pending operations on the client when reconnection with same `client_id` happens. Local queue state is also held to retransmit packets after reconnection.' - required: true + required: false + default: Default::default - name: inflight_messages field_type: usize comments: The maximum number of inflight messages. Defaults to 100 required: false - - name: tls.ca + - name: tls.client_key field_type: String comments: Path to CA certificate file for verifying the broker's key. required: false diff --git a/src/connector/with_options_source.yaml b/src/connector/with_options_source.yaml index 9f906d1c5e2c2..cb929d32ddce4 100644 --- a/src/connector/with_options_source.yaml +++ b/src/connector/with_options_source.yaml @@ -247,7 +247,7 @@ MqttProperties: fields: - name: protocol field_type: Protocol - comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be tcp or ssl + comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be `tcp` or `ssl`, defaults to `tcp` required: false - name: host field_type: String @@ -271,17 +271,18 @@ MqttProperties: required: false - name: client_prefix field_type: String - comments: Prefix for the mqtt client id + comments: Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`id`_`timestamp`. Defaults to risingwave required: false - name: clean_start field_type: bool comments: '`clean_start = true` removes all the state from queues & instructs the broker to clean all the client state when client disconnects. When set `false`, broker will hold the client state and performs pending operations on the client when reconnection with same `client_id` happens. Local queue state is also held to retransmit packets after reconnection.' - required: true + required: false + default: Default::default - name: inflight_messages field_type: usize comments: The maximum number of inflight messages. Defaults to 100 required: false - - name: tls.ca + - name: tls.client_key field_type: String comments: Path to CA certificate file for verifying the broker's key. required: false