Skip to content

Commit

Permalink
feat: Address PR comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos committed Mar 5, 2024
1 parent fb3daee commit 35836a1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 23 deletions.
22 changes: 7 additions & 15 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -698,40 +698,35 @@ pub enum QualityOfService {
#[derive(Debug, Clone, PartialEq, Display, Deserialize, EnumString)]
#[strum(serialize_all = "snake_case")]
pub enum Protocol {
Tls,
Tcp,
Ssl,
}

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct MqttCommon {
/// Protocol used for RisingWave to communicate with the mqtt brokers. Could be tcp or ssl
/// Protocol used for RisingWave to communicate with the mqtt brokers. Could be `tcp` or `ssl`, defaults to `tcp`
#[serde_as(as = "Option<DisplayFromStr>")]
#[serde(rename = "protocol")]
pub protocol: Option<Protocol>,

/// Hostname of the mqtt broker
#[serde(rename = "host")]
pub host: String,

/// Port of the mqtt broker, defaults to 1883 for tcp and 8883 for ssl
#[serde(rename = "port")]
pub port: Option<i32>,

/// The topic name to subscribe or publish to. When subscribing, it can be a wildcard topic. e.g /topic/#
#[serde(rename = "topic")]
pub topic: String,

/// Username for the mqtt broker
#[serde(rename = "username")]
pub user: Option<String>,

/// Password for the mqtt broker
#[serde(rename = "password")]
pub password: Option<String>,
#[serde(rename = "client_prefix")]

/// Prefix for the mqtt client id
/// Prefix for the mqtt client id.
/// The client id will be generated as `client_prefix`_`id`_`timestamp`. Defaults to risingwave
pub client_prefix: Option<String>,

/// `clean_start = true` removes all the state from queues & instructs the broker
Expand All @@ -740,27 +735,24 @@ pub struct MqttCommon {
/// When set `false`, broker will hold the client state and performs pending
/// operations on the client when reconnection with same `client_id`
/// happens. Local queue state is also held to retransmit packets after reconnection.
#[serde(rename = "clean_start")]
#[serde(default, deserialize_with = "deserialize_bool_from_string")]
pub clean_start: bool,

/// The maximum number of inflight messages. Defaults to 100
#[serde(rename = "inflight_messages")]
#[serde_as(as = "Option<DisplayFromStr>")]
pub inflight_messages: Option<usize>,
#[serde(rename = "tls.ca")]

/// Path to CA certificate file for verifying the broker's key.
#[serde(rename = "tls.client_key")]
pub ca: Option<String>,
#[serde(rename = "tls.client_cert")]

/// Path to client's certificate file (PEM). Required for client authentication.
/// Can be a file path under fs:// or a string with the certificate content.
#[serde(rename = "tls.client_cert")]
pub client_cert: Option<String>,
#[serde(rename = "tls.client_key")]

/// Path to client's private key file (PEM). Required for client authentication.
/// Can be a file path under fs:// or a string with the private key content.
#[serde(rename = "tls.client_key")]
pub client_key: Option<String>,
}

Expand Down
9 changes: 5 additions & 4 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ MqttConfig:
fields:
- name: protocol
field_type: Protocol
comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be tcp or ssl
comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be `tcp` or `ssl`, defaults to `tcp`
required: false
- name: host
field_type: String
Expand All @@ -374,17 +374,18 @@ MqttConfig:
required: false
- name: client_prefix
field_type: String
comments: Prefix for the mqtt client id
comments: Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`id`_`timestamp`. Defaults to risingwave
required: false
- name: clean_start
field_type: bool
comments: '`clean_start = true` removes all the state from queues & instructs the broker to clean all the client state when client disconnects. When set `false`, broker will hold the client state and performs pending operations on the client when reconnection with same `client_id` happens. Local queue state is also held to retransmit packets after reconnection.'
required: true
required: false
default: Default::default
- name: inflight_messages
field_type: usize
comments: The maximum number of inflight messages. Defaults to 100
required: false
- name: tls.ca
- name: tls.client_key
field_type: String
comments: Path to CA certificate file for verifying the broker's key.
required: false
Expand Down
9 changes: 5 additions & 4 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ MqttProperties:
fields:
- name: protocol
field_type: Protocol
comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be tcp or ssl
comments: Protocol used for RisingWave to communicate with the mqtt brokers. Could be `tcp` or `ssl`, defaults to `tcp`
required: false
- name: host
field_type: String
Expand All @@ -271,17 +271,18 @@ MqttProperties:
required: false
- name: client_prefix
field_type: String
comments: Prefix for the mqtt client id
comments: Prefix for the mqtt client id. The client id will be generated as `client_prefix`_`id`_`timestamp`. Defaults to risingwave
required: false
- name: clean_start
field_type: bool
comments: '`clean_start = true` removes all the state from queues & instructs the broker to clean all the client state when client disconnects. When set `false`, broker will hold the client state and performs pending operations on the client when reconnection with same `client_id` happens. Local queue state is also held to retransmit packets after reconnection.'
required: true
required: false
default: Default::default
- name: inflight_messages
field_type: usize
comments: The maximum number of inflight messages. Defaults to 100
required: false
- name: tls.ca
- name: tls.client_key
field_type: String
comments: Path to CA certificate file for verifying the broker's key.
required: false
Expand Down

0 comments on commit 35836a1

Please sign in to comment.