diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 33808447caad..a84c8722ab59 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -47,6 +47,7 @@ public class DbzConnectorConfig { /* MySQL configs */ public static final String MYSQL_SERVER_ID = "server.id"; + public static final String MYSQL_SSL_MODE = "ssl.mode"; /* Postgres configs */ public static final String PG_SLOT_NAME = "slot.name"; @@ -231,8 +232,8 @@ public DbzConnectorConfig( ConfigurableOffsetBackingStore.OFFSET_STATE_VALUE, startOffset); } - var mongodbUrl = userProps.get("mongodb.url"); - var collection = userProps.get("collection.name"); + var mongodbUrl = userProps.get(MongoDb.MONGO_URL); + var collection = userProps.get(MongoDb.MONGO_COLLECTION_NAME); var connectionStr = new ConnectionString(mongodbUrl); var connectorName = String.format( diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index c92b4dad540c..a130c7107499 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -20,10 +20,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; +import java.util.*; public class MySqlValidator extends DatabaseValidator implements AutoCloseable { private final Map userProps; @@ -51,9 +48,14 @@ public MySqlValidator( var dbName = userProps.get(DbzConnectorConfig.DB_NAME); var jdbcUrl = ValidatorUtils.getJdbcUrl(SourceTypeE.MYSQL, dbHost, dbPort, dbName); - var user = userProps.get(DbzConnectorConfig.USER); - var password = userProps.get(DbzConnectorConfig.PASSWORD); - this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); + var properties = new Properties(); + properties.setProperty("user", userProps.get(DbzConnectorConfig.USER)); + properties.setProperty("password", userProps.get(DbzConnectorConfig.PASSWORD)); + properties.setProperty( + "sslMode", userProps.getOrDefault(DbzConnectorConfig.MYSQL_SSL_MODE, "DISABLED")); + properties.setProperty("allowPublicKeyRetrieval", "true"); + + this.jdbcConnection = DriverManager.getConnection(jdbcUrl, properties); this.isCdcSourceJob = isCdcSourceJob; this.isBackfillTable = isBackfillTable; } diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties index bcf99d0c2958..f77dd3c1ea4f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/mysql.properties @@ -15,6 +15,8 @@ schema.history.internal.store.only.captured.databases.ddl=true # default to disable schema change events include.schema.changes=${debezium.include.schema.changes:-false} database.server.id=${server.id} +# default to use unencrypted connection +database.ssl.mode=${ssl.mode:-disabled} # default heartbeat interval 60 seconds heartbeat.interval.ms=${debezium.heartbeat.interval.ms:-60000} # In sharing cdc mode, we will subscribe to multiple tables in the given database, diff --git a/java/connector-node/risingwave-sink-iceberg/pom.xml b/java/connector-node/risingwave-sink-iceberg/pom.xml index 9f733d830a47..a491823bb07f 100644 --- a/java/connector-node/risingwave-sink-iceberg/pom.xml +++ b/java/connector-node/risingwave-sink-iceberg/pom.xml @@ -113,8 +113,8 @@ postgresql - mysql - mysql-connector-java + com.mysql + mysql-connector-j org.xerial diff --git a/java/connector-node/risingwave-sink-jdbc/pom.xml b/java/connector-node/risingwave-sink-jdbc/pom.xml index 0390d57a6687..52947595c741 100644 --- a/java/connector-node/risingwave-sink-jdbc/pom.xml +++ b/java/connector-node/risingwave-sink-jdbc/pom.xml @@ -38,8 +38,8 @@ postgresql - mysql - mysql-connector-java + com.mysql + mysql-connector-j diff --git a/java/connector-node/risingwave-source-cdc/pom.xml b/java/connector-node/risingwave-source-cdc/pom.xml index fd1dccd3f092..5ee531ef805e 100644 --- a/java/connector-node/risingwave-source-cdc/pom.xml +++ b/java/connector-node/risingwave-source-cdc/pom.xml @@ -50,8 +50,8 @@ - mysql - mysql-connector-java + com.mysql + mysql-connector-j com.zendesk diff --git a/java/pom.xml b/java/pom.xml index 89df5b870077..922c62ead69e 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -85,7 +85,7 @@ 1.18.0 1.17.6 42.5.5 - 8.0.28 + 8.0.33 4.11.1 3.45.0.0 2.21.42 @@ -178,8 +178,8 @@ ${postgresql.version} - mysql - mysql-connector-java + com.mysql + mysql-connector-j ${mysql.connector.java.version} @@ -360,11 +360,6 @@ apache-client ${aws.version} - - org.apache.hadoop - hadoop-common - ${hadoop.version} - org.apache.hive hive-metastore diff --git a/src/connector/src/source/cdc/external/mod.rs b/src/connector/src/source/cdc/external/mod.rs index ed197975b766..9c9e2ee557e0 100644 --- a/src/connector/src/source/cdc/external/mod.rs +++ b/src/connector/src/source/cdc/external/mod.rs @@ -229,7 +229,6 @@ pub enum ExternalTableReaderImpl { #[derive(Debug)] pub struct MySqlExternalTableReader { - config: ExternalTableConfig, rw_schema: Schema, field_names: String, // use mutex to provide shared mutable access to the connection @@ -250,7 +249,7 @@ pub struct ExternalTableConfig { #[serde(rename = "table.name")] pub table: String, /// `ssl.mode` specifies the SSL/TLS encryption level for secure communication with Postgres. - /// Choices include `disable`, `prefer`, and `require`. + /// Choices include `disabled`, `preferred`, and `required`. /// This field is optional. #[serde(rename = "ssl.mode", default = "Default::default")] pub sslmode: SslMode, @@ -259,24 +258,24 @@ pub struct ExternalTableConfig { #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "lowercase")] pub enum SslMode { - Disable, - Prefer, - Require, + Disabled, + Preferred, + Required, } impl Default for SslMode { fn default() -> Self { - // default to `disable` for backward compatibility - Self::Disable + // default to `disabled` for backward compatibility + Self::Disabled } } impl fmt::Display for SslMode { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { - SslMode::Disable => "disable", - SslMode::Prefer => "prefer", - SslMode::Require => "require", + SslMode::Disabled => "disabled", + SslMode::Preferred => "preferred", + SslMode::Required => "required", }) } } @@ -320,19 +319,29 @@ impl MySqlExternalTableReader { with_properties: HashMap, rw_schema: Schema, ) -> ConnectorResult { - tracing::debug!(?rw_schema, "create mysql external table reader"); - let config = serde_json::from_value::( serde_json::to_value(with_properties).unwrap(), ) .context("failed to extract mysql connector properties")?; - let database_url = format!( - "mysql://{}:{}@{}:{}/{}", - config.username, config.password, config.host, config.port, config.database - ); - let opts = mysql_async::Opts::from_url(&database_url).map_err(mysql_async::Error::Url)?; - let conn = mysql_async::Conn::new(opts).await?; + let mut opts_builder = mysql_async::OptsBuilder::default() + .user(Some(config.username)) + .pass(Some(config.password)) + .ip_or_hostname(config.host) + .tcp_port(config.port.parse::().unwrap()) + .db_name(Some(config.database)); + + opts_builder = match config.sslmode { + SslMode::Disabled | SslMode::Preferred => opts_builder.ssl_opts(None), + SslMode::Required => { + let ssl_without_verify = mysql_async::SslOpts::default() + .with_danger_accept_invalid_certs(true) + .with_danger_skip_domain_validation(true); + opts_builder.ssl_opts(Some(ssl_without_verify)) + } + }; + + let conn = mysql_async::Conn::new(mysql_async::Opts::from(opts_builder)).await?; let field_names = rw_schema .fields @@ -342,7 +351,6 @@ impl MySqlExternalTableReader { .join(","); Ok(Self { - config, rw_schema, field_names, conn: tokio::sync::Mutex::new(conn), diff --git a/src/connector/src/source/cdc/external/postgres.rs b/src/connector/src/source/cdc/external/postgres.rs index 9df4b32e3f5e..80cb322fc56c 100644 --- a/src/connector/src/source/cdc/external/postgres.rs +++ b/src/connector/src/source/cdc/external/postgres.rs @@ -132,31 +132,36 @@ impl PostgresExternalTableReader { ) .context("failed to extract postgres connector properties")?; - let database_url = format!( - "postgresql://{}:{}@{}:{}/{}?sslmode={}", - config.username, - config.password, - config.host, - config.port, - config.database, - config.sslmode - ); + let mut pg_config = tokio_postgres::Config::new(); + pg_config + .user(&config.username) + .password(&config.password) + .host(&config.host) + .port(config.port.parse::().unwrap()) + .dbname(&config.database); #[cfg(not(madsim))] let connector = match config.sslmode { - SslMode::Disable => MaybeMakeTlsConnector::NoTls(NoTls), - SslMode::Prefer => match SslConnector::builder(SslMethod::tls()) { - Ok(mut builder) => { - // disable certificate verification for `prefer` - builder.set_verify(SslVerifyMode::NONE); - MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build())) - } - Err(e) => { - tracing::warn!(error = %e.as_report(), "SSL connector error"); - MaybeMakeTlsConnector::NoTls(NoTls) + SslMode::Disabled => { + pg_config.ssl_mode(tokio_postgres::config::SslMode::Disable); + MaybeMakeTlsConnector::NoTls(NoTls) + } + SslMode::Preferred => { + pg_config.ssl_mode(tokio_postgres::config::SslMode::Prefer); + match SslConnector::builder(SslMethod::tls()) { + Ok(mut builder) => { + // disable certificate verification for `prefer` + builder.set_verify(SslVerifyMode::NONE); + MaybeMakeTlsConnector::Tls(MakeTlsConnector::new(builder.build())) + } + Err(e) => { + tracing::warn!(error = %e.as_report(), "SSL connector error"); + MaybeMakeTlsConnector::NoTls(NoTls) + } } - }, - SslMode::Require => { + } + SslMode::Required => { + pg_config.ssl_mode(tokio_postgres::config::SslMode::Require); let mut builder = SslConnector::builder(SslMethod::tls())?; // disable certificate verification for `require` builder.set_verify(SslVerifyMode::NONE); @@ -166,7 +171,7 @@ impl PostgresExternalTableReader { #[cfg(madsim)] let connector = NoTls; - let (client, connection) = tokio_postgres::connect(&database_url, connector).await?; + let (client, connection) = pg_config.connect(connector).await?; tokio::spawn(async move { if let Err(e) = connection.await {