Skip to content

Commit

Permalink
feat: add support for SSL mode in postgres connector
Browse files Browse the repository at this point in the history
  • Loading branch information
jetjinser committed Mar 19, 2024
1 parent eb60978 commit 425a119
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 9 deletions.
25 changes: 20 additions & 5 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,10 @@ mysql_common = { version = "0.31", default-features = false, features = [
nexmark = { version = "0.2", features = ["serde"] }
num-bigint = "0.4"
opendal = "0.44"
openssl = "0.10"
parking_lot = "0.12"
paste = "1"
postgres-openssl = "0.5.0"
prometheus = { version = "0.13", features = ["process"] }
prost = { version = "0.12", features = ["no-recursion-limit"] }
prost-reflect = "0.13"
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ def_anyhow_newtype! {
tokio_rustls::rustls::Error => "TLS error",
rumqttc::v5::ClientError => "MQTT error",
rumqttc::v5::OptionError => "MQTT error",

openssl::error::ErrorStack => "OpenSSL error",
}

pub type ConnectorResult<T, E = ConnectorError> = std::result::Result<T, E>;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ pub struct ExternalTableConfig {
pub schema: String,
#[serde(rename = "table.name")]
pub table: String,
#[serde(rename = "ssl.name", default = "Default::default")]
#[serde(rename = "ssl.mode", default = "Default::default")]
pub sslmode: SslMode,
}

Expand Down
10 changes: 7 additions & 3 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use openssl::ssl::{SslConnector, SslMethod};
use postgres_openssl::MakeTlsConnector;
use risingwave_common::catalog::Schema;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::DatumRef;
use serde_derive::{Deserialize, Serialize};
use thiserror_ext::AsReport;
use tokio_postgres::types::PgLsn;
use tokio_postgres::NoTls;

use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::postgres_row_to_owned_row;
Expand Down Expand Up @@ -135,10 +136,13 @@ impl PostgresExternalTableReader {
config.host,
config.port,
config.database,
dbg!(&config.sslmode)
config.sslmode
);

let (client, connection) = tokio_postgres::connect(&database_url, NoTls).await?;
let builder = SslConnector::builder(SslMethod::tls())?;
let connector = MakeTlsConnector::new(builder.build());

let (client, connection) = tokio_postgres::connect(&database_url, connector).await?;

tokio::spawn(async move {
if let Err(e) = connection.await {
Expand Down

0 comments on commit 425a119

Please sign in to comment.