Skip to content

Commit

Permalink
feat: support more SSL related configurations in Kafka connector (#18361
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lmatz authored Sep 5, 2024
1 parent f2f5827 commit cb29fe0
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/connector/src/connector_common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,26 @@ pub struct KafkaCommon {
#[serde(rename = "properties.ssl.ca.location")]
ssl_ca_location: Option<String>,

/// CA certificate string (PEM format) for verifying the broker's key.
#[serde(rename = "properties.ssl.ca.pem")]
ssl_ca_pem: Option<String>,

/// Path to client's certificate file (PEM).
#[serde(rename = "properties.ssl.certificate.location")]
ssl_certificate_location: Option<String>,

/// Client's public key string (PEM format) used for authentication.
#[serde(rename = "properties.ssl.certificate.pem")]
ssl_certificate_pem: Option<String>,

/// Path to client's private key file (PEM).
#[serde(rename = "properties.ssl.key.location")]
ssl_key_location: Option<String>,

/// Client's private key string (PEM format) used for authentication.
#[serde(rename = "properties.ssl.key.pem")]
ssl_key_pem: Option<String>,

/// Passphrase of client's private key.
#[serde(rename = "properties.ssl.key.password")]
ssl_key_password: Option<String>,
Expand Down Expand Up @@ -325,12 +337,21 @@ impl KafkaCommon {
if let Some(ssl_ca_location) = self.ssl_ca_location.as_ref() {
config.set("ssl.ca.location", ssl_ca_location);
}
if let Some(ssl_ca_pem) = self.ssl_ca_pem.as_ref() {
config.set("ssl.ca.pem", ssl_ca_pem);
}
if let Some(ssl_certificate_location) = self.ssl_certificate_location.as_ref() {
config.set("ssl.certificate.location", ssl_certificate_location);
}
if let Some(ssl_certificate_pem) = self.ssl_certificate_pem.as_ref() {
config.set("ssl.certificate.pem", ssl_certificate_pem);
}
if let Some(ssl_key_location) = self.ssl_key_location.as_ref() {
config.set("ssl.key.location", ssl_key_location);
}
if let Some(ssl_key_pem) = self.ssl_key_pem.as_ref() {
config.set("ssl.key.pem", ssl_key_pem);
}
if let Some(ssl_key_password) = self.ssl_key_password.as_ref() {
config.set("ssl.key.password", ssl_key_password);
}
Expand Down
12 changes: 12 additions & 0 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -373,14 +373,26 @@ KafkaConfig:
field_type: String
comments: Path to CA certificate file for verifying the broker's key.
required: false
- name: properties.ssl.ca.pem
field_type: String
comments: CA certificate string (PEM format) for verifying the broker's key.
required: false
- name: properties.ssl.certificate.location
field_type: String
comments: Path to client's certificate file (PEM).
required: false
- name: properties.ssl.certificate.pem
field_type: String
comments: Client's public key string (PEM format) used for authentication.
required: false
- name: properties.ssl.key.location
field_type: String
comments: Path to client's private key file (PEM).
required: false
- name: properties.ssl.key.pem
field_type: String
comments: Client's private key string (PEM format) used for authentication.
required: false
- name: properties.ssl.key.password
field_type: String
comments: Passphrase of client's private key.
Expand Down
12 changes: 12 additions & 0 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,26 @@ KafkaProperties:
field_type: String
comments: Path to CA certificate file for verifying the broker's key.
required: false
- name: properties.ssl.ca.pem
field_type: String
comments: CA certificate string (PEM format) for verifying the broker's key.
required: false
- name: properties.ssl.certificate.location
field_type: String
comments: Path to client's certificate file (PEM).
required: false
- name: properties.ssl.certificate.pem
field_type: String
comments: Client's public key string (PEM format) used for authentication.
required: false
- name: properties.ssl.key.location
field_type: String
comments: Path to client's private key file (PEM).
required: false
- name: properties.ssl.key.pem
field_type: String
comments: Client's private key string (PEM format) used for authentication.
required: false
- name: properties.ssl.key.password
field_type: String
comments: Passphrase of client's private key.
Expand Down

0 comments on commit cb29fe0

Please sign in to comment.