Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cdc): fix privilege check for shared mysql source #15395

Merged
merged 9 commits into from
Mar 11, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor cdc privilege check for share source and backfill table
StrikeW committed Mar 6, 2024
commit b526a062876190ef86a348fe89fc9ac2a7a73085
Original file line number Diff line number Diff line change
@@ -84,25 +84,24 @@ static void ensureRequiredProps(Map<String, String> props, boolean isMultiTableS
public static void validateSource(ConnectorServiceProto.ValidateSourceRequest request)
throws Exception {
var props = request.getPropertiesMap();
var commonParam = request.getCommonParam();
boolean isMultiTableShared = commonParam.getIsMultiTableShared();

boolean isCdcSourceJob = request.getIsSourceJob();

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
case POSTGRES:
ensureRequiredProps(props, isMultiTableShared);
ensureRequiredProps(props, isCdcSourceJob);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SLOT_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_PUB_CREATE);
try (var validator =
new PostgresValidator(props, tableSchema, isMultiTableShared)) {
try (var validator = new PostgresValidator(props, tableSchema, isCdcSourceJob)) {
validator.validateAll();
}
break;

case CITUS:
ensureRequiredProps(props, isMultiTableShared);
ensureRequiredProps(props, isCdcSourceJob);
ensurePropNotBlank(props, DbzConnectorConfig.TABLE_NAME);
ensurePropNotBlank(props, DbzConnectorConfig.PG_SCHEMA_NAME);
try (var coordinatorValidator = new CitusValidator(props, tableSchema)) {
@@ -131,9 +130,9 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

break;
case MYSQL:
ensureRequiredProps(props, isMultiTableShared);
ensureRequiredProps(props, isCdcSourceJob);
ensurePropNotBlank(props, DbzConnectorConfig.MYSQL_SERVER_ID);
try (var validator = new MySqlValidator(props, tableSchema, isMultiTableShared)) {
try (var validator = new MySqlValidator(props, tableSchema, isCdcSourceJob)) {
validator.validateAll();
}
break;
Original file line number Diff line number Diff line change
@@ -19,9 +19,9 @@ public abstract class DatabaseValidator {
public void validateAll() {
validateDbConfig();
validateUserPrivilege();
// If the source connector is shared by multiple tables, it will capture events from
// If the source connector is created by share source, it will capture events from
// multiple tables, skip validate its schema
if (!isMultiTableShared()) {
if (!isCdcSourceJob()) {
validateTable();
}
}
@@ -35,5 +35,5 @@ public void validateAll() {
/** Validate the properties of the source table */
abstract void validateTable();

abstract boolean isMultiTableShared();
abstract boolean isCdcSourceJob();
}
Original file line number Diff line number Diff line change
@@ -115,7 +115,7 @@ public DbzConnectorConfig(
String startOffset,
Map<String, String> userProps,
boolean snapshotDone,
boolean isMultiTableShared) {
boolean isCdcSourceJob) {

StringSubstitutor substitutor = new StringSubstitutor(userProps);
var dbzProps = initiateDbConfig(DBZ_CONFIG_FILE, substitutor);
@@ -124,13 +124,13 @@ public DbzConnectorConfig(
&& userProps.get(SNAPSHOT_MODE_KEY).equals(SNAPSHOT_MODE_BACKFILL);

LOG.info(
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isMultiTableShared={}",
"DbzConnectorConfig: source={}, sourceId={}, startOffset={}, snapshotDone={}, isCdcBackfill={}, isCdcSourceJob={}",
source,
sourceId,
startOffset,
snapshotDone,
isCdcBackfill,
isMultiTableShared);
isCdcSourceJob);

if (source == SourceTypeE.MYSQL) {
var mysqlProps = initiateDbConfig(MYSQL_CONFIG_FILE, substitutor);
@@ -196,7 +196,7 @@ public DbzConnectorConfig(

dbzProps.putAll(postgresProps);

if (isMultiTableShared) {
if (isCdcSourceJob) {
// remove table filtering for the shared Postgres source, since we
// allow user to ingest tables in different schemas
LOG.info("Disable table filtering for the shared Postgres source");
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ void validateTable() {
}

@Override
boolean isMultiTableShared() {
boolean isCdcSourceJob() {
return false;
}
}
Original file line number Diff line number Diff line change
@@ -32,10 +32,14 @@ public class MySqlValidator extends DatabaseValidator implements AutoCloseable {

private final Connection jdbcConnection;

private final boolean isMultiTableShared;
private final boolean isCdcSourceJob;
private final boolean isBackfillTable;

public MySqlValidator(
Map<String, String> userProps, TableSchema tableSchema, boolean isMultiTableShared)
Map<String, String> userProps,
TableSchema tableSchema,
boolean isCdcSourceJob,
boolean isBackfillTable)
throws SQLException {
this.userProps = userProps;
this.tableSchema = tableSchema;
@@ -48,7 +52,8 @@ public MySqlValidator(
var user = userProps.get(DbzConnectorConfig.USER);
var password = userProps.get(DbzConnectorConfig.PASSWORD);
this.jdbcConnection = DriverManager.getConnection(jdbcUrl, user, password);
this.isMultiTableShared = isMultiTableShared;
this.isCdcSourceJob = isCdcSourceJob;
this.isBackfillTable = isBackfillTable;
}

@Override
@@ -115,8 +120,8 @@ public void validateTable() {
}

@Override
boolean isMultiTableShared() {
return isMultiTableShared;
boolean isCdcSourceJob() {
return isCdcSourceJob;
}

private void validateTableSchema() throws SQLException {
@@ -181,10 +186,7 @@ private void validateTableSchema() throws SQLException {
}

private void validatePrivileges() throws SQLException {
String[] privilegesRequired = {
"SELECT", "RELOAD", "REPLICATION SLAVE", "REPLICATION CLIENT",
};

String[] privilegesRequired = getRequiredPrivileges();
var hashSet = new HashSet<>(List.of(privilegesRequired));
try (var stmt = jdbcConnection.createStatement()) {
var res = stmt.executeQuery(ValidatorUtils.getSql("mysql.grants"));
@@ -208,6 +210,20 @@ private void validatePrivileges() throws SQLException {
}
}

private String[] getRequiredPrivileges() {
if (isCdcSourceJob) {
return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"};
} else if (isBackfillTable) {
// check privilege again to ensure the user has the privilege for backfill
return new String[] {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"};
} else {
// dedicated source needs more privileges to acquire global lock
return new String[] {
"SELECT", "RELOAD", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT"
};
}
}

@Override
public void close() throws Exception {
if (null != jdbcConnection) {
Original file line number Diff line number Diff line change
@@ -47,10 +47,14 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl

// Whether the properties to validate is shared by multiple tables.
// If true, we will skip validation check for table
private final boolean isMultiTableShared;
private final boolean isCdcSourceJob;
private final boolean isBackfillTable;

public PostgresValidator(
Map<String, String> userProps, TableSchema tableSchema, boolean isMultiTableShared)
Map<String, String> userProps,
TableSchema tableSchema,
boolean isCdcSourceJob,
boolean isBackfillTable)
throws SQLException {
this.userProps = userProps;
this.tableSchema = tableSchema;
@@ -74,7 +78,8 @@ public PostgresValidator(

this.pubAutoCreate =
userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
this.isMultiTableShared = isMultiTableShared;
this.isCdcSourceJob = isCdcSourceJob;
this.isBackfillTable = isBackfillTable;
}

@Override
@@ -137,8 +142,8 @@ public void validateTable() {
}

@Override
boolean isMultiTableShared() {
return isMultiTableShared;
boolean isCdcSourceJob() {
return isCdcSourceJob;
}

/** For Citus which is a distributed version of PG */
@@ -157,7 +162,7 @@ public void validateDistributedTable() throws SQLException {
}

private void validateTableSchema() throws SQLException {
if (isMultiTableShared) {
if (isCdcSourceJob) {
return;
}
try (var stmt = jdbcConnection.prepareStatement(ValidatorUtils.getSql("postgres.table"))) {
@@ -285,7 +290,8 @@ private void validatePrivileges() throws SQLException {
}

private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
if (isSuperUser || isMultiTableShared) {
// cdc source job doesn't have table schema to validate, since its schema is fixed to jsonb
if (isSuperUser || isCdcSourceJob) {
return;
}

@@ -339,9 +345,9 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
}
}

// If the source properties is shared by multiple tables, skip the following
// If the source properties is created by share source, skip the following
// check of publication
if (isMultiTableShared) {
if (isCdcSourceJob) {
return;
}

@@ -424,7 +430,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
}

private void validatePublicationPrivileges() throws SQLException {
if (isMultiTableShared) {
if (isCdcSourceJob) {
throw ValidatorUtils.invalidArgument(
"The connector properties is shared by multiple tables unexpectedly");
}
@@ -496,9 +502,9 @@ private void validatePublicationPrivileges() throws SQLException {
}

protected void alterPublicationIfNeeded() throws SQLException {
if (isMultiTableShared) {
if (isCdcSourceJob) {
throw ValidatorUtils.invalidArgument(
"The connector properties is shared by multiple tables unexpectedly");
"The connector properties is created by a shared source unexpectedly");
}

String alterPublicationSql =
9 changes: 3 additions & 6 deletions proto/connector_service.proto
Original file line number Diff line number Diff line change
@@ -175,17 +175,13 @@
MONGODB = 4;
}

message SourceCommonParam {
bool is_multi_table_shared = 1;
}

message GetEventStreamRequest {
uint64 source_id = 1;
SourceType source_type = 2;
string start_offset = 3;
map<string, string> properties = 4;
bool snapshot_done = 5;
SourceCommonParam common_param = 6;
bool is_source_job = 6;

Check failure on line 184 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "6" with name "is_source_job" on message "GetEventStreamRequest" changed option "json_name" from "commonParam" to "isSourceJob".

Check failure on line 184 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "6" on message "GetEventStreamRequest" changed type from "message" to "bool". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

Check failure on line 184 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "6" on message "GetEventStreamRequest" changed name from "common_param" to "is_source_job".
}

message GetEventStreamResponse {
@@ -202,7 +198,8 @@
SourceType source_type = 2;
map<string, string> properties = 3;
TableSchema table_schema = 4;
SourceCommonParam common_param = 5;
bool is_source_job = 5;

Check failure on line 201 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "5" with name "is_source_job" on message "ValidateSourceRequest" changed option "json_name" from "commonParam" to "isSourceJob".

Check failure on line 201 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "5" on message "ValidateSourceRequest" changed type from "message" to "bool". See https://developers.google.com/protocol-buffers/docs/proto3#updating for wire compatibility rules and https://developers.google.com/protocol-buffers/docs/proto3#json for JSON compatibility rules.

Check failure on line 201 in proto/connector_service.proto

GitHub Actions / Check breaking changes in Protobuf files

Field "5" on message "ValidateSourceRequest" changed name from "common_param" to "is_source_job".
bool is_backfill_table = 6;
}

message ValidateSourceResponse {
9 changes: 3 additions & 6 deletions src/connector/src/source/cdc/enumerator/mod.rs
Original file line number Diff line number Diff line change
@@ -23,9 +23,7 @@ use prost::Message;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::call_static_method;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_pb::connector_service::{
SourceCommonParam, SourceType, ValidateSourceRequest, ValidateSourceResponse,
};
use risingwave_pb::connector_service::{SourceType, ValidateSourceRequest, ValidateSourceResponse};

use crate::error::ConnectorResult;
use crate::source::cdc::{
@@ -80,9 +78,8 @@ where
source_type: props.get_source_type_pb() as _,
properties: props.properties,
table_schema: Some(props.table_schema),
common_param: Some(SourceCommonParam {
is_multi_table_shared: props.is_multi_table_shared,
}),
is_source_job: props.is_cdc_source_job,
is_backfill_table: props.is_backfill_table,
};

let validate_source_request_bytes =
18 changes: 11 additions & 7 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
@@ -85,8 +85,11 @@ pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Schema of the source specified by users
pub table_schema: TableSchema,

/// Whether the properties is shared by multiple tables
pub is_multi_table_shared: bool,
/// Whether it is created by a cdc source job
pub is_cdc_source_job: bool,

/// For validation purpose, mark if the table is a backfill cdc table
pub is_backfill_table: bool,

pub _phantom: PhantomData<T>,
}
@@ -96,14 +99,15 @@ impl<T: CdcSourceTypeTrait> TryFromHashmap for CdcProperties<T> {
properties: HashMap<String, String>,
_deny_unknown_fields: bool,
) -> ConnectorResult<Self> {
let is_multi_table_shared = properties
let is_share_source = properties
.get(CDC_SHARING_MODE_KEY)
.is_some_and(|v| v == "true");
Ok(CdcProperties {
properties,
table_schema: Default::default(),
// TODO(siyuan): use serde to deserialize input hashmap
is_multi_table_shared,
is_cdc_source_job: is_share_source,
is_backfill_table: false,
_phantom: PhantomData,
})
}
@@ -144,7 +148,7 @@ where
};
self.table_schema = table_schema;
if let Some(info) = source.info.as_ref() {
self.is_multi_table_shared = info.cdc_source_job;
self.is_cdc_source_job = info.cdc_source_job;
}
}

@@ -159,8 +163,8 @@ where

self.properties = properties;
self.table_schema = table_schema;
// properties are not shared, so mark it as false
self.is_multi_table_shared = false;
self.is_cdc_source_job = false;
self.is_backfill_table = true;
}
}

8 changes: 2 additions & 6 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
@@ -24,9 +24,7 @@ use risingwave_common::metrics::GLOBAL_ERROR_METRICS;
use risingwave_common::util::addr::HostAddr;
use risingwave_jni_core::jvm_runtime::JVM;
use risingwave_jni_core::{call_static_method, JniReceiverType, JniSenderType};
use risingwave_pb::connector_service::{
GetEventStreamRequest, GetEventStreamResponse, SourceCommonParam,
};
use risingwave_pb::connector_service::{GetEventStreamRequest, GetEventStreamResponse};
use thiserror_ext::AsReport;
use tokio::sync::mpsc;

@@ -106,9 +104,7 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
start_offset: split.start_offset().clone().unwrap_or_default(),
properties,
snapshot_done: split.snapshot_done(),
common_param: Some(SourceCommonParam {
is_multi_table_shared: conn_props.is_multi_table_shared,
}),
is_source_job: conn_props.is_cdc_source_job,
};

std::thread::spawn(move || {
10 changes: 6 additions & 4 deletions src/rpc_client/src/connector_client.rs
Original file line number Diff line number Diff line change
@@ -203,7 +203,7 @@ impl ConnectorClient {
start_offset: Option<String>,
properties: HashMap<String, String>,
snapshot_done: bool,
common_param: SourceCommonParam,
is_source_job: bool,
) -> Result<Streaming<GetEventStreamResponse>> {
Ok(self
.rpc_client
@@ -214,7 +214,7 @@ impl ConnectorClient {
start_offset: start_offset.unwrap_or_default(),
properties,
snapshot_done,
common_param: Some(common_param),
is_source_job,
})
.await
.inspect_err(|err| {
@@ -234,7 +234,8 @@ impl ConnectorClient {
source_type: SourceType,
properties: HashMap<String, String>,
table_schema: Option<TableSchema>,
common_param: SourceCommonParam,
is_source_job: bool,
is_backfill_table: bool,
) -> Result<()> {
let response = self
.rpc_client
@@ -244,7 +245,8 @@ impl ConnectorClient {
source_type: source_type as _,
properties,
table_schema,
common_param: Some(common_param),
is_source_job,
is_backfill_table,
})
.await
.inspect_err(|err| {