Skip to content

Commit

Permalink
refactor cdc privilege check for share source and backfill table
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW committed Mar 6, 2024
1 parent 08c235a commit b526a06
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,24 @@ static void ensureRequiredProps(Map<String, String> props, boolean isMultiTableS
public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
throws Exception {
var props = request.getPropertiesMap();
var commonParam = request.getCommonParam();
boolean isMultiTableShared = commonParam.getIsMultiTableShared();

boolean isCdcSourceJob = request.getIsSourceJob();

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
case POSTGRES:
ensureRequiredProps(props, isMultiTableShared);
ensureRequiredProps(props, isCdcSourceJob);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE);
try (var validator =
new PostgresValidator(props, tableSchema, isMultiTableShared)) {
try (var validator = new PostgresValidator(props, tableSchema, isCdcSourceJob)) {
validator.validateAll();
}
break;

case CITUS:
ensureRequiredProps(props, isMultiTableShared);
ensureRequiredProps(props, isCdcSourceJob);
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator = new CitusValidator(props, tableSchema)) {
Expand Down Expand Up @@ -131,9 +130,9 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

break;
case MYSQL:
ensureRequiredProps(props, isMultiTableShared);
ensureRequiredProps(props, isCdcSourceJob);
ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID);
try (var validator = new MySqlValidator(props, tableSchema, isMultiTableShared)) {
try (var validator = new MySqlValidator(props, tableSchema, isCdcSourceJob)) {
validator.validateAll();
}
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ public abstract class DatabaseValidator {
public void validateAll() {
validateDbConfig();
validateUserPrivilege();
// If the source connector is shared by multiple tables, it will capture events from
// If the source connector is created by share source, it will capture events from
// multiple tables, skip validate its schema
if (!isMultiTableShared()) {
if (!isCdcSourceJob()) {
validateTable();
}
}
Expand All @@ -35,5 +35,5 @@ public void validateAll() {
/** Validate the properties of the source table */
abstract void validateTable();

abstract boolean isMultiTableShared();
abstract boolean isCdcSourceJob();
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public DbzConnectorConfig(
String startOffset,
Map<String, String> userProps,
boolean snapshotDone,
boolean isMultiTableShared) {
boolean isCdcSourceJob) {

StringSubstitutor substitutor = new StringSubstitutor(userProps);
var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor);
Expand All @@ -124,13 +124,13 @@ public DbzConnectorConfig(
&& userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL);

LOG.info(
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isMultiTableShared={}",
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}",
source,
sourceId,
startOffset,
snapshotDone,
isCdcBackfill,
isMultiTableShared);
isCdcSourceJob);

if (source == SourceTypeE.MYSQL) {
var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor);
Expand Down Expand Up @@ -196,7 +196,7 @@ public DbzConnectorConfig(

dbzProps.putAll(postgresProps);

if (isMultiTableShared) {
if (isCdcSourceJob) {
// remove table filtering for the shared Postgres source, since we
// allow user to ingest tables in different schemas
LOG.info("Disable table filtering for the shared Postgres source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ void validateTable() {
}

@Override
boolean isMultiTableShared() {
boolean isCdcSourceJob() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,14 @@ public class MySqlValidator extends DatabaseValidator implements AutoCloseable {

private final Connection jdbcConnection;

private final boolean isMultiTableShared;
private final boolean isCdcSourceJob;
private final boolean isBackfillTable;

public MySqlValidator(
Map<String, String> userProps, TableSchema tableSchema, boolean isMultiTableShared)
Map<String, String> userProps,
TableSchema tableSchema,
boolean isCdcSourceJob,
boolean isBackfillTable)
throws SQLException {
this.userProps = userProps;
this.tableSchema = tableSchema;
Expand All @@ -48,7 +52,8 @@ public MySqlValidator(
var user = userProps.get(DbzConnectorConfig.USER);
var password = userProps.get(DbzConnectorConfig.PASSWORD);
this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password);
this.isMultiTableShared = isMultiTableShared;
this.isCdcSourceJob = isCdcSourceJob;
this.isBackfillTable = isBackfillTable;
}

@Override
Expand Down Expand Up @@ -115,8 +120,8 @@ public void validateTable() {
}

@Override
boolean isMultiTableShared() {
return isMultiTableShared;
boolean isCdcSourceJob() {
return isCdcSourceJob;
}

private void validateTableSchema() throws SQLException {
Expand Down Expand Up @@ -181,10 +186,7 @@ private void validateTableSchema() throws SQLException {
}

private void validatePrivileges() throws SQLException {
String[] privilegesRequired = {
"SELECT", "RELOAD", "REPLICATION SLAVE", "REPLICATION CLIENT",
};

String[] privilegesRequired = getRequiredPrivileges();
var hashSet = new HashSet<>(List.of(privilegesRequired));
try (var stmt = jdbcConnection.createStatement()) {
var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants"));
Expand All @@ -208,6 +210,20 @@ private void validatePrivileges() throws SQLException {
}
}

private String[] getRequiredPrivileges() {
if (isCdcSourceJob) {
return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"};
} else if (isBackfillTable) {
// check privilege again to ensure the user has the privilege for backfill
return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"};
} else {
// dedicated source needs more privileges to acquire global lock
return new String[] {
"SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT"
};
}
}

@Override
public void close() throws Exception {
if (null != jdbcConnection) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl

// Whether the properties to validate is shared by multiple tables.
// If true, we will skip validation check for table
private final boolean isMultiTableShared;
private final boolean isCdcSourceJob;
private final boolean isBackfillTable;

public PostgresValidator(
Map<String, String> userProps, TableSchema tableSchema, boolean isMultiTableShared)
Map<String, String> userProps,
TableSchema tableSchema,
boolean isCdcSourceJob,
boolean isBackfillTable)
throws SQLException {
this.userProps = userProps;
this.tableSchema = tableSchema;
Expand All @@ -74,7 +78,8 @@ public PostgresValidator(

this.pubAutoCreate =
userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
this.isMultiTableShared = isMultiTableShared;
this.isCdcSourceJob = isCdcSourceJob;
this.isBackfillTable = isBackfillTable;
}

@Override
Expand Down Expand Up @@ -137,8 +142,8 @@ public void validateTable() {
}

@Override
boolean isMultiTableShared() {
return isMultiTableShared;
boolean isCdcSourceJob() {
return isCdcSourceJob;
}

/** For Citus which is a distributed version of PG */
Expand All @@ -157,7 +162,7 @@ public void validateDistributedTable() throws SQLException {
}

private void validateTableSchema() throws SQLException {
if (isMultiTableShared) {
if (isCdcSourceJob) {
return;
}
try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.table"))) {
Expand Down Expand Up @@ -285,7 +290,8 @@ private void validatePrivileges() throws SQLException {
}

private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
if (isSuperUser || isMultiTableShared) {
// cdc source job doesn't have table schema to validate, since its schema is fixed to jsonb
if (isSuperUser || isCdcSourceJob) {
return;
}

Expand Down Expand Up @@ -339,9 +345,9 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
}
}

// If the source properties is shared by multiple tables, skip the following
// If the source properties is created by share source, skip the following
// check of publication
if (isMultiTableShared) {
if (isCdcSourceJob) {
return;
}

Expand Down Expand Up @@ -424,7 +430,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
}

private void validatePublicationPrivileges() throws SQLException {
if (isMultiTableShared) {
if (isCdcSourceJob) {
throw ValidatorUtils.invalidArgument(
"The connector properties is shared by multiple tables unexpectedly");
}
Expand Down Expand Up @@ -496,9 +502,9 @@ private void validatePublicationPrivileges() throws SQLException {
}

protected void alterPublicationIfNeeded() throws SQLException {
if (isMultiTableShared) {
if (isCdcSourceJob) {
throw ValidatorUtils.invalidArgument(
"The connector properties is shared by multiple tables unexpectedly");
"The connector properties is created by a shared source unexpectedly");
}

String alterPublicationSql =
Expand Down
9 changes: 3 additions & 6 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,13 @@ enum SourceType {
MONGODB = 4;
}

message SourceCommonParam {
bool is_multi_table_shared = 1;
}

message GetEventStreamRequest {
uint64 source_id = 1;
SourceType source_type = 2;
string start_offset = 3;
map<string, string> properties = 4;
bool snapshot_done = 5;
SourceCommonParam common_param = 6;
bool is_source_job = 6;

Check failure on line 184 in proto/connector_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "is_source_job" on message "GetEventStreamRequest" changed option "json_name" from "commonParam" to "isSourceJob".

Check failure on line 184 in proto/connector_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" on message "GetEventStreamRequest" changed type from "message" to "bool". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

Check failure on line 184 in proto/connector_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "6" on message "GetEventStreamRequest" changed name from "common_param" to "is_source_job".
}

message GetEventStreamResponse {
Expand All @@ -202,7 +198,8 @@ message ValidateSourceRequest {
SourceType source_type = 2;
map<string, string> properties = 3;
TableSchema table_schema = 4;
SourceCommonParam common_param = 5;
bool is_source_job = 5;

Check failure on line 201 in proto/connector_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "5" with name "is_source_job" on message "ValidateSourceRequest" changed option "json_name" from "commonParam" to "isSourceJob".

Check failure on line 201 in proto/connector_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "5" on message "ValidateSourceRequest" changed type from "message" to "bool". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

Check failure on line 201 in proto/connector_service.proto

View workflow job for this annotation

GitHub Actions / Check breaking changes in Protobuf files

Field "5" on message "ValidateSourceRequest" changed name from "common_param" to "is_source_job".
bool is_backfill_table = 6;
}

message ValidateSourceResponse {
Expand Down
9 changes: 3 additions & 6 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ use prost::Message;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::call_static_method;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_pb::connector_service::{
SourceCommonParam, SourceType, ValidateSourceRequest, ValidateSourceResponse,
};
use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, ValidateSourceResponse};

use crate::error::ConnectorResult;
use crate::source::cdc::{
Expand Down Expand Up @@ -80,9 +78,8 @@ where
source_type: props.get_source_type_pb() as _,
properties: props.properties,
table_schema: Some(props.table_schema),
common_param: Some(SourceCommonParam {
is_multi_table_shared: props.is_multi_table_shared,
}),
is_source_job: props.is_cdc_source_job,
is_backfill_table: props.is_backfill_table,
};

let validate_source_request_bytes =
Expand Down
18 changes: 11 additions & 7 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Schema of the source specified by users
pub table_schema: TableSchema,

/// Whether the properties is shared by multiple tables
pub is_multi_table_shared: bool,
/// Whether it is created by a cdc source job
pub is_cdc_source_job: bool,

/// For validation purpose, mark if the table is a backfill cdc table
pub is_backfill_table: bool,

pub _phantom: PhantomData<T>,
}
Expand All @@ -96,14 +99,15 @@ impl<T: CdcSourceTypeTrait> TryFromHashmap for CdcProperties<T> {
properties: HashMap<String, String>,
_deny_unknown_fields: bool,
) -> ConnectorResult<Self> {
let is_multi_table_shared = properties
let is_share_source = properties
.get(CDC_SHARING_MODE_KEY)
.is_some_and(|v| v == "true");
Ok(CdcProperties {
properties,
table_schema: Default::default(),
// TODO(siyuan): use serde to deserialize input hashmap
is_multi_table_shared,
is_cdc_source_job: is_share_source,
is_backfill_table: false,
_phantom: PhantomData,
})
}
Expand Down Expand Up @@ -144,7 +148,7 @@ where
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_multi_table_shared = info.cdc_source_job;
self.is_cdc_source_job = info.cdc_source_job;
}
}

Expand All @@ -159,8 +163,8 @@ where

self.properties = properties;
self.table_schema = table_schema;
// properties are not shared, so mark it as false
self.is_multi_table_shared = false;
self.is_cdc_source_job = false;
self.is_backfill_table = true;
}
}

Expand Down
8 changes: 2 additions & 6 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::{call_static_method, JniReceiverType, JniSenderType};
use risingwave_pb::connector_service::{
GetEventStreamRequest, GetEventStreamResponse, SourceCommonParam,
};
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
use thiserror_ext::AsReport;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -106,9 +104,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
start_offset: split.start_offset().clone().unwrap_or_default(),
properties,
snapshot_done: split.snapshot_done(),
common_param: Some(SourceCommonParam {
is_multi_table_shared: conn_props.is_multi_table_shared,
}),
is_source_job: conn_props.is_cdc_source_job,
};

std::thread::spawn(move || {
Expand Down
Loading

0 comments on commit b526a06

Please sign in to comment.