Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(parser): redact SqlOption #14760

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/connector/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ rdkafka = { workspace = true, features = [
"gssapi",
"zstd",
] }
redact = { version = "0.1.5", features = ["serde"] }
redis = { version = "0.24.0", features = ["aio","tokio-comp","async-std-comp"] }
regex = "1.4"
reqwest = { version = "0.11", features = ["json"] }
Expand Down
8 changes: 5 additions & 3 deletions src/connector/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ use aws_credential_types::provider::SharedCredentialsProvider;
use aws_types::region::Region;
use aws_types::SdkConfig;

use crate::source::SecretString;

/// A flatten config map for aws auth.
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct AwsAuthProps {
pub region: Option<String>,
#[serde(alias = "endpoint_url")]
pub endpoint: Option<String>,
pub access_key: Option<String>,
pub secret_key: Option<String>,
pub secret_key: Option<SecretString>,
pub session_token: Option<String>,
pub arn: Option<String>,
/// This field was added for kinesis. Not sure if it's useful for other connectors.
Expand Down Expand Up @@ -95,7 +97,7 @@ impl AwsAuthProps {
Ok(SharedCredentialsProvider::new(
aws_credential_types::Credentials::from_keys(
self.access_key.as_ref().unwrap(),
self.secret_key.as_ref().unwrap(),
self.secret_key.as_ref().unwrap().expose_secret(),
self.session_token.clone(),
),
))
Expand Down Expand Up @@ -453,7 +455,7 @@ pub struct KinesisCommon {
rename = "aws.credentials.secret_access_key",
alias = "kinesis.credentials.secret"
)]
pub credentials_secret_access_key: Option<String>,
pub credentials_secret_access_key: Option<SecretString>,
#[serde(
rename = "aws.credentials.session_token",
alias = "kinesis.credentials.session_token"
Expand Down
34 changes: 34 additions & 0 deletions src/connector/src/source/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::fmt::{Display, Formatter};

use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use risingwave_common::error::RwError;
use serde::{Deserialize, Serialize, Serializer};

use crate::parser::ParserConfig;
use crate::source::{SourceContextRef, SourceMessage, SplitReader, StreamChunkWithState};
use crate::with_options::WithOptions;

pub(crate) trait CommonSplitReader: SplitReader + 'static {
fn into_data_stream(
Expand Down Expand Up @@ -74,3 +78,33 @@ pub(crate) async fn into_chunk_stream(
yield msg_batch?;
}
}

/// A string credential wrapped in [`redact::Secret`].
///
/// The raw value is read through [`SecretString::expose_secret`]. The
/// hand-written `Serialize` and `Display` impls for this type emit the
/// `Debug` rendering of the wrapped secret rather than the value it holds.
#[derive(Debug, Clone, PartialEq, Deserialize)]
pub struct SecretString(redact::Secret<String>);

/// Serializes the `Debug` rendering of the wrapped secret as a string.
///
/// The output is whatever `redact::Secret`'s `Debug` impl produces, so the
/// underlying value is never handed to the serializer.
impl Serialize for SecretString {
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Render through `Debug` first, then emit the result as a plain string.
        let rendered = format!("{:?}", self.0);
        serializer.serialize_str(&rendered)
    }
}

// Empty impl: overrides nothing and relies on whatever defaults `WithOptions` provides.
// NOTE(review): presumably needed so structs deriving `WithOptions` can hold a
// `SecretString` field — confirm against the derive macro.
impl WithOptions for SecretString {}

impl SecretString {
pub fn expose_secret(&self) -> &str {
self.0.expose_secret()
}

pub fn new(s: impl Into<String>) -> Self {
Self(redact::Secret::new(s.into()))
}
}

/// Displays the `Debug` rendering of the wrapped secret, not the value itself.
impl Display for SecretString {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self.0))
    }
}
2 changes: 1 addition & 1 deletion src/connector/src/source/filesystem/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ pub mod file_common;
pub mod nd_streaming;
pub use file_common::{FsPage, FsPageItem, FsSplit, OpendalFsSplit};
pub mod opendal_source;
mod s3;
pub mod s3;
pub mod s3_v2;
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<Src: OpendalSource> OpendalEnumerator<Src> {
}

if let Some(secret) = s3_properties.secret {
builder.secret_access_key(&secret);
builder.secret_access_key(secret.expose_secret());
} else {
tracing::error!(
"secret access key of aws s3 is not set, bucket {}",
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/filesystem/s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,17 @@ use std::collections::HashMap;

pub use enumerator::S3SplitEnumerator;
mod source;
use serde::Deserialize;
use serde::{Deserialize, Serialize};
pub use source::S3FileReader;

use crate::common::AwsAuthProps;
use crate::source::filesystem::FsSplit;
use crate::source::{SourceProperties, UnknownFields};
use crate::source::{SecretString, SourceProperties, UnknownFields};

pub const S3_CONNECTOR: &str = "s3";

/// These are supported by both `s3` and `s3_v2` (opendal) sources.
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3PropertiesCommon {
#[serde(rename = "s3.region_name")]
pub region_name: String,
Expand All @@ -38,7 +38,7 @@ pub struct S3PropertiesCommon {
#[serde(rename = "s3.credentials.access", default)]
pub access: Option<String>,
#[serde(rename = "s3.credentials.secret", default)]
pub secret: Option<String>,
pub secret: Option<SecretString>,
#[serde(rename = "s3.endpoint_url")]
pub endpoint_url: Option<String>,
}
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub mod nats;
pub mod nexmark;
pub mod pulsar;
pub use base::{UPSTREAM_SOURCE_KEY, *};
pub use common::SecretString;
pub(crate) use common::*;
pub use google_pubsub::GOOGLE_PUBSUB_CONNECTOR;
pub use kafka::KAFKA_CONNECTOR;
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ impl PulsarIcebergReader {
if let Some(secret_key) = &self.props.aws_auth_props.secret_key {
iceberg_configs.insert(
"iceberg.table.io.secret_access_key".to_string(),
secret_key.to_string(),
secret_key.expose_secret().to_owned(),
);
}

Expand Down
6 changes: 3 additions & 3 deletions src/connector/with_options_sink.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ BigQueryConfig:
field_type: String
required: false
- name: secret_key
field_type: String
field_type: SecretString
required: false
- name: session_token
field_type: String
Expand Down Expand Up @@ -324,7 +324,7 @@ KinesisSinkConfig:
required: false
alias: kinesis.credentials.access
- name: aws.credentials.secret_access_key
field_type: String
field_type: SecretString
required: false
alias: kinesis.credentials.secret
- name: aws.credentials.session_token
Expand Down Expand Up @@ -424,7 +424,7 @@ PulsarConfig:
field_type: String
required: false
- name: secret_key
field_type: String
field_type: SecretString
required: false
- name: session_token
field_type: String
Expand Down
8 changes: 4 additions & 4 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ KinesisProperties:
required: false
alias: kinesis.credentials.access
- name: aws.credentials.secret_access_key
field_type: String
field_type: SecretString
required: false
alias: kinesis.credentials.secret
- name: aws.credentials.session_token
Expand Down Expand Up @@ -447,7 +447,7 @@ OpendalS3Properties:
required: false
default: Default::default
- name: s3.credentials.secret
field_type: String
field_type: SecretString
required: false
default: Default::default
- name: s3.endpoint_url
Expand Down Expand Up @@ -535,7 +535,7 @@ PulsarProperties:
field_type: String
required: false
- name: secret_key
field_type: String
field_type: SecretString
required: false
- name: session_token
field_type: String
Expand Down Expand Up @@ -574,7 +574,7 @@ S3Properties:
required: false
default: Default::default
- name: s3.credentials.secret
field_type: String
field_type: SecretString
required: false
default: Default::default
- name: s3.endpoint_url
Expand Down
54 changes: 54 additions & 0 deletions src/frontend/src/handler/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,9 @@ mod tests {
use postgres_types::{ToSql, Type};
use risingwave_common::array::*;
use risingwave_common::types::Timestamptz;
use risingwave_connector::source::filesystem::s3::S3PropertiesCommon;
use risingwave_connector::source::SecretString;
use risingwave_sqlparser::ast::{Ident, ObjectName, SqlOption, Value};

use super::*;

Expand Down Expand Up @@ -441,4 +444,55 @@ mod tests {
"1969-12-31 23:59:59.999999+00:00"
);
}

/// Builds a single-part `ObjectName` whose only identifier is `s`.
fn to_object_name(s: &str) -> ObjectName {
    let ident = Ident::new_unchecked(s);
    ObjectName(vec![ident])
}

/// Serializes an `S3PropertiesCommon` into SQL options and checks that the
/// secret is emitted in redacted form while every other field keeps its value.
#[test]
fn test_redact() {
    use risingwave_sqlparser::ast::utils::SqlOptionVecSerializer;
    use serde::Serialize;

    let p = S3PropertiesCommon {
        region_name: "region".to_string(),
        bucket_name: "bucket".to_string(),
        match_pattern: Some("pattern".into()),
        access: None,
        secret: Some(SecretString::new("123")),
        endpoint_url: None,
    };
    let mut s = SqlOptionVecSerializer::default();
    p.serialize(&mut s).unwrap();
    let sql_options: Vec<SqlOption> = s.into();
    assert_eq!(
        sql_options,
        vec![
            SqlOption {
                name: to_object_name("s3.region_name"),
                value: Value::SingleQuotedString("region".into())
            },
            SqlOption {
                name: to_object_name("s3.bucket_name"),
                value: Value::SingleQuotedString("bucket".into())
            },
            SqlOption {
                name: to_object_name("match_pattern"),
                value: Value::SingleQuotedString("pattern".into())
            },
            // `None` fields are expected to serialize as SQL NULL.
            SqlOption {
                name: to_object_name("s3.credentials.access"),
                value: Value::Null
            },
            // The secret "123" must not appear; only the redacted placeholder does.
            SqlOption {
                name: to_object_name("s3.credentials.secret"),
                value: Value::SingleQuotedString("[REDACTED alloc::string::String]".into())
            },
            SqlOption {
                name: to_object_name("s3.endpoint_url"),
                value: Value::Null
            },
        ]
    );
}
}
2 changes: 1 addition & 1 deletion src/sqlparser/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ normal = ["workspace-hack"]

[dependencies]
itertools = "0.12"
serde = { version = "1.0", features = ["derive"], optional = true }
serde = { version = "1.0", features = ["derive"] }
tracing = "0.1"

[target.'cfg(not(madsim))'.dependencies]
Expand Down
1 change: 1 addition & 0 deletions src/sqlparser/src/ast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod legacy_source;
mod operator;
mod query;
mod statement;
pub mod utils;
zwang28 marked this conversation as resolved.
Show resolved Hide resolved
mod value;

#[cfg(not(feature = "std"))]
Expand Down
Loading
Loading