From 487f5e467e036143fbe15ce1b858868eafaeb9ad Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 4 Mar 2024 10:03:20 +0800 Subject: [PATCH 1/9] fix mysql shared source privileges --- .../source/SourceValidateHandler.java | 6 +++--- .../source/common/DatabaseValidator.java | 6 ++++-- .../source/common/MySqlValidator.java | 19 +++++++++++++++++-- .../source/common/PostgresValidator.java | 5 +++++ 4 files changed, 29 insertions(+), 7 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index d6afb17fb0ccf..de3a12e5e25d3 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -97,7 +97,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE); try (var validator = new PostgresValidator(props, tableSchema, isMultiTableShared)) { - validator.validateAll(isMultiTableShared); + validator.validateAll(); } break; @@ -133,8 +133,8 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re case MYSQL: ensureRequiredProps(props, isMultiTableShared); ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID); - try (var validator = new MySqlValidator(props, tableSchema)) { - validator.validateAll(isMultiTableShared); + try (var validator = new MySqlValidator(props, tableSchema, isMultiTableShared)) { + validator.validateAll(); } break; case MONGODB: diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java index 85010147bac59..4e56662b1b99b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java @@ -16,12 +16,12 @@ public abstract class DatabaseValidator { - public void validateAll(boolean isMultiTableShared) { + public void validateAll() { validateDbConfig(); validateUserPrivilege(); // If the source connector is shared by multiple tables, it will capture events from // multiple tables, skip validate its schema - if (!isMultiTableShared) { + if (!isMultiTableShared()) { validateTable(); } } @@ -34,4 +34,6 @@ public void validateAll(boolean isMultiTableShared) { /** Validate the properties of the source table */ abstract void validateTable(); + + abstract boolean isMultiTableShared(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index c5a58ede7c773..b01dd76b001bf 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -32,7 +32,10 @@ public class MySqlValidator extends DatabaseValidator implements AutoCloseable { private final Connection jdbcConnection; - public MySqlValidator(Map userProps, TableSchema tableSchema) + private final boolean isMultiTableShared; + + public MySqlValidator( + Map userProps, TableSchema tableSchema, boolean isMultiTableShared) throws SQLException { this.userProps = userProps; this.tableSchema = tableSchema; @@ -45,6 +48,7 @@ public MySqlValidator(Map userProps, TableSchema tableSchema) var user = userProps.get(DbzConnectorConfig.USER); var password = userProps.get(DbzConnectorConfig.PASSWORD); this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); + this.isMultiTableShared = isMultiTableShared; } @Override @@ -110,6 +114,11 @@ public void validateTable() { } } + @Override + boolean isMultiTableShared() { + return isMultiTableShared; + } + private void validateTableSchema() throws SQLException { // check whether table exist var dbName = userProps.get(DbzConnectorConfig.DB_NAME); @@ -172,10 +181,16 @@ private void validateTableSchema() throws SQLException { } private void validatePrivileges() throws SQLException { - String[] privilegesRequired = { + final String[] dedicatedSourcePrivileges = { "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT", }; + final String[] sharedSourcePrivileges = { + "SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT", + }; + + String[] privilegesRequired = + isMultiTableShared ? sharedSourcePrivileges : dedicatedSourcePrivileges; var hashSet = new HashSet<>(List.of(privilegesRequired)); try (var stmt = jdbcConnection.createStatement()) { var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants")); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 7014c3adb92da..17a38dc7a77df 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -136,6 +136,11 @@ public void validateTable() { } } + @Override + boolean isMultiTableShared() { + return isMultiTableShared; + } + /** For Citus which is a distributed version of PG */ public void validateDistributedTable() throws SQLException { try (var stmt = From 5d7d8df1c7f2c05c7ab5629296320fbc9acf8222 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 4 Mar 2024 10:45:43 +0800 Subject: [PATCH 2/9] fix --- .../risingwave/connector/source/common/MongoDbValidator.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java index 0ce5634e34b46..e33c704c91348 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java @@ -53,4 +53,9 @@ void validateUserPrivilege() { void validateTable() { // do nothing since MongoDB is schemaless } + + @Override + boolean isMultiTableShared() { + return false; + } } From 8aa96112ba9a44a5984dd16e822e4fb229d4ab2b Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 4 Mar 2024 11:06:30 +0800 Subject: [PATCH 3/9] e2e test --- e2e_test/source/cdc/mysql_cdc.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql index 81d7a46f17876..35d6d1cd47ee2 100644 --- a/e2e_test/source/cdc/mysql_cdc.sql +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -49,8 +49,9 @@ VALUES (1,1,'no'), (3,3,'no'), (4,4,'no'); +-- user dbz used for shared mysql source CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; -GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; +GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; CREATE TABLE tt3 (v1 int primary key, v2 timestamp); INSERT INTO tt3 VALUES (1, '2020-07-30 10:08:22'); From 56fc6ba20997031a2d8e2556d4e2646a207fa38f Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 4 Mar 2024 13:31:39 +0800 Subject: [PATCH 4/9] fix e2e test --- e2e_test/source/cdc/cdc.share_stream.slt | 2 +- e2e_test/source/cdc/mysql_cdc.sql | 8 ++++++-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 7739d3f1ad6ea..07ffd6eb470ac 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -17,7 +17,7 @@ create source mysql_mytest with ( connector = 'mysql-cdc', hostname = '${MYSQL_HOST:localhost}', port = '${MYSQL_TCP_PORT:8306}', - username = 'dbz', + username = 'rwcdc', password = '${MYSQL_PWD:}', database.name = 'mytest', server.id = '5601' diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql index 35d6d1cd47ee2..c7d0d1b02e89d 100644 --- a/e2e_test/source/cdc/mysql_cdc.sql +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -49,10 +49,14 @@ VALUES (1,1,'no'), (3,3,'no'), (4,4,'no'); --- user dbz used for shared mysql source -CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; +-- user rwcdc used for shared mysql source +CREATE USER 'rwcdc'@'%' IDENTIFIED BY '123456'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; +CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; + + CREATE TABLE tt3 (v1 int primary key, v2 timestamp); INSERT INTO tt3 VALUES (1, '2020-07-30 10:08:22'); INSERT INTO tt3 VALUES (2, '2020-07-31 10:09:22'); From 79d301ae7130d0487553c81f5fe08bb651597da6 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 4 Mar 2024 14:10:29 +0800 Subject: [PATCH 5/9] minor --- e2e_test/source/cdc/mysql_cdc.sql | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql index c7d0d1b02e89d..dd1a1f70628a0 100644 --- a/e2e_test/source/cdc/mysql_cdc.sql +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -51,12 +51,11 @@ VALUES (1,1,'no'), -- user rwcdc used for shared mysql source CREATE USER 'rwcdc'@'%' IDENTIFIED BY '123456'; -GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; +GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'rwcdc'@'%'; CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; - CREATE TABLE tt3 (v1 int primary key, v2 timestamp); INSERT INTO tt3 VALUES (1, '2020-07-30 10:08:22'); INSERT INTO tt3 VALUES (2, '2020-07-31 10:09:22'); From 08c235a50c2003aa5e08ede496c38ca4644780b3 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 4 Mar 2024 15:50:18 +0800 Subject: [PATCH 6/9] quick workaround --- e2e_test/source/cdc/mysql_cdc.sql | 9 +++++---- .../connector/source/common/MySqlValidator.java | 10 ++-------- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/e2e_test/source/cdc/mysql_cdc.sql b/e2e_test/source/cdc/mysql_cdc.sql index dd1a1f70628a0..0ac47fc7fbd1b 100644 --- a/e2e_test/source/cdc/mysql_cdc.sql +++ b/e2e_test/source/cdc/mysql_cdc.sql @@ -49,13 +49,14 @@ VALUES (1,1,'no'), (3,3,'no'), (4,4,'no'); --- user rwcdc used for shared mysql source -CREATE USER 'rwcdc'@'%' IDENTIFIED BY '123456'; -GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'rwcdc'@'%'; - CREATE USER 'dbz'@'%' IDENTIFIED BY '123456'; GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'dbz'@'%'; +CREATE USER 'rwcdc'@'%' IDENTIFIED BY '123456'; +GRANT SELECT, RELOAD, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'rwcdc'@'%'; + +FLUSH PRIVILEGES; + CREATE TABLE tt3 (v1 int primary key, v2 timestamp); INSERT INTO tt3 VALUES (1, '2020-07-30 10:08:22'); INSERT INTO tt3 VALUES (2, '2020-07-31 10:09:22'); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index b01dd76b001bf..2c6cee3dfba73 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -181,16 +181,10 @@ private void validateTableSchema() throws SQLException { } private void validatePrivileges() throws SQLException { - final String[] dedicatedSourcePrivileges = { - "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT", + String[] privilegesRequired = { + "SELECT", "RELOAD", "REPLICATION SLAVE", "REPLICATION CLIENT", }; - final String[] sharedSourcePrivileges = { - "SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT", - }; - - String[] privilegesRequired = - isMultiTableShared ? sharedSourcePrivileges : dedicatedSourcePrivileges; var hashSet = new HashSet<>(List.of(privilegesRequired)); try (var stmt = jdbcConnection.createStatement()) { var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants")); From b526a062876190ef86a348fe89fc9ac2a7a73085 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Wed, 6 Mar 2024 11:11:10 +0800 Subject: [PATCH 7/9] refactor cdc privilege check for share source and backfill table --- .../source/SourceValidateHandler.java | 15 ++++---- .../source/common/DatabaseValidator.java | 6 ++-- .../source/common/DbzConnectorConfig.java | 8 ++--- .../source/common/MongoDbValidator.java | 2 +- .../source/common/MySqlValidator.java | 34 ++++++++++++++----- .../source/common/PostgresValidator.java | 30 +++++++++------- proto/connector_service.proto | 9 ++--- .../src/source/cdc/enumerator/mod.rs | 9 ++--- src/connector/src/source/cdc/mod.rs | 18 ++++++---- src/connector/src/source/cdc/source/reader.rs | 8 ++--- src/rpc_client/src/connector_client.rs | 10 +++--- 11 files changed, 83 insertions(+), 66 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index de3a12e5e25d3..7890b8f50a31b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -84,25 +84,24 @@ static void ensureRequiredProps(Map props, boolean isMultiTableS public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request) throws Exception { var props = request.getPropertiesMap(); - var commonParam = request.getCommonParam(); - boolean isMultiTableShared = commonParam.getIsMultiTableShared(); + + boolean isCdcSourceJob = request.getIsSourceJob(); TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema()); switch (request.getSourceType()) { case POSTGRES: - ensureRequiredProps(props, isMultiTableShared); + ensureRequiredProps(props, isCdcSourceJob); ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE); - try (var validator = - new PostgresValidator(props, tableSchema, isMultiTableShared)) { + try (var validator = new PostgresValidator(props, tableSchema, isCdcSourceJob)) { validator.validateAll(); } break; case CITUS: - ensureRequiredProps(props, isMultiTableShared); + ensureRequiredProps(props, isCdcSourceJob); ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME); try (var coordinatorValidator = new CitusValidator(props, tableSchema)) { @@ -131,9 +130,9 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re break; case MYSQL: - ensureRequiredProps(props, isMultiTableShared); + ensureRequiredProps(props, isCdcSourceJob); ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID); - try (var validator = new MySqlValidator(props, tableSchema, isMultiTableShared)) { + try (var validator = new MySqlValidator(props, tableSchema, isCdcSourceJob)) { validator.validateAll(); } break; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java index 4e56662b1b99b..8626eb773694d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java @@ -19,9 +19,9 @@ public abstract class DatabaseValidator { public void validateAll() { validateDbConfig(); validateUserPrivilege(); - // If the source connector is shared by multiple tables, it will capture events from + // If the source connector is created by share source, it will capture events from // multiple tables, skip validate its schema - if (!isMultiTableShared()) { + if (!isCdcSourceJob()) { validateTable(); } } @@ -35,5 +35,5 @@ public void validateAll() { /** Validate the properties of the source table */ abstract void validateTable(); - abstract boolean isMultiTableShared(); + abstract boolean isCdcSourceJob(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 5406f6fd5e952..f69ff4924c376 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -115,7 +115,7 @@ public DbzConnectorConfig( String startOffset, Map userProps, boolean snapshotDone, - boolean isMultiTableShared) { + boolean isCdcSourceJob) { StringSubstitutor substitutor = new StringSubstitutor(userProps); var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor); @@ -124,13 +124,13 @@ public DbzConnectorConfig( && userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL); LOG.info( - "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isMultiTableShared={}", + "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}", source, sourceId, startOffset, snapshotDone, isCdcBackfill, - isMultiTableShared); + isCdcSourceJob); if (source == SourceTypeE.MYSQL) { var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor); @@ -196,7 +196,7 @@ public DbzConnectorConfig( dbzProps.putAll(postgresProps); - if (isMultiTableShared) { + if (isCdcSourceJob) { // remove table filtering for the shared Postgres source, since we // allow user to ingest tables in different schemas LOG.info("Disable table filtering for the shared Postgres source"); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java index e33c704c91348..46b0c36304d78 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java @@ -55,7 +55,7 @@ void validateTable() { } @Override - boolean isMultiTableShared() { + boolean isCdcSourceJob() { return false; } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index 2c6cee3dfba73..ce0af4fb3554b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -32,10 +32,14 @@ public class MySqlValidator extends DatabaseValidator implements AutoCloseable { private final Connection jdbcConnection; - private final boolean isMultiTableShared; + private final boolean isCdcSourceJob; + private final boolean isBackfillTable; public MySqlValidator( - Map userProps, TableSchema tableSchema, boolean isMultiTableShared) + Map userProps, + TableSchema tableSchema, + boolean isCdcSourceJob, + boolean isBackfillTable) throws SQLException { this.userProps = userProps; this.tableSchema = tableSchema; @@ -48,7 +52,8 @@ public MySqlValidator( var user = userProps.get(DbzConnectorConfig.USER); var password = userProps.get(DbzConnectorConfig.PASSWORD); this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); - this.isMultiTableShared = isMultiTableShared; + this.isCdcSourceJob = isCdcSourceJob; + this.isBackfillTable = isBackfillTable; } @Override @@ -115,8 +120,8 @@ public void validateTable() { } @Override - boolean isMultiTableShared() { - return isMultiTableShared; + boolean isCdcSourceJob() { + return isCdcSourceJob; } private void validateTableSchema() throws SQLException { @@ -181,10 +186,7 @@ private void validateTableSchema() throws SQLException { } private void validatePrivileges() throws SQLException { - String[] privilegesRequired = { - "SELECT", "RELOAD", "REPLICATION SLAVE", "REPLICATION CLIENT", - }; - + String[] privilegesRequired = getRequiredPrivileges(); var hashSet = new HashSet<>(List.of(privilegesRequired)); try (var stmt = jdbcConnection.createStatement()) { var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants")); @@ -208,6 +210,20 @@ private void validatePrivileges() throws SQLException { } } + private String[] getRequiredPrivileges() { + if (isCdcSourceJob) { + return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; + } else if (isBackfillTable) { + // check privilege again to ensure the user has the privilege for backfill + return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; + } else { + // dedicated source needs more privileges to acquire global lock + return new String[] { + "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT" + }; + } + } + @Override public void close() throws Exception { if (null != jdbcConnection) { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 17a38dc7a77df..3acf6cb6e206b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -47,10 +47,14 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl // Whether the properties to validate is shared by multiple tables. // If true, we will skip validation check for table - private final boolean isMultiTableShared; + private final boolean isCdcSourceJob; + private final boolean isBackfillTable; public PostgresValidator( - Map userProps, TableSchema tableSchema, boolean isMultiTableShared) + Map userProps, + TableSchema tableSchema, + boolean isCdcSourceJob, + boolean isBackfillTable) throws SQLException { this.userProps = userProps; this.tableSchema = tableSchema; @@ -74,7 +78,8 @@ public PostgresValidator( this.pubAutoCreate = userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); - this.isMultiTableShared = isMultiTableShared; + this.isCdcSourceJob = isCdcSourceJob; + this.isBackfillTable = isBackfillTable; } @Override @@ -137,8 +142,8 @@ public void validateTable() { } @Override - boolean isMultiTableShared() { - return isMultiTableShared; + boolean isCdcSourceJob() { + return isCdcSourceJob; } /** For Citus which is a distributed version of PG */ @@ -157,7 +162,7 @@ public void validateDistributedTable() throws SQLException { } private void validateTableSchema() throws SQLException { - if (isMultiTableShared) { + if (isCdcSourceJob) { return; } try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.table"))) { @@ -285,7 +290,8 @@ private void validatePrivileges() throws SQLException { } private void validateTablePrivileges(boolean isSuperUser) throws SQLException { - if (isSuperUser || isMultiTableShared) { + // cdc source job doesn't have table schema to validate, since its schema is fixed to jsonb + if (isSuperUser || isCdcSourceJob) { return; } @@ -339,9 +345,9 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } } - // If the source properties is shared by multiple tables, skip the following + // If the source properties is created by share source, skip the following // check of publication - if (isMultiTableShared) { + if (isCdcSourceJob) { return; } @@ -424,7 +430,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } private void validatePublicationPrivileges() throws SQLException { - if (isMultiTableShared) { + if (isCdcSourceJob) { throw ValidatorUtils.invalidArgument( "The connector properties is shared by multiple tables unexpectedly"); } @@ -496,9 +502,9 @@ private void validatePublicationPrivileges() throws SQLException { } protected void alterPublicationIfNeeded() throws SQLException { - if (isMultiTableShared) { + if (isCdcSourceJob) { throw ValidatorUtils.invalidArgument( - "The connector properties is shared by multiple tables unexpectedly"); + "The connector properties is created by a shared source unexpectedly"); } String alterPublicationSql = diff --git a/proto/connector_service.proto b/proto/connector_service.proto index a03af6305192b..127601ee52a6d 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -175,17 +175,13 @@ enum SourceType { MONGODB = 4; } -message SourceCommonParam { - bool is_multi_table_shared = 1; -} - message GetEventStreamRequest { uint64 source_id = 1; SourceType source_type = 2; string start_offset = 3; map properties = 4; bool snapshot_done = 5; - SourceCommonParam common_param = 6; + bool is_source_job = 6; } message GetEventStreamResponse { @@ -202,7 +198,8 @@ message ValidateSourceRequest { SourceType source_type = 2; map properties = 3; TableSchema table_schema = 4; - SourceCommonParam common_param = 5; + bool is_source_job = 5; + bool is_backfill_table = 6; } message ValidateSourceResponse { diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index a3e891735d3f2..db1df10606576 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -23,9 +23,7 @@ use prost::Message; use risingwave_common::util::addr::HostAddr; use risingwave_jni_core::call_static_method; use risingwave_jni_core::jvm_runtime::JVM; -use risingwave_pb::connector_service::{ - SourceCommonParam, SourceType, ValidateSourceRequest, ValidateSourceResponse, -}; +use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, ValidateSourceResponse}; use crate::error::ConnectorResult; use crate::source::cdc::{ @@ -80,9 +78,8 @@ where source_type: props.get_source_type_pb() as _, properties: props.properties, table_schema: Some(props.table_schema), - common_param: Some(SourceCommonParam { - is_multi_table_shared: props.is_multi_table_shared, - }), + is_source_job: props.is_cdc_source_job, + is_backfill_table: props.is_backfill_table, }; let validate_source_request_bytes = diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index b283913b3479f..02e94dd337bd6 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -85,8 +85,11 @@ pub struct CdcProperties { /// Schema of the source specified by users pub table_schema: TableSchema, - /// Whether the properties is shared by multiple tables - pub is_multi_table_shared: bool, + /// Whether it is created by a cdc source job + pub is_cdc_source_job: bool, + + /// For validation purpose, mark if the table is a backfill cdc table + pub is_backfill_table: bool, pub _phantom: PhantomData, } @@ -96,14 +99,15 @@ impl TryFromHashmap for CdcProperties { properties: HashMap, _deny_unknown_fields: bool, ) -> ConnectorResult { - let is_multi_table_shared = properties + let is_share_source = properties .get(CDC_SHARING_MODE_KEY) .is_some_and(|v| v == "true"); Ok(CdcProperties { properties, table_schema: Default::default(), // TODO(siyuan): use serde to deserialize input hashmap - is_multi_table_shared, + is_cdc_source_job: is_share_source, + is_backfill_table: false, _phantom: PhantomData, }) } @@ -144,7 +148,7 @@ where }; self.table_schema = table_schema; if let Some(info) = source.info.as_ref() { - self.is_multi_table_shared = info.cdc_source_job; + self.is_cdc_source_job = info.cdc_source_job; } } @@ -159,8 +163,8 @@ where self.properties = properties; self.table_schema = table_schema; - // properties are not shared, so mark it as false - self.is_multi_table_shared = false; + self.is_cdc_source_job = false; + self.is_backfill_table = true; } } diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 3e63d506fb9bf..f79176365d396 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -24,9 +24,7 @@ use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::util::addr::HostAddr; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{call_static_method, JniReceiverType, JniSenderType}; -use risingwave_pb::connector_service::{ - GetEventStreamRequest, GetEventStreamResponse, SourceCommonParam, -}; +use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse}; use thiserror_ext::AsReport; use tokio::sync::mpsc; @@ -106,9 +104,7 @@ impl SplitReader for CdcSplitReader { start_offset: split.start_offset().clone().unwrap_or_default(), properties, snapshot_done: split.snapshot_done(), - common_param: Some(SourceCommonParam { - is_multi_table_shared: conn_props.is_multi_table_shared, - }), + is_source_job: conn_props.is_cdc_source_job, }; std::thread::spawn(move || { diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index d627a692735c3..1acc538eca23a 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -203,7 +203,7 @@ impl ConnectorClient { start_offset: Option, properties: HashMap, snapshot_done: bool, - common_param: SourceCommonParam, + is_source_job: bool, ) -> Result> { Ok(self .rpc_client @@ -214,7 +214,7 @@ impl ConnectorClient { start_offset: start_offset.unwrap_or_default(), properties, snapshot_done, - common_param: Some(common_param), + is_source_job, }) .await .inspect_err(|err| { @@ -234,7 +234,8 @@ impl ConnectorClient { source_type: SourceType, properties: HashMap, table_schema: Option, - common_param: SourceCommonParam, + is_source_job: bool, + is_backfill_table: bool, ) -> Result<()> { let response = self .rpc_client @@ -244,7 +245,8 @@ impl ConnectorClient { source_type: source_type as _, properties, table_schema, - common_param: Some(common_param), + is_source_job, + is_backfill_table, }) .await .inspect_err(|err| { From f475bde9aa183ce05c39f14f5eef6b82360c1a65 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 11 Mar 2024 10:26:42 +0800 Subject: [PATCH 8/9] minior --- .../source/common/DatabaseValidator.java | 1 + .../source/common/MySqlValidator.java | 79 +++++++++---------- 2 files changed, 40 insertions(+), 40 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java index 8626eb773694d..4435ecc5128ea 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java @@ -35,5 +35,6 @@ public void validateAll() { /** Validate the properties of the source table */ abstract void validateTable(); + /** Check if the validation is for CDC source job */ abstract boolean isCdcSourceJob(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index ce0af4fb3554b..c92b4dad540c9 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -32,7 +32,9 @@ public class MySqlValidator extends DatabaseValidator implements AutoCloseable { private final Connection jdbcConnection; + // validation is for cdc source job private final boolean isCdcSourceJob; + // validation is for backfill table private final boolean isBackfillTable; public MySqlValidator( @@ -104,12 +106,48 @@ private void validateBinlogConfig() throws SQLException { @Override public void validateUserPrivilege() { try { - validatePrivileges(); + String[] privilegesRequired = getRequiredPrivileges(); + var hashSet = new HashSet<>(List.of(privilegesRequired)); + try (var stmt = jdbcConnection.createStatement()) { + var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants")); + while (res.next()) { + String granted = res.getString(1).toUpperCase(); + // mysql 5.7 root user has all privileges + if (granted.contains("ALL")) { + // all privileges granted, check passed + return; + } + + // remove granted privilege from the set + hashSet.removeIf(granted::contains); + if (hashSet.isEmpty()) { + break; + } + } + if (!hashSet.isEmpty()) { + throw ValidatorUtils.invalidArgument( + "MySQL user doesn't have enough privileges: " + hashSet); + } + } } catch (SQLException e) { throw ValidatorUtils.internalError(e.getMessage()); } } + private String[] getRequiredPrivileges() { + if (isCdcSourceJob) { + return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; + } else if (isBackfillTable) { + // check privilege again to ensure the user has the privilege to backfill + return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; + } else { + // dedicated source needs more privileges to acquire global lock + return new String[] { + "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT" + }; + } + } + @Override public void validateTable() { try { @@ -185,45 +223,6 @@ private void validateTableSchema() throws SQLException { } } - private void validatePrivileges() throws SQLException { - String[] privilegesRequired = getRequiredPrivileges(); - var hashSet = new HashSet<>(List.of(privilegesRequired)); - try (var stmt = jdbcConnection.createStatement()) { - var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants")); - while (res.next()) { - String granted = res.getString(1).toUpperCase(); - // all privileges granted, check passed - if (granted.contains("ALL")) { - break; - } - - // remove granted privilege from the set - hashSet.removeIf(granted::contains); - if (hashSet.isEmpty()) { - break; - } - } - if (!hashSet.isEmpty()) { - throw ValidatorUtils.invalidArgument( - "MySQL user doesn't have enough privileges: " + hashSet); - } - } - } - - private String[] getRequiredPrivileges() { - if (isCdcSourceJob) { - return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; - } else if (isBackfillTable) { - // check privilege again to ensure the user has the privilege for backfill - return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; - } else { - // dedicated source needs more privileges to acquire global lock - return new String[] { - "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT" - }; - } - } - @Override public void close() throws Exception { if (null != jdbcConnection) { From be8188c168641770c396d51a8c17c54be5de9a1a Mon Sep 17 00:00:00 2001 From: StrikeW Date: Mon, 11 Mar 2024 11:07:44 +0800 Subject: [PATCH 9/9] fix complie bugs --- .../risingwave/connector/source/SourceRequestHandler.java | 2 +- .../connector/source/SourceValidateHandler.java | 8 +++++--- .../connector/source/common/PostgresValidator.java | 7 +------ .../connector/source/core/JniDbzSourceHandler.java | 5 ++--- .../connector/source/core/SourceHandlerFactory.java | 4 ++-- 5 files changed, 11 insertions(+), 15 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceRequestHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceRequestHandler.java index 3568deec9d2dc..c6d5336c72573 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceRequestHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceRequestHandler.java @@ -36,7 +36,7 @@ public void handle(ConnectorServiceProto.GetEventStreamRequest request) { request.getStartOffset(), request.getPropertiesMap(), request.getSnapshotDone(), - request.getCommonParam().getIsMultiTableShared()); + request.getIsSourceJob()); handler.startSource( (ServerCallStreamObserver) responseObserver); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index 7890b8f50a31b..6890195c1cbef 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -69,14 +69,14 @@ private static void ensurePropNotBlank(Map props, String name) { } } - static void ensureRequiredProps(Map props, boolean isMultiTableShared) { + static void ensureRequiredProps(Map props, boolean isCdcSourceJob) { ensurePropNotBlank(props, DbzConnectorConfig.HOST); ensurePropNotBlank(props, DbzConnectorConfig.PORT); ensurePropNotBlank(props, DbzConnectorConfig.DB_NAME); ensurePropNotBlank(props, DbzConnectorConfig.USER); ensurePropNotBlank(props, DbzConnectorConfig.PASSWORD); // ensure table name is passed by user in non-sharing mode - if (!isMultiTableShared) { + if (!isCdcSourceJob) { ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME); } } @@ -86,6 +86,7 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re var props = request.getPropertiesMap(); boolean isCdcSourceJob = request.getIsSourceJob(); + boolean isBackfillTable = request.getIsBackfillTable(); TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema()); switch (request.getSourceType()) { @@ -132,7 +133,8 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re case MYSQL: ensureRequiredProps(props, isCdcSourceJob); ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID); - try (var validator = new MySqlValidator(props, tableSchema, isCdcSourceJob)) { + try (var validator = + new MySqlValidator(props, tableSchema, isCdcSourceJob, isBackfillTable)) { validator.validateAll(); } break; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 3acf6cb6e206b..6dbb52b287488 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -48,13 +48,9 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl // Whether the properties to validate is shared by multiple tables. // If true, we will skip validation check for table private final boolean isCdcSourceJob; - private final boolean isBackfillTable; public PostgresValidator( - Map userProps, - TableSchema tableSchema, - boolean isCdcSourceJob, - boolean isBackfillTable) + Map userProps, TableSchema tableSchema, boolean isCdcSourceJob) throws SQLException { this.userProps = userProps; this.tableSchema = tableSchema; @@ -79,7 +75,6 @@ public PostgresValidator( this.pubAutoCreate = userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); this.isCdcSourceJob = isCdcSourceJob; - this.isBackfillTable = isBackfillTable; } @Override diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java index 00b387eff2bb0..949ccd403edc9 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/JniDbzSourceHandler.java @@ -51,8 +51,7 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long // userProps extracted from request, underlying implementation is UnmodifiableMap Map mutableUserProps = new HashMap<>(request.getPropertiesMap()); mutableUserProps.put("source.id", Long.toString(request.getSourceId())); - var commonParam = request.getCommonParam(); - boolean isMultiTableShared = commonParam.getIsMultiTableShared(); + boolean isCdcSourceJob = request.getIsSourceJob(); if (request.getSourceType() == POSTGRES) { DbzSourceUtils.createPostgresPublicationIfNeeded( @@ -66,7 +65,7 @@ public static void runJniDbzSourceThread(byte[] getEventStreamRequestBytes, long request.getStartOffset(), mutableUserProps, request.getSnapshotDone(), - isMultiTableShared); + isCdcSourceJob); JniDbzSourceHandler handler = new JniDbzSourceHandler(config); handler.start(channelPtr); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/SourceHandlerFactory.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/SourceHandlerFactory.java index 9f6ba9c9a013a..e47df568e812f 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/SourceHandlerFactory.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/SourceHandlerFactory.java @@ -31,7 +31,7 @@ public static SourceHandler createSourceHandler( String startOffset, Map userProps, boolean snapshotDone, - boolean isMultiTableShared) { + boolean isCdcSourceJob) { // userProps extracted from grpc request, underlying implementation is UnmodifiableMap Map mutableUserProps = new HashMap<>(userProps); mutableUserProps.put("source.id", Long.toString(sourceId)); @@ -42,7 +42,7 @@ public static SourceHandler createSourceHandler( startOffset, mutableUserProps, snapshotDone, - isMultiTableShared); + isCdcSourceJob); return new DbzSourceHandler(config); } }