feat(parser): redact SqlOption #14760

Closed · wants to merge 9 commits
4 changes: 4 additions & 0 deletions Cargo.lock

44 changes: 44 additions & 0 deletions e2e_test/source/basic/pubsub.slt
@@ -77,3 +77,47 @@ select v1, v2 FROM s2;
statement ok
DROP TABLE s2;

# redact pubsub.credentials
statement ok
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5980',
pubsub.credentials = '123456'
) FORMAT PLAIN ENCODE JSON;

# pubsub.credentials is not redacted for the table owner.
query IT rowsort
SHOW CREATE TABLE s3;
----
public.s3 CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', pubsub.emulator_host = 'localhost:5980', pubsub.credentials = '123456') FORMAT PLAIN ENCODE JSON

# pubsub.credentials is not redacted for the table owner.
query IT rowsort
SELECT definition FROM rw_tables WHERE name='s3';
----
CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (connector = 'google_pubsub', pubsub.subscription = 'test-subscription-1', pubsub.emulator_host = 'localhost:5980', pubsub.credentials = '123456') FORMAT PLAIN ENCODE JSON

statement ok
CREATE USER other_user;

statement ok
ALTER TABLE s3 owner to other_user;

# pubsub.credentials is redacted for users other than the table owner.
query IT rowsort
SHOW CREATE TABLE s3;
----
public.s3 CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (pubsub.subscription = 'test-subscription-1', pubsub.emulator_host = 'localhost:5980', pubsub.credentials = '[REDACTED]') FORMAT PLAIN ENCODE JSON

# pubsub.credentials is redacted for users other than the table owner.
query IT rowsort
SELECT definition FROM rw_tables WHERE name='s3';
----
CREATE TABLE s3 (v1 INT, v2 CHARACTER VARYING) WITH (pubsub.subscription = 'test-subscription-1', pubsub.emulator_host = 'localhost:5980', pubsub.credentials = '[REDACTED]') FORMAT PLAIN ENCODE JSON

statement ok
DROP TABLE s3;

statement ok
DROP USER other_user;
1 change: 1 addition & 0 deletions src/connector/Cargo.toml
@@ -106,6 +106,7 @@ rdkafka = { workspace = true, features = [
"gssapi",
"zstd",
] }
redact = { version = "0.1.5", features = ["serde"] }
redis = { version = "0.24.0", features = ["aio","tokio-comp","async-std-comp"] }
regex = "1.4"
reqwest = { version = "0.11", features = ["json"] }
52 changes: 27 additions & 25 deletions src/connector/src/common.rs
@@ -25,7 +25,7 @@ use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
use pulsar::{Authentication, Pulsar, TokioExecutor};
use rdkafka::ClientConfig;
use risingwave_common::bail;
use serde_derive::Deserialize;
use serde_derive::{Deserialize, Serialize};
use serde_with::json::JsonString;
use serde_with::{serde_as, DisplayFromStr};
use tempfile::NamedTempFile;
@@ -44,7 +44,7 @@ use crate::source::nats::source::NatsOffset;
pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

#[derive(Debug, Clone, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsPrivateLinkItem {
pub az_id: Option<String>,
pub port: u16,
@@ -56,14 +56,16 @@ use aws_credential_types::provider::SharedCredentialsProvider;
use aws_types::region::Region;
use aws_types::SdkConfig;

use crate::source::SecretString;

/// A flattened config map for AWS auth.
#[derive(Deserialize, Debug, Clone, WithOptions)]
#[derive(Serialize, Deserialize, Debug, Clone, WithOptions)]
pub struct AwsAuthProps {
pub region: Option<String>,
#[serde(alias = "endpoint_url")]
pub endpoint: Option<String>,
pub access_key: Option<String>,
pub secret_key: Option<String>,
pub secret_key: Option<SecretString>,
pub session_token: Option<String>,
pub arn: Option<String>,
/// This field was added for kinesis. Not sure if it's useful for other connectors.
@@ -95,7 +97,7 @@ impl AwsAuthProps {
Ok(SharedCredentialsProvider::new(
aws_credential_types::Credentials::from_keys(
self.access_key.as_ref().unwrap(),
self.secret_key.as_ref().unwrap(),
self.secret_key.as_ref().unwrap().expose_secret(),
self.session_token.clone(),
),
))
@@ -141,7 +143,7 @@ impl AwsAuthProps {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
#[derive(Debug, Clone, Serialize, Deserialize, WithOptions)]
pub struct KafkaCommon {
#[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
pub brokers: String,
@@ -179,7 +181,7 @@ pub struct KafkaCommon {

/// Passphrase of client's private key.
#[serde(rename = "properties.ssl.key.password")]
ssl_key_password: Option<String>,
ssl_key_password: Option<SecretString>,

/// SASL mechanism if SASL is enabled. Currently supports PLAIN, SCRAM, and GSSAPI.
#[serde(rename = "properties.sasl.mechanism")]
@@ -191,7 +193,7 @@ pub struct KafkaCommon {

/// SASL password for SASL/PLAIN and SASL/SCRAM.
#[serde(rename = "properties.sasl.password")]
sasl_password: Option<String>,
sasl_password: Option<SecretString>,

/// Kafka server's Kerberos principal name under SASL/GSSAPI, not including /hostname@REALM.
#[serde(rename = "properties.sasl.kerberos.service.name")]
@@ -219,7 +221,7 @@ pub struct KafkaCommon {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
#[derive(Debug, Clone, Serialize, Deserialize, WithOptions)]
pub struct KafkaPrivateLinkCommon {
/// This is generated from `private_link_targets` and `private_link_endpoint` in the frontend, rather than provided by users.
#[serde(rename = "broker.rewrite.endpoints")]
Expand All @@ -232,7 +234,7 @@ const fn default_kafka_sync_call_timeout() -> Duration {
}

#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
#[derive(Debug, Clone, Serialize, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesCommon {
/// Maximum Kafka protocol request message size. Due to differing framing overhead between
/// protocol versions the producer is unable to reliably enforce a strict max message limit at
@@ -302,7 +304,7 @@ impl KafkaCommon {
config.set("ssl.key.location", ssl_key_location);
}
if let Some(ssl_key_password) = self.ssl_key_password.as_ref() {
config.set("ssl.key.password", ssl_key_password);
config.set("ssl.key.password", ssl_key_password.expose_secret());
}
if let Some(ssl_endpoint_identification_algorithm) =
self.ssl_endpoint_identification_algorithm.as_ref()
@@ -324,7 +326,7 @@ impl KafkaCommon {
config.set("sasl.username", sasl_username);
}
if let Some(sasl_password) = self.sasl_password.as_ref() {
config.set("sasl.password", sasl_password);
config.set("sasl.password", sasl_password.expose_secret());
}

// SASL/GSSAPI
@@ -358,7 +360,7 @@ impl KafkaCommon {
}
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Clone, Debug, Serialize, Deserialize, WithOptions)]
pub struct PulsarCommon {
#[serde(rename = "topic", alias = "pulsar.topic")]
pub topic: String,
@@ -367,10 +369,10 @@ pub struct PulsarCommon {
pub service_url: String,

#[serde(rename = "auth.token")]
pub auth_token: Option<String>,
pub auth_token: Option<SecretString>,
}

#[derive(Clone, Debug, Deserialize, WithOptions)]
#[derive(Clone, Debug, Serialize, Deserialize, WithOptions)]
pub struct PulsarOauthCommon {
#[serde(rename = "oauth.issuer.url")]
pub issuer_url: String,
@@ -440,7 +442,7 @@ impl PulsarCommon {
} else if let Some(auth_token) = &self.auth_token {
pulsar_builder = pulsar_builder.with_auth(Authentication {
name: "token".to_string(),
data: Vec::from(auth_token.as_str()),
data: Vec::from(auth_token.expose_secret()),
});
}

@@ -450,7 +452,7 @@ impl PulsarCommon {
}
}

#[derive(Deserialize, Debug, Clone, WithOptions)]
#[derive(Serialize, Deserialize, Debug, Clone, WithOptions)]
pub struct KinesisCommon {
#[serde(rename = "stream", alias = "kinesis.stream.name")]
pub stream_name: String,
@@ -467,7 +469,7 @@ pub struct KinesisCommon {
rename = "aws.credentials.secret_access_key",
alias = "kinesis.credentials.secret"
)]
pub credentials_secret_access_key: Option<String>,
pub credentials_secret_access_key: Option<SecretString>,
#[serde(
rename = "aws.credentials.session_token",
alias = "kinesis.credentials.session_token"
@@ -502,7 +504,7 @@ impl KinesisCommon {
Ok(KinesisClient::from_conf(builder.build()))
}
}
#[derive(Debug, Deserialize)]
#[derive(Debug, Serialize, Deserialize)]
pub struct UpsertMessage<'a> {
#[serde(borrow)]
pub primary_key: Cow<'a, [u8]>,
@@ -511,7 +513,7 @@ pub struct UpsertMessage<'a> {
}

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
#[derive(Serialize, Deserialize, Debug, Clone, WithOptions)]
pub struct NatsCommon {
#[serde(rename = "server_url")]
pub server_url: String,
@@ -522,9 +524,9 @@ pub struct NatsCommon {
#[serde(rename = "username")]
pub user: Option<String>,
#[serde(rename = "password")]
pub password: Option<String>,
pub password: Option<SecretString>,
#[serde(rename = "jwt")]
pub jwt: Option<String>,
pub jwt: Option<SecretString>,
#[serde(rename = "nkey")]
pub nkey: Option<String>,
#[serde(rename = "max_bytes")]
@@ -552,8 +554,8 @@ impl NatsCommon {
if let (Some(v_user), Some(v_password)) =
(self.user.as_ref(), self.password.as_ref())
{
connect_options =
connect_options.user_and_password(v_user.into(), v_password.into())
connect_options = connect_options
.user_and_password(v_user.into(), v_password.expose_secret().into())
} else {
bail!("nats connect mode is user_and_password, but user or password is empty");
}
@@ -562,7 +564,7 @@ impl NatsCommon {
"credential" => {
if let (Some(v_nkey), Some(v_jwt)) = (self.nkey.as_ref(), self.jwt.as_ref()) {
connect_options = connect_options
.credentials(&self.create_credential(v_nkey, v_jwt)?)
.credentials(&self.create_credential(v_nkey, v_jwt.expose_secret())?)
.expect("failed to parse static creds")
} else {
bail!("nats connect mode is credential, but nkey or jwt is empty");
2 changes: 1 addition & 1 deletion src/connector/src/macros.rs
@@ -200,7 +200,7 @@ macro_rules! dispatch_source_prop {
#[macro_export]
macro_rules! impl_connector_properties {
({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {
#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Serialize)]
pub enum ConnectorProperties {
$(
$variant_name(Box<$prop_name>),
6 changes: 5 additions & 1 deletion src/connector/src/source/cdc/mod.rs
@@ -77,17 +77,21 @@ impl CdcSourceType {
}
}

#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, serde::Serialize)]
pub struct CdcProperties<T: CdcSourceTypeTrait> {
/// Properties specified in the WITH clause by user
#[serde(flatten)]
pub properties: HashMap<String, String>,

/// Schema of the source specified by users
#[serde(skip)]
pub table_schema: TableSchema,

/// Whether the properties are shared by multiple tables
#[serde(skip)]
pub is_multi_table_shared: bool,

#[serde(skip)]
pub _phantom: PhantomData<T>,
}
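
For readers less familiar with these serde attributes, the sketch below (not part of the PR; `DemoCdcProps` and its fields are illustrative stand-ins, assuming `serde_json` is available) shows what `#[serde(flatten)]` and `#[serde(skip)]` do to the serialized form: user-provided WITH options are inlined at the top level, while internal state never appears in the output.

```rust
use std::collections::HashMap;

use serde::Serialize;

/// Illustrative stand-in for `CdcProperties`; names are assumptions, not PR code.
#[derive(Serialize)]
struct DemoCdcProps {
    /// `flatten` inlines every map entry at the top level of the serialized output.
    #[serde(flatten)]
    properties: HashMap<String, String>,
    /// `skip` keeps internal state out of the serialized (user-visible) form.
    #[serde(skip)]
    is_multi_table_shared: bool,
}

fn main() {
    let mut properties = HashMap::new();
    properties.insert("hostname".to_string(), "localhost".to_string());
    let props = DemoCdcProps {
        properties,
        is_multi_table_shared: true,
    };
    // Prints {"hostname":"localhost"}: the flattened map is inlined and the
    // skipped field is absent.
    println!("{}", serde_json::to_string(&props).unwrap());
}
```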

34 changes: 34 additions & 0 deletions src/connector/src/source/common.rs
@@ -12,13 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Display, Formatter};

use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::array::StreamChunk;
use serde::{Deserialize, Serialize, Serializer};

use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::ParserConfig;
use crate::source::{SourceContextRef, SourceMessage, SplitReader};
use crate::with_options::WithOptions;

pub(crate) trait CommonSplitReader: SplitReader + 'static {
fn into_data_stream(self) -> impl Stream<Item = ConnectorResult<Vec<SourceMessage>>> + Send;
@@ -87,3 +91,33 @@ pub(crate) async fn into_chunk_stream(
yield msg_batch?;
}
}

#[derive(Clone, Debug, Deserialize, PartialEq)]
pub struct SecretString(redact::Secret<String>);

impl Serialize for SecretString {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
"[REDACTED]".serialize(serializer)
}
}

impl WithOptions for SecretString {}

impl SecretString {
pub fn expose_secret(&self) -> &str {
self.0.expose_secret()
}

pub fn new(s: impl Into<String>) -> Self {
Self(redact::Secret::new(s.into()))
}
}

impl Display for SecretString {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "[REDACTED]")
}
}
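
A minimal sketch of how the new `SecretString` wrapper is expected to behave under serde, written as a hypothetical in-crate test (not part of the PR): `DemoProps` is an illustrative struct, the import path follows the diff's own `use crate::source::SecretString;`, and the assertions assume `serde_json` is available.

```rust
use serde::{Deserialize, Serialize};

use crate::source::SecretString;

/// Illustrative only; real property structs such as `KafkaCommon` or `NatsCommon`
/// wrap their secret fields the same way.
#[derive(Debug, Serialize, Deserialize)]
struct DemoProps {
    user: Option<String>,
    password: Option<SecretString>,
}

#[test]
fn secret_string_redacts_on_serialize() {
    // Deserialization keeps the real value, reachable via `expose_secret()`
    // when building the actual client config.
    let props: DemoProps =
        serde_json::from_str(r#"{"user":"alice","password":"hunter2"}"#).unwrap();
    assert_eq!(props.password.as_ref().unwrap().expose_secret(), "hunter2");

    // Serialization, which the redacted definition is derived from,
    // emits the placeholder instead of the secret.
    let out = serde_json::to_string(&props).unwrap();
    assert_eq!(out, r#"{"user":"alice","password":"[REDACTED]"}"#);
}
```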
4 changes: 2 additions & 2 deletions src/connector/src/source/datagen/mod.rs
@@ -19,7 +19,7 @@ pub mod split;
use std::collections::HashMap;

pub use enumerator::*;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DisplayFromStr};
pub use source::*;
pub use split::*;
@@ -29,7 +29,7 @@ use crate::source::SourceProperties;
pub const DATAGEN_CONNECTOR: &str = "datagen";

#[serde_as]
#[derive(Clone, Debug, Deserialize, with_options::WithOptions)]
#[derive(Clone, Debug, Serialize, Deserialize, with_options::WithOptions)]
pub struct DatagenProperties {
/// split_num means data source partition
#[serde(rename = "datagen.split.num")]
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/mod.rs
@@ -18,5 +18,5 @@ pub mod file_common;
pub mod nd_streaming;
pub use file_common::{FsPage, FsPageItem, FsSplit, OpendalFsSplit};
pub mod opendal_source;
mod s3;
pub mod s3;
pub mod s3_v2;