diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java index de3a12e5e25d3..7890b8f50a31b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/SourceValidateHandler.java @@ -84,25 +84,24 @@ static void ensureRequiredProps(Map props, boolean isMultiTableS public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request) throws Exception { var props = request.getPropertiesMap(); - var commonParam = request.getCommonParam(); - boolean isMultiTableShared = commonParam.getIsMultiTableShared(); + + boolean isCdcSourceJob = request.getIsSourceJob(); TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema()); switch (request.getSourceType()) { case POSTGRES: - ensureRequiredProps(props, isMultiTableShared); + ensureRequiredProps(props, isCdcSourceJob); ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE); - try (var validator = - new PostgresValidator(props, tableSchema, isMultiTableShared)) { + try (var validator = new PostgresValidator(props, tableSchema, isCdcSourceJob)) { validator.validateAll(); } break; case CITUS: - ensureRequiredProps(props, isMultiTableShared); + ensureRequiredProps(props, isCdcSourceJob); ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME); ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME); try (var coordinatorValidator = new CitusValidator(props, tableSchema)) { @@ -131,9 +130,9 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re break; case MYSQL: - ensureRequiredProps(props, isMultiTableShared); + ensureRequiredProps(props, isCdcSourceJob); ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID); - try (var validator = new MySqlValidator(props, tableSchema, isMultiTableShared)) { + try (var validator = new MySqlValidator(props, tableSchema, isCdcSourceJob)) { validator.validateAll(); } break; diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java index 4e56662b1b99b..8626eb773694d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DatabaseValidator.java @@ -19,9 +19,9 @@ public abstract class DatabaseValidator { public void validateAll() { validateDbConfig(); validateUserPrivilege(); - // If the source connector is shared by multiple tables, it will capture events from + // If the source connector is created by share source, it will capture events from // multiple tables, skip validate its schema - if (!isMultiTableShared()) { + if (!isCdcSourceJob()) { validateTable(); } } @@ -35,5 +35,5 @@ public void validateAll() { /** Validate the properties of the source table */ abstract void validateTable(); - abstract boolean isMultiTableShared(); + abstract boolean isCdcSourceJob(); } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java index 5406f6fd5e952..f69ff4924c376 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzConnectorConfig.java @@ -115,7 +115,7 @@ public DbzConnectorConfig( String startOffset, Map userProps, boolean snapshotDone, - boolean isMultiTableShared) { + boolean isCdcSourceJob) { StringSubstitutor substitutor = new StringSubstitutor(userProps); var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor); @@ -124,13 +124,13 @@ public DbzConnectorConfig( && userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL); LOG.info( - "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isMultiTableShared={}", + "DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}", source, sourceId, startOffset, snapshotDone, isCdcBackfill, - isMultiTableShared); + isCdcSourceJob); if (source == SourceTypeE.MYSQL) { var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor); @@ -196,7 +196,7 @@ public DbzConnectorConfig( dbzProps.putAll(postgresProps); - if (isMultiTableShared) { + if (isCdcSourceJob) { // remove table filtering for the shared Postgres source, since we // allow user to ingest tables in different schemas LOG.info("Disable table filtering for the shared Postgres source"); diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java index e33c704c91348..46b0c36304d78 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MongoDbValidator.java @@ -55,7 +55,7 @@ void validateTable() { } @Override - boolean isMultiTableShared() { + boolean isCdcSourceJob() { return false; } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java index 2c6cee3dfba73..ce0af4fb3554b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/MySqlValidator.java @@ -32,10 +32,14 @@ public class MySqlValidator extends DatabaseValidator implements AutoCloseable { private final Connection jdbcConnection; - private final boolean isMultiTableShared; + private final boolean isCdcSourceJob; + private final boolean isBackfillTable; public MySqlValidator( - Map userProps, TableSchema tableSchema, boolean isMultiTableShared) + Map userProps, + TableSchema tableSchema, + boolean isCdcSourceJob, + boolean isBackfillTable) throws SQLException { this.userProps = userProps; this.tableSchema = tableSchema; @@ -48,7 +52,8 @@ public MySqlValidator( var user = userProps.get(DbzConnectorConfig.USER); var password = userProps.get(DbzConnectorConfig.PASSWORD); this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password); - this.isMultiTableShared = isMultiTableShared; + this.isCdcSourceJob = isCdcSourceJob; + this.isBackfillTable = isBackfillTable; } @Override @@ -115,8 +120,8 @@ public void validateTable() { } @Override - boolean isMultiTableShared() { - return isMultiTableShared; + boolean isCdcSourceJob() { + return isCdcSourceJob; } private void validateTableSchema() throws SQLException { @@ -181,10 +186,7 @@ private void validateTableSchema() throws SQLException { } private void validatePrivileges() throws SQLException { - String[] privilegesRequired = { - "SELECT", "RELOAD", "REPLICATION SLAVE", "REPLICATION CLIENT", - }; - + String[] privilegesRequired = getRequiredPrivileges(); var hashSet = new HashSet<>(List.of(privilegesRequired)); try (var stmt = jdbcConnection.createStatement()) { var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants")); @@ -208,6 +210,20 @@ private void validatePrivileges() throws SQLException { } } + private String[] getRequiredPrivileges() { + if (isCdcSourceJob) { + return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; + } else if (isBackfillTable) { + // check privilege again to ensure the user has the privilege for backfill + return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}; + } else { + // dedicated source needs more privileges to acquire global lock + return new String[] { + "SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT" + }; + } + } + @Override public void close() throws Exception { if (null != jdbcConnection) { diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 17a38dc7a77df..3acf6cb6e206b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -47,10 +47,14 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl // Whether the properties to validate is shared by multiple tables. // If true, we will skip validation check for table - private final boolean isMultiTableShared; + private final boolean isCdcSourceJob; + private final boolean isBackfillTable; public PostgresValidator( - Map userProps, TableSchema tableSchema, boolean isMultiTableShared) + Map userProps, + TableSchema tableSchema, + boolean isCdcSourceJob, + boolean isBackfillTable) throws SQLException { this.userProps = userProps; this.tableSchema = tableSchema; @@ -74,7 +78,8 @@ public PostgresValidator( this.pubAutoCreate = userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); - this.isMultiTableShared = isMultiTableShared; + this.isCdcSourceJob = isCdcSourceJob; + this.isBackfillTable = isBackfillTable; } @Override @@ -137,8 +142,8 @@ public void validateTable() { } @Override - boolean isMultiTableShared() { - return isMultiTableShared; + boolean isCdcSourceJob() { + return isCdcSourceJob; } /** For Citus which is a distributed version of PG */ @@ -157,7 +162,7 @@ public void validateDistributedTable() throws SQLException { } private void validateTableSchema() throws SQLException { - if (isMultiTableShared) { + if (isCdcSourceJob) { return; } try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.table"))) { @@ -285,7 +290,8 @@ private void validatePrivileges() throws SQLException { } private void validateTablePrivileges(boolean isSuperUser) throws SQLException { - if (isSuperUser || isMultiTableShared) { + // cdc source job doesn't have table schema to validate, since its schema is fixed to jsonb + if (isSuperUser || isCdcSourceJob) { return; } @@ -339,9 +345,9 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } } - // If the source properties is shared by multiple tables, skip the following + // If the source properties is created by share source, skip the following // check of publication - if (isMultiTableShared) { + if (isCdcSourceJob) { return; } @@ -424,7 +430,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } private void validatePublicationPrivileges() throws SQLException { - if (isMultiTableShared) { + if (isCdcSourceJob) { throw ValidatorUtils.invalidArgument( "The connector properties is shared by multiple tables unexpectedly"); } @@ -496,9 +502,9 @@ private void validatePublicationPrivileges() throws SQLException { } protected void alterPublicationIfNeeded() throws SQLException { - if (isMultiTableShared) { + if (isCdcSourceJob) { throw ValidatorUtils.invalidArgument( - "The connector properties is shared by multiple tables unexpectedly"); + "The connector properties is created by a shared source unexpectedly"); } String alterPublicationSql = diff --git a/proto/connector_service.proto b/proto/connector_service.proto index a03af6305192b..127601ee52a6d 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -175,17 +175,13 @@ enum SourceType { MONGODB = 4; } -message SourceCommonParam { - bool is_multi_table_shared = 1; -} - message GetEventStreamRequest { uint64 source_id = 1; SourceType source_type = 2; string start_offset = 3; map properties = 4; bool snapshot_done = 5; - SourceCommonParam common_param = 6; + bool is_source_job = 6; } message GetEventStreamResponse { @@ -202,7 +198,8 @@ message ValidateSourceRequest { SourceType source_type = 2; map properties = 3; TableSchema table_schema = 4; - SourceCommonParam common_param = 5; + bool is_source_job = 5; + bool is_backfill_table = 6; } message ValidateSourceResponse { diff --git a/src/connector/src/source/cdc/enumerator/mod.rs b/src/connector/src/source/cdc/enumerator/mod.rs index a3e891735d3f2..db1df10606576 100644 --- a/src/connector/src/source/cdc/enumerator/mod.rs +++ b/src/connector/src/source/cdc/enumerator/mod.rs @@ -23,9 +23,7 @@ use prost::Message; use risingwave_common::util::addr::HostAddr; use risingwave_jni_core::call_static_method; use risingwave_jni_core::jvm_runtime::JVM; -use risingwave_pb::connector_service::{ - SourceCommonParam, SourceType, ValidateSourceRequest, ValidateSourceResponse, -}; +use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, ValidateSourceResponse}; use crate::error::ConnectorResult; use crate::source::cdc::{ @@ -80,9 +78,8 @@ where source_type: props.get_source_type_pb() as _, properties: props.properties, table_schema: Some(props.table_schema), - common_param: Some(SourceCommonParam { - is_multi_table_shared: props.is_multi_table_shared, - }), + is_source_job: props.is_cdc_source_job, + is_backfill_table: props.is_backfill_table, }; let validate_source_request_bytes = diff --git a/src/connector/src/source/cdc/mod.rs b/src/connector/src/source/cdc/mod.rs index b283913b3479f..02e94dd337bd6 100644 --- a/src/connector/src/source/cdc/mod.rs +++ b/src/connector/src/source/cdc/mod.rs @@ -85,8 +85,11 @@ pub struct CdcProperties { /// Schema of the source specified by users pub table_schema: TableSchema, - /// Whether the properties is shared by multiple tables - pub is_multi_table_shared: bool, + /// Whether it is created by a cdc source job + pub is_cdc_source_job: bool, + + /// For validation purpose, mark if the table is a backfill cdc table + pub is_backfill_table: bool, pub _phantom: PhantomData, } @@ -96,14 +99,15 @@ impl TryFromHashmap for CdcProperties { properties: HashMap, _deny_unknown_fields: bool, ) -> ConnectorResult { - let is_multi_table_shared = properties + let is_share_source = properties .get(CDC_SHARING_MODE_KEY) .is_some_and(|v| v == "true"); Ok(CdcProperties { properties, table_schema: Default::default(), // TODO(siyuan): use serde to deserialize input hashmap - is_multi_table_shared, + is_cdc_source_job: is_share_source, + is_backfill_table: false, _phantom: PhantomData, }) } @@ -144,7 +148,7 @@ where }; self.table_schema = table_schema; if let Some(info) = source.info.as_ref() { - self.is_multi_table_shared = info.cdc_source_job; + self.is_cdc_source_job = info.cdc_source_job; } } @@ -159,8 +163,8 @@ where self.properties = properties; self.table_schema = table_schema; - // properties are not shared, so mark it as false - self.is_multi_table_shared = false; + self.is_cdc_source_job = false; + self.is_backfill_table = true; } } diff --git a/src/connector/src/source/cdc/source/reader.rs b/src/connector/src/source/cdc/source/reader.rs index 3e63d506fb9bf..f79176365d396 100644 --- a/src/connector/src/source/cdc/source/reader.rs +++ b/src/connector/src/source/cdc/source/reader.rs @@ -24,9 +24,7 @@ use risingwave_common::metrics::GLOBAL_ERROR_METRICS; use risingwave_common::util::addr::HostAddr; use risingwave_jni_core::jvm_runtime::JVM; use risingwave_jni_core::{call_static_method, JniReceiverType, JniSenderType}; -use risingwave_pb::connector_service::{ - GetEventStreamRequest, GetEventStreamResponse, SourceCommonParam, -}; +use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse}; use thiserror_ext::AsReport; use tokio::sync::mpsc; @@ -106,9 +104,7 @@ impl SplitReader for CdcSplitReader { start_offset: split.start_offset().clone().unwrap_or_default(), properties, snapshot_done: split.snapshot_done(), - common_param: Some(SourceCommonParam { - is_multi_table_shared: conn_props.is_multi_table_shared, - }), + is_source_job: conn_props.is_cdc_source_job, }; std::thread::spawn(move || { diff --git a/src/rpc_client/src/connector_client.rs b/src/rpc_client/src/connector_client.rs index d627a692735c3..1acc538eca23a 100644 --- a/src/rpc_client/src/connector_client.rs +++ b/src/rpc_client/src/connector_client.rs @@ -203,7 +203,7 @@ impl ConnectorClient { start_offset: Option, properties: HashMap, snapshot_done: bool, - common_param: SourceCommonParam, + is_source_job: bool, ) -> Result> { Ok(self .rpc_client @@ -214,7 +214,7 @@ impl ConnectorClient { start_offset: start_offset.unwrap_or_default(), properties, snapshot_done, - common_param: Some(common_param), + is_source_job, }) .await .inspect_err(|err| { @@ -234,7 +234,8 @@ impl ConnectorClient { source_type: SourceType, properties: HashMap, table_schema: Option, - common_param: SourceCommonParam, + is_source_job: bool, + is_backfill_table: bool, ) -> Result<()> { let response = self .rpc_client @@ -244,7 +245,8 @@ impl ConnectorClient { source_type: source_type as _, properties, table_schema, - common_param: Some(common_param), + is_source_job, + is_backfill_table, }) .await .inspect_err(|err| {